Author: Armin Rigo <[email protected]>
Branch: stmgc-c8
Changeset: r78177:f597af41b44b
Date: 2015-06-18 14:48 +0200
http://bitbucket.org/pypy/pypy/changeset/f597af41b44b/

Log:    add task_done() and join() to rstm's queues

diff --git a/rpython/memory/gctransform/stmframework.py b/rpython/memory/gctransform/stmframework.py
--- a/rpython/memory/gctransform/stmframework.py
+++ b/rpython/memory/gctransform/stmframework.py
@@ -225,6 +225,7 @@
     gct_stm_transaction_break                       = _gct_with_roots_pushed
     gct_stm_collect                                 = _gct_with_roots_pushed
     gct_stm_queue_get                               = _gct_with_roots_pushed
+    gct_stm_queue_join                              = _gct_with_roots_pushed
 
 
 class StmRootWalker(BaseRootWalker):
diff --git a/rpython/rlib/rstm.py b/rpython/rlib/rstm.py
--- a/rpython/rlib/rstm.py
+++ b/rpython/rlib/rstm.py
@@ -370,12 +370,22 @@
 def _ll_queue_put(q, newitem):
     llop.stm_queue_put(lltype.Void, q, q.ll_raw_queue, newitem)
 
+@dont_look_inside
+def _ll_queue_task_done(q):
+    llop.stm_queue_task_done(lltype.Void, q.ll_raw_queue)
+
+@dont_look_inside
+def _ll_queue_join(q):
+    return llop.stm_queue_join(lltype.Signed, q, q.ll_raw_queue)
+
 _QUEUE_OBJ = lltype.GcStruct('QUEUE_OBJ',
                              ('ll_raw_queue', _STM_QUEUE_P),
                              hints={'immutable': True},
                              rtti=True,
                              adtmeths={'get': _ll_queue_get,
-                                       'put': _ll_queue_put})
+                                       'put': _ll_queue_put,
+                                       'task_done': _ll_queue_task_done,
+                                       'join': _ll_queue_join})
 NULL_QUEUE = lltype.nullptr(_QUEUE_OBJ)
 
 def _ll_queue_trace(gc, obj, callback, arg):
@@ -423,3 +433,10 @@
     def put(self, newitem):
         assert lltype.typeOf(newitem) == llmemory.GCREF
         self._content.put(newitem)
+
+    def task_done(self):
+        self._content.task_done()
+
+    def join(self):
+        self._content.join()
+        return 0
diff --git a/rpython/rtyper/llinterp.py b/rpython/rtyper/llinterp.py
--- a/rpython/rtyper/llinterp.py
+++ b/rpython/rtyper/llinterp.py
@@ -1006,6 +1006,8 @@
     op_stm_queue_free = _stm_not_implemented
     op_stm_queue_get = _stm_not_implemented
     op_stm_queue_put = _stm_not_implemented
+    op_stm_queue_task_done = _stm_not_implemented
+    op_stm_queue_join = _stm_not_implemented
     op_stm_queue_tracefn = _stm_not_implemented
     op_stm_register_thread_local = _stm_not_implemented
     op_stm_unregister_thread_local = _stm_not_implemented
diff --git a/rpython/rtyper/lltypesystem/lloperation.py b/rpython/rtyper/lltypesystem/lloperation.py
--- a/rpython/rtyper/lltypesystem/lloperation.py
+++ b/rpython/rtyper/lltypesystem/lloperation.py
@@ -483,6 +483,8 @@
     'stm_queue_free':         LLOp(),
     'stm_queue_get':          LLOp(canmallocgc=True),   # push roots!
     'stm_queue_put':          LLOp(),
+    'stm_queue_task_done':    LLOp(),
+    'stm_queue_join':         LLOp(canmallocgc=True),   # push roots!
     'stm_queue_tracefn':      LLOp(),
 
     # __________ address operations __________
diff --git a/rpython/translator/stm/breakfinder.py b/rpython/translator/stm/breakfinder.py
--- a/rpython/translator/stm/breakfinder.py
+++ b/rpython/translator/stm/breakfinder.py
@@ -10,6 +10,7 @@
     'stm_leave_callback_call',
     'stm_transaction_break',
     'stm_queue_get',
+    'stm_queue_join',
     ])
 
 for tb in TRANSACTION_BREAK:
diff --git a/rpython/translator/stm/funcgen.py b/rpython/translator/stm/funcgen.py
--- a/rpython/translator/stm/funcgen.py
+++ b/rpython/translator/stm/funcgen.py
@@ -392,6 +392,17 @@
     return 'stm_queue_put((object_t *)%s, %s, (object_t *)%s);' % (
         arg0, arg1, arg2)
 
+def stm_queue_task_done(funcgen, op):
+    arg0 = funcgen.expr(op.args[0])
+    return 'stm_queue_task_done(%s);' % (arg0,)
+
+def stm_queue_join(funcgen, op):
+    arg0 = funcgen.expr(op.args[0])
+    arg1 = funcgen.expr(op.args[1])
+    result = funcgen.expr(op.result)
+    return ('%s = stm_queue_join((object_t *)%s, %s, '
+            '&stm_thread_local);' % (result, arg0, arg1,))
+
 def stm_queue_tracefn(funcgen, op):
     arg0 = funcgen.expr(op.args[0])
     arg1 = funcgen.expr(op.args[1])
diff --git a/rpython/translator/stm/src_stm/revision b/rpython/translator/stm/src_stm/revision
--- a/rpython/translator/stm/src_stm/revision
+++ b/rpython/translator/stm/src_stm/revision
@@ -1,1 +1,1 @@
-7592a0f11ac2
+d083e426a17d
diff --git a/rpython/translator/stm/src_stm/stm/queue.c b/rpython/translator/stm/src_stm/stm/queue.c
--- a/rpython/translator/stm/src_stm/stm/queue.c
+++ b/rpython/translator/stm/src_stm/stm/queue.c
@@ -25,6 +25,10 @@
            notion is per segment.)  this flag says that the queue is
            already in the tree STM_PSEGMENT->active_queues. */
         bool active;
+
+        /* counts the number of put's done in this transaction, minus
+           the number of task_done's */
+        int64_t unfinished_tasks_in_this_transaction;
     };
     char pad[64];
 } stm_queue_segment_t;
@@ -37,6 +41,10 @@
 
     /* a chained list of old entries in the queue */
     queue_entry_t *volatile old_entries;
+
+    /* total of 'unfinished_tasks_in_this_transaction' for all
+       committed transactions */
+    volatile int64_t unfinished_tasks;
 };
 
 
@@ -126,6 +134,7 @@
     queue_lock_acquire();
 
     bool added_any_old_entries = false;
+    bool finished_more_tasks = false;
     wlog_t *item;
     TREE_LOOP_FORWARD(STM_PSEGMENT->active_queues, item) {
         stm_queue_t *queue = (stm_queue_t *)item->addr;
@@ -133,6 +142,11 @@
         queue_entry_t *head, *freehead;
 
         if (at_commit) {
+            int64_t d = seg->unfinished_tasks_in_this_transaction;
+            if (d != 0) {
+                finished_more_tasks |= (d < 0);
+                __sync_add_and_fetch(&queue->unfinished_tasks, d);
+            }
             head = seg->added_in_this_transaction;
             freehead = seg->old_objects_popped;
         }
@@ -145,6 +159,7 @@
         seg->added_in_this_transaction = NULL;
         seg->added_young_limit = NULL;
         seg->old_objects_popped = NULL;
+        seg->unfinished_tasks_in_this_transaction = 0;
 
         /* free the list of entries that must disappear */
         queue_free_entries(freehead);
@@ -176,10 +191,11 @@
 
     queue_lock_release();
 
-    if (added_any_old_entries) {
-        assert(_has_mutex());
+    assert(_has_mutex());
+    if (added_any_old_entries)
         cond_broadcast(C_QUEUE_OLD_ENTRIES);
-    }
+    if (finished_more_tasks)
+        cond_broadcast(C_QUEUE_FINISHED_MORE_TASKS);
 }
 
 void stm_queue_put(object_t *qobj, stm_queue_t *queue, object_t *newitem)
@@ -195,6 +211,7 @@
     seg->added_in_this_transaction = entry;
 
     queue_activate(queue);
+    seg->unfinished_tasks_in_this_transaction++;
 
     /* add qobj to 'objects_pointing_to_nursery' if it has the
        WRITE_BARRIER flag */
@@ -285,6 +302,41 @@
     }
 }
 
+void stm_queue_task_done(stm_queue_t *queue)
+{
+    queue_activate(queue);
+    stm_queue_segment_t *seg = &queue->segs[STM_SEGMENT->segment_num - 1];
+    seg->unfinished_tasks_in_this_transaction--;
+}
+
+long stm_queue_join(object_t *qobj, stm_queue_t *queue, stm_thread_local_t *tl)
+{
+    int64_t result;
+
+#if STM_TESTS
+    result = queue->unfinished_tasks;   /* can't wait in tests */
+    result += (queue->segs[STM_SEGMENT->segment_num - 1]
+               .unfinished_tasks_in_this_transaction);
+    return result;
+#else
+    STM_PUSH_ROOT(*tl, qobj);
+    _stm_commit_transaction();
+
+    s_mutex_lock();
+    while ((result = queue->unfinished_tasks) > 0) {
+        cond_wait(C_QUEUE_FINISHED_MORE_TASKS);
+    }
+    s_mutex_unlock();
+
+    _stm_start_transaction(tl);
+    STM_POP_ROOT(*tl, qobj);   /* 'queue' should stay alive until here */
+#endif
+
+    /* returns 0 for 'ok', or negative if there was more task_done()
+       than put() so far */
+    return result;
+}
+
 static void queue_trace_list(queue_entry_t *entry, void trace(object_t **),
                              queue_entry_t *stop_at)
 {
diff --git a/rpython/translator/stm/src_stm/stm/sync.h b/rpython/translator/stm/src_stm/stm/sync.h
--- a/rpython/translator/stm/src_stm/stm/sync.h
+++ b/rpython/translator/stm/src_stm/stm/sync.h
@@ -7,6 +7,7 @@
     C_SEGMENT_FREE,
     C_SEGMENT_FREE_OR_SAFE_POINT,
     C_QUEUE_OLD_ENTRIES,
+    C_QUEUE_FINISHED_MORE_TASKS,
     _C_TOTAL
 };
 
diff --git a/rpython/translator/stm/src_stm/stmgc.h b/rpython/translator/stm/src_stm/stmgc.h
--- a/rpython/translator/stm/src_stm/stmgc.h
+++ b/rpython/translator/stm/src_stm/stmgc.h
@@ -747,6 +747,11 @@
    transaction (this is needed to ensure correctness). */
 object_t *stm_queue_get(object_t *qobj, stm_queue_t *queue, double timeout,
                         stm_thread_local_t *tl);
+/* task_done() and join(): see https://docs.python.org/2/library/queue.html */
+void stm_queue_task_done(stm_queue_t *queue);
+/* join() commits and waits outside a transaction (so push roots).
+   Unsuitable if the current transaction is atomic! */
+long stm_queue_join(object_t *qobj, stm_queue_t *queue, stm_thread_local_t *tl);
 void stm_queue_tracefn(stm_queue_t *queue, void trace(object_t **));
 
 
diff --git a/rpython/translator/stm/test/test_ztranslated.py b/rpython/translator/stm/test/test_ztranslated.py
--- a/rpython/translator/stm/test/test_ztranslated.py
+++ b/rpython/translator/stm/test/test_ztranslated.py
@@ -634,6 +634,18 @@
             x2 = cast_gcref_to_instance(X, p2)
             assert x2 is x1
             #
+            q.task_done()
+            q.task_done()
+            res = q.join()
+            assert res == 0
+            res = q.join()
+            assert res == 0
+            if objectmodel.we_are_translated():
+                q.task_done()
+                q.task_done()
+                res = q.join()
+                assert res == -2
+            #
             print "ok!"
             return 0
 
_______________________________________________
pypy-commit mailing list
[email protected]
https://mail.python.org/mailman/listinfo/pypy-commit

Reply via email to