branch: externals/futur
commit 43f3a3748e0baf318598d52ac194534af821f5b4
Author: Stefan Monnier <[email protected]>
Commit: Stefan Monnier <[email protected]>
futur--process-bounded: Make it work
I'm still not happy with the API, so it's still under "--", but at
least it does what it's supposed to now.
* futur-tests.el (futur-process-bounded): New test.
* futur.el (futur-process-max): Make it a plain defvar.
(futur--process-waiting): Make it a queue.
(futur--process-bounded-start): New function, extracted from
`futur--process-bounded`.
(futur--process-bounded): Use it, and adjust to the use of a queue.
(futur--process-next): Adjust and fix `fut/new` confusion.
---
futur-tests.el | 16 ++++++++++++++++
futur.el | 45 +++++++++++++++++++++++++--------------------
2 files changed, 41 insertions(+), 20 deletions(-)
diff --git a/futur-tests.el b/futur-tests.el
index 63f7241c82..e1ef14adeb 100644
--- a/futur-tests.el
+++ b/futur-tests.el
@@ -162,5 +162,21 @@
(should (equal res
'(0 "00000000: 456d 6163 73
Emacs\n"))))))
+(ert-deftest futur-process-bounded ()
+ (let* ((futures ())
+ (start (float-time))
+ (futur-process-max 2))
+ (dotimes (_ 10)
+ (push (futur--process-bounded #'futur-timeout 0.1) futures))
+ (futur-blocking-wait-to-get-result (apply #'futur-list futures))
+ (should (<= 0.5 (- (float-time) start) 0.6)))
+ (let* ((futures ())
+ (start (float-time))
+ (futur-process-max 3))
+ (dotimes (_ 10)
+ (push (futur--process-bounded #'futur-timeout 0.1) futures))
+ (futur-blocking-wait-to-get-result (apply #'futur-list futures))
+ (should (<= 0.4 (- (float-time) start) 0.5))))
+
(provide 'futur-tests)
;;; futur-tests.el ends here
diff --git a/futur.el b/futur.el
index 399ab5a475..cc10430a13 100644
--- a/futur.el
+++ b/futur.el
@@ -643,39 +643,44 @@ Returns non-nil if it waited the full TIME."
;;;; Processes
-(defcustom futur-process-max
- (if (fboundp 'num-processors) (num-processors) 2)
- "Maximum number of concurrent subprocesses."
- :type 'integer)
+(defvar futur-process-max
+ (if (fboundp 'num-processors) ;; Emacs-28
+ (num-processors) 2)
+ "Maximum number of concurrent subprocesses.")
(defvar futur--process-active nil
"List of active process-futures.")
-(defvar futur--process-waiting nil
- "List of process-futures waiting to start.")
+(defvar futur--process-waiting (futur--queue)
+ "Queue of futures waiting to start
+Each element is of the form (FUTURE FUN . ARGS).")
(defun futur--process-bounded (&rest args)
(if (< (length futur--process-active) futur-process-max)
- (let ((new (apply #'funcall args)))
- (push new futur--process-active)
- (futur--register-callback
- new (oclosure-lambda (futur--aux) (_ _) (futur--process-next new)))
- new)
+ (futur--process-bounded-start args)
(let ((new (futur--waiting 'waiting)))
- (push (cons new args) futur--process-waiting)
+ (futur--queue-enqueue futur--process-waiting (cons new args))
new)))
+(defun futur--process-bounded-start (args)
+ (let ((new (apply #'funcall args)))
+ (push new futur--process-active)
+ (futur--register-callback
+ new (oclosure-lambda (futur--aux) (_ _) (futur--process-next new)))
+ new))
+
(defun futur--process-next (done)
(setq futur--process-active (delq done futur--process-active))
(cl-block nil
- (while futur--process-waiting
- (pcase-let ((`(,fut . ,call) (pop futur--process-waiting)))
- (pcase fut
- ((futur--waiting)
- (let ((new (apply #'futur--process-bounded call)))
- (futur--register-callback
- new (lambda (err val) (futur--deliver new err val)))
- (cl-return))))))))
+ (while (not (futur--queue-empty-p futur--process-waiting))
+ (pcase-let ((`(,fut . ,call)
+ (futur--queue-dequeue futur--process-waiting)))
+ (pcase fut
+ ((futur--waiting)
+ (let ((new (futur--process-bounded-start call)))
+ (futur--register-callback
+ new (lambda (err val) (futur--deliver fut err val)))
+ (cl-return))))))))
(cl-defmethod futur-blocker-abort ((_ (eql 'waiting)) _error)
nil)