Author: Armin Rigo <[email protected]>
Branch: stmgc-c8
Changeset: r78163:91a280e13a9d
Date: 2015-06-18 09:48 +0200
http://bitbucket.org/pypy/pypy/changeset/91a280e13a9d/

Log:    import stmgc branch 'queue', and add rlib support for stm queues

diff --git a/rpython/memory/gctransform/stmframework.py b/rpython/memory/gctransform/stmframework.py
--- a/rpython/memory/gctransform/stmframework.py
+++ b/rpython/memory/gctransform/stmframework.py
@@ -224,6 +224,7 @@
     gct_stm_stop_all_other_threads                  = _gct_with_roots_pushed
     gct_stm_transaction_break                       = _gct_with_roots_pushed
     gct_stm_collect                                 = _gct_with_roots_pushed
+    gct_stm_queue_get                               = _gct_with_roots_pushed
 
 
 class StmRootWalker(BaseRootWalker):
diff --git a/rpython/rlib/rstm.py b/rpython/rlib/rstm.py
--- a/rpython/rlib/rstm.py
+++ b/rpython/rlib/rstm.py
@@ -356,3 +356,70 @@
                         " use h.writeobj() instead")
 
     object = property(_getobj, _setobj)
+
+# ____________________________________________________________
+
+_STM_QUEUE_P = rffi.COpaquePtr('stm_queue_t')
+
+@dont_look_inside
+def _ll_queue_get(q, timeout=-1.0):
+    # Returns a GCREF.
+    return llop.stm_queue_get(llmemory.GCREF, q, q.ll_raw_queue, timeout)
+
+@dont_look_inside
+def _ll_queue_put(q, newitem):
+    llop.stm_queue_put(lltype.Void, q, q.ll_raw_queue, newitem)
+
+_QUEUE_OBJ = lltype.GcStruct('QUEUE_OBJ',
+                             ('ll_raw_queue', _STM_QUEUE_P),
+                             hints={'immutable': True},
+                             rtti=True,
+                             adtmeths={'get': _ll_queue_get,
+                                       'put': _ll_queue_put})
+NULL_QUEUE = lltype.nullptr(_QUEUE_OBJ)
+
+def _ll_queue_trace(gc, obj, callback, arg):
+    from rpython.memory.gctransform.stmframework import get_visit_function
+    visit_fn = get_visit_function(callback, arg)
+    addr = obj + llmemory.offsetof(_QUEUE_OBJ, 'll_raw_queue')
+    llop.stm_queue_tracefn(lltype.Void, addr.address[0], visit_fn)
+lambda_queue_trace = lambda: _ll_queue_trace
+
+def _ll_queue_finalizer(q):
+    if q.ll_raw_queue:
+        llop.stm_queue_free(lltype.Void, q.ll_raw_queue)
+lambda_queue_finlz = lambda: _ll_queue_finalizer
+
+@dont_look_inside
+def create_queue():
+    if not we_are_translated():
+        return QueueForTest()      # for tests
+    rgc.register_custom_light_finalizer(_QUEUE_OBJ, lambda_queue_finlz)
+    rgc.register_custom_trace_hook(_QUEUE_OBJ, lambda_queue_trace)
+    q = lltype.malloc(_QUEUE_OBJ, zero=True)
+    q.ll_raw_queue = llop.stm_queue_create(_STM_QUEUE_P)
+    return q
+
+class QueueForTest(object):
+    def __init__(self):
+        import Queue
+        self._content = Queue.Queue()
+        self._Empty = Queue.Empty
+
+    def _cleanup_(self):
+        raise Exception("cannot translate a prebuilt rstm.Queue object")
+
+    def get(self, timeout=-1.0):
+        if timeout < 0.0:
+            return self._content.get()
+        try:
+            if timeout == 0.0:
+                return self._content.get(block=False)
+            else:
+                return self._content.get(timeout=timeout)
+        except self._Empty:
+            return NULL_GCREF
+
+    def put(self, newitem):
+        assert lltype.typeOf(newitem) == llmemory.GCREF
+        self._content.put(newitem)
diff --git a/rpython/rtyper/llinterp.py b/rpython/rtyper/llinterp.py
--- a/rpython/rtyper/llinterp.py
+++ b/rpython/rtyper/llinterp.py
@@ -1002,6 +1002,11 @@
     op_stm_hashtable_length_upper_bound = _stm_not_implemented
     op_stm_hashtable_list = _stm_not_implemented
     op_stm_hashtable_free = _stm_not_implemented
+    op_stm_queue_create = _stm_not_implemented
+    op_stm_queue_free = _stm_not_implemented
+    op_stm_queue_get = _stm_not_implemented
+    op_stm_queue_put = _stm_not_implemented
+    op_stm_queue_tracefn = _stm_not_implemented
     op_stm_register_thread_local = _stm_not_implemented
     op_stm_unregister_thread_local = _stm_not_implemented
     op_stm_really_force_cast_ptr = _stm_not_implemented
diff --git a/rpython/rtyper/lltypesystem/lloperation.py b/rpython/rtyper/lltypesystem/lloperation.py
--- a/rpython/rtyper/lltypesystem/lloperation.py
+++ b/rpython/rtyper/lltypesystem/lloperation.py
@@ -479,6 +479,12 @@
     'stm_hashtable_list'  :   LLOp(),
     'stm_hashtable_tracefn':  LLOp(),
 
+    'stm_queue_create':       LLOp(),
+    'stm_queue_free':         LLOp(),
+    'stm_queue_get':          LLOp(canmallocgc=True),   # push roots!
+    'stm_queue_put':          LLOp(),
+    'stm_queue_tracefn':      LLOp(),
+
     # __________ address operations __________
 
     'boehm_malloc':         LLOp(),
diff --git a/rpython/translator/backendopt/finalizer.py b/rpython/translator/backendopt/finalizer.py
--- a/rpython/translator/backendopt/finalizer.py
+++ b/rpython/translator/backendopt/finalizer.py
@@ -18,7 +18,8 @@
     """
     ok_operations = ['ptr_nonzero', 'ptr_eq', 'ptr_ne', 'free', 'same_as',
                      'direct_ptradd', 'force_cast', 'track_alloc_stop',
-                     'raw_free', 'debug_print', 'stm_hashtable_free']
+                     'raw_free', 'debug_print', 'stm_hashtable_free',
+                     'stm_queue_free']
 
     def analyze_light_finalizer(self, graph):
         result = self.analyze_direct_call(graph)
diff --git a/rpython/translator/c/src/mem.c b/rpython/translator/c/src/mem.c
--- a/rpython/translator/c/src/mem.c
+++ b/rpython/translator/c/src/mem.c
@@ -62,7 +62,7 @@
 
 #ifdef RPY_STM
 // spinlock_acquire/spinlock_release defined in ../../stm/src_stm/stmgcintf.h
-static Signed pypy_debug_alloc_lock = 0;
+static uint8_t pypy_debug_alloc_lock = 0;
 #else
 # define spinlock_acquire(lock)               /* nothing */
 # define spinlock_release(lock)               /* nothing */
diff --git a/rpython/translator/stm/breakfinder.py b/rpython/translator/stm/breakfinder.py
--- a/rpython/translator/stm/breakfinder.py
+++ b/rpython/translator/stm/breakfinder.py
@@ -9,6 +9,7 @@
     'stm_enter_callback_call',
     'stm_leave_callback_call',
     'stm_transaction_break',
+    'stm_queue_get',
     ])
 
 for tb in TRANSACTION_BREAK:
diff --git a/rpython/translator/stm/funcgen.py b/rpython/translator/stm/funcgen.py
--- a/rpython/translator/stm/funcgen.py
+++ b/rpython/translator/stm/funcgen.py
@@ -368,3 +368,32 @@
     arg2 = funcgen.expr(op.args[2])
     return ('stm_hashtable_tracefn(%s, (stm_hashtable_t *)%s, '
             ' (void(*)(object_t**))%s);' % (arg0, arg1, arg2))
+
+def stm_queue_create(funcgen, op):
+    result = funcgen.expr(op.result)
+    return '%s = stm_queue_create();' % (result,)
+
+def stm_queue_free(funcgen, op):
+    arg = funcgen.expr(op.args[0])
+    return 'stm_queue_free(%s);' % (arg,)
+
+def stm_queue_get(funcgen, op):
+    arg0 = funcgen.expr(op.args[0])
+    arg1 = funcgen.expr(op.args[1])
+    arg2 = funcgen.expr(op.args[2])
+    result = funcgen.expr(op.result)
+    return ('%s = (rpygcchar_t *)stm_queue_get((object_t *)%s, %s, %s, '
+            '&stm_thread_local);' % (result, arg0, arg1, arg2))
+
+def stm_queue_put(funcgen, op):
+    arg0 = funcgen.expr(op.args[0])
+    arg1 = funcgen.expr(op.args[1])
+    arg2 = funcgen.expr(op.args[2])
+    return 'stm_queue_put((object_t *)%s, %s, (object_t *)%s);' % (
+        arg0, arg1, arg2)
+
+def stm_queue_tracefn(funcgen, op):
+    arg0 = funcgen.expr(op.args[0])
+    arg1 = funcgen.expr(op.args[1])
+    return ('stm_queue_tracefn((stm_queue_t *)%s, '
+            ' (void(*)(object_t**))%s);' % (arg0, arg1))
diff --git a/rpython/translator/stm/src_stm/revision b/rpython/translator/stm/src_stm/revision
--- a/rpython/translator/stm/src_stm/revision
+++ b/rpython/translator/stm/src_stm/revision
@@ -1,1 +1,1 @@
-14536c2a2af4
+3ca830828468
diff --git a/rpython/translator/stm/src_stm/stm/atomic.h b/rpython/translator/stm/src_stm/stm/atomic.h
--- a/rpython/translator/stm/src_stm/stm/atomic.h
+++ b/rpython/translator/stm/src_stm/stm/atomic.h
@@ -42,12 +42,19 @@
 #endif
 
 
-#define spinlock_acquire(lock)                                          \
-    do { if (LIKELY(__sync_lock_test_and_set(&(lock), 1) == 0)) break;  \
-         spin_loop(); } while (1)
-#define spinlock_release(lock)                                          \
-    do { assert((lock) == 1);                                           \
-         __sync_lock_release(&(lock)); } while (0)
+static inline void _spinlock_acquire(uint8_t *plock) {
+ retry:
+    if (__builtin_expect(__sync_lock_test_and_set(plock, 1) != 0, 0)) {
+        spin_loop();
+        goto retry;
+    }
+}
+static inline void _spinlock_release(uint8_t *plock) {
+    assert(*plock == 1);
+    __sync_lock_release(plock);
+}
+#define spinlock_acquire(lock) _spinlock_acquire(&(lock))
+#define spinlock_release(lock) _spinlock_release(&(lock))
 
 
 #endif  /* _STM_ATOMIC_H */
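
(The spinlock macros become inline functions taking a uint8_t *, which is
why the lock fields elsewhere in this diff -- pypy_debug_alloc_lock,
lock_growth_large, gmfp_lock, lm.lock -- change from int/Signed to uint8_t.
A minimal standalone sketch of the same byte-sized spinlock pattern,
assuming GCC/clang __sync builtins and pthreads; demo names only, not
stmgc code:)

    /* demo_spinlock.c -- compile with: cc -O2 -pthread demo_spinlock.c */
    #include <stdint.h>
    #include <stdio.h>
    #include <pthread.h>

    static uint8_t demo_lock = 0;      /* a single byte, as in atomic.h */
    static long counter = 0;

    static inline void demo_spinlock_acquire(uint8_t *plock) {
        while (__sync_lock_test_and_set(plock, 1) != 0)
            ;                          /* spin until the byte is released */
    }
    static inline void demo_spinlock_release(uint8_t *plock) {
        __sync_lock_release(plock);
    }

    static void *worker(void *arg) {
        for (int i = 0; i < 100000; i++) {
            demo_spinlock_acquire(&demo_lock);
            counter++;                 /* protected by the spinlock */
            demo_spinlock_release(&demo_lock);
        }
        return NULL;
    }

    int main(void) {
        pthread_t t1, t2;
        pthread_create(&t1, NULL, worker, NULL);
        pthread_create(&t2, NULL, worker, NULL);
        pthread_join(t1, NULL);
        pthread_join(t2, NULL);
        printf("counter = %ld\n", counter);   /* expected: 200000 */
        return 0;
    }
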
diff --git a/rpython/translator/stm/src_stm/stm/core.c b/rpython/translator/stm/src_stm/stm/core.c
--- a/rpython/translator/stm/src_stm/stm/core.c
+++ b/rpython/translator/stm/src_stm/stm/core.c
@@ -1146,6 +1146,7 @@
     assert(tree_is_cleared(STM_PSEGMENT->callbacks_on_commit_and_abort[1]));
     assert(list_is_empty(STM_PSEGMENT->young_objects_with_light_finalizers));
     assert(STM_PSEGMENT->finalizers == NULL);
+    assert(STM_PSEGMENT->active_queues == NULL);
 #ifndef NDEBUG
     /* this should not be used when objects_pointing_to_nursery == NULL */
     STM_PSEGMENT->position_markers_len_old = 99999999999999999L;
@@ -1351,6 +1352,9 @@
         STM_PSEGMENT->overflow_number_has_been_used = false;
     }
 
+    if (STM_PSEGMENT->active_queues)
+        queues_deactivate_all(/*at_commit=*/true);
+
     invoke_and_clear_user_callbacks(0);   /* for commit */
 
     /* done */
@@ -1505,6 +1509,9 @@
     if (tl->mem_clear_on_abort)
         memset(tl->mem_clear_on_abort, 0, tl->mem_bytes_to_clear_on_abort);
 
+    if (STM_PSEGMENT->active_queues)
+        queues_deactivate_all(/*at_commit=*/false);
+
     invoke_and_clear_user_callbacks(1);   /* for abort */
 
     if (is_abort(STM_SEGMENT->nursery_end)) {
diff --git a/rpython/translator/stm/src_stm/stm/core.h b/rpython/translator/stm/src_stm/stm/core.h
--- a/rpython/translator/stm/src_stm/stm/core.h
+++ b/rpython/translator/stm/src_stm/stm/core.h
@@ -117,13 +117,15 @@
     bool minor_collect_will_commit_now;
 
     struct tree_s *callbacks_on_commit_and_abort[2];
+    struct tree_s *active_queues;
+    uint8_t active_queues_lock;
 
     /* This is the number stored in the overflowed objects (a multiple of
        GCFLAG_OVERFLOW_NUMBER_bit0).  It is incremented when the
        transaction is done, but only if we actually overflowed any
        object; otherwise, no object has got this number. */
+    bool overflow_number_has_been_used;
     uint32_t overflow_number;
-    bool overflow_number_has_been_used;
 
 
     struct stm_commit_log_entry_s *last_commit_log_entry;
diff --git a/rpython/translator/stm/src_stm/stm/gcpage.c b/rpython/translator/stm/src_stm/stm/gcpage.c
--- a/rpython/translator/stm/src_stm/stm/gcpage.c
+++ b/rpython/translator/stm/src_stm/stm/gcpage.c
@@ -48,7 +48,7 @@
 }
 
 
-static int lock_growth_large = 0;
+static uint8_t lock_growth_large = 0;
 
 static stm_char *allocate_outside_nursery_large(uint64_t size)
 {
diff --git a/rpython/translator/stm/src_stm/stm/largemalloc.c b/rpython/translator/stm/src_stm/stm/largemalloc.c
--- a/rpython/translator/stm/src_stm/stm/largemalloc.c
+++ b/rpython/translator/stm/src_stm/stm/largemalloc.c
@@ -108,7 +108,7 @@
 
 
 static struct {
-    int lock;
+    uint8_t lock;
     mchunk_t *first_chunk, *last_chunk;
     dlist_t largebins[N_BINS];
 } lm __attribute__((aligned(64)));
diff --git a/rpython/translator/stm/src_stm/stm/queue.c b/rpython/translator/stm/src_stm/stm/queue.c
new file mode 100644
--- /dev/null
+++ b/rpython/translator/stm/src_stm/stm/queue.c
@@ -0,0 +1,319 @@
+/* Imported by rpython/translator/stm/import_stmgc.py */
+typedef struct queue_entry_s {
+    object_t *object;
+    struct queue_entry_s *next;
+} queue_entry_t;
+
+typedef union stm_queue_segment_u {
+    struct {
+        /* a chained list of fresh entries that have been allocated and
+           added to this queue during the current transaction.  If the
+           transaction commits, these are moved to 'old_entries'. */
+        queue_entry_t *added_in_this_transaction;
+
+        /* a point inside the chained list above such that all items from
+           this point are known to contain non-young objects, for GC */
+        queue_entry_t *added_young_limit;
+
+        /* a chained list of old entries that the current transaction
+           popped.  only used if the transaction is not inevitable:
+           if it aborts, these entries are added back to 'old_entries'. */
+        queue_entry_t *old_objects_popped;
+
+        /* a queue is active when either of the two chained lists
+           above is not empty, until the transaction commits.  (this
+           notion is per segment.)  this flag says that the queue is
+           already in the tree STM_PSEGMENT->active_queues. */
+        bool active;
+    };
+    char pad[64];
+} stm_queue_segment_t;
+
+
+struct stm_queue_s {
+    /* this structure is always allocated on a multiple of 64 bytes,
+       and the 'segs' is an array of items 64 bytes each */
+    stm_queue_segment_t segs[STM_NB_SEGMENTS];
+
+    /* a chained list of old entries in the queue */
+    queue_entry_t *volatile old_entries;
+};
+
+
+stm_queue_t *stm_queue_create(void)
+{
+    void *mem;
+    int result = posix_memalign(&mem, 64, sizeof(stm_queue_t));
+    assert(result == 0);
+    (void)result;
+    memset(mem, 0, sizeof(stm_queue_t));
+    return (stm_queue_t *)mem;
+}
+
+static void queue_free_entries(queue_entry_t *lst)
+{
+    while (lst != NULL) {
+        queue_entry_t *next = lst->next;
+        free(lst);
+        lst = next;
+    }
+}
+
+void stm_queue_free(stm_queue_t *queue)
+{
+    long i;
+    dprintf(("free queue %p\n", queue));
+    for (i = 0; i < STM_NB_SEGMENTS; i++) {
+        stm_queue_segment_t *seg = &queue->segs[i];
+
+        /* it is possible that queues_deactivate_all() runs in parallel,
+           but it should not be possible at this point for another thread
+           to change 'active' from false to true.  if it is false, then
+           that's it */
+        if (!seg->active) {
+            assert(!seg->added_in_this_transaction);
+            assert(!seg->added_young_limit);
+            assert(!seg->old_objects_popped);
+            continue;
+        }
+
+        struct stm_priv_segment_info_s *pseg = get_priv_segment(i + 1);
+        spinlock_acquire(pseg->active_queues_lock);
+
+        if (seg->active) {
+            assert(pseg->active_queues != NULL);
+            bool ok = tree_delete_item(pseg->active_queues, (uintptr_t)queue);
+            assert(ok);
+            (void)ok;
+        }
+        queue_free_entries(seg->added_in_this_transaction);
+        queue_free_entries(seg->old_objects_popped);
+
+        spinlock_release(pseg->active_queues_lock);
+    }
+    free(queue);
+}
+
+static inline void queue_lock_acquire(void)
+{
+    int num = STM_SEGMENT->segment_num;
+    spinlock_acquire(get_priv_segment(num)->active_queues_lock);
+}
+static inline void queue_lock_release(void)
+{
+    int num = STM_SEGMENT->segment_num;
+    spinlock_release(get_priv_segment(num)->active_queues_lock);
+}
+
+static void queue_activate(stm_queue_t *queue)
+{
+    stm_queue_segment_t *seg = &queue->segs[STM_SEGMENT->segment_num - 1];
+
+    if (!seg->active) {
+        queue_lock_acquire();
+        if (STM_PSEGMENT->active_queues == NULL)
+            STM_PSEGMENT->active_queues = tree_create();
+        tree_insert(STM_PSEGMENT->active_queues, (uintptr_t)queue, 0);
+        assert(!seg->active);
+        seg->active = true;
+        dprintf(("activated queue %p\n", queue));
+        queue_lock_release();
+    }
+}
+
+static void queues_deactivate_all(bool at_commit)
+{
+    queue_lock_acquire();
+
+    bool added_any_old_entries = false;
+    wlog_t *item;
+    TREE_LOOP_FORWARD(STM_PSEGMENT->active_queues, item) {
+        stm_queue_t *queue = (stm_queue_t *)item->addr;
+        stm_queue_segment_t *seg = &queue->segs[STM_SEGMENT->segment_num - 1];
+        queue_entry_t *head, *freehead;
+
+        if (at_commit) {
+            head = seg->added_in_this_transaction;
+            freehead = seg->old_objects_popped;
+        }
+        else {
+            head = seg->old_objects_popped;
+            freehead = seg->added_in_this_transaction;
+        }
+
+        /* forget the two lists of entries */
+        seg->added_in_this_transaction = NULL;
+        seg->added_young_limit = NULL;
+        seg->old_objects_popped = NULL;
+
+        /* free the list of entries that must disappear */
+        queue_free_entries(freehead);
+
+        /* move the list of entries that must survive to 'old_entries' */
+        if (head != NULL) {
+            queue_entry_t *old;
+            queue_entry_t *tail = head;
+            while (tail->next != NULL)
+                tail = tail->next;
+            dprintf(("items move to old_entries in queue %p\n", queue));
+         retry:
+            old = queue->old_entries;
+            tail->next = old;
+            if (!__sync_bool_compare_and_swap(&queue->old_entries, old, head))
+                goto retry;
+            added_any_old_entries = true;
+        }
+
+        /* deactivate this queue */
+        assert(seg->active);
+        seg->active = false;
+        dprintf(("deactivated queue %p\n", queue));
+
+    } TREE_LOOP_END;
+
+    tree_free(STM_PSEGMENT->active_queues);
+    STM_PSEGMENT->active_queues = NULL;
+
+    queue_lock_release();
+
+    if (added_any_old_entries) {
+        assert(_has_mutex());
+        cond_broadcast(C_QUEUE_OLD_ENTRIES);
+    }
+}
+
+void stm_queue_put(object_t *qobj, stm_queue_t *queue, object_t *newitem)
+{
+    /* must be run in a transaction, but doesn't cause conflicts or
+       delays or transaction breaks.  you need to push roots!
+    */
+    stm_queue_segment_t *seg = &queue->segs[STM_SEGMENT->segment_num - 1];
+    queue_entry_t *entry = malloc(sizeof(queue_entry_t));
+    assert(entry);
+    entry->object = newitem;
+    entry->next = seg->added_in_this_transaction;
+    seg->added_in_this_transaction = entry;
+
+    queue_activate(queue);
+
+    /* add qobj to 'objects_pointing_to_nursery' if it has the
+       WRITE_BARRIER flag */
+    if (qobj->stm_flags & GCFLAG_WRITE_BARRIER) {
+        qobj->stm_flags &= ~GCFLAG_WRITE_BARRIER;
+        LIST_APPEND(STM_PSEGMENT->objects_pointing_to_nursery, qobj);
+    }
+}
+
+object_t *stm_queue_get(object_t *qobj, stm_queue_t *queue, double timeout,
+                        stm_thread_local_t *tl)
+{
+    /* if the queue is empty, this commits and waits outside a transaction.
+       must not be called if the transaction is atomic!  never causes
+       conflicts.  you need to push roots!
+    */
+    struct timespec t;
+    bool t_ready = false;
+    queue_entry_t *entry;
+    object_t *result;
+    stm_queue_segment_t *seg = &queue->segs[STM_SEGMENT->segment_num - 1];
+
+    if (seg->added_in_this_transaction) {
+        entry = seg->added_in_this_transaction;
+        seg->added_in_this_transaction = entry->next;
+        if (entry == seg->added_young_limit)
+            seg->added_young_limit = entry->next;
+        result = entry->object;
+        assert(result != NULL);
+        free(entry);
+        return result;
+    }
+
+ retry:
+    entry = queue->old_entries;
+    if (entry != NULL) {
+        if (!__sync_bool_compare_and_swap(&queue->old_entries,
+                                          entry, entry->next))
+            goto retry;
+
+        /* successfully popped the old 'entry'.  It remains in the
+           'old_objects_popped' list for now. */
+        entry->next = seg->old_objects_popped;
+        seg->old_objects_popped = entry;
+
+        queue_activate(queue);
+        assert(entry->object != NULL);
+        return entry->object;
+    }
+    else {
+        /* no pending entry, wait */
+#if STM_TESTS
+        assert(timeout == 0.0);   /* can't wait in the basic tests */
+#endif
+        if (timeout == 0.0) {
+            if (!stm_is_inevitable(tl)) {
+                stm_become_inevitable(tl, "stm_queue_get");
+                goto retry;
+            }
+            else
+                return NULL;
+        }
+
+        STM_PUSH_ROOT(*tl, qobj);
+        _stm_commit_transaction();
+
+        s_mutex_lock();
+        while (queue->old_entries == NULL) {
+            if (timeout < 0.0) {      /* no timeout */
+                cond_wait(C_QUEUE_OLD_ENTRIES);
+            }
+            else {
+                if (!t_ready) {
+                    timespec_delay(&t, timeout);
+                    t_ready = true;
+                }
+                if (!cond_wait_timespec(C_QUEUE_OLD_ENTRIES, &t)) {
+                    timeout = 0.0;   /* timed out! */
+                    break;
+                }
+            }
+        }
+        s_mutex_unlock();
+
+        _stm_start_transaction(tl);
+        STM_POP_ROOT(*tl, qobj);   /* 'queue' should stay alive until here */
+        goto retry;
+    }
+}
+
+static void queue_trace_list(queue_entry_t *entry, void trace(object_t **),
+                             queue_entry_t *stop_at)
+{
+    while (entry != stop_at) {
+        trace(&entry->object);
+        entry = entry->next;
+    }
+}
+
+void stm_queue_tracefn(stm_queue_t *queue, void trace(object_t **))
+{
+    if (trace == TRACE_FOR_MAJOR_COLLECTION) {
+        long i;
+        for (i = 0; i < STM_NB_SEGMENTS; i++) {
+            stm_queue_segment_t *seg = &queue->segs[i];
+            seg->added_young_limit = seg->added_in_this_transaction;
+            queue_trace_list(seg->added_in_this_transaction, trace, NULL);
+            queue_trace_list(seg->old_objects_popped, trace, NULL);
+        }
+        queue_trace_list(queue->old_entries, trace, NULL);
+    }
+    else {
+        /* for minor collections: it is enough to trace the objects
+           added in the current transaction.  All other objects are
+           old (or, worse, belong to a parallel thread and must not
+           be traced). */
+        stm_queue_segment_t *seg = &queue->segs[STM_SEGMENT->segment_num - 1];
+        queue_trace_list(seg->added_in_this_transaction, trace,
+                         seg->added_young_limit);
+        seg->added_young_limit = seg->added_in_this_transaction;
+    }
+}
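
(Entries are handed over to 'old_entries', and popped back from it, with a
compare-and-swap retry loop instead of a lock.  A standalone sketch of that
retry shape, using the same __sync_bool_compare_and_swap builtin outside of
stmgc; demo names and a single-threaded driver, and note that a real
multi-producer pop would additionally have to think about the ABA problem:)

    /* demo_cas_list.c -- compile with: cc -O2 demo_cas_list.c */
    #include <stdio.h>
    #include <stdlib.h>

    typedef struct entry_s {
        int value;
        struct entry_s *next;
    } entry_t;

    static entry_t *volatile old_entries = NULL;

    /* push one entry; retry if another thread moved the head meanwhile */
    static void push_entry(entry_t *e)
    {
        entry_t *old;
     retry:
        old = old_entries;
        e->next = old;
        if (!__sync_bool_compare_and_swap(&old_entries, old, e))
            goto retry;
    }

    /* pop one entry, or return NULL if the list is empty */
    static entry_t *pop_entry(void)
    {
        entry_t *e;
     retry:
        e = old_entries;
        if (e == NULL)
            return NULL;
        if (!__sync_bool_compare_and_swap(&old_entries, e, e->next))
            goto retry;
        return e;
    }

    int main(void)
    {
        for (int i = 0; i < 3; i++) {
            entry_t *e = malloc(sizeof(entry_t));
            e->value = i;
            push_entry(e);
        }
        entry_t *e;
        while ((e = pop_entry()) != NULL) {
            printf("popped %d\n", e->value);   /* LIFO: 2, 1, 0 */
            free(e);
        }
        return 0;
    }
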
diff --git a/rpython/translator/stm/src_stm/stm/queue.h b/rpython/translator/stm/src_stm/stm/queue.h
new file mode 100644
--- /dev/null
+++ b/rpython/translator/stm/src_stm/stm/queue.h
@@ -0,0 +1,2 @@
+/* Imported by rpython/translator/stm/import_stmgc.py */
+static void queues_deactivate_all(bool at_commit);
diff --git a/rpython/translator/stm/src_stm/stm/setup.c b/rpython/translator/stm/src_stm/stm/setup.c
--- a/rpython/translator/stm/src_stm/stm/setup.c
+++ b/rpython/translator/stm/src_stm/stm/setup.c
@@ -167,6 +167,7 @@
         tree_free(pr->callbacks_on_commit_and_abort[1]);
         list_free(pr->young_objects_with_light_finalizers);
         list_free(pr->old_objects_with_light_finalizers);
+        if (pr->active_queues) tree_free(pr->active_queues);
     }
 
     munmap(stm_object_pages, TOTAL_MEMORY);
diff --git a/rpython/translator/stm/src_stm/stm/smallmalloc.c b/rpython/translator/stm/src_stm/stm/smallmalloc.c
--- a/rpython/translator/stm/src_stm/stm/smallmalloc.c
+++ b/rpython/translator/stm/src_stm/stm/smallmalloc.c
@@ -44,7 +44,7 @@
     memset(full_pages_object_size, 0, sizeof(full_pages_object_size));
 }
 
-static int gmfp_lock = 0;
+static uint8_t gmfp_lock = 0;
 
 static void grab_more_free_pages_for_small_allocations(void)
 {
diff --git a/rpython/translator/stm/src_stm/stm/sync.c b/rpython/translator/stm/src_stm/stm/sync.c
--- a/rpython/translator/stm/src_stm/stm/sync.c
+++ b/rpython/translator/stm/src_stm/stm/sync.c
@@ -125,7 +125,7 @@
     t->tv_nsec = nsec;
 }
 
-static inline bool cond_wait_timeout(enum cond_type_e ctype, double delay)
+static bool cond_wait_timespec(enum cond_type_e ctype, struct timespec *pt)
 {
 #ifdef STM_NO_COND_WAIT
     stm_fatalerror("*** cond_wait/%d called!", (int)ctype);
@@ -133,11 +133,8 @@
 
     assert(_has_mutex_here);
 
-    struct timespec t;
-    timespec_delay(&t, delay);
-
     int err = pthread_cond_timedwait(&sync_ctl.cond[ctype],
-                                     &sync_ctl.global_mutex, &t);
+                                     &sync_ctl.global_mutex, pt);
     if (err == 0)
         return true;     /* success */
     if (LIKELY(err == ETIMEDOUT))
@@ -145,6 +142,13 @@
     stm_fatalerror("pthread_cond_timedwait/%d: %d", (int)ctype, err);
 }
 
+static bool cond_wait_timeout(enum cond_type_e ctype, double delay)
+{
+    struct timespec t;
+    timespec_delay(&t, delay);
+    return cond_wait_timespec(ctype, &t);
+}
+
 static inline void cond_signal(enum cond_type_e ctype)
 {
     int err = pthread_cond_signal(&sync_ctl.cond[ctype]);
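
(Splitting cond_wait_timeout into timespec_delay + cond_wait_timespec lets
stm_queue_get compute an absolute deadline once and keep re-waiting on it
after wake-ups, so the total wait never exceeds the requested timeout.
A standalone pthread sketch of that pattern; all names here are
illustrative, not stmgc's:)

    /* demo_timedwait.c -- compile with: cc -O2 -pthread demo_timedwait.c */
    #include <errno.h>
    #include <pthread.h>
    #include <stdbool.h>
    #include <stdio.h>
    #include <time.h>

    static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
    static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
    static bool item_available = false;

    /* absolute deadline 'delay' seconds from now (CLOCK_REALTIME is what
       pthread_cond_timedwait expects by default; small delays only) */
    static void deadline_from_delay(struct timespec *t, double delay)
    {
        clock_gettime(CLOCK_REALTIME, t);
        long nsec = t->tv_nsec + (long)(delay * 1e9);
        t->tv_sec += nsec / 1000000000;
        t->tv_nsec = nsec % 1000000000;
    }

    /* wait for 'item_available' or the deadline; the deadline is computed
       once, so wake-ups inside the loop never extend the total wait */
    static bool wait_for_item(double timeout)
    {
        struct timespec t;
        bool t_ready = false;

        pthread_mutex_lock(&mutex);
        while (!item_available) {
            if (!t_ready) {
                deadline_from_delay(&t, timeout);
                t_ready = true;
            }
            if (pthread_cond_timedwait(&cond, &mutex, &t) == ETIMEDOUT)
                break;                 /* deadline reached */
        }
        bool got_it = item_available;
        pthread_mutex_unlock(&mutex);
        return got_it;
    }

    int main(void)
    {
        /* nothing signals the condition here, so this times out */
        printf("got item: %d\n", (int)wait_for_item(0.2));
        return 0;
    }
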
diff --git a/rpython/translator/stm/src_stm/stm/sync.h b/rpython/translator/stm/src_stm/stm/sync.h
--- a/rpython/translator/stm/src_stm/stm/sync.h
+++ b/rpython/translator/stm/src_stm/stm/sync.h
@@ -6,6 +6,7 @@
     C_REQUEST_REMOVED,
     C_SEGMENT_FREE,
     C_SEGMENT_FREE_OR_SAFE_POINT,
+    C_QUEUE_OLD_ENTRIES,
     _C_TOTAL
 };
 
diff --git a/rpython/translator/stm/src_stm/stmgc.c b/rpython/translator/stm/src_stm/stmgc.c
--- a/rpython/translator/stm/src_stm/stmgc.c
+++ b/rpython/translator/stm/src_stm/stmgc.c
@@ -20,6 +20,7 @@
 #include "stm/finalizer.h"
 #include "stm/locks.h"
 #include "stm/detach.h"
+#include "stm/queue.h"
 #include "stm/misc.c"
 #include "stm/list.c"
 #include "stm/smallmalloc.c"
@@ -42,4 +43,5 @@
 #include "stm/rewind_setjmp.c"
 #include "stm/finalizer.c"
 #include "stm/hashtable.c"
+#include "stm/queue.c"
 #include "stm/detach.c"
diff --git a/rpython/translator/stm/src_stm/stmgc.h b/rpython/translator/stm/src_stm/stmgc.h
--- a/rpython/translator/stm/src_stm/stmgc.h
+++ b/rpython/translator/stm/src_stm/stmgc.h
@@ -729,6 +729,27 @@
     object_t *object;
 };
 
+
+/* Queues.  The items you put() and get() back are in random order.
+   Like hashtables, the type 'stm_queue_t' is not an object type at
+   all; you need to allocate and free it explicitly.  If you want to
+   embed the queue inside an 'object_t' you probably need a light
+   finalizer to do the freeing. */
+typedef struct stm_queue_s stm_queue_t;
+
+stm_queue_t *stm_queue_create(void);
+void stm_queue_free(stm_queue_t *);
+/* put() does not cause delays or transaction breaks */
+void stm_queue_put(object_t *qobj, stm_queue_t *queue, object_t *newitem);
+/* get() can commit and wait outside a transaction (so push roots).
+   Unsuitable if the current transaction is atomic!  With timeout < 0.0,
+   waits forever; with timeout >= 0.0, returns NULL in an *inevitable*
+   transaction (this is needed to ensure correctness). */
+object_t *stm_queue_get(object_t *qobj, stm_queue_t *queue, double timeout,
+                        stm_thread_local_t *tl);
+void stm_queue_tracefn(stm_queue_t *queue, void trace(object_t **));
+
+
 /* ==================== END ==================== */
 
 static void (*stmcb_expand_marker)(char *segment_base, uintptr_t odd_number,
diff --git a/rpython/translator/stm/test/test_ztranslated.py b/rpython/translator/stm/test/test_ztranslated.py
--- a/rpython/translator/stm/test/test_ztranslated.py
+++ b/rpython/translator/stm/test/test_ztranslated.py
@@ -608,6 +608,46 @@
         data = cbuilder.cmdexec('')
         assert 'ok!\n' in data
 
+    def test_queue(self):
+        class X(object):
+            pass
+
+        def main(argv):
+            q = rstm.create_queue()
+            p = q.get(0.0)
+            assert p == lltype.nullptr(llmemory.GCREF.TO)
+            p = q.get(0.001)
+            assert p == lltype.nullptr(llmemory.GCREF.TO)
+            #
+            x1 = X()
+            p1 = cast_instance_to_gcref(x1)
+            q.put(p1)
+            #
+            p2 = q.get()
+            x2 = cast_gcref_to_instance(X, p2)
+            assert x2 is x1
+            #
+            q.put(p1)
+            rgc.collect()
+            #
+            p2 = q.get()
+            x2 = cast_gcref_to_instance(X, p2)
+            assert x2 is x1
+            #
+            print "ok!"
+            return 0
+
+        res = main([])      # direct run
+        assert res == 0
+
+        t, cbuilder = self.compile(main)
+        data = cbuilder.cmdexec('')
+        assert 'ok!\n' in data
+
+        t, cbuilder = self.compile(main, backendopt=True)
+        data = cbuilder.cmdexec('')
+        assert 'ok!\n' in data
+
     def test_allocate_preexisting(self):
         py.test.skip("kill me or re-add me")
         S = lltype.GcStruct('S', ('n', lltype.Signed))