On 04/18/2011 06:14 PM, Holger Hans Peter Freyther wrote:
> I was thinking of something similar. I'll look at what failures are these.
make check hangs on the Delay.st test with the most simple hack to check if
the last entry is the same as the previous. I assume (just a blind guess) that
we fill the queue with timers from itimer or such... or setting the exception
flag even if we did nothing.
This should fix it by removing a lot of broken code and getting
{thread,signal}-safety right. I removed the arrays altogether (replaced
them by lists) so the error you were getting is not there at all anymore! :)
Testing is welcome.
Paolo
>From d78b916f9cafeb4bb123084c7609b0bbc82d620e Mon Sep 17 00:00:00 2001
From: Paolo Bonzini <[email protected]>
Date: Sun, 5 Jun 2011 17:53:10 +0200
Subject: [PATCH] rewrite async call queue
---
configure.ac | 4 +
libgst/events.c | 16 ++--
libgst/events.h | 2 +-
libgst/gstpriv.h | 11 ++-
libgst/interp-bc.inl | 20 +----
libgst/interp-jit.inl | 20 +----
libgst/interp.c | 203 ++++++++++++++++++++++++-----------------
libgst/interp.h | 22 +++++-
libgst/oop.c | 7 ++-
libgst/prims.def | 5 +-
libgst/sysdep.c | 1 +
libgst/sysdep.h | 1 -
libgst/sysdep/cygwin/timer.c | 2 +-
libgst/sysdep/posix/events.c | 7 +-
14 files changed, 175 insertions(+), 146 deletions(-)
diff --git a/configure.ac b/configure.ac
index 50a9f2f..e661b9d 100644
--- a/configure.ac
+++ b/configure.ac
@@ -238,6 +238,10 @@ dnl ------------------------------ C COMPILER / OS ------------
{ echo; echo "${term_bold}Platform environment:${term_norm}"; } >& AS_MESSAGE_FD
GST_C_SYNC_BUILTINS
+if test $gst_cv_have_sync_fetch_and_add = no; then
+ AC_MSG_ERROR([Synchronization primitives not found, please use a newer compiler.])
+fi
+
GST_LOCK
AC_SYS_LARGEFILE
AC_C_INLINE
diff --git a/libgst/events.c b/libgst/events.c
index bc1b312..4ca6668 100644
--- a/libgst/events.c
+++ b/libgst/events.c
@@ -57,7 +57,7 @@
/* Holds the semaphores to be signaled when the operating system sends
us a C-style signal. */
-volatile OOP _gst_sem_int_vec[NSIG];
+async_queue_entry _gst_sem_int_vec[NSIG];
/* Signals _gst_sem_int_vec[SIG] and removes the semaphore from the vector
(because C-style signal handlers are one-shot). */
@@ -67,14 +67,10 @@ static RETSIGTYPE signal_handler (int sig);
RETSIGTYPE
signal_handler (int sig)
{
- _gst_disable_interrupts (true);
- if (_gst_sem_int_vec[sig])
+ if (_gst_sem_int_vec[sig].data)
{
- if (IS_CLASS (_gst_sem_int_vec[sig], _gst_semaphore_class))
- {
- _gst_async_signal_and_unregister (_gst_sem_int_vec[sig]);
- _gst_sem_int_vec[sig] = NULL;
- }
+ if (IS_CLASS (_gst_sem_int_vec[sig].data, _gst_semaphore_class))
+ _gst_async_call_internal (&_gst_sem_int_vec[sig]);
else
{
_gst_errorf
@@ -83,7 +79,6 @@ signal_handler (int sig)
}
}
- _gst_enable_interrupts (true);
_gst_set_signal_handler (sig, SIG_DFL);
_gst_wakeup ();
}
@@ -95,8 +90,9 @@ _gst_async_interrupt_wait (OOP semaphoreOOP,
if (sig < 0 || sig >= NSIG)
return;
- _gst_sem_int_vec[sig] = semaphoreOOP;
_gst_register_oop (semaphoreOOP);
+ _gst_sem_int_vec[sig].func = _gst_do_async_signal_and_unregister;
+ _gst_sem_int_vec[sig].data = semaphoreOOP;
_gst_set_signal_handler (sig, signal_handler);
/* should probably package up the old interrupt state here for return
diff --git a/libgst/events.h b/libgst/events.h
index 85d1656..464d8a3 100644
--- a/libgst/events.h
+++ b/libgst/events.h
@@ -54,7 +54,7 @@
#define GST_EVENTS_H
/* Array of semaphores associated to the C signals. */
-extern volatile OOP _gst_sem_int_vec[NSIG]
+extern async_queue_entry _gst_sem_int_vec[NSIG]
ATTRIBUTE_HIDDEN;
/* Initialize the data structures used to hold information about
diff --git a/libgst/gstpriv.h b/libgst/gstpriv.h
index 1ad17e7..50ee317 100644
--- a/libgst/gstpriv.h
+++ b/libgst/gstpriv.h
@@ -199,9 +199,10 @@
DO_PREFETCH ((x), 0, (k));
/* Synchronization primitives. */
-#if !defined HAVE_SYNC_BUILTINS && defined __GNUC__
-#define __sync_synchronize() __asm__ ("" : : : "memory")
-#endif
+#define __sync_exchange(ptr, val) \
+ ({ __typeof__ (*(ptr)) _x; \
+ do _x = *(ptr); while (!__sync_bool_compare_and_swap ((ptr), (_x), (val))); \
+ _x; })
/* Kill a warning when using GNU C. Note that this allows using
break or continue inside a macro, unlike do...while(0) */
@@ -518,8 +519,10 @@ extern OOP _gst_nil_oop
#ifdef __GNUC__
#define no_opt(x) ({ __typeof__ ((x)) _result; \
asm ("" : "=r" (_result) : "0" ((x))); _result; })
+#define barrier() asm ("")
#else
#define no_opt(x) (x)
+#define barrier()
#endif
/* integer conversions and some information on SmallIntegers. */
@@ -597,7 +600,6 @@ extern OOP _gst_nil_oop
#include "callin.h"
#include "cint.h"
#include "dict.h"
-#include "events.h"
#include "heap.h"
#include "lex.h"
#include "tree.h"
@@ -608,6 +610,7 @@ extern OOP _gst_nil_oop
#include "byte.h"
#include "comp.h"
#include "interp.h"
+#include "events.h"
#include "opt.h"
#include "save.h"
#include "str.h"
diff --git a/libgst/interp-bc.inl b/libgst/interp-bc.inl
index 9a1243e..8819481 100644
--- a/libgst/interp-bc.inl
+++ b/libgst/interp-bc.inl
@@ -525,25 +525,7 @@ monitor_byte_codes:
/* First, deal with any async signals. */
if (async_queue_enabled)
- {
- gl_lock_lock (async_queue_lock);
- _gst_disable_interrupts (false);
- __sync_synchronize ();
-
- if UNCOMMON (async_queue_index || async_queue_index_sig)
- {
- int i;
- for (i = 0; i < async_queue_index; i++)
- queued_async_signals[i].func (queued_async_signals[i].data);
- for (i = 0; i < async_queue_index_sig; i++)
- queued_async_signals_sig[i].func (queued_async_signals_sig[i].data);
-
- async_queue_index = async_queue_index_sig = 0;
- }
-
- _gst_enable_interrupts (false);
- gl_lock_unlock (async_queue_lock);
- }
+ empty_async_queue ();
if UNCOMMON (time_to_preempt)
ACTIVE_PROCESS_YIELD ();
diff --git a/libgst/interp-jit.inl b/libgst/interp-jit.inl
index f254730..0e8fa27 100644
--- a/libgst/interp-jit.inl
+++ b/libgst/interp-jit.inl
@@ -413,25 +413,7 @@ _gst_interpret (OOP processOOP)
/* First, deal with any async signals. */
if (async_queue_enabled)
- {
- gl_lock_lock (async_queue_lock);
- _gst_disable_interrupts (false);
- __sync_synchronize ();
-
- if UNCOMMON (async_queue_index || async_queue_index_sig)
- {
- int i;
- for (i = 0; i < async_queue_index; i++)
- queued_async_signals[i].func (queued_async_signals[i].data);
- for (i = 0; i < async_queue_index_sig; i++)
- queued_async_signals_sig[i].func (queued_async_signals_sig[i].data);
-
- async_queue_index = async_queue_index_sig = 0;
- }
-
- _gst_enable_interrupts (false);
- gl_lock_unlock (async_queue_lock);
- }
+ empty_async_queue ();
thisContext =
(gst_method_context) OOP_TO_OBJ (_gst_this_context_oop);
diff --git a/libgst/interp.c b/libgst/interp.c
index 88e3f4c..c83fbef 100644
--- a/libgst/interp.c
+++ b/libgst/interp.c
@@ -113,13 +113,6 @@
to speed up these messages for Arrays, Strings, and ByteArrays. */
#define METHOD_CACHE_SIZE (1 << 11)
-typedef struct async_queue_entry
-{
- void (*func) (OOP);
- OOP data;
-}
-async_queue_entry;
-
typedef struct interp_jmp_buf
{
jmp_buf jmpBuf;
@@ -247,14 +240,10 @@ static int class_cache_prim;
#endif
/* Queue for async (outside the interpreter) semaphore signals */
-gl_lock_define (static, async_queue_lock);
static mst_Boolean async_queue_enabled = true;
-static int async_queue_index;
-static int async_queue_size;
-static async_queue_entry *queued_async_signals;
-
-static int async_queue_index_sig;
-static async_queue_entry queued_async_signals_sig[NSIG];
+static async_queue_entry queued_async_signals_tail;
+static async_queue_entry *queued_async_signals = &queued_async_signals_tail;
+static async_queue_entry *queued_async_signals_sig = &queued_async_signals_tail;
/* When not NULL, this causes the byte code interpreter to immediately
send the message whose selector is here to the current stack
@@ -295,6 +284,9 @@ static inline mst_Boolean cached_index_oop_put_primitive (OOP rec,
OOP val,
intptr_t spec);
+/* Empty the queue of asynchronous calls. */
+static void empty_async_queue (void);
+
/* Try to find another process with higher or same priority as the
active one. Return whether there is one. */
static mst_Boolean would_reschedule_process (void);
@@ -1608,73 +1600,130 @@ _gst_sync_signal (OOP semaphoreOOP, mst_Boolean incr_if_empty)
return true;
}
-static void
-do_async_signal (OOP semaphoreOOP)
+void
+_gst_do_async_signal (OOP semaphoreOOP)
{
_gst_sync_signal (semaphoreOOP, true);
}
void
-_gst_async_call (void (*func) (OOP), OOP arg)
+_gst_do_async_signal_and_unregister (OOP semaphoreOOP)
{
- if (_gst_signal_count)
- {
- /* Async-signal-safe version. Because the discriminant is
- _gst_signal_count, can only be called from within libgst.la
- (in particular events.c). Also, because of that we assume
- interrupts are already disabled. */
-
- if (async_queue_index_sig == NSIG)
- {
- static const char errmsg[] = "Asynchronous signal queue full!\n";
- write (2, errmsg, sizeof (errmsg) - 1);
- abort ();
- }
+ _gst_sync_signal (semaphoreOOP, true);
+ _gst_unregister_oop (semaphoreOOP);
+}
- queued_async_signals_sig[async_queue_index_sig].func = func;
- queued_async_signals_sig[async_queue_index_sig++].data = arg;
- }
- else
- {
- /* Thread-safe version for the masses. */
+/* Async-signal-safe version, does no allocation. Using an atomic operation
+ is still the simplest choice, but on top of that we check that the entry
+ is not already in the list. Also, the datum and next field are NULLed
+ automatically when the call is made. */
+void
+_gst_async_call_internal (async_queue_entry *e)
+{
+ /* For async-signal safety, we need to check that the entry is not
+ already in the list. Checking that atomically with CAS is the
+ simplest way. */
+ do
+ if (__sync_val_compare_and_swap(&e->next, NULL, queued_async_signals_sig))
+ return;
+ while (!__sync_bool_compare_and_swap (&queued_async_signals_sig, e->next, e));
+ SET_EXCEPT_FLAG (true);
+}
- gl_lock_lock (async_queue_lock);
- if (async_queue_index == async_queue_size)
- {
- async_queue_size *= 2;
- queued_async_signals =
- realloc (queued_async_signals,
- sizeof (async_queue_entry) * async_queue_size);
- }
+void
+_gst_async_call (void (*func) (OOP), OOP arg)
+{
+ /* Thread-safe version for the masses. This lockless stack
+ is reversed in the interpreter loop to get FIFO behavior. */
+ async_queue_entry *sig = xmalloc (sizeof (async_queue_entry));
+ sig->func = func;
+ sig->data = arg;
- queued_async_signals[async_queue_index].func = func;
- queued_async_signals[async_queue_index++].data = arg;
- gl_lock_unlock (async_queue_lock);
- _gst_wakeup ();
- }
-
+ do
+ sig->next = queued_async_signals;
+ while (!__sync_bool_compare_and_swap (&queued_async_signals,
+ sig->next, sig));
+ _gst_wakeup ();
SET_EXCEPT_FLAG (true);
}
mst_Boolean
_gst_have_pending_async_calls ()
{
- return async_queue_index_sig > 0 || async_queue_index > 0;
+ return (queued_async_signals != &queued_async_signals_tail
+ || queued_async_signals_sig != &queued_async_signals_tail);
+}
+
+void
+empty_async_queue ()
+{
+ async_queue_entry *sig, *sig_reversed;
+
+ /* Process a batch of asynchronous requests. These are pushed
+ in LIFO order by _gst_async_call. By reversing the list
+ in place before walking it, we get FIFO order. */
+ sig = __sync_exchange (&queued_async_signals, &queued_async_signals_tail);
+ sig_reversed = &queued_async_signals_tail;
+ while (sig != &queued_async_signals_tail)
+ {
+ async_queue_entry *next = sig->next;
+ sig->next = sig_reversed;
+ sig_reversed = sig;
+ sig = next;
+ }
+
+ sig = sig_reversed;
+ while (sig != &queued_async_signals_tail)
+ {
+ async_queue_entry *next = sig->next;
+ sig->func (sig->data);
+ free (sig);
+ sig = next;
+ }
+
+ /* For async-signal-safe processing, we need to avoid entering
+ the same item twice into the list. So we use NEXT to mark
+ items that have been added... */
+ sig = __sync_exchange (&queued_async_signals_sig, &queued_async_signals_tail);
+ sig_reversed = &queued_async_signals_tail;
+ while (sig != &queued_async_signals_tail)
+ {
+ async_queue_entry *next = sig->next;
+ sig->next = sig_reversed;
+ sig_reversed = sig;
+ sig = next;
+ }
+
+ sig = sig_reversed;
+ while (sig != &queued_async_signals_tail)
+ {
+ async_queue_entry *next = sig->next;
+ void (*func) (OOP) = sig->func;
+ OOP data = sig->data;
+ barrier ();
+
+ sig->data = NULL;
+ barrier ();
+
+ /* ... and we only NULL it after a signal handler can start
+ writing to it. */
+ sig->next = NULL;
+ barrier ();
+ func (data);
+ sig = next;
+ }
}
void
_gst_async_signal (OOP semaphoreOOP)
{
- _gst_async_call (do_async_signal, semaphoreOOP);
+ _gst_async_call (_gst_do_async_signal, semaphoreOOP);
}
void
_gst_async_signal_and_unregister (OOP semaphoreOOP)
{
- _gst_disable_interrupts (false); /* block out everything! */
- _gst_async_call (do_async_signal, semaphoreOOP);
- _gst_async_call (_gst_unregister_oop, semaphoreOOP);
- _gst_enable_interrupts (false);
+ _gst_async_call (_gst_do_async_signal_and_unregister, semaphoreOOP);
}
void
@@ -2204,12 +2253,6 @@ _gst_init_interpreter (void)
#endif
_gst_this_context_oop = _gst_nil_oop;
- async_queue_index = 0;
- async_queue_index_sig = 0;
- async_queue_size = 32;
- queued_async_signals = malloc (sizeof (async_queue_entry) * async_queue_size);
- gl_lock_init (async_queue_lock);
-
for (i = 0; i < MAX_LIFO_DEPTH; i++)
lifo_contexts[i].flags = F_POOLED | F_CONTEXT;
@@ -2378,16 +2421,14 @@ _gst_copy_processor_registers (void)
void
copy_semaphore_oops (void)
{
- int i;
-
- _gst_disable_interrupts (false); /* block out everything! */
+ async_queue_entry *sig;
- for (i = 0; i < async_queue_index; i++)
- if (queued_async_signals[i].data)
- MAYBE_COPY_OOP (queued_async_signals[i].data);
- for (i = 0; i < async_queue_index_sig; i++)
- if (queued_async_signals_sig[i].data)
- MAYBE_COPY_OOP (queued_async_signals_sig[i].data);
+ for (sig = queued_async_signals; sig != &queued_async_signals_tail;
+ sig = sig->next)
+ MAYBE_COPY_OOP (sig->data);
+ for (sig = queued_async_signals_sig; sig != &queued_async_signals_tail;
+ sig = sig->next)
+ MAYBE_COPY_OOP (sig->data);
/* there does seem to be a window where this is not valid */
if (single_step_semaphore)
@@ -2395,8 +2436,6 @@ copy_semaphore_oops (void)
/* there does seem to be a window where this is not valid */
MAYBE_COPY_OOP (switch_to_process);
-
- _gst_enable_interrupts (false);
}
@@ -2416,16 +2455,14 @@ _gst_mark_processor_registers (void)
void
mark_semaphore_oops (void)
{
- int i;
-
- _gst_disable_interrupts (false); /* block out everything! */
+ async_queue_entry *sig;
- for (i = 0; i < async_queue_index; i++)
- if (queued_async_signals[i].data)
- MAYBE_MARK_OOP (queued_async_signals[i].data);
- for (i = 0; i < async_queue_index_sig; i++)
- if (queued_async_signals_sig[i].data)
- MAYBE_MARK_OOP (queued_async_signals_sig[i].data);
+ for (sig = queued_async_signals; sig != &queued_async_signals_tail;
+ sig = sig->next)
+ MAYBE_MARK_OOP (sig->data);
+ for (sig = queued_async_signals_sig; sig != &queued_async_signals_tail;
+ sig = sig->next)
+ MAYBE_MARK_OOP (sig->data);
/* there does seem to be a window where this is not valid */
if (single_step_semaphore)
@@ -2433,8 +2470,6 @@ mark_semaphore_oops (void)
/* there does seem to be a window where this is not valid */
MAYBE_MARK_OOP (switch_to_process);
-
- _gst_enable_interrupts (false);
}
diff --git a/libgst/interp.h b/libgst/interp.h
index 22d1c55..7696625 100644
--- a/libgst/interp.h
+++ b/libgst/interp.h
@@ -327,6 +327,14 @@ typedef gst_uchar *ip_type;
extern ip_type ip
ATTRIBUTE_HIDDEN;
+typedef struct async_queue_entry
+{
+ void (*func) (OOP);
+ OOP data;
+ struct async_queue_entry *next;
+}
+async_queue_entry;
+
/* When not NULL, this causes the byte code interpreter to immediately
send the message whose selector is here to the current stack
top. */
@@ -401,7 +409,19 @@ extern mst_Boolean _gst_have_pending_async_calls (void)
/* Set up so that FUNC will be called, with ARGOOP as its argument,
as soon as the next sequence point is reached. */
extern void _gst_async_call (void (*func) (OOP),
- OOP argOOP)
+ OOP argOOP)
+ ATTRIBUTE_HIDDEN;
+
+/* Worker functions for _gst_async_call_internal. */;
+extern void _gst_do_async_signal (OOP semaphoreOOP)
+ ATTRIBUTE_HIDDEN;
+extern void _gst_do_async_signal_and_unregister (OOP semaphoreOOP)
+ ATTRIBUTE_HIDDEN;
+
+/* Set up so that ENTRY->FUNC will be called, with ENTRY->DATA as its
+ argument, as soon as the next sequence point is reached. Async-signal
+ safe version. */
+extern void _gst_async_call_internal (async_queue_entry *entry)
ATTRIBUTE_HIDDEN;
/* Signal SEMAPHOREOOP so that one of the processes waiting on that
diff --git a/libgst/oop.c b/libgst/oop.c
index 0a34f95..2e8b7fd 100644
--- a/libgst/oop.c
+++ b/libgst/oop.c
@@ -1529,7 +1529,12 @@ mourn_objects (void)
array = new_instance_with (_gst_array_class, size, &processor->gcArray);
_gst_copy_buffer (array->data);
if (!IS_NIL (processor->gcSemaphore))
- _gst_async_signal (processor->gcSemaphore);
+ {
+ static async_queue_entry e;
+ e.func = _gst_do_async_signal;
+ e.data = processor->gcSemaphore;
+ _gst_async_call_internal (&e);
+ }
else
{
_gst_errorf ("Running finalizers before initialization.");
diff --git a/libgst/prims.def b/libgst/prims.def
index 5eff664..2b14aaa 100644
--- a/libgst/prims.def
+++ b/libgst/prims.def
@@ -3861,7 +3861,10 @@ primitive VMpr_Processor_disableEnableInterrupts :
if (id == prim_id (VMpr_Processor_disableInterrupts) && count++ == 0)
async_queue_enabled = false;
else if (id == prim_id (VMpr_Processor_enableInterrupts) && --count == 0)
- async_queue_enabled = true;
+ {
+ async_queue_enabled = true;
+ SET_EXCEPT_FLAG (true);
+ }
process->interrupts = FROM_INT (count);
diff --git a/libgst/sysdep.c b/libgst/sysdep.c
index 1c3dcd7..dfb5415 100644
--- a/libgst/sysdep.c
+++ b/libgst/sysdep.c
@@ -67,6 +67,7 @@
#include "sysdep/cygwin/files.c"
#include "sysdep/cygwin/mem.c"
#elif !defined WIN32
+#include <pthread.h>
#include "sysdep/posix/findexec.c"
#include "sysdep/posix/timer.c"
#include "sysdep/posix/signals.c"
diff --git a/libgst/sysdep.h b/libgst/sysdep.h
index af77ccc..2cebe69 100644
--- a/libgst/sysdep.h
+++ b/libgst/sysdep.h
@@ -113,7 +113,6 @@ extern void _gst_signal_after (int deltaMilli,
/* Initialize system dependent stuff. */
extern void _gst_init_sysdep (void)
ATTRIBUTE_HIDDEN;
-
extern void _gst_init_sysdep_win32 (void)
ATTRIBUTE_HIDDEN;
diff --git a/libgst/sysdep/cygwin/timer.c b/libgst/sysdep/cygwin/timer.c
index 37e9b42..648fe93 100644
--- a/libgst/sysdep/cygwin/timer.c
+++ b/libgst/sysdep/cygwin/timer.c
@@ -96,7 +96,7 @@ alarm_thread (unused)
}
void
-_gst_init_sysdep_win32 ()
+_gst_init_sysdep_win32 (void)
{
HANDLE hthread;
DWORD tid;
diff --git a/libgst/sysdep/posix/events.c b/libgst/sysdep/posix/events.c
index e3d3c36..ab2a88e 100644
--- a/libgst/sysdep/posix/events.c
+++ b/libgst/sysdep/posix/events.c
@@ -207,7 +207,7 @@ _gst_async_timed_wait (OOP semaphoreOOP,
mst_Boolean
_gst_is_timeout_programmed (void)
{
- return (!IS_NIL (_gst_sem_int_vec[SIGALRM]));
+ return (!IS_NIL (no_opt (_gst_sem_int_vec[SIGALRM].data)));
}
void
@@ -341,9 +341,8 @@ file_polling_handler (int sig)
{
if (num_used_pollfds > 0)
{
- _gst_disable_interrupts (true);
- _gst_async_call (async_signal_polled_files, NULL);
- _gst_enable_interrupts (true);
+ static async_queue_entry e = { async_signal_polled_files, NULL, NULL };
+ _gst_async_call_internal (&e);
}
_gst_set_signal_handler (sig, file_polling_handler);
--
1.7.4.4
_______________________________________________
help-smalltalk mailing list
[email protected]
https://lists.gnu.org/mailman/listinfo/help-smalltalk