From 66c509e07bcbaa4580b32266326e34487a16d683 Mon Sep 17 00:00:00 2001
From: alterego655 <824662526@qq.com>
Date: Tue, 16 Dec 2025 10:21:36 +0800
Subject: [PATCH v5 1/4] Extend xlogwait infrastructure with write and flush
 wait types

Add support for waiting on the standby's WAL write and flush LSNs, in
addition to the existing standby replay and primary flush wait types.
This provides the foundation for extending the WAIT FOR command with a
MODE parameter.

Key changes:
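- Rename WAIT_LSN_TYPE_REPLAY to WAIT_LSN_TYPE_STANDBY_REPLAY and
  WAIT_LSN_TYPE_FLUSH to WAIT_LSN_TYPE_PRIMARY_FLUSH, updating all callers
- Add WAIT_LSN_TYPE_STANDBY_WRITE and WAIT_LSN_TYPE_STANDBY_FLUSH to WaitLSNType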
- Add GetCurrentLSNForWaitType() to retrieve current LSN for each wait type
- Add new wait events WAIT_EVENT_WAIT_FOR_WAL_WRITE and
  WAIT_EVENT_WAIT_FOR_WAL_FLUSH for pg_stat_activity visibility
- Update WaitForLSN() to use GetCurrentLSNForWaitType() internally
---
 src/backend/access/transam/xlog.c             |  2 +-
 src/backend/access/transam/xlogrecovery.c     |  4 +-
 src/backend/access/transam/xlogwait.c         | 84 ++++++++++++++-----
 src/backend/commands/wait.c                   |  2 +-
 .../utils/activity/wait_event_names.txt       |  3 +-
 src/include/access/xlogwait.h                 | 12 ++-
 6 files changed, 80 insertions(+), 27 deletions(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 6a5640df51a..a6e348f2109 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -6241,7 +6241,7 @@ StartupXLOG(void)
 	 * Wake up all waiters for replay LSN.  They need to report an error that
 	 * recovery was ended before reaching the target LSN.
 	 */
-	WaitLSNWakeup(WAIT_LSN_TYPE_REPLAY, InvalidXLogRecPtr);
+	WaitLSNWakeup(WAIT_LSN_TYPE_STANDBY_REPLAY, InvalidXLogRecPtr);
 
 	/*
 	 * Shutdown the recovery environment.  This must occur after
diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c
index ae2398d6975..01ffe30ffee 100644
--- a/src/backend/access/transam/xlogrecovery.c
+++ b/src/backend/access/transam/xlogrecovery.c
@@ -1846,8 +1846,8 @@ PerformWalRecovery(void)
 			 */
 			if (waitLSNState &&
 				(XLogRecoveryCtl->lastReplayedEndRecPtr >=
-				 pg_atomic_read_u64(&waitLSNState->minWaitedLSN[WAIT_LSN_TYPE_REPLAY])))
-				WaitLSNWakeup(WAIT_LSN_TYPE_REPLAY, XLogRecoveryCtl->lastReplayedEndRecPtr);
+				 pg_atomic_read_u64(&waitLSNState->minWaitedLSN[WAIT_LSN_TYPE_STANDBY_REPLAY])))
+				WaitLSNWakeup(WAIT_LSN_TYPE_STANDBY_REPLAY, XLogRecoveryCtl->lastReplayedEndRecPtr);
 
 			/* Else, try to fetch the next WAL record */
 			record = ReadRecord(xlogprefetcher, LOG, false, replayTLI);
diff --git a/src/backend/access/transam/xlogwait.c b/src/backend/access/transam/xlogwait.c
index 6109381c0f0..726a4a14084 100644
--- a/src/backend/access/transam/xlogwait.c
+++ b/src/backend/access/transam/xlogwait.c
@@ -12,25 +12,30 @@
  *		This file implements waiting for WAL operations to reach specific LSNs
  *		on both physical standby and primary servers. The core idea is simple:
  *		every process that wants to wait publishes the LSN it needs to the
- *		shared memory, and the appropriate process (startup on standby, or
- *		WAL writer/backend on primary) wakes it once that LSN has been reached.
+ *		shared memory, and the appropriate process (startup on standby,
+ *		walreceiver on standby, or WAL writer/backend on primary) wakes it
+ *		once that LSN has been reached.
  *
  *		The shared memory used by this module comprises a procInfos
  *		per-backend array with the information of the awaited LSN for each
  *		of the backend processes.  The elements of that array are organized
- *		into a pairing heap waitersHeap, which allows for very fast finding
- *		of the least awaited LSN.
+ *		into pairing heaps (waitersHeap), one for each WaitLSNType, which
+ *		allows for very fast finding of the least awaited LSN for each type.
  *
- *		In addition, the least-awaited LSN is cached as minWaitedLSN.  The
- *		waiter process publishes information about itself to the shared
- *		memory and waits on the latch until it is woken up by the appropriate
- *		process, standby is promoted, or the postmaster	dies.  Then, it cleans
- *		information about itself in the shared memory.
+ *		In addition, the least-awaited LSN for each type is cached in the
+ *		minWaitedLSN array.  The waiter process publishes information about
+ *		itself to the shared memory and waits on the latch until it is woken
+ *		up by the appropriate process, standby is promoted, or the postmaster
+ *		dies.  Then, it cleans information about itself in the shared memory.
  *
- *		On standby servers: After replaying a WAL record, the startup process
- *		first performs a fast path check minWaitedLSN > replayLSN.  If this
- *		check is negative, it checks waitersHeap and wakes up the backend
- *		whose awaited LSNs are reached.
+ *		On standby servers:
+ *		- After replaying a WAL record, the startup process performs a fast
+ *		  path check minWaitedLSN[REPLAY] > replayLSN.  If this check is
+ *		  negative, it checks waitersHeap[REPLAY] and wakes up the backends
+ *		  whose awaited LSNs are reached.
+ *		- After receiving WAL, the walreceiver process performs similar checks
+ *		  against the flush and write LSNs, waking up waiters in the FLUSH
+ *		  and WRITE heaps respectively.
  *
  *		On primary servers: After flushing WAL, the WAL writer or backend
  *		process performs a similar check against the flush LSN and wakes up
@@ -49,6 +54,7 @@
 #include "access/xlogwait.h"
 #include "miscadmin.h"
 #include "pgstat.h"
+#include "replication/walreceiver.h"
 #include "storage/latch.h"
 #include "storage/proc.h"
 #include "storage/shmem.h"
@@ -62,6 +68,48 @@ static int	waitlsn_cmp(const pairingheap_node *a, const pairingheap_node *b,
 
 struct WaitLSNState *waitLSNState = NULL;
 
+/*
+ * Wait event passed to WaitLatch() for each WaitLSNType, so the wait shows
+ * up in pg_stat_activity.  Both flush types share the same wait event.
+ */
+static const uint32 WaitLSNWaitEvents[] = {
+	[WAIT_LSN_TYPE_STANDBY_REPLAY] = WAIT_EVENT_WAIT_FOR_WAL_REPLAY,
+	[WAIT_LSN_TYPE_STANDBY_WRITE] = WAIT_EVENT_WAIT_FOR_WAL_WRITE,
+	[WAIT_LSN_TYPE_STANDBY_FLUSH] = WAIT_EVENT_WAIT_FOR_WAL_FLUSH,
+	[WAIT_LSN_TYPE_PRIMARY_FLUSH] = WAIT_EVENT_WAIT_FOR_WAL_FLUSH,
+};
+
+StaticAssertDecl(lengthof(WaitLSNWaitEvents) == WAIT_LSN_TYPE_COUNT,
+				 "WaitLSNWaitEvents must match WaitLSNType enum");
+
+/*
+ * Return the current LSN that waiters of 'lsnType' are compared against.
+ * Raises ERROR if 'lsnType' is not a valid WaitLSNType value.
+ */
+XLogRecPtr
+GetCurrentLSNForWaitType(WaitLSNType lsnType)
+{
+	/* Every enumerator is handled; WAIT_LSN_TYPE_COUNT is a macro, not one */
+	switch (lsnType)
+	{
+		case WAIT_LSN_TYPE_STANDBY_REPLAY:
+			return GetXLogReplayRecPtr(NULL);
+
+		case WAIT_LSN_TYPE_STANDBY_WRITE:
+			return GetWalRcvWriteRecPtr();
+
+		case WAIT_LSN_TYPE_STANDBY_FLUSH:
+			return GetWalRcvFlushRecPtr(NULL, NULL);
+
+		case WAIT_LSN_TYPE_PRIMARY_FLUSH:
+			return GetFlushRecPtr(NULL);
+	}
+
+	/* Reached only when lsnType holds a value outside the enum */
+	elog(ERROR, "invalid LSN wait type: %d", (int) lsnType);
+	pg_unreachable();
+}
+
 /* Report the amount of shared memory space needed for WaitLSNState. */
 Size
 WaitLSNShmemSize(void)
@@ -341,13 +389,11 @@ WaitForLSN(WaitLSNType lsnType, XLogRecPtr targetLSN, int64 timeout)
 		int			rc;
 		long		delay_ms = -1;
 
-		if (lsnType == WAIT_LSN_TYPE_REPLAY)
-			currentLSN = GetXLogReplayRecPtr(NULL);
-		else
-			currentLSN = GetFlushRecPtr(NULL);
+		/* Get current LSN for the wait type */
+		currentLSN = GetCurrentLSNForWaitType(lsnType);
 
 		/* Check that recovery is still in-progress */
-		if (lsnType == WAIT_LSN_TYPE_REPLAY && !RecoveryInProgress())
+		if (lsnType != WAIT_LSN_TYPE_PRIMARY_FLUSH && !RecoveryInProgress())
 		{
 			/*
 			 * Recovery was ended, but check if target LSN was already
@@ -376,7 +422,7 @@ WaitForLSN(WaitLSNType lsnType, XLogRecPtr targetLSN, int64 timeout)
 		CHECK_FOR_INTERRUPTS();
 
 		rc = WaitLatch(MyLatch, wake_events, delay_ms,
-					   (lsnType == WAIT_LSN_TYPE_REPLAY) ? WAIT_EVENT_WAIT_FOR_WAL_REPLAY : WAIT_EVENT_WAIT_FOR_WAL_FLUSH);
+					   WaitLSNWaitEvents[lsnType]);
 
 		/*
 		 * Emergency bailout if postmaster has died.  This is to avoid the
diff --git a/src/backend/commands/wait.c b/src/backend/commands/wait.c
index a37bddaefb2..dd2570cb787 100644
--- a/src/backend/commands/wait.c
+++ b/src/backend/commands/wait.c
@@ -140,7 +140,7 @@ ExecWaitStmt(ParseState *pstate, WaitStmt *stmt, DestReceiver *dest)
 	 */
 	Assert(MyProc->xmin == InvalidTransactionId);
 
-	waitLSNResult = WaitForLSN(WAIT_LSN_TYPE_REPLAY, lsn, timeout);
+	waitLSNResult = WaitForLSN(WAIT_LSN_TYPE_STANDBY_REPLAY, lsn, timeout);
 
 	/*
 	 * Process the result of WaitForLSN().  Throw appropriate error if needed.
diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt
index c0632bf901a..05bd4376c67 100644
--- a/src/backend/utils/activity/wait_event_names.txt
+++ b/src/backend/utils/activity/wait_event_names.txt
@@ -89,8 +89,9 @@ LIBPQWALRECEIVER_CONNECT	"Waiting in WAL receiver to establish connection to rem
 LIBPQWALRECEIVER_RECEIVE	"Waiting in WAL receiver to receive data from remote server."
 SSL_OPEN_SERVER	"Waiting for SSL while attempting connection."
 WAIT_FOR_STANDBY_CONFIRMATION	"Waiting for WAL to be received and flushed by the physical standby."
-WAIT_FOR_WAL_FLUSH	"Waiting for WAL flush to reach a target LSN on a primary."
+WAIT_FOR_WAL_FLUSH	"Waiting for WAL flush to reach a target LSN on a primary or standby."
 WAIT_FOR_WAL_REPLAY	"Waiting for WAL replay to reach a target LSN on a standby."
+WAIT_FOR_WAL_WRITE	"Waiting for WAL write to reach a target LSN on a standby."
 WAL_SENDER_WAIT_FOR_WAL	"Waiting for WAL to be flushed in WAL sender process."
 WAL_SENDER_WRITE_DATA	"Waiting for any activity when processing replies from WAL receiver in WAL sender process."
 
diff --git a/src/include/access/xlogwait.h b/src/include/access/xlogwait.h
index 3e8fcbd9177..3b2f34b8698 100644
--- a/src/include/access/xlogwait.h
+++ b/src/include/access/xlogwait.h
@@ -35,11 +35,16 @@ typedef enum
  */
 typedef enum WaitLSNType
 {
-	WAIT_LSN_TYPE_REPLAY,		/* Waiting for replay on standby */
-	WAIT_LSN_TYPE_FLUSH,		/* Waiting for flush on primary */
+	/* Standby wait types (walreceiver/startup wakes) */
+	WAIT_LSN_TYPE_STANDBY_REPLAY,	/* Waiting for replay on standby */
+	WAIT_LSN_TYPE_STANDBY_WRITE,	/* Waiting for WAL write on standby */
+	WAIT_LSN_TYPE_STANDBY_FLUSH,	/* Waiting for WAL flush on standby */
+
+	/* Primary wait types (WAL writer/backends wake) */
+	WAIT_LSN_TYPE_PRIMARY_FLUSH,	/* Waiting for flush on primary */
 } WaitLSNType;
 
-#define WAIT_LSN_TYPE_COUNT (WAIT_LSN_TYPE_FLUSH + 1)
+#define WAIT_LSN_TYPE_COUNT (WAIT_LSN_TYPE_PRIMARY_FLUSH + 1)
 
 /*
  * WaitLSNProcInfo - the shared memory structure representing information
@@ -97,6 +102,7 @@ extern PGDLLIMPORT WaitLSNState *waitLSNState;
 
 extern Size WaitLSNShmemSize(void);
 extern void WaitLSNShmemInit(void);
+extern XLogRecPtr GetCurrentLSNForWaitType(WaitLSNType lsnType);
 extern void WaitLSNWakeup(WaitLSNType lsnType, XLogRecPtr currentLSN);
 extern void WaitLSNCleanup(void);
 extern WaitLSNResult WaitForLSN(WaitLSNType lsnType, XLogRecPtr targetLSN,
-- 
2.51.0

