Here are my patches for the threaded RTS. They seem to work [ghc still
runs, and my own simple test program does what it's supposed to do] on
Mac OS X. I've just started a complete build on my old Linux box
(300 MHz, so it will take some time...). In theory, they shouldn't break
the Windows version either, but someone should definitely verify that.
As it's not Mac OS X specific, I'm asking for permission first: I
hereby formally request permission to commit the changes described
below into the GHC CVS repository.
Below you'll find a general description of my changes and the actual
patches.
Cheers
Wolfgang
====== cut here ======
Threaded RTS Upgrade
New Feature:
------------
Allow call-ins into the RTS at any time.
Call-ins to the RTS now wake a blocked worker thread, so no unexpected
blocking occurs.
I implemented this for Unix, but it should work as-is on Windows,
because the Windows version doesn't block anyway and uses a polling
loop instead.
Limitations:
------------
Deadlock detection isn't possible in a reliable way (a mixed
Haskell-Foreign Language system is only deadlocked if the foreign
language threads are deadlocked, too), so it has been disabled.
Finalizers are not run on program exit (hard to implement properly -
and is it really necessary/wanted?)
Safe calls are now implicitly threadsafe. The previous implementation
used to crash sometimes, the behaviour of "safe" has never been
specified accurately, and I don't like "safe" (because if it's not
threadsafe then it's not really safe to use).
Known Bug:
----------
There is still a race condition in awaitEvent: If a signal arrives
between the last check for pending signals and the select() call, it is
only handled after the select() call returns.
Bugs Fixed:
-----------
- "safe" calls could cause crashes
- yieldToReturningWorker never managed to pass the capability to the
returning worker (the symptoms were deadlock-like)
- calling into the RTS at the wrong time could cause a crash.
- The run_thread: label in schedule() was in the wrong place with
respect to acquire/release of the sched_mutex. Crashes might have
followed in rare circumstances.
- The RTS didn't terminate when the main action finished, as specified
in the user manual.
Index: ghc/compiler/deSugar/DsForeign.lhs
===================================================================
RCS file: /home/cvs/root/fptools/ghc/compiler/deSugar/DsForeign.lhs,v
retrieving revision 1.72
diff -r1.72 DsForeign.lhs
452a453,454
> declareCResult | res_hty_is_unit = empty
> | otherwise = cResType <+> text "cret;"
454,455c456,458
< return_what | res_hty_is_unit = empty
< | otherwise = parens (unpackHObj res_hty <> parens (text
"ret"))
---
> assignCResult | res_hty_is_unit = empty
> | otherwise =
> text "cret=" <> unpackHObj res_hty <> parens (text "ret")
<> semi
471a475,476
> , declareCResult
> , text "rts_lock();"
486c491,494
< , text "return" <> return_what <> semi
---
> , assignCResult
> , text "rts_unlock();"
> , if res_hty_is_unit then empty
> else text "return cret;"
Index: ghc/includes/RtsAPI.h
===================================================================
RCS file: /home/cvs/root/fptools/ghc/includes/RtsAPI.h,v
retrieving revision 1.30
diff -r1.30 RtsAPI.h
40a41,54
>
> /*
------------------------------------------------------------------------
----
> Locking.
>
> In a multithreaded environments, you have to surround all access
to the
> RtsAPI with these calls.
>
------------------------------------------------------------------------
- */
>
> void
> rts_lock ( void );
>
> void
> rts_unlock ( void );
>
87a102,103
> Note that these calls may cause Garbage Collection, so all
HaskellObj
> references are rendered invalid by these calls.
Index: ghc/rts/Capability.c
===================================================================
RCS file: /home/cvs/root/fptools/ghc/rts/Capability.c,v
retrieving revision 1.15
diff -r1.15 Capability.c
47c47
< static nat rts_n_waiting_workers = 0;
---
> nat rts_n_waiting_workers = 0;
56c56
< * thread_ready_cond is signalled whenever COND_NO_THREADS_READY
doesn't hold.
---
> * thread_ready_cond is signalled whenever noCapabilities doesn't
hold.
60,63d59
< #if 0
< /* For documentation purposes only */
< #define COND_NO_THREADS_READY() (noCapabilities() ||
EMPTY_RUN_QUEUE())
< #endif
74a71
>
121a119,120
> static Capability *returning_capabilities;
> /* Capabilities being passed to returning worker threads */
140a140
> ASSERT(rts_n_free_capabilities > 0);
164,172c164
< {
< #if defined(SMP)
< cap->link = free_capabilities;
< free_capabilities = cap;
< rts_n_free_capabilities++;
< #else
< rts_n_free_capabilities = 1;
< #endif
<
---
> { // Precondition: sched_mutex must be held
180a173,178
> #if defined(SMP)
> // SMP variant untested
> cap->link = returning_capabilities;
> returning_capabilities = cap;
> #else
> #endif
183,184c181,189
< } else if ( !EMPTY_RUN_QUEUE() ) {
< /* Signal that work is available */
---
> } else /*if ( !EMPTY_RUN_QUEUE() )*/ {
> #if defined(SMP)
> cap->link = free_capabilities;
> free_capabilities = cap;
> rts_n_free_capabilities++;
> #else
> rts_n_free_capabilities = 1;
> #endif
> /* Signal that a capability is available */
188c193
< return;
---
> return;
229d233
< rts_n_waiting_workers++;
233c237,240
< while ( noCapabilities() ) {
---
> if ( noCapabilities() ) {
> rts_n_waiting_workers++;
> wakeBlockedWorkerThread();
> context_switch = 1; // make sure it's our turn soon
234a242,250
> #if defined(SMP)
> *pCap = returning_capabilities;
> returning_capabilities = (*pCap)->link;
> #else
> *pCap = &MainCapability;
> ASSERT(rts_n_free_capabilities == 0);
> #endif
> } else {
> grabCapability(pCap);
236,237d251
<
< grabCapability(pCap);
256c270
< * Post-condition: pMutex isn't held and the Capability has
---
> * Post-condition: pMutex is held and the Capability has
262c276
< if ( rts_n_waiting_workers > 0 && noCapabilities() ) {
---
> if ( rts_n_waiting_workers > 0 ) {
266c280
< /* And wait for work */
---
> /* And wait for work */
283a298
> * Post-condition: pMutex is held and *pCap is held by the current
thread
295a311
>
321a338
> returning_capabilities = NULL;
Index: ghc/rts/Capability.h
===================================================================
RCS file: /home/cvs/root/fptools/ghc/rts/Capability.h,v
retrieving revision 1.9
diff -r1.9 Capability.h
37,40c37
< /* number of worker threads waiting to do good work within
< the RTS. Used by Task.c (only) to determine whether or not
< new worker threads needs to be created (when an external call
< is made).
---
> /* number of worker threads waiting for a return capability
42c39
< extern nat rts_n_waiting_workers; /* used by Task.c to determine */
---
> extern nat rts_n_waiting_workers;
46a44,49
>
>
> static inline rtsBool needToYieldToReturningWorker()
> {
> return rts_n_waiting_workers > 0;
> }
Index: ghc/rts/RtsAPI.c
===================================================================
RCS file: /home/cvs/root/fptools/ghc/rts/RtsAPI.c,v
retrieving revision 1.38
diff -r1.38 RtsAPI.c
19a20
> #include "Capability.h"
23,66d23
< #if defined(RTS_SUPPORTS_THREADS)
< /* Cheesy locking scheme while waiting for the
< * RTS API to change.
< */
< static Mutex alloc_mutex = INIT_MUTEX_VAR;
< static Condition alloc_cond = INIT_COND_VAR;
< #define INVALID_THREAD_ID ((OSThreadId)(-1))
<
< /* Thread currently owning the allocator */
< static OSThreadId c_id = INVALID_THREAD_ID;
<
< static StgPtr alloc(nat n)
< {
< OSThreadId tid = osThreadId();
< ACQUIRE_LOCK(&alloc_mutex);
< if (tid == c_id) {
< /* I've got the lock, just allocate() */
< ;
< } else if (c_id == INVALID_THREAD_ID) {
< c_id = tid;
< } else {
< waitCondition(&alloc_cond, &alloc_mutex);
< c_id = tid;
< }
< RELEASE_LOCK(&alloc_mutex);
< return allocate(n);
< }
<
< static void releaseAllocLock(void)
< {
< ACQUIRE_LOCK(&alloc_mutex);
< /* Reset the allocator owner */
< c_id = INVALID_THREAD_ID;
< RELEASE_LOCK(&alloc_mutex);
<
< /* Free up an OS thread waiting to get in */
< signalCondition(&alloc_cond);
< }
< #else
< # define alloc(n) allocate(n)
< # define releaseAllocLock() /* nothing */
< #endif
<
<
73c30
< StgClosure *p = (StgClosure *)alloc(CONSTR_sizeW(0,1));
---
> StgClosure *p = (StgClosure *)allocate(CONSTR_sizeW(0,1));
82c39
< StgClosure *p = (StgClosure *)alloc(CONSTR_sizeW(0,1));
---
> StgClosure *p = (StgClosure *)allocate(CONSTR_sizeW(0,1));
91c48
< StgClosure *p = (StgClosure *)alloc(CONSTR_sizeW(0,1));
---
> StgClosure *p = (StgClosure *)allocate(CONSTR_sizeW(0,1));
101c58
< StgClosure *p = (StgClosure *)alloc(CONSTR_sizeW(0,1));
---
> StgClosure *p = (StgClosure *)allocate(CONSTR_sizeW(0,1));
111c68
< StgClosure *p = (StgClosure *)alloc(CONSTR_sizeW(0,1));
---
> StgClosure *p = (StgClosure *)allocate(CONSTR_sizeW(0,1));
121c78
< StgClosure *p = (StgClosure *)alloc(CONSTR_sizeW(0,2));
---
> StgClosure *p = (StgClosure *)allocate(CONSTR_sizeW(0,2));
131c88
< StgClosure *p = (StgClosure *)alloc(CONSTR_sizeW(0,1));
---
> StgClosure *p = (StgClosure *)allocate(CONSTR_sizeW(0,1));
141c98
< StgClosure *p = (StgClosure *)alloc(CONSTR_sizeW(0,1));
---
> StgClosure *p = (StgClosure *)allocate(CONSTR_sizeW(0,1));
151c108
< StgClosure *p = (StgClosure *)alloc(CONSTR_sizeW(0,1));
---
> StgClosure *p = (StgClosure *)allocate(CONSTR_sizeW(0,1));
161c118
< StgClosure *p = (StgClosure *)alloc(CONSTR_sizeW(0,1));
---
> StgClosure *p = (StgClosure *)allocate(CONSTR_sizeW(0,1));
172c129
< StgClosure *p = (StgClosure *)alloc(CONSTR_sizeW(0,2));
---
> StgClosure *p = (StgClosure *)allocate(CONSTR_sizeW(0,2));
183c140
< StgClosure *p = (StgClosure *)alloc(CONSTR_sizeW(0,1));
---
> StgClosure *p = (StgClosure *)allocate(CONSTR_sizeW(0,1));
192c149
< StgClosure *p = (StgClosure
*)alloc(CONSTR_sizeW(0,sizeofW(StgDouble)));
---
> StgClosure *p = (StgClosure
*)allocate(CONSTR_sizeW(0,sizeofW(StgDouble)));
201c158
< StgClosure *p = (StgClosure *)alloc(sizeofW(StgHeader)+1);
---
> StgClosure *p = (StgClosure *)allocate(sizeofW(StgHeader)+1);
210c167
< StgClosure *p = (StgClosure *)alloc(sizeofW(StgHeader)+1);
---
> StgClosure *p = (StgClosure *)allocate(sizeofW(StgHeader)+1);
239c196
< ap = (StgClosure *)alloc(sizeofW(StgClosure) + 2);
---
> ap = (StgClosure *)allocate(sizeofW(StgClosure) + 2);
417d373
< releaseAllocLock();
427d382
< releaseAllocLock();
441d395
< releaseAllocLock();
455d409
< releaseAllocLock();
475d428
< releaseAllocLock();
495d447
< releaseAllocLock();
517a470,494
>
> #ifdef RTS_SUPPORTS_THREADS
> void
> rts_lock()
> {
> Capability *cap;
> ACQUIRE_LOCK(&sched_mutex);
>
> // we request to get the capability immediately, in order to
> // a) stop other threads from using allocate()
> // b) wake the current worker thread from awaitEvent()
> // (so that a thread started by rts_eval* will start
immediately)
> grabReturnCapability(&sched_mutex,&cap);
>
> // now that we have the capability, we don't need it anymore
> // (other threads will continue to run as soon as we release the
sched_mutex)
> releaseCapability(cap);
> }
>
> void
> rts_unlock()
> {
> RELEASE_LOCK(&sched_mutex);
> }
> #endif
Index: ghc/rts/RtsStartup.c
===================================================================
RCS file: /home/cvs/root/fptools/ghc/rts/RtsStartup.c,v
retrieving revision 1.67
diff -r1.67 RtsStartup.c
289c289
< #if !defined(GRAN)
---
> #if !defined(GRAN) && !defined(RTS_SUPPORTS_THREADS)
Index: ghc/rts/Schedule.c
===================================================================
RCS file: /home/cvs/root/fptools/ghc/rts/Schedule.c,v
retrieving revision 1.160
diff -r1.160 Schedule.c
137a138,140
> // Pointer to the thread that executes main
> StgMainThread *main_main_thread = NULL;
>
310,311d312
<
<
429d429
< deleteAllThreads();
431a432,439
> #if defined(RTS_SUPPORTS_THREADS)
> // In the threaded RTS, deadlock detection doesn't work,
> // so just exit right away.
> prog_belch("interrupted");
> shutdownHaskellAndExit(EXIT_INTERRUPTED);
> #else
> deleteAllThreads();
> #endif
443c451
< for (m = main_threads; m != NULL; m = m->link) {
---
> for (m = main_threads; m != NULL; prev = &m->link, m = m->link)
{
455a464,465
> if(m == main_main_thread)
> shutdownHaskellAndExit(EXIT_SUCCESS);
468a479,480
> if(m == main_main_thread)
> shutdownHaskellAndExit(was_interrupted ?
EXIT_INTERRUPTED : EXIT_KILLED);
565,566d576
< * ToDo: what if another client comes along & requests another
< * main thread?
568c578,583
< if ( !EMPTY_QUEUE(blocked_queue_hd) ||
!EMPTY_QUEUE(sleeping_queue) ) {
---
> if ( !EMPTY_QUEUE(blocked_queue_hd) ||
!EMPTY_QUEUE(sleeping_queue)
> #if defined(RTS_SUPPORTS_THREADS) && !defined(SMP)
> || EMPTY_RUN_QUEUE()
> #endif
> )
> {
589c604
< #ifndef PAR
---
> #if !defined(PAR) && !defined(RTS_SUPPORTS_THREADS)
701a717,718
> #elif defined(RTS_SUPPORTS_THREADS)
> /* ToDo: add deadlock detection in threaded RTS */
716a734
> #if defined(SMP)
735a754,757
> #else
> if ( EMPTY_RUN_QUEUE() )
> continue; // nothing to do
> #endif
1019c1041
<
---
>
1033a1056,1057
>
> run_thread:
1046d1069
< run_thread:
1064c1087
<
---
>
1072c1095
<
---
>
1605,1607c1628,1630
< if (concCall) {
< startTask(taskStart);
< }
---
> //if (concCall) { // implementing "safe" as opposed to "threadsafe"
is more difficult
> startTask(taskStart);
> //}
1629,1634c1652,1653
< if ( concCall ) {
< ACQUIRE_LOCK(&sched_mutex);
< grabReturnCapability(&sched_mutex, &cap);
< } else {
< grabCapability(&cap);
< }
---
> ACQUIRE_LOCK(&sched_mutex);
> grabReturnCapability(&sched_mutex, &cap);
1656a1676
> #if defined(RTS_SUPPORTS_THREADS)
1657a1678
> #endif
1977c1998
< static void scheduleThread_ (StgTSO* tso, rtsBool createTask);
---
> static void scheduleThread_ (StgTSO* tso);
1980,1985c2001
< scheduleThread_(StgTSO *tso
< , rtsBool createTask
< #if !defined(THREADED_RTS)
< STG_UNUSED
< #endif
< )
---
> scheduleThread_(StgTSO *tso)
1995,2002d2010
< #if defined(THREADED_RTS)
< /* If main() is scheduling a thread, don't bother creating a
< * new task.
< */
< if ( createTask ) {
< startTask(taskStart);
< }
< #endif
2013c2021
< scheduleThread_(tso, rtsFalse);
---
> scheduleThread_(tso);
2019c2027
< {
---
> { // Precondition: sched_mutex must be held
2039d2046
< ACQUIRE_LOCK(&sched_mutex);
2045c2052
< scheduleThread_(tso, rtsTrue);
---
> scheduleThread_(tso);
2047c2054
< return waitThread_(m, rtsTrue); // waitThread_ releases sched_mutex
---
> return waitThread_(m, rtsTrue);
2234a2242
> SchedulerStatus stat;
2251c2259
< return waitThread_(m, rtsFalse); // waitThread_ releases sched_mutex
---
> stat = waitThread_(m, rtsFalse);
2253c2261
< return waitThread_(m);
---
> stat = waitThread_(m);
2254a2263,2264
> RELEASE_LOCK(&sched_mutex);
> return stat;
2277a2288
> main_main_thread = m;
2279a2291,2292
> ACQUIRE_LOCK(&sched_mutex);
> main_main_thread = NULL;
2312,2317c2325
< #if defined(THREADED_RTS)
< if (blockWaiting)
< #endif
< RELEASE_LOCK(&sched_mutex);
<
< // Postcondition: sched_mutex must not be held
---
> // Postcondition: sched_mutex still held
3517d3524
<
3525d3531
<
Index: ghc/rts/Schedule.h
===================================================================
RCS file: /home/cvs/root/fptools/ghc/rts/Schedule.h,v
retrieving revision 1.36
diff -r1.36 Schedule.h
63c63
< /* awaitEvent()
---
> /* awaitEvent(rtsBool wait)
65c65
< * Raises an exception asynchronously in the specified thread.
---
> * Checks for blocked threads that need to be woken.
80a81,90
> /* wakeBlockedWorkerThread()
> *
> * If a worker thread is currently blocked in awaitEvent(), interrupt
it.
> *
> * Called from STG : NO
> * Locks assumed : sched_mutex
> */
> void wakeBlockedWorkerThread(void); /* In Select.c */
>
>
176a187,190
> #if defined(THREADED_RTS)
> rtsBool thread_bound;
> Condition bound_thread_cond;
> #endif
260,262c274
< if ( !noCapabilities() ) { \
< signalCondition(&thread_ready_cond); \
< } \
---
> wakeBlockedWorkerThread(); \
Index: ghc/rts/Select.c
===================================================================
RCS file: /home/cvs/root/fptools/ghc/rts/Select.c,v
retrieving revision 1.22
diff -r1.22 Select.c
18a19
> #include "Capability.h"
37a39,46
> #ifdef RTS_SUPPORTS_THREADS
> static rtsBool isWorkerBlockedInAwaitEvent = rtsFalse;
> #ifndef mingw32_TARGET_OS
> static int workerWakeupPipe[2];
> static rtsBool workerWakeupInited = rtsFalse;
> #endif
> #endif
>
159a169,178
> #ifdef RTS_SUPPORTS_THREADS
> if(!workerWakeupInited)
> {
> pipe(workerWakeupPipe);
> workerWakeupInited = rtsTrue;
> }
> FD_SET(workerWakeupPipe[0], &rfd);
> maxfd = workerWakeupPipe[0] > maxfd ? workerWakeupPipe[0] :
maxfd;
> #endif
>
171a191,194
>
> #ifdef RTS_SUPPORTS_THREADS
> isWorkerBlockedInAwaitEvent = rtsTrue;
> #endif
208a232,235
> #ifdef RTS_SUPPORTS_THREADS
> isWorkerBlockedInAwaitEvent = rtsTrue;
> #endif
> RELEASE_LOCK(&sched_mutex);
212a240,242
> #ifdef RTS_SUPPORTS_THREADS
> isWorkerBlockedInAwaitEvent = rtsFalse;
> #endif
244a275,286
> #ifdef RTS_SUPPORTS_THREADS
> /* If another worker thread wants to take over,
> * return to the scheduler
> */
> if (needToYieldToReturningWorker()) {
> return; /* still hold the lock */
> }
> #endif
>
> #ifdef RTS_SUPPORTS_THREADS
> isWorkerBlockedInAwaitEvent = rtsTrue;
> #endif
290c332,342
<
---
>
> #if defined(RTS_SUPPORTS_THREADS) && !defined(mingw32_TARGET_OS)
> // if we were woken up by wakeBlockedWorkerThread,
> // read the dummy byte from the pipe
> if(select_succeeded && FD_ISSET(workerWakeupPipe[0], &rfd))
> {
> unsigned char dummy;
> wait = rtsFalse;
> read(workerWakeupPipe[0],&dummy,1);
> }
> #endif
292a345,372
>
>
> #ifdef RTS_SUPPORTS_THREADS
> /* wakeBlockedWorkerThread
> *
> * If a worker thread is currently blocked within awaitEvent,
> * wake it.
> * Must be called with sched_mutex held.
> */
>
> void
> wakeBlockedWorkerThread()
> {
> #ifndef mingw32_TARGET_OS
> if(isWorkerBlockedInAwaitEvent)
> {
> unsigned char dummy = 42; // Any value will do here
>
> // write something so that select() wakes up
> write(workerWakeupPipe[1],&dummy,1);
> }
> #else
> // The Win32 implementation currently uses a polling loop,
> // so there is no need to explicitly wake it
> #endif
> }
>
> #endif
_______________________________________________
Cvs-ghc mailing list
[EMAIL PROTECTED]
http://www.haskell.org/mailman/listinfo/cvs-ghc
- RE: Threaded RTS Patch Wolfgang Thaller
- RE: Threaded RTS Patch Simon Marlow
- Threaded RTS Patch Wolfgang Thaller
- RE: Threaded RTS Patch Simon Peyton-Jones
- RE: Threaded RTS Patch Simon Peyton-Jones
- Re: Threaded RTS Patch Wolfgang Thaller
- RE: Threaded RTS Patch Simon Marlow
