Le Monday 23 June 2008 17:51:26 Romain Beauxis, vous avez écrit :
> I will try to reproduce here..

        Hi folks !

After some hours waiting to see the issue happening again, 
I was able to grab some relevant logs !

Then it took me another couple of hour to get to understand 
what was going bad.. 

Indeed, it was a probabilistic bug, hence the difficulty to reproduce..

So, look at this correct log extract:
2008/06/24 16:17:20 [generic queue #1:4] There are 1 ready tasks.
2008/06/24 16:17:20 [generic queue #1:4] There are 0 ready tasks.
2008/06/24 16:17:20 [non-blocking queue #1:4] Left select at 1214317040.812570 
(1/0/0).
2008/06/24 16:17:20 [non-blocking queue #1:4] There are 0 ready tasks.
2008/06/24 16:17:20 [non-blocking queue #1:4] Enter select at 
1214317040.812747, timeout -1.000000 (3/0/0).
2008/06/24 16:17:23 [b:3] Finished with "/home/toots/test/bla.mp3"
2008/06/24 16:17:23 [format.mp3:4] close "/home/toots/test/bla.mp3"
2008/06/24 16:17:23 [b:4] queue length 2646000-=1323000
2008/06/24 16:17:23 [format.mp3:4] open "/home/toots/test/bla.mp3"
2008/06/24 16:17:23 [b:3] Prepared "/home/toots/test/bla.mp3" -- rid 25
2008/06/24 16:17:23 [non-blocking queue #1:4] Left select at 1214317043.061973 
(1/0/0).
2008/06/24 16:17:23 [non-blocking queue #1:4] There are 1 ready tasks.
2008/06/24 16:17:23 [non-blocking queue #1:4] Enter select at 
1214317043.062211, timeout -1.000000 (2/0/0).
2008/06/24 16:17:23 [generic queue #1:4] There are 1 ready tasks.
2008/06/24 16:17:23 [decoder:4] Trying WAV decoder for 
"/home/toots/test/bla.mp3"
2008/06/24 16:17:23 [decoder:4] Trying VORBIS decoder for 
"/home/toots/test/bla.mp3"
2008/06/24 16:17:23 [decoder:4] Trying MP3 decoder for 
"/home/toots/test/bla.mp3"
2008/06/24 16:17:23 [format.mp3:4] open "/home/toots/test/bla.mp3"
2008/06/24 16:17:23 [format.mp3:4] close "/home/toots/test/bla.mp3"
2008/06/24 16:17:23 [b:4] queue length 1323000+=1323000 (rid 27)


It's a full round:
1) End of latest task, go to sleep, waiting for reading on 3 sockets
2) The current song ends, ask for next one, 
     and wake up feeding task via one of the reading socket
3) Open a new file in the playlist, test and enqueue it

Now, one failing round:
2008/06/24 16:17:23 [generic queue #1:4] There are 0 ready tasks.
2008/06/24 16:17:23 [non-blocking queue #1:4] Left select at 1214317043.065750 
(1/0/0).
2008/06/24 16:17:23 [non-blocking queue #1:4] There are 1 ready tasks.
2008/06/24 16:17:23 [non-blocking queue #1:4] Enter select at 
1214317043.065975, timeout -1.000000 (2/0/0).
2008/06/24 16:17:25 [b:3] Finished with "/home/toots/test/bla.mp3"
2008/06/24 16:17:25 [format.mp3:4] close "/home/toots/test/bla.mp3"
2008/06/24 16:17:25 [b:4] queue length 2646000-=1323000
2008/06/24 16:17:25 [format.mp3:4] open "/home/toots/test/bla.mp3"
2008/06/24 16:17:25 [b:3] Prepared "/home/toots/test/bla.mp3" -- rid 26
2008/06/24 16:17:27 [b:3] Finished with "/home/toots/test/bla.mp3"
2008/06/24 16:17:27 [format.mp3:4] close "/home/toots/test/bla.mp3"
2008/06/24 16:17:27 [b:4] queue length 1323000-=1323000
2008/06/24 16:17:27 [format.mp3:4] open "/home/toots/test/bla.mp3"
2008/06/24 16:17:27 [b:3] Prepared "/home/toots/test/bla.mp3" -- rid 27
2008/06/24 16:17:29 [b:3] Finished with "/home/toots/test/bla.mp3"
2008/06/24 16:17:29 [format.mp3:4] close "/home/toots/test/bla.mp3"
2008/06/24 16:17:29 [b:5] Queue is empty !
2008/06/24 16:17:29 [b:5] Failed to prepare track: no file

What happens here, is that at the moment when the generic queue 
looks for new tasks, there is nothing. 

Here is the race condition: when the non-blocking queue looks for 
any task, the task has just been submited..

Unfortunately, this task is "maybe blocking" and is exactly the task 
that will wait for restarting feeding upon wake_up calls. 

Hence, the non blocking queue doesn't process it and the feeding task 
is later not started again when wake_up is called...

As a confirmation, any other action that would wake the scheduler 
would restart the feeding task... 

Then, I would like to propose the following patch to fix the issue. It moves 
all asynchronous processing to the scheduler: instead of creating manually 
a new waiting task each time it goes to sleep, it initially registers an async
 task and wakes it when the feeding task has to be restarted.

That way, the scheduler will always include the async task when going 
to sleep, and the wake_up will always suceed.


I think this issue (plus other minor bugfixes) is worth preparing a bugfix 
release, but the patch is quite heavy so I would like to have your comments 
on it.

Romain
Index: sources/request_source.ml
===================================================================
--- sources/request_source.ml	(révision 5751)
+++ sources/request_source.ml	(copie de travail)
@@ -167,12 +167,10 @@
   val mutable queue_length = 0 (* Frames *)
   val mutable resolving = None
 
-  (* A Unix pipe to wake up the queue waiting on a Unix.select call.
-   * The initial values are stubs: a new pipe is allocated on wake_up,
-   * and destroyed before going to sleep.
-   * Indeed no resource should be allocated outside that. *)
-  val mutable out_pipe = Unix.stdout
-  val mutable in_pipe = Unix.stdin
+  (** Asynchronous task for waking up the queuing process.. *)
+  val run_m = Mutex.create ()
+  val mutable do_run = false
+  val mutable wake_task = None 
 
   (** State should be `Sleeping on awakening, and is then turned to `Running.
     * Eventually #sleep puts it to `Tired, then waits for it to be `Sleeping,
@@ -182,16 +180,24 @@
   val state_cond = Condition.create ()
 
   method private wake_up activation =
-    (let o,i = Unix.pipe () in out_pipe <- o ; in_pipe <- i) ;
     Tutils.mutexify state_lock
       (fun () ->
          assert (state = `Sleeping) ;
          state <- `Running) () ;
-    Duppy.Task.add Tutils.scheduler
-      { Duppy.Task.
-          priority = priority ;
-          events   = [`Delay 0.] ;
-          handler  = (fun _ -> self#feed_queue () ; []) }
+    Mutex.lock run_m; do_run <- true; Mutex.unlock run_m ;
+    wake_task <- 
+      Some 
+        (Duppy.Async.add Tutils.scheduler ~priority
+          (fun () ->  Mutex.lock run_m ;
+                      if do_run then begin
+                        Duppy.Task.add Tutils.scheduler
+                         { Duppy.Task.
+                           priority = priority ;
+                           events   = [`Delay 0.] ;
+                           handler  = (fun _ -> self#feed_queue () ; []) } ;
+                        do_run <- false
+                      end ;
+		      Mutex.unlock run_m))
 
   method private sleep =
     Tutils.mutexify state_lock
@@ -207,12 +213,16 @@
           Request.destroy req
       done
     with e -> Mutex.unlock qlock ; if e <> Queue.Empty then raise e end ;
-    out_pipe <- Unix.stdout ; in_pipe <- Unix.stdin
+    let task = Utils.get_some wake_task in
+      wake_task <- None ;
+      Duppy.Async.stop task
 
   (** This method should be called whenever the feeding task has a new
     * opportunity to feed the queue, in case it is sleeping. *)
   method private notify_new_request =
-    ignore (Unix.write in_pipe " " 0 1)
+    match wake_task with
+      | Some task -> Duppy.Async.wake_up task
+      | None -> ()
 
   (** A function that returns delays for tasks, making sure that these tasks
     * don't repeat too fast.
@@ -235,16 +245,9 @@
   (** The body of the feeding task *)
   method private feed_queue () : unit =
     let put_on_sleep () =
-      let consume l =
-        assert (List.exists ((=) (`Read out_pipe)) l) ;
-        (* Consume a character in the pipe *)
-        ignore (Unix.read out_pipe " " 0 1)
-      in
-        Duppy.Task.add Tutils.scheduler
-          { Duppy.Task.
-              priority = priority ;
-              events   = [`Read out_pipe] ;
-              handler  = (fun l -> consume l; self#feed_queue (); []) }
+      Mutex.lock run_m ;
+      do_run <- true ;
+      Mutex.unlock run_m
     in
     (* If the test fails, the task ends.. *)
     if
@@ -316,7 +319,6 @@
         let len,f = Queue.take retrieved in
           self#log#f 4 "queue length %d-=%d" queue_length len ;
           queue_length <- queue_length - len ;
-          self#notify_new_request ;
           Some f
       with
         | Queue.Empty ->
@@ -324,6 +326,7 @@
             None
     in
       Mutex.unlock qlock ;
+      self#notify_new_request ;
       ans
 
   method copy_queue =
-------------------------------------------------------------------------
Check out the new SourceForge.net Marketplace.
It's the best place to buy or sell services for
just about anything Open Source.
http://sourceforge.net/services/buy/index.php
_______________________________________________
Savonet-devl mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/savonet-devl

Répondre à