From 3af1307633447f61d6fc57d50b9a10c9329c2af5 Mon Sep 17 00:00:00 2001
From: alterego665 <824662526@qq.com>
Date: Mon, 1 Sep 2025 08:37:45 +0800
Subject: [PATCH v11] Optimize transaction waiting during logical decoding on 
 hot standby servers. On hot standby servers, XactLockTableWait falls back to 
 polling TransactionIdIsInProgress() with fixed 1ms sleeps when transactions 
 from the primary have no local lock table entries, causing excessive CPU 
 usage.

Introduce a new waiting path for hot standby that uses the existing
KnownAssignedXids infrastructure combined with per-XID condition variables to
replace the use of XactLockTableWait on standbys, eliminating the polling loop.
---
 src/backend/replication/logical/snapbuild.c   |  10 +-
 src/backend/storage/ipc/procarray.c           | 201 +++++++++++++++++-
 src/backend/storage/lmgr/lmgr.c               |   6 +-
 .../utils/activity/wait_event_names.txt       |   2 +
 src/include/storage/lwlocklist.h              |   1 +
 src/include/storage/procarray.h               |   2 +
 6 files changed, 213 insertions(+), 9 deletions(-)

diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index 98ddee20929..25a7733f770 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -1451,7 +1451,15 @@ SnapBuildWaitSnapshot(xl_running_xacts *running, TransactionId cutoff)
 		if (TransactionIdFollows(xid, cutoff))
 			continue;
 
-		XactLockTableWait(xid, NULL, NULL, XLTW_None);
+		/*
+		 * In primary, we use XactLockTableWait to wait for the transaction to finish.
+		 * In standby, since xact lock table is not maintained, we use XidWaitOnStandby
+		 * to accomplish the same.
+		 */
+		if (!RecoveryInProgress())
+			XactLockTableWait(xid, NULL, NULL, XLTW_None);
+		else
+			XidWaitOnStandby(xid);
 	}
 
 	/*
diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c
index 200f72c6e25..896c33c2ca2 100644
--- a/src/backend/storage/ipc/procarray.c
+++ b/src/backend/storage/ipc/procarray.c
@@ -62,6 +62,7 @@
 #include "utils/acl.h"
 #include "utils/builtins.h"
 #include "utils/lsyscache.h"
+#include "utils/hsearch.h"
 #include "utils/rel.h"
 #include "utils/snapmgr.h"
 
@@ -282,6 +283,11 @@ static TransactionId *KnownAssignedXids;
 static bool *KnownAssignedXidsValid;
 static TransactionId latestObservedXid = InvalidTransactionId;
 
+/*
+ * XIDs just pruned by KnownAssignedXidsRemovePreceding(), awaiting wake-up
+ */
+static TransactionId *KnownAssignedXidsToWakeup;
+
 /*
  * If we're in STANDBY_SNAPSHOT_PENDING state, standbySnapshotPendingXmin is
  * the highest xid that might still be running that we don't have in
@@ -306,6 +312,24 @@ static GlobalVisState GlobalVisTempRels;
  */
 static TransactionId ComputeXidHorizonsResultLastXmin;
 
+/*
+ * Entry of XidWaitHash; xid is the hash key, so it must stay the first field.
+ */
+typedef struct XidWaitEntry
+{
+	TransactionId xid;				/* hash key: transaction ID being waited for */
+	ConditionVariable cv;			/* broadcast on when waiters of xid are woken */
+} XidWaitEntry;
+
+/*
+ * Global hash table for XID waiting.
+ *
+ * This hash table maps transaction IDs to XidWaitEntry structures,
+ * enabling efficient per-XID waiting during hot standby recovery.
+ * XidWaitHash is a unified shared hash table protected by a single LWLock.
+ */
+static HTAB *XidWaitHash = NULL;
+
 #ifdef XIDCACHE_DEBUG
 
 /* counters for XidCache measurement */
@@ -352,7 +376,7 @@ static bool KnownAssignedXidExists(TransactionId xid);
 static void KnownAssignedXidsRemove(TransactionId xid);
 static void KnownAssignedXidsRemoveTree(TransactionId xid, int nsubxids,
 										TransactionId *subxids);
-static void KnownAssignedXidsRemovePreceding(TransactionId removeXid);
+static int	KnownAssignedXidsRemovePreceding(TransactionId removeXid);
 static int	KnownAssignedXidsGet(TransactionId *xarray, TransactionId xmax);
 static int	KnownAssignedXidsGetAndSetXmin(TransactionId *xarray,
 										   TransactionId *xmin,
@@ -369,6 +393,9 @@ static inline FullTransactionId FullXidRelativeTo(FullTransactionId rel,
 												  TransactionId xid);
 static void GlobalVisUpdateApply(ComputeXidHorizonsResult *horizons);
 
+static void WakeXidWaiters(TransactionId xid);
+static void WakeAllXidWaiters(void);
+
 /*
  * Report shared-memory space needed by ProcArrayShmemInit
  */
@@ -403,9 +430,12 @@ ProcArrayShmemSize(void)
 	{
 		size = add_size(size,
 						mul_size(sizeof(TransactionId),
-								 TOTAL_MAX_CACHED_SUBXIDS));
+								 2 * TOTAL_MAX_CACHED_SUBXIDS));
 		size = add_size(size,
 						mul_size(sizeof(bool), TOTAL_MAX_CACHED_SUBXIDS));
+		size = add_size(size,
+						hash_estimate_size(MaxBackends + NUM_AUXILIARY_PROCS,
+							sizeof(XidWaitEntry)));
 	}
 
 	return size;
@@ -458,6 +488,25 @@ ProcArrayShmemInit(void)
 			ShmemInitStruct("KnownAssignedXidsValid",
 							mul_size(sizeof(bool), TOTAL_MAX_CACHED_SUBXIDS),
 							&found);
+		KnownAssignedXidsToWakeup = (TransactionId *)
+			ShmemInitStruct("KnownAssignedXidsToWakeup",
+							mul_size(sizeof(TransactionId),
+									 TOTAL_MAX_CACHED_SUBXIDS),
+							&found);
+
+				/* Initialize XID waiter hash table for standby XID waiting */
+		{
+			HASHCTL info;
+
+			info.keysize = sizeof(TransactionId);
+			info.entrysize = sizeof(XidWaitEntry);
+
+			XidWaitHash = ShmemInitHash("XID Wait Hash",
+										MaxBackends + NUM_AUXILIARY_PROCS,
+										MaxBackends + NUM_AUXILIARY_PROCS,
+										&info,
+										HASH_ELEM | HASH_BLOBS | HASH_SHARED_MEM);
+		}
 	}
 }
 
@@ -1113,6 +1162,7 @@ ProcArrayApplyRecoveryInfo(RunningTransactions running)
 			 * throw them away before we apply the recovery snapshot.
 			 */
 			KnownAssignedXidsReset();
+			WakeAllXidWaiters();
 			standbyState = STANDBY_INITIALIZED;
 		}
 		else
@@ -1370,6 +1420,10 @@ ProcArrayApplyXidAssignment(TransactionId topxid,
 		procArray->lastOverflowedXid = max_xid;
 
 	LWLockRelease(ProcArrayLock);
+
+	/* Wake up waiters for expired subtransactions */
+	for (i = 0; i < nsubxids; i++)
+		WakeXidWaiters(subxids[i]);
 }
 
 /*
@@ -4450,6 +4504,11 @@ ExpireTreeKnownAssignedTransactionIds(TransactionId xid, int nsubxids,
 	TransamVariables->xactCompletionCount++;
 
 	LWLockRelease(ProcArrayLock);
+
+	/* Wake up per-XID waiters */
+	WakeXidWaiters(xid);
+	for (int i = 0; i < nsubxids; i++)
+		WakeXidWaiters(subxids[i]);
 }
 
 /*
@@ -4462,7 +4521,7 @@ ExpireAllKnownAssignedTransactionIds(void)
 	FullTransactionId latestXid;
 
 	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
-	KnownAssignedXidsRemovePreceding(InvalidTransactionId);
+	(void) KnownAssignedXidsRemovePreceding(InvalidTransactionId);
 
 	/* Reset latestCompletedXid to nextXid - 1 */
 	Assert(FullTransactionIdIsValid(TransamVariables->nextXid));
@@ -4483,6 +4542,9 @@ ExpireAllKnownAssignedTransactionIds(void)
 	 */
 	procArray->lastOverflowedXid = InvalidTransactionId;
 	LWLockRelease(ProcArrayLock);
+
+	/* Wake all XID waiters since all transactions are being expired */
+	WakeAllXidWaiters();
 }
 
 /*
@@ -4494,6 +4556,8 @@ void
 ExpireOldKnownAssignedTransactionIds(TransactionId xid)
 {
 	TransactionId latestXid;
+	int			i;
+	int			count;
 
 	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
 
@@ -4513,8 +4577,12 @@ ExpireOldKnownAssignedTransactionIds(TransactionId xid)
 	 */
 	if (TransactionIdPrecedes(procArray->lastOverflowedXid, xid))
 		procArray->lastOverflowedXid = InvalidTransactionId;
-	KnownAssignedXidsRemovePreceding(xid);
+	count = KnownAssignedXidsRemovePreceding(xid);
 	LWLockRelease(ProcArrayLock);
+
+	/* Wake XID waiters that have expired transactions they're waiting for */
+	for (i = 0; i < count; i++)
+		WakeXidWaiters(KnownAssignedXidsToWakeup[i]);
 }
 
 /*
@@ -4991,9 +5059,11 @@ KnownAssignedXidsRemoveTree(TransactionId xid, int nsubxids,
  * Prune KnownAssignedXids up to, but *not* including xid. If xid is invalid
  * then clear the whole table.
  *
+ * Returns the number of XIDs removed.
+ *
  * Caller must hold ProcArrayLock in exclusive mode.
  */
-static void
+static int
 KnownAssignedXidsRemovePreceding(TransactionId removeXid)
 {
 	ProcArrayStruct *pArray = procArray;
@@ -5005,9 +5075,10 @@ KnownAssignedXidsRemovePreceding(TransactionId removeXid)
 	if (!TransactionIdIsValid(removeXid))
 	{
 		elog(DEBUG4, "removing all KnownAssignedXids");
+		count = pArray->numKnownAssignedXids;
 		pArray->numKnownAssignedXids = 0;
 		pArray->headKnownAssignedXids = pArray->tailKnownAssignedXids = 0;
-		return;
+		return count;
 	}
 
 	elog(DEBUG4, "prune KnownAssignedXids to %u", removeXid);
@@ -5031,6 +5102,7 @@ KnownAssignedXidsRemovePreceding(TransactionId removeXid)
 			if (!StandbyTransactionIdIsPrepared(knownXid))
 			{
 				KnownAssignedXidsValid[i] = false;
+				KnownAssignedXidsToWakeup[count] = knownXid;
 				count++;
 			}
 		}
@@ -5060,6 +5132,8 @@ KnownAssignedXidsRemovePreceding(TransactionId removeXid)
 
 	/* Opportunistically compress the array */
 	KnownAssignedXidsCompress(KAX_PRUNE, true);
+
+	return count;
 }
 
 /*
@@ -5227,3 +5301,118 @@ KnownAssignedXidsReset(void)
 
 	LWLockRelease(ProcArrayLock);
 }
+
+/*
+ * XidWaitOnStandby -- block until xid is no longer in progress.
+ *
+ * Hot-standby counterpart of XactLockTableWait(): sleeps on a per-XID
+ * condition variable in XidWaitHash, keyed by the topmost XID.  The entry
+ * is created, rechecked and joined under XidWaitHashLock so a concurrent
+ * wake-up cannot be missed or leave an orphaned entry.  NOTE(review): a
+ * woken waiter still touches entry->cv after the waker removed the entry.
+ */
+void
+XidWaitOnStandby(TransactionId xid)
+{
+	XidWaitEntry *entry;
+	bool		found;
+	TransactionId wait_xid;
+
+	Assert(XidWaitHash);
+	Assert(TransactionIdIsValid(xid));
+	Assert(!TransactionIdEquals(xid, GetTopTransactionIdIfAny()));
+
+	/* Quick exit, without touching the hash, if already complete */
+	if (!TransactionIdIsInProgress(xid))
+		return;
+
+	/* Always wait on the topmost transaction to avoid lost wake-ups */
+	wait_xid = SubTransGetTopmostTransaction(xid);
+
+	LWLockAcquire(XidWaitHashLock, LW_EXCLUSIVE);
+	entry = (XidWaitEntry *)
+		hash_search(XidWaitHash, &wait_xid, HASH_ENTER, &found);
+	if (!found)
+	{
+		entry->xid = wait_xid;
+		ConditionVariableInit(&entry->cv);
+	}
+
+	/* Recheck under the lock; drop an entry we created that nobody will wake */
+	if (!TransactionIdIsInProgress(xid))
+	{
+		if (!found)
+			hash_search(XidWaitHash, &wait_xid, HASH_REMOVE, NULL);
+		LWLockRelease(XidWaitHashLock);
+		return;
+	}
+
+	/* Join the wait list before unlocking, so the waker always sees us */
+	ConditionVariablePrepareToSleep(&entry->cv);
+	LWLockRelease(XidWaitHashLock);
+
+	/* ConditionVariableSleep() itself checks for interrupts */
+	while (TransactionIdIsInProgress(xid))
+		ConditionVariableSleep(&entry->cv, WAIT_EVENT_XACT_COMPLETE);
+
+	ConditionVariableCancelSleep();
+}
+
+/*
+ * WakeXidWaiters -- wake the backends sleeping on one XID.
+ *
+ * Callers in this file invoke it, after releasing ProcArrayLock, for each
+ * XID they have just expired.
+ *
+ * With XidWaitHashLock held exclusively, look the XID up in XidWaitHash.
+ * If an entry exists, broadcast on its condition variable to wake all of
+ * its sleepers and then delete the entry.  An XID without an entry has
+ * no registered waiters and is simply skipped.
+ */
+static void
+WakeXidWaiters(TransactionId xid)
+{
+	XidWaitEntry *waitEntry;
+
+	Assert(XidWaitHash);
+
+	LWLockAcquire(XidWaitHashLock, LW_EXCLUSIVE);
+
+	waitEntry = (XidWaitEntry *)
+		hash_search(XidWaitHash, &xid, HASH_FIND, NULL);
+	if (waitEntry != NULL)
+	{
+		/* Wake the sleepers first, then retire the entry */
+		ConditionVariableBroadcast(&waitEntry->cv);
+		hash_search(XidWaitHash, &xid, HASH_REMOVE, NULL);
+	}
+
+	LWLockRelease(XidWaitHashLock);
+}
+
+/*
+ * WakeAllXidWaiters -- wake every XID waiter and empty XidWaitHash.
+ *
+ * Called when all KnownAssignedXids are discarded at once.  No per-XID
+ * wake-up will follow for those XIDs, so each entry is removed here after
+ * its broadcast; otherwise the entries would stay in the table forever.
+ * Deleting the element just returned by hash_seq_search() is permitted.
+ */
+static void
+WakeAllXidWaiters(void)
+{
+	HASH_SEQ_STATUS status;
+	XidWaitEntry *entry;
+
+	Assert(XidWaitHash);
+
+	LWLockAcquire(XidWaitHashLock, LW_EXCLUSIVE);
+	hash_seq_init(&status, XidWaitHash);
+	while ((entry = (XidWaitEntry *) hash_seq_search(&status)) != NULL)
+	{
+		ConditionVariableBroadcast(&entry->cv);
+		hash_search(XidWaitHash, &entry->xid, HASH_REMOVE, NULL);
+	}
+
+	LWLockRelease(XidWaitHashLock);
+}
\ No newline at end of file
diff --git a/src/backend/storage/lmgr/lmgr.c b/src/backend/storage/lmgr/lmgr.c
index 4798eb79003..6c48e75f15c 100644
--- a/src/backend/storage/lmgr/lmgr.c
+++ b/src/backend/storage/lmgr/lmgr.c
@@ -668,6 +668,8 @@ XactLockTableWait(TransactionId xid, Relation rel, const ItemPointerData *ctid,
 	ErrorContextCallback callback;
 	bool		first = true;
 
+	Assert(!RecoveryInProgress());
+
 	/*
 	 * If an operation is specified, set up our verbose error context
 	 * callback.
@@ -718,7 +720,6 @@ XactLockTableWait(TransactionId xid, Relation rel, const ItemPointerData *ctid,
 		 */
 		if (!first)
 		{
-			CHECK_FOR_INTERRUPTS();
 			pg_usleep(1000L);
 		}
 		first = false;
@@ -741,6 +742,8 @@ ConditionalXactLockTableWait(TransactionId xid, bool logLockFailure)
 	LOCKTAG		tag;
 	bool		first = true;
 
+	Assert(!RecoveryInProgress());
+
 	for (;;)
 	{
 		Assert(TransactionIdIsValid(xid));
@@ -761,7 +764,6 @@ ConditionalXactLockTableWait(TransactionId xid, bool logLockFailure)
 		/* See XactLockTableWait about this case */
 		if (!first)
 		{
-			CHECK_FOR_INTERRUPTS();
 			pg_usleep(1000L);
 		}
 		first = false;
diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt
index 5427da5bc1b..c3bc115da30 100644
--- a/src/backend/utils/activity/wait_event_names.txt
+++ b/src/backend/utils/activity/wait_event_names.txt
@@ -159,6 +159,7 @@ SYNC_REP	"Waiting for confirmation from a remote server during synchronous repli
 WAL_RECEIVER_EXIT	"Waiting for the WAL receiver to exit."
 WAL_RECEIVER_WAIT_START	"Waiting for startup process to send initial data for streaming replication."
 WAL_SUMMARY_READY	"Waiting for a new WAL summary to be generated."
+XACT_COMPLETE	"Waiting for a transaction to complete."
 XACT_GROUP_UPDATE	"Waiting for the group leader to update transaction status at transaction end."
 
 ABI_compatibility:
@@ -352,6 +353,7 @@ DSMRegistry	"Waiting to read or update the dynamic shared memory registry."
 InjectionPoint	"Waiting to read or update information related to injection points."
 SerialControl	"Waiting to read or update shared <filename>pg_serial</filename> state."
 AioWorkerSubmissionQueue	"Waiting to access AIO worker submission queue."
+XidWaitHash	"Waiting to access XID wait hash table."
 
 #
 # END OF PREDEFINED LWLOCKS (DO NOT CHANGE THIS LINE)
diff --git a/src/include/storage/lwlocklist.h b/src/include/storage/lwlocklist.h
index 06a1ffd4b08..0a21ceaf04e 100644
--- a/src/include/storage/lwlocklist.h
+++ b/src/include/storage/lwlocklist.h
@@ -85,6 +85,7 @@ PG_LWLOCK(50, DSMRegistry)
 PG_LWLOCK(51, InjectionPoint)
 PG_LWLOCK(52, SerialControl)
 PG_LWLOCK(53, AioWorkerSubmissionQueue)
+PG_LWLOCK(54, XidWaitHash)
 
 /*
  * There also exist several built-in LWLock tranches.  As with the predefined
diff --git a/src/include/storage/procarray.h b/src/include/storage/procarray.h
index 2f4ae06c279..350c1090de9 100644
--- a/src/include/storage/procarray.h
+++ b/src/include/storage/procarray.h
@@ -100,4 +100,6 @@ extern void ProcArraySetReplicationSlotXmin(TransactionId xmin,
 extern void ProcArrayGetReplicationSlotXmin(TransactionId *xmin,
 											TransactionId *catalog_xmin);
 
+extern void XidWaitOnStandby(TransactionId xid);
+
 #endif							/* PROCARRAY_H */
-- 
2.49.0

