branch: externals/futur
commit 44112d22595f1bc3df580236ecb92f7c649d4132
Author: Stefan Monnier <[email protected]>
Commit: Stefan Monnier <[email protected]>

    futur.el: Begin implementing bound on parallelism
    
    * futur.el (futur-process-max): New custom.
    (futur--process-active, futur--process-waiting): New vars.
    (futur--process-bounded, futur--process-next): New functions.
    (futur-blocker-abort) <waiting>: New method.
    (futur--process-make): Rename from `futur-process-make`.
---
 futur.el | 42 ++++++++++++++++++++++++++++++++++++++++--
 1 file changed, 40 insertions(+), 2 deletions(-)

diff --git a/futur.el b/futur.el
index 2096c8d6c9..12fda77f5e 100644
--- a/futur.el
+++ b/futur.el
@@ -600,6 +600,43 @@ If IDLE is non-nil, then wait for that amount of idle 
time."
 
 ;;;; Processes
 
+(defcustom futur-process-max
+  (if (fboundp 'num-processors) (num-processors) 2)
+  "Maximum number of concurrent subprocesses."
+  :type 'integer)
+
+(defvar futur--process-active nil
+  "List of active process-futures.")
+
+(defvar futur--process-waiting nil
+  "List of process-futures waiting to start.")
+
+(defun futur--process-bounded (&rest args)
+  (if (< (length futur--process-active) futur-process-max)
+      (let ((new (apply #'funcall args)))
+        (push new futur--process-active)
+        (futur-register-callback
+         new (oclosure-lambda (futur--aux) (_ _) (futur--process-next new)))
+        new)
+    (let ((new (futur--waiting 'waiting)))
+      (push (cons new args) futur--process-waiting)
+      new)))
+
+(defun futur--process-next (done)
+  (setq futur--process-active (delq done futur--process-active))
+  (cl-block nil
+   (while futur--process-waiting
+    (pcase-let ((`(,fut . ,call) (pop futur--process-waiting)))
+      (pcase fut
+        ((futur--waiting)
+         (let ((new (apply #'futur--process-bounded call)))
+          (futur-register-callback
+           new (lambda (err val) (futur--deliver new err val)))
+          (cl-return))))))))
+
+(cl-defmethod futur-blocker-abort ((_ (eql 'waiting)) _error)
+  nil)
+
 (defun futur--process-completed-p (proc)
   (memq (process-status proc) '(exit signal closed failed)))
 
@@ -607,7 +644,7 @@ If IDLE is non-nil, then wait for that amount of idle time."
   (when (futur--process-completed-p proc)
     (futur-deliver-value futur (process-exit-status proc))))
 
-(defun futur-process-make (&rest args)
+(defun futur--process-make (&rest args)
   "Create a process and return a future that delivers its exit code.
 The ARGS are like those of `make-process' except that they can't include
 `:sentinel' because that is used internally."
@@ -642,7 +679,7 @@ The DISPLAY argument is ignored: redisplay always happens."
     (`(:file ,(and file (pred stringp)))
      (setq destination (expand-file-name file)))
     (`(,_ . ,_) (error "Separate handling of stderr is not supported yet")))
-  (let* ((futur (futur-process-make
+  (let* ((futur (futur--process-make
                  :name program
                  :command (cons program args)
                  :coding (if (stringp destination) '(binary . nil))
@@ -651,6 +688,7 @@ The DISPLAY argument is ignored: redisplay always happens."
                  :filter (if (bufferp destination) nil
                            #'futur-process-call--filter)))
          (proc (pcase-exhaustive futur ((futur--waiting blocker) blocker))))
+    (push futur futur--process-active)
     (when (stringp destination)
       (write-region "" nil destination nil 'silent))
     (pcase-exhaustive infile

Reply via email to