From 097bff14bb9d5c9b39b73977bb6651e4c6727bda Mon Sep 17 00:00:00 2001
From: alterego655 <824662526@qq.com>
Date: Tue, 16 Dec 2025 15:47:12 +0800
Subject: [PATCH v3 2/2] Optimize SnapBuildPurgeOlderTxn with two-phase
 approach

Use different purging strategies based on SnapBuild state:

- Pre-CONSISTENT: Use in-place compaction O(n) since committed.xip
  is unsorted during this phase.

- Post-CONSISTENT: Use binary search O(log n) since committed.xip
  is maintained in sorted order after reaching consistency.

The catchange.xip array, which is always sorted, uses binary search
in all phases.

This eliminates the workspace allocation and double-copy overhead
from the previous implementation. The binary search handles XID ring
wraparound by splitting the old interval [xmin - 2^31, xmin) into
one or two contiguous segments in the uint32-sorted array.

Also adds assert blocks to verify sorted invariants before binary
search operations.
---
 src/backend/replication/logical/snapbuild.c | 200 +++++++++++++++-----
 1 file changed, 156 insertions(+), 44 deletions(-)

diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index e0d7d3e0ae8..ee5e6614a8b 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -152,6 +152,10 @@ static ResourceOwner SavedResourceOwnerDuringExport = NULL;
 static bool ExportInProgress = false;
 
 /* ->committed and ->catchange manipulation */
+static int	xid_bsearch_lower_bound(const TransactionId *xip, int n,
+									TransactionId key);
+static int	xid_purge_with_boundary(TransactionId *xip, int xcnt,
+									TransactionId boundary, TransactionId xmin);
 static void SnapBuildPurgeOlderTxn(SnapBuild *builder);
 
 /* snapshot building/manipulation/distribution functions */
@@ -926,6 +930,104 @@ SnapBuildAddCommittedTxns(SnapBuild *builder,
 	}
 }
 
+/*
+ * Binary search helper: find the first index where xip[i] >= key.
+ * Returns n if all elements are less than key.
+ *
+ * This is a standard lower_bound operation using unsigned comparison to match
+ * xidComparator ordering.
+ */
+static int
+xid_bsearch_lower_bound(const TransactionId *xip, int n, TransactionId key)
+{
+	int			lo = 0;
+	int			hi = n;
+
+	while (lo < hi)
+	{
+		int			mid = (lo + hi) >> 1;
+
+		if (xip[mid] < key)
+			lo = mid + 1;
+		else
+			hi = mid;
+	}
+	return lo;
+}
+
+/*
+ * Remove XIDs in the modular interval [boundary, xmin) from a sorted array.
+ *
+ * The array must be sorted in numeric uint32 order. XIDs in [boundary, xmin)
+ * are those that satisfy NormalTransactionIdPrecedes(xid, xmin).
+ *
+ * Returns the new count after removal. The array is compacted in-place.
+ */
+static int
+xid_purge_with_boundary(TransactionId *xip, int xcnt,
+						TransactionId boundary, TransactionId xmin)
+{
+	int			idx_boundary;
+	int			idx_xmin;
+	int			new_xcnt;
+
+	if (xcnt == 0)
+		return 0;
+
+	/*
+	 * Find where [boundary, xmin) appears in the numeric-sorted array.
+	 * idx_boundary = first index with xip[i] >= boundary idx_xmin     = first
+	 * index with xip[i] >= xmin
+	 */
+	idx_boundary = xid_bsearch_lower_bound(xip, xcnt, boundary);
+	idx_xmin = xid_bsearch_lower_bound(xip, xcnt, xmin);
+
+	/*
+	 * Case split based on numeric comparison (array is uint32-sorted):
+	 *
+	 * Case A: boundary <= xmin numerically Interval forms one block:
+	 * [idx_boundary, idx_xmin) Keep: [0, idx_boundary) and [idx_xmin, n)
+	 *
+	 * Case B: boundary > xmin numerically Interval straddles numeric zero as
+	 * two blocks: [0, idx_xmin) and [idx_boundary, n) Keep: [idx_xmin,
+	 * idx_boundary)
+	 *
+	 * Note: Case B occurs due to ring geometry when the modular interval
+	 * crosses zero in numeric order, not because XIDs have "wrapped"
+	 * operationally.
+	 */
+	if (boundary <= xmin)
+	{
+		/* Case A: interval is contiguous, keep prefix and suffix */
+		int			prefix_len = idx_boundary;
+		int			suffix_len = xcnt - idx_xmin;
+
+		if (suffix_len > 0 && idx_xmin > idx_boundary)
+			memmove(xip + prefix_len,
+					xip + idx_xmin,
+					suffix_len * sizeof(TransactionId));
+
+		new_xcnt = prefix_len + suffix_len;
+	}
+	else
+	{
+		/* Case B: interval straddles zero, keep middle block */
+		int			keep_len;
+
+		Assert(idx_boundary >= idx_xmin);
+		keep_len = idx_boundary - idx_xmin;
+
+		if (keep_len > 0 && idx_xmin > 0)
+			memmove(xip,
+					xip + idx_xmin,
+					keep_len * sizeof(TransactionId));
+
+		new_xcnt = keep_len;
+	}
+
+	return new_xcnt;
+}
+
 /*
  * Remove knowledge about transactions we treat as committed or containing catalog
  * changes that are smaller than ->xmin. Those won't ever get checked via
@@ -939,74 +1041,84 @@ SnapBuildAddCommittedTxns(SnapBuild *builder,
 static void
 SnapBuildPurgeOlderTxn(SnapBuild *builder)
 {
-	int			off;
-	TransactionId *workspace;
-	int			surviving_xids = 0;
+	TransactionId boundary;
 
 	/* not ready yet */
 	if (!TransactionIdIsNormal(builder->xmin))
 		return;
 
-	/* TODO: Neater algorithm than just copying and iterating? */
-	workspace =
-		MemoryContextAlloc(builder->context,
-						   builder->committed.xcnt * sizeof(TransactionId));
+	/*
+	 * Compute the boundary for the "old" interval [boundary, xmin). XIDs in
+	 * this interval satisfy NormalTransactionIdPrecedes(xid, xmin).
+	 */
+	boundary = builder->xmin - 0x80000000U;
 
-	/* copy xids that still are interesting to workspace */
-	for (off = 0; off < builder->committed.xcnt; off++)
+	/* Purge committed.xip */
+	if (builder->committed.xcnt > 0)
 	{
-		if (NormalTransactionIdPrecedes(builder->committed.xip[off],
-										builder->xmin))
-			;					/* remove */
-		else
-			workspace[surviving_xids++] = builder->committed.xip[off];
-	}
+		int			old_xcnt = builder->committed.xcnt;
 
-	/* copy workspace back to persistent state */
-	memcpy(builder->committed.xip, workspace,
-		   surviving_xids * sizeof(TransactionId));
+		if (builder->state < SNAPBUILD_CONSISTENT)
+		{
+			/*
+			 * Pre-CONSISTENT: array is unsorted, use in-place compaction
+			 * O(n).
+			 */
+			int			off;
+			int			surviving_xids = 0;
 
-	elog(DEBUG3, "purged committed transactions from %u to %u, xmin: %u, xmax: %u",
-		 (uint32) builder->committed.xcnt, (uint32) surviving_xids,
-		 builder->xmin, builder->xmax);
-	builder->committed.xcnt = surviving_xids;
+			for (off = 0; off < builder->committed.xcnt; off++)
+			{
+				if (!NormalTransactionIdPrecedes(builder->committed.xip[off],
+												 builder->xmin))
+					builder->committed.xip[surviving_xids++] =
+						builder->committed.xip[off];
+			}
+			builder->committed.xcnt = surviving_xids;
+		}
+		else
+		{
+			/*
+			 * Post-CONSISTENT: array is sorted, use binary search O(log n).
+			 */
+#ifdef USE_ASSERT_CHECKING
+			for (int i = 1; i < old_xcnt; i++)
+				Assert(builder->committed.xip[i - 1] < builder->committed.xip[i]);
+#endif
+			builder->committed.xcnt =
+				xid_purge_with_boundary(builder->committed.xip,
+										old_xcnt, boundary, builder->xmin);
+		}
 
-	pfree(workspace);
+		elog(DEBUG3, "purged committed transactions from %u to %u, xmin: %u, xmax: %u",
+			 (uint32) old_xcnt, (uint32) builder->committed.xcnt,
+			 builder->xmin, builder->xmax);
+	}
 
 	/*
-	 * Purge xids in ->catchange as well. The purged array must also be sorted
-	 * in xidComparator order.
+	 * Purge catchange.xip using binary search. This array is always sorted.
 	 */
 	if (builder->catchange.xcnt > 0)
 	{
-		/*
-		 * Since catchange.xip is sorted, we find the lower bound of xids that
-		 * are still interesting.
-		 */
-		for (off = 0; off < builder->catchange.xcnt; off++)
-		{
-			if (TransactionIdFollowsOrEquals(builder->catchange.xip[off],
-											 builder->xmin))
-				break;
-		}
+		int			old_xcnt = builder->catchange.xcnt;
 
-		surviving_xids = builder->catchange.xcnt - off;
+#ifdef USE_ASSERT_CHECKING
+		for (int i = 1; i < old_xcnt; i++)
+			Assert(builder->catchange.xip[i - 1] < builder->catchange.xip[i]);
+#endif
+		builder->catchange.xcnt =
+			xid_purge_with_boundary(builder->catchange.xip,
+									old_xcnt, boundary, builder->xmin);
 
-		if (surviving_xids > 0)
-		{
-			memmove(builder->catchange.xip, &(builder->catchange.xip[off]),
-					surviving_xids * sizeof(TransactionId));
-		}
-		else
+		if (builder->catchange.xcnt == 0)
 		{
 			pfree(builder->catchange.xip);
 			builder->catchange.xip = NULL;
 		}
 
 		elog(DEBUG3, "purged catalog modifying transactions from %u to %u, xmin: %u, xmax: %u",
-			 (uint32) builder->catchange.xcnt, (uint32) surviving_xids,
+			 (uint32) old_xcnt, (uint32) builder->catchange.xcnt,
 			 builder->xmin, builder->xmax);
-		builder->catchange.xcnt = surviving_xids;
 	}
 }
 
-- 
2.51.0

