cbaines pushed a commit to branch master
in repository data-service.

commit 8b49884816c51593e6cd87b661a16f25b7f3e94a
Author: Christopher Baines <[email protected]>
AuthorDate: Sun Dec 15 19:08:28 2024 +0000

    Use knots
    
    A library of extracted Guile Fibers patterns and utilities.
---
 guix-data-service/jobs/load-new-guix-revision.scm |   22 +-
 guix-data-service/utils.scm                       | 1120 +--------------------
 guix-data-service/web/build-server/controller.scm |    1 +
 guix-data-service/web/build/controller.scm        |    6 +-
 guix-data-service/web/compare/controller.scm      |   38 +-
 guix-data-service/web/controller.scm              |   18 +-
 guix-data-service/web/jobs/controller.scm         |    4 +-
 guix-data-service/web/nar/controller.scm          |    4 +-
 guix-data-service/web/package/controller.scm      |    4 +-
 guix-data-service/web/repository/controller.scm   |   36 +-
 guix-data-service/web/revision/controller.scm     |   62 +-
 guix-data-service/web/server.scm                  |   11 +-
 guix-dev.scm                                      |   33 +
 13 files changed, 154 insertions(+), 1205 deletions(-)

diff --git a/guix-data-service/jobs/load-new-guix-revision.scm 
b/guix-data-service/jobs/load-new-guix-revision.scm
index dfa41ec..e18528a 100644
--- a/guix-data-service/jobs/load-new-guix-revision.scm
+++ b/guix-data-service/jobs/load-new-guix-revision.scm
@@ -38,6 +38,12 @@
   #:use-module (fibers timers)
   #:use-module (fibers channels)
   #:use-module (fibers operations)
+  #:use-module (knots)
+  #:use-module (knots queue)
+  #:use-module (knots promise)
+  #:use-module (knots parallelism)
+  #:use-module (knots resource-pool)
+  #:use-module (knots worker-threads)
   #:use-module (guix monads)
   #:use-module (guix base32)
   #:use-module (guix store)
@@ -1127,7 +1133,7 @@ SELECT 1 FROM derivation_source_file_nars WHERE 
derivation_source_file_id = $1"
          (insert-derivations)))
 
     (unless (null? derivations)
-      (parallel-via-fibers
+      (fibers-parallel
        (insert-sources derivations
                        derivation-ids)
        (with-time-logging
@@ -1906,7 +1912,7 @@ SELECT 1 FROM derivation_source_file_nars WHERE 
derivation_source_file_id = $1"
           (inferior-lint-checkers inferior)))))
 
     (when inferior-lint-checkers-data
-      (letpar& ((lint-checker-ids
+      (fibers-let ((lint-checker-ids
                  (with-resource-from-pool postgresql-connection-pool conn
                    (lint-checkers->lint-checker-ids
                     conn
@@ -2181,7 +2187,7 @@ SELECT 1 FROM derivation_source_file_nars WHERE 
derivation_source_file_id = $1"
 
   (with-time-logging
       (simple-format #f "extract-information-from: ~A\n" store-item)
-    (parallel-via-fibers
+    (fibers-parallel
      (begin
        (fibers-force package-ids-promise)
        #f)
@@ -2267,7 +2273,7 @@ SELECT 1 FROM derivation_source_file_nars WHERE 
derivation_source_file_id = $1"
                                  extra-inferior-environment-variables)
   (define utility-thread-channel
     ;; There might be high demand for this, so order the requests
-    (make-queueing-channel
+    (spawn-queueing-fiber
      (call-with-default-io-waiters
       (lambda ()
         (make-worker-thread-channel
@@ -2791,6 +2797,12 @@ SKIP LOCKED")
 
        (exec-query conn "BEGIN")
 
+       ;; (spawn-fiber
+       ;;  (lambda ()
+       ;;    (while #t
+       ;;      (sleep (* 60 5))
+       ;;      (profile-heap))))
+
        (spawn-fiber
         (lambda ()
           (while (perform-operation
@@ -2864,7 +2876,7 @@ SKIP LOCKED")
                          id))))))
 
   (when result
-    (parallel-via-fibers
+    (fibers-parallel
      (with-postgresql-connection
       (simple-format #f "post load-new-guix-revision ~A counts" id)
       (lambda (conn)
diff --git a/guix-data-service/utils.scm b/guix-data-service/utils.scm
index b53f33f..a447a9c 100644
--- a/guix-data-service/utils.scm
+++ b/guix-data-service/utils.scm
@@ -38,43 +38,14 @@
   #:use-module (fibers timers)
   #:use-module (fibers conditions)
   #:use-module (fibers scheduler)
+  #:use-module (knots timeout)
   #:use-module (prometheus)
   #:export (call-with-time-logging
             with-time-logging
             prevent-inlining-for-tests
 
-            resource-pool-default-timeout
-            resource-pool-retry-checkout-timeout
-            %resource-pool-timeout-handler
-            resource-pool-timeout-error?
-            make-resource-pool
-            destroy-resource-pool
-            call-with-resource-from-pool
-            with-resource-from-pool
-            resource-pool-stats
-
-            call-with-default-io-waiters
-            make-worker-thread-channel
-            %worker-thread-default-timeout
-            call-with-worker-thread
-            worker-thread-timeout-error?
-
             fiberize
 
-            fibers-delay
-            fibers-force
-            fibers-promise-reset
-
-            fibers-batch-for-each
-            fibers-for-each
-            fibers-batch-map
-            fibers-map
-
-            parallel-via-fibers
-            par-map&
-            letpar&
-            fibers-map-with-progress
-
             chunk
             chunk!
             chunk-for-each!
@@ -84,7 +55,6 @@
             get-guix-metrics-updater
 
             call-with-sigint
-            run-server/patched
 
             spawn-port-monitoring-fiber
 
@@ -107,674 +77,6 @@
 (define-syntax-rule (prevent-inlining-for-tests var)
   (set! var var))
 
-(define-record-type <resource-pool>
-  (make-resource-pool-record name channel)
-  resource-pool?
-  (name    resource-pool-name)
-  (channel resource-pool-channel))
-
-(define* (make-resource-pool initializer max-size
-                             #:key (min-size max-size)
-                             (idle-seconds #f)
-                             (delay-logger (const #f))
-                             (duration-logger (const #f))
-                             destructor
-                             lifetime
-                             scheduler
-                             (name "unnamed"))
-  (define (initializer/safe)
-    (with-exception-handler
-        (lambda (exn)
-          (simple-format
-           (current-error-port)
-           "exception running ~A resource pool initializer: ~A:\n  ~A\n"
-           name
-           initializer
-           exn)
-          #f)
-      (lambda ()
-        (with-throw-handler #t
-          initializer
-          (lambda args
-            (backtrace))))
-      #:unwind? #t))
-
-  (define (destructor/safe args)
-    (let ((success?
-           (with-exception-handler
-               (lambda (exn)
-                 (simple-format
-                  (current-error-port)
-                  "exception running resource pool destructor (~A): ~A:\n  
~A\n"
-                  name
-                  destructor
-                  exn)
-                 #f)
-             (lambda ()
-               (with-throw-handler #t
-                 (lambda ()
-                   (destructor args)
-                   #t)
-                 (lambda _
-                   (backtrace))))
-             #:unwind? #t)))
-
-      (or success?
-          #t
-          (begin
-            (sleep 5)
-            (destructor/safe args)))))
-
-  (let ((channel (make-channel))
-        (checkout-failure-count 0))
-    (spawn-fiber
-     (lambda ()
-       (when idle-seconds
-         (spawn-fiber
-          (lambda ()
-            (while #t
-              (sleep idle-seconds)
-              (put-message channel '(check-for-idle-resources))))))
-
-       (while #t
-         (with-exception-handler
-             (lambda (exn)
-               (simple-format
-                (current-error-port)
-                "exception in the ~A pool fiber: ~A\n"
-                name
-                exn))
-           (lambda ()
-             (let loop ((resources '())
-                        (available '())
-                        (waiters '())
-                        (resources-last-used '()))
-
-               (match (get-message channel)
-                 (('checkout reply)
-                  (if (null? available)
-                      (if (= (length resources) max-size)
-                          (loop resources
-                                available
-                                (cons reply waiters)
-                                resources-last-used)
-                          (let ((new-resource (initializer/safe)))
-                            (if new-resource
-                                (let ((checkout-success?
-                                       (perform-operation
-                                        (choice-operation
-                                         (wrap-operation
-                                          (put-operation reply new-resource)
-                                          (const #t))
-                                         (wrap-operation (sleep-operation 1)
-                                                         (const #f))))))
-                                  (unless checkout-success?
-                                    (set! checkout-failure-count
-                                          (+ 1 checkout-failure-count)))
-
-                                  (loop (cons new-resource resources)
-                                        (if checkout-success?
-                                            available
-                                            (cons new-resource available))
-                                        waiters
-                                        (cons (get-internal-real-time)
-                                              resources-last-used)))
-                                (loop resources
-                                      available
-                                      (cons reply waiters)
-                                      resources-last-used))))
-                      (let ((checkout-success?
-                             (perform-operation
-                              (choice-operation
-                               (wrap-operation
-                                (put-operation reply (car available))
-                                (const #t))
-                               (wrap-operation (sleep-operation 1)
-                                               (const #f))))))
-                        (unless checkout-success?
-                          (set! checkout-failure-count
-                                (+ 1 checkout-failure-count)))
-
-                        (if checkout-success?
-                            (loop resources
-                                  (cdr available)
-                                  waiters
-                                  resources-last-used)
-                            (loop resources
-                                  available
-                                  waiters
-                                  resources-last-used)))))
-                 (('return resource)
-                  (if (null? waiters)
-                      (loop resources
-                            (cons resource available)
-                            waiters
-                            (begin
-                              (list-set!
-                               resources-last-used
-                               (list-index (lambda (x)
-                                             (eq? x resource))
-                                           resources)
-                               (get-internal-real-time))
-                              resources-last-used))
-                      (let ((checkout-success?
-                             (perform-operation
-                              (choice-operation
-                               (wrap-operation
-                                (put-operation (last waiters)
-                                               resource)
-                                (const #t))
-                               (wrap-operation (sleep-operation 1)
-                                               (const #f))))))
-                        (unless checkout-success?
-                          (set! checkout-failure-count
-                                (+ 1 checkout-failure-count)))
-
-                        (if checkout-success?
-                            (loop resources
-                                  available
-                                  (drop-right! waiters 1)
-                                  (begin
-                                    (list-set!
-                                     resources-last-used
-                                     (list-index (lambda (x)
-                                                   (eq? x resource))
-                                                 resources)
-                                     (get-internal-real-time))
-                                    resources-last-used))
-                            (begin
-                              (for-each
-                               (lambda (waiter)
-                                 (spawn-fiber
-                                  (lambda ()
-                                    (perform-operation
-                                     (choice-operation
-                                      (put-operation waiter 
'resource-pool-retry-checkout)
-                                      (sleep-operation 10))))))
-                               waiters)
-
-                              (loop resources
-                                    (cons resource available)
-                                    '()
-                                    (begin
-                                      (list-set!
-                                       resources-last-used
-                                       (list-index (lambda (x)
-                                                     (eq? x resource))
-                                                   resources)
-                                       (get-internal-real-time))
-                                      resources-last-used)))))))
-                 (('stats reply)
-                  (let ((stats
-                         `((resources              . ,(length resources))
-                           (available              . ,(length available))
-                           (waiters                . ,(length waiters))
-                           (checkout-failure-count . 
,checkout-failure-count))))
-
-                    (spawn-fiber
-                     (lambda ()
-                       (perform-operation
-                        (choice-operation
-                         (wrap-operation
-                          (put-operation reply stats)
-                          (const #t))
-                         (wrap-operation (sleep-operation 1)
-                                         (const #f)))))))
-
-                  (loop resources
-                        available
-                        waiters
-                        resources-last-used))
-                 (('check-for-idle-resources)
-                  (let* ((resources-last-used-seconds
-                          (map
-                           (lambda (internal-time)
-                             (/ (- (get-internal-real-time) internal-time)
-                                internal-time-units-per-second))
-                           resources-last-used))
-                         (resources-to-destroy
-                          (filter-map
-                           (lambda (resource last-used-seconds)
-                             (if (and (member resource available)
-                                      (> last-used-seconds idle-seconds))
-                                 resource
-                                 #f))
-                           resources
-                           resources-last-used-seconds)))
-
-                    (for-each
-                     (lambda (resource)
-                       (destructor/safe resource))
-                     resources-to-destroy)
-
-                    (loop (lset-difference eq? resources resources-to-destroy)
-                          (lset-difference eq? available resources-to-destroy)
-                          waiters
-                          (filter-map
-                           (lambda (resource last-used)
-                             (if (memq resource resources-to-destroy)
-                                 #f
-                                 last-used))
-                           resources
-                           resources-last-used))))
-                 (('destroy reply)
-                  (if (= (length resources) (length available))
-                      (begin
-                        (for-each
-                         (lambda (resource)
-                           (destructor/safe resource))
-                         resources)
-                        (put-message reply 'destroy-success))
-                      (begin
-                        (spawn-fiber
-                         (lambda ()
-                           (perform-operation
-                            (choice-operation
-                             (put-operation reply 
'resource-pool-destroy-failed)
-                             (sleep-operation 10)))))
-                        (loop resources
-                              available
-                              waiters
-                              resources-last-used))))
-                 (unknown
-                  (simple-format
-                   (current-error-port)
-                   "unrecognised message to ~A resource pool channel: ~A\n"
-                   name
-                   unknown)
-                  (loop resources
-                        available
-                        waiters
-                        resources-last-used)))))
-           #:unwind? #t)))
-     (or scheduler
-         (current-scheduler)))
-
-    (make-resource-pool-record name channel)))
-
-(define (destroy-resource-pool pool)
-  (let ((reply (make-channel)))
-    (put-message (resource-pool-channel pool)
-                 (list 'destroy reply))
-    (let ((msg (get-message reply)))
-      (unless (eq? msg 'destroy-success)
-        (error msg)))))
-
-(define resource-pool-default-timeout
-  (make-parameter #f))
-
-(define resource-pool-retry-checkout-timeout
-  (make-parameter 5))
-
-(define &resource-pool-timeout
-  (make-exception-type '&recource-pool-timeout
-                       &error
-                       '(name)))
-
-(define make-resource-pool-timeout-error
-  (record-constructor &resource-pool-timeout))
-
-(define resource-pool-timeout-error?
-  (record-predicate &resource-pool-timeout))
-
-(define %resource-pool-timeout-handler
-  (make-parameter #f))
-
-(define* (call-with-resource-from-pool pool proc #:key (timeout 'default)
-                                       (timeout-handler 
(%resource-pool-timeout-handler)))
-  "Call PROC with a resource from POOL, blocking until a resource becomes
-available.  Return the resource once PROC has returned."
-
-  (define retry-timeout
-    (resource-pool-retry-checkout-timeout))
-
-  (define timeout-or-default
-    (if (eq? timeout 'default)
-        (resource-pool-default-timeout)
-        timeout))
-
-  (let ((resource
-         (let ((reply (make-channel)))
-           (let loop ((start-time (get-internal-real-time)))
-             (let ((request-success?
-                    (perform-operation
-                     (choice-operation
-                      (wrap-operation
-                       (put-operation (resource-pool-channel pool)
-                                      `(checkout ,reply))
-                       (const #t))
-                      (wrap-operation (sleep-operation (or timeout-or-default
-                                                           retry-timeout))
-                                      (const #f))))))
-               (if request-success?
-                   (let ((time-remaining
-                          (- (or timeout-or-default
-                                 retry-timeout)
-                             (/ (- (get-internal-real-time)
-                                   start-time)
-                                internal-time-units-per-second))))
-                     (if (> time-remaining 0)
-                         (let ((response
-                                (perform-operation
-                                 (choice-operation
-                                  (get-operation reply)
-                                  (wrap-operation (sleep-operation 
time-remaining)
-                                                  (const #f))))))
-                           (if (or (not response)
-                                   (eq? response 
'resource-pool-retry-checkout))
-                               (if (> (- (or timeout-or-default
-                                             retry-timeout)
-                                         (/ (- (get-internal-real-time)
-                                               start-time)
-                                            internal-time-units-per-second))
-                                      0)
-                                   (loop start-time)
-                                   (if (eq? timeout-or-default #f)
-                                       (loop (get-internal-real-time))
-                                       #f))
-                               response))
-                         (if (eq? timeout-or-default #f)
-                             (loop (get-internal-real-time))
-                             #f)))
-                   (if (eq? timeout-or-default #f)
-                       (loop (get-internal-real-time))
-                       #f)))))))
-
-    (when (or (not resource)
-              (eq? resource 'resource-pool-retry-checkout))
-      (when timeout-handler
-        (timeout-handler pool proc timeout))
-
-      (raise-exception
-       (make-resource-pool-timeout-error (resource-pool-name pool))))
-
-    (with-exception-handler
-        (lambda (exception)
-          (put-message (resource-pool-channel pool)
-                       `(return ,resource))
-          (raise-exception exception))
-      (lambda ()
-        (call-with-values
-            (lambda ()
-              (with-throw-handler #t
-                (lambda ()
-                  (proc resource))
-                (lambda _
-                  (backtrace))))
-          (lambda vals
-            (put-message (resource-pool-channel pool)
-                         `(return ,resource))
-            (apply values vals))))
-      #:unwind? #t)))
-
-(define-syntax-rule (with-resource-from-pool pool resource exp ...)
-  (call-with-resource-from-pool
-      pool
-    (lambda (resource) exp ...)))
-
-(define* (resource-pool-stats pool #:key (timeout 5))
-  (let ((reply (make-channel))
-        (start-time (get-internal-real-time)))
-    (perform-operation
-     (choice-operation
-      (wrap-operation
-       (put-operation (resource-pool-channel pool)
-                      `(stats ,reply))
-       (const #t))
-      (wrap-operation (sleep-operation timeout)
-                      (lambda _
-                        (raise-exception
-                         (make-resource-pool-timeout-error))))))
-
-    (let ((time-remaining
-           (- timeout
-              (/ (- (get-internal-real-time)
-                    start-time)
-                 internal-time-units-per-second))))
-      (if (> time-remaining 0)
-          (perform-operation
-           (choice-operation
-            (get-operation reply)
-            (wrap-operation (sleep-operation time-remaining)
-                            (lambda _
-                              (raise-exception
-                               (make-resource-pool-timeout-error))))))
-          (raise-exception
-           (make-resource-pool-timeout-error))))))
-
-(define (call-with-default-io-waiters thunk)
-  (parameterize
-      ((current-read-waiter (@@ (ice-9 suspendable-ports)
-                                default-read-waiter))
-       (current-write-waiter (@@ (ice-9 suspendable-ports)
-                                 default-write-waiter)))
-    (thunk)))
-
-(define %worker-thread-args
-  (make-parameter #f))
-
-(define* (make-worker-thread-channel initializer
-                                     #:key (parallelism 1)
-                                     (delay-logger (lambda _ #f))
-                                     (duration-logger (const #f))
-                                     destructor
-                                     lifetime
-                                     (log-exception? (const #t))
-                                     (expire-on-exception? #f)
-                                     (name "unnamed"))
-  "Return a channel used to offload work to a dedicated thread.  ARGS are the
-arguments of the worker thread procedure."
-  (define thread-proc-vector
-    (make-vector parallelism #f))
-
-  (define (initializer/safe)
-    (let ((args
-           (with-exception-handler
-               (lambda (exn)
-                 (simple-format
-                  (current-error-port)
-                  "exception running initializer in worker thread (~A): ~A:\n  
~A\n"
-                  name
-                  initializer
-                  exn)
-                 #f)
-             (lambda ()
-               (with-throw-handler #t
-                 initializer
-                 (lambda args
-                   (backtrace))))
-             #:unwind? #t)))
-
-      (if args
-          args
-          ;; never give up, just keep retrying
-          (begin
-            (sleep 1)
-            (initializer/safe)))))
-
-  (define (destructor/safe args)
-    (let ((success?
-           (with-exception-handler
-               (lambda (exn)
-                 (simple-format
-                  (current-error-port)
-                  "exception running destructor in worker thread (~A): ~A:\n  
~A\n"
-                  name
-                  destructor
-                  exn)
-                 #f)
-             (lambda ()
-               (with-throw-handler #t
-                 (lambda ()
-                   (apply destructor args)
-                   #t)
-                 (lambda _
-                   (backtrace))))
-             #:unwind? #t)))
-
-      (or success?
-          #t
-          (begin
-            (sleep 1)
-            (destructor/safe args)))))
-
-  (define (process thread-index channel args)
-    (let loop ((current-lifetime lifetime))
-      (let ((exception?
-             (match (get-message channel)
-               (((? channel? reply) sent-time (? procedure? proc))
-                (let ((time-delay
-                       (- (get-internal-real-time)
-                          sent-time)))
-                  (delay-logger (/ time-delay
-                                   internal-time-units-per-second))
-
-                  (let* ((start-time (get-internal-real-time))
-                         (response
-                          (with-exception-handler
-                              (lambda (exn)
-                                (list 'worker-thread-error
-                                      (/ (- (get-internal-real-time)
-                                            start-time)
-                                         internal-time-units-per-second)
-                                      exn))
-                            (lambda ()
-                              (vector-set! thread-proc-vector
-                                           thread-index
-                                           proc)
-                              (with-throw-handler #t
-                                (lambda ()
-                                  (call-with-values
-                                      (lambda ()
-                                        (start-stack
-                                         'worker-thread
-                                         (apply proc args)))
-                                    (lambda vals
-                                      (cons (/ (- (get-internal-real-time)
-                                                  start-time)
-                                               internal-time-units-per-second)
-                                            vals))))
-                                (lambda args
-                                  (when (match args
-                                          (('%exception exn)
-                                           (log-exception? exn))
-                                          (_ #t))
-                                    (simple-format
-                                     (current-error-port)
-                                     "worker-thread: exception: ~A\n" args)
-                                    (backtrace)))))
-                            #:unwind? #t)))
-                    (put-message reply
-                                 response)
-
-                    (vector-set! thread-proc-vector
-                                 thread-index
-                                 #f)
-
-                    (match response
-                      (('worker-thread-error duration _)
-                       (when duration-logger
-                         (duration-logger duration proc))
-                       #t)
-                      ((duration . _)
-                       (when duration-logger
-                         (duration-logger duration proc))
-                       #f))))))))
-        (unless (and expire-on-exception?
-                     exception?)
-          (if (number? current-lifetime)
-              (unless (< current-lifetime 0)
-                (loop (if current-lifetime
-                          (- current-lifetime 1)
-                          #f)))
-              (loop #f))))))
-
-  (let ((channel (make-channel)))
-    (for-each
-     (lambda (thread-index)
-       (call-with-new-thread
-        (lambda ()
-          (catch 'system-error
-            (lambda ()
-              (set-thread-name
-               (string-append
-                name " w t "
-                (number->string thread-index))))
-            (const #t))
-
-          (let init ((args (initializer/safe)))
-            (with-exception-handler
-                (lambda (exn)
-                  (simple-format
-                   (current-error-port)
-                   "worker-thread-channel: exception: ~A\n" exn))
-              (lambda ()
-                (parameterize ((%worker-thread-args args))
-                  (process thread-index channel args)))
-              #:unwind? #t)
-
-            (when destructor
-              (destructor/safe args))
-
-            (init (initializer/safe))))))
-     (iota parallelism))
-
-    (values channel
-            thread-proc-vector)))
-
-(define &worker-thread-timeout
-  (make-exception-type '&worker-thread-timeout
-                       &error
-                       '()))
-
-(define make-worker-thread-timeout-error
-  (record-constructor &worker-thread-timeout))
-
-(define worker-thread-timeout-error?
-  (record-predicate &worker-thread-timeout))
-
-(define %worker-thread-default-timeout
-  (make-parameter 30))
-
-(define* (call-with-worker-thread channel proc #:key duration-logger
-                                  (timeout (%worker-thread-default-timeout)))
-  "Send PROC to the worker thread through CHANNEL.  Return the result of PROC.
-If already in the worker thread, call PROC immediately."
-  (let ((args (%worker-thread-args)))
-    (if args
-        (apply proc args)
-        (let* ((reply (make-channel))
-               (operation-success?
-                (perform-operation
-                 (let ((put
-                        (wrap-operation
-                         (put-operation channel
-                                        (list reply
-                                              (get-internal-real-time)
-                                              proc))
-                         (const #t))))
-
-                   (if timeout
-                       (choice-operation
-                        put
-                        (wrap-operation (sleep-operation timeout)
-                                        (const #f)))
-                       put)))))
-
-          (unless operation-success?
-            (raise-exception
-             (make-worker-thread-timeout-error)))
-
-          (match (get-message reply)
-            (('worker-thread-error duration exn)
-             (when duration-logger
-               (duration-logger duration))
-             (raise-exception exn))
-            ((duration . result)
-             (when duration-logger
-               (duration-logger duration))
-             (apply values result)))))))
-
 (define* (fiberize proc #:key (parallelism 1))
   (let ((channel (make-channel)))
     (for-each
@@ -810,235 +112,6 @@ If already in the worker thread, call PROC immediately."
           (('result . vals) (apply values vals))
           (('exception . exn) (raise-exception exn)))))))
 
-(define-record-type <fibers-promise>
-  (make-fibers-promise thunk values-box evaluated-condition)
-  fibers-promise?
-  (thunk                fibers-promise-thunk)
-  (values-box           fibers-promise-values-box)
-  (evaluated-condition  fibers-promise-evaluated-condition))
-
-(define (fibers-delay thunk)
-  (make-fibers-promise
-   thunk
-   (make-atomic-box #f)
-   (make-condition)))
-
-(define (fibers-force fp)
-  (let ((res (atomic-box-compare-and-swap!
-              (fibers-promise-values-box fp)
-              #f
-              'started)))
-    (if (eq? #f res)
-        (call-with-values
-            (lambda ()
-              (with-exception-handler
-                  (lambda (exn)
-                    (atomic-box-set! (fibers-promise-values-box fp)
-                                     exn)
-                    (signal-condition!
-                     (fibers-promise-evaluated-condition fp))
-                    (raise-exception exn))
-                (fibers-promise-thunk fp)
-                #:unwind? #t))
-          (lambda vals
-            (atomic-box-set! (fibers-promise-values-box fp)
-                             vals)
-            (signal-condition!
-             (fibers-promise-evaluated-condition fp))
-            (apply values vals)))
-        (if (eq? res 'started)
-            (begin
-              (wait (fibers-promise-evaluated-condition fp))
-              (let ((result (atomic-box-ref (fibers-promise-values-box fp))))
-                (if (exception? result)
-                    (raise-exception result)
-                    (apply values result))))
-            (if (exception? res)
-                (raise-exception res)
-                (apply values res))))))
-
-(define (fibers-promise-reset fp)
-  (atomic-box-set! (fibers-promise-values-box fp)
-                   #f))
-
-;; Like split-at, but don't care about the order of the resulting lists, and
-;; don't error if the list is shorter than i elements
-(define (split-at* lst i)
-  (let lp ((l lst) (n i) (acc '()))
-    (if (or (<= n 0) (null? l))
-        (values (reverse! acc) l)
-        (lp (cdr l) (- n 1) (cons (car l) acc)))))
-
-;; As this can be called with lists with tens of thousands of items in them,
-;; batch the
-(define (get-batch batch-size lists)
-  (let ((split-lists
-         (map (lambda (lst)
-                (let ((batch rest (split-at* lst batch-size)))
-                  (cons batch rest)))
-              lists)))
-    (values (map car split-lists)
-            (map cdr split-lists))))
-
-(define (defer-to-parallel-fiber thunk)
-  (let ((reply (make-channel)))
-    (spawn-fiber
-     (lambda ()
-       (with-exception-handler
-           (lambda (exn)
-             (put-message reply (cons 'exception exn)))
-         (lambda ()
-           (call-with-values
-               (lambda ()
-                 (with-throw-handler #t
-                   thunk
-                   (lambda _
-                     (backtrace))))
-             (lambda vals
-               (put-message reply vals))))
-         #:unwind? #t))
-     #:parallel? #t)
-    reply))
-
-(define (fetch-result-of-defered-thunks . reply-channels)
-  (let ((responses (map get-message
-                        reply-channels)))
-    (map
-     (match-lambda
-       (('exception . exn)
-        (raise-exception exn))
-       (result
-        (apply values result)))
-     responses)))
-
-(define (fibers-batch-map proc batch-size . lists)
-  (let loop ((lists lists)
-             (result '()))
-    (let ((batch
-           rest
-           (get-batch batch-size lists)))
-      (if (any null? batch)
-          result
-          (let ((response-channels
-                 (apply map
-                        (lambda args
-                          (defer-to-parallel-fiber
-                            (lambda ()
-                              (apply proc args))))
-                        batch)))
-            (loop rest
-                  (append! result
-                           (apply fetch-result-of-defered-thunks
-                                  response-channels))))))))
-
-(define (fibers-map proc . lists)
-  (apply fibers-batch-map proc 20 lists))
-
-(define (fibers-batch-for-each proc batch-size . lists)
-  (let loop ((lists lists))
-    (let ((batch
-           rest
-           (get-batch batch-size lists)))
-      (if (any null? batch)
-          *unspecified*
-          (let ((response-channels
-                 (apply map
-                        (lambda args
-                          (defer-to-parallel-fiber
-                            (lambda ()
-                              (apply proc args))))
-                        batch)))
-            (apply fetch-result-of-defered-thunks
-                   response-channels)
-            (loop rest))))))
-
-(define (fibers-for-each proc . lists)
-  (apply fibers-batch-for-each proc 20 lists))
-
-(define-syntax parallel-via-fibers
-  (lambda (x)
-    (syntax-case x ()
-      ((_ e0 ...)
-       (with-syntax (((tmp0 ...) (generate-temporaries (syntax (e0 ...)))))
-         #'(let ((tmp0 (defer-to-parallel-fiber
-                         (lambda ()
-                           e0)))
-                 ...)
-             (apply values (fetch-result-of-defered-thunks tmp0 ...))))))))
-
-(define-syntax-rule (letpar& ((v e) ...) b0 b1 ...)
-  (call-with-values
-      (lambda () (parallel-via-fibers e ...))
-    (lambda (v ...)
-      b0 b1 ...)))
-
-(define (par-mapper' mapper cons)
-  (lambda (proc . lists)
-    (apply
-     fetch-result-of-defered-thunks
-     (let loop ((lists lists))
-       (match lists
-         (((heads tails ...) ...)
-          (let ((tail (loop tails))
-                (head (defer-to-parallel-fiber
-                        (lambda ()
-                          (apply proc heads)))))
-            (cons head tail)))
-         (_
-          '()))))))
-
-(define par-map& (par-mapper' map cons))
-
-(define* (fibers-map-with-progress proc lists #:key report)
-  (let loop ((channels-to-results
-              (apply map
-                     (lambda args
-                       (cons (defer-to-parallel-fiber
-                               (lambda ()
-                                 (apply proc args)))
-                             #f))
-                     lists)))
-    (let ((active-channels
-           (filter-map car channels-to-results)))
-      (when report
-        (report (apply map
-                       list
-                       (map cdr channels-to-results)
-                       lists)))
-      (if (null? active-channels)
-          (map
-           (match-lambda
-             ((#f . ('exception . exn))
-              (raise-exception exn))
-             ((#f . ('result . val))
-              val))
-           channels-to-results)
-          (loop
-           (perform-operation
-            (apply
-             choice-operation
-             (filter-map
-              (lambda (p)
-                (match p
-                  ((channel . _)
-                   (if channel
-                       (wrap-operation
-                        (get-operation channel)
-                        (lambda (result)
-                          (map (match-lambda
-                                 ((c . r)
-                                  (if (eq? channel c)
-                                      (cons #f
-                                            (match result
-                                              (('exception . exn)
-                                               result)
-                                              (_
-                                               (cons 'result result))))
-                                      (cons c r))))
-                               channels-to-results)))
-                       #f))))
-              channels-to-results))))))))
-
 (define (chunk lst max-length)
   (if (> (length lst)
          max-length)
@@ -1126,173 +199,6 @@ If already in the worker thread, call PROC immediately."
                           0)))
         #:unwind? #t))))
 
-;; This variant of run-server from the fibers library supports running
-;; multiple servers within one process.
-(define run-server/patched
-  (let ((fibers-web-server-module
-         (resolve-module '(fibers web server))))
-
-    (define set-nonblocking!
-      (module-ref fibers-web-server-module 'set-nonblocking!))
-
-    (define make-default-socket
-      (module-ref fibers-web-server-module 'make-default-socket))
-
-    (define socket-loop
-      (module-ref fibers-web-server-module 'socket-loop))
-
-    (lambda* (handler
-              #:key
-              (host #f)
-              (family AF_INET)
-              (addr (if host
-                        (inet-pton family host)
-                        INADDR_LOOPBACK))
-              (port 8080)
-              (socket (make-default-socket family addr port)))
-      ;; We use a large backlog by default.  If the server is suddenly hit
-      ;; with a number of connections on a small backlog, clients won't
-      ;; receive confirmation for their SYN, leading them to retry --
-      ;; probably successfully, but with a large latency.
-      (listen socket 1024)
-      (set-nonblocking! socket)
-      (sigaction SIGPIPE SIG_IGN)
-      (spawn-fiber (lambda () (socket-loop socket handler))))))
-
-(define &port-timeout
-  (make-exception-type '&port-timeout
-                       &external-error
-                       '(port)))
-
-(define make-port-timeout-error
-  (record-constructor &port-timeout))
-
-(define port-timeout-error?
-  (record-predicate &port-timeout))
-
-(define &port-read-timeout
-  (make-exception-type '&port-read-timeout
-                       &port-timeout
-                       '()))
-
-(define make-port-read-timeout-error
-  (record-constructor &port-read-timeout))
-
-(define port-read-timeout-error?
-  (record-predicate &port-read-timeout))
-
-(define &port-write-timeout
-  (make-exception-type '&port-write-timeout
-                       &port-timeout
-                       '()))
-
-(define make-port-write-timeout-error
-  (record-constructor &port-write-timeout))
-
-(define port-write-timeout-error?
-  (record-predicate &port-write-timeout))
-
-;; These procedure are subject to spurious wakeups.
-
-(define (readable? port)
-  "Test if PORT is writable."
-  (match (select (vector port) #() #() 0)
-    ((#() #() #()) #f)
-    ((#(_) #() #()) #t)))
-
-(define (writable? port)
-  "Test if PORT is writable."
-  (match (select #() (vector port) #() 0)
-    ((#() #() #()) #f)
-    ((#() #(_) #()) #t)))
-
-(define (make-wait-operation ready? schedule-when-ready port
-                             port-ready-fd this-procedure)
-  (make-base-operation
-   #f
-   (lambda _
-     (and (ready? (port-ready-fd port)) values))
-   (lambda (flag sched resume)
-     (define (commit)
-       (match (atomic-box-compare-and-swap! flag 'W 'S)
-         ('W (resume values))
-         ('C (commit))
-         ('S #f)))
-     (schedule-when-ready
-      sched (port-ready-fd port) commit))))
-
-(define (wait-until-port-readable-operation port)
-  "Make an operation that will succeed when PORT is readable."
-  (unless (input-port? port)
-    (error "refusing to wait forever for input on non-input port"))
-  (make-wait-operation readable? schedule-task-when-fd-readable port
-                       port-read-wait-fd
-                       wait-until-port-readable-operation))
-
-(define (wait-until-port-writable-operation port)
-  "Make an operation that will succeed when PORT is writable."
-  (unless (output-port? port)
-    (error "refusing to wait forever for output on non-output port"))
-  (make-wait-operation writable? schedule-task-when-fd-writable port
-                       port-write-wait-fd
-                       wait-until-port-writable-operation))
-
-(define* (with-fibers-port-timeouts thunk
-                                    #:key timeout
-                                    (read-timeout timeout)
-                                    (write-timeout timeout))
-  (define (no-fibers-wait port mode timeout)
-    (define poll-timeout-ms 200)
-
-    ;; When the GC runs, it restarts the poll syscall, but the timeout
-    ;; remains unchanged! When the timeout is longer than the time
-    ;; between the syscall restarting, I think this renders the
-    ;; timeout useless. Therefore, this code uses a short timeout, and
-    ;; repeatedly calls poll while watching the clock to see if it has
-    ;; timed out overall.
-    (let ((timeout-internal
-           (+ (get-internal-real-time)
-              (* internal-time-units-per-second
-                 (/ timeout 1000)))))
-      (let loop ((poll-value
-                  (port-poll port mode poll-timeout-ms)))
-        (if (= poll-value 0)
-            (if (> (get-internal-real-time)
-                   timeout-internal)
-                (raise-exception
-                 (if (string=? mode "r")
-                     (make-port-read-timeout-error port)
-                     (make-port-write-timeout-error port)))
-                (loop (port-poll port mode poll-timeout-ms)))
-            poll-value))))
-
-  (parameterize
-      ((current-read-waiter
-        (lambda (port)
-          (if (current-scheduler)
-              (perform-operation
-               (choice-operation
-                (wait-until-port-readable-operation port)
-                (wrap-operation
-                 (sleep-operation read-timeout)
-                 (lambda ()
-                   (raise-exception
-                    (make-port-read-timeout-error thunk port))))))
-              (no-fibers-wait port "r" read-timeout))))
-       (current-write-waiter
-        (lambda (port)
-          (if (current-scheduler)
-              (perform-operation
-               (choice-operation
-                (wait-until-port-writable-operation port)
-                (wrap-operation
-                 (sleep-operation write-timeout)
-                 (lambda ()
-                   (raise-exception
-                    (make-port-write-timeout-error thunk port))))))
-              (no-fibers-wait port "w" write-timeout)))))
-    (thunk)))
-
 (define (spawn-port-monitoring-fiber port error-condition)
   (spawn-fiber
    (lambda ()
@@ -1305,7 +211,7 @@ If already in the worker thread, call PROC immediately."
                             port exn)
              (signal-condition! error-condition))
          (lambda ()
-           (with-fibers-port-timeouts
+           (with-port-timeouts
             (lambda ()
               (let ((sock (socket PF_INET SOCK_STREAM 0)))
                 (connect sock AF_INET INADDR_LOOPBACK port)
@@ -1327,25 +233,3 @@ If already in the worker thread, call PROC immediately."
             (sigaction SIGINT (car handler) (cdr handler))
             ;; restore original C handler.
             (sigaction SIGINT #f))))))
-
-(define (make-queueing-channel channel)
-  (define queue (make-q))
-
-  (let ((queue-channel (make-channel)))
-    (spawn-fiber
-     (lambda ()
-       (while #t
-         (if (q-empty? queue)
-             (enq! queue
-                   (perform-operation
-                    (get-operation queue-channel)))
-             (let ((front (q-front queue)))
-               (perform-operation
-                (choice-operation
-                 (wrap-operation (get-operation queue-channel)
-                                 (lambda (val)
-                                   (enq! queue val)))
-                 (wrap-operation (put-operation channel front)
-                                 (lambda _
-                                   (q-pop! queue))))))))))
-    queue-channel))
diff --git a/guix-data-service/web/build-server/controller.scm 
b/guix-data-service/web/build-server/controller.scm
index 22088b1..7d2bd24 100644
--- a/guix-data-service/web/build-server/controller.scm
+++ b/guix-data-service/web/build-server/controller.scm
@@ -22,6 +22,7 @@
   #:use-module (json)
   #:use-module (squee)
   #:use-module (fibers)
+  #:use-module (knots resource-pool)
   #:use-module (prometheus)
   #:use-module (guix-data-service utils)
   #:use-module (guix-data-service database)
diff --git a/guix-data-service/web/build/controller.scm 
b/guix-data-service/web/build/controller.scm
index bf77e03..7924dbb 100644
--- a/guix-data-service/web/build/controller.scm
+++ b/guix-data-service/web/build/controller.scm
@@ -18,6 +18,8 @@
 (define-module (guix-data-service web build controller)
   #:use-module (srfi srfi-1)
   #:use-module (ice-9 match)
+  #:use-module (knots parallelism)
+  #:use-module (knots resource-pool)
   #:use-module (guix-data-service utils)
   #:use-module (guix-data-service database)
   #:use-module (guix-data-service web render)
@@ -41,7 +43,7 @@
 
 (define parse-build-server
   (lambda (v)
-    (letpar& ((build-servers
+    (fibers-let ((build-servers
                (call-with-resource-from-pool (connection-pool)
                  select-build-servers)))
       (or (any (match-lambda
@@ -88,7 +90,7 @@
                              '()))
         (let ((system (assq-ref parsed-query-parameters 'system))
               (target (assq-ref parsed-query-parameters 'target)))
-          (letpar& ((build-server-options
+          (fibers-let ((build-server-options
                      (with-resource-from-pool (connection-pool) conn
                        (map (match-lambda
                               ((id url lookup-all-derivations
diff --git a/guix-data-service/web/compare/controller.scm 
b/guix-data-service/web/compare/controller.scm
index e1fab78..dbb4975 100644
--- a/guix-data-service/web/compare/controller.scm
+++ b/guix-data-service/web/compare/controller.scm
@@ -24,6 +24,8 @@
   #:use-module (texinfo)
   #:use-module (texinfo html)
   #:use-module (texinfo plain-text)
+  #:use-module (knots parallelism)
+  #:use-module (knots resource-pool)
   #:use-module (guix-data-service utils)
   #:use-module (guix-data-service database)
   #:use-module (guix-data-service web sxml)
@@ -229,7 +231,7 @@
 (define (render-compare mime-types
                         query-parameters)
   (if (any-invalid-query-parameters? query-parameters)
-      (letpar& ((base-job
+      (fibers-let ((base-job
                  (match (assq-ref query-parameters 'base_commit)
                    (($ <invalid-query-parameter> value)
                     (with-resource-from-pool (connection-pool) conn
@@ -275,7 +277,7 @@
                             #f
                             #f
                             #f)))))
-      (letpar& ((base-revision-id
+      (fibers-let ((base-revision-id
                  (with-resource-from-pool (connection-pool) conn
                    (commit->revision-id
                     conn
@@ -303,7 +305,7 @@
                 (version-changes
                  (package-data-version-changes base-packages-vhash
                                                target-packages-vhash)))
-            (letpar& ((lint-warnings-data
+            (fibers-let ((lint-warnings-data
                        (with-resource-from-pool (connection-pool) conn
                          (group-list-by-first-n-fields
                           2
@@ -396,7 +398,7 @@
                           lint-warnings-data))))
                   #:extra-headers http-headers-for-unchanging-content))
                 (else
-                 (letpar& ((lint-warnings-locale-options
+                 (fibers-let ((lint-warnings-locale-options
                             (map
                              (match-lambda
                                ((locale)
@@ -449,7 +451,7 @@
             (target-branch   (assq-ref query-parameters 'target_branch))
             (target-datetime (assq-ref query-parameters 'target_datetime))
             (locale          (assq-ref query-parameters 'locale)))
-        (letpar& ((base-revision-details
+        (fibers-let ((base-revision-details
                    (with-resource-from-pool (connection-pool) conn
                      (select-guix-revision-for-branch-and-datetime
                       conn
@@ -624,7 +626,7 @@
              '(application/json text/html)
              mime-types)
         ((application/json)
-         (letpar& ((base-job
+         (fibers-let ((base-job
                     (and=> (match (assq-ref query-parameters 'base_commit)
                              (($ <invalid-query-parameter> value)
                               (and (string? value) value))
@@ -663,7 +665,7 @@
               (base_job   . ,base-job)
               (target_job . ,target-job)))))
         (else
-         (letpar& ((systems
+         (fibers-let ((systems
                     (call-with-resource-from-pool (connection-pool)
                       list-systems))
                    (targets
@@ -695,7 +697,7 @@
             (limit-results  (assq-ref query-parameters 'limit_results)))
         (let ((data
                (concatenate!
-                (par-map&
+                (fibers-map
                  (lambda (system)
                    (with-resource-from-pool (connection-pool) conn
                      (package-derivation-differences-data
@@ -734,7 +736,7 @@
                        . ,derivation-changes))
                     #:stream? #t))
                   (else
-                   (letpar& ((systems
+                   (fibers-let ((systems
                               (call-with-resource-from-pool (connection-pool)
                                 list-systems))
                              (targets
@@ -788,7 +790,7 @@
                               string->symbol))
             (after-name     (assq-ref query-parameters 'after_name))
             (limit-results  (assq-ref query-parameters 'limit_results)))
-        (letpar&
+        (fibers-let
             ((base-revision-details
               (with-resource-from-pool (connection-pool) conn
                 (select-guix-revision-for-branch-and-datetime conn
@@ -800,7 +802,7 @@
                                                               target-branch
                                                               
target-datetime))))
           (let ((data
-                 (par-map&
+                 (fibers-map
                   (lambda (system)
                     (with-resource-from-pool (connection-pool) conn
                       (package-derivation-differences-data
@@ -875,7 +877,7 @@
          (render-json
           '((error . "invalid query"))))
         (else
-         (letpar& ((base-job
+         (fibers-let ((base-job
                     (match (assq-ref query-parameters 'base_commit)
                       (($ <invalid-query-parameter> value)
                        (with-resource-from-pool (connection-pool) conn
@@ -895,7 +897,7 @@
 
       (let ((base-commit    (assq-ref query-parameters 'base_commit))
             (target-commit  (assq-ref query-parameters 'target_commit)))
-        (letpar& ((base-revision-id
+        (fibers-let ((base-revision-id
                    (with-resource-from-pool (connection-pool) conn
                      (commit->revision-id
                       conn
@@ -944,7 +946,7 @@
          (render-json
           '((error . "invalid query"))))
         (else
-         (letpar& ((systems
+         (fibers-let ((systems
                     (with-resource-from-pool (connection-pool) conn
                      list-systems))
                    (build-server-urls
@@ -963,7 +965,7 @@
       (let ((base-commit    (assq-ref query-parameters 'base_commit))
             (target-commit  (assq-ref query-parameters 'target_commit))
             (system         (assq-ref query-parameters 'system)))
-        (letpar& ((data
+        (fibers-let ((data
                    (with-resource-from-pool (connection-pool) conn
                      (system-test-derivations-differences-data
                       conn
@@ -1014,7 +1016,7 @@
          (render-json
           '((error . "invalid query"))))
         (else
-         (letpar& ((systems
+         (fibers-let ((systems
                     (with-resource-from-pool (connection-pool) conn
                      list-systems))
                    (build-server-urls
@@ -1035,7 +1037,7 @@
             (target-branch   (assq-ref query-parameters 'target_branch))
             (target-datetime (assq-ref query-parameters 'target_datetime))
             (system         (assq-ref query-parameters 'system)))
-        (letpar&
+        (fibers-let
             ((base-revision-details
               (with-resource-from-pool (connection-pool) conn
                 (select-guix-revision-for-branch-and-datetime conn
@@ -1046,7 +1048,7 @@
                 (select-guix-revision-for-branch-and-datetime conn
                                                               target-branch
                                                               
target-datetime))))
-          (letpar& ((data
+          (fibers-let ((data
                      (with-resource-from-pool (connection-pool) conn
                        (system-test-derivations-differences-data
                         conn
diff --git a/guix-data-service/web/controller.scm 
b/guix-data-service/web/controller.scm
index d23c2f3..cdf2318 100644
--- a/guix-data-service/web/controller.scm
+++ b/guix-data-service/web/controller.scm
@@ -35,6 +35,8 @@
   #:use-module (texinfo html)
   #:use-module (squee)
   #:use-module (json)
+  #:use-module (knots parallelism)
+  #:use-module (knots resource-pool)
   #:use-module (prometheus)
   #:use-module (guix-data-service utils)
   #:use-module (guix-data-service config)
@@ -234,7 +236,7 @@
        #:always-rollback? #t))
 
     (lambda ()
-      (letpar& ((metric-values
+      (fibers-let ((metric-values
                  (with-exception-handler
                      (lambda (exn)
                        (simple-format
@@ -456,12 +458,12 @@
                   (write-metrics registry port))))))))
 
 (define (render-derivation derivation-file-name)
-  (letpar& ((derivation
+  (fibers-let ((derivation
              (with-resource-from-pool (connection-pool) conn
                (select-derivation-by-file-name conn derivation-file-name))))
 
     (if derivation
-        (letpar& ((derivation-inputs
+        (fibers-let ((derivation-inputs
                    (with-resource-from-pool (connection-pool) conn
                      (select-derivation-inputs-by-derivation-id
                       conn
@@ -495,7 +497,7 @@
            (select-derivation-by-file-name conn
                                            derivation-file-name))))
     (if derivation
-        (letpar& ((derivation-inputs
+        (fibers-let ((derivation-inputs
                    (with-resource-from-pool (connection-pool) conn
                      (select-derivation-inputs-by-derivation-id
                       conn
@@ -551,7 +553,7 @@
            (select-derivation-by-file-name conn
                                            derivation-file-name))))
     (if derivation
-        (letpar& ((derivation-inputs
+        (fibers-let ((derivation-inputs
                    (with-resource-from-pool (connection-pool) conn
                      (select-derivation-inputs-by-derivation-id
                       conn
@@ -596,7 +598,7 @@
          #:sxml (view-narinfos narinfos)))))
 
 (define (render-store-item filename)
-  (letpar& ((derivation
+  (fibers-let ((derivation
              (with-resource-from-pool (connection-pool) conn
                (select-derivation-by-output-filename conn filename))))
     (match derivation
@@ -619,7 +621,7 @@
                       filename)))
            #:extra-headers http-headers-for-unchanging-content))))
       (derivations
-       (letpar& ((nars
+       (fibers-let ((nars
                   (with-resource-from-pool (connection-pool) conn
                     (select-nars-for-output conn filename)))
                  (builds
@@ -656,7 +658,7 @@
                       conn
                       filename))))))))))
       (derivations
-       (letpar& ((nars
+       (fibers-let ((nars
                   (with-resource-from-pool (connection-pool) conn
                     (select-nars-for-output conn filename))))
          (render-json
diff --git a/guix-data-service/web/jobs/controller.scm 
b/guix-data-service/web/jobs/controller.scm
index 7e5084f..96621f9 100644
--- a/guix-data-service/web/jobs/controller.scm
+++ b/guix-data-service/web/jobs/controller.scm
@@ -17,6 +17,8 @@
 
 (define-module (guix-data-service web jobs controller)
   #:use-module (ice-9 match)
+  #:use-module (knots parallelism)
+  #:use-module (knots resource-pool)
   #:use-module (guix-data-service utils)
   #:use-module (guix-data-service database)
   #:use-module (guix-data-service web render)
@@ -74,7 +76,7 @@
 (define (render-jobs mime-types query-parameters)
   (define limit-results (assq-ref query-parameters 'limit_results))
 
-  (letpar& ((jobs
+  (fibers-let ((jobs
              (with-resource-from-pool (connection-pool) conn
                (select-jobs-and-events
                 conn
diff --git a/guix-data-service/web/nar/controller.scm 
b/guix-data-service/web/nar/controller.scm
index e2ace7a..f7edac6 100644
--- a/guix-data-service/web/nar/controller.scm
+++ b/guix-data-service/web/nar/controller.scm
@@ -27,6 +27,8 @@
   #:use-module (web uri)
   #:use-module (web request)
   #:use-module (web response)
+  #:use-module (knots parallelism)
+  #:use-module (knots resource-pool)
   #:use-module (guix pki)
   #:use-module (guix base32)
   #:use-module (guix base64)
@@ -155,7 +157,7 @@
                    #:code 200
                    #:headers '((content-type . (application/x-narinfo))))
                   (let ((derivation-file-name (second derivation)))
-                    (letpar&
+                    (fibers-let
                         ((derivation-text
                           (with-resource-from-pool (reserved-connection-pool) 
conn
                             (select-serialized-derivation-by-file-name
diff --git a/guix-data-service/web/package/controller.scm 
b/guix-data-service/web/package/controller.scm
index 8dc6b0f..792394c 100644
--- a/guix-data-service/web/package/controller.scm
+++ b/guix-data-service/web/package/controller.scm
@@ -19,6 +19,8 @@
   #:use-module (ice-9 match)
   #:use-module (web uri)
   #:use-module (web request)
+  #:use-module (knots parallelism)
+  #:use-module (knots resource-pool)
   #:use-module (guix-data-service utils)
   #:use-module (guix-data-service database)
   #:use-module (guix-data-service web render)
@@ -40,7 +42,7 @@
              request
              `((system ,parse-system #:default "x86_64-linux")
                (target ,parse-target #:default "")))))
-       (letpar& ((package-versions-with-branches
+       (fibers-let ((package-versions-with-branches
                   (with-resource-from-pool (connection-pool) conn
                     (branches-by-package-version conn name
                                                  (assq-ref 
parsed-query-parameters
diff --git a/guix-data-service/web/repository/controller.scm 
b/guix-data-service/web/repository/controller.scm
index 0d9434c..101687c 100644
--- a/guix-data-service/web/repository/controller.scm
+++ b/guix-data-service/web/repository/controller.scm
@@ -19,6 +19,8 @@
   #:use-module (ice-9 match)
   #:use-module (web uri)
   #:use-module (web request)
+  #:use-module (knots parallelism)
+  #:use-module (knots resource-pool)
   #:use-module (guix-data-service utils)
   #:use-module (guix-data-service database)
   #:use-module (guix-data-service web render)
@@ -47,7 +49,7 @@
 
   (match method-and-path-components
     (('GET "repositories")
-     (letpar& ((git-repositories
+     (fibers-let ((git-repositories
                 (call-with-resource-from-pool (connection-pool)
                   all-git-repositories)))
        (case (most-appropriate-mime-type
@@ -71,7 +73,7 @@
      (match (with-resource-from-pool (connection-pool) conn
               (select-git-repository conn id))
        ((label url cgit-url-base fetch-with-authentication? poll-interval)
-        (letpar& ((branches
+        (fibers-let ((branches
                    (with-resource-from-pool (connection-pool) conn
                      (all-branches-with-most-recent-commit
                       conn
@@ -119,7 +121,7 @@
              `((after_date     ,parse-datetime)
                (before_date    ,parse-datetime)
                (limit_results  ,parse-result-limit #:default 100)))))
-       (letpar& ((revisions
+       (fibers-let ((revisions
                   (with-resource-from-pool (connection-pool) conn
                     (most-recent-commits-for-branch
                      conn
@@ -160,7 +162,7 @@
                              parsed-query-parameters
                              revisions)))))))))
     (('GET "repository" repository-id "branch" branch-name "package" 
package-name)
-     (letpar& ((package-versions
+     (fibers-let ((package-versions
                 (with-resource-from-pool (connection-pool) conn
                   (package-versions-for-branch conn
                                                (string->number repository-id)
@@ -211,7 +213,7 @@
             (parse-query-parameters
              request
              `((system ,parse-system #:default "x86_64-linux")))))
-       (letpar& ((system-test-history
+       (fibers-let ((system-test-history
                   (with-resource-from-pool (connection-pool) conn
                     (system-test-derivations-for-branch
                      conn
@@ -256,7 +258,7 @@
                      valid-systems
                      system-test-history)))))))
     (('GET "repository" repository-id "branch" branch-name 
"latest-processed-revision")
-     (letpar& ((commit-hash
+     (fibers-let ((commit-hash
                 (with-resource-from-pool (connection-pool) conn
                   (latest-processed-commit-for-branch conn
                                                       repository-id
@@ -273,7 +275,7 @@
                                       repository-id
                                       branch-name))))
     (('GET "repository" repository-id "branch" branch-name 
"latest-processed-revision" "packages")
-     (letpar& ((commit-hash
+     (fibers-let ((commit-hash
                 (with-resource-from-pool (connection-pool) conn
                   (latest-processed-commit-for-branch conn
                                                       repository-id
@@ -313,7 +315,7 @@
                                       repository-id
                                       branch-name))))
     (('GET "repository" repository-id "branch" branch-name 
"latest-processed-revision" "package-derivations")
-     (letpar& ((commit-hash
+     (fibers-let ((commit-hash
                 (with-resource-from-pool (connection-pool) conn
                   (latest-processed-commit-for-branch conn
                                                       repository-id
@@ -422,7 +424,7 @@
                                       branch-name))))
     (('GET "repository" repository-id "branch" branch-name
            "latest-processed-revision" "system-tests")
-     (letpar& ((commit-hash
+     (fibers-let ((commit-hash
                 (with-resource-from-pool (connection-pool) conn
                   (latest-processed-commit-for-branch conn
                                                       repository-id
@@ -440,7 +442,7 @@
                                       repository-id
                                       branch-name))))
     (('GET "repository" repository-id "branch" branch-name 
"latest-processed-revision" "package-reproducibility")
-     (letpar& ((commit-hash
+     (fibers-let ((commit-hash
                 (with-resource-from-pool (connection-pool) conn
                   (latest-processed-commit-for-branch conn
                                                       repository-id
@@ -462,7 +464,7 @@
                                       repository-id
                                       branch-name))))
     (('GET "repository" repository-id "branch" branch-name 
"latest-processed-revision" "package-substitute-availability")
-     (letpar& ((commit-hash
+     (fibers-let ((commit-hash
                 (with-resource-from-pool (connection-pool) conn
                   (latest-processed-commit-for-branch conn
                                                       repository-id
@@ -476,7 +478,7 @@
                                       branch-name))))
     (('GET "repository" repository-id "branch" branch-name 
"latest-processed-revision"
            "lint-warnings")
-     (letpar& ((commit-hash
+     (fibers-let ((commit-hash
                 (with-resource-from-pool (connection-pool) conn
                   (latest-processed-commit-for-branch conn
                                                       repository-id
@@ -510,7 +512,7 @@
                                       repository-id
                                       branch-name))))
     (('GET "repository" repository-id "branch" branch-name 
"latest-processed-revision" "package" name version)
-     (letpar& ((commit-hash
+     (fibers-let ((commit-hash
                 (with-resource-from-pool (connection-pool) conn
                   (latest-processed-commit-for-branch conn
                                                       repository-id
@@ -583,7 +585,7 @@
            (assq-ref parsed-query-parameters 'system))
           (target
            (assq-ref parsed-query-parameters 'target)))
-      (letpar&
+      (fibers-let
           ((package-derivations
             (with-resource-from-pool (connection-pool) conn
               (package-derivations-for-branch conn
@@ -620,7 +622,7 @@
                                           . ,(list->vector builds)))))
                                     package-derivations))))))
           (else
-           (letpar& ((systems
+           (fibers-let ((systems
                       (call-with-resource-from-pool (connection-pool)
                         list-systems))
                      (targets
@@ -657,7 +659,7 @@
            (assq-ref parsed-query-parameters 'target))
           (output-name
            (assq-ref parsed-query-parameters 'output)))
-      (letpar&
+      (fibers-let
           ((package-outputs
             (with-resource-from-pool (connection-pool) conn
               (package-outputs-for-branch conn
@@ -695,7 +697,7 @@
                                           . ,(list->vector builds)))))
                                     package-outputs))))))
           (else
-           (letpar& ((systems
+           (fibers-let ((systems
                       (call-with-resource-from-pool (connection-pool)
                         list-systems))
                      (targets
diff --git a/guix-data-service/web/revision/controller.scm 
b/guix-data-service/web/revision/controller.scm
index 14a721a..c4a25f7 100644
--- a/guix-data-service/web/revision/controller.scm
+++ b/guix-data-service/web/revision/controller.scm
@@ -24,6 +24,8 @@
   #:use-module (texinfo html)
   #:use-module (texinfo plain-text)
   #:use-module (json)
+  #:use-module (knots parallelism)
+  #:use-module (knots resource-pool)
   #:use-module (guix-data-service utils)
   #:use-module (guix-data-service database)
   #:use-module (guix-data-service web render)
@@ -84,7 +86,7 @@
                       status))))
 
 (define (parse-build-server v)
-  (letpar& ((build-servers
+  (fibers-let ((build-servers
              (call-with-resource-from-pool (connection-pool)
                select-build-servers)))
     (or (any (match-lambda
@@ -395,7 +397,7 @@
       `((unknown_commit . ,commit-hash))
       #:code 404))
     (else
-     (letpar& ((job
+     (fibers-let ((job
                 (with-resource-from-pool (connection-pool) conn
                   (select-job-for-commit conn commit-hash)))
                (git-repositories-and-branches
@@ -423,7 +425,7 @@
       `((unknown_commit . ,commit-hash))
       #:code 404))
     (else
-     (letpar& ((job
+     (fibers-let ((job
                 (with-resource-from-pool (connection-pool) conn
                   (select-job-for-commit conn commit-hash)))
                (git-repositories-and-branches
@@ -448,7 +450,7 @@
                                (header-text
                                 `("Revision " (samp ,commit-hash)))
                                (max-age cache-control-default-max-age))
-  (letpar& ((packages-count
+  (fibers-let ((packages-count
              (with-resource-from-pool (connection-pool) conn
                (count-packages-in-revision conn commit-hash)))
             (git-repositories-and-branches
@@ -514,7 +516,7 @@
                                         `("Revision " (samp ,commit-hash)))
                                        (header-link
                                         (string-append "/revision/" 
commit-hash)))
-  (letpar& ((system-tests
+  (fibers-let ((system-tests
              (with-resource-from-pool (connection-pool) conn
                (select-system-tests-for-guix-revision
                 conn
@@ -542,7 +544,7 @@
                      (builds . ,(list->vector builds)))))
                 system-tests))))))
       (else
-       (letpar& ((git-repositories
+       (fibers-let ((git-repositories
                   (with-resource-from-pool (connection-pool) conn
                     (git-repositories-containing-commit conn
                                                         commit-hash)))
@@ -568,7 +570,7 @@
                                             (header-link
                                              (string-append "/revision/"
                                                             commit-hash)))
-  (letpar& ((channel-instances
+  (fibers-let ((channel-instances
              (with-resource-from-pool (connection-pool) conn
                (select-channel-instances-for-guix-revision conn commit-hash))))
     (case (most-appropriate-mime-type
@@ -596,7 +598,7 @@
 (define* (render-revision-package-substitute-availability mime-types
                                                           commit-hash
                                                           #:key path-base)
-  (letpar& ((substitute-availability
+  (fibers-let ((substitute-availability
              (with-resource-from-pool (connection-pool) conn
                (select-package-output-availability-for-revision conn
                                                                 commit-hash)))
@@ -610,7 +612,7 @@
       ((application/json)
        (render-json
         `((commit . ,commit-hash)
-          (substitute_servers
+          (xsubstitute_servers
            . ,(list->vector
                (map (match-lambda
                       ((build-server-id . data)
@@ -642,7 +644,7 @@
                                                  (header-link
                                                   (string-append "/revision/"
                                                                  commit-hash)))
-  (letpar& ((output-consistency
+  (fibers-let ((output-consistency
              (with-resource-from-pool (connection-pool) conn
                (select-output-consistency-for-revision conn commit-hash))))
     (case (most-appropriate-mime-type
@@ -676,7 +678,7 @@
           #:sxml (view-revision-news commit-hash
                                      query-parameters
                                      '()))))
-      (letpar& ((news-entries
+      (fibers-let ((news-entries
                  (with-resource-from-pool (connection-pool) conn
                    (select-channel-news-entries-contained-in-guix-revision
                     conn
@@ -735,7 +737,7 @@
                                99999)) ; TODO There shouldn't be a limit
             (fields (assq-ref query-parameters 'field))
             (locale (assq-ref query-parameters 'locale)))
-        (letpar&
+        (fibers-let
             ((packages
               (with-resource-from-pool (connection-pool) conn
                 (if search-query
@@ -832,7 +834,7 @@
                                                               "/revision/" 
commit-hash))
                                                             (header-text
                                                              `("Revision " 
(samp ,commit-hash))))
-  (letpar& ((package-synopsis-counts
+  (fibers-let ((package-synopsis-counts
              (with-resource-from-pool (connection-pool) conn
                (synopsis-counts-by-locale conn
                                           (commit->revision-id
@@ -872,7 +874,7 @@
                                   (header-link
                                    (string-append
                                     "/revision/" commit-hash)))
-  (letpar& ((package-versions
+  (fibers-let ((package-versions
              (with-resource-from-pool (connection-pool) conn
                (select-package-versions-for-revision conn
                                                      commit-hash
@@ -929,7 +931,7 @@
 
   (define has-replacement? (assq-ref query-parameters 'has_replacement))
 
-  (letpar& ((metadata
+  (fibers-let ((metadata
              (with-resource-from-pool (connection-pool) conn
                (select-package-metadata-by-revision-name-and-version
                 conn
@@ -1041,7 +1043,7 @@
          (render-json
           `((error . "invalid query"))))
         (else
-         (letpar& ((systems
+         (fibers-let ((systems
                     (call-with-resource-from-pool (connection-pool)
                       list-systems))
                    (targets
@@ -1067,7 +1069,7 @@
              (assq-ref query-parameters 'search_query))
             (fields
              (assq-ref query-parameters 'field)))
-        (letpar&
+        (fibers-let
             ((derivations
               (if search-query
                   (with-resource-from-pool (connection-pool) conn
@@ -1090,7 +1092,7 @@
                      #:after-name (assq-ref query-parameters 'after_name)
                      #:include-builds? (member "builds" fields)))
                   (concatenate!
-                   (par-map&
+                   (fibers-map
                     (lambda (system)
                       (with-resource-from-pool (connection-pool) conn
                         (select-package-derivations-in-revision
@@ -1149,7 +1151,7 @@
                                         derivations))))
                 #:stream? #t))
               (else
-               (letpar& ((systems
+               (fibers-let ((systems
                           (call-with-resource-from-pool (connection-pool)
                             list-systems))
                          (targets
@@ -1187,7 +1189,7 @@
          (render-json
           `((error . "invalid query"))))
         (else
-         (letpar& ((systems
+         (fibers-let ((systems
                     (call-with-resource-from-pool (connection-pool)
                       list-systems))
                    (targets
@@ -1213,7 +1215,7 @@
              (assq-ref query-parameters 'search_query))
             (fields
              (assq-ref query-parameters 'field)))
-        (letpar&
+        (fibers-let
             ((derivations
               (with-resource-from-pool (connection-pool) conn
                 (select-fixed-output-package-derivations-in-revision
@@ -1242,7 +1244,7 @@
                (render-json
                 `((derivations . ,(list->vector derivations)))))
               (else
-               (letpar& ((systems
+               (fibers-let ((systems
                           (call-with-resource-from-pool (connection-pool)
                             list-systems))
                          (targets
@@ -1284,7 +1286,7 @@
          (render-json
           `((error . "invalid query"))))
         (else
-         (letpar& ((systems
+         (fibers-let ((systems
                     (call-with-resource-from-pool (connection-pool)
                       list-systems))
                    (targets
@@ -1308,7 +1310,7 @@
              (assq-ref query-parameters 'all_results))
             (fields
              (assq-ref query-parameters 'field)))
-        (letpar&
+        (fibers-let
             ((derivation-outputs
               (with-resource-from-pool (connection-pool) conn
                 (select-derivation-outputs-in-revision
@@ -1390,7 +1392,7 @@
                                          "not-matching")))))))
                             derivation-outputs))))))
               (else
-               (letpar& ((systems
+               (fibers-let ((systems
                           (call-with-resource-from-pool (connection-pool)
                             list-systems))
                          (targets
@@ -1419,7 +1421,7 @@
                                  (header-link
                                   (string-append "/revision/" commit-hash)))
   (if (any-invalid-query-parameters? query-parameters)
-      (letpar& ((systems
+      (fibers-let ((systems
                  (call-with-resource-from-pool (connection-pool)
                    list-systems))
                 (targets
@@ -1437,7 +1439,7 @@
                                '())))
       (let ((system (assq-ref query-parameters 'system))
             (target (assq-ref query-parameters 'target)))
-        (letpar& ((systems
+        (fibers-let ((systems
                    (call-with-resource-from-pool (connection-pool)
                      list-systems))
                   (targets
@@ -1492,7 +1494,7 @@
                                          (header-link
                                           (string-append "/revision/" 
commit-hash)))
   (if (any-invalid-query-parameters? query-parameters)
-      (letpar& ((systems
+      (fibers-let ((systems
                  (call-with-resource-from-pool (connection-pool)
                    list-systems))
                 (targets
@@ -1509,7 +1511,7 @@
                                        '())))
       (let ((system (assq-ref query-parameters 'system))
             (target (assq-ref query-parameters 'target)))
-        (letpar& ((systems
+        (fibers-let ((systems
                    (call-with-resource-from-pool (connection-pool)
                      list-systems))
                   (targets
@@ -1592,7 +1594,7 @@
             (linters (assq-ref query-parameters 'linter))
             (message-query (assq-ref query-parameters 'message_query))
             (fields (assq-ref query-parameters 'field)))
-        (letpar&
+        (fibers-let
             ((git-repositories
               (with-resource-from-pool (connection-pool) conn
                 (git-repositories-containing-commit conn
diff --git a/guix-data-service/web/server.scm b/guix-data-service/web/server.scm
index 4e08161..a1a888b 100644
--- a/guix-data-service/web/server.scm
+++ b/guix-data-service/web/server.scm
@@ -30,6 +30,8 @@
   #:use-module (fibers channels)
   #:use-module (fibers scheduler)
   #:use-module (fibers conditions)
+  #:use-module (knots web-server)
+  #:use-module (knots resource-pool)
   #:use-module ((guix build syscalls)
                 #:select (set-thread-name))
   #:use-module (prometheus)
@@ -246,7 +248,7 @@ port. Also, the port used can be changed by passing the 
--port option.\n"
                          (make-counter-metric registry
                                               
"resource_pool_checkout_timeouts_total"
                                               #:labels '(pool_name))))
-                    (%resource-pool-timeout-handler
+                    (resource-pool-default-timeout-handler
                      (lambda (pool proc timeout)
                        (let ((pool-name
                               (cond
@@ -269,11 +271,12 @@ port. Also, the port used can be changed by passing the 
--port option.\n"
                    request-scheduler)
 
                   (let ((render-metrics (make-render-metrics registry)))
-                    (run-server/patched
-                     (lambda (request body)
+                    (run-knots-web-server
+                     (lambda (request)
                        (metric-increment requests-metric)
 
-                       (let ((reply (make-channel)))
+                       (let ((body (read-request-body request))
+                             (reply (make-channel)))
                          (spawn-fiber
                           (lambda ()
                             (call-with-values
diff --git a/guix-dev.scm b/guix-dev.scm
index 8d33657..eec15ec 100644
--- a/guix-dev.scm
+++ b/guix-dev.scm
@@ -41,6 +41,38 @@
              (gnu packages ruby)
              (srfi srfi-1))
 
+(define guile-knots
+  (let ((commit "0fab93e9ff5b16813ae1356c13d3c974d7277d81")
+        (revision "1"))
+    (package
+    (name "guile-knots")
+    (version (git-version "0" revision commit))
+    (source (origin
+              (method git-fetch)
+              (uri (git-reference
+                    (url "https://git.cbaines.net/git/guile/knots";)
+                    (commit commit)))
+              (sha256
+               (base32
+                "1x0wirq0db2704784ig00kz5kh8j6szp2gwm67wn714m1jbhz9ky"))
+              (file-name (string-append name "-" version "-checkout"))))
+    (build-system gnu-build-system)
+    (native-inputs
+     (list pkg-config
+           autoconf
+           automake
+           guile-3.0
+           guile-fibers))
+    (inputs
+     (list guile-3.0))
+    (propagated-inputs
+     (list guile-fibers))
+    (home-page "https://git.cbaines.net/guile/knots";)
+    (synopsis "Patterns and functionality to use with Guile Fibers")
+    (description
+     "")
+    (license license:gpl3+))))
+
 (package
   (name "guix-data-service")
   (version "0.0.0")
@@ -52,6 +84,7 @@
          guile-json-4
          guile-squee
          guile-fibers
+         guile-knots
          guile-gcrypt
          guile-lzlib
          guile-readline

Reply via email to