Hi all,

Am Dienstag, den 28.10.2008, 11:37 +0100 schrieb Jörg F. Wittenberger:
> If you agree to use smarter data structures in the scheduler, I'll take
> my time to expand my proposal.

Here we go!

Attached are a slightly modified scheduler.scm and a new file, rbtree.scm.

> * At the end of the day, we might take a minute to consider the
> question which data structure would be the best.  I took the wttree
> only because

Thanks to a suggestion from Marc Feeley, I reused rbtree.scm from the
snowfort repository.  The attached version extends the snow version by
optionally generating lookup and fold procedures for the generated
trees.  (It also contains some chicken-specific lines at the head.)

rbtree.scm is currently literally included by the attached scheduler.scm
- but maybe we had better include only the generated code, or rewrite it
hygienically.  The latter is something I had better leave alone until I
have gotten used to those macros.

summary of changes

* file descriptor list (##sys#fd-list) became a priority queue
* timeout list (##sys#timeout-list) became a priority queue
* a new 'waiting and a ##sys#waiting-queue were created
* some select(2) calls (and associated processing!) were saved

The result: with about 250 open file descriptors, the test serves 20
parallel http requests, each of which causes several threads to serve
plus some xml processing (plus http chunk encoding with an i/o flush
after each chunk!).  I recall this test being impossible with the chicken
version of askemos within a 20-second limit.  First I moved to a quite
elaborate timeout handling anyway.  This plus just the wttree made the
test run in about 8-12 seconds.  Now we are at 4.

I'd say I'll stop here.  Good enough.

Things left to be done:

The queue nodes are defined as srfi-9 records.  Maybe this would be even
better as a lolevel structure?  Felix?  Is there something to gain?

> * I would to change the interface the scheduler uses to manipulate the
> fd-list to read like fd-list:ref fd-list:ref/default fd-list:delete!
> where ref, ref/default etc. have the same signature as in srfi-69.  (And
> have those interfaces exported from the smarter data structure.)  Then a
> simple  "(import (prefix my-great-structure fd-list:))" will do all the
> work.

This has not yet been done, and I might not get around to it any time soon.

Best regards

/Jörg
;; Compatibility shim: provide `define-macro' on top of `define-syntax'
;; using a low-level transformer of three arguments (x r c), where X is
;; the whole macro call form.  R and C are not used here.
(define-syntax define-macro
  (syntax-rules ()
    ;; (define-macro (NAME . LLIST) BODY ...):
    ;; the expansion of a call is the result of applying
    ;; (lambda LLIST BODY ...) to the unevaluated call arguments (cdr x).
    ((_ (name . llist) body ...)
     (define-syntax name
       (lambda (x r c)
	 (apply (lambda llist body ...) (cdr x)))))
    ;; (define-macro NAME . BODY):
    ;; NOTE(review): BODY is ignored by this clause; a call (NAME a b ...)
    ;; simply expands to its own argument list (a b ...).  This form is
    ;; not used elsewhere in the visible source -- confirm the intent.
    ((_ name . body)
     (define-syntax name
       (lambda (x r c) (cdr x))))))

;; define-rbtree: expands into the definitions of a red-black tree
;; whose nodes are caller-supplied objects.  All arguments are
;; identifiers (or #f where noted): first the names of the procedures
;; to be defined, then the predicates and the field accessors/mutators
;; the generated code uses on the nodes.
;;
;; Representation used by the generated code:
;;  * The tree is identified by a head node ("rbtree").  The head's
;;    `left' field holds the root, and the head itself serves as the
;;    nil leaf (child links are compared against it with eq?).
;;  * A black node stores the head in its `color' field, a red node
;;    stores #f.
;;  * LEFTMOST/RIGHTMOST (optional) are fields of the head that the
;;    generated insert/remove code updates when present.
(define-macro (define-rbtree
		rbtree-init!
		node->rbtree
		lookup			; may be #f
		tree-fold		; may be #f
		tree-for-each		; may be #f
		insert!
		remove!
		reposition!
		empty?
		singleton?
		match?			; may be #f iff lookup is #f
		key<?			; may be #f iff lookup is #f
		before?
		color
		color-set!
		parent
		parent-set!
		left
		left-set!
		right
		right-set!
		leftmost
		leftmost-set!
		rightmost
		rightmost-set!)

  ;; The helpers below run at expansion time and return code fragments
  ;; (mostly lambda expressions) that are spliced into the output.

  ;; The value stored in the color field of a black node: the expression
  ;; RBTREE itself.
  (define (black rbtree)
    rbtree)

  (define (black? rbtree)
    `(lambda (node)
       (,color node)))

  (define (blacken! rbtree)
    `(lambda (node)
       (,color-set! node ,(black rbtree))))

  ;; The value stored in the color field of a red node.
  (define (red)
    #f)

  (define (red?)
    `(lambda (node)
       (not (,color node))))

  (define (reden!)
    `(lambda (node)
       (,color-set! node ,(red))))

  (define (copy-color!)
    `(lambda (node1 node2)
       (,color-set! node1 (,color node2))))

  (define (exchange-color!)
    `(lambda (node1 node2)
       (let ((color-node1 (,color node1)))
	 (,color-set! node1 (,color node2))
	 (,color-set! node2 color-node1))))

  ;; Replace OLD-NODE by NEW-NODE in whichever child slot of PARENT-NODE
  ;; currently holds it (the right slot is assumed if it is not the left).
  (define (update-parent!)
    `(lambda (parent-node old-node new-node)
       (if (eq? old-node (,left parent-node))
	   (,left-set! parent-node new-node)
	   (,right-set! parent-node new-node))))

  ;; Rotation around NODE: its SIDE2 child takes NODE's place and NODE
  ;; becomes that child's SIDE1 child.
  (define (rotate! side1 side1-set! side2 side2-set!)
    `(lambda (node)
       (let ((side2-node (,side2 node)))
	 (let ((side1-side2-node (,side1 side2-node)))
	   (,side2-set! node side1-side2-node)
	   (,parent-set! side1-side2-node node))
	 (let ((parent-node (,parent node)))
	   (,side1-set! side2-node node)
	   (,parent-set! node side2-node)
	   (,parent-set! side2-node parent-node)
	   (,(update-parent!) parent-node node side2-node)))))

  (define (rotate-left!)
    (rotate! left left-set! right right-set!))

  (define (rotate-right!)
    (rotate! right right-set! left left-set!))

  ;; Procedure (node rbtree) looking for NODE's neighbor on SIDE: the
  ;; extreme OTHER-SIDE node of its SIDE subtree or, lacking that
  ;; subtree, its parent when NODE is not the parent's SIDE child.
  ;; In the remaining cases the head RBTREE is returned.
  (define (neighbor side other-side)
    `(lambda (node rbtree)
       (let ((side-node (,side node)))
	 (if (eq? side-node rbtree)
	     (let ((parent-node (,parent node)))
	       (if (or (eq? parent-node rbtree)
		       (eq? node (,side parent-node)))
		   rbtree
		   parent-node))
	     (let loop ((x side-node))
	       (let ((other-side-x (,other-side x)))
		 (if (eq? other-side-x rbtree)
		     x
		     (loop other-side-x))))))))

  ;; Descend one level during insertion: go left when the new node
  ;; sorts before X, right otherwise.  Refers to `node', `insert-left!'
  ;; and `insert-right!' from the body of the generated insert!.
  (define (insert-below! x)
    `(let ((x ,x))
       (if (,before? node x)
	   (insert-left! (,left x) x)
	   (insert-right! (,right x) x))))

  ;; One step of the rebalancing loop after insertion, for the case
  ;; where the parent of X is the SIDE1 child of its own parent.
  ;; Refers to x, parent-x, parent-parent-x and loop of the generated
  ;; fixup!.
  (define (insert-body side1 rotate-side1! side2 rotate-side2!)
    `(let ((side2-parent-parent-x (,side2 parent-parent-x)))
       (if (,(red?) side2-parent-parent-x)
	   (begin
	     (,(blacken! 'rbtree) parent-x)
	     (,(blacken! 'rbtree) side2-parent-parent-x)
	     (,(reden!) parent-parent-x)
	     (loop parent-parent-x))
	   (let ((y
		  (if (eq? x (,side2 parent-x))
		      (begin
			(,rotate-side1! parent-x)
			(,parent parent-x))
		      (,parent x))))
	     (,(blacken! 'rbtree) y)
	     (let ((parent-y (,parent y)))
	       (,(reden!) parent-y)
	       (,rotate-side2! parent-y))))))

  ;; One step of the rebalancing after removal, for the case where the
  ;; node being fixed is the SIDE1 child of PARENT-NODE.  Refers to
  ;; parent-node, rbtree and fixup! of the generated remove!.
  (define (remove-body side1 rotate-side1! side2 rotate-side2!)
    `(let ((x
	    (let ((side2-parent-node (,side2 parent-node)))
	      (if (,(red?) side2-parent-node)
		  (begin
		    (,(blacken! 'rbtree) side2-parent-node)
		    (,(reden!) parent-node)
		    (,rotate-side1! parent-node)
		    (,side2 parent-node))
		  side2-parent-node))))

       (define (common-case y)
	 (,(copy-color!) y parent-node)
	 (,(blacken! 'rbtree) parent-node)
	 (,(blacken! 'rbtree) (,side2 y))
	 (,rotate-side1! parent-node)
	 (,(blacken! 'rbtree) (,left rbtree)))

       (if (,(red?) (,side2 x))
	   (common-case x)
	   (let ((side1-x (,side1 x)))
	     (if (,(black? 'rbtree) side1-x)
		 (begin
		   (,(reden!) x)
		   (fixup! (,parent parent-node) parent-node))
		 (begin
		   (,(blacken! 'rbtree) side1-x)
		   (,(reden!) x)
		   (,rotate-side2! x)
		   (common-case (,side2 parent-node))))))))

  ;; The expansion proper: a `begin' holding all generated definitions.
  `(begin

     ;; Turn RBTREE into the head of an empty tree and return it: it is
     ;; colored black and its parent and left (root) links point to
     ;; itself; so does LEFTMOST when that field is configured.
     (define (,rbtree-init! rbtree)
       ,@(if leftmost
             `((,leftmost-set! rbtree rbtree))
             `())
       (,(blacken! 'rbtree) rbtree)
       (,parent-set! rbtree rbtree)
       (,left-set! rbtree rbtree)
       rbtree)

     ;; Head of the tree NODE belongs to: the color field of a black
     ;; node, otherwise (red node, color #f) the color field of its
     ;; parent.
     (define (,node->rbtree node)
       (or (,color node)
	   (,color (,parent node))))

     ;; Search from the root for a node for which (MATCH? key node)
     ;; holds, going left when (KEY<? key node) and right otherwise.
     ;; Returns the node, or #f when a nil leaf is reached.
     ,@(if lookup
	   `((define (,lookup rbtree key)
	       (let loop ((node (,left (,node->rbtree rbtree))))
		 (if (eq? rbtree node) #f
		     (cond
		      ((,match? key node) node)
		      ((,key<? key node) (loop (,left node)))
		      (else (loop (,right node))))))))
	   '())

     ;; Fold (PROCEDURE node accumulator) over all nodes.  For every
     ;; node the right subtree is folded first, then the node itself,
     ;; then the left subtree.
     ,@(if tree-fold
	   `((define (,tree-fold procedure init rbtree)
	       (define (fold init node)
		 (if (eq? rbtree node)
		     init
		     (fold (procedure node (fold init (,right node))) (,left node))))
	       (fold init (,left (,node->rbtree rbtree)))))
	   '())
     ;; Call PROCEDURE on every node: the node itself first, then its
     ;; left subtree, then its right subtree.  The child links are read
     ;; after PROCEDURE has returned.
     ,@(if tree-for-each
	   `((define (,tree-for-each procedure rbtree)
	       (let loop ((node (,left (,node->rbtree rbtree))))
		 (or (eq? rbtree node)
		     (begin
		       (procedure node)
		       (loop (,left node))
		       (loop (,right node)))))))
	   '())

     ;; Insert NODE into the tree headed by RBTREE and return NODE.
     (define (,insert! rbtree node)

       ;; Rebalance upwards from the freshly inserted (red) node, then
       ;; make sure the root is black.
       (define (fixup!)

	 (let loop ((x node))
	   (let ((parent-x (,parent x)))
	     (if (,(red?) parent-x)
		 (let ((parent-parent-x (,parent parent-x)))
		   (if (eq? parent-x (,left parent-parent-x))
		       ,(insert-body left
				     (rotate-left!)
				     right
				     (rotate-right!))
		       ,(insert-body right
				     (rotate-right!)
				     left
				     (rotate-left!)))))))

	 (,(blacken! 'rbtree) (,left rbtree))
	 #f)

       (define (insert-left! left-x x)
	 (if (eq? left-x rbtree)
	     (begin
	       (,left-set! x node)
	       (,parent-set! node x)

	       ;; check if leftmost must be updated

	       ,@(if leftmost
		     `((if (eq? x (,leftmost rbtree))
			   (,leftmost-set! rbtree node)))
		     `())

	       (fixup!))
	     ,(insert-below! 'left-x)))

       (define (insert-right! right-x x)
	 (if (eq? right-x rbtree)
	     (begin
	       (,right-set! x node)
	       (,parent-set! node x)

	       ;; check if rightmost must be updated

	       ,@(if rightmost
		     `((if (eq? x (,rightmost rbtree))
			   (,rightmost-set! rbtree node)))
		     `())

	       (fixup!))
	     ,(insert-below! 'right-x)))

       ;; The new node starts out red with two nil children.
       (,(reden!) node)
       (,left-set! node rbtree)
       (,right-set! node rbtree)

       ;; Start at the head, whose left link is the root.
       (insert-left! (,left rbtree) rbtree)

       ;; Re-establish the head's self-referencing parent link.
       (,parent-set! rbtree rbtree)

       node)

     ;; Unlink NODE from its tree.  NODE's parent, left and right fields
     ;; are set to #f and its color field ends up #f as well when it
     ;; was black.
     (define (,remove! node)
       (let ((rbtree (,node->rbtree node)))

	 (define (fixup! parent-node node)
	   (cond ((or (eq? parent-node rbtree) (,(red?) node))
		  (,(blacken! 'rbtree) node))
		 ((eq? node (,left parent-node))
		  ,(remove-body left
				(rotate-left!)
				right
				(rotate-right!)))
		 (else
		  ,(remove-body right
				(rotate-right!)
				left
				(rotate-left!)))))

	 (let ((parent-node (,parent node))
	       (left-node (,left node))
	       (right-node (,right node)))

	   (,parent-set! node #f) ;; to avoid leaks
	   (,left-set! node #f)
	   (,right-set! node #f)

	   (cond ((eq? left-node rbtree)

		  ;; check if leftmost must be updated

		  ,@(if leftmost
			`((if (eq? node (,leftmost rbtree))
			      (,leftmost-set!
			       rbtree
			       (if (eq? right-node rbtree)
				   parent-node
				   right-node))))
			`())

		  (,parent-set! right-node parent-node)
		  (,(update-parent!) parent-node node right-node)

		  (if (,(black? 'rbtree) node)
		      (begin
			(,(reden!) node) ;; to avoid leaks
			(fixup! parent-node right-node))))

		 ((eq? right-node rbtree)

		  ;; check if rightmost must be updated

		  ,@(if rightmost
			`((if (eq? node (,rightmost rbtree))
			      (,rightmost-set!
			       rbtree
			       left-node)))
			`())

		  (,parent-set! left-node parent-node)
		  (,(update-parent!) parent-node node left-node)

		  ;; At this point we know that the node is black.
		  ;; This is because the right child is nil and the
		  ;; left child is red (if the left child was black
		  ;; the tree would not be balanced)

		  (,(reden!) node) ;; to avoid leaks
		  (fixup! parent-node left-node))

		 (else
		  ;; Two real children: walk to the leftmost node X of
		  ;; the right subtree and move X into NODE's position.
		  (let loop ((x right-node) (parent-x node))
		    (let ((left-x (,left x)))
		      (if (eq? left-x rbtree)
			  (begin
			    (,(exchange-color!) x node)
			    (,parent-set! left-node x)
			    (,left-set! x left-node)
			    (,parent-set! x parent-node)
			    (,(update-parent!) parent-node node x)
			    (if (eq? x right-node)
				(if (,(black? 'rbtree) node)
				    (begin
				      (,(reden!) node) ;; to avoid leaks
				      (fixup! x (,right x))))
				(let ((right-x (,right x)))
				  (,parent-set! right-x parent-x)
				  (,left-set! parent-x right-x)
				  (,parent-set! right-node x)
				  (,right-set! x right-node)
				  (if (,(black? 'rbtree) node)
				      (begin
					(,(reden!) node) ;; to avoid leaks
					(fixup! parent-x right-x))))))
			  (loop left-x x)))))))

	 ;; The code above may have stored into the parent field of the
	 ;; head (in its role as nil leaf); restore the self reference.
	 (,parent-set! rbtree rbtree)))

     ;; Remove and re-insert NODE if, per BEFORE?, it sorts before the
     ;; left neighbor or after the right neighbor found by `neighbor'.
     (define (,reposition! node)
       (let* ((rbtree
	       (,node->rbtree node))
	      (predecessor-node
	       (,(neighbor left right) node rbtree))
	      (successor-node
	       (,(neighbor right left) node rbtree)))
	 (if (or (and (not (eq? predecessor-node rbtree))
		      (,before? node predecessor-node))
		 (and (not (eq? successor-node rbtree))
		      (,before? successor-node node)))
	     (begin
	       (,remove! node)
	       (,insert! rbtree node)))))

     ;; #t when the head's root link points back to the head.
     (define (,empty? rbtree)
       (eq? rbtree (,left rbtree)))

     ;; #t when the tree consists of a root without children.
     (define (,singleton? rbtree)
       (let ((root (,left rbtree)))
	 (and (not (eq? root rbtree))
	      (eq? (,left root) rbtree)
	      (eq? (,right root) rbtree))))))
; scheduler.scm - Basic scheduler for multithreading
;
; Copyright (c) 2000-2007, Felix L. Winkelmann
; Copyright (c) 2008, The Chicken Team
; All rights reserved.
;
; Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following
; conditions are met:
;
;   Redistributions of source code must retain the above copyright notice, this list of conditions and the following
;     disclaimer. 
;   Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following
;     disclaimer in the documentation and/or other materials provided with the distribution. 
;   Neither the name of the author nor the names of its contributors may be used to endorse or promote
;     products derived from this software without specific prior written permission. 
;
; THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS
; OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
; AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDERS OR
; CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
; CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
; SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
; THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
; OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
; POSSIBILITY OF SUCH DAMAGE.


;; Compilation unit declarations for the scheduler, plus the C support
;; code it needs: C_signal_interrupted_p, C_msleep, the file descriptor
;; sets handed to select(), and the C_fd_test_input/C_fd_test_output
;; macros.
(declare
  (fixnum)
  (unit scheduler)
  (disable-interrupts)
  (usual-integrations)
  (disable-warning var)
  ;; Names local to this unit: the thread queues, the fdset helpers and
  ;; everything that belongs to the int-priority-queue (record type
  ;; plus the procedures generated by define-rbtree).
  (hide ##sys#ready-queue-head ##sys#ready-queue-tail
	##sys#waiting-queue-head ##sys#waiting-queue-tail
	##sys#timeout-list
	##sys#update-thread-state-buffer ##sys#restore-thread-state-buffer
	##sys#remove-from-ready-queue ##sys#unblock-threads-for-i/o ##sys#force-primordial
	##sys#fdset-input-set ##sys#fdset-output-set ##sys#fdset-clear
	##sys#fdset-select-timeout ##sys#fdset-restore
	##sys#clear-i/o-state-for-thread!

	make-int-priority-queue-entry int-priority-queue-entry?
	int-priority-queue-color int-priority-queue-color-set!
	int-priority-queue-parent int-priority-queue-parent-set!
	int-priority-queue-left int-priority-queue-left-set!
	int-priority-queue-right int-priority-queue-right-set!
	int-priority-queue-index int-priority-queue-index-set!
	int-priority-queue-value int-priority-queue-value-set!
	int-priority-queue-before? int-priority-queue-match? int-priority-queue-index-before?
	int-priority-queue-init! int-priority-queue->rbtree
	int-priority-queue-lookup int-priority-queue-node-fold int-priority-queue-node-for-each
	int-priority-queue-node-insert! int-priority-queue-remove! int-priority-queue-reposition!
	int-priority-queue-empty? int-priority-queue-singleton?
	##sys#fd-list-add-thread!
	)
  (foreign-declare #<<EOF
#ifdef HAVE_ERRNO_H
# include <errno.h>
# define C_signal_interrupted_p     C_mk_bool(errno == EINTR)
#else
# define C_signal_interrupted_p     C_SCHEME_FALSE
#endif
# include <sys/stat.h>

#ifdef _WIN32
# if _MSC_VER > 1300
# include <winsock2.h>
# include <ws2tcpip.h>
# else
# include <winsock.h>
# endif
/* Beware: winsock2.h must come BEFORE windows.h */
# define C_msleep(n)     (Sleep(C_unfix(n)), C_SCHEME_TRUE)
#else
# include <unistd.h>
# include <sys/types.h>
# include <sys/time.h>
# include <time.h>
static C_word C_msleep(C_word ms);
C_word C_msleep(C_word ms) {
#ifdef __CYGWIN__
  if(usleep(C_unfix(ms) * 1000) == -1) return C_SCHEME_FALSE;
#else
  struct timespec ts;
  unsigned long mss = C_unfix(ms);
  ts.tv_sec = mss / 1000;
  ts.tv_nsec = (mss % 1000) * 1000000;
  
  if(nanosleep(&ts, NULL) == -1) return C_SCHEME_FALSE;
#endif
  return C_SCHEME_TRUE;
}
#endif
static fd_set C_fdset_input, C_fdset_output, C_fdset_input_2, C_fdset_output_2;
#define C_fd_test_input(fd)  C_mk_bool(FD_ISSET(C_unfix(fd), &C_fdset_input))
#define C_fd_test_output(fd)  C_mk_bool(FD_ISSET(C_unfix(fd), &C_fdset_output))
EOF
) )

;; Compile this unit in unsafe mode unless the `paranoia' feature is
;; registered.
(cond-expand
 (paranoia)
 (else
  (declare (unsafe))))


;; `dbg' is the scheduler's trace macro.  In normal builds it expands
;; to #f, so its arguments are never evaluated.  To enable tracing,
;; remove the `#;' datum comment in front of the printing definition of
;; the branch that applies to your compiler.
;;
;; Fix: in the non-hygienic branch the printing definition was left
;; active after the silent one and therefore replaced it, so every
;; `dbg' call printed a "DBG: " line.  It is commented out again so
;; that both branches behave the same.
(cond-expand
 (hygienic-macros
  (define-syntax dbg
    (syntax-rules ()
      ((_ . _) #f)))
  #;(define-syntax dbg
    (syntax-rules ()
      ((_ x ...) (begin (print x ...) (flush-output (current-output-port)))))) )
 (else
  (define-macro (dbg . args) #f)
  #;(define-macro (dbg . args)
  `(print "DBG: " ,@args) ) ) )

(include "rbtree.scm")

;; We shall replace that with a lolevel structure once.

;; Node type of the integer-keyed priority queue.  The same record is
;; used for the head of a queue (see make-int-priority-queue) and for
;; its entries.
(define-record-type <int-priority-queue>
  (make-int-priority-queue-entry color parent left right index value)
  int-priority-queue-entry?
  ;; color/parent/left/right are the tree links maintained by the code
  ;; that define-rbtree generates.
  (color int-priority-queue-color int-priority-queue-color-set!)
  (parent int-priority-queue-parent int-priority-queue-parent-set!)
  (left int-priority-queue-left int-priority-queue-left-set!)
  (right int-priority-queue-right int-priority-queue-right-set!)
  ;; index: the fixnum sort key (a timeout or a file descriptor in this
  ;; file); value: the payload (a thread, or a list of threads).
  (index int-priority-queue-index int-priority-queue-index-set!)
  (value int-priority-queue-value int-priority-queue-value-set!))

;; Ordering predicate of the queue: #t when the index of NODE1 is
;; strictly smaller than the index of NODE2 (fixnum comparison).
(define (int-priority-queue-before? node1 node2)
  (let ((idx1 (int-priority-queue-index node1))
	(idx2 (int-priority-queue-index node2)))
    (fx< idx1 idx2)))

;; Lookup predicate: #t when KEY is eqv? to the index stored in NODE.
(define (int-priority-queue-match? key node)
  (let ((idx (int-priority-queue-index node)))
    (eqv? key idx)))

;; Lookup ordering: #t when KEY is strictly smaller than the index
;; stored in NODE (fixnum comparison).
(define (int-priority-queue-index-before? key node)
  (let ((idx (int-priority-queue-index node)))
    (fx< key idx)))

;; Instantiate the red-black tree code for <int-priority-queue> nodes.
;; The arguments are positional; see the parameter list of the
;; define-rbtree macro.
(define-rbtree
  int-priority-queue-init!	      ;; defined by define-rbtree
  int-priority-queue->rbtree	      ;; defined by define-rbtree
  int-priority-queue-lookup	      ;; defined by define-rbtree
  int-priority-queue-node-fold	      ;; defined by define-rbtree
  int-priority-queue-node-for-each    ;; defined by define-rbtree
  int-priority-queue-node-insert!     ;; defined by define-rbtree
  int-priority-queue-remove!	      ;; defined by define-rbtree
  int-priority-queue-reposition!      ;; defined by define-rbtree
  int-priority-queue-empty?	      ;; defined by define-rbtree
  int-priority-queue-singleton?	      ;; defined by define-rbtree
  int-priority-queue-match?           ;; match?  (used by lookup)
  int-priority-queue-index-before?    ;; key<?   (used by lookup)
  int-priority-queue-before?          ;; before? (node ordering)
  int-priority-queue-color
  int-priority-queue-color-set!
  int-priority-queue-parent
  int-priority-queue-parent-set!
  int-priority-queue-left
  int-priority-queue-left-set!
  int-priority-queue-right
  int-priority-queue-right-set!
  ;; leftmost / leftmost-set!: the `right' field is passed a second
  ;; time here, so the generated code keeps the leftmost node (the
  ;; smallest index) in the `right' field of the queue head.  The
  ;; scheduler reads it as (int-priority-queue-right <queue>).
  int-priority-queue-right
  int-priority-queue-right-set!
  ;; rightmost / rightmost-set!: not tracked.
  #f
  #f)

;; Create an empty queue: allocate a node with all fields #f and have
;; int-priority-queue-init! turn it into a queue head, which is
;; returned.
(define (make-int-priority-queue)
  (let ((head (make-int-priority-queue-entry #f #f #f #f #f #f)))
    (int-priority-queue-init! head)))

;; Threads blocked with a timeout; entries are indexed by the timeout
;; value and carry the thread as their value (see
;; ##sys#thread-block-for-timeout!).
(define ##sys#timeout-list (make-int-priority-queue))

(define-inline (##sys#timeout-list-empty?) (int-priority-queue-empty? ##sys#timeout-list))

;; Threads blocked for I/O; entries are indexed by file descriptor and
;; carry a list of threads as their value (see
;; ##sys#fd-list-add-thread!).
(define ##sys#fd-list (make-int-priority-queue))

(define-inline (##sys#fd-list-empty?) (int-priority-queue-empty? ##sys#fd-list))

;; The ready queue is a singly linked list of threads; the tail
;; variable holds its last pair so that appending takes constant time.
;; Both variables are '() while the queue is empty.
(define ##sys#ready-queue-head '())
(define ##sys#ready-queue-tail '())

;; Returns the list of threads in the ready queue (the list itself,
;; not a copy).
(define (##sys#ready-queue) ##sys#ready-queue-head)

(define-inline (##sys#ready-queue-empty?) (eq? '() ##sys#ready-queue-head))

;; Mark THREAD as 'ready (slot 3) and append it to the ready queue.
(define (##sys#add-to-ready-queue thread)
  (##sys#setslot thread 3 'ready)
  (let ((cell (cons thread '())))
    (if (##sys#ready-queue-empty?)
	;; first element: the new pair becomes the head ...
	(set! ##sys#ready-queue-head cell)
	;; ... otherwise it is linked behind the current tail.
	(set-cdr! ##sys#ready-queue-tail cell))
    (set! ##sys#ready-queue-tail cell)))

;; Pop the first thread off the ready queue and return it; returns #f
;; when the queue is empty.
(define-inline (##sys#remove-from-ready-queue)
  (let ((head ##sys#ready-queue-head))
    (if (null? head)
	#f
	(let ((rest (cdr head)))
	  (set! ##sys#ready-queue-head rest)
	  ;; the queue ran empty: forget the tail pair as well
	  (if (null? rest)
	      (set! ##sys#ready-queue-tail '()))
	  (car head)))))

;; The waiting queue has the same layout as the ready queue (list head
;; plus last pair).  ##sys#schedule appends the thread it switches away
;; from here and moves the whole queue over to the ready queue once the
;; latter has run empty.
(define ##sys#waiting-queue-head '())
(define ##sys#waiting-queue-tail '())

;; Returns the list of threads in the waiting queue (the list itself,
;; not a copy).
(define (##sys#waiting-queue) ##sys#waiting-queue-head)

(define-inline (##sys#waiting-queue-empty?) (eq? '() ##sys#waiting-queue-head))

;; Mark THREAD as 'ready (slot 3) and append it to the waiting queue.
(define-inline (##sys#add-to-waiting-queue thread)
  (##sys#setslot thread 3 'ready)
  (let ((cell (cons thread '())))
    (if (##sys#waiting-queue-empty?)
	;; first element: the new pair becomes the head ...
	(set! ##sys#waiting-queue-head cell)
	;; ... otherwise it is linked behind the current tail.
	(set-cdr! ##sys#waiting-queue-tail cell))
    (set! ##sys#waiting-queue-tail cell)))

;; Make the waiting queue the new ready queue and leave the waiting
;; queue empty.  The previous contents of the ready queue variables are
;; overwritten, not appended to.
(define-inline (##sys#release-waiting-queue)
  (let ((first ##sys#waiting-queue-head)
	(last ##sys#waiting-queue-tail))
    (set! ##sys#ready-queue-head first)
    (set! ##sys#ready-queue-tail last)
    (set! ##sys#waiting-queue-head '())
    (set! ##sys#waiting-queue-tail '())))

;; Switch to the next runnable thread.  The state buffer of the current
;; thread is saved; if that thread is still 'running or 'ready it is
;; appended to the waiting queue.  Then the ready queue is served.
;; Once it has run empty, the waiting queue is released into it and the
;; timeout list and fd list are polled for threads to unblock.  A
;; "deadlock" runtime error is signalled when, after such a poll, there
;; is no ready thread and both the timeout list and the fd list are
;; empty.
(define (##sys#schedule)
  ;; Make THREAD the current thread, restore its state buffer and
  ;; resume it by calling the thunk stored in its slot 1.
  (define (switch thread)
    (dbg "switching to " thread)
    (set! ##sys#current-thread thread)
    (##sys#setslot thread 3 'running)
    (##sys#restore-thread-state-buffer thread)
    (##core#inline "C_set_initial_timer_interrupt_period" (##sys#slot thread 9))
    ((##sys#slot thread 1)) )
  (let* ([ct ##sys#current-thread]
	 [cts (##sys#slot ct 3)] )
    (dbg "scheduling, current: " ct ", ready: " ##sys#ready-queue-head)
    (##sys#update-thread-state-buffer ct)
    ;; Put current thread on the waiting queue (not the ready queue), so
    ;; that it runs again only after the ready queue has been drained:
    (when (or (eq? cts 'running) (eq? cts 'ready)) ; should ct really be 'ready? - normally not.
      (##sys#setislot ct 13 #f)			   ; clear timeout-unblock flag
      (##sys#add-to-waiting-queue ct) )
    ;; CHECK is #t on the passes that follow a release of the waiting
    ;; queue; timeouts and I/O are polled on those passes and whenever
    ;; the ready queue is empty.
    (let loop1 ((check #f))
      (when  (or check (##sys#ready-queue-empty?))
	;; Unblock threads waiting for timeout:
	(unless (##sys#timeout-list-empty?)
	   (if (##sys#unblock-threads-for-timeout!)
	       (##sys#force-primordial)))
	;; Unblock threads blocked by I/O:
	(unless (##sys#fd-list-empty?)
	   (##sys#unblock-threads-for-i/o) ) )
      ;; Fetch and activate next ready thread:
      (let loop2 ()
	(let ([nt (##sys#remove-from-ready-queue)])
	  (cond [(not nt) 
		 ;; Ready queue exhausted.
		 (if (and check (##sys#timeout-list-empty?) (##sys#fd-list-empty?))
		     (##sys#signal-hook #:runtime-error "deadlock")
		     (begin
		       (##sys#release-waiting-queue)
		       (loop1 #t)) ) ]
		;; Queue entries whose state is no longer 'ready are
		;; skipped.
		[(eq? (##sys#slot nt 3) 'ready) (switch nt)]
		[else (loop2)] ) ) ) ) ))

;; Unblock the primordial thread.  Called by the scheduler after a
;; sleep or select() call was interrupted.
(define (##sys#force-primordial)
  (dbg "primordial thread forced due to interrupt")
  (##sys#thread-unblock! ##sys#primordial-thread) )

;; Save the per-thread global state into the state buffer of THREAD
;; (slot 5): dynamic winds, the three standard ports, the current
;; exception handler and the parameter vector, in buffer slots 0..5.
(define (##sys#update-thread-state-buffer thread)
  (let ((state (##sys#slot thread 5)))
    (##sys#setslot state 0 ##sys#dynamic-winds)
    (##sys#setslot state 1 ##sys#standard-input)
    (##sys#setslot state 2 ##sys#standard-output)
    (##sys#setslot state 3 ##sys#standard-error)
    (##sys#setslot state 4 ##sys#current-exception-handler)
    (##sys#setslot state 5 ##sys#current-parameter-vector)))

;; Reinstall the global state saved in the state buffer of THREAD
;; (slot 5); the inverse of ##sys#update-thread-state-buffer.
(define (##sys#restore-thread-state-buffer thread)
  (let ((state (##sys#slot thread 5)))
    (set! ##sys#dynamic-winds (##sys#slot state 0))
    (set! ##sys#standard-input (##sys#slot state 1))
    (set! ##sys#standard-output (##sys#slot state 2))
    (set! ##sys#standard-error (##sys#slot state 3))
    (set! ##sys#current-exception-handler (##sys#slot state 4))
    (set! ##sys#current-parameter-vector (##sys#slot state 5))))

;; Chain a new interrupt hook in front of the previous one.  On a timer
;; interrupt (reason 255) the current thread's resume thunk (slot 1) is
;; set to a call of the previous hook and the scheduler is entered.
;; All other interrupts go to the previous hook directly.
(set! ##sys#interrupt-hook
  (let ([oldhook ##sys#interrupt-hook])
    (lambda (reason state)
      (when (fx= reason 255)		; C_TIMER_INTERRUPT_NUMBER
	(let ([ct ##sys#current-thread])
	  (##sys#setslot ct 1 (lambda () (oldhook reason state))) 
	  (##sys#schedule) ) )		; expected not to return!
      (oldhook reason state) ) ) )

;; If thread T has a timeout entry recorded in slot 4, take that entry
;; out of the timeout list and clear the slot.
(define (##sys#remove-from-timeout-list t)
  (let ((node (##sys#slot t 4)))
    (if node
	(begin
	  (int-priority-queue-remove! node)
	  (##sys#setislot t 4 #f)))))

;; Block thread T until time TM: a timeout-list entry indexed by TM is
;; created for T and remembered in slot 4 of the thread, the thread
;; state (slot 3) becomes 'blocked and the timeout-unblock flag
;; (slot 13) is cleared.
;;
;; Fix: an entry left over from an earlier timeout of the same thread is
;; now taken out of the timeout list first.  Before, slot 4 was simply
;; overwritten, so the old entry stayed queued with nothing referring
;; to it and woke the thread up when its own time came.
(define (##sys#thread-block-for-timeout! t tm)
  (dbg t " blocks for " tm)
  ;; Drop a still pending timeout of this thread, if any.
  (##sys#remove-from-timeout-list t)
  ;; It wouldn't hurt if the thread structure were prepared to be
  ;; moved between thread queues, however that is too much of a change
  ;; at once.
  (let ((ton (make-int-priority-queue-entry #f #f #f #f tm t)))
    (##sys#setslot t 4 ton)
    (int-priority-queue-node-insert! ##sys#timeout-list ton)) 
  (##sys#setslot t 3 'blocked)
  (##sys#setislot t 13 #f) )

;; Wake up all threads whose timeout has expired, earliest first.  The
;; head's `right' field of the timeout list holds the entry with the
;; smallest timeout.
;;
;; Afterwards, if no thread is ready or waiting and nothing is blocked
;; for I/O but timeouts are still pending, sleep until the next timeout
;; is due.  Returns a true value only when that sleep was cut short by a
;; signal, #f in every other case.
;;
;; Fix: an expired entry is only acted upon if the thread still refers
;; to it in slot 4.  A stale entry (the thread was unblocked by other
;; means, or has been given a newer timeout) is just removed.  Before,
;; such an entry printed a diagnostic on the current output port and
;; then unblocked the thread anyway, flagging it as timed out.  The
;; diagnostic is now a `dbg' trace like all others in this file.
(define (##sys#unblock-threads-for-timeout!)
  (let ([now (##sys#fudge 16)])
    (dbg "timeout (" now ") list: " (##sys#timeout-list-empty?))
    (let loop ()
      (unless (##sys#timeout-list-empty?)
	(let* ((entry (int-priority-queue-right ##sys#timeout-list))
	       (tmo (int-priority-queue-index entry)))
	  (dbg "  " now " -> " tmo)
	  (if (>= now tmo)
	      (let ((tto (int-priority-queue-value entry)))
		(int-priority-queue-remove! entry)
		(cond ((eq? (##sys#slot tto 4) entry)
		       (##sys#setislot tto 4 #f)
		       (##sys#setislot tto 13 #t) ; mark as being unblocked by timeout
		       (##sys#clear-i/o-state-for-thread! tto)
		       ;;(pp `(CLEARED: ,tto ,@##sys#fd-list) ##sys#standard-error) ;***
		       (##sys#thread-basic-unblock! tto))
		      (else
		       ;; stale entry: the thread's own timeout slot
		       ;; is left untouched
		       (dbg "  dropping stale timeout entry of " tto)))
		(loop) ) ))))
    ;; If there are no threads blocking on a select call (fd-list) but
    ;; there are threads in the timeout list then sleep for the number
    ;; of milliseconds of next thread to wake up and return #t if
    ;; interrupted.
    (and (##sys#ready-queue-empty?)
	 (##sys#waiting-queue-empty?)
	 (##sys#fd-list-empty?)
	 (not (##sys#timeout-list-empty?))
	 (let ([tmo (int-priority-queue-index
		     (int-priority-queue-right ##sys#timeout-list))])
	   (and (not (##core#inline "C_msleep" (fxmax 0 (- tmo now))))
		(foreign-value "C_signal_interrupted_p" bool) ) ) )))

;; Block thread T until thread T2 ends.  Nothing happens when T2 is
;; already 'dead or 'terminated.  Otherwise T is added to the list in
;; slot 12 of T2, marked 'blocked, its timeout-unblock flag (slot 13)
;; is cleared and T2 is recorded in slot 11 of T.
(define (##sys#thread-block-for-termination! t t2)
  (dbg t " blocks for " t2)
  (let ((t2-state (##sys#slot t2 3)))
    (if (not (or (eq? t2-state 'dead) (eq? t2-state 'terminated)))
	(begin
	  (##sys#setslot t2 12 (cons t (##sys#slot t2 12)))
	  (##sys#setslot t 3 'blocked)
	  (##sys#setislot t 13 #f)
	  (##sys#setslot t 11 t2)))))

;; End thread T with final state S: abandon its mutexes, store S as the
;; thread state, clear slots 11 and 8, drop a pending timeout and
;; unblock every thread in slot 12 that is still recorded (in its own
;; slot 11) as blocked on T.  Finally slot 12 is emptied.
(define (##sys#thread-kill! t s)
  (dbg "killing: " t " -> " s ", recipients: " (##sys#slot t 12))
  (##sys#abandon-mutexes t)
  (##sys#setslot t 3 s)
  (##sys#setislot t 11 #f)
  (##sys#setislot t 8 '())
  (##sys#remove-from-timeout-list t)
  (let notify ((waiters (##sys#slot t 12)))
    (if (pair? waiters)
	(let ((t2 (car waiters)))
	  (dbg "  checking: " t2 " (" (##sys#slot t2 3) ") -> " (##sys#slot t2 11))
	  (if (eq? (##sys#slot t2 11) t)
	      (##sys#thread-basic-unblock! t2))
	  (notify (cdr waiters)))))
  (##sys#setislot t 12 '()))

;; Make thread T runnable again: forget what it was blocked on
;; (slot 11), cancel a still pending timeout and append the thread to
;; the ready queue.
;;
;; Fix: a pending timeout is now really cancelled, i.e. its entry is
;; removed from the timeout list through ##sys#remove-from-timeout-list.
;; Before, only slot 4 was cleared.  The entry stayed queued and, once
;; due, unblocked the thread a second time, whatever it was doing by
;; then.
(define (##sys#thread-basic-unblock! t)
  (dbg "unblocking: " t)
  (##sys#setislot t 11 #f)
  (if (##sys#slot t 4)
      (begin
	(dbg "##sys#thread-basic-unblock! timout slot is still set!")
	;; dequeues the entry and clears slot 4
	(##sys#remove-from-timeout-list t)))
  (##sys#add-to-ready-queue t) )

;; Handler for exceptions not handled inside a thread.  ARG is the
;; object that was raised.  The procedures used for reporting are
;; captured in the `let' at definition time.
;;
;; Depending on the runtime configuration, the exception is either
;; re-signalled in the primordial thread or reported as a warning on
;; ##sys#standard-error (if warnings are enabled).  In any case ARG is
;; stored in slot 7 of the current thread, the thread is killed with
;; state 'terminated and the scheduler is entered.
(define ##sys#default-exception-handler
  (let ([print-error-message print-error-message]
	[display display]
	[print-call-chain print-call-chain]
	[open-output-string open-output-string]
	[get-output-string get-output-string] )
    (lambda (arg)
      (let ([ct ##sys#current-thread])
	(dbg "exception: " ct " -> " (if (##sys#structure? arg 'condition) (##sys#slot arg 2) arg))
	(cond [(foreign-value "C_abort_on_thread_exceptions" bool)
	       ;; Wrap the primordial thread's resume thunk (slot 1) so
	       ;; that it signals ARG before continuing, and wake that
	       ;; thread up.
	       (let* ([pt ##sys#primordial-thread]
		      [ptx (##sys#slot pt 1)] )
		 (##sys#setslot 
		  pt 1 
		  (lambda ()
		    (##sys#signal arg)
		    (ptx) ) )
		 (##sys#thread-unblock! pt) ) ]
	      [##sys#warnings-enabled
	       ;; The string "Warning (<thread>): " is built first and
	       ;; passed on to print-error-message.
	       (let ([o (open-output-string)])
		 (display "Warning (" o)
		 (display ct o)
		 (display "): " o)
		 (print-error-message arg ##sys#standard-error (get-output-string o))
		 (print-call-chain ##sys#standard-error 0 ct) ) ] )
	(##sys#setslot ct 7 arg)
	(##sys#thread-kill! ct 'terminated)
	(##sys#schedule) ) ) ) )


;;; `select()'-based blocking:

;; Replace the fd list by a fresh, empty queue.  The C-level fd sets are
;; not touched by this procedure.
(define (##sys#empty-fd-list!) (set! ##sys#fd-list (make-int-priority-queue)))

;; Record thread T as waiting for file descriptor FD.  If the fd list
;; has no entry for FD yet, one is inserted with (T) as its value.
;; Otherwise T is consed onto the entry's thread list, unless it is a
;; member already.
(define (##sys#fd-list-add-thread! fd t)
  (let ((node (int-priority-queue-lookup ##sys#fd-list fd)))
    (cond ((not node)
	   (int-priority-queue-node-insert!
	    ##sys#fd-list
	    (make-int-priority-queue-entry #f #f #f #f fd (list t))))
	  (else
	   (let ((waiters (int-priority-queue-value node)))
	     (if (not (memq t waiters))
		 (int-priority-queue-value-set! node (cons t waiters))))))))

;; Call select() on the input and output fd sets and return its
;; result.  Copies of both sets are saved in C_fdset_input_2 and
;; C_fdset_output_2 beforehand.  TM is a timeout in milliseconds; it is
;; only passed to select() when TO is true, otherwise select() gets a
;; NULL timeout.
(define ##sys#fdset-select-timeout
  (foreign-lambda* int ([bool to] [unsigned-long tm])
    "struct timeval timeout;"
    "timeout.tv_sec = tm / 1000;"
    "timeout.tv_usec = (tm % 1000) * 1000;"
    "C_fdset_input_2 = C_fdset_input;"
    "C_fdset_output_2 = C_fdset_output;"
    "return(select(FD_SETSIZE, &C_fdset_input, &C_fdset_output, NULL, to ? &timeout : NULL));") )

;; Copy the saved fd sets (C_fdset_input_2, C_fdset_output_2) back into
;; the sets that are handed to select().
(define ##sys#fdset-restore
  (foreign-lambda* void ()
    "C_fdset_input = C_fdset_input_2;"
    "C_fdset_output = C_fdset_output_2;") )

;; Executed once when this unit is initialized: start with empty input
;; and output fd sets.
((foreign-lambda* void ()
   "FD_ZERO(&C_fdset_input);"
   "FD_ZERO(&C_fdset_output);") )

;; Add FD to the input fd set.
(define ##sys#fdset-input-set
  (foreign-lambda* void ([int fd])
    "FD_SET(fd, &C_fdset_input);" ) )

;; Add FD to the output fd set.
(define ##sys#fdset-output-set
  (foreign-lambda* void ([int fd])
    "FD_SET(fd, &C_fdset_output);" ) )

;; Remove FD from the *saved* input and output fd sets (the `_2'
;; copies).  The sets handed to select() only change once
;; ##sys#fdset-restore is called.
(define ##sys#fdset-clear
  (foreign-lambda* void ([int fd])
    "FD_CLR(fd, &C_fdset_input_2);"
    "FD_CLR(fd, &C_fdset_output_2);") )

;; Block thread T until file descriptor FD is ready.  I/O selects the
;; direction: #t or #:input for reading, #f or #:output for writing,
;; #:all for both; any other value leaves the fd sets alone.  T is
;; entered in the fd list, marked 'blocked, its timeout-unblock flag
;; (slot 13) is cleared and the pair (FD . I/O) is stored in slot 11.
(define (##sys#thread-block-for-i/o! t fd i/o)
  (dbg t " blocks for I/O " fd)
  (##sys#fd-list-add-thread! fd t)
  (cond ((or (eqv? i/o #t) (eqv? i/o #:input))
	 (##sys#fdset-input-set fd))
	((or (eqv? i/o #f) (eqv? i/o #:output))
	 (##sys#fdset-output-set fd))
	((eqv? i/o #:all)
	 (##sys#fdset-input-set fd)
	 (##sys#fdset-output-set fd)))
  (##sys#setslot t 3 'blocked)
  (##sys#setislot t 13 #f)
  (##sys#setslot t 11 (cons fd i/o)))

;; Evaluates the C expression (errno == EBADF) each time it is
;; referenced.
(define-foreign-variable error-bad-file int "(errno == EBADF)")

;; Poll (or wait on) the registered file descriptors with select() and
;; unblock the threads whose descriptor has become ready.
;;
;; select() is given a timeout if there are ready/waiting threads or
;; pending timeouts: zero when threads can run, otherwise the time left
;; until the earliest timeout.  Without either it blocks indefinitely.
;;
;; Result handling:
;;  *  0: nothing ready, the fd sets are restored from their copies.
;;  * -1 with EBADF: the fd list is searched for a descriptor on which
;;        fstat() fails with EBADF.  The first one found is dropped and
;;        its threads are signalled an (exn i/o) condition.  Then the
;;        fd sets are restored and the whole procedure is retried.
;;  * -1 otherwise: the primordial thread is forced.
;;  * >0: the entries of all ready descriptors are collected and
;;        removed from the fd list; their threads are unblocked, unless
;;        a thread is no longer blocked on that descriptor or has been
;;        unblocked by a timeout in the meantime.
;;
;; Change: the collected entries are removed with `for-each'.  `map'
;; was used before, purely for its side effects, building a result list
;; nobody looked at and leaving the order of the removals unspecified.
(define (##sys#unblock-threads-for-i/o)
  (dbg "fd-list: " (int-priority-queue-node-fold
		    (lambda (n i) (cons (cons (int-priority-queue-index n) (int-priority-queue-value n)) i))
		    '()
		    ##sys#fd-list))
  (let* ([to? (not (##sys#timeout-list-empty?))]
	 [rq? (not (and (##sys#ready-queue-empty?) (##sys#waiting-queue-empty?)))]
	 [n (##sys#fdset-select-timeout	; we use FD_SETSIZE, but really should use max fd
	     (or rq? to?)
	     (if (and to? (not rq?))	; no thread was unblocked by timeout, so wait
		 (let* ([entry (int-priority-queue-right ##sys#timeout-list)]
			[tmo (int-priority-queue-index entry)]
			[now (##sys#fudge 16)])
		   (fxmax 0 (- tmo now)) )
		 0) ) ] )		; otherwise immediate timeout.
    (dbg n " fds ready")
    (cond [(eq? n 0) (##sys#fdset-restore)]
	  [(eq? -1 n)
	   (cond
	    (error-bad-file
	     ;; The traversal is left through EXIT as soon as one node
	     ;; has been removed: the removal clears the links of that
	     ;; node, so the walk cannot continue from it.
	     (##sys#call-with-current-continuation
	      (lambda (exit)
		(int-priority-queue-node-for-each
		 (lambda (node)
		   (define fd (int-priority-queue-index node))
		   (define ts (int-priority-queue-value node))
		   (dbg "check bad " fd)
		   (let ((bad ((foreign-lambda*
				bool ((integer fd))
				"struct stat buf;"
				"int i = ( (fstat(fd, &buf) == -1 && errno == EBADF) ? 1 : 0);"
				"return(i);")
			       fd)))
		     (when bad
			   (dbg "bad is " fd)
			   (##sys#fdset-clear fd)
			   (int-priority-queue-remove! node)
			   (for-each
			    (lambda (thread)
			      (thread-signal!
			       thread
			       (##sys#make-structure
				'condition
				'(exn i/o) ;; better? '(exn i/o net)
				(list '(exn . message) "bad file descriptor"
				      '(exn . arguments) (list fd)
				      '(exn . location) thread) )))
			    ts)
			   (exit #t))))
		 ##sys#fd-list)))
	     (##sys#fdset-restore)
	     (##sys#unblock-threads-for-i/o))
	    (else (##sys#force-primordial))) ]
	  [(fx> n 0)
	   ;; First collect the entries of all ready descriptors (the
	   ;; fold is left early once N of them have been seen), then
	   ;; remove them; the tree is not modified while it is
	   ;; traversed.
	   (for-each
	    int-priority-queue-remove!
	    (##sys#call-with-current-continuation
	     (lambda (exit)
	       (int-priority-queue-node-fold
		(lambda (node init)
		  (define fd (int-priority-queue-index node))
		  (define threads (int-priority-queue-value node))
		  (if (zero? n) (exit init)
		      (let* ([inf (##core#inline "C_fd_test_input" fd)]
			     [outf (##core#inline "C_fd_test_output" fd)] )
			(dbg "fd " fd " ready: input=" inf ", output=" outf)
			(if (or inf outf)
			    (begin
			      (for-each
			       (lambda (t)
				 (let* ((p (##sys#slot t 11)) )
				   (when (and (pair? p)
					      (eq? fd (car p))
					      (not (##sys#slot t 13) ) ) ; not unblocked by timeout
					 (##sys#thread-basic-unblock! t) ) ))
			       threads)
			      (##sys#fdset-clear fd)
			      (set! n (sub1 n))
			      (cons node init))
			    init))))
		'()
		##sys#fd-list))))
	   (##sys#fdset-restore) ] ) ) )

;;; Clear I/O state for unblocked thread

(define (##sys#clear-i/o-state-for-thread! t)
  ;; Detach thread T from the fd-list entry it is registered under.
  ;; Slot 11 of T is only acted upon when it is a pair; its first
  ;; element is used as the file descriptor key into ##sys#fd-list.
  ;; Nothing happens when slot 11 is not a pair or when no entry
  ;; exists for that descriptor.
  (let ((blocker (##sys#slot t 11)))
    (when (pair? blocker)
      (let* ((fd (##sys#slot blocker 0))
             (node (int-priority-queue-lookup ##sys#fd-list fd)))
        (when node
          ;; Threads still registered on this fd once T is taken out.
          (let ((remaining (##sys#delq t (int-priority-queue-value node))))
            (if (null? remaining)
                (begin
                  ;;(pp `(CLEAR FD: ,fd ,t) ##sys#standard-error)
                  ;; T was the last one: drop the fd from the fd set
                  ;; and remove the now empty queue node.
                  (##sys#fdset-clear fd)
                  (##sys#fdset-restore)
                  (int-priority-queue-remove! node))
                ;; Others remain: store the shortened thread list.
                (int-priority-queue-value-set! node remaining))))))))


;;; Get list of all threads that are ready, waiting, waiting for timeout or waiting for I/O:
;
; (contributed by Joerg Wittenberger)

;; Fold over every thread known to the scheduler.
;;
;; CNS is called as (cns QUEUE ARG VAL ACC) and must return the new
;; accumulator; INIT is the initial accumulator.  The defaults collect
;; the VALs into a list.  The queues are visited in this order:
;;
;;   'ready    ARG = #f, VAL = element of ##sys#ready-queue-head
;;   'waiting  ARG = #f, VAL = element of ##sys#waiting-queue-head
;;   'timeout  ARG = node index, VAL = node value of ##sys#timeout-list
;;             (presumably the timeout and the thread -- confirm
;;             against the code that fills the timeout list)
;;   'i/o      ARG = node index (the fd), VAL = each thread in the
;;             node's value list of ##sys#fd-list
;;
;; Returns the final accumulator.
(define (##sys#all-threads #!optional
			   (cns (lambda (queue arg val init)
				  (cons val init)))
			   (init '()))
  (let loop ((l ##sys#ready-queue-head) (i init))
    (if (pair? l)
	(loop (cdr l) (cns 'ready #f (car l) i))
	(let loop ((l ##sys#waiting-queue-head) (i i))
	  (if (pair? l)
	      (loop (cdr l) (cns 'waiting #f (car l) i))
	      (int-priority-queue-node-fold
	       (lambda (n i)
		 (fold (lambda (t i) (cns 'i/o (int-priority-queue-index n) t i))
		       i (int-priority-queue-value n)))
	       (int-priority-queue-node-fold
		(lambda (n i)
		  (cns 'timeout (int-priority-queue-index n) (int-priority-queue-value n) i))
		;; Seed with the accumulator I, not the exhausted list
		;; cursor L: seeding with L discarded everything gathered
		;; from the ready and waiting queues as well as INIT.
		i ##sys#timeout-list)
	       ##sys#fd-list))))))

;;; Remove all waiting threads from the relevant queues with the exception of the current thread:
#|
(define (##sys#fetch-and-clear-threads)
  (let ([all (vector ##sys#ready-queue-head ##sys#ready-queue-tail ##sys#fd-list ##sys#timeout-list)])
    (set! ##sys#ready-queue-head '())
    (set! ##sys#ready-queue-tail '())
    (##sys#empty-fd-list!)
    (set! ##sys#timeout-list '()) 
    all) )


;;; Restore list of waiting threads:

(define (##sys#restore-threads vec)
  (set! ##sys#ready-queue-head (##sys#slot vec 0))
  (set! ##sys#ready-queue-tail (##sys#slot vec 1))
  (set! ##sys#fd-list (##sys#slot vec 2))
  (set! ##sys#timeout-list (##sys#slot vec 3)) )
|#

;;; Unblock thread cleanly:

(define (##sys#thread-unblock! t)
  ;; Only acts on a thread whose state (slot 3) is 'blocked; any other
  ;; thread is left untouched.
  (if (eq? 'blocked (##sys#slot t 3))
      (begin
        ;; Withdraw T from the timeout list and the fd-list bookkeeping
        ;; before handing it to the basic unblock procedure.
        (##sys#remove-from-timeout-list t)
        (##sys#clear-i/o-state-for-thread! t)
        ;; Slot 12 is reset to the empty list (meaning not visible
        ;; here -- presumably the list of recipients; confirm).
        (##sys#setislot t 12 '())
        (##sys#thread-basic-unblock! t))))

;;; Multithreaded breakpoints

(define (##sys#break-entry name args)
  ;; Enter a breakpoint for NAME called with ARGS.  Does nothing unless
  ;; ##sys#break-in-thread is #f or is the current thread.
  ;;
  ;; A condition of kind (exn breakpoint) is built that carries the
  ;; continuation of this call and is stored in ##sys#last-breakpoint.
  ;; In the primordial thread the condition is signalled directly.  In
  ;; any other thread the current thread is marked 'suspended and the
  ;; primordial thread is arranged to signal the condition instead.
  (when (or (not ##sys#break-in-thread)
            (eq? ##sys#break-in-thread ##sys#current-thread))
    (##sys#call-with-current-continuation
     (lambda (resume)
       (let* ((in-primordial?
               (eq? ##sys#current-thread ##sys#primordial-thread))
              ;; Extra properties recorded only for non-primordial
              ;; threads: the thread itself and a procedure that
              ;; invokes the primordial thread's current thunk (slot 1).
              (thread-props
               (if in-primordial?
                   '()
                   (list '(exn . thread) ##sys#current-thread
                         '(exn . primordial-continuation)
                         (lambda _ ((##sys#slot ##sys#primordial-thread 1))))))
              (base-props
               (list '(exn . message) "*** breakpoint ***"
                     '(exn . arguments) (cons name args)
                     '(exn . location) name
                     '(exn . continuation) resume))
              (exn (##sys#make-structure
                    'condition
                    '(exn breakpoint)
                    (append base-props thread-props))))
         (set! ##sys#last-breakpoint exn)
         (if in-primordial?
             (##sys#signal exn)
             (begin
               ;; Park the current thread; when it is run again it
               ;; returns from the breakpoint with an undefined value.
               (##sys#setslot ##sys#current-thread 3 'suspended)
               (##sys#setslot ##sys#current-thread 1
                              (lambda () (resume (##core#undefined))))
               ;; Wrap the primordial thread's thunk so that it signals
               ;; the breakpoint first and then runs its previous thunk.
               (let ((saved-thunk (##sys#slot ##sys#primordial-thread 1)))
                 (##sys#setslot
                  ##sys#primordial-thread 1
                  (lambda ()
                    (##sys#signal exn)
                    (saved-thunk)))
                 (##sys#thread-unblock! ##sys#primordial-thread)
                 (##sys#schedule)))))))))
		  
(define (##sys#break-resume exn)
  ;; Resume execution after the breakpoint condition EXN.
  ;; assumes current-thread is primordial
  ;;
  ;; The property list is taken from slot 2 of EXN; each `member'
  ;; result below is the tail starting at the key, so `cadr' yields
  ;; the value stored after that key.
  (let* ((props (##sys#slot exn 2))
         (cont-entry (member '(exn . continuation) props))
         (thread-entry (member '(exn . thread) props))
         ;; Prefer the primordial continuation when one was recorded,
         ;; otherwise fall back to the breakpoint's own continuation.
         (pk-entry (or (member '(exn . primordial-continuation) props)
                       cont-entry)))
    (when thread-entry
      ;; The breakpoint was hit in another thread: give that thread a
      ;; thunk that re-enters its continuation and make it runnable.
      (let ((thread (cadr thread-entry)))
        (if cont-entry
            (##sys#setslot thread 1
                           (lambda () ((cadr cont-entry) (##core#undefined))))
            (##sys#signal-hook #:type-error "condition has no continuation" exn))
        (##sys#add-to-ready-queue thread)))
    (if pk-entry
        ((cadr pk-entry) (##core#undefined))
        (##sys#signal-hook #:type-error "condition has no continuation" exn))))
_______________________________________________
Chicken-users mailing list
Chicken-users@nongnu.org
http://lists.nongnu.org/mailman/listinfo/chicken-users

Reply via email to