(define* (start-mirroring-fiber database
                                mirror
                                storage-limit
                                minimum-free-space
                                storage-root
                                metrics-registry
                                #:key scheduler)
  ;; Spawn the fibers that mirror nar files from MIRROR into STORAGE-ROOT
  ;; and return the channel used to talk to them.
  ;;
  ;; Messages understood on the returned channel:
  ;;   'full-pass      walk the nar files from DATABASE that are not yet
  ;;                   stored and fetch them
  ;;   ('fetch FILE)   fetch the single FILE, if it is missing and there is
  ;;                   room for it
  ;;
  ;; STORAGE-LIMIT and MINIMUM-FREE-SPACE only take effect when they are
  ;; integers; any other value disables the corresponding limit.  A second
  ;; fiber queues a 'full-pass message once every 24 hours.

  (define storage-limit?
    (integer? storage-limit))

  (define minimum-free-space?
    (integer? minimum-free-space))

  ;; Reuse the gauges if they are already registered, otherwise create them.
  (define storage-size-metric
    (or (metrics-registry-fetch-metric metrics-registry
                                       "storage_size_bytes")
        (make-gauge-metric metrics-registry
                           "storage_size_bytes")))

  (define storage-free-space-metric
    (or (metrics-registry-fetch-metric metrics-registry
                                       "storage_free_space_bytes")
        (make-gauge-metric metrics-registry
                           "storage_free_space_bytes")))

  ;; Return the largest storage size allowed given the current STORAGE-SIZE
  ;; and FREE-SPACE, or #f when neither limit is configured.
  (define (compute-effective-storage-limit storage-size free-space)
    (cond
     ((and storage-limit? minimum-free-space?)
      (min storage-limit
           (+ storage-size
              (- free-space minimum-free-space))))
     (storage-limit? storage-limit)
     (minimum-free-space?
      (+ storage-size
         (- free-space minimum-free-space)))
     (else #f)))

  ;; True for the exception fetch-file raises on a 404 response.  Passed as
  ;; the #:ignore predicate of retry-on-error.
  (define (not-found-error? exn)
    (and (exception-with-message? exn)
         (string=? (exception-message exn)
                   "file not found")))

  ;; Download FILE from MIRROR into a temporary file, then store it either
  ;; through s3-upload-file or under STORAGE-ROOT on the local file system.
  ;; Returns STORAGE-SIZE plus the number of bytes downloaded.  Raises an
  ;; exception with the message "file not found" on a 404 response, and
  ;; "unknown response code N" on any other non-200 response.
  (define (fetch-file storage-size file)
    (let* ((string-url (string-append mirror file))
           (uri (string->uri string-url))
           (destination-file-name
            (if (s3? storage-root)
                (uri-decode file)
                (string-append storage-root
                               (uri-decode file))))
           (tmp-port (tmpfile)))
      (log-msg 'INFO "fetching " string-url)
      ;; The temporary port is closed explicitly rather than from a
      ;; dynamic-wind exit guard: an exit guard runs on every non-local
      ;; exit, which includes this fiber being suspended while it waits on
      ;; the socket, so the port would be closed mid-download.
      (let ((new-storage-size
             (with-exception-handler
                 (lambda (exn)
                   ;; No #:unwind? here, so handlers further out still see
                   ;; the original stack when they print a backtrace.
                   (close-port tmp-port)
                   (raise-exception exn))
               (lambda ()
                 (with-port-timeouts
                  (lambda ()
                    (call-with-values
                        (lambda ()
                          (let ((port
                                 (non-blocking-open-socket-for-uri uri)))
                            (http-get uri
                                      #:port port
                                      #:decode-body? #f
                                      #:streaming? #t)))
                      (lambda (response body)
                        (let ((code (response-code response)))
                          (cond
                           ((= code 200)
                            (dump-port body tmp-port))
                           ((= code 404)
                            (raise-exception
                             (make-exception-with-message
                              "file not found")))
                           (else
                            (raise-exception
                             (make-exception-with-message
                              (simple-format #f "unknown response code ~A"
                                             code)))))))))
                  #:timeout 30)
                 (force-output tmp-port)
                 ;; Everything written so far is the response body, so the
                 ;; current position is the size of the download.
                 (let ((file-bytes (ftell tmp-port)))
                   ;; Rewind so the readers below start at the first byte
                   ;; instead of at end-of-file.
                   ;; NOTE(review): assumes s3-upload-file reads the port
                   ;; from its current position -- confirm.
                   (seek tmp-port 0 SEEK_SET)
                   (if (s3? storage-root)
                       (s3-upload-file (s3-uri->bucket storage-root)
                                       destination-file-name
                                       tmp-port)
                       (begin
                         (mkdir-p (dirname destination-file-name))
                         (call-with-output-file destination-file-name
                           (cut dump-port tmp-port <>))))
                   (update-nar-files-metric
                    metrics-registry
                    '()
                    #:fetched-count 1)
                   (+ storage-size file-bytes))))))
        (close-port tmp-port)
        new-storage-size)))

  ;; Sequentially fetch the nar files that are not yet stored, stopping
  ;; short of the effective storage limit.  Returns two values: the new
  ;; storage size and the number of files fetched.
  (define (download-nars initial-storage-size initial-free-space)
    (define effective-storage-limit
      (compute-effective-storage-limit initial-storage-size
                                       initial-free-space))

    (log-msg 'DEBUG "effective storage limit "
             effective-storage-limit)
    (if (or (not effective-storage-limit)
            (< initial-storage-size effective-storage-limit))
        (let ((result
               nar-file-counts
               (fold-nar-files
                database
                storage-root
                (lambda (file result)
                  (log-msg 'DEBUG "considering "
                           (assq-ref file 'url))
                  (match result
                    ((storage-size . fetched-count)
                     (let ((file-bytes (assq-ref file 'size)))
                       ;; Only fetch when the file still fits under the
                       ;; limit (or when there is no limit at all).
                       (if (or (not effective-storage-limit)
                               (< (+ storage-size file-bytes)
                                  effective-storage-limit))
                           (let ((new-storage-size
                                  (with-exception-handler
                                      (lambda (exn)
                                        (log-msg 'ERROR "failed to fetch "
                                                 (assq-ref file 'url)
                                                 ": " exn)
                                        #f)
                                    (lambda ()
                                      ;; Print the backtrace before the
                                      ;; outer handler unwinds the stack.
                                      (with-exception-handler
                                          (lambda (exn)
                                            (backtrace)
                                            (raise-exception exn))
                                        (lambda ()
                                          (retry-on-error
                                           (lambda ()
                                             (fetch-file storage-size
                                                         (assq-ref file 'url)))
                                           #:times 3
                                           #:delay-seconds 5
                                           #:ignore not-found-error?))))
                                    #:unwind? #t)))
                             ;; A failed fetch leaves the accumulator as is.
                             (if new-storage-size
                                 (cons new-storage-size
                                       (1+ fetched-count))
                                 result))
                           result)))))
                (cons initial-storage-size 0)
                #:stored? #f)))
          (match result
            ((storage-size . fetched-count)
             (values storage-size
                     fetched-count))))
        (values initial-storage-size
                0)))

  ;; Fetch the nar files that are not yet stored using PARALLELISM worker
  ;; fibers and no storage limit.  Returns two values: the new storage size
  ;; and the number of files fetched.
  (define (fast-download-nars initial-storage-size)
    (define parallelism 3)

    (let ((channel (make-channel)))
      ;; Each worker takes URLs from CHANNEL until it receives
      ;; ('finished . reply), then reports its own totals on REPLY.
      (for-each
       (lambda _
         (spawn-fiber
          (lambda ()
            (let loop ((fetched-count 0)
                       (added-storage-size 0))
              (match (get-message channel)
                (('finished . reply)
                 (put-message reply (list fetched-count
                                          added-storage-size)))
                (url
                 (log-msg 'DEBUG "considering " url)
                 (let ((new-added-storage-size
                        (with-exception-handler
                            (lambda (exn)
                              (log-msg 'ERROR "failed to fetch " url ": " exn)
                              #f)
                          (lambda ()
                            (retry-on-error
                             (lambda ()
                               (fetch-file added-storage-size url))
                             #:times 3
                             #:delay-seconds 5
                             #:ignore not-found-error?))
                          #:unwind? #t)))
                   (if new-added-storage-size
                       (loop (+ fetched-count 1)
                             new-added-storage-size)
                       (loop fetched-count
                             added-storage-size)))))))))
       (iota parallelism))
      (let ((result
             nar-file-counts
             (fold-nar-files
              database
              storage-root
              (lambda (nar _)
                (put-message channel
                             (assq-ref nar 'url))
                #f)
              #f
              #:stored? #f)))
        ;; Send one 'finished message per worker and add up the replies.
        (let loop ((fibers (iota parallelism))
                   (fetched-count 0)
                   (storage-size initial-storage-size))
          (if (null? fibers)
              (values storage-size
                      fetched-count)
              (let ((reply-channel (make-channel)))
                (put-message channel
                             (cons 'finished reply-channel))
                (match (get-message reply-channel)
                  ((additional-fetched-count added-storage-size)
                   (loop (cdr fibers)
                         (+ fetched-count
                            additional-fetched-count)
                         (+ storage-size
                            added-storage-size))))))))))

  ;; Run one full mirroring pass starting from STORAGE-SIZE and return the
  ;; updated storage size.  The sequential, limit-aware path is used when
  ;; either limit is configured, the parallel path otherwise.
  (define (run-mirror-pass storage-size)
    (log-msg 'DEBUG "running mirror pass")
    (let ((free-space (free-disk-space* storage-root)))
      (metric-set storage-free-space-metric
                  free-space)
      (let ((new-storage-size
             fetched-count
             (if (or storage-limit? minimum-free-space?)
                 (download-nars storage-size
                                free-space)
                 (fast-download-nars storage-size))))
        (log-msg 'DEBUG "finished mirror pass (fetched " fetched-count
                 " nars, updated storage size: " new-storage-size ")")
        new-storage-size)))

  ;; True when STORAGE-SIZE is below the effective storage limit, or when
  ;; no limit is configured.
  (define (free-space? storage-size)
    (let ((effective-storage-limit
           (compute-effective-storage-limit
            storage-size
            (free-disk-space* storage-root))))
      (or (not effective-storage-limit)
          (< storage-size effective-storage-limit))))

  (let ((channel (make-channel))
        (channel-metric
         (make-counter-metric metrics-registry
                              "mirroring_fiber_messages_total")))
    ;; Main fiber: handles one message at a time, threading the current
    ;; storage size through the loop.  A failed message is logged and
    ;; leaves the storage size unchanged.
    (spawn-fiber
     (lambda ()
       (let loop ((storage-size
                   (with-time-logging "getting storage size"
                     (get-storage-size storage-root))))
         (metric-set storage-size-metric
                     storage-size)
         (loop
          (match (get-message/increment-counter channel
                                                channel-metric)
            ('full-pass
             (with-exception-handler
                 (lambda (exn)
                   (log-msg 'ERROR "mirror pass failed " exn)
                   storage-size)
               (lambda ()
                 (with-exception-handler
                     (lambda (exn)
                       (print-backtrace-and-exception/knots exn)
                       (raise-exception exn))
                   (lambda ()
                     (run-mirror-pass storage-size))))
               #:unwind? #t))
            (('fetch file)
             (with-exception-handler
                 (lambda (exn)
                   (log-msg 'ERROR "failed to mirror " file ": " exn)
                   storage-size)
               (lambda ()
                 (cond
                  ;; NOTE(review): this checks the local file system even
                  ;; when storage-root is an S3 location -- confirm that is
                  ;; intended.
                  ((file-exists?
                    (string-append storage-root
                                   (uri-decode file)))
                   storage-size)
                  ((free-space? storage-size)
                   (fetch-file storage-size file))
                  (else
                   (log-msg 'DEBUG "not mirroring " file
                            " as no free space")
                   storage-size)))
               #:unwind? #t))))))
     scheduler)

    ;; Timer fiber: queue a full pass now, then once every 24 hours.
    (spawn-fiber
     (lambda ()
       (while #t
         (put-message channel 'full-pass)
         (sleep (* 60 60 24))))
     scheduler)

    channel))