(define* (start-mirroring-fiber database mirror storage-limit
                                minimum-free-space storage-root
                                metrics-registry
                                #:key scheduler)
  "Start a fiber (spawned on SCHEDULER) that mirrors nar files listed in
DATABASE from MIRROR into STORAGE-ROOT, which is either a local directory
prefix or an S3 URI.  Return the channel that fiber listens on.  It accepts
the message 'full-pass, which fetches every nar not yet stored, and the list
(fetch FILE), which fetches the single FILE (a URL path relative to MIRROR).
A second fiber sends 'full-pass once every 24 hours.

STORAGE-LIMIT and MINIMUM-FREE-SPACE only take effect when they are
integers (compared against the storage size in bytes).  Any other value
(e.g. #f) disables that limit.  When neither limit is set, full passes
download with several fibers in parallel."

  (define storage-limit? (integer? storage-limit))
  (define minimum-free-space? (integer? minimum-free-space))

  (define (fetch-or-make-metric make-metric name)
    ;; Reuse an already registered metric rather than registering NAME twice.
    (or (metrics-registry-fetch-metric metrics-registry name)
        (make-metric metrics-registry name)))

  (define storage-size-metric
    (fetch-or-make-metric make-gauge-metric "storage_size_bytes"))
  (define storage-free-space-metric
    (fetch-or-make-metric make-gauge-metric "storage_free_space_bytes"))

  (define (effective-storage-limit storage-size free-space)
    "Return the size STORAGE-SIZE may grow to before a configured limit is
reached, or #f when no limit is configured.  FREE-SPACE is only used when
MINIMUM-FREE-SPACE is set."
    (let ((free-space-limit
           (and minimum-free-space?
                (+ storage-size (- free-space minimum-free-space)))))
      (cond ((and storage-limit? free-space-limit)
             (min storage-limit free-space-limit))
            (storage-limit? storage-limit)
            (else free-space-limit))))

  (define (call-with-closing-port port proc)
    "Call PROC with PORT and return its values.  PORT is closed once PROC
returns, or before re-raising any exception that escapes PROC."
    ;; dynamic-wind is deliberately not used: when a fiber suspends on a
    ;; blocking port operation it aborts to a prompt, which runs
    ;; dynamic-wind exit thunks and would close PORT mid-transfer.
    (with-exception-handler
        (lambda (exn)
          (close-port port)
          (raise-exception exn))
      (lambda ()
        (call-with-values (lambda () (proc port))
          (lambda results
            (close-port port)
            (apply values results))))))

  (define (file-not-found-exception? exn)
    ;; True for the exception fetch-file raises on a 404 response.
    (and (exception-with-message? exn)
         (string=? (exception-message exn) "file not found")))

  (define (fetch-file storage-size file)
    "Download FILE (a URL path relative to MIRROR) and store it under
STORAGE-ROOT, or upload it when STORAGE-ROOT is an S3 URI.  Return
STORAGE-SIZE plus the number of bytes downloaded.  Raise an exception with
message \"file not found\" on a 404 response, and one mentioning the code
for any other non-200 response."
    (let* ((url (string-append mirror file))
           (uri (string->uri url))
           (destination-file-name
            (if (s3? storage-root)
                (uri-decode file)       ;will be used as S3 object tag
                (string-append storage-root (uri-decode file)))))
      (log-msg 'INFO "fetching " url)
      (call-with-closing-port
       (tmpfile)
       (lambda (tmp-port)
         (with-port-timeouts
          (lambda ()
            (call-with-closing-port
             (non-blocking-open-socket-for-uri uri)
             (lambda (socket)
               (call-with-values
                   (lambda ()
                     (http-get uri
                               #:port socket
                               #:decode-body? #f
                               #:streaming? #t))
                 (lambda (response body)
                   (let ((code (response-code response)))
                     (cond
                      ((= code 200)
                       (dump-port body tmp-port))
                      ((= code 404)
                       (raise-exception
                        (make-exception-with-message "file not found")))
                      (else
                       (raise-exception
                        (make-exception-with-message
                         (simple-format #f "unknown response code ~A"
                                        code)))))))))))
          #:timeout 30)
         ;; The download started at offset 0 of a fresh temporary file, so
         ;; the current offset is the file size.  Rewind before the contents
         ;; are read back out; otherwise only EOF would be seen.
         (let ((file-size (seek tmp-port 0 SEEK_CUR)))
           (seek tmp-port 0 SEEK_SET)
           ;; TODO Check the size of the file
           (if (s3? storage-root)
               (s3-upload-file (s3-uri->bucket storage-root)
                               destination-file-name
                               tmp-port)
               (begin
                 (mkdir-p (dirname destination-file-name))
                 (call-with-output-file destination-file-name
                   (cut dump-port tmp-port <>))))
           (update-nar-files-metric metrics-registry '() #:fetched-count 1)
           (+ storage-size file-size))))))

  (define (try-fetch-file storage-size url)
    "Fetch URL via fetch-file, retrying on errors.  Return the updated
storage size, or #f (after logging) if the file could not be fetched."
    (with-exception-handler
        (lambda (exn)
          (log-msg 'ERROR "failed to fetch " url ": " exn)
          #f)
      (lambda ()
        (with-exception-handler
            (lambda (exn)
              ;; Missing files are expected; only print backtraces for
              ;; unexpected failures.
              (unless (file-not-found-exception? exn)
                (print-backtrace-and-exception/knots exn))
              (raise-exception exn))
          (lambda ()
            (retry-on-error
             (lambda ()
               (fetch-file storage-size url))
             #:times 3
             #:delay-seconds 5
             ;; Skip over files that aren't found, they might have been
             ;; removed from the database
             #:ignore file-not-found-exception?))))
      #:unwind? #t))

  (define (download-nars initial-storage-size initial-free-space)
    "Fetch not-yet-stored nars one at a time while they fit within the
effective storage limit.  Return two values: the new storage size and the
number of nars fetched."
    (define limit
      (effective-storage-limit initial-storage-size initial-free-space))

    (log-msg 'DEBUG "effective storage limit " limit)
    (if (or (not limit)
            (< initial-storage-size limit))
        (let ((result
               nar-file-counts
               (fold-nar-files
                database
                storage-root
                (lambda (file result)
                  (log-msg 'DEBUG "considering " (assq-ref file 'url))
                  (match result
                    ((storage-size . fetched-count)
                     (let ((file-bytes (assq-ref file 'size)))
                       (if (or (not limit)
                               (< (+ storage-size file-bytes) limit))
                           (let ((new-storage-size
                                  (try-fetch-file storage-size
                                                  (assq-ref file 'url))))
                             (if new-storage-size
                                 (cons new-storage-size (1+ fetched-count))
                                 result))
                           ;; This file won't fit, so try the next one
                           result)))))
                (cons initial-storage-size 0)
                #:stored? #f)))
          (match result
            ((storage-size . fetched-count)
             (values storage-size fetched-count))))
        (values initial-storage-size 0)))

  (define (fast-download-nars initial-storage-size)
    "Fetch every not-yet-stored nar using a few worker fibers and no storage
limit.  Return two values: the new storage size and the number of nars
fetched."
    (define parallelism 3)
    (define channel (make-channel))

    (define (worker)
      ;; Fetch URLs from CHANNEL until asked to finish, then reply with the
      ;; number of nars fetched and the bytes added.
      (let loop ((fetched-count 0)
                 (added-storage-size 0))
        (match (get-message channel)
          (('finished . reply)
           (put-message reply (list fetched-count added-storage-size)))
          (url
           (log-msg 'DEBUG "considering " url)
           (let ((new-added-storage-size
                  (try-fetch-file added-storage-size url)))
             (if new-added-storage-size
                 (loop (1+ fetched-count) new-added-storage-size)
                 (loop fetched-count added-storage-size)))))))

    (for-each (lambda _ (spawn-fiber worker))
              (iota parallelism))

    ;; Hand each nar URL to whichever worker is free; values are discarded.
    (fold-nar-files database
                    storage-root
                    (lambda (nar _)
                      (put-message channel (assq-ref nar 'url))
                      #f)
                    #f
                    #:stored? #f)

    ;; Stop each worker in turn (a worker exits after its 'finished
    ;; message), summing what each one fetched.
    (let loop ((remaining parallelism)
               (fetched-count 0)
               (storage-size initial-storage-size))
      (if (zero? remaining)
          (values storage-size fetched-count)
          (let ((reply-channel (make-channel)))
            (put-message channel (cons 'finished reply-channel))
            (match (get-message reply-channel)
              ((additional-fetched-count added-storage-size)
               (loop (1- remaining)
                     (+ fetched-count additional-fetched-count)
                     (+ storage-size added-storage-size))))))))

  (define (run-mirror-pass storage-size)
    "Run one full mirroring pass and return the updated storage size."
    (log-msg 'DEBUG "running mirror pass")
    (let ((free-space (free-disk-space* storage-root)))
      (metric-set storage-free-space-metric free-space)
      (let ((new-storage-size
             fetched-count
             (if (or storage-limit? minimum-free-space?)
                 (download-nars storage-size free-space)
                 (fast-download-nars storage-size))))
        (log-msg 'DEBUG "finished mirror pass (fetched " fetched-count
                 " nars, updated storage size: " new-storage-size ")")
        new-storage-size)))

  (define (free-space? storage-size)
    "Return #t when STORAGE-SIZE is below the effective storage limit, or
when no limit is configured."
    (let ((limit (effective-storage-limit
                  storage-size
                  (and minimum-free-space?
                       (free-disk-space* storage-root)))))
      (or (not limit)
          (< storage-size limit))))

  (let ((channel (make-channel))
        (channel-metric
         (fetch-or-make-metric make-counter-metric
                               "mirroring_fiber_messages_total")))
    (spawn-fiber
     (lambda ()
       (let loop ((storage-size
                   (with-time-logging "getting storage size"
                     (get-storage-size storage-root))))
         (metric-set storage-size-metric storage-size)
         (loop
          (match (get-message/increment-counter channel channel-metric)
            ('full-pass
             (with-exception-handler
                 (lambda (exn)
                   (log-msg 'ERROR "mirror pass failed " exn)
                   storage-size)
               (lambda ()
                 (with-exception-handler
                     (lambda (exn)
                       (print-backtrace-and-exception/knots exn)
                       (raise-exception exn))
                   (lambda ()
                     (run-mirror-pass storage-size))))
               #:unwind? #t))
            (('fetch file)
             (with-exception-handler
                 (lambda (exn)
                   (log-msg 'ERROR "failed to mirror " file ": " exn)
                   storage-size)
               (lambda ()
                 (cond
                  ;; TODO: Adjust existence check for S3.
                  ((file-exists? (string-append storage-root
                                                (uri-decode file)))
                   ;; Can't mirror as already exists
                   storage-size)
                  ;; TODO This is too crude, it needs to look at the size of
                  ;; the file rather than allowing the limit to be breached
                  ((free-space? storage-size)
                   (fetch-file storage-size file))
                  (else
                   (log-msg 'DEBUG "not mirroring " file " as no free space")
                   storage-size)))
               #:unwind? #t))
            (unknown
             ;; Don't let an unexpected message kill the mirroring fiber.
             (log-msg 'ERROR "mirroring fiber: unknown message " unknown)
             storage-size)))))
     scheduler)

    ;; Trigger a full mirroring pass now and then once a day.
    (spawn-fiber
     (lambda ()
       (while #t
         (put-message channel 'full-pass)
         (sleep (* 60 60 24))))
     scheduler)

    channel))