Author: Armin Rigo <[email protected]>
Branch: stmgc-c8
Changeset: r78163:91a280e13a9d
Date: 2015-06-18 09:48 +0200
http://bitbucket.org/pypy/pypy/changeset/91a280e13a9d/
Log: import stmgc branch 'queue', and add rlib support for stm queues
diff --git a/rpython/memory/gctransform/stmframework.py
b/rpython/memory/gctransform/stmframework.py
--- a/rpython/memory/gctransform/stmframework.py
+++ b/rpython/memory/gctransform/stmframework.py
@@ -224,6 +224,7 @@
gct_stm_stop_all_other_threads = _gct_with_roots_pushed
gct_stm_transaction_break = _gct_with_roots_pushed
gct_stm_collect = _gct_with_roots_pushed
+ gct_stm_queue_get = _gct_with_roots_pushed
class StmRootWalker(BaseRootWalker):
diff --git a/rpython/rlib/rstm.py b/rpython/rlib/rstm.py
--- a/rpython/rlib/rstm.py
+++ b/rpython/rlib/rstm.py
@@ -356,3 +356,70 @@
" use h.writeobj() instead")
object = property(_getobj, _setobj)
+
+# ____________________________________________________________
+
+_STM_QUEUE_P = rffi.COpaquePtr('stm_queue_t')
+
+@dont_look_inside
+def _ll_queue_get(q, timeout=-1.0):
+ # Returns a GCREF (null if a timeout >= 0.0 expired; < 0.0 waits forever).
+ return llop.stm_queue_get(llmemory.GCREF, q, q.ll_raw_queue, timeout)
+
+@dont_look_inside
+def _ll_queue_put(q, newitem):
+ llop.stm_queue_put(lltype.Void, q, q.ll_raw_queue, newitem)
+
+_QUEUE_OBJ = lltype.GcStruct('QUEUE_OBJ',
+ ('ll_raw_queue', _STM_QUEUE_P),
+ hints={'immutable': True},
+ rtti=True,
+ adtmeths={'get': _ll_queue_get,
+ 'put': _ll_queue_put})
+NULL_QUEUE = lltype.nullptr(_QUEUE_OBJ)
+
+def _ll_queue_trace(gc, obj, callback, arg):
+ from rpython.memory.gctransform.stmframework import get_visit_function
+ visit_fn = get_visit_function(callback, arg)
+ addr = obj + llmemory.offsetof(_QUEUE_OBJ, 'll_raw_queue')
+ llop.stm_queue_tracefn(lltype.Void, addr.address[0], visit_fn)
+lambda_queue_trace = lambda: _ll_queue_trace
+
+def _ll_queue_finalizer(q):
+ if q.ll_raw_queue:
+ llop.stm_queue_free(lltype.Void, q.ll_raw_queue)
+lambda_queue_finlz = lambda: _ll_queue_finalizer
+
+@dont_look_inside
+def create_queue():
+ if not we_are_translated():
+ return QueueForTest() # for tests
+ rgc.register_custom_light_finalizer(_QUEUE_OBJ, lambda_queue_finlz)
+ rgc.register_custom_trace_hook(_QUEUE_OBJ, lambda_queue_trace)
+ q = lltype.malloc(_QUEUE_OBJ, zero=True)
+ q.ll_raw_queue = llop.stm_queue_create(_STM_QUEUE_P)
+ return q
+
+class QueueForTest(object):
+ def __init__(self):
+ import Queue
+ self._content = Queue.Queue()
+ self._Empty = Queue.Empty
+
+ def _cleanup_(self):
+ raise Exception("cannot translate a prebuilt rstm.Queue object")
+
+ def get(self, timeout=-1.0):
+ if timeout < 0.0:
+ return self._content.get()
+ try:
+ if timeout == 0.0:
+ return self._content.get(block=False)
+ else:
+ return self._content.get(timeout=timeout)
+ except self._Empty:
+ return NULL_GCREF
+
+ def put(self, newitem):
+ assert lltype.typeOf(newitem) == llmemory.GCREF
+ self._content.put(newitem)
diff --git a/rpython/rtyper/llinterp.py b/rpython/rtyper/llinterp.py
--- a/rpython/rtyper/llinterp.py
+++ b/rpython/rtyper/llinterp.py
@@ -1002,6 +1002,11 @@
op_stm_hashtable_length_upper_bound = _stm_not_implemented
op_stm_hashtable_list = _stm_not_implemented
op_stm_hashtable_free = _stm_not_implemented
+ op_stm_queue_create = _stm_not_implemented
+ op_stm_queue_free = _stm_not_implemented
+ op_stm_queue_get = _stm_not_implemented
+ op_stm_queue_put = _stm_not_implemented
+ op_stm_queue_tracefn = _stm_not_implemented
op_stm_register_thread_local = _stm_not_implemented
op_stm_unregister_thread_local = _stm_not_implemented
op_stm_really_force_cast_ptr = _stm_not_implemented
diff --git a/rpython/rtyper/lltypesystem/lloperation.py
b/rpython/rtyper/lltypesystem/lloperation.py
--- a/rpython/rtyper/lltypesystem/lloperation.py
+++ b/rpython/rtyper/lltypesystem/lloperation.py
@@ -479,6 +479,12 @@
'stm_hashtable_list' : LLOp(),
'stm_hashtable_tracefn': LLOp(),
+ 'stm_queue_create': LLOp(),
+ 'stm_queue_free': LLOp(),
+ 'stm_queue_get': LLOp(canmallocgc=True), # push roots!
+ 'stm_queue_put': LLOp(),
+ 'stm_queue_tracefn': LLOp(),
+
# __________ address operations __________
'boehm_malloc': LLOp(),
diff --git a/rpython/translator/backendopt/finalizer.py
b/rpython/translator/backendopt/finalizer.py
--- a/rpython/translator/backendopt/finalizer.py
+++ b/rpython/translator/backendopt/finalizer.py
@@ -18,7 +18,8 @@
"""
ok_operations = ['ptr_nonzero', 'ptr_eq', 'ptr_ne', 'free', 'same_as',
'direct_ptradd', 'force_cast', 'track_alloc_stop',
- 'raw_free', 'debug_print', 'stm_hashtable_free']
+ 'raw_free', 'debug_print', 'stm_hashtable_free',
+ 'stm_queue_free']
def analyze_light_finalizer(self, graph):
result = self.analyze_direct_call(graph)
diff --git a/rpython/translator/c/src/mem.c b/rpython/translator/c/src/mem.c
--- a/rpython/translator/c/src/mem.c
+++ b/rpython/translator/c/src/mem.c
@@ -62,7 +62,7 @@
#ifdef RPY_STM
// spinlock_acquire/spinlock_release defined in ../../stm/src_stm/stmgcintf.h
-static Signed pypy_debug_alloc_lock = 0;
+static uint8_t pypy_debug_alloc_lock = 0;
#else
# define spinlock_acquire(lock) /* nothing */
# define spinlock_release(lock) /* nothing */
diff --git a/rpython/translator/stm/breakfinder.py
b/rpython/translator/stm/breakfinder.py
--- a/rpython/translator/stm/breakfinder.py
+++ b/rpython/translator/stm/breakfinder.py
@@ -9,6 +9,7 @@
'stm_enter_callback_call',
'stm_leave_callback_call',
'stm_transaction_break',
+ 'stm_queue_get',
])
for tb in TRANSACTION_BREAK:
diff --git a/rpython/translator/stm/funcgen.py
b/rpython/translator/stm/funcgen.py
--- a/rpython/translator/stm/funcgen.py
+++ b/rpython/translator/stm/funcgen.py
@@ -368,3 +368,32 @@
arg2 = funcgen.expr(op.args[2])
return ('stm_hashtable_tracefn(%s, (stm_hashtable_t *)%s, '
' (void(*)(object_t**))%s);' % (arg0, arg1, arg2))
+
+def stm_queue_create(funcgen, op):
+ result = funcgen.expr(op.result)
+ return '%s = stm_queue_create();' % (result,)
+
+def stm_queue_free(funcgen, op):
+ arg = funcgen.expr(op.args[0])
+ return 'stm_queue_free(%s);' % (arg,)
+
+def stm_queue_get(funcgen, op):
+ arg0 = funcgen.expr(op.args[0])
+ arg1 = funcgen.expr(op.args[1])
+ arg2 = funcgen.expr(op.args[2])
+ result = funcgen.expr(op.result)
+ return ('%s = (rpygcchar_t *)stm_queue_get((object_t *)%s, %s, %s, '
+ '&stm_thread_local);' % (result, arg0, arg1, arg2))
+
+def stm_queue_put(funcgen, op):
+ arg0 = funcgen.expr(op.args[0])
+ arg1 = funcgen.expr(op.args[1])
+ arg2 = funcgen.expr(op.args[2])
+ return 'stm_queue_put((object_t *)%s, %s, (object_t *)%s);' % (
+ arg0, arg1, arg2)
+
+def stm_queue_tracefn(funcgen, op):
+ arg0 = funcgen.expr(op.args[0])
+ arg1 = funcgen.expr(op.args[1])
+ return ('stm_queue_tracefn((stm_queue_t *)%s, '
+ ' (void(*)(object_t**))%s);' % (arg0, arg1))
diff --git a/rpython/translator/stm/src_stm/revision
b/rpython/translator/stm/src_stm/revision
--- a/rpython/translator/stm/src_stm/revision
+++ b/rpython/translator/stm/src_stm/revision
@@ -1,1 +1,1 @@
-14536c2a2af4
+3ca830828468
diff --git a/rpython/translator/stm/src_stm/stm/atomic.h
b/rpython/translator/stm/src_stm/stm/atomic.h
--- a/rpython/translator/stm/src_stm/stm/atomic.h
+++ b/rpython/translator/stm/src_stm/stm/atomic.h
@@ -42,12 +42,19 @@
#endif
-#define spinlock_acquire(lock) \
- do { if (LIKELY(__sync_lock_test_and_set(&(lock), 1) == 0)) break; \
- spin_loop(); } while (1)
-#define spinlock_release(lock) \
- do { assert((lock) == 1); \
- __sync_lock_release(&(lock)); } while (0)
+static inline void _spinlock_acquire(uint8_t *plock) {
+ retry:
+ if (__builtin_expect(__sync_lock_test_and_set(plock, 1) != 0, 0)) {
+ spin_loop();
+ goto retry;
+ }
+}
+static inline void _spinlock_release(uint8_t *plock) {
+ assert(*plock == 1);
+ __sync_lock_release(plock);
+}
+#define spinlock_acquire(lock) _spinlock_acquire(&(lock))
+#define spinlock_release(lock) _spinlock_release(&(lock))
#endif /* _STM_ATOMIC_H */
diff --git a/rpython/translator/stm/src_stm/stm/core.c
b/rpython/translator/stm/src_stm/stm/core.c
--- a/rpython/translator/stm/src_stm/stm/core.c
+++ b/rpython/translator/stm/src_stm/stm/core.c
@@ -1146,6 +1146,7 @@
assert(tree_is_cleared(STM_PSEGMENT->callbacks_on_commit_and_abort[1]));
assert(list_is_empty(STM_PSEGMENT->young_objects_with_light_finalizers));
assert(STM_PSEGMENT->finalizers == NULL);
+ assert(STM_PSEGMENT->active_queues == NULL);
#ifndef NDEBUG
/* this should not be used when objects_pointing_to_nursery == NULL */
STM_PSEGMENT->position_markers_len_old = 99999999999999999L;
@@ -1351,6 +1352,9 @@
STM_PSEGMENT->overflow_number_has_been_used = false;
}
+ if (STM_PSEGMENT->active_queues)
+ queues_deactivate_all(/*at_commit=*/true);
+
invoke_and_clear_user_callbacks(0); /* for commit */
/* done */
@@ -1505,6 +1509,9 @@
if (tl->mem_clear_on_abort)
memset(tl->mem_clear_on_abort, 0, tl->mem_bytes_to_clear_on_abort);
+ if (STM_PSEGMENT->active_queues)
+ queues_deactivate_all(/*at_commit=*/false);
+
invoke_and_clear_user_callbacks(1); /* for abort */
if (is_abort(STM_SEGMENT->nursery_end)) {
diff --git a/rpython/translator/stm/src_stm/stm/core.h
b/rpython/translator/stm/src_stm/stm/core.h
--- a/rpython/translator/stm/src_stm/stm/core.h
+++ b/rpython/translator/stm/src_stm/stm/core.h
@@ -117,13 +117,15 @@
bool minor_collect_will_commit_now;
struct tree_s *callbacks_on_commit_and_abort[2];
+ struct tree_s *active_queues;
+ uint8_t active_queues_lock;
/* This is the number stored in the overflowed objects (a multiple of
GCFLAG_OVERFLOW_NUMBER_bit0). It is incremented when the
transaction is done, but only if we actually overflowed any
object; otherwise, no object has got this number. */
+ bool overflow_number_has_been_used;
uint32_t overflow_number;
- bool overflow_number_has_been_used;
struct stm_commit_log_entry_s *last_commit_log_entry;
diff --git a/rpython/translator/stm/src_stm/stm/gcpage.c
b/rpython/translator/stm/src_stm/stm/gcpage.c
--- a/rpython/translator/stm/src_stm/stm/gcpage.c
+++ b/rpython/translator/stm/src_stm/stm/gcpage.c
@@ -48,7 +48,7 @@
}
-static int lock_growth_large = 0;
+static uint8_t lock_growth_large = 0;
static stm_char *allocate_outside_nursery_large(uint64_t size)
{
diff --git a/rpython/translator/stm/src_stm/stm/largemalloc.c
b/rpython/translator/stm/src_stm/stm/largemalloc.c
--- a/rpython/translator/stm/src_stm/stm/largemalloc.c
+++ b/rpython/translator/stm/src_stm/stm/largemalloc.c
@@ -108,7 +108,7 @@
static struct {
- int lock;
+ uint8_t lock;
mchunk_t *first_chunk, *last_chunk;
dlist_t largebins[N_BINS];
} lm __attribute__((aligned(64)));
diff --git a/rpython/translator/stm/src_stm/stm/queue.c
b/rpython/translator/stm/src_stm/stm/queue.c
new file mode 100644
--- /dev/null
+++ b/rpython/translator/stm/src_stm/stm/queue.c
@@ -0,0 +1,319 @@
+/* Imported by rpython/translator/stm/import_stmgc.py */
+typedef struct queue_entry_s {
+ object_t *object;
+ struct queue_entry_s *next;
+} queue_entry_t;
+
+typedef union stm_queue_segment_u {
+ struct {
+ /* a chained list of fresh entries that have been allocated and
+ added to this queue during the current transaction. If the
+ transaction commits, these are moved to 'old_entries'. */
+ queue_entry_t *added_in_this_transaction;
+
+ /* a point inside the chained list above such that all items from
+ this point are known to contain non-young objects, for GC */
+ queue_entry_t *added_young_limit;
+
+ /* a chained list of old entries that the current transaction
+ popped. only used if the transaction is not inevitable:
+ if it aborts, these entries are added back to 'old_entries'. */
+ queue_entry_t *old_objects_popped;
+
+ /* a queue is active when either of the two chained lists
+ above is not empty, until the transaction commits. (this
+ notion is per segment.) this flag says that the queue is
+ already in the tree STM_PSEGMENT->active_queues. */
+ bool active;
+ };
+ char pad[64];
+} stm_queue_segment_t;
+
+
+struct stm_queue_s {
+ /* this structure is always allocated at a 64-byte-aligned address,
+ and 'segs' is an array of items that are 64 bytes each */
+ stm_queue_segment_t segs[STM_NB_SEGMENTS];
+
+ /* a chained list of old entries in the queue */
+ queue_entry_t *volatile old_entries;
+};
+
+
+stm_queue_t *stm_queue_create(void)
+{
+ void *mem;
+ int result = posix_memalign(&mem, 64, sizeof(stm_queue_t));
+ assert(result == 0);
+ (void)result;
+ memset(mem, 0, sizeof(stm_queue_t));
+ return (stm_queue_t *)mem;
+}
+
+static void queue_free_entries(queue_entry_t *lst)
+{
+ while (lst != NULL) {
+ queue_entry_t *next = lst->next;
+ free(lst);
+ lst = next;
+ }
+}
+
+void stm_queue_free(stm_queue_t *queue)
+{
+ long i;
+ dprintf(("free queue %p\n", queue));
+ for (i = 0; i < STM_NB_SEGMENTS; i++) {
+ stm_queue_segment_t *seg = &queue->segs[i];
+
+ /* it is possible that queues_deactivate_all() runs in parallel,
+ but it should not be possible at this point for another thread
+ to change 'active' from false to true. if it is false, then
+ that's it */
+ if (!seg->active) {
+ assert(!seg->added_in_this_transaction);
+ assert(!seg->added_young_limit);
+ assert(!seg->old_objects_popped);
+ continue;
+ }
+
+ struct stm_priv_segment_info_s *pseg = get_priv_segment(i + 1);
+ spinlock_acquire(pseg->active_queues_lock);
+
+ if (seg->active) {
+ assert(pseg->active_queues != NULL);
+ bool ok = tree_delete_item(pseg->active_queues, (uintptr_t)queue);
+ assert(ok);
+ (void)ok;
+ }
+ queue_free_entries(seg->added_in_this_transaction);
+ queue_free_entries(seg->old_objects_popped);
+
+ spinlock_release(pseg->active_queues_lock);
+ }
+ free(queue);
+}
+
+static inline void queue_lock_acquire(void)
+{
+ int num = STM_SEGMENT->segment_num;
+ spinlock_acquire(get_priv_segment(num)->active_queues_lock);
+}
+static inline void queue_lock_release(void)
+{
+ int num = STM_SEGMENT->segment_num;
+ spinlock_release(get_priv_segment(num)->active_queues_lock);
+}
+
+static void queue_activate(stm_queue_t *queue)
+{
+ stm_queue_segment_t *seg = &queue->segs[STM_SEGMENT->segment_num - 1];
+
+ if (!seg->active) {
+ queue_lock_acquire();
+ if (STM_PSEGMENT->active_queues == NULL)
+ STM_PSEGMENT->active_queues = tree_create();
+ tree_insert(STM_PSEGMENT->active_queues, (uintptr_t)queue, 0);
+ assert(!seg->active);
+ seg->active = true;
+ dprintf(("activated queue %p\n", queue));
+ queue_lock_release();
+ }
+}
+
+static void queues_deactivate_all(bool at_commit)
+{
+ queue_lock_acquire();
+
+ bool added_any_old_entries = false;
+ wlog_t *item;
+ TREE_LOOP_FORWARD(STM_PSEGMENT->active_queues, item) {
+ stm_queue_t *queue = (stm_queue_t *)item->addr;
+ stm_queue_segment_t *seg = &queue->segs[STM_SEGMENT->segment_num - 1];
+ queue_entry_t *head, *freehead;
+
+ if (at_commit) {
+ head = seg->added_in_this_transaction;
+ freehead = seg->old_objects_popped;
+ }
+ else {
+ head = seg->old_objects_popped;
+ freehead = seg->added_in_this_transaction;
+ }
+
+ /* forget the two lists of entries */
+ seg->added_in_this_transaction = NULL;
+ seg->added_young_limit = NULL;
+ seg->old_objects_popped = NULL;
+
+ /* free the list of entries that must disappear */
+ queue_free_entries(freehead);
+
+ /* move the list of entries that must survive to 'old_entries' */
+ if (head != NULL) {
+ queue_entry_t *old;
+ queue_entry_t *tail = head;
+ while (tail->next != NULL)
+ tail = tail->next;
+ dprintf(("items move to old_entries in queue %p\n", queue));
+ retry:
+ old = queue->old_entries;
+ tail->next = old;
+ if (!__sync_bool_compare_and_swap(&queue->old_entries, old, head))
+ goto retry;
+ added_any_old_entries = true;
+ }
+
+ /* deactivate this queue */
+ assert(seg->active);
+ seg->active = false;
+ dprintf(("deactivated queue %p\n", queue));
+
+ } TREE_LOOP_END;
+
+ tree_free(STM_PSEGMENT->active_queues);
+ STM_PSEGMENT->active_queues = NULL;
+
+ queue_lock_release();
+
+ if (added_any_old_entries) {
+ assert(_has_mutex());
+ cond_broadcast(C_QUEUE_OLD_ENTRIES);
+ }
+}
+
+void stm_queue_put(object_t *qobj, stm_queue_t *queue, object_t *newitem)
+{
+ /* must be run in a transaction, but doesn't cause conflicts or
+ delays or transaction breaks. you need to push roots!
+ */
+ stm_queue_segment_t *seg = &queue->segs[STM_SEGMENT->segment_num - 1];
+ queue_entry_t *entry = malloc(sizeof(queue_entry_t));
+ assert(entry);
+ entry->object = newitem;
+ entry->next = seg->added_in_this_transaction;
+ seg->added_in_this_transaction = entry;
+
+ queue_activate(queue);
+
+ /* add qobj to 'objects_pointing_to_nursery' if it has the
+ WRITE_BARRIER flag */
+ if (qobj->stm_flags & GCFLAG_WRITE_BARRIER) {
+ qobj->stm_flags &= ~GCFLAG_WRITE_BARRIER;
+ LIST_APPEND(STM_PSEGMENT->objects_pointing_to_nursery, qobj);
+ }
+}
+
+object_t *stm_queue_get(object_t *qobj, stm_queue_t *queue, double timeout,
+ stm_thread_local_t *tl)
+{
+ /* if the queue is empty, this commits and waits outside a transaction.
+ must not be called if the transaction is atomic! never causes
+ conflicts. you need to push roots!
+ */
+ struct timespec t;
+ bool t_ready = false;
+ queue_entry_t *entry;
+ object_t *result;
+ stm_queue_segment_t *seg = &queue->segs[STM_SEGMENT->segment_num - 1];
+
+ if (seg->added_in_this_transaction) {
+ entry = seg->added_in_this_transaction;
+ seg->added_in_this_transaction = entry->next;
+ if (entry == seg->added_young_limit)
+ seg->added_young_limit = entry->next;
+ result = entry->object;
+ assert(result != NULL);
+ free(entry);
+ return result;
+ }
+
+ retry:
+ entry = queue->old_entries;
+ if (entry != NULL) {
+ if (!__sync_bool_compare_and_swap(&queue->old_entries,
+ entry, entry->next))
+ goto retry;
+
+ /* successfully popped the old 'entry'. It remains in the
+ 'old_objects_popped' list for now. */
+ entry->next = seg->old_objects_popped;
+ seg->old_objects_popped = entry;
+
+ queue_activate(queue);
+ assert(entry->object != NULL);
+ return entry->object;
+ }
+ else {
+ /* no pending entry, wait */
+#if STM_TESTS
+ assert(timeout == 0.0); /* can't wait in the basic tests */
+#endif
+ if (timeout == 0.0) {
+ if (!stm_is_inevitable(tl)) {
+ stm_become_inevitable(tl, "stm_queue_get");
+ goto retry;
+ }
+ else
+ return NULL;
+ }
+
+ STM_PUSH_ROOT(*tl, qobj);
+ _stm_commit_transaction();
+
+ s_mutex_lock();
+ while (queue->old_entries == NULL) {
+ if (timeout < 0.0) { /* no timeout */
+ cond_wait(C_QUEUE_OLD_ENTRIES);
+ }
+ else {
+ if (!t_ready) {
+ timespec_delay(&t, timeout);
+ t_ready = true;
+ }
+ if (!cond_wait_timespec(C_QUEUE_OLD_ENTRIES, &t)) {
+ timeout = 0.0; /* timed out! */
+ break;
+ }
+ }
+ }
+ s_mutex_unlock();
+
+ _stm_start_transaction(tl);
+ STM_POP_ROOT(*tl, qobj); /* 'queue' should stay alive until here */
+ goto retry;
+ }
+}
+
+static void queue_trace_list(queue_entry_t *entry, void trace(object_t **),
+ queue_entry_t *stop_at)
+{
+ while (entry != stop_at) {
+ trace(&entry->object);
+ entry = entry->next;
+ }
+}
+
+void stm_queue_tracefn(stm_queue_t *queue, void trace(object_t **))
+{
+ if (trace == TRACE_FOR_MAJOR_COLLECTION) {
+ long i;
+ for (i = 0; i < STM_NB_SEGMENTS; i++) {
+ stm_queue_segment_t *seg = &queue->segs[i];
+ seg->added_young_limit = seg->added_in_this_transaction;
+ queue_trace_list(seg->added_in_this_transaction, trace, NULL);
+ queue_trace_list(seg->old_objects_popped, trace, NULL);
+ }
+ queue_trace_list(queue->old_entries, trace, NULL);
+ }
+ else {
+ /* for minor collections: it is enough to trace the objects
+ added in the current transaction. All other objects are
+ old (or, worse, belong to a parallel thread and must not
+ be traced). */
+ stm_queue_segment_t *seg = &queue->segs[STM_SEGMENT->segment_num - 1];
+ queue_trace_list(seg->added_in_this_transaction, trace,
+ seg->added_young_limit);
+ seg->added_young_limit = seg->added_in_this_transaction;
+ }
+}
diff --git a/rpython/translator/stm/src_stm/stm/queue.h
b/rpython/translator/stm/src_stm/stm/queue.h
new file mode 100644
--- /dev/null
+++ b/rpython/translator/stm/src_stm/stm/queue.h
@@ -0,0 +1,2 @@
+/* Imported by rpython/translator/stm/import_stmgc.py */
+static void queues_deactivate_all(bool at_commit);
diff --git a/rpython/translator/stm/src_stm/stm/setup.c
b/rpython/translator/stm/src_stm/stm/setup.c
--- a/rpython/translator/stm/src_stm/stm/setup.c
+++ b/rpython/translator/stm/src_stm/stm/setup.c
@@ -167,6 +167,7 @@
tree_free(pr->callbacks_on_commit_and_abort[1]);
list_free(pr->young_objects_with_light_finalizers);
list_free(pr->old_objects_with_light_finalizers);
+ if (pr->active_queues) tree_free(pr->active_queues);
}
munmap(stm_object_pages, TOTAL_MEMORY);
diff --git a/rpython/translator/stm/src_stm/stm/smallmalloc.c
b/rpython/translator/stm/src_stm/stm/smallmalloc.c
--- a/rpython/translator/stm/src_stm/stm/smallmalloc.c
+++ b/rpython/translator/stm/src_stm/stm/smallmalloc.c
@@ -44,7 +44,7 @@
memset(full_pages_object_size, 0, sizeof(full_pages_object_size));
}
-static int gmfp_lock = 0;
+static uint8_t gmfp_lock = 0;
static void grab_more_free_pages_for_small_allocations(void)
{
diff --git a/rpython/translator/stm/src_stm/stm/sync.c
b/rpython/translator/stm/src_stm/stm/sync.c
--- a/rpython/translator/stm/src_stm/stm/sync.c
+++ b/rpython/translator/stm/src_stm/stm/sync.c
@@ -125,7 +125,7 @@
t->tv_nsec = nsec;
}
-static inline bool cond_wait_timeout(enum cond_type_e ctype, double delay)
+static bool cond_wait_timespec(enum cond_type_e ctype, struct timespec *pt)
{
#ifdef STM_NO_COND_WAIT
stm_fatalerror("*** cond_wait/%d called!", (int)ctype);
@@ -133,11 +133,8 @@
assert(_has_mutex_here);
- struct timespec t;
- timespec_delay(&t, delay);
-
int err = pthread_cond_timedwait(&sync_ctl.cond[ctype],
- &sync_ctl.global_mutex, &t);
+ &sync_ctl.global_mutex, pt);
if (err == 0)
return true; /* success */
if (LIKELY(err == ETIMEDOUT))
@@ -145,6 +142,13 @@
stm_fatalerror("pthread_cond_timedwait/%d: %d", (int)ctype, err);
}
+static bool cond_wait_timeout(enum cond_type_e ctype, double delay)
+{
+ struct timespec t;
+ timespec_delay(&t, delay);
+ return cond_wait_timespec(ctype, &t);
+}
+
static inline void cond_signal(enum cond_type_e ctype)
{
int err = pthread_cond_signal(&sync_ctl.cond[ctype]);
diff --git a/rpython/translator/stm/src_stm/stm/sync.h
b/rpython/translator/stm/src_stm/stm/sync.h
--- a/rpython/translator/stm/src_stm/stm/sync.h
+++ b/rpython/translator/stm/src_stm/stm/sync.h
@@ -6,6 +6,7 @@
C_REQUEST_REMOVED,
C_SEGMENT_FREE,
C_SEGMENT_FREE_OR_SAFE_POINT,
+ C_QUEUE_OLD_ENTRIES,
_C_TOTAL
};
diff --git a/rpython/translator/stm/src_stm/stmgc.c
b/rpython/translator/stm/src_stm/stmgc.c
--- a/rpython/translator/stm/src_stm/stmgc.c
+++ b/rpython/translator/stm/src_stm/stmgc.c
@@ -20,6 +20,7 @@
#include "stm/finalizer.h"
#include "stm/locks.h"
#include "stm/detach.h"
+#include "stm/queue.h"
#include "stm/misc.c"
#include "stm/list.c"
#include "stm/smallmalloc.c"
@@ -42,4 +43,5 @@
#include "stm/rewind_setjmp.c"
#include "stm/finalizer.c"
#include "stm/hashtable.c"
+#include "stm/queue.c"
#include "stm/detach.c"
diff --git a/rpython/translator/stm/src_stm/stmgc.h
b/rpython/translator/stm/src_stm/stmgc.h
--- a/rpython/translator/stm/src_stm/stmgc.h
+++ b/rpython/translator/stm/src_stm/stmgc.h
@@ -729,6 +729,27 @@
object_t *object;
};
+
+/* Queues. The items you put() come back from get() in no particular order.
+ Like hashtables, the type 'stm_queue_t' is not an object type at
+ all; you need to allocate and free it explicitly. If you want to
+ embed the queue inside an 'object_t' you probably need a light
+ finalizer to do the freeing. */
+typedef struct stm_queue_s stm_queue_t;
+
+stm_queue_t *stm_queue_create(void);
+void stm_queue_free(stm_queue_t *);
+/* put() does not cause delays or transaction breaks */
+void stm_queue_put(object_t *qobj, stm_queue_t *queue, object_t *newitem);
+/* get() can commit and wait outside a transaction (so push roots).
+ Unsuitable if the current transaction is atomic! With timeout < 0.0,
+ waits forever; with timeout >= 0.0, returns NULL if still empty when the
+ timeout expires, after becoming *inevitable* (needed for correctness). */
+object_t *stm_queue_get(object_t *qobj, stm_queue_t *queue, double timeout,
+ stm_thread_local_t *tl);
+void stm_queue_tracefn(stm_queue_t *queue, void trace(object_t **));
+
+
/* ==================== END ==================== */
static void (*stmcb_expand_marker)(char *segment_base, uintptr_t odd_number,
diff --git a/rpython/translator/stm/test/test_ztranslated.py
b/rpython/translator/stm/test/test_ztranslated.py
--- a/rpython/translator/stm/test/test_ztranslated.py
+++ b/rpython/translator/stm/test/test_ztranslated.py
@@ -608,6 +608,46 @@
data = cbuilder.cmdexec('')
assert 'ok!\n' in data
+ def test_queue(self):
+ class X(object):
+ pass
+
+ def main(argv):
+ q = rstm.create_queue()
+ p = q.get(0.0)
+ assert p == lltype.nullptr(llmemory.GCREF.TO)
+ p = q.get(0.001)
+ assert p == lltype.nullptr(llmemory.GCREF.TO)
+ #
+ x1 = X()
+ p1 = cast_instance_to_gcref(x1)
+ q.put(p1)
+ #
+ p2 = q.get()
+ x2 = cast_gcref_to_instance(X, p2)
+ assert x2 is x1
+ #
+ q.put(p1)
+ rgc.collect()
+ #
+ p2 = q.get()
+ x2 = cast_gcref_to_instance(X, p2)
+ assert x2 is x1
+ #
+ print "ok!"
+ return 0
+
+ res = main([]) # direct run
+ assert res == 0
+
+ t, cbuilder = self.compile(main)
+ data = cbuilder.cmdexec('')
+ assert 'ok!\n' in data
+
+ t, cbuilder = self.compile(main, backendopt=True)
+ data = cbuilder.cmdexec('')
+ assert 'ok!\n' in data
+
def test_allocate_preexisting(self):
py.test.skip("kill me or re-add me")
S = lltype.GcStruct('S', ('n', lltype.Signed))
_______________________________________________
pypy-commit mailing list
[email protected]
https://mail.python.org/mailman/listinfo/pypy-commit