hello!

last week i implemented termination detection for the ghc runtime.
my motivation was a parallel interpreter for a concurrent constraint
language which i implemented using STM.  this interpreter has reached
its successful final state iff none of the stm transactions it has spawned
can make progress anymore, i.e. all threads are in state BlockedOnSTM.  well,
it turned out that this state is rather difficult to detect.  it might be
possible to use unsafePerformIO from within an atomically block with an
idempotent io action, such that some thread counts the
number of threads which are blocked on stm, but i could not come up with
anything that is free of race conditions.  mainly this is because with
stm, it is impossible to know (in haskell code) which threads are woken up
by a committed stm transaction.

well, there is something that would work: a wave-based distributed
termination detection algorithm.  but that would be very inefficient: every
thread needs to be woken up, and there is still the issue of choosing when
to send out a detection wave: too late, and the user gets bored; too
early, and there is even more overhead.

of course, the ghc runtime knows which threads are woken up, so i thought
it should be possible to implement the termination detection there.
this should also be more reliable than the ugly hack i had before: a
master thread with a timeout, which is reset on every stm commit by
throwing an exception to it.

so how can the ghc runtime detect termination?
it is quite simple: we need to add a counter somewhere that is incremented
every time a thread that is BlockedOnSTM is woken up, and decremented
every time a thread goes into the BlockedOnSTM state (via retry).
but a single global counter has a drawback: since every thread would
update it, it might become a scalability bottleneck.
so i have extended the StgTSO struct with two fields: a counter and a
parent pointer.
the parent pointer points to the TSO that forked the thread.
the counter field of a TSO counts the thread itself while it is runnable,
plus those of its direct children (non-transitively!) whose subtrees have
not terminated yet.
invariant: the counter is always >= 0, and == 0 iff the subtree rooted at
this thread has terminated.
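to make the invariant concrete, here is a small checker over a simplified model of the two new fields.  it is a sketch, not runtime code: the `node` struct and the `runnable` flag are hypothetical (runnable stands for "not exited, not BlockedOnSTM, not waiting in awaitTermination").  reading the event rules below, each counter should equal 1 for the thread itself if it is runnable, plus 1 for each direct child whose counter is nonzero:

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* simplified stand-in for StgTSO: only the fields relevant here */
typedef struct node {
    unsigned counter;     /* models StgTSO.counter */
    struct node *parent;  /* models StgTSO.parent; NULL for a root */
    bool runnable;        /* hypothetical: not exited, not blocked */
} node;

/* returns true iff every counter equals (runnable ? 1 : 0) plus the
 * number of direct children whose own counter is nonzero */
static bool invariant_holds(const node *ts, size_t n) {
    for (size_t i = 0; i < n; i++) {
        unsigned expected = ts[i].runnable ? 1u : 0u;
        for (size_t j = 0; j < n; j++) {
            if (ts[j].parent == &ts[i] && ts[j].counter > 0)
                expected++;
        }
        if (ts[i].counter != expected)
            return false;
    }
    return true;
}
```

a counter of 0 then means exactly that neither the thread nor any descendant can still make progress.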

conceptually, we have to modify the counter at the following events:
- fork: the forking thread's counter is incremented
        the forked thread's counter is initialized to 1
- exit: the exiting thread's counter is decremented
- retry: the retrying thread's counter is decremented
- wakeup (stm): the counter of the woken thread is incremented
increment means: add 1; if new value is 1: recursively increment parent
decrement means: sub 1; if new value is 0: recursively decrement parent
                        if there is no parent, signal termination

of course, it has to be guaranteed that increments always arrive at the
root before the corresponding decrements, otherwise termination may be
detected prematurely.
note that termination can only be signalled for a thread which has already
exited, or which has called GHC.Conc.awaitTermination (described below).
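why the order matters can be seen in the same kind of toy model (hypothetical code, not from the patch).  a root that is waiting for termination has two children a and b; b is blocked on stm, and a commits a transaction that wakes b and then retries itself.  if a's decrement reaches the root before b's increment, the root's counter briefly drops to 0 and termination is signalled although b is about to run:

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

typedef struct node {
    unsigned counter;
    struct node *parent;
} node;

static bool terminated = false;  /* hypothetical: set when a root reaches 0 */

static void increment(node *t) {
    while (t != NULL) {
        if (++t->counter != 1)
            return;
        t = t->parent;
    }
}

static void decrement(node *t) {
    for (;;) {
        assert(t->counter > 0);
        if (--t->counter != 0)
            return;
        if (t->parent == NULL) {
            terminated = true;
            return;
        }
        t = t->parent;
    }
}

/* returns true if termination was signalled */
static bool a_wakes_b_then_retries(bool increment_first) {
    node root = {2, NULL};  /* waiting itself, so it counts only a and b */
    node a = {1, &root};
    node b = {1, &root};
    terminated = false;
    decrement(&b);          /* b retries: root drops to 1 */
    if (increment_first) {
        increment(&b);      /* the wakeup of b reaches the root first */
        decrement(&a);      /* then a retries */
    } else {
        decrement(&a);      /* a's retry overtakes the wakeup ... */
        increment(&b);      /* ... which arrives too late */
    }
    return terminated;
}
```

with the correct order the root's counter goes 1 -> 2 -> 1 and never hits 0; with the wrong order it goes 1 -> 0 -> 1, which is exactly the premature detection described above.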

there are two added primitive operations:
- counterDec# decrements the calling thread's counter
  and also sets the parent pointer to NULL (so the calling thread becomes
  the root of a thread tree for which termination will be detected)
- counterInc# just increments the calling thread's counter
  (undoing the decrement done by counterDec#; the parent pointer stays NULL)

these primitives are meant to be called from a single place:
awaitTermination, which is in GHC.Conc.
it calls counterDec#, then waits in a very long threadDelay until the
termination exception arrives.  afterwards, it calls counterInc# and
returns (any exception other than Deadlock is re-thrown).

termination is signalled by throwing a Deadlock exception to the root
of the thread tree.  actually, i believe the termination condition is a
livelock, but there is no Exception constructor for that.

note that there may be several independent subcomputations within a single
haskell process for which termination can be detected with this approach.
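the protocol that awaitTermination follows with the two primitives, and the fact that any thread can become the root of its own tree, can be modelled the same way.  again a hypothetical single-threaded sketch: `prim_counter_dec` and `prim_counter_inc` mirror what counterDec# and counterInc# do to the counters, and `terminated_root` records where the Deadlock exception would be thrown.  note that the old parent keeps counting the detached thread as active, because counterDec# clears the parent pointer before decrementing:

```c
#include <assert.h>
#include <stddef.h>

typedef struct node {
    unsigned counter;
    struct node *parent;
} node;

static node *terminated_root = NULL;  /* where Deadlock would be thrown */

static void increment(node *t) {
    while (t != NULL) {
        if (++t->counter != 1)
            return;
        t = t->parent;
    }
}

static void decrement(node *t) {
    for (;;) {
        assert(t->counter > 0);
        if (--t->counter != 0)
            return;
        if (t->parent == NULL) {
            terminated_root = t;
            return;
        }
        t = t->parent;
    }
}

static void fork_child(node *self, node *child) {
    child->counter = 1;
    child->parent = self;
    increment(self);
}

/* like counterDec#: detach from the parent, then stop counting oneself */
static void prim_counter_dec(node *self) {
    self->parent = NULL;
    decrement(self);
}

/* like counterInc#: count oneself again; the parent pointer stays NULL */
static void prim_counter_inc(node *self) {
    increment(self);
}
```

for example, if the main thread forks `sub`, `sub` forks a worker and then calls awaitTermination, the worker's final retry signals termination at `sub` only, while the main thread's counter is unaffected.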

the main drawbacks of this code:
- the locking overhead (counter updates)
- because of the parent pointers, a thread cannot be garbage collected
  while any of its children is still alive.
this could be alleviated by enabling the termination detection only if the
program specifically requests it, e.g. via a global flag variable which is
set by another primitive operation.
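a minimal sketch of the global flag idea, assuming a hypothetical `detection_enabled` variable that is set (by some further primitive) before any thread is forked.  when it is off, no parent pointer is recorded, so a child never keeps its parent reachable, and the counter updates are skipped entirely:

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

typedef struct node {
    unsigned counter;
    struct node *parent;
} node;

/* hypothetical global switch; it must not change while threads exist,
 * otherwise the counters would no longer match the tree */
static bool detection_enabled = false;

static void increment(node *t) {
    if (!detection_enabled)
        return;              /* no counter maintenance at all */
    while (t != NULL) {
        if (++t->counter != 1)
            return;
        t = t->parent;
    }
}

static void decrement(node *t) {
    if (!detection_enabled)
        return;
    for (;;) {
        assert(t->counter > 0);
        if (--t->counter != 0)
            return;
        if (t->parent == NULL)
            return;          /* a root: termination would be signalled here */
        t = t->parent;
    }
}

static void fork_child(node *self, node *child) {
    child->counter = 1;
    /* without detection there is no parent pointer, so the garbage
     * collector sees no reference from the child to its parent */
    child->parent = detection_enabled ? self : NULL;
    increment(self);
}
```

the check is a single branch on the hot paths, so the cost for programs that never request termination detection should be small.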

i would welcome a review of the code; this is the first time i have hacked
on ghc (and also the first non-trivial C code i have written in years), so
there may be issues and interactions with other parts of the runtime that
i have not anticipated.  so far, i have tested it only with -threaded
-debug, but it seems to work with -N32 (on a single processor, which is all
i have).
do you think that others might be interested in this functionality?
could it be included in ghc?

also, something that i really do not understand: in the counterDec#
primitive, the stack_size field of the StgTSO struct gets corrupted for
no apparent reason, so i save it in a local variable and then restore it.
none of the other primitive ops seem to do anything similar.
what am i doing wrong?

patch is against ghc 6.6 (actually the debian package 6.6-3, but i hope
those weird arm floating point endianness patches don't matter much :) ).

        michael stahl


-- 
"I don't feel we did wrong in taking this great country away from them.
 There were great numbers of people who needed new land, and the Indians
 were selfishly trying to keep it for themselves." -- John Wayne
diff -ru /tmp/ghc-6.6/compiler/prelude/primops.txt.pp ghc-6.6/compiler/prelude/primops.txt.pp
--- /tmp/ghc-6.6/compiler/prelude/primops.txt.pp	2006-10-10 21:03:47.000000000 +0200
+++ ghc-6.6/compiler/prelude/primops.txt.pp	2007-02-06 15:08:52.000000000 +0100
@@ -1477,6 +1477,18 @@
    with
    out_of_line = True
 
+primop  CounterIncOp "counterInc#" GenPrimOp
+   State# RealWorld -> State# RealWorld
+   with
+   has_side_effects = True
+   out_of_line      = True
+
+primop  CounterDecOp "counterDec#" GenPrimOp
+   State# RealWorld -> State# RealWorld
+   with
+   has_side_effects = True
+   out_of_line      = True
+
 ------------------------------------------------------------------------
 section "Weak pointers"
 ------------------------------------------------------------------------
diff -ru /tmp/ghc-6.6/includes/mkDerivedConstants.c ghc-6.6/includes/mkDerivedConstants.c
--- /tmp/ghc-6.6/includes/mkDerivedConstants.c	2006-10-10 21:03:51.000000000 +0200
+++ ghc-6.6/includes/mkDerivedConstants.c	2007-02-06 20:37:17.000000000 +0100
@@ -283,6 +283,8 @@
     closure_field(StgTSO, saved_errno);
     closure_field(StgTSO, trec);
     closure_field(StgTSO, flags);
+    closure_field(StgTSO, parent);
+    closure_field(StgTSO, counter);
     closure_field_("StgTSO_CCCS", StgTSO, prof.CCCS);
     tso_field(StgTSO, sp);
     tso_field_offset(StgTSO, stack);
diff -ru /tmp/ghc-6.6/includes/StgMiscClosures.h ghc-6.6/includes/StgMiscClosures.h
--- /tmp/ghc-6.6/includes/StgMiscClosures.h	2006-10-10 21:03:49.000000000 +0200
+++ ghc-6.6/includes/StgMiscClosures.h	2007-02-06 14:55:16.000000000 +0100
@@ -590,6 +590,8 @@
 RTS_FUN(myThreadIdzh_fast);
 RTS_FUN(labelThreadzh_fast);
 RTS_FUN(isCurrentThreadBoundzh_fast);
+RTS_FUN(counterInczh_fast);
+RTS_FUN(counterDeczh_fast);
 
 RTS_FUN(mkWeakzh_fast);
 RTS_FUN(finalizzeWeakzh_fast);
diff -ru /tmp/ghc-6.6/includes/TSO.h ghc-6.6/includes/TSO.h
--- /tmp/ghc-6.6/includes/TSO.h	2006-10-10 21:03:50.000000000 +0200
+++ ghc-6.6/includes/TSO.h	2007-02-13 02:07:44.000000000 +0100
@@ -160,6 +160,9 @@
     StgTSODistInfo dist;
 #endif
 
+    StgWord32          counter;
+    struct StgTSO_*    parent;
+
     /* The thread stack... */
     StgWord32	       stack_size;     /* stack size in *words* */
     StgWord32          max_stack_size; /* maximum stack size in *words* */
diff -ru /tmp/ghc-6.6/libraries/base/GHC/Conc.lhs ghc-6.6/libraries/base/GHC/Conc.lhs
--- /tmp/ghc-6.6/libraries/base/GHC/Conc.lhs	2006-10-10 21:08:04.000000000 +0200
+++ ghc-6.6/libraries/base/GHC/Conc.lhs	2007-02-12 20:37:00.000000000 +0100
@@ -36,6 +36,7 @@
 	, pseq 		-- :: a -> b -> b
 	, yield         -- :: IO ()
 	, labelThread	-- :: ThreadId -> String -> IO ()
+	, awaitTermination -- :: IO ()
 
 	-- * Waiting
 	, threadDelay	  	-- :: Int -> IO ()
@@ -98,7 +99,8 @@
 import GHC.Num		( Num(..) )
 import GHC.Real		( fromIntegral, quot )
 import GHC.Base		( Int(..) )
-import GHC.Exception    ( catchException, Exception(..), AsyncException(..) )
+import GHC.Exception    ( catchException, Exception(..), AsyncException(..),
+                          unblock )
 import GHC.Pack		( packCString# )
 import GHC.Ptr          ( Ptr(..), plusPtr, FunPtr(..) )
 import GHC.STRef
@@ -246,6 +248,16 @@
 yield = IO $ \s -> 
    case (yield# s) of s1 -> (# s1, () #)
 
+awaitTermination :: IO ()
+awaitTermination =
+        catchException 
+                (counterDec >> unblock (threadDelay 2147483647))
+                (\e -> counterInc >> case e of Deadlock -> return ()
+                                               _        -> throw e)
+    where
+        counterDec = IO (\s -> case counterDec# s of s' -> (# s', () #) )
+        counterInc = IO (\s -> case counterInc# s of s' -> (# s', () #) )
+
 {- | 'labelThread' stores a string as identifier for this thread if
 you built a RTS with debugging support. This identifier will be used in
 the debugging output to make distinction of different threads easier
diff -ru /tmp/ghc-6.6/rts/GC.c ghc-6.6/rts/GC.c
--- /tmp/ghc-6.6/rts/GC.c	2006-10-10 21:03:51.000000000 +0200
+++ ghc-6.6/rts/GC.c	2007-02-12 17:25:48.000000000 +0100
@@ -2642,7 +2642,11 @@
 
     // scavange current transaction record
     tso->trec = (StgTRecHeader *)evacuate((StgClosure *)tso->trec);
-    
+
+    if (tso->parent) {
+            tso->parent = (StgTSO *)evacuate((StgClosure *)tso->parent);
+    }
+
     // scavenge this thread's stack 
     scavenge_stack(tso->sp, &(tso->stack[tso->stack_size]));
 }
@@ -4175,6 +4179,7 @@
    * that starts with an activation record. 
    */
 
+  ASSERT (p <= stack_end);
   while (p < stack_end) {
     info  = get_ret_itbl((StgClosure *)p);
       
diff -ru /tmp/ghc-6.6/rts/HSprel.def ghc-6.6/rts/HSprel.def
--- /tmp/ghc-6.6/rts/HSprel.def	2006-10-10 21:03:47.000000000 +0200
+++ ghc-6.6/rts/HSprel.def	2007-02-06 20:08:28.000000000 +0100
@@ -24,5 +24,6 @@
 PrelIOBase_BlockedOnDeadMVar_closure
 PrelIOBase_BlockedIndefinitely_closure
 PrelIOBase_NonTermination_closure
+PrelIOBase_Deadlock_closure
 PrelWeak_runFinalizzerBatch_closure
 __stginit_Prelude
diff -ru /tmp/ghc-6.6/rts/Linker.c ghc-6.6/rts/Linker.c
--- /tmp/ghc-6.6/rts/Linker.c	2006-10-10 21:03:52.000000000 +0200
+++ ghc-6.6/rts/Linker.c	2007-02-06 15:05:29.000000000 +0100
@@ -493,6 +493,8 @@
       SymX(cmpIntegerzh_fast)	        	\
       SymX(cmpIntegerIntzh_fast)	      	\
       SymX(complementIntegerzh_fast)		\
+      SymX(counterDeczh_fast)		        \
+      SymX(counterInczh_fast)		        \
       SymX(createAdjustor)			\
       SymX(decodeDoublezh_fast)			\
       SymX(decodeFloatzh_fast)			\
diff -ru /tmp/ghc-6.6/rts/Prelude.h ghc-6.6/rts/Prelude.h
--- /tmp/ghc-6.6/rts/Prelude.h	2006-10-10 21:03:50.000000000 +0200
+++ ghc-6.6/rts/Prelude.h	2007-02-06 20:06:00.000000000 +0100
@@ -41,6 +41,7 @@
 PRELUDE_CLOSURE(base_GHCziIOBase_BlockedIndefinitely_closure);
 PRELUDE_CLOSURE(base_GHCziIOBase_NonTermination_closure);
 PRELUDE_CLOSURE(base_GHCziIOBase_NestedAtomically_closure);
+PRELUDE_CLOSURE(base_GHCziIOBase_Deadlock_closure);
 
 #if !defined(mingw32_HOST_OS)
 PRELUDE_CLOSURE(base_GHCziConc_ensureIOManagerIsRunning_closure);
@@ -93,6 +94,7 @@
 #define BlockedIndefinitely_closure (&base_GHCziIOBase_BlockedIndefinitely_closure)
 #define NonTermination_closure    (&base_GHCziIOBase_NonTermination_closure)
 #define NestedAtomically_closure  (&base_GHCziIOBase_NestedAtomically_closure)
+#define Deadlock_closure          (&base_GHCziIOBase_Deadlock_closure)
 
 #define Czh_static_info           (&base_GHCziBase_Czh_static_info)
 #define Fzh_static_info           (&base_GHCziFloat_Fzh_static_info)
diff -ru /tmp/ghc-6.6/rts/PrimOps.cmm ghc-6.6/rts/PrimOps.cmm
--- /tmp/ghc-6.6/rts/PrimOps.cmm	2006-10-10 21:03:51.000000000 +0200
+++ ghc-6.6/rts/PrimOps.cmm	2007-02-13 02:08:55.000000000 +0100
@@ -947,6 +947,23 @@
   RET_N(r);
 }
 
+counterInczh_fast
+{
+  foreign "C" incrementCounter(CurrentTSO "ptr", 0/*FALSE*/) [];
+  jump %ENTRY_CODE(Sp(0));
+}
+
+counterDeczh_fast
+{
+  W_ sz;
+  sz = StgTSO_stack_size(CurrentTSO);
+  StgTSO_parent(CurrentTSO) = NULL;
+  foreign "C" decrementCounter(MyCapability() "ptr", CurrentTSO "ptr") [];
+  /* FIXME: why does stack_size get corrupted? */
+  StgTSO_stack_size(CurrentTSO) = sz;
+  jump %ENTRY_CODE(Sp(0));
+}
+
 
 /* -----------------------------------------------------------------------------
  * TVar primitives
diff -ru /tmp/ghc-6.6/rts/RaiseAsync.c ghc-6.6/rts/RaiseAsync.c
--- /tmp/ghc-6.6/rts/RaiseAsync.c	2006-10-10 21:03:51.000000000 +0200
+++ ghc-6.6/rts/RaiseAsync.c	2007-02-12 18:24:18.000000000 +0100
@@ -730,6 +730,7 @@
     // perhaps have a debugging test to make sure that this really
     // happens and that the 'zombie' transaction does not get
     // committed.
+    incrementCounter(tso, rtsFalse);
     goto done;
 
   case BlockedOnMVar:
diff -ru /tmp/ghc-6.6/rts/Schedule.c ghc-6.6/rts/Schedule.c
--- /tmp/ghc-6.6/rts/Schedule.c	2006-10-10 21:03:51.000000000 +0200
+++ ghc-6.6/rts/Schedule.c	2007-02-13 02:02:07.000000000 +0100
@@ -1798,9 +1798,91 @@
  * Handle a thread that returned to the scheduler with ThreadFinished
  * -------------------------------------------------------------------------- */
 
+void decrementCounter(Capability *cap, StgTSO *t) {
+    StgTSO *tso = t;
+    StgTSO *par;
+    StgWord32 ctr;
+    int tmp;
+    lockTSO(t);
+    ASSERT (t->counter > 0);
+    ctr = --t->counter;
+    unlockTSO(t);
+    debugTrace(DEBUG_sched,
+                        "Decrementing counter for thread %lu (%lu)",
+                        (unsigned long)t->id, (unsigned long)ctr);
+    /* recursively decrement counters */
+    while (ctr == 0) {
+            par = tso->parent;
+            if (par) { /* not root */
+                    while (par->what_next == ThreadRelocated) {
+                            par = par->link;
+                            tso->parent = par;
+                    }
+                    tso = par;
+                    lockTSO(tso);
+                    ASSERT (tso->counter > 0);
+                    ctr = --tso->counter;
+                    unlockTSO(tso);
+                    debugTrace(DEBUG_sched,
+                        "Decrementing counter for thread %lu (%lu)",
+                        (unsigned long)tso->id, (unsigned long)ctr);
+            } else { /* root: termination */
+                    /* NB: if tso == t, the root thread has exited
+                     *     throwTo seems to handle that case, so we do not */
+                    debugTrace(DEBUG_sched,
+                        "Detected termination for thread %lu (%s)",
+                        (unsigned long)tso->id, whatNext_strs[tso->what_next]);
+                    tmp = throwTo(cap, t, tso,
+                                (StgClosure *)Deadlock_closure, NULL);
+                    /* we assume that the calling thread will delay */
+                    ASSERT (tmp == THROWTO_SUCCESS); //FIXME wrong wrong wrong
+                    break;
+            }
+    }
+}
+
+void incrementCounter (StgTSO *t, rtsBool t_is_locked) {
+    StgTSO *tso = t;
+    StgTSO *par;
+    StgWord32 ctr;
+    if (!t_is_locked) lockTSO(t);
+    ASSERT (t->counter >= 0);
+    ctr = ++t->counter;
+    if (!t_is_locked) unlockTSO(t);
+    debugTrace(DEBUG_sched,
+                        "Incrementing counter for thread %lu (%lu)",
+                        (unsigned long)t->id, (unsigned long)ctr);
+    while (ctr == 1) {
+            par = tso->parent;
+            if (par) { /* not root */
+                    while (par->what_next == ThreadRelocated) {
+                            par = par->link;
+                            tso->parent = par;
+                    }
+                    tso = par;
+                    lockTSO(tso);
+                    ASSERT (tso->counter >= 0);
+                    ctr = ++tso->counter;
+                    unlockTSO(tso);
+                    debugTrace(DEBUG_sched,
+                        "Incrementing counter for thread %lu (%lu)",
+                        (unsigned long)tso->id, (unsigned long)ctr);
+            } else {
+                    /* this can be caused by the root thread after it
+                     * received the Deadlock exception */
+                    debugTrace(DEBUG_sched,
+                        "Deadlock broken for thread %lu (%s)",
+                        (unsigned long)tso->id, whatNext_strs[tso->what_next]);
+                    break;
+            }
+    }
+}
+
 static rtsBool
 scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
 {
+    decrementCounter(cap, t);
+
     /* Need to check whether this was a main thread, and if so,
      * return with the return value.
      *
diff -ru /tmp/ghc-6.6/rts/Schedule.h ghc-6.6/rts/Schedule.h
--- /tmp/ghc-6.6/rts/Schedule.h	2006-10-10 21:03:49.000000000 +0200
+++ ghc-6.6/rts/Schedule.h	2007-02-12 18:22:04.000000000 +0100
@@ -13,6 +13,10 @@
 #include "OSThreads.h"
 #include "Capability.h"
 
+/* termination detection counters */
+extern void decrementCounter(Capability *cap, StgTSO *t);
+extern void incrementCounter(StgTSO *t, rtsBool t_is_locked);
+
 /* initScheduler(), exitScheduler()
  * Called from STG :  no
  * Locks assumed   :  none
diff -ru /tmp/ghc-6.6/rts/STM.c ghc-6.6/rts/STM.c
--- /tmp/ghc-6.6/rts/STM.c	2006-10-10 21:03:49.000000000 +0200
+++ ghc-6.6/rts/STM.c	2007-02-12 17:34:30.000000000 +0100
@@ -320,10 +320,11 @@
 
 // Helper functions for thread blocking and unblocking
 
-static void park_tso(StgTSO *tso) {
+static void park_tso(Capability *cap, StgTSO *tso) {
   ASSERT(tso -> why_blocked == NotBlocked);
   tso -> why_blocked = BlockedOnSTM;
   tso -> block_info.closure = (StgClosure *) END_TSO_QUEUE;
+  decrementCounter(cap, tso);
   TRACE("park_tso on tso=%p\n", tso);
 }
 
@@ -1063,7 +1064,7 @@
     // until we are sound asleep : (a) on the wait queues, (b) BlockedOnSTM
     // in the TSO, (c) TREC_WAITING in the Trec.  
     build_wait_queue_entries_for_trec(cap, tso, trec);
-    park_tso(tso);
+    park_tso(cap, tso);
     trec -> state = TREC_WAITING;
 
     // We haven't released ownership of the transaction yet.  The TSO
@@ -1109,7 +1110,7 @@
     // The transaction remains valid -- do nothing because it is already on
     // the wait queues
     ASSERT (trec -> state == TREC_WAITING);
-    park_tso(tso);
+    park_tso(cap, tso);
     revert_ownership(trec, TRUE);
   } else {
     // The transcation has become invalid.  We can now remove it from the wait
diff -ru /tmp/ghc-6.6/rts/Threads.c ghc-6.6/rts/Threads.c
--- /tmp/ghc-6.6/rts/Threads.c	2006-10-10 21:03:50.000000000 +0200
+++ ghc-6.6/rts/Threads.c	2007-02-12 20:46:41.000000000 +0100
@@ -104,6 +104,16 @@
     tso->saved_errno = 0;
     tso->bound = NULL;
     tso->cap = cap;
+
+    /* the main thread has no parent; hope this will detect that */
+    if (cap->in_haskell == rtsTrue) {
+            tso->counter = 1; /* thread itself is runnable */
+            tso->parent = cap->r.rCurrentTSO;
+            incrementCounter(tso->parent, rtsFalse);
+    } else {
+            tso->counter = 1;
+            tso->parent = NULL;
+    }
     
     tso->stack_size     = stack_size;
     tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
@@ -488,6 +498,11 @@
   // NO, might be a WHITEHOLE: ASSERT(get_itbl(tso)->type == TSO);
   ASSERT(tso->why_blocked != NotBlocked);
 
+  if (tso->why_blocked == BlockedOnSTM) {
+        /* NB: tso _must_ be locked at this point */
+        incrementCounter(tso, rtsTrue);
+  }
+
   tso->why_blocked = NotBlocked;
   next = tso->link;
   tso->link = END_TSO_QUEUE;
_______________________________________________
Glasgow-haskell-users mailing list
Glasgow-haskell-users@haskell.org
http://www.haskell.org/mailman/listinfo/glasgow-haskell-users