Author: Armin Rigo <[email protected]>
Branch: bag
Changeset: r1582:21b4163950f8
Date: 2015-01-24 11:54 +0100
http://bitbucket.org/pypy/stmgc/changeset/21b4163950f8/
Log: in-progress
diff --git a/c7/stm/bag.c b/c7/stm/bag.c
--- a/c7/stm/bag.c
+++ b/c7/stm/bag.c
@@ -44,6 +44,7 @@
struct stm_bag_seg_s {
uintptr_t *deque_left, *deque_middle, *deque_right;
struct list_s *abort_list;
+ uint64_t start_time; /* the transaction's unique_start_time */
};
struct stm_bag_s {
@@ -62,6 +63,7 @@
bs->deque_middle = &block->items[0];
bs->deque_right = &block->items[0];
LIST_CREATE(bs->abort_list);
+ bs->start_time = 0;
}
return bag;
}
@@ -79,41 +81,105 @@
}
LIST_FREE(bs->abort_list);
}
+
+ s_mutex_lock();
+ for (i = 0; i < STM_NB_SEGMENTS; i++) {
+ struct stm_bag_seg_s *bs = &bag->by_segment[i];
+ struct stm_segment_info_s *pub = get_segment(i + 1);
+ stm_thread_local_t *tl = pub->running_thread;
+ if (tl->associated_segment_num == i + 1) {
+ stm_call_on_abort(tl, bs, NULL);
+ }
+ }
+ s_mutex_unlock();
+
+ free(bag);
+}
+/* Append 'newobj' at the right end of the deque described by 'bs'.
+   The item is stored at *deque_right, which is then advanced.  If that
+   reaches the end of the current block's items[], deque_right is moved
+   to the first slot of the next block; a new block is allocated only
+   when block->next is still NULL. */
+static void bag_add(struct stm_bag_seg_s *bs, object_t *newobj)
+{
+ struct deque_block_s *block = deque_block(bs->deque_right);
+ *bs->deque_right++ = (uintptr_t)newobj;
+
+ if (bs->deque_right == &block->items[DEQUE_BLOCK_SIZE]) {
+ if (block->next == NULL) /* reuse an already-linked next block if there is one (presumably left behind when bag_abort_callback() rewinds deque_right -- TODO confirm) */
+ block->next = deque_new_block();
+ bs->deque_right = &block->next->items[0];
+ }
+}
+/* Abort callback for one per-segment bag state.  'key' is the
+   struct stm_bag_seg_s pointer that bag_check_start_time() passes to
+   stm_call_on_abort().  It discards the items between deque_middle and
+   deque_right, re-adds every item recorded in abort_list via bag_add(),
+   clears abort_list, and finally sets deque_middle to the new
+   deque_right. */
+static void bag_abort_callback(void *key)
+{
+ struct stm_bag_seg_s *bs = (struct stm_bag_seg_s *)key;
+
+ /* remove the "added in this transaction" items */
+ bs->deque_right = bs->deque_middle;
+
+ /* reinstall the items from the "abort_list" */
+ LIST_FOREACH_F(bs->abort_list, object_t *, bag_add(bs, item));
+ list_clear(bs->abort_list);
+
+ /* these items are not "added in this transaction" */
+ bs->deque_middle = bs->deque_right;
+}
+/* Return the stm_bag_seg_s of 'bag' that belongs to the current segment
+   (index STM_SEGMENT->segment_num - 1 in bag->by_segment[]).
+   If its recorded start_time differs from the running transaction's
+   STM_PSEGMENT->unique_start_time, the state is first brought up to
+   date: deque_middle is moved to deque_right, abort_list is cleared,
+   start_time is updated, and bag_abort_callback() is registered for
+   this 'bs' with stm_call_on_abort(). */
+static struct stm_bag_seg_s *bag_check_start_time(stm_bag_t *bag)
+{
+ int i = STM_SEGMENT->segment_num - 1;
+ struct stm_bag_seg_s *bs = &bag->by_segment[i];
+
+ if (bs->start_time != STM_PSEGMENT->unique_start_time) {
+ /* There was a commit or an abort since the last operation
+ on the same bag in the same segment. If there was an
+ abort, bag_abort_callback() should have been called to
+ reset the state. Assume that any non-reset state is
+ there because of a commit.
+
+ The middle pointer moves to the right: there are no
+ more "added in this transaction" entries. And the
+ "already popped items" list is forgotten.
+ */
+ bs->deque_middle = bs->deque_right;
+ list_clear(bs->abort_list);
+ bs->start_time = STM_PSEGMENT->unique_start_time; /* skips this branch for later calls while unique_start_time is unchanged */
+
+ /* We're about to modify the bag, so register an abort
+ callback now. */
+ stm_thread_local_t *tl = STM_SEGMENT->running_thread;
+ assert(tl->associated_segment_num == STM_SEGMENT->segment_num);
+ stm_call_on_abort(tl, bs, &bag_abort_callback); /* 'bs' is the key handed back to bag_abort_callback() */
+ }
+
+ return bs;
}
void stm_bag_add(stm_bag_t *bag, object_t *newobj)
{
- int i = STM_SEGMENT->segment_num - 1;
- struct stm_bag_seg_s *bs = &bag->by_segment[i];
- struct deque_block_s *block = deque_block(bs->deque_right);
-
- *bs->deque_right++ = (uintptr_t)newobj;
-
- if (bs->deque_right == &block->items[DEQUE_BLOCK_SIZE]) {
- assert(block->next == NULL);
- block->next = deque_new_block();
- bs->deque_right = &block->next->items[0];
- }
+ struct stm_bag_seg_s *bs = bag_check_start_time(bag); /* current segment's state, synced to the running transaction (may register the abort callback) */
+ bag_add(bs, newobj); /* append 'newobj' at the right end of this segment's deque */
}
object_t *stm_bag_try_pop(stm_bag_t *bag)
{
- int i = STM_SEGMENT->segment_num - 1;
- struct stm_bag_seg_s *bs = &bag->by_segment[i];
+ struct stm_bag_seg_s *bs = bag_check_start_time(bag); /* current segment's state, synced to the running transaction */
if (bs->deque_left == bs->deque_right) {
return NULL;
}
+ /* The test below must be done before deque_left is advanced:
+    deque_left == deque_middle means the item being popped lies in
+    the middle..right range, i.e. the "added in this transaction"
+    items. */
struct deque_block_s *block = deque_block(bs->deque_left);
- bool any_old_item_to_pop = (bs->deque_left != bs->deque_middle);
+ bool from_same_transaction = (bs->deque_left == bs->deque_middle);
uintptr_t result = *bs->deque_left++;
if (bs->deque_left == &block->items[DEQUE_BLOCK_SIZE]) {
bs->deque_left = &block->next->items[0];
deque_free_block(block);
}
- if (!any_old_item_to_pop) {
+ if (from_same_transaction) { /* keep deque_middle equal to deque_left */
bs->deque_middle = bs->deque_left;
}
+ else {
+ LIST_APPEND(bs->abort_list, result); /* recorded so that bag_abort_callback() can re-add it */
+ }
return (object_t *)result;
}
diff --git a/c7/stm/contention.c b/c7/stm/contention.c
--- a/c7/stm/contention.c
+++ b/c7/stm/contention.c
@@ -73,7 +73,7 @@
__attribute__((unused))
static void cm_abort_the_younger(struct contmgr_s *cm)
{
- if (STM_PSEGMENT->start_time >= cm->other_pseg->start_time) {
+ if (STM_PSEGMENT->unique_start_time >= cm->other_pseg->unique_start_time) {
/* We started after the other thread. Abort */
cm->abort_other = false;
}
@@ -100,7 +100,7 @@
__attribute__((unused))
static void cm_pause_if_younger(struct contmgr_s *cm)
{
- if (STM_PSEGMENT->start_time >= cm->other_pseg->start_time) {
+ if (STM_PSEGMENT->unique_start_time >= cm->other_pseg->unique_start_time) {
/* We started after the other thread. Pause */
cm->try_sleep = true;
cm->abort_other = false;
diff --git a/c7/stm/core.c b/c7/stm/core.c
--- a/c7/stm/core.c
+++ b/c7/stm/core.c
@@ -324,7 +324,7 @@
STM_SEGMENT->transaction_read_version = 1;
}
-static uint64_t _global_start_time = 0;
+static uint64_t _global_start_time = 1;
static void _stm_start_transaction(stm_thread_local_t *tl)
{
@@ -337,7 +337,7 @@
assert(STM_PSEGMENT->safe_point == SP_NO_TRANSACTION);
assert(STM_PSEGMENT->transaction_state == TS_NONE);
timing_event(tl, STM_TRANSACTION_START);
- STM_PSEGMENT->start_time = _global_start_time++;
+ STM_PSEGMENT->unique_start_time = _global_start_time++;
STM_PSEGMENT->signalled_to_commit_soon = false;
STM_PSEGMENT->safe_point = SP_RUNNING;
STM_PSEGMENT->marker_inev.object = NULL;
diff --git a/c7/stm/core.h b/c7/stm/core.h
--- a/c7/stm/core.h
+++ b/c7/stm/core.h
@@ -138,7 +138,7 @@
/* Start time: to know approximately for how long a transaction has
been running, in contention management */
- uint64_t start_time;
+ uint64_t unique_start_time;
/* This is the number stored in the overflowed objects (a multiple of
GCFLAG_OVERFLOW_NUMBER_bit0). It is incremented when the
diff --git a/c7/stm/list.h b/c7/stm/list.h
--- a/c7/stm/list.h
+++ b/c7/stm/list.h
@@ -96,6 +96,16 @@
} \
} while (0)
+#define LIST_FOREACH_F(lst, TYPE, CODE) /* run CODE once per entry of 'lst', front to back, with the entry available as 'item' cast to TYPE */ \
+ do { \
+ struct list_s *_lst = (lst); /* 'lst' is evaluated only once */ \
+ uintptr_t _i, _c = _lst->count; /* count is read once, before the loop starts */ \
+ for (_i = 0; _i < _c; _i++) { \
+ TYPE item = (TYPE)_lst->items[_i]; \
+ CODE; \
+ } \
+ } while (0)
+
/************************************************************/
/* The tree_xx functions are, like the name hints, implemented as a tree,
diff --git a/c7/test/test_bag.py b/c7/test/test_bag.py
--- a/c7/test/test_bag.py
+++ b/c7/test/test_bag.py
@@ -131,3 +131,31 @@
assert got == lp1
#
stm_major_collect() # to get rid of the bag object
+
+ def test_abort_recovers_popped(self):  # items popped in a transaction that aborts must be poppable again afterwards
+ self.start_transaction()
+ q = self.allocate_bag()
+ self.push_root(q)
+ lp1 = stm_allocate(16)
+ lp2 = stm_allocate(16)
+ stm_set_char(lp1, 'M')
+ stm_set_char(lp2, 'N')
+ b_add(q, lp1)
+ b_add(q, lp2)
+ self.commit_transaction()
+ q = self.pop_root()
+ # second transaction: pop both items, check them, then abort instead of committing
+ self.start_transaction()
+ lp1 = b_pop(q)
+ lp2 = b_pop(q)
+ assert stm_get_char(lp1) == 'M'
+ assert stm_get_char(lp2) == 'N'
+ self.abort_transaction()
+ # third transaction: the test expects to pop the same two items again ('M' then 'N')
+ self.start_transaction()
+ lp1 = b_pop(q)
+ lp2 = b_pop(q)
+ assert stm_get_char(lp1) == 'M'
+ assert stm_get_char(lp2) == 'N'
+ #
+ stm_major_collect() # to get rid of the bag object
_______________________________________________
pypy-commit mailing list
[email protected]
https://mail.python.org/mailman/listinfo/pypy-commit