> I'm going to review your patch,
> but it might not be until next week (I'm moving house on Friday!).

That gave me just the time I needed to iron out some more bugs...
So here is the next version of my patches for the threaded RTS.

For now, I have resolved the finalizer issue by making finalizers behave approximately as in the non-threaded case:
When the main action finishes, all finalizers are run one after another (in unspecified order), and then the program exits. While the finalizers are being run, other Haskell threads _continue to run_. This seems ugly and dangerous to me, but a) it is the same in the non-threaded RTS, and b) it is non-trivial to do differently.
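As a minimal sketch of these semantics from the Haskell side (using `mkWeakPtr` and `finalize` from `System.Mem.Weak`; the explicit `finalize` call stands in for the exit-time run, whose order is unspecified):

```haskell
import System.Mem.Weak (mkWeakPtr, finalize)
import Data.IORef

main :: IO ()
main = do
  ran <- newIORef False
  key <- newIORef ()                 -- any heap object can carry a finalizer
  w <- mkWeakPtr key (Just (writeIORef ran True))
  -- At program exit the RTS would run this finalizer (in unspecified order,
  -- with other Haskell threads still running); here we trigger it explicitly
  -- so the example is deterministic.
  finalize w
  readIORef ran >>= print            -- prints True
```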

I also discovered and fixed some more bugs/omissions:
- Signals might have been delivered to the wrong thread (and therefore ignored for a long time).
- There was a possible deadlock in Handle.lhs: if a signal arrived at the wrong time, the MVar protecting a handle was never filled again.
- Throwing an exception to a thread blocked on a foreign call caused the RTS to barf; foreign calls now implicitly block asynchronous exceptions.
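The Handle.lhs fix follows the usual "refill the MVar on any exception" discipline; a minimal sketch of that pattern (the names here are illustrative, not the actual GHC.Handle code):

```haskell
import Control.Concurrent.MVar
import Control.Exception (onException)

-- If the MVar guarding a handle's state is not refilled when the action
-- throws (including an asynchronous exception arriving at the wrong time),
-- every later takeMVar on it deadlocks.
withHandleState :: MVar st -> (st -> IO (st, a)) -> IO a
withHandleState m act = do
  st <- takeMVar m
  (st', r) <- act st `onException` putMVar m st   -- refill on failure
  putMVar m st'                                   -- refill on success
  return r

main :: IO ()
main = do
  m <- newMVar (0 :: Int)
  r <- withHandleState m (\s -> return (s + 1, s))
  print r               -- prints 0 (the old state)
  readMVar m >>= print  -- prints 1 (the updated state)
```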

I've run the concurrent and ffi parts of the testsuite on it.
All the ffi tests pass (except for ffi009 on Mac OS X, but that is due to limitations of createAdjustor).
*) conc018 sometimes fails (at least on Mac OS X) because the test case is inherently non-deterministic: the output of the program depends on timing. (That is correct behaviour, IMHO.)
*) conc031, conc033 and conc034 fail (hang) because deadlock detection is disabled.
*) conc036 fails in the ghci way because ghci treats unsafe/safe/threadsafe all the same, so the output differs. I think that's legal: nobody ever _promised_ that unsafe foreign calls would block all Haskell threads; that's an implementation detail, isn't it?
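For reference, the distinction at issue, as a sketch (in later GHCs `threadsafe` was folded into `safe`): an `unsafe` foreign import may block all Haskell threads for the duration of the call, while a `safe` one lets them keep running; both compute the same result:

```haskell
{-# LANGUAGE ForeignFunctionInterface #-}

-- "unsafe" may block every Haskell thread for the call's duration (an
-- implementation detail, as argued above); "safe" must not.
foreign import ccall unsafe "math.h sin" c_sin_unsafe :: Double -> Double
foreign import ccall safe   "math.h sin" c_sin_safe   :: Double -> Double

main :: IO ()
main = print (c_sin_unsafe 0, c_sin_safe 0)   -- prints (0.0,0.0)
```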

Cheers,

Wolfgang

-------------------

Index: ghc/compiler/deSugar/DsForeign.lhs
===================================================================
RCS file: /home/cvs/root/fptools/ghc/compiler/deSugar/DsForeign.lhs,v
retrieving revision 1.72
diff -r1.72 DsForeign.lhs
452a453,454
> declareCResult | res_hty_is_unit = empty
> | otherwise = cResType <+> text "cret;"
454,455c456,458
< return_what | res_hty_is_unit = empty
< | otherwise = parens (unpackHObj res_hty <> parens (text "ret"))
---
> assignCResult | res_hty_is_unit = empty
> | otherwise =
> text "cret=" <> unpackHObj res_hty <> parens (text "ret") <> semi
471a475,476
> , declareCResult
> , text "rts_lock();"
486c491,494
< , text "return" <> return_what <> semi
---
> , assignCResult
> , text "rts_unlock();"
> , if res_hty_is_unit then empty
> else text "return cret;"
Index: ghc/includes/RtsAPI.h
===================================================================
RCS file: /home/cvs/root/fptools/ghc/includes/RtsAPI.h,v
retrieving revision 1.30
diff -r1.30 RtsAPI.h
40a41,54
>
> /* --------------------------------------------------------------------------
> Locking.
>
> In a multithreaded environment, you have to surround all access to the
> RtsAPI with these calls.
> ------------------------------------------------------------------------- */
>
> void
> rts_lock ( void );
>
> void
> rts_unlock ( void );
>
87a102,103
> Note that these calls may cause Garbage Collection, so all HaskellObj
> references are rendered invalid by these calls.
Index: ghc/includes/TSO.h
===================================================================
RCS file: /home/cvs/root/fptools/ghc/includes/TSO.h,v
retrieving revision 1.28
diff -r1.28 TSO.h
147c147,149
< , BlockedOnCCall
---
> , BlockedOnCCall
> , BlockedOnCCall_NoUnblockExc // same as above but don't unblock async exceptions
> // in resumeThread()
186c188
<
---
>
Index: ghc/includes/Updates.h
===================================================================
RCS file: /home/cvs/root/fptools/ghc/includes/Updates.h,v
retrieving revision 1.28
diff -r1.28 Updates.h
95a96,119
> #elif defined(RTS_SUPPORTS_THREADS)
>
> # ifdef TICKY_TICKY
> # define UPD_IND_NOLOCK(updclosure, heapptr) \
> { \
> const StgInfoTable *info; \
> info = ((StgClosure *)updclosure)->header.info; \
> AWAKEN_BQ_NOLOCK(info,updclosure); \
> updateWithPermIndirection(info, \
> (StgClosure *)updclosure, \
> (StgClosure *)heapptr); \
> }
> # else
> # define UPD_IND_NOLOCK(updclosure, heapptr) \
> { \
> const StgInfoTable *info; \
> info = ((StgClosure *)updclosure)->header.info; \
> AWAKEN_BQ_NOLOCK(info,updclosure); \
> updateWithIndirection(info, \
> (StgClosure *)updclosure, \
> (StgClosure *)heapptr); \
> }
> # endif
>
173a198,208
> #ifdef RTS_SUPPORTS_THREADS
> extern void awakenBlockedQueueNoLock(StgTSO *q);
> #define DO_AWAKEN_BQ_NOLOCK(closure) \
> STGCALL1(awakenBlockedQueueNoLock, \
> ((StgBlockingQueue *)closure)->blocking_queue);
>
> #define AWAKEN_BQ_NOLOCK(info,closure) \
> if (info == &stg_BLACKHOLE_BQ_info) { \
> DO_AWAKEN_BQ_NOLOCK(closure); \
> }
> #endif
Index: ghc/rts/Capability.c
===================================================================
RCS file: /home/cvs/root/fptools/ghc/rts/Capability.c,v
retrieving revision 1.15
diff -r1.15 Capability.c
23a24
> #include "Signals.h" /* to get at handleSignalsInThisThread() */
47c48
< static nat rts_n_waiting_workers = 0;
---
> nat rts_n_waiting_workers = 0;
56c57
< * thread_ready_cond is signalled whenever COND_NO_THREADS_READY doesn't hold.
---
> * thread_ready_cond is signalled whenever noCapabilities doesn't hold.
60,63d60
< #if 0
< /* For documentation purposes only */
< #define COND_NO_THREADS_READY() (noCapabilities() || EMPTY_RUN_QUEUE())
< #endif
121a119,120
> static Capability *returning_capabilities;
> /* Capabilities being passed to returning worker threads */
140a140
> ASSERT(rts_n_free_capabilities > 0);
143a144,146
> #ifdef RTS_SUPPORTS_THREADS
> handleSignalsInThisThread();
> #endif
164,172c167
< {
< #if defined(SMP)
< cap->link = free_capabilities;
< free_capabilities = cap;
< rts_n_free_capabilities++;
< #else
< rts_n_free_capabilities = 1;
< #endif
<
---
> { // Precondition: sched_mutex must be held
173a169,171
> #ifndef SMP
> ASSERT(rts_n_free_capabilities == 0);
> #endif
180a179,184
> #if defined(SMP)
> // SMP variant untested
> cap->link = returning_capabilities;
> returning_capabilities = cap;
> #else
> #endif
183,184c187,195
< } else if ( !EMPTY_RUN_QUEUE() ) {
< /* Signal that work is available */
---
> } else /*if ( !EMPTY_RUN_QUEUE() )*/ {
> #if defined(SMP)
> cap->link = free_capabilities;
> free_capabilities = cap;
> rts_n_free_capabilities++;
> #else
> rts_n_free_capabilities = 1;
> #endif
> /* Signal that a capability is available */
188c199
< return;
---
> return;
229d239
< rts_n_waiting_workers++;
233c243,246
< while ( noCapabilities() ) {
---
> if ( noCapabilities() ) {
> rts_n_waiting_workers++;
> wakeBlockedWorkerThread();
> context_switch = 1; // make sure it's our turn soon
234a248,259
> #if defined(SMP)
> *pCap = returning_capabilities;
> returning_capabilities = (*pCap)->link;
> #else
> *pCap = &MainCapability;
> ASSERT(rts_n_free_capabilities == 0);
> #ifdef RTS_SUPPORTS_THREADS
> handleSignalsInThisThread();
> #endif
> #endif
> } else {
> grabCapability(pCap);
236,237d260
<
< grabCapability(pCap);
256c279
< * Post-condition: pMutex isn't held and the Capability has
---
> * Post-condition: pMutex is held and the Capability has
262c285
< if ( rts_n_waiting_workers > 0 && noCapabilities() ) {
---
> if ( rts_n_waiting_workers > 0 ) {
264c287
< fprintf(stderr,"worker thread (%ld): giving up RTS token\n", osThreadId()));
---
> fprintf(stderr,"worker thread (%p): giving up RTS token\n", osThreadId()));
266c289
< /* And wait for work */
---
> /* And wait for work */
267a291,293
> IF_DEBUG(scheduler,
> fprintf(stderr,"worker thread (%p): got back RTS token (after yieldToReturningWorker)\n",
> osThreadId()));
283a310
> * Post-condition: pMutex is held and *pCap is held by the current thread
295a323
>
321a350
> returning_capabilities = NULL;
Index: ghc/rts/Capability.h
===================================================================
RCS file: /home/cvs/root/fptools/ghc/rts/Capability.h,v
retrieving revision 1.9
diff -r1.9 Capability.h
37,40c37
< /* number of worker threads waiting to do good work within
< the RTS. Used by Task.c (only) to determine whether or not
< new worker threads needs to be created (when an external call
< is made).
---
> /* number of worker threads waiting for a return capability
42c39
< extern nat rts_n_waiting_workers; /* used by Task.c to determine */
---
> extern nat rts_n_waiting_workers;
46a44,49
>
>
> static inline rtsBool needToYieldToReturningWorker(void)
> {
> return rts_n_waiting_workers > 0;
> }
Index: ghc/rts/Interpreter.c
===================================================================
RCS file: /home/cvs/root/fptools/ghc/rts/Interpreter.c,v
retrieving revision 1.37
diff -r1.37 Interpreter.c
1159a1160
> StgTSO *tso = cap->r.rCurrentTSO;
1178c1179,1180
< // Careful: suspendThread might have shifted the stack
---
> // Careful:
> // suspendThread might have shifted the stack
1180,1183c1182,1187
< // Sp out of the TSO to find the ccall args again:
< marshall_fn ( (void*)(cap->r.rCurrentTSO->sp + RET_DYN_SIZE
< + sizeofW(StgRetDyn)) );
<
---
> // Sp out of the TSO to find the ccall args again
> // We don't own the capability anymore, so we mustn't use it
> // Instead, we have to save the TSO ptr beforehand.
> // Also note that GC may strike at any time now (from another thread).
> marshall_fn ( (void*)(tso->sp + RET_DYN_SIZE + sizeofW(StgRetDyn)) );
>
Index: ghc/rts/RtsAPI.c
===================================================================
RCS file: /home/cvs/root/fptools/ghc/rts/RtsAPI.c,v
retrieving revision 1.38
diff -r1.38 RtsAPI.c
19a20
> #include "Capability.h"
23,66d23
< #if defined(RTS_SUPPORTS_THREADS)
< /* Cheesy locking scheme while waiting for the
< * RTS API to change.
< */
< static Mutex alloc_mutex = INIT_MUTEX_VAR;
< static Condition alloc_cond = INIT_COND_VAR;
< #define INVALID_THREAD_ID ((OSThreadId)(-1))
<
< /* Thread currently owning the allocator */
< static OSThreadId c_id = INVALID_THREAD_ID;
<
< static StgPtr alloc(nat n)
< {
< OSThreadId tid = osThreadId();
< ACQUIRE_LOCK(&alloc_mutex);
< if (tid == c_id) {
< /* I've got the lock, just allocate() */
< ;
< } else if (c_id == INVALID_THREAD_ID) {
< c_id = tid;
< } else {
< waitCondition(&alloc_cond, &alloc_mutex);
< c_id = tid;
< }
< RELEASE_LOCK(&alloc_mutex);
< return allocate(n);
< }
<
< static void releaseAllocLock(void)
< {
< ACQUIRE_LOCK(&alloc_mutex);
< /* Reset the allocator owner */
< c_id = INVALID_THREAD_ID;
< RELEASE_LOCK(&alloc_mutex);
<
< /* Free up an OS thread waiting to get in */
< signalCondition(&alloc_cond);
< }
< #else
< # define alloc(n) allocate(n)
< # define releaseAllocLock() /* nothing */
< #endif
<
<
73c30
< StgClosure *p = (StgClosure *)alloc(CONSTR_sizeW(0,1));
---
> StgClosure *p = (StgClosure *)allocate(CONSTR_sizeW(0,1));
82c39
< StgClosure *p = (StgClosure *)alloc(CONSTR_sizeW(0,1));
---
> StgClosure *p = (StgClosure *)allocate(CONSTR_sizeW(0,1));
91c48
< StgClosure *p = (StgClosure *)alloc(CONSTR_sizeW(0,1));
---
> StgClosure *p = (StgClosure *)allocate(CONSTR_sizeW(0,1));
101c58
< StgClosure *p = (StgClosure *)alloc(CONSTR_sizeW(0,1));
---
> StgClosure *p = (StgClosure *)allocate(CONSTR_sizeW(0,1));
111c68
< StgClosure *p = (StgClosure *)alloc(CONSTR_sizeW(0,1));
---
> StgClosure *p = (StgClosure *)allocate(CONSTR_sizeW(0,1));
121c78
< StgClosure *p = (StgClosure *)alloc(CONSTR_sizeW(0,2));
---
> StgClosure *p = (StgClosure *)allocate(CONSTR_sizeW(0,2));
131c88
< StgClosure *p = (StgClosure *)alloc(CONSTR_sizeW(0,1));
---
> StgClosure *p = (StgClosure *)allocate(CONSTR_sizeW(0,1));
141c98
< StgClosure *p = (StgClosure *)alloc(CONSTR_sizeW(0,1));
---
> StgClosure *p = (StgClosure *)allocate(CONSTR_sizeW(0,1));
151c108
< StgClosure *p = (StgClosure *)alloc(CONSTR_sizeW(0,1));
---
> StgClosure *p = (StgClosure *)allocate(CONSTR_sizeW(0,1));
161c118
< StgClosure *p = (StgClosure *)alloc(CONSTR_sizeW(0,1));
---
> StgClosure *p = (StgClosure *)allocate(CONSTR_sizeW(0,1));
172c129
< StgClosure *p = (StgClosure *)alloc(CONSTR_sizeW(0,2));
---
> StgClosure *p = (StgClosure *)allocate(CONSTR_sizeW(0,2));
183c140
< StgClosure *p = (StgClosure *)alloc(CONSTR_sizeW(0,1));
---
> StgClosure *p = (StgClosure *)allocate(CONSTR_sizeW(0,1));
192c149
< StgClosure *p = (StgClosure *)alloc(CONSTR_sizeW(0,sizeofW(StgDouble)));
---
> StgClosure *p = (StgClosure *)allocate(CONSTR_sizeW(0,sizeofW(StgDouble)));
201c158
< StgClosure *p = (StgClosure *)alloc(sizeofW(StgHeader)+1);
---
> StgClosure *p = (StgClosure *)allocate(sizeofW(StgHeader)+1);
210c167
< StgClosure *p = (StgClosure *)alloc(sizeofW(StgHeader)+1);
---
> StgClosure *p = (StgClosure *)allocate(sizeofW(StgHeader)+1);
239c196
< ap = (StgClosure *)alloc(sizeofW(StgClosure) + 2);
---
> ap = (StgClosure *)allocate(sizeofW(StgClosure) + 2);
417d373
< releaseAllocLock();
427d382
< releaseAllocLock();
441d395
< releaseAllocLock();
448a403
>
452c407
< StgTSO* tso;
---
> StgTSO* tso;
455d409
< releaseAllocLock();
475d428
< releaseAllocLock();
495d447
< releaseAllocLock();
517a470,500
>
> #ifdef RTS_SUPPORTS_THREADS
> void
> rts_lock()
> {
> Capability *cap;
> ACQUIRE_LOCK(&sched_mutex);
>
> // we request to get the capability immediately, in order to
> // a) stop other threads from using allocate()
> // b) wake the current worker thread from awaitEvent()
> // (so that a thread started by rts_eval* will start immediately)
> grabReturnCapability(&sched_mutex,&cap);
>
> // now that we have the capability, we don't need it anymore
> // (other threads will continue to run as soon as we release the sched_mutex)
> releaseCapability(cap);
>
> // If the RTS hasn't been entered yet,
> // start an RTS task.
> // If there is already a task available (waiting for the work capability),
> // this will do nothing.
> startSchedulerTask();
> }
>
> void
> rts_unlock()
> {
> RELEASE_LOCK(&sched_mutex);
> }
> #endif
Index: ghc/rts/Schedule.c
===================================================================
RCS file: /home/cvs/root/fptools/ghc/rts/Schedule.c,v
retrieving revision 1.160
diff -r1.160 Schedule.c
137a138,143
> #ifdef THREADED_RTS
> // Pointer to the thread that executes main
> // When this thread is finished, the program terminates.
> StgMainThread *main_main_thread = NULL;
> #endif
>
309,310c315,321
<
<
---
> #if defined(RTS_SUPPORTS_THREADS)
> void
> startSchedulerTask(void)
> {
> startTask(&taskStart);
> }
> #endif
375a387
> IF_DEBUG(scheduler, sched_belch("worker thread (osthread %p): entering RTS", osThreadId()));
429d440
< deleteAllThreads();
431a443,453
> #if defined(RTS_SUPPORTS_THREADS)
> // In the threaded RTS, deadlock detection doesn't work,
> // so just exit right away.
> prog_belch("interrupted");
> releaseCapability(cap);
> startTask(taskStart); // thread-safe-call to shutdownHaskellAndExit
> RELEASE_LOCK(&sched_mutex);
> shutdownHaskellAndExit(EXIT_SUCCESS);
> #else
> deleteAllThreads();
> #endif
443c465
< for (m = main_threads; m != NULL; m = m->link) {
---
> for (m = main_threads; m != NULL; prev = &m->link, m = m->link) {
455a478,484
> if(m == main_main_thread)
> {
> releaseCapability(cap);
> startTask(taskStart); // thread-safe-call to shutdownHaskellAndExit
> RELEASE_LOCK(&sched_mutex);
> shutdownHaskellAndExit(EXIT_SUCCESS);
> }
468a498,504
> if(m == main_main_thread)
> {
> releaseCapability(cap);
> startTask(taskStart); // thread-safe-call to shutdownHaskellAndExit
> RELEASE_LOCK(&sched_mutex);
> shutdownHaskellAndExit(EXIT_SUCCESS);
> }
565,566d600
< * ToDo: what if another client comes along & requests another
< * main thread?
568c602,607
< if ( !EMPTY_QUEUE(blocked_queue_hd) || !EMPTY_QUEUE(sleeping_queue) ) {
---
> if ( !EMPTY_QUEUE(blocked_queue_hd) || !EMPTY_QUEUE(sleeping_queue)
> #if defined(RTS_SUPPORTS_THREADS) && !defined(SMP)
> || EMPTY_RUN_QUEUE()
> #endif
> )
> {
589c628
< #ifndef PAR
---
> #if !defined(PAR) && !defined(RTS_SUPPORTS_THREADS)
701a741,742
> #elif defined(RTS_SUPPORTS_THREADS)
> /* ToDo: add deadlock detection in threaded RTS */
716a758
> #if defined(SMP)
735a778,781
> #else
> if ( EMPTY_RUN_QUEUE() )
> continue; // nothing to do
> #endif
1019c1065
<
---
>
1033a1080,1087
>
> run_thread:
>
> #ifdef RTS_SUPPORTS_THREADS
> IF_DEBUG(scheduler, sched_belch("thread %p about to release lock for running %ld",
> osThreadId(),t->id));
>
> #endif
1046d1099
< run_thread:
1064c1117
<
---
>
1072,1073c1125,1126
<
< #ifdef SMP
---
>
> #ifdef RTS_SUPPORTS_THREADS
1587c1640,1646
< cap->r.rCurrentTSO->why_blocked = BlockedOnCCall;
---
> if(cap->r.rCurrentTSO->blocked_exceptions == NULL)
> {
> cap->r.rCurrentTSO->why_blocked = BlockedOnCCall;
> cap->r.rCurrentTSO->blocked_exceptions = END_TSO_QUEUE;
> }
> else
> cap->r.rCurrentTSO->why_blocked = BlockedOnCCall_NoUnblockExc;
1599,1602d1657
<
< ToDo: optimise this and only create a new task if there's a need
< for one (i.e., if there's only one Concurrent Haskell thread alive,
< there's no need to create a new task).
1604,1607c1659,1662
< IF_DEBUG(scheduler, sched_belch("worker thread (%d): leaving RTS", tok));
< if (concCall) {
< startTask(taskStart);
< }
---
> IF_DEBUG(scheduler, sched_belch("worker thread (%d, osthread %p): leaving RTS", tok, osThreadId()));
> //if (concCall) { // implementing "safe" as opposed to "threadsafe" is more difficult
> startTask(taskStart);
> //}
1629,1634c1684,1687
< if ( concCall ) {
< ACQUIRE_LOCK(&sched_mutex);
< grabReturnCapability(&sched_mutex, &cap);
< } else {
< grabCapability(&cap);
< }
---
> ACQUIRE_LOCK(&sched_mutex);
> grabReturnCapability(&sched_mutex, &cap);
>
> IF_DEBUG(scheduler, sched_belch("worker thread (%d, osthread %p): re-entering RTS", tok, osThreadId()));
1652a1706,1714
>
> #if defined(RTS_SUPPORTS_THREADS)
> if(tso->why_blocked == BlockedOnCCall)
> {
> awakenBlockedQueueNoLock(tso->blocked_exceptions);
> tso->blocked_exceptions = NULL;
> }
> #endif
>
1656a1719
> #if defined(RTS_SUPPORTS_THREADS)
1657a1721
> #endif
1977c2041
< static void scheduleThread_ (StgTSO* tso, rtsBool createTask);
---
> static void scheduleThread_ (StgTSO* tso);
1980,1985c2044
< scheduleThread_(StgTSO *tso
< , rtsBool createTask
< #if !defined(THREADED_RTS)
< STG_UNUSED
< #endif
< )
---
> scheduleThread_(StgTSO *tso)
1995,2002d2053
< #if defined(THREADED_RTS)
< /* If main() is scheduling a thread, don't bother creating a
< * new task.
< */
< if ( createTask ) {
< startTask(taskStart);
< }
< #endif
2013c2064
< scheduleThread_(tso, rtsFalse);
---
> scheduleThread_(tso);
2019c2070
< {
---
> { // Precondition: sched_mutex must be held
2039d2089
< ACQUIRE_LOCK(&sched_mutex);
2045c2095
< scheduleThread_(tso, rtsTrue);
---
> scheduleThread_(tso);
2047c2097
< return waitThread_(m, rtsTrue); // waitThread_ releases sched_mutex
---
> return waitThread_(m, rtsTrue);
2234a2285
> SchedulerStatus stat;
2251c2302
< return waitThread_(m, rtsFalse); // waitThread_ releases sched_mutex
---
> stat = waitThread_(m, rtsFalse);
2253c2304
< return waitThread_(m);
---
> stat = waitThread_(m);
2254a2306,2307
> RELEASE_LOCK(&sched_mutex);
> return stat;
2277a2331
> main_main_thread = m;
2279a2334,2335
> ACQUIRE_LOCK(&sched_mutex);
> main_main_thread = NULL;
2312,2317c2368
< #if defined(THREADED_RTS)
< if (blockWaiting)
< #endif
< RELEASE_LOCK(&sched_mutex);
<
< // Postcondition: sched_mutex must not be held
---
> // Postcondition: sched_mutex still held
2930a2982,2992
>
> #ifdef RTS_SUPPORTS_THREADS
> void
> awakenBlockedQueueNoLock(StgTSO *tso)
> {
> while (tso != END_TSO_QUEUE) {
> tso = unblockOneLocked(tso);
> }
> }
> #endif
>
3517d3578
<
3525d3585
<
3600a3661,3663
> break;
> case BlockedOnCCall_NoUnblockExc:
> fprintf(stderr,"is blocked on an external call (exceptions were already blocked)");
Index: ghc/rts/Schedule.h
===================================================================
RCS file: /home/cvs/root/fptools/ghc/rts/Schedule.h,v
retrieving revision 1.36
diff -r1.36 Schedule.h
36a37,39
> #if defined(RTS_SUPPORTS_THREADS)
> void awakenBlockedQueueNoLock(StgTSO *tso);
> #endif
63c66
< /* awaitEvent()
---
> /* awaitEvent(rtsBool wait)
65c68
< * Raises an exception asynchronously in the specified thread.
---
> * Checks for blocked threads that need to be woken.
80a84,93
> /* wakeBlockedWorkerThread()
> *
> * If a worker thread is currently blocked in awaitEvent(), interrupt it.
> *
> * Called from STG : NO
> * Locks assumed : sched_mutex
> */
> void wakeBlockedWorkerThread(void); /* In Select.c */
>
>
176a190,193
> #if defined(THREADED_RTS)
> rtsBool thread_bound;
> Condition bound_thread_cond;
> #endif
260,262c277
< if ( !noCapabilities() ) { \
< signalCondition(&thread_ready_cond); \
< } \
---
> wakeBlockedWorkerThread(); \
278a294,303
>
> #if defined(RTS_SUPPORTS_THREADS)
> /* If no task is waiting for a capability,
> * spawn a new worker thread.
> *
> * (Used by the RtsAPI)
> */
> void
> startSchedulerTask(void);
> #endif
Index: ghc/rts/Select.c
===================================================================
RCS file: /home/cvs/root/fptools/ghc/rts/Select.c,v
retrieving revision 1.22
diff -r1.22 Select.c
18a19
> #include "Capability.h"
37a39,47
> #ifdef RTS_SUPPORTS_THREADS
> static rtsBool isWorkerBlockedInAwaitEvent = rtsFalse;
> static rtsBool workerWakeupPending = rtsFalse;
> #ifndef mingw32_TARGET_OS
> static int workerWakeupPipe[2];
> static rtsBool workerWakeupInited = rtsFalse;
> #endif
> #endif
>
159a170,179
> #ifdef RTS_SUPPORTS_THREADS
> if(!workerWakeupInited)
> {
> pipe(workerWakeupPipe);
> workerWakeupInited = rtsTrue;
> }
> FD_SET(workerWakeupPipe[0], &rfd);
> maxfd = workerWakeupPipe[0] > maxfd ? workerWakeupPipe[0] : maxfd;
> #endif
>
171a192,196
>
> #ifdef RTS_SUPPORTS_THREADS
> isWorkerBlockedInAwaitEvent = rtsTrue;
> workerWakeupPending = rtsFalse;
> #endif
208a234,237
> #ifdef RTS_SUPPORTS_THREADS
> isWorkerBlockedInAwaitEvent = rtsTrue;
> #endif
> RELEASE_LOCK(&sched_mutex);
212a242,244
> #ifdef RTS_SUPPORTS_THREADS
> isWorkerBlockedInAwaitEvent = rtsFalse;
> #endif
244a277,288
> #ifdef RTS_SUPPORTS_THREADS
> /* If another worker thread wants to take over,
> * return to the scheduler
> */
> if (needToYieldToReturningWorker()) {
> return; /* still hold the lock */
> }
> #endif
>
> #ifdef RTS_SUPPORTS_THREADS
> isWorkerBlockedInAwaitEvent = rtsTrue;
> #endif
290c334,344
<
---
>
> #if defined(RTS_SUPPORTS_THREADS) && !defined(mingw32_TARGET_OS)
> // if we were woken up by wakeBlockedWorkerThread,
> // read the dummy byte from the pipe
> if(select_succeeded && FD_ISSET(workerWakeupPipe[0], &rfd))
> {
> unsigned char dummy;
> wait = rtsFalse;
> read(workerWakeupPipe[0],&dummy,1);
> }
> #endif
292a347,375
>
>
> #ifdef RTS_SUPPORTS_THREADS
> /* wakeBlockedWorkerThread
> *
> * If a worker thread is currently blocked within awaitEvent,
> * wake it.
> * Must be called with sched_mutex held.
> */
>
> void
> wakeBlockedWorkerThread()
> {
> #ifndef mingw32_TARGET_OS
> if(isWorkerBlockedInAwaitEvent && !workerWakeupPending)
> {
> unsigned char dummy = 42; // Any value will do here
>
> // write something so that select() wakes up
> write(workerWakeupPipe[1],&dummy,1);
> workerWakeupPending = rtsTrue;
> }
> #else
> // The Win32 implementation currently uses a polling loop,
> // so there is no need to explicitly wake it
> #endif
> }
>
> #endif
Index: ghc/rts/Signals.c
===================================================================
RCS file: /home/cvs/root/fptools/ghc/rts/Signals.c,v
retrieving revision 1.32
diff -r1.32 Signals.c
51a52,66
>
> #ifdef RTS_SUPPORTS_THREADS
> pthread_t signalHandlingThread;
>
> // Handle all signals in the current thread.
> // Called from Capability.c whenever the main capability is granted to a thread
> // and in installDefaultHandlers
> void
> handleSignalsInThisThread()
> {
> signalHandlingThread = pthread_self();
> }
> #endif
>
>
107a123,136
> #if defined(THREADED_RTS)
> // Make the thread that currently holds the main capability
> // handle the signal.
> // This makes sure that awaitEvent() is interrupted
> // and it (hopefully) prevents race conditions
> // (signal handlers are not atomic with respect to other threads)
>
> if(pthread_self() != signalHandlingThread)
> {
> pthread_kill(signalHandlingThread, sig);
> return;
> }
> #endif
>
217a247,248
> action.sa_flags = 0;
>
350a382,393
> // ToDo: The code for the threaded RTS below does something very
> // similar. Maybe the SMP special case is not needed
> // -- Wolfgang Thaller
> #elif defined(THREADED_RTS)
> // Make the thread that currently holds the main capability
> // handle the signal.
> // This makes sure that awaitEvent() is interrupted
> if(pthread_self() != signalHandlingThread)
> {
> pthread_kill(signalHandlingThread, sig);
> return;
> }
384a428,430
> #endif
> #ifdef RTS_SUPPORTS_THREADS
> handleSignalsInThisThread();
Index: ghc/rts/Signals.h
===================================================================
RCS file: /home/cvs/root/fptools/ghc/rts/Signals.h,v
retrieving revision 1.8
diff -r1.8 Signals.h
29a30,33
> #ifdef RTS_SUPPORTS_THREADS
> extern void handleSignalsInThisThread(void);
> #endif
>
Index: ghc/rts/Task.c
===================================================================
RCS file: /home/cvs/root/fptools/ghc/rts/Task.c,v
retrieving revision 1.5
diff -r1.5 Task.c
160c160
< on thread_ready_cond, signal it rather than creating a new one.
---
> on thread_ready_cond, don't create a new one.
166,168c166,167
< signalCondition(&thread_ready_cond);
< /* Not needed, but gives more 'interesting' thread schedules when testing */
< yieldThread();
---
> // the task will run as soon as a capability is available,
> // so there's no need to wake it.
Index: ghc/rts/Weak.c
===================================================================
RCS file: /home/cvs/root/fptools/ghc/rts/Weak.c,v
retrieving revision 1.26
diff -r1.26 Weak.c
36a37,39
> #if defined(RTS_SUPPORTS_THREADS)
> rts_lock();
> #endif
42a46,50
> #if defined(RTS_SUPPORTS_THREADS)
> rts_evalIO(w->finalizer,NULL);
> rts_unlock();
> rts_lock();
> #else
43a52
> #endif
46a56,58
> #if defined(RTS_SUPPORTS_THREADS)
> rts_unlock();
> #endif
Index: libraries/base/GHC/Handle.hs
===================================================================
RCS file: /home/cvs/root/fptools/libraries/base/GHC/Handle.hs,v
retrieving revision 1.12
diff -r1.12 Handle.hs
134,135c134,138
< (h',v) <- catch (act h_)
< (\ ex -> putMVar m h_ >> ioError (augmentIOError ex fun h h_))
---
> (h',v) <- catchException (act h_)
> (\ err -> putMVar m h_ >>
> case err of
> IOException ex -> ioError (augmentIOError ex fun h h_)
> _ -> throw err)
149,150c152,156
< v <- catch (act h_)
< (\ ex -> putMVar m h_ >> ioError (augmentIOError ex fun h h_))
---
> v <- catchException (act h_)
> (\ err -> putMVar m h_ >>
> case err of
> IOException ex -> ioError (augmentIOError ex fun h h_)
> _ -> throw err)
165,166c171,175
< h' <- catch (act h_)
< (\ ex -> putMVar m h_ >> ioError (augmentIOError ex fun h h_))
---
> h' <- catchException (act h_)
> (\ err -> putMVar m h_ >>
> case err of
> IOException ex -> ioError (augmentIOError ex fun h h_)
> _ -> throw err)

_______________________________________________
Cvs-ghc mailing list
[EMAIL PROTECTED]
http://www.haskell.org/mailman/listinfo/cvs-ghc

