From 90f84e9abf3366936c52e9f1ca491987a0df6a18 Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com>
Date: Thu, 14 Mar 2024 07:08:17 +0000
Subject: [PATCH v14 2/4] Make XLogCtl->LogwrtResult accessible with atomics

Currently, access to LogwrtResult is protected by a spinlock.
This becomes severely contended in some scenarios, such as with a
largish replication flock: walsenders all calling GetFlushRecPtr
repeatedly cause the processor heat up to the point where eggs can
be fried on top.

This can be reduced to a non-problem by replacing
XLogCtl->LogwrtResult with a struct containing a pair of
atomically accessed variables. Do so.

In a few places, we can adjust the exact location where the locals
are updated to account for the fact that we no longer need the
spinlock.
---
 src/backend/access/transam/xlog.c | 89 +++++++++++++++++--------------
 src/tools/pgindent/typedefs.list  |  1 +
 2 files changed, 50 insertions(+), 40 deletions(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 20a5f86209..762f97b991 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -290,16 +290,13 @@ static bool doPageWrites;
  *
  * LogwrtRqst indicates a byte position that we need to write and/or fsync
  * the log up to (all records before that point must be written or fsynced).
- * LogwrtResult indicates the byte positions we have already written/fsynced.
- * These structs are identical but are declared separately to indicate their
- * slightly different functions.
+ * LogWrtResult indicates the byte positions we have already written/fsynced.
+ * These structs are similar but are declared separately to indicate their
+ * slightly different functions; in addition, the latter is read and written
+ * using atomic operations.
  *
- * To read XLogCtl->LogwrtResult, you must hold either info_lck or
- * WALWriteLock.  To update it, you need to hold both locks.  The point of
- * this arrangement is that the value can be examined by code that already
- * holds WALWriteLock without needing to grab info_lck as well.  In addition
- * to the shared variable, each backend has a private copy of LogwrtResult,
- * which is updated when convenient.
+ * In addition to the shared variable, each backend has a private copy of
+ * LogwrtResult, which is updated when convenient.
  *
  * The request bookkeeping is simpler: there is a shared XLogCtl->LogwrtRqst
  * (protected by info_lck), but we don't need to cache any copies of it.
@@ -322,6 +319,12 @@ static bool doPageWrites;
  *----------
  */
 
+typedef struct XLogwrtAtomic
+{
+	pg_atomic_uint64 Write;		/* last byte + 1 written out */
+	pg_atomic_uint64 Flush;		/* last byte + 1 flushed */
+} XLogwrtAtomic;
+
 typedef struct XLogwrtRqst
 {
 	XLogRecPtr	Write;			/* last byte + 1 to write out */
@@ -334,6 +337,13 @@ typedef struct XLogwrtResult
 	XLogRecPtr	Flush;			/* last byte + 1 flushed */
 } XLogwrtResult;
 
+/*
+ * Update local copy of shared XLogCtl->LogwrtResult.
+ */
+#define XLogUpdateLocalLogWrtResult() \
+		LogwrtResult.Write = pg_atomic_read_membarrier_u64(&XLogCtl->LogwrtResult.Write); \
+		LogwrtResult.Flush = pg_atomic_read_membarrier_u64(&XLogCtl->LogwrtResult.Flush);
+
 /*
  * Inserting to WAL is protected by a small fixed number of WAL insertion
  * locks. To insert to the WAL, you must hold one of the locks - it doesn't
@@ -457,6 +467,8 @@ typedef struct XLogCtlData
 {
 	XLogCtlInsert Insert;
 
+	XLogwrtAtomic LogwrtResult;
+
 	/* Protected by info_lck: */
 	XLogwrtRqst LogwrtRqst;
 	XLogRecPtr	RedoRecPtr;		/* a recent copy of Insert->RedoRecPtr */
@@ -473,12 +485,6 @@ typedef struct XLogCtlData
 	pg_time_t	lastSegSwitchTime;
 	XLogRecPtr	lastSegSwitchLSN;
 
-	/*
-	 * Protected by info_lck and WALWriteLock (you must hold either lock to
-	 * read it, but both to update)
-	 */
-	XLogwrtResult LogwrtResult;
-
 	/*
 	 * Latest initialized page in the cache (last byte position + 1).
 	 *
@@ -608,7 +614,7 @@ static ControlFileData *ControlFile = NULL;
 static int	UsableBytesInSegment;
 
 /*
- * Private, possibly out-of-date copy of shared LogwrtResult.
+ * Private, possibly out-of-date copy of shared XLogCtl->LogwrtResult.
  * See discussion above.
  */
 static XLogwrtResult LogwrtResult = {0, 0};
@@ -958,9 +964,8 @@ XLogInsertRecord(XLogRecData *rdata,
 		/* advance global request to include new block(s) */
 		if (XLogCtl->LogwrtRqst.Write < EndPos)
 			XLogCtl->LogwrtRqst.Write = EndPos;
-		/* update local result copy while I have the chance */
-		LogwrtResult = XLogCtl->LogwrtResult;
 		SpinLockRelease(&XLogCtl->info_lck);
+		XLogUpdateLocalLogWrtResult();
 	}
 
 	/*
@@ -1977,12 +1982,12 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, TimeLineID tli, bool opportunistic)
 			if (opportunistic)
 				break;
 
-			/* Before waiting, get info_lck and update LogwrtResult */
+			/* Advance shared memory write request position */
 			SpinLockAcquire(&XLogCtl->info_lck);
 			if (XLogCtl->LogwrtRqst.Write < OldPageRqstPtr)
 				XLogCtl->LogwrtRqst.Write = OldPageRqstPtr;
-			LogwrtResult = XLogCtl->LogwrtResult;
 			SpinLockRelease(&XLogCtl->info_lck);
+			XLogUpdateLocalLogWrtResult();
 
 			/*
 			 * Now that we have an up-to-date LogwrtResult value, see if we
@@ -2002,7 +2007,7 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, TimeLineID tli, bool opportunistic)
 
 				LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
 
-				LogwrtResult = XLogCtl->LogwrtResult;
+				XLogUpdateLocalLogWrtResult();
 				if (LogwrtResult.Write >= OldPageRqstPtr)
 				{
 					/* OK, someone wrote it already */
@@ -2286,7 +2291,7 @@ XLogWrite(XLogwrtRqst WriteRqst, TimeLineID tli, bool flexible)
 	/*
 	 * Update local LogwrtResult (caller probably did this already, but...)
 	 */
-	LogwrtResult = XLogCtl->LogwrtResult;
+	XLogUpdateLocalLogWrtResult();
 
 	/*
 	 * Since successive pages in the xlog cache are consecutively allocated,
@@ -2546,12 +2551,16 @@ XLogWrite(XLogwrtRqst WriteRqst, TimeLineID tli, bool flexible)
 	 */
 	{
 		SpinLockAcquire(&XLogCtl->info_lck);
-		XLogCtl->LogwrtResult = LogwrtResult;
 		if (XLogCtl->LogwrtRqst.Write < LogwrtResult.Write)
 			XLogCtl->LogwrtRqst.Write = LogwrtResult.Write;
 		if (XLogCtl->LogwrtRqst.Flush < LogwrtResult.Flush)
 			XLogCtl->LogwrtRqst.Flush = LogwrtResult.Flush;
 		SpinLockRelease(&XLogCtl->info_lck);
+
+		pg_atomic_monotonic_advance_u64(&XLogCtl->LogwrtResult.Write,
+										LogwrtResult.Write);
+		pg_atomic_monotonic_advance_u64(&XLogCtl->LogwrtResult.Flush,
+										LogwrtResult.Flush);
 	}
 }
 
@@ -2569,12 +2578,12 @@ XLogSetAsyncXactLSN(XLogRecPtr asyncXactLSN)
 	XLogRecPtr	prevAsyncXactLSN;
 
 	SpinLockAcquire(&XLogCtl->info_lck);
-	LogwrtResult = XLogCtl->LogwrtResult;
 	sleeping = XLogCtl->WalWriterSleeping;
 	prevAsyncXactLSN = XLogCtl->asyncXactLSN;
 	if (XLogCtl->asyncXactLSN < asyncXactLSN)
 		XLogCtl->asyncXactLSN = asyncXactLSN;
 	SpinLockRelease(&XLogCtl->info_lck);
+	XLogUpdateLocalLogWrtResult();
 
 	/*
 	 * If somebody else already called this function with a more aggressive
@@ -2781,8 +2790,8 @@ XLogFlush(XLogRecPtr record)
 		SpinLockAcquire(&XLogCtl->info_lck);
 		if (WriteRqstPtr < XLogCtl->LogwrtRqst.Write)
 			WriteRqstPtr = XLogCtl->LogwrtRqst.Write;
-		LogwrtResult = XLogCtl->LogwrtResult;
 		SpinLockRelease(&XLogCtl->info_lck);
+		XLogUpdateLocalLogWrtResult();
 
 		/* done already? */
 		if (record <= LogwrtResult.Flush)
@@ -2812,7 +2821,7 @@ XLogFlush(XLogRecPtr record)
 		}
 
 		/* Got the lock; recheck whether request is satisfied */
-		LogwrtResult = XLogCtl->LogwrtResult;
+		XLogUpdateLocalLogWrtResult();
 		if (record <= LogwrtResult.Flush)
 		{
 			LWLockRelease(WALWriteLock);
@@ -2936,9 +2945,9 @@ XLogBackgroundFlush(void)
 
 	/* read LogwrtResult and update local state */
 	SpinLockAcquire(&XLogCtl->info_lck);
-	LogwrtResult = XLogCtl->LogwrtResult;
 	WriteRqst = XLogCtl->LogwrtRqst;
 	SpinLockRelease(&XLogCtl->info_lck);
+	XLogUpdateLocalLogWrtResult();
 
 	/* back off to last completed page boundary */
 	WriteRqst.Write -= WriteRqst.Write % XLOG_BLCKSZ;
@@ -3024,7 +3033,7 @@ XLogBackgroundFlush(void)
 	/* now wait for any in-progress insertions to finish and get write lock */
 	WaitXLogInsertionsToFinish(WriteRqst.Write);
 	LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
-	LogwrtResult = XLogCtl->LogwrtResult;
+	XLogUpdateLocalLogWrtResult();
 	if (WriteRqst.Write > LogwrtResult.Write ||
 		WriteRqst.Flush > LogwrtResult.Flush)
 	{
@@ -3112,9 +3121,7 @@ XLogNeedsFlush(XLogRecPtr record)
 		return false;
 
 	/* read LogwrtResult and update local state */
-	SpinLockAcquire(&XLogCtl->info_lck);
-	LogwrtResult = XLogCtl->LogwrtResult;
-	SpinLockRelease(&XLogCtl->info_lck);
+	XLogUpdateLocalLogWrtResult();
 
 	/* check again */
 	if (record <= LogwrtResult.Flush)
@@ -4907,6 +4914,9 @@ XLOGShmemInit(void)
 	XLogCtl->InstallXLogFileSegmentActive = false;
 	XLogCtl->WalWriterSleeping = false;
 
+	pg_atomic_init_u64(&XLogCtl->LogwrtResult.Write, InvalidXLogRecPtr);
+	pg_atomic_init_u64(&XLogCtl->LogwrtResult.Flush, InvalidXLogRecPtr);
+
 	SpinLockInit(&XLogCtl->Insert.insertpos_lck);
 	SpinLockInit(&XLogCtl->info_lck);
 	pg_atomic_init_u64(&XLogCtl->unloggedLSN, InvalidXLogRecPtr);
@@ -5925,10 +5935,13 @@ StartupXLOG(void)
 		XLogCtl->InitializedUpTo = EndOfLog;
 	}
 
+	/*
+	 * Update local and shared status. This is OK to do without any locks
+	 * because no other process can be reading or writing WAL yet.
+	 */
 	LogwrtResult.Write = LogwrtResult.Flush = EndOfLog;
-
-	XLogCtl->LogwrtResult = LogwrtResult;
-
+	pg_atomic_write_u64(&XLogCtl->LogwrtResult.Write, EndOfLog);
+	pg_atomic_write_u64(&XLogCtl->LogwrtResult.Flush, EndOfLog);
 	XLogCtl->LogwrtRqst.Write = EndOfLog;
 	XLogCtl->LogwrtRqst.Flush = EndOfLog;
 
@@ -6367,9 +6380,7 @@ GetFlushRecPtr(TimeLineID *insertTLI)
 {
 	Assert(XLogCtl->SharedRecoveryState == RECOVERY_STATE_DONE);
 
-	SpinLockAcquire(&XLogCtl->info_lck);
-	LogwrtResult = XLogCtl->LogwrtResult;
-	SpinLockRelease(&XLogCtl->info_lck);
+	XLogUpdateLocalLogWrtResult();
 
 	/*
 	 * If we're writing and flushing WAL, the time line can't be changing, so
@@ -9281,9 +9292,7 @@ GetXLogInsertRecPtr(void)
 XLogRecPtr
 GetXLogWriteRecPtr(void)
 {
-	SpinLockAcquire(&XLogCtl->info_lck);
-	LogwrtResult = XLogCtl->LogwrtResult;
-	SpinLockRelease(&XLogCtl->info_lck);
+	XLogUpdateLocalLogWrtResult();
 
 	return LogwrtResult.Write;
 }
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index aa7a25b8f8..8a9471cc57 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -3132,6 +3132,7 @@ XLogRedoAction
 XLogSegNo
 XLogSource
 XLogStats
+XLogwrtAtomic
 XLogwrtResult
 XLogwrtRqst
 XPV
-- 
2.34.1

