From cd51153a3a3c10689dd80a820e7fea0e4bff58f4 Mon Sep 17 00:00:00 2001
From: Masahiko Sawada <sawada.mshk@gmail.com>
Date: Thu, 24 Oct 2024 17:29:51 -0700
Subject: [PATCH v9 3/6] radixtree.h: support shared iteration.

This commit supports a shared iteration operation on a radix tree with
multiple processes. The radix tree must be in shared mode to start a
shared iteration. Parallel workers can attach the shared iteration
using the iterator handle given by the leader process. Same as the
normal iteration, it's guaranteed that the shared iteration returns
key-values in an ascending order.

Reviewed-by:
Discussion: https://postgr.es/m/
---
 src/include/lib/radixtree.h                   | 216 +++++++++++++++---
 .../modules/test_radixtree/test_radixtree.c   | 128 +++++++----
 2 files changed, 272 insertions(+), 72 deletions(-)

diff --git a/src/include/lib/radixtree.h b/src/include/lib/radixtree.h
index f0abb0df389..c8efa61ac7c 100644
--- a/src/include/lib/radixtree.h
+++ b/src/include/lib/radixtree.h
@@ -136,6 +136,9 @@
  * RT_LOCK_SHARE 	- Lock the radix tree in share mode
  * RT_UNLOCK		- Unlock the radix tree
  * RT_GET_HANDLE	- Return the handle of the radix tree
+ * RT_BEGIN_ITERATE_SHARED	- Begin iterating in shared mode.
+ * RT_ATTACH_ITERATE_SHARED	- Attach to the shared iterator.
+ * RT_GET_ITER_HANDLE		- Get the handle of the shared iterator.
  *
  * Optional Interface
  * ---------
@@ -179,6 +182,9 @@
 #define RT_ATTACH RT_MAKE_NAME(attach)
 #define RT_DETACH RT_MAKE_NAME(detach)
 #define RT_GET_HANDLE RT_MAKE_NAME(get_handle)
+#define RT_BEGIN_ITERATE_SHARED RT_MAKE_NAME(begin_iterate_shared)
+#define RT_ATTACH_ITERATE_SHARED RT_MAKE_NAME(attach_iterate_shared)
+#define RT_GET_ITER_HANDLE RT_MAKE_NAME(get_iter_handle)
 #define RT_LOCK_EXCLUSIVE RT_MAKE_NAME(lock_exclusive)
 #define RT_LOCK_SHARE RT_MAKE_NAME(lock_share)
 #define RT_UNLOCK RT_MAKE_NAME(unlock)
@@ -238,15 +244,19 @@
 #define RT_SHRINK_NODE_16 RT_MAKE_NAME(shrink_child_16)
 #define RT_SHRINK_NODE_48 RT_MAKE_NAME(shrink_child_48)
 #define RT_SHRINK_NODE_256 RT_MAKE_NAME(shrink_child_256)
+#define RT_INITIALIZE_ITER RT_MAKE_NAME(initialize_iter)
 #define RT_NODE_ITERATE_NEXT RT_MAKE_NAME(node_iterate_next)
 #define RT_VERIFY_NODE RT_MAKE_NAME(verify_node)
 
 /* type declarations */
 #define RT_RADIX_TREE RT_MAKE_NAME(radix_tree)
 #define RT_RADIX_TREE_CONTROL RT_MAKE_NAME(radix_tree_control)
+#define RT_ITER_CONTROL RT_MAKE_NAME(iter_control)
 #define RT_ITER RT_MAKE_NAME(iter)
 #ifdef RT_SHMEM
 #define RT_HANDLE RT_MAKE_NAME(handle)
+#define RT_ITER_CONTROL_SHARED RT_MAKE_NAME(iter_control_shared)
+#define RT_ITER_HANDLE RT_MAKE_NAME(iter_handle)
 #endif
 #define RT_NODE RT_MAKE_NAME(node)
 #define RT_CHILD_PTR RT_MAKE_NAME(child_ptr)
@@ -272,6 +282,7 @@ typedef struct RT_ITER RT_ITER;
 
 #ifdef RT_SHMEM
 typedef dsa_pointer RT_HANDLE;
+typedef dsa_pointer RT_ITER_HANDLE;
 #endif
 
 #ifdef RT_SHMEM
@@ -282,6 +293,9 @@ RT_SCOPE	RT_HANDLE RT_GET_HANDLE(RT_RADIX_TREE * tree);
 RT_SCOPE void RT_LOCK_EXCLUSIVE(RT_RADIX_TREE * tree);
 RT_SCOPE void RT_LOCK_SHARE(RT_RADIX_TREE * tree);
 RT_SCOPE void RT_UNLOCK(RT_RADIX_TREE * tree);
+RT_SCOPE	RT_ITER *RT_BEGIN_ITERATE_SHARED(RT_RADIX_TREE * tree);
+RT_SCOPE	RT_ITER_HANDLE RT_GET_ITER_HANDLE(RT_ITER * iter);
+RT_SCOPE	RT_ITER *RT_ATTACH_ITERATE_SHARED(RT_RADIX_TREE * tree, RT_ITER_HANDLE handle);
 #else
 RT_SCOPE	RT_RADIX_TREE *RT_CREATE(MemoryContext ctx);
 #endif
@@ -689,6 +703,7 @@ typedef struct RT_RADIX_TREE_CONTROL
 	RT_HANDLE	handle;
 	uint32		magic;
 	LWLock		lock;
+	int			tranche_id;
 #endif
 
 	RT_PTR_ALLOC root;
@@ -739,11 +754,9 @@ typedef struct RT_NODE_ITER
 	int			idx;
 }			RT_NODE_ITER;
 
-/* state for iterating over the whole radix tree */
-struct RT_ITER
+/* Contain the iteration state data */
+typedef struct RT_ITER_CONTROL
 {
-	RT_RADIX_TREE *tree;
-
 	/*
 	 * A stack to track iteration for each level. Level 0 is the lowest (or
 	 * leaf) level
@@ -754,8 +767,36 @@ struct RT_ITER
 
 	/* The key constructed during iteration */
 	uint64		key;
-};
+}			RT_ITER_CONTROL;
 
+#ifdef RT_SHMEM
+/* Contain the shared iteration state data */
+typedef struct RT_ITER_CONTROL_SHARED
+{
+	/* Actual shared iteration state data */
+	RT_ITER_CONTROL common;
+
+	/* protect the control data */
+	LWLock		lock;
+
+	RT_ITER_HANDLE handle;
+	pg_atomic_uint32 refcnt;
+}			RT_ITER_CONTROL_SHARED;
+#endif
+
+/* state for iterating over the whole radix tree */
+struct RT_ITER
+{
+	RT_RADIX_TREE *tree;
+
+	/* pointing to either local memory or DSA */
+	RT_ITER_CONTROL *ctl;
+
+#ifdef RT_SHMEM
+	/* True if the iterator is for shared iteration */
+	bool		shared;
+#endif
+};
 
 /* verification (available only in assert-enabled builds) */
 static void RT_VERIFY_NODE(RT_NODE * node);
@@ -1833,6 +1874,7 @@ RT_CREATE(MemoryContext ctx)
 	tree->ctl = (RT_RADIX_TREE_CONTROL *) dsa_get_address(dsa, dp);
 	tree->ctl->handle = dp;
 	tree->ctl->magic = RT_RADIX_TREE_MAGIC;
+	tree->ctl->tranche_id = tranche_id;
 	LWLockInitialize(&tree->ctl->lock, tranche_id);
 #else
 	tree->ctl = (RT_RADIX_TREE_CONTROL *) palloc0(sizeof(RT_RADIX_TREE_CONTROL));
@@ -2044,6 +2086,28 @@ RT_FREE(RT_RADIX_TREE * tree)
 
 /***************** ITERATION *****************/
 
+/*
+ * Common routine to initialize the given iterator.
+ */
+static void
+RT_INITIALIZE_ITER(RT_RADIX_TREE * tree, RT_ITER * iter)
+{
+	RT_CHILD_PTR root;
+
+	iter->tree = tree;
+
+	Assert(RT_PTR_ALLOC_IS_VALID(tree->ctl->root));
+	root.alloc = iter->tree->ctl->root;
+	RT_PTR_SET_LOCAL(tree, &root);
+
+	iter->ctl->top_level = iter->tree->ctl->start_shift / RT_SPAN;
+
+	/* Set the root to start */
+	iter->ctl->cur_level = iter->ctl->top_level;
+	iter->ctl->node_iters[iter->ctl->cur_level].node = root;
+	iter->ctl->node_iters[iter->ctl->cur_level].idx = 0;
+}
+
 /*
  * Create and return an iterator for the given radix tree
  * in the caller's memory context.
@@ -2055,24 +2119,50 @@ RT_SCOPE	RT_ITER *
 RT_BEGIN_ITERATE(RT_RADIX_TREE * tree)
 {
 	RT_ITER    *iter;
-	RT_CHILD_PTR root;
 
 	iter = (RT_ITER *) palloc0(sizeof(RT_ITER));
-	iter->tree = tree;
+	iter->ctl = (RT_ITER_CONTROL *) palloc0(sizeof(RT_ITER_CONTROL));
+	RT_INITIALIZE_ITER(tree, iter);
 
-	Assert(RT_PTR_ALLOC_IS_VALID(tree->ctl->root));
-	root.alloc = iter->tree->ctl->root;
-	RT_PTR_SET_LOCAL(tree, &root);
+#ifdef RT_SHMEM
+	/* we will non-shared iteration on a shared radix tree */
+	iter->shared = false;
+#endif
 
-	iter->top_level = iter->tree->ctl->start_shift / RT_SPAN;
+	return iter;
+}
 
-	/* Set the root to start */
-	iter->cur_level = iter->top_level;
-	iter->node_iters[iter->cur_level].node = root;
-	iter->node_iters[iter->cur_level].idx = 0;
+#ifdef RT_SHMEM
+/*
+ * Create and return the shared iterator for the given shard radix tree.
+ *
+ * Taking a lock on a radix tree in shared mode during the shared iteration to
+ * prevent concurrent writes is the caller's responsibility.
+ */
+RT_SCOPE	RT_ITER *
+RT_BEGIN_ITERATE_SHARED(RT_RADIX_TREE * tree)
+{
+	RT_ITER    *iter;
+	RT_ITER_CONTROL_SHARED *ctl_shared;
+	dsa_pointer dp;
+
+	/* The radix tree must be in shared mode */
+	Assert(tree->ctl->magic == RT_RADIX_TREE_MAGIC);
+
+	dp = dsa_allocate0(tree->dsa, sizeof(RT_ITER_CONTROL_SHARED));
+	ctl_shared = (RT_ITER_CONTROL_SHARED *) dsa_get_address(tree->dsa, dp);
+	ctl_shared->handle = dp;
+	LWLockInitialize(&ctl_shared->lock, tree->ctl->tranche_id);
+	pg_atomic_init_u32(&ctl_shared->refcnt, 1);
+
+	iter = (RT_ITER *) palloc0(sizeof(RT_ITER));
+	iter->ctl = (RT_ITER_CONTROL *) ctl_shared;
+	iter->shared = true;
+	RT_INITIALIZE_ITER(tree, iter);
 
 	return iter;
 }
+#endif
 
 /*
  * Scan the inner node and return the next child pointer if one exists, otherwise
@@ -2086,12 +2176,18 @@ RT_NODE_ITERATE_NEXT(RT_ITER * iter, int level)
 	RT_CHILD_PTR node;
 	RT_PTR_ALLOC *slot = NULL;
 
+	node_iter = &(iter->ctl->node_iters[level]);
+	node = node_iter->node;
+
 #ifdef RT_SHMEM
-	Assert(iter->tree->ctl->magic == RT_RADIX_TREE_MAGIC);
-#endif
 
-	node_iter = &(iter->node_iters[level]);
-	node = node_iter->node;
+	/*
+	 * Since the iterator is shared, the local pointer of the node might be
+	 * set by other backends, we need to make sure to use the local pointer.
+	 */
+	if (iter->shared)
+		RT_PTR_SET_LOCAL(iter->tree, &node);
+#endif
 
 	Assert(node.local != NULL);
 
@@ -2164,8 +2260,8 @@ RT_NODE_ITERATE_NEXT(RT_ITER * iter, int level)
 	}
 
 	/* Update the key */
-	iter->key &= ~(((uint64) RT_CHUNK_MASK) << (level * RT_SPAN));
-	iter->key |= (((uint64) key_chunk) << (level * RT_SPAN));
+	iter->ctl->key &= ~(((uint64) RT_CHUNK_MASK) << (level * RT_SPAN));
+	iter->ctl->key |= (((uint64) key_chunk) << (level * RT_SPAN));
 
 	return slot;
 }
@@ -2179,18 +2275,29 @@ RT_ITERATE_NEXT(RT_ITER * iter, uint64 *key_p)
 {
 	RT_PTR_ALLOC *slot = NULL;
 
-	while (iter->cur_level <= iter->top_level)
+#ifdef RT_SHMEM
+	/* Prevent the shared iterator from being updated concurrently */
+	if (iter->shared)
+		LWLockAcquire(&((RT_ITER_CONTROL_SHARED *) iter->ctl)->lock, LW_EXCLUSIVE);
+#endif
+
+	while (iter->ctl->cur_level <= iter->ctl->top_level)
 	{
 		RT_CHILD_PTR node;
 
-		slot = RT_NODE_ITERATE_NEXT(iter, iter->cur_level);
+		slot = RT_NODE_ITERATE_NEXT(iter, iter->ctl->cur_level);
 
-		if (iter->cur_level == 0 && slot != NULL)
+		if (iter->ctl->cur_level == 0 && slot != NULL)
 		{
 			/* Found a value at the leaf node */
-			*key_p = iter->key;
+			*key_p = iter->ctl->key;
 			node.alloc = *slot;
 
+#ifdef RT_SHMEM
+			if (iter->shared)
+				LWLockRelease(&((RT_ITER_CONTROL_SHARED *) iter->ctl)->lock);
+#endif
+
 			if (RT_CHILDPTR_IS_VALUE(*slot))
 				return (RT_VALUE_TYPE *) slot;
 			else
@@ -2206,17 +2313,23 @@ RT_ITERATE_NEXT(RT_ITER * iter, uint64 *key_p)
 			node.alloc = *slot;
 			RT_PTR_SET_LOCAL(iter->tree, &node);
 
-			iter->cur_level--;
-			iter->node_iters[iter->cur_level].node = node;
-			iter->node_iters[iter->cur_level].idx = 0;
+			iter->ctl->cur_level--;
+			iter->ctl->node_iters[iter->ctl->cur_level].node = node;
+			iter->ctl->node_iters[iter->ctl->cur_level].idx = 0;
 		}
 		else
 		{
 			/* Not found the child slot, move up the tree */
-			iter->cur_level++;
+			iter->ctl->cur_level++;
 		}
+
 	}
 
+#ifdef RT_SHMEM
+	if (iter->shared)
+		LWLockRelease(&((RT_ITER_CONTROL_SHARED *) iter->ctl)->lock);
+#endif
+
 	/* We've visited all nodes, so the iteration finished */
 	return NULL;
 }
@@ -2227,9 +2340,44 @@ RT_ITERATE_NEXT(RT_ITER * iter, uint64 *key_p)
 RT_SCOPE void
 RT_END_ITERATE(RT_ITER * iter)
 {
+#ifdef RT_SHMEM
+	RT_ITER_CONTROL_SHARED *ctl = (RT_ITER_CONTROL_SHARED *) iter->ctl;
+
+	if (iter->shared &&
+		pg_atomic_sub_fetch_u32(&ctl->refcnt, 1) == 0)
+		dsa_free(iter->tree->dsa, ctl->handle);
+#endif
 	pfree(iter);
 }
 
+#ifdef	RT_SHMEM
+RT_SCOPE	RT_ITER_HANDLE
+RT_GET_ITER_HANDLE(RT_ITER * iter)
+{
+	Assert(iter->shared);
+	return ((RT_ITER_CONTROL_SHARED *) iter->ctl)->handle;
+
+}
+
+RT_SCOPE	RT_ITER *
+RT_ATTACH_ITERATE_SHARED(RT_RADIX_TREE * tree, RT_ITER_HANDLE handle)
+{
+	RT_ITER    *iter;
+	RT_ITER_CONTROL_SHARED *ctl;
+
+	iter = (RT_ITER *) palloc0(sizeof(RT_ITER));
+	iter->tree = tree;
+	ctl = (RT_ITER_CONTROL_SHARED *) dsa_get_address(tree->dsa, handle);
+	iter->ctl = (RT_ITER_CONTROL *) ctl;
+	iter->shared = true;
+
+	/* For every iterator, increase the refcnt by 1 */
+	pg_atomic_add_fetch_u32(&ctl->refcnt, 1);
+
+	return iter;
+}
+#endif
+
 /***************** DELETION *****************/
 
 #ifdef RT_USE_DELETE
@@ -2929,7 +3077,11 @@ RT_DUMP_NODE(RT_NODE * node)
 #undef RT_PTR_ALLOC
 #undef RT_INVALID_PTR_ALLOC
 #undef RT_HANDLE
+#undef RT_ITER_HANDLE
+#undef RT_ITER_CONTROL
+#undef RT_ITER_HANDLE
 #undef RT_ITER
+#undef RT_SHARED_ITER
 #undef RT_NODE
 #undef RT_NODE_ITER
 #undef RT_NODE_KIND_4
@@ -2966,6 +3118,11 @@ RT_DUMP_NODE(RT_NODE * node)
 #undef RT_LOCK_SHARE
 #undef RT_UNLOCK
 #undef RT_GET_HANDLE
+#undef RT_BEGIN_ITERATE_SHARED
+#undef RT_ATTACH_ITERATE_SHARED
+#undef RT_GET_ITER_HANDLE
+#undef RT_ATTACH_ITER
+#undef RT_GET_ITER_HANDLE
 #undef RT_FIND
 #undef RT_SET
 #undef RT_BEGIN_ITERATE
@@ -3022,5 +3179,6 @@ RT_DUMP_NODE(RT_NODE * node)
 #undef RT_SHRINK_NODE_256
 #undef RT_NODE_DELETE
 #undef RT_NODE_INSERT
+#undef RT_INITIALIZE_ITER
 #undef RT_NODE_ITERATE_NEXT
 #undef RT_VERIFY_NODE
diff --git a/src/test/modules/test_radixtree/test_radixtree.c b/src/test/modules/test_radixtree/test_radixtree.c
index 32de6a3123e..dcba1508a29 100644
--- a/src/test/modules/test_radixtree/test_radixtree.c
+++ b/src/test/modules/test_radixtree/test_radixtree.c
@@ -158,12 +158,86 @@ test_empty(void)
 #endif
 }
 
+/* Iteration test for test_basic() */
+static void
+test_iterate_basic(rt_radix_tree *radixtree, uint64 *keys, int children,
+				   bool asc, bool shared)
+{
+	rt_iter    *iter;
+
+#ifdef TEST_SHARED_RT
+	if (!shared)
+		iter = rt_begin_iterate(radixtree);
+	else
+		iter = rt_begin_iterate_shared(radixtree);
+#else
+	iter = rt_begin_iterate(radixtree);
+#endif
+
+	for (int i = 0; i < children; i++)
+	{
+		uint64		expected;
+		uint64		iterkey;
+		TestValueType *iterval;
+
+		/* iteration is ordered by key, so adjust expected value accordingly */
+		if (asc)
+			expected = keys[i];
+		else
+			expected = keys[children - 1 - i];
+
+		iterval = rt_iterate_next(iter, &iterkey);
+
+		EXPECT_TRUE(iterval != NULL);
+		EXPECT_EQ_U64(iterkey, expected);
+		EXPECT_EQ_U64(*iterval, expected);
+	}
+
+	rt_end_iterate(iter);
+}
+
+/* Iteration test for test_random() */
+static void
+test_iterate_random(rt_radix_tree *radixtree, uint64 *keys, int num_keys,
+					bool shared)
+{
+	rt_iter    *iter;
+
+#ifdef TEST_SHARED_RT
+	if (!shared)
+		iter = rt_begin_iterate(radixtree);
+	else
+		iter = rt_begin_iterate_shared(radixtree);
+#else
+	iter = rt_begin_iterate(radixtree);
+#endif
+
+	for (int i = 0; i < num_keys; i++)
+	{
+		uint64		expected;
+		uint64		iterkey;
+		TestValueType *iterval;
+
+		/* skip duplicate keys */
+		if (i < num_keys - 1 && keys[i + 1] == keys[i])
+			continue;
+
+		expected = keys[i];
+		iterval = rt_iterate_next(iter, &iterkey);
+
+		EXPECT_TRUE(iterval != NULL);
+		EXPECT_EQ_U64(iterkey, expected);
+		EXPECT_EQ_U64(*iterval, expected);
+	}
+
+	rt_end_iterate(iter);
+}
+
 /* Basic set, find, and delete tests */
 static void
 test_basic(rt_node_class_test_elem *test_info, int shift, bool asc)
 {
 	rt_radix_tree *radixtree;
-	rt_iter    *iter;
 	uint64	   *keys;
 	int			children = test_info->nkeys;
 #ifdef TEST_SHARED_RT
@@ -244,28 +318,12 @@ test_basic(rt_node_class_test_elem *test_info, int shift, bool asc)
 	}
 
 	/* test that iteration returns the expected keys and values */
-	iter = rt_begin_iterate(radixtree);
-
-	for (int i = 0; i < children; i++)
-	{
-		uint64		expected;
-		uint64		iterkey;
-		TestValueType *iterval;
-
-		/* iteration is ordered by key, so adjust expected value accordingly */
-		if (asc)
-			expected = keys[i];
-		else
-			expected = keys[children - 1 - i];
-
-		iterval = rt_iterate_next(iter, &iterkey);
-
-		EXPECT_TRUE(iterval != NULL);
-		EXPECT_EQ_U64(iterkey, expected);
-		EXPECT_EQ_U64(*iterval, expected);
-	}
+	test_iterate_basic(radixtree, keys, children, asc, false);
 
-	rt_end_iterate(iter);
+#ifdef TEST_SHARED_RT
+	/* test shared-iteration as well */
+	test_iterate_basic(radixtree, keys, children, asc, true);
+#endif
 
 	/* delete all keys again */
 	for (int i = 0; i < children; i++)
@@ -295,7 +353,6 @@ static void
 test_random(void)
 {
 	rt_radix_tree *radixtree;
-	rt_iter    *iter;
 	pg_prng_state state;
 
 	/* limit memory usage by limiting the key space */
@@ -387,27 +444,12 @@ test_random(void)
 	}
 
 	/* test that iteration returns the expected keys and values */
-	iter = rt_begin_iterate(radixtree);
-
-	for (int i = 0; i < num_keys; i++)
-	{
-		uint64		expected;
-		uint64		iterkey;
-		TestValueType *iterval;
+	test_iterate_random(radixtree, keys, num_keys, false);
 
-		/* skip duplicate keys */
-		if (i < num_keys - 1 && keys[i + 1] == keys[i])
-			continue;
-
-		expected = keys[i];
-		iterval = rt_iterate_next(iter, &iterkey);
-
-		EXPECT_TRUE(iterval != NULL);
-		EXPECT_EQ_U64(iterkey, expected);
-		EXPECT_EQ_U64(*iterval, expected);
-	}
-
-	rt_end_iterate(iter);
+#ifdef TEST_SHARED_RT
+	/* test shared-iteration as well */
+	test_iterate_random(radixtree, keys, num_keys, true);
+#endif
 
 	/* reset random number generator for deletion */
 	pg_prng_seed(&state, seed);
-- 
2.43.5

