Hi,

On Fri, Aug 8, 2025 at 7:06 PM Xuneng Zhou <[email protected]> wrote:
>
> Hi, Tom!
>
> Thanks for looking at this.
>
> On Fri, Aug 8, 2025 at 2:20 AM Tom Lane <[email protected]> wrote:
> >
> > Xuneng Zhou <[email protected]> writes:
> > > V9 replaces the original partitioned xid-wait htab with a single,
> > > unified one, reflecting the modest entry count and rare contention for
> > > waiting. To prevent possible races when multiple backends wait on the
> > > same XID for the first time in XidWaitOnStandby, a dedicated lock has
> > > been added to protect the hash table.
> >
> > This seems like adding quite a lot of extremely subtle code in
> > order to solve a very small problem. I thought the v1 patch
> > was about the right amount of complexity.
>
> Yeah, this patch is indeed complex, and the complexity might not be
> well-justified; given the current use cases, it feels like we’re paying
> a lot for very little. TBH, getting the balance right between
> efficiency gains and cost, in terms of both code complexity and
> runtime overhead, is beyond my current ability here, since I’m
> touching many parts of the code for the first time. Every time I
> thought I’d figured it out, new subtleties surfaced, though I’ve
> learned a lot from the exploration and hacking. We may agree on the
> necessity of fixing this issue, but not yet on how to fix it. I’m open
> to discussion and suggestions.
>
Some changes in v10:

1) XidWaitHashLock is now used for all operations on XidWaitHash, though
it might be unnecessary in some cases.
2) The pg_atomic_uint32 waiter_count field was removed from XidWaitEntry.
The startup process now takes charge of cleaning up the XidWaitHash
entry after waking up the waiting processes.
3) A pg_atomic_uint32 counter (xidWaitHashEntries in the attached patch)
was added to avoid unnecessary lock acquire/release and htab lookups
while no xid is being waited for.

Hope this could eliminate some subtleties.

Exponential backoff in earlier patches is simple and effective for
alleviating CPU overhead during extended waits; however, it could also
add unwanted latency for more sensitive use cases like a logical
walsender on cascading standbys. Unfortunately, I am unable to come up
with a solution that is correct, effective and simple in all cases.

Best,
Xuneng
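To make the latency concern about exponential backoff concrete, here is a
minimal standalone sketch in C. It is not code from any version of the
patch: the 1 ms starting delay mirrors the existing fixed sleep, while the
64 ms cap and both function names are assumptions chosen only for
illustration.

```c
#include <assert.h>

/* Hypothetical tuning constants, not taken from the patch. */
#define BACKOFF_MIN_USEC 1000L		/* first sleep: 1 ms */
#define BACKOFF_MAX_USEC 64000L		/* assumed cap: 64 ms */

/* Sleep length before the next re-check: min * 2^attempt, capped. */
static long
backoff_delay_usec(unsigned attempt)
{
	long		delay = BACKOFF_MIN_USEC;

	while (attempt-- > 0 && delay < BACKOFF_MAX_USEC)
		delay *= 2;
	return delay < BACKOFF_MAX_USEC ? delay : BACKOFF_MAX_USEC;
}

/*
 * Simulate a poller that sleeps with exponential backoff.  finish_usec is
 * the time (measured from the start of the wait) at which the awaited
 * transaction completes.  Returns how long after completion the poller
 * wakes up and notices it.
 */
static long
backoff_wakeup_latency_usec(long finish_usec)
{
	long		now = 0;
	unsigned	attempt = 0;

	assert(finish_usec >= 0);
	while (now < finish_usec)
		now += backoff_delay_usec(attempt++);
	return now - finish_usec;
}
```

Under these assumed constants, a transaction that finishes 100 ms into the
wait is only noticed at 127 ms, that is 27 ms late, whereas a fixed 1 ms
sleep would notice it within about 1 ms. A condition-variable wake-up has
no such polling delay.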
From a845e40c3aaea70e572cf12bfc0cbe482ff1e9c7 Mon Sep 17 00:00:00 2001
From: alterego665 <[email protected]>
Date: Fri, 29 Aug 2025 14:55:20 +0800
Subject: [PATCH v10] Optimize transaction waiting during logical decoding on hot standby servers.

On hot standby servers, XactLockTableWait falls back to polling
TransactionIdIsInProgress() with fixed 1ms sleeps when transactions
from the primary have no local lock table entries, causing excessive
CPU usage.

Introduce a new waiting path for hot-standby that uses the existing
KnownAssignedXids infrastructure combined with a condition variable
to replace the use of XactLockTableWait in standby, eliminating the
polling loop.
---
 src/backend/replication/logical/snapbuild.c   |  10 +-
 src/backend/storage/ipc/procarray.c           | 223 +++++++++++++++++-
 src/backend/storage/lmgr/lmgr.c               |   6 +-
 .../utils/activity/wait_event_names.txt       |   2 +
 src/include/storage/lwlocklist.h              |   1 +
 src/include/storage/procarray.h               |   2 +
 6 files changed, 235 insertions(+), 9 deletions(-)

diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index 98ddee20929..25a7733f770 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -1451,7 +1451,15 @@ SnapBuildWaitSnapshot(xl_running_xacts *running, TransactionId cutoff)
 		if (TransactionIdFollows(xid, cutoff))
 			continue;
 
-		XactLockTableWait(xid, NULL, NULL, XLTW_None);
+		/*
+		 * In primary, we use XactLockTableWait to wait for the transaction to finish.
+		 * In standby, since xact lock table is not maintained, we use XidWaitOnStandby
+		 * to accomplish the same.
+		 */
+		if (!RecoveryInProgress())
+			XactLockTableWait(xid, NULL, NULL, XLTW_None);
+		else
+			XidWaitOnStandby(xid);
 	}
 
 	/*
diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c
index bf987aed8d3..ee732e7fd86 100644
--- a/src/backend/storage/ipc/procarray.c
+++ b/src/backend/storage/ipc/procarray.c
@@ -62,6 +62,7 @@
 #include "storage/procarray.h"
 #include "utils/acl.h"
 #include "utils/builtins.h"
+#include "utils/hsearch.h"
 #include "utils/rel.h"
 #include "utils/snapmgr.h"
 
@@ -282,6 +283,11 @@ static TransactionId *KnownAssignedXids;
 static bool *KnownAssignedXidsValid;
 static TransactionId latestObservedXid = InvalidTransactionId;
 
+/*
+ * Array of XIDs to wake up on standby
+ */
+static TransactionId *KnownAssignedXidsToWakeup;
+
 /*
  * If we're in STANDBY_SNAPSHOT_PENDING state, standbySnapshotPendingXmin is
  * the highest xid that might still be running that we don't have in
@@ -306,6 +312,31 @@ static GlobalVisState GlobalVisTempRels;
  */
 static TransactionId ComputeXidHorizonsResultLastXmin;
 
+
+/*
+ * Atomic counter tracking the number of entries in XidWaitHash.
+ * Used for lock-free early exit optimization in WakeXidWaiters().
+ */
+static pg_atomic_uint32 xidWaitHashEntries;
+
+/*
+ * Hash table entry for per-XID waiting on standby servers.
+ */
+typedef struct XidWaitEntry
+{
+	TransactionId xid;			/* transaction ID being waited for */
+	ConditionVariable cv;		/* condition variable for this XID */
+} XidWaitEntry;
+
+/*
+ * Global hash table for XID waiting.
+ *
+ * This hash table maps transaction IDs to XidWaitEntry structures,
+ * enabling efficient per-XID waiting during hot standby recovery.
+ * XidWaitHash is a unified shared hash table protected by a single LWLock.
+ */
+static HTAB *XidWaitHash = NULL;
+
 #ifdef XIDCACHE_DEBUG
 
 /* counters for XidCache measurement */
@@ -352,7 +383,7 @@ static bool KnownAssignedXidExists(TransactionId xid);
 static void KnownAssignedXidsRemove(TransactionId xid);
 static void KnownAssignedXidsRemoveTree(TransactionId xid, int nsubxids,
 										TransactionId *subxids);
-static void KnownAssignedXidsRemovePreceding(TransactionId removeXid);
+static int KnownAssignedXidsRemovePreceding(TransactionId removeXid);
 static int	KnownAssignedXidsGet(TransactionId *xarray, TransactionId xmax);
 static int	KnownAssignedXidsGetAndSetXmin(TransactionId *xarray,
 										   TransactionId *xmin,
@@ -369,6 +400,9 @@ static inline FullTransactionId FullXidRelativeTo(FullTransactionId rel,
 												  TransactionId xid);
 static void GlobalVisUpdateApply(ComputeXidHorizonsResult *horizons);
 
+static void WakeXidWaiters(TransactionId xid);
+static void WakeAllXidWaiters(void);
+
 /*
  * Report shared-memory space needed by ProcArrayShmemInit
  */
@@ -403,9 +437,12 @@ ProcArrayShmemSize(void)
 	{
 		size = add_size(size,
 						mul_size(sizeof(TransactionId),
-								 TOTAL_MAX_CACHED_SUBXIDS));
+								 2 * TOTAL_MAX_CACHED_SUBXIDS));
 		size = add_size(size,
 						mul_size(sizeof(bool), TOTAL_MAX_CACHED_SUBXIDS));
+		size = add_size(size,
+						hash_estimate_size(MaxBackends + NUM_AUXILIARY_PROCS,
+										   sizeof(XidWaitEntry)));
 	}
 
 	return size;
@@ -458,6 +495,31 @@ ProcArrayShmemInit(void)
 			ShmemInitStruct("KnownAssignedXidsValid",
 							mul_size(sizeof(bool), TOTAL_MAX_CACHED_SUBXIDS),
 							&found);
+		KnownAssignedXidsToWakeup = (TransactionId *)
+			ShmemInitStruct("KnownAssignedXidsToWakeup",
+							mul_size(sizeof(TransactionId),
+									 TOTAL_MAX_CACHED_SUBXIDS),
+							&found);
+
+		/* Initialize XID waiter hash table for standby XID waiting */
+		{
+			HASHCTL		info;
+
+			info.keysize = sizeof(TransactionId);
+			info.entrysize = sizeof(XidWaitEntry);
+
+			XidWaitHash = ShmemInitHash("XID Wait Hash",
+										MaxBackends + NUM_AUXILIARY_PROCS,
+										MaxBackends + NUM_AUXILIARY_PROCS,
+										&info,
+										HASH_ELEM | HASH_BLOBS | HASH_SHARED_MEM);
+		}
+
+		/* Initialize atomic counter for XID waiters optimization */
+		if (!found)
+		{
+			pg_atomic_init_u32(&xidWaitHashEntries, 0);
+		}
 	}
 }
 
@@ -1113,6 +1175,7 @@ ProcArrayApplyRecoveryInfo(RunningTransactions running)
 			 * throw them away before we apply the recovery snapshot.
 			 */
 			KnownAssignedXidsReset();
+			WakeAllXidWaiters();
 			standbyState = STANDBY_INITIALIZED;
 		}
 		else
@@ -1370,6 +1433,10 @@ ProcArrayApplyXidAssignment(TransactionId topxid,
 		procArray->lastOverflowedXid = max_xid;
 
 	LWLockRelease(ProcArrayLock);
+
+	/* Wake up waiters for expired subtransactions */
+	for (i = 0; i < nsubxids; i++)
+		WakeXidWaiters(subxids[i]);
 }
 
 /*
@@ -4450,6 +4517,11 @@ ExpireTreeKnownAssignedTransactionIds(TransactionId xid, int nsubxids,
 	TransamVariables->xactCompletionCount++;
 
 	LWLockRelease(ProcArrayLock);
+
+	/* Wake up per-XID waiters */
+	WakeXidWaiters(xid);
+	for (int i = 0; i < nsubxids; i++)
+		WakeXidWaiters(subxids[i]);
 }
 
 /*
@@ -4462,7 +4534,7 @@ ExpireAllKnownAssignedTransactionIds(void)
 	FullTransactionId latestXid;
 
 	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
-	KnownAssignedXidsRemovePreceding(InvalidTransactionId);
+	(void) KnownAssignedXidsRemovePreceding(InvalidTransactionId);
 
 	/* Reset latestCompletedXid to nextXid - 1 */
 	Assert(FullTransactionIdIsValid(TransamVariables->nextXid));
@@ -4483,6 +4555,9 @@ ExpireAllKnownAssignedTransactionIds(void)
 	 */
 	procArray->lastOverflowedXid = InvalidTransactionId;
 	LWLockRelease(ProcArrayLock);
+
+	/* Wake all XID waiters since all transactions are being expired */
+	WakeAllXidWaiters();
 }
 
 /*
@@ -4494,6 +4569,8 @@ void
 ExpireOldKnownAssignedTransactionIds(TransactionId xid)
 {
 	TransactionId latestXid;
+	int			i;
+	int			count;
 
 	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
 
@@ -4513,8 +4590,12 @@ ExpireOldKnownAssignedTransactionIds(TransactionId xid)
 	 */
 	if (TransactionIdPrecedes(procArray->lastOverflowedXid, xid))
 		procArray->lastOverflowedXid = InvalidTransactionId;
-	KnownAssignedXidsRemovePreceding(xid);
+	count = KnownAssignedXidsRemovePreceding(xid);
 	LWLockRelease(ProcArrayLock);
+
+	/* Wake XID waiters that have expired transactions they're waiting for */
+	for (i = 0; i < count; i++)
+		WakeXidWaiters(KnownAssignedXidsToWakeup[i]);
 }
 
 /*
@@ -4991,9 +5072,11 @@ KnownAssignedXidsRemoveTree(TransactionId xid, int nsubxids,
  * Prune KnownAssignedXids up to, but *not* including xid. If xid is invalid
  * then clear the whole table.
  *
+ * Returns the number of XIDs removed.
+ *
  * Caller must hold ProcArrayLock in exclusive mode.
  */
-static void
+static int
 KnownAssignedXidsRemovePreceding(TransactionId removeXid)
 {
 	ProcArrayStruct *pArray = procArray;
@@ -5005,9 +5088,10 @@ KnownAssignedXidsRemovePreceding(TransactionId removeXid)
 	if (!TransactionIdIsValid(removeXid))
 	{
 		elog(DEBUG4, "removing all KnownAssignedXids");
+		count = pArray->numKnownAssignedXids;
 		pArray->numKnownAssignedXids = 0;
 		pArray->headKnownAssignedXids = pArray->tailKnownAssignedXids = 0;
-		return;
+		return count;
 	}
 
 	elog(DEBUG4, "prune KnownAssignedXids to %u", removeXid);
@@ -5031,6 +5115,7 @@ KnownAssignedXidsRemovePreceding(TransactionId removeXid)
 			if (!StandbyTransactionIdIsPrepared(knownXid))
 			{
 				KnownAssignedXidsValid[i] = false;
+				KnownAssignedXidsToWakeup[count] = knownXid;
 				count++;
 			}
 		}
@@ -5060,6 +5145,8 @@ KnownAssignedXidsRemovePreceding(TransactionId removeXid)
 
 	/* Opportunistically compress the array */
 	KnownAssignedXidsCompress(KAX_PRUNE, true);
+
+	return count;
 }
 
 /*
@@ -5227,3 +5314,127 @@ KnownAssignedXidsReset(void)
 
 	LWLockRelease(ProcArrayLock);
 }
+
+/*
+ * Wait for XID completion using condition variables.
+ *
+ * This function implements efficient waiting for transaction completion
+ * on standby servers by using a hash table of condition variables keyed
+ * by transaction ID
+ *
+ * Note: This function is only meaningful during hot standby recovery.
+ * Primary servers should use the lock-based waiting mechanisms.
+ */
+void
+XidWaitOnStandby(TransactionId xid)
+{
+	XidWaitEntry *entry;
+	bool		found;
+	TransactionId wait_xid;
+
+	Assert(XidWaitHash);
+	Assert(TransactionIdIsValid(xid));
+	Assert(!TransactionIdEquals(xid, GetTopTransactionIdIfAny()));
+
+	/* Quick exit if transaction already complete */
+	if (!TransactionIdIsInProgress(xid))
+		return;
+
+	/* Always wait on the topmost transaction to avoid lost wake-ups */
+	wait_xid = SubTransGetTopmostTransaction(xid);
+
+	LWLockAcquire(XidWaitHashLock, LW_EXCLUSIVE);
+
+	entry = (XidWaitEntry *)
+		hash_search(XidWaitHash, &wait_xid, HASH_ENTER, &found);
+
+	if (!found)
+	{
+		/* Initialize new entry */
+		entry->xid = wait_xid;
+		ConditionVariableInit(&entry->cv);
+		pg_atomic_fetch_add_u32(&xidWaitHashEntries, 1);
+	}
+
+	LWLockRelease(XidWaitHashLock);
+
+	ConditionVariablePrepareToSleep(&entry->cv);
+
+	/* Wait loop with condition re-checking */
+	while (TransactionIdIsInProgress(xid))
+	{
+		ConditionVariableSleep(&entry->cv, WAIT_EVENT_XACT_COMPLETE);
+		CHECK_FOR_INTERRUPTS();
+	}
+
+	ConditionVariableCancelSleep();
+}
+
+/*
+ * Wake waiters for a specific XID.
+ *
+ * This function is called when a transaction completes on the primary
+ * server and we need to wake up any standby processes that were waiting
+ * for that specific transaction ID.
+ *
+ * Uses the hash table to locate waiters for the specified XID and
+ * broadcasts on the associated condition variable to wake all waiting
+ * backends simultaneously.
+ */
+static void
+WakeXidWaiters(TransactionId xid)
+{
+	XidWaitEntry *entry;
+	bool		found;
+
+	Assert(XidWaitHash);
+
+	/* Early exit optimization: no waiters to wake */
+	if (pg_atomic_read_u32(&xidWaitHashEntries) == 0)
+		return;
+
+	LWLockAcquire(XidWaitHashLock, LW_EXCLUSIVE);
+	entry = (XidWaitEntry *)
+		hash_search(XidWaitHash, &xid, HASH_FIND, &found);
+
+	if (found)
+	{
+		ConditionVariableBroadcast(&entry->cv);
+		hash_search(XidWaitHash, &xid, HASH_REMOVE, NULL);
+		pg_atomic_fetch_sub_u32(&xidWaitHashEntries, 1);
+	}
+
+	LWLockRelease(XidWaitHashLock);
+}
+
+/*
+ * Wake all XID waiters.
+ *
+ * This function wakes up all backends waiting on any transaction ID.
+ * It is primarily used during standby promotion when the server is
+ * transitioning from recovery mode to normal operation, at which point
+ * all XID-based waiting becomes invalid.
+ */
+static void
+WakeAllXidWaiters(void)
+{
+	HASH_SEQ_STATUS status;
+	XidWaitEntry *entry;
+
+	Assert(XidWaitHash);
+
+	/* Early exit optimization: no waiters to wake */
+	if (pg_atomic_read_u32(&xidWaitHashEntries) == 0)
+		return;
+
+	LWLockAcquire(XidWaitHashLock, LW_EXCLUSIVE);
+
+	hash_seq_init(&status, XidWaitHash);
+	while ((entry = (XidWaitEntry *) hash_seq_search(&status)) != NULL)
+		ConditionVariableBroadcast(&entry->cv);
+
+	/* Reset counter since we're waking all waiters */
+	pg_atomic_write_u32(&xidWaitHashEntries, 0);
+
+	LWLockRelease(XidWaitHashLock);
+}
\ No newline at end of file
diff --git a/src/backend/storage/lmgr/lmgr.c b/src/backend/storage/lmgr/lmgr.c
index 3f6bf70bd3c..536b1542a09 100644
--- a/src/backend/storage/lmgr/lmgr.c
+++ b/src/backend/storage/lmgr/lmgr.c
@@ -668,6 +668,8 @@ XactLockTableWait(TransactionId xid, Relation rel, ItemPointer ctid,
 	ErrorContextCallback callback;
 	bool		first = true;
 
+	Assert(!RecoveryInProgress());
+
 	/*
 	 * If an operation is specified, set up our verbose error context
 	 * callback.
@@ -718,7 +720,6 @@ XactLockTableWait(TransactionId xid, Relation rel, ItemPointer ctid,
 		 */
 		if (!first)
 		{
-			CHECK_FOR_INTERRUPTS();
 			pg_usleep(1000L);
 		}
 		first = false;
@@ -741,6 +742,8 @@ ConditionalXactLockTableWait(TransactionId xid, bool logLockFailure)
 	LOCKTAG		tag;
 	bool		first = true;
 
+	Assert(!RecoveryInProgress());
+
 	for (;;)
 	{
 		Assert(TransactionIdIsValid(xid));
@@ -761,7 +764,6 @@ ConditionalXactLockTableWait(TransactionId xid, bool logLockFailure)
 		/* See XactLockTableWait about this case */
 		if (!first)
 		{
-			CHECK_FOR_INTERRUPTS();
 			pg_usleep(1000L);
 		}
 		first = false;
diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt
index 5427da5bc1b..c3bc115da30 100644
--- a/src/backend/utils/activity/wait_event_names.txt
+++ b/src/backend/utils/activity/wait_event_names.txt
@@ -159,6 +159,7 @@ SYNC_REP	"Waiting for confirmation from a remote server during synchronous repli
 WAL_RECEIVER_EXIT	"Waiting for the WAL receiver to exit."
 WAL_RECEIVER_WAIT_START	"Waiting for startup process to send initial data for streaming replication."
 WAL_SUMMARY_READY	"Waiting for a new WAL summary to be generated."
+XACT_COMPLETE	"Waiting for a transaction to complete."
 XACT_GROUP_UPDATE	"Waiting for the group leader to update transaction status at transaction end."
 
 ABI_compatibility:
@@ -352,6 +353,7 @@ DSMRegistry	"Waiting to read or update the dynamic shared memory registry."
 InjectionPoint	"Waiting to read or update information related to injection points."
 SerialControl	"Waiting to read or update shared <filename>pg_serial</filename> state."
 AioWorkerSubmissionQueue	"Waiting to access AIO worker submission queue."
+XidWaitHash	"Waiting to access XID wait hash table."
 
 #
 # END OF PREDEFINED LWLOCKS (DO NOT CHANGE THIS LINE)
diff --git a/src/include/storage/lwlocklist.h b/src/include/storage/lwlocklist.h
index 06a1ffd4b08..0a21ceaf04e 100644
--- a/src/include/storage/lwlocklist.h
+++ b/src/include/storage/lwlocklist.h
@@ -85,6 +85,7 @@ PG_LWLOCK(50, DSMRegistry)
 PG_LWLOCK(51, InjectionPoint)
 PG_LWLOCK(52, SerialControl)
 PG_LWLOCK(53, AioWorkerSubmissionQueue)
+PG_LWLOCK(54, XidWaitHash)
 
 /*
  * There also exist several built-in LWLock tranches.  As with the predefined
diff --git a/src/include/storage/procarray.h b/src/include/storage/procarray.h
index 2f4ae06c279..350c1090de9 100644
--- a/src/include/storage/procarray.h
+++ b/src/include/storage/procarray.h
@@ -100,4 +100,6 @@ extern void ProcArraySetReplicationSlotXmin(TransactionId xmin,
 extern void ProcArrayGetReplicationSlotXmin(TransactionId *xmin,
 											TransactionId *catalog_xmin);
 
+extern void XidWaitOnStandby(TransactionId xid);
+
 #endif							/* PROCARRAY_H */
-- 
2.49.0
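
As a reading aid for the early-exit check in WakeXidWaiters(), the
following standalone sketch shows the same idea outside PostgreSQL: an
atomic entry counter is read first, and the lock and table lookup are
skipped when nothing is registered. This is only an analogy under stated
assumptions. It uses a pthread mutex and a small fixed array in place of
the LWLock and shared hash table, omits the condition variable entirely,
and every name in it (add_waiter, wake_waiters, wait_table, and so on) is
hypothetical.

```c
#include <assert.h>
#include <pthread.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stdint.h>

#define MAX_WAIT_ENTRIES 8		/* hypothetical capacity for this sketch */

typedef struct WaitEntry
{
	uint32_t	xid;			/* key being waited for */
	bool		in_use;			/* slot currently holds a waiter entry */
} WaitEntry;

static WaitEntry wait_table[MAX_WAIT_ENTRIES];
static pthread_mutex_t wait_table_lock = PTHREAD_MUTEX_INITIALIZER;
static atomic_uint wait_entry_count;	/* number of in_use slots */
static unsigned lock_acquisitions;		/* instrumentation for the demo only */

/* Register an entry for xid; returns false if the table is full. */
static bool
add_waiter(uint32_t xid)
{
	int			slot = -1;

	assert(xid != 0);			/* 0 plays the role of an invalid XID here */

	pthread_mutex_lock(&wait_table_lock);
	lock_acquisitions++;
	for (int i = 0; i < MAX_WAIT_ENTRIES; i++)
	{
		if (wait_table[i].in_use && wait_table[i].xid == xid)
		{
			pthread_mutex_unlock(&wait_table_lock);
			return true;		/* entry already exists */
		}
		if (!wait_table[i].in_use && slot < 0)
			slot = i;
	}
	if (slot >= 0)
	{
		wait_table[slot].xid = xid;
		wait_table[slot].in_use = true;
		atomic_fetch_add(&wait_entry_count, 1);
	}
	pthread_mutex_unlock(&wait_table_lock);
	return slot >= 0;
}

/* Remove the entry for xid, if any; returns true if one was found. */
static bool
wake_waiters(uint32_t xid)
{
	bool		found = false;

	/* Fast path: no entries at all, so skip the lock and the lookup. */
	if (atomic_load(&wait_entry_count) == 0)
		return false;

	pthread_mutex_lock(&wait_table_lock);
	lock_acquisitions++;
	for (int i = 0; i < MAX_WAIT_ENTRIES; i++)
	{
		if (wait_table[i].in_use && wait_table[i].xid == xid)
		{
			/* A real implementation would broadcast to sleepers here. */
			wait_table[i].in_use = false;
			atomic_fetch_sub(&wait_entry_count, 1);
			found = true;
			break;
		}
	}
	pthread_mutex_unlock(&wait_table_lock);
	return found;
}
```

In this sketch, calling wake_waiters() while the table is empty never
touches the mutex, which corresponds to the common case on a standby
where replay finishes transactions that nobody is waiting for. The
counter is only a hint: a waiter that registers just after the check is
still safe in the patch because XidWaitOnStandby() re-checks
TransactionIdIsInProgress() before sleeping.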
