(define* (start-mirroring-fiber database
                                mirror
                                storage-limit
                                minimum-free-space
                                storage-root
                                metrics-registry
                                #:key scheduler)
  "Start the fibers that mirror nars from MIRROR into STORAGE-ROOT and return
a channel that drives them.

Putting 'full-pass on the channel mirrors every nar that DATABASE lists as
not yet stored; putting (list 'fetch FILE) mirrors the single nar FILE, a URL
path relative to MIRROR.  A 'full-pass is also requested every 24 hours.

STORAGE-ROOT is a local directory prefix, or an S3 URI (when s3? holds) whose
bucket receives the uploads.  STORAGE-LIMIT and MINIMUM-FREE-SPACE are byte
counts; any non-integer value (e.g. #f) disables that constraint.  Storage
size and free space gauges are kept in METRICS-REGISTRY.  The long-running
fibers are spawned on SCHEDULER."

  (define storage-limit?
    (integer? storage-limit))

  (define minimum-free-space?
    (integer? minimum-free-space))

  (define (fetch-or-make-gauge name)
    ;; Reuse the gauge NAME when METRICS-REGISTRY already has it; otherwise
    ;; create it.
    (or (metrics-registry-fetch-metric metrics-registry name)
        (make-gauge-metric metrics-registry name)))

  (define storage-size-metric
    (fetch-or-make-gauge "storage_size_bytes"))

  (define storage-free-space-metric
    (fetch-or-make-gauge "storage_free_space_bytes"))

  (define (compute-effective-storage-limit storage-size free-space)
    ;; Return the largest total storage size the configured constraints
    ;; allow, or #f when neither constraint is configured.  A minimum free
    ;; space lets STORAGE-SIZE grow by however much FREE-SPACE exceeds that
    ;; minimum; FREE-SPACE is only consulted in that case.
    (let ((free-space-limit
           (and minimum-free-space?
                (+ storage-size
                   (- free-space minimum-free-space)))))
      (cond
       ((and storage-limit? free-space-limit)
        (min storage-limit free-space-limit))
       (storage-limit? storage-limit)
       (else free-space-limit))))

  (define (copy-port-to-file port file-name)
    ;; Write the rest of PORT to FILE-NAME through a "-tmp" sibling that is
    ;; then renamed into place.  A failed copy therefore never leaves a
    ;; truncated FILE-NAME, which the 'fetch handler would otherwise treat
    ;; as already mirrored.
    (let ((partial-file-name (string-append file-name "-tmp")))
      (mkdir-p (dirname file-name))
      (call-with-output-file partial-file-name
        (cut dump-port port <>))
      (rename-file partial-file-name file-name)))

  (define (download-to-port! uri output)
    ;; GET URI and write the response body to OUTPUT.  Return OUTPUT's
    ;; position afterwards, which is the number of bytes written when OUTPUT
    ;; starts out empty.  A 404 raises an exception with the message
    ;; "file not found"; any other non-200 code raises
    ;; "unknown response code ...".
    (with-port-timeouts
     (lambda ()
       (let ((socket (non-blocking-open-socket-for-uri uri)))
         (with-exception-handler
             (lambda (exn)
               ;; Don't leak the connection on failure; re-raise unchanged.
               (close-port socket)
               (raise-exception exn))
           (lambda ()
             (call-with-values
                 (lambda ()
                   (http-get uri
                             #:port socket
                             #:decode-body? #f
                             #:streaming? #t))
               (lambda (response body)
                 (let ((code (response-code response)))
                   (when (= code 200)
                     (dump-port body output))
                   ;; The streamed body is read from SOCKET, so closing it
                   ;; releases the connection in every case.
                   (close-port socket)
                   (cond
                    ((= code 200)
                     (seek output 0 SEEK_CUR))
                    ((= code 404)
                     (raise-exception
                      (make-exception-with-message
                       "file not found")))
                    (else
                     (raise-exception
                      (make-exception-with-message
                       (simple-format #f "unknown response code ~A"
                                      code))))))))))))
     #:timeout 30))

  (define (fetch-file storage-size file)
    ;; Mirror FILE (a URL path relative to MIRROR) into STORAGE-ROOT and
    ;; return STORAGE-SIZE plus the size of the downloaded file.
    (let ((string-url (string-append mirror file))
          (destination-file-name
           (if (s3? storage-root)
               (uri-decode file)        ;will be used as S3 object tag
               (string-append storage-root
                              (uri-decode file))))
          (tmp-port (tmpfile)))
      (log-msg 'INFO "fetching " string-url)

      ;; Close TMP-PORT on error, or once the nar has been stored.  This
      ;; deliberately avoids dynamic-wind: its exit thunk also runs when the
      ;; fiber suspends while waiting on the socket (suspension aborts to
      ;; the scheduler's prompt), which would close TMP-PORT mid-download.
      (with-exception-handler
          (lambda (exn)
            (close-port tmp-port)
            (raise-exception exn))
        (lambda ()
          (let ((file-size
                 (download-to-port! (string->uri string-url) tmp-port)))
            ;; Rewind: after the download the port is at end-of-file, so
            ;; reading from it as is would store an empty file.
            (seek tmp-port 0 SEEK_SET)

            ;; TODO Check the size of the file
            (if (s3? storage-root)
                (s3-upload-file (s3-uri->bucket storage-root)
                                destination-file-name
                                tmp-port)
                (copy-port-to-file tmp-port destination-file-name))
            (close-port tmp-port)

            (update-nar-files-metric
             metrics-registry
             '()
             #:fetched-count 1)

            (+ storage-size file-size))))))

  (define (skippable-fetch-error? exn)
    ;; Skip over files that aren't found, they might have been removed
    ;; from the database
    (and (exception-with-message? exn)
         (string=? (exception-message exn)
                   "file not found")))

  (define* (fetch-file/retry storage-size url #:key print-backtrace?)
    ;; Call fetch-file through retry-on-error (#:times 3, 5 second delay).
    ;; Return the new storage size, or #f after logging the error if the
    ;; fetch ultimately failed.  With PRINT-BACKTRACE?, the backtrace of the
    ;; escaping exception is printed as well.
    (with-exception-handler
        (lambda (exn)
          (log-msg 'ERROR "failed to fetch " url ": " exn)
          #f)
      (lambda ()
        (with-exception-handler
            (lambda (exn)
              (when print-backtrace?
                (print-backtrace-and-exception/knots exn))
              (raise-exception exn))
          (lambda ()
            (retry-on-error
             (lambda ()
               (fetch-file storage-size url))
             #:times 3
             #:delay-seconds 5
             #:ignore skippable-fetch-error?))))
      #:unwind? #t))

  (define (download-nars initial-storage-size initial-free-space)
    ;; Fetch missing nars one at a time, skipping any nar that would take
    ;; the storage size past the effective limit.  Return the new storage
    ;; size and the number of nars fetched as two values.
    (define limit
      (compute-effective-storage-limit initial-storage-size
                                       initial-free-space))

    (define (fits? storage-size file-bytes)
      (or (not limit)
          (< (+ storage-size file-bytes) limit)))

    (log-msg 'DEBUG "effective storage limit " limit)
    (if (or (not limit)
            (< initial-storage-size limit))
        (let ((result
               nar-file-counts
               (fold-nar-files
                database
                storage-root
                (lambda (file result)
                  (log-msg 'DEBUG "considering "
                           (assq-ref file 'url))
                  (match result
                    ((storage-size . fetched-count)
                     (if (fits? storage-size (assq-ref file 'size))
                         (let ((new-storage-size
                                (fetch-file/retry storage-size
                                                  (assq-ref file 'url)
                                                  #:print-backtrace? #t)))
                           (if new-storage-size
                               (cons new-storage-size
                                     (1+ fetched-count))
                               result))
                         ;; This file won't fit, so try the next one
                         result))))
                (cons initial-storage-size 0)
                #:stored? #f)))
          (match result
            ((storage-size . fetched-count)
             (values storage-size
                     fetched-count))))
        (values initial-storage-size
                0)))

  (define (fast-download-nars initial-storage-size)
    ;; Fetch every missing nar using PARALLELISM worker fibers fed through
    ;; a channel; used when no storage constraint is configured.  Return
    ;; the new storage size and the number of nars fetched as two values.
    (define parallelism 3)

    (let ((channel (make-channel)))
      (for-each
       (lambda _
         (spawn-fiber
          (lambda ()
            (let loop ((fetched-count 0)
                       (added-storage-size 0))
              (match (get-message channel)
                (('finished . reply)
                 (put-message reply (list fetched-count
                                          added-storage-size)))
                (url
                 (log-msg 'DEBUG "considering " url)
                 (let ((new-added-storage-size
                        (fetch-file/retry added-storage-size url)))
                   (if new-added-storage-size
                       (loop (+ fetched-count 1)
                             new-added-storage-size)
                       (loop fetched-count
                             added-storage-size)))))))))
       (iota parallelism))

      (let ((result
             nar-file-counts
             (fold-nar-files
              database
              storage-root
              (lambda (nar _)
                (put-message channel
                             (assq-ref nar 'url))
                #f)
              #f
              #:stored? #f)))

        ;; Every URL has been handed out; collect each worker's totals.
        (let loop ((fibers (iota parallelism))
                   (fetched-count 0)
                   (storage-size initial-storage-size))
          (if (null? fibers)
              (values storage-size
                      fetched-count)
              (let ((reply-channel (make-channel)))
                (put-message channel
                             (cons 'finished reply-channel))
                (match (get-message reply-channel)
                  ((additional-fetched-count added-storage-size)
                   (loop (cdr fibers)
                         (+ fetched-count
                            additional-fetched-count)
                         (+ storage-size
                            added-storage-size))))))))))

  (define (run-mirror-pass storage-size)
    ;; Mirror every nar not yet stored and return the updated storage size.
    (log-msg 'DEBUG "running mirror pass")
    (let ((free-space (free-disk-space* storage-root)))
      (metric-set storage-free-space-metric
                  free-space)
      (let ((new-storage-size
             fetched-count
             (if (or storage-limit? minimum-free-space?)
                 (download-nars storage-size free-space)
                 (fast-download-nars storage-size))))
        (log-msg 'DEBUG "finished mirror pass (fetched " fetched-count
                 " nars, updated storage size: " new-storage-size ")")
        new-storage-size)))

  (define (free-space? storage-size)
    ;; True when STORAGE-SIZE is below the effective storage limit, or when
    ;; no limit is configured.  Free disk space is only queried when a
    ;; minimum free space is configured.
    (let ((limit
           (compute-effective-storage-limit
            storage-size
            (and minimum-free-space?
                 (free-disk-space* storage-root)))))
      (or (not limit)
          (< storage-size limit))))

  (let ((channel (make-channel))
        (channel-metric
         (make-counter-metric metrics-registry
                              "mirroring_fiber_messages_total")))
    ;; Mirroring fiber: owns the current storage size and handles one
    ;; message at a time.  Each handler returns the new storage size, and
    ;; on failure logs the error and keeps the old size.
    (spawn-fiber
     (lambda ()
       (let loop ((storage-size
                   (with-time-logging "getting storage size"
                     (get-storage-size storage-root))))
         (metric-set storage-size-metric
                     storage-size)
         (loop
          (match (get-message/increment-counter channel
                                                channel-metric)
            ('full-pass
             (with-exception-handler
                 (lambda (exn)
                   (log-msg 'ERROR "mirror pass failed " exn)
                   storage-size)
               (lambda ()
                 (with-exception-handler
                     (lambda (exn)
                       (print-backtrace-and-exception/knots exn)
                       (raise-exception exn))
                   (lambda ()
                     (run-mirror-pass storage-size))))
               #:unwind? #t))
            (('fetch file)
             (with-exception-handler
                 (lambda (exn)
                   (log-msg 'ERROR "failed to mirror " file ": " exn)
                   storage-size)
               (lambda ()
                 (cond
                  ;; TODO: Adjust existence check for S3.
                  ((file-exists?
                    (string-append storage-root
                                   (uri-decode file)))
                   ;; Can't mirror as already exists
                   storage-size)
                  ;; TODO This is too crude, it needs to look at the size
                  ;; of the file rather than allowing the limit to be
                  ;; breached
                  ((free-space? storage-size)
                   (fetch-file storage-size file))
                  (else
                   (log-msg 'DEBUG "not mirroring " file
                            " as no free space")
                   storage-size)))
               #:unwind? #t))))))
     scheduler)

    ;; Request a full mirror pass now and then once every 24 hours.
    (spawn-fiber
     (lambda ()
       (while #t
         (put-message channel 'full-pass)
         (sleep (* 60 60 24))))
     scheduler)

    channel))

;; Generated by apteryx using scpaste at Thu Apr 16 14:11:44 2026 JST. (original)