On 2022-Mar-22, Tom Lane wrote:

> I looked briefly at 0001, and I've got to say that I disagree with
> your decision to rearrange the representation of the local LogwrtResult
> copy.  It clutters the patch tremendously and makes it hard to
> understand what the actual functional change is.  Moreover, I'm
> not entirely convinced that it's a notational improvement in the
> first place.
> 
> Perhaps it'd help if you split 0001 into two steps, one to do the
> mechanical change of the representation and then a second patch that
> converts the shared variable to atomics.  Since you've moved around
> the places that read the shared variable, that part is subtler than
> one could wish and really needs to be studied on its own.

Hmm, I did it the other way around: first change to use atomics, then
the mechanical change.  I think that makes the usefulness of the change
more visible, because before the switch to atomics, using the combined
struct as a unit remains sensible.

-- 
Álvaro Herrera        Breisgau, Deutschland  —  https://www.EnterpriseDB.com/
Officer Krupke, what are we to do?
Gee, officer Krupke, Krup you! (West Side Story, "Gee, Officer Krupke")
>From dd9b53878faeedba921ea7027e98ddbb433e8647 Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvhe...@alvh.no-ip.org>
Date: Tue, 2 Feb 2021 14:03:43 -0300
Subject: [PATCH v7 1/3] Change XLogCtl->LogwrtResult to use atomic ops
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Currently, access to LogwrtResult is protected by a spinlock.  This
becomes severely contended in some scenarios, such as with a largish
replication flock: walsenders all calling GetFlushRecPtr repeatedly
cause the processor to heat up to the point where eggs can be fried on top.

This can be reduced to a non-problem by replacing XLogCtl->LogwrtResult
with a struct containing a pair of atomically accessed variables.

In addition, this commit splits the process-local copy of these values
(kept in the freestanding LogwrtResult struct) into two separate
variables, LogWriteResult and LogFlushResult.  This is not strictly
necessary, but it makes it clearer that these are updated separately,
each on their own schedule.

Author: Álvaro Herrera <alvhe...@alvh.no-ip.org>
Discussion: https://postgr.es/m/20200831182156.GA3983@alvherre.pgsql
---
 src/backend/access/transam/xlog.c | 85 +++++++++++++++----------------
 src/include/port/atomics.h        | 29 +++++++++++
 src/tools/pgindent/typedefs.list  |  1 +
 3 files changed, 72 insertions(+), 43 deletions(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 4ac3871c74..6f2eb494fe 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -285,16 +285,13 @@ static bool doPageWrites;
  *
  * LogwrtRqst indicates a byte position that we need to write and/or fsync
  * the log up to (all records before that point must be written or fsynced).
- * LogwrtResult indicates the byte positions we have already written/fsynced.
- * These structs are identical but are declared separately to indicate their
- * slightly different functions.
+ * LogwrtResult indicates the byte positions we have already written/fsynced.
+ * These structs are similar but are declared separately to indicate their
+ * slightly different functions; in addition, the latter is read and written
+ * using atomic operations.
  *
- * To read XLogCtl->LogwrtResult, you must hold either info_lck or
- * WALWriteLock.  To update it, you need to hold both locks.  The point of
- * this arrangement is that the value can be examined by code that already
- * holds WALWriteLock without needing to grab info_lck as well.  In addition
- * to the shared variable, each backend has a private copy of LogwrtResult,
- * which is updated when convenient.
+ * In addition to the shared variable, each backend has a private copy of
+ * LogwrtResult, each member of which is separately updated when convenient.
  *
  * The request bookkeeping is simpler: there is a shared XLogCtl->LogwrtRqst
  * (protected by info_lck), but we don't need to cache any copies of it.
@@ -317,6 +314,12 @@ static bool doPageWrites;
  *----------
  */
 
+typedef struct XLogwrtAtomic
+{
+	pg_atomic_uint64 Write;		/* last byte + 1 written out */
+	pg_atomic_uint64 Flush;		/* last byte + 1 flushed */
+} XLogwrtAtomic;
+
 typedef struct XLogwrtRqst
 {
 	XLogRecPtr	Write;			/* last byte + 1 to write out */
@@ -480,6 +483,7 @@ typedef struct XLogCtlData
 {
 	XLogCtlInsert Insert;
 
+	XLogwrtAtomic LogwrtResult; /* uses atomics */
 	/* Protected by info_lck: */
 	XLogwrtRqst LogwrtRqst;
 	XLogRecPtr	RedoRecPtr;		/* a recent copy of Insert->RedoRecPtr */
@@ -497,12 +501,6 @@ typedef struct XLogCtlData
 	pg_time_t	lastSegSwitchTime;
 	XLogRecPtr	lastSegSwitchLSN;
 
-	/*
-	 * Protected by info_lck and WALWriteLock (you must hold either lock to
-	 * read it, but both to update)
-	 */
-	XLogwrtResult LogwrtResult;
-
 	/*
 	 * Latest initialized page in the cache (last byte position + 1).
 	 *
@@ -622,7 +620,7 @@ static ControlFileData *ControlFile = NULL;
 static int	UsableBytesInSegment;
 
 /*
- * Private, possibly out-of-date copy of shared LogwrtResult.
+ * Private, possibly out-of-date copy of shared XLogCtl->LogwrtResult.
  * See discussion above.
  */
 static XLogwrtResult LogwrtResult = {0, 0};
@@ -932,8 +930,6 @@ XLogInsertRecord(XLogRecData *rdata,
 		/* advance global request to include new block(s) */
 		if (XLogCtl->LogwrtRqst.Write < EndPos)
 			XLogCtl->LogwrtRqst.Write = EndPos;
-		/* update local result copy while I have the chance */
-		LogwrtResult = XLogCtl->LogwrtResult;
 		SpinLockRelease(&XLogCtl->info_lck);
 	}
 
@@ -1811,6 +1807,7 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, TimeLineID tli, bool opportunistic)
 	 * Now that we have the lock, check if someone initialized the page
 	 * already.
 	 */
+	LogwrtResult.Write = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Write);
 	while (upto >= XLogCtl->InitializedUpTo || opportunistic)
 	{
 		nextidx = XLogRecPtrToBufIdx(XLogCtl->InitializedUpTo);
@@ -1830,17 +1827,17 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, TimeLineID tli, bool opportunistic)
 			if (opportunistic)
 				break;
 
-			/* Before waiting, get info_lck and update LogwrtResult */
+			/* Advance shared memory write request position */
 			SpinLockAcquire(&XLogCtl->info_lck);
 			if (XLogCtl->LogwrtRqst.Write < OldPageRqstPtr)
 				XLogCtl->LogwrtRqst.Write = OldPageRqstPtr;
-			LogwrtResult = XLogCtl->LogwrtResult;
 			SpinLockRelease(&XLogCtl->info_lck);
 
 			/*
-			 * Now that we have an up-to-date LogwrtResult value, see if we
-			 * still need to write it or if someone else already did.
+			 * Before waiting, update LogwrtResult.Write and see if we still need
+			 * to write it or if someone else already did.
 			 */
+			LogwrtResult.Write = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Write);
 			if (LogwrtResult.Write < OldPageRqstPtr)
 			{
 				/*
@@ -1855,7 +1852,7 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, TimeLineID tli, bool opportunistic)
 
 				LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
 
-				LogwrtResult = XLogCtl->LogwrtResult;
+				LogwrtResult.Write = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Write);
 				if (LogwrtResult.Write >= OldPageRqstPtr)
 				{
 					/* OK, someone wrote it already */
@@ -2101,7 +2098,7 @@ XLogWrite(XLogwrtRqst WriteRqst, TimeLineID tli, bool flexible)
 	/*
 	 * Update local LogwrtResult (caller probably did this already, but...)
 	 */
-	LogwrtResult = XLogCtl->LogwrtResult;
+	LogwrtResult.Write = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Write);
 
 	/*
 	 * Since successive pages in the xlog cache are consecutively allocated,
@@ -2316,6 +2313,9 @@ XLogWrite(XLogwrtRqst WriteRqst, TimeLineID tli, bool flexible)
 
 	Assert(npages == 0);
 
+	/* Publish current write result position */
+	pg_atomic_monotonic_advance_u64(&XLogCtl->LogwrtResult.Write, LogwrtResult.Write);
+
 	/*
 	 * If asked to flush, do so
 	 */
@@ -2352,16 +2352,16 @@ XLogWrite(XLogwrtRqst WriteRqst, TimeLineID tli, bool flexible)
 		LogwrtResult.Flush = LogwrtResult.Write;
 	}
 
+	/* Publish current flush result position */
+	pg_atomic_monotonic_advance_u64(&XLogCtl->LogwrtResult.Flush, LogwrtResult.Flush);
+
 	/*
-	 * Update shared-memory status
-	 *
-	 * We make sure that the shared 'request' values do not fall behind the
+	 * Make sure that the shared 'request' values do not fall behind the
 	 * 'result' values.  This is not absolutely essential, but it saves some
 	 * code in a couple of places.
 	 */
 	{
 		SpinLockAcquire(&XLogCtl->info_lck);
-		XLogCtl->LogwrtResult = LogwrtResult;
 		if (XLogCtl->LogwrtRqst.Write < LogwrtResult.Write)
 			XLogCtl->LogwrtRqst.Write = LogwrtResult.Write;
 		if (XLogCtl->LogwrtRqst.Flush < LogwrtResult.Flush)
@@ -2381,8 +2381,8 @@ XLogSetAsyncXactLSN(XLogRecPtr asyncXactLSN)
 	XLogRecPtr	WriteRqstPtr = asyncXactLSN;
 	bool		sleeping;
 
+	LogwrtResult.Flush = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Flush);
 	SpinLockAcquire(&XLogCtl->info_lck);
-	LogwrtResult = XLogCtl->LogwrtResult;
 	sleeping = XLogCtl->WalWriterSleeping;
 	if (XLogCtl->asyncXactLSN < asyncXactLSN)
 		XLogCtl->asyncXactLSN = asyncXactLSN;
@@ -2587,10 +2587,10 @@ XLogFlush(XLogRecPtr record)
 		SpinLockAcquire(&XLogCtl->info_lck);
 		if (WriteRqstPtr < XLogCtl->LogwrtRqst.Write)
 			WriteRqstPtr = XLogCtl->LogwrtRqst.Write;
-		LogwrtResult = XLogCtl->LogwrtResult;
 		SpinLockRelease(&XLogCtl->info_lck);
 
 		/* done already? */
+		LogwrtResult.Flush = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Flush);
 		if (record <= LogwrtResult.Flush)
 			break;
 
@@ -2618,7 +2618,7 @@ XLogFlush(XLogRecPtr record)
 		}
 
 		/* Got the lock; recheck whether request is satisfied */
-		LogwrtResult = XLogCtl->LogwrtResult;
+		LogwrtResult.Flush = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Flush);
 		if (record <= LogwrtResult.Flush)
 		{
 			LWLockRelease(WALWriteLock);
@@ -2742,7 +2742,6 @@ XLogBackgroundFlush(void)
 
 	/* read LogwrtResult and update local state */
 	SpinLockAcquire(&XLogCtl->info_lck);
-	LogwrtResult = XLogCtl->LogwrtResult;
 	WriteRqst = XLogCtl->LogwrtRqst;
 	SpinLockRelease(&XLogCtl->info_lck);
 
@@ -2750,8 +2749,10 @@ XLogBackgroundFlush(void)
 	WriteRqst.Write -= WriteRqst.Write % XLOG_BLCKSZ;
 
 	/* if we have already flushed that far, consider async commit records */
+	LogwrtResult.Flush = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Flush);
 	if (WriteRqst.Write <= LogwrtResult.Flush)
 	{
+		pg_memory_barrier();
 		SpinLockAcquire(&XLogCtl->info_lck);
 		WriteRqst.Write = XLogCtl->asyncXactLSN;
 		SpinLockRelease(&XLogCtl->info_lck);
@@ -2767,6 +2768,7 @@ XLogBackgroundFlush(void)
 	{
 		if (openLogFile >= 0)
 		{
+			LogwrtResult.Write = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Write);
 			if (!XLByteInPrevSeg(LogwrtResult.Write, openLogSegNo,
 								 wal_segment_size))
 			{
@@ -2826,7 +2828,8 @@ XLogBackgroundFlush(void)
 	/* now wait for any in-progress insertions to finish and get write lock */
 	WaitXLogInsertionsToFinish(WriteRqst.Write);
 	LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
-	LogwrtResult = XLogCtl->LogwrtResult;
+	LogwrtResult.Write = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Write);
+	LogwrtResult.Flush = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Flush);
 	if (WriteRqst.Write > LogwrtResult.Write ||
 		WriteRqst.Flush > LogwrtResult.Flush)
 	{
@@ -2914,9 +2917,7 @@ XLogNeedsFlush(XLogRecPtr record)
 		return false;
 
 	/* read LogwrtResult and update local state */
-	SpinLockAcquire(&XLogCtl->info_lck);
-	LogwrtResult = XLogCtl->LogwrtResult;
-	SpinLockRelease(&XLogCtl->info_lck);
+	LogwrtResult.Flush = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Flush);
 
 	/* check again */
 	if (record <= LogwrtResult.Flush)
@@ -5489,7 +5490,9 @@ StartupXLOG(void)
 
 	LogwrtResult.Write = LogwrtResult.Flush = EndOfLog;
 
-	XLogCtl->LogwrtResult = LogwrtResult;
+	/* XXX OK to write without WALWriteLock? */
+	pg_atomic_write_u64(&XLogCtl->LogwrtResult.Write, EndOfLog);
+	pg_atomic_write_u64(&XLogCtl->LogwrtResult.Flush, EndOfLog);
 
 	XLogCtl->LogwrtRqst.Write = EndOfLog;
 	XLogCtl->LogwrtRqst.Flush = EndOfLog;
@@ -5925,9 +5928,7 @@ GetFlushRecPtr(TimeLineID *insertTLI)
 {
 	Assert(XLogCtl->SharedRecoveryState == RECOVERY_STATE_DONE);
 
-	SpinLockAcquire(&XLogCtl->info_lck);
-	LogwrtResult = XLogCtl->LogwrtResult;
-	SpinLockRelease(&XLogCtl->info_lck);
+	LogwrtResult.Flush = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Flush);
 
 	/*
 	 * If we're writing and flushing WAL, the time line can't be changing, so
@@ -9083,9 +9084,7 @@ GetXLogInsertRecPtr(void)
 XLogRecPtr
 GetXLogWriteRecPtr(void)
 {
-	SpinLockAcquire(&XLogCtl->info_lck);
-	LogwrtResult = XLogCtl->LogwrtResult;
-	SpinLockRelease(&XLogCtl->info_lck);
+	LogwrtResult.Write = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Write);
 
 	return LogwrtResult.Write;
 }
diff --git a/src/include/port/atomics.h b/src/include/port/atomics.h
index 9550e04aaa..7abed1785a 100644
--- a/src/include/port/atomics.h
+++ b/src/include/port/atomics.h
@@ -519,6 +519,35 @@ pg_atomic_sub_fetch_u64(volatile pg_atomic_uint64 *ptr, int64 sub_)
 	return pg_atomic_sub_fetch_u64_impl(ptr, sub_);
 }
 
+/*
+ * Monotonically advance the given variable using only atomic operations until
+ * it's at least the target value.
+ *
+ * Full barrier semantics (even when value is unchanged).
+ */
+static inline void
+pg_atomic_monotonic_advance_u64(volatile pg_atomic_uint64 *ptr, uint64 target_)
+{
+	uint64		currval;
+
+#ifndef PG_HAVE_ATOMIC_U64_SIMULATION
+	AssertPointerAlignment(ptr, 8);
+#endif
+
+	currval = pg_atomic_read_u64(ptr);
+	if (currval >= target_)
+	{
+		pg_memory_barrier();
+		return;
+	}
+
+	while (currval < target_)
+	{
+		if (pg_atomic_compare_exchange_u64(ptr, &currval, target_))
+			break;
+	}
+}
+
 #undef INSIDE_ATOMICS_H
 
 #endif							/* ATOMICS_H */
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 93d5190508..5fed02537a 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -2959,6 +2959,7 @@ XLogRecoveryCtlData
 XLogRedoAction
 XLogSegNo
 XLogSource
+XLogwrtAtomic
 XLogwrtResult
 XLogwrtRqst
 XPVIV
-- 
2.30.2

>From 344bd7e42608e0a01aaadcbc27b09ee0137b3210 Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvhe...@alvh.no-ip.org>
Date: Tue, 22 Mar 2022 16:49:40 +0100
Subject: [PATCH v7 2/3] Split LogwrtResult into separate variables

Since the schedule of updating each portion is independent, it's not
very useful to have them both as a single struct.  Before we used
atomics for the corresponding XLogCtl struct, this made sense because we
could use struct assignment, but that's no longer the case.

Suggested by Andres Freund.
---
 src/backend/access/transam/xlog.c | 146 +++++++++++++++---------------
 1 file changed, 71 insertions(+), 75 deletions(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 6f2eb494fe..de8c3e97a0 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -291,7 +291,8 @@ static bool doPageWrites;
  * using atomic operations.
  *
  * In addition to the shared variable, each backend has a private copy of
- * LogwrtResult, each member of which is separately updated when convenient.
+ * each member of LogwrtResult (LogWriteResult and LogFlushResult), each of
+ * which is separately updated when convenient.
  *
  * The request bookkeeping is simpler: there is a shared XLogCtl->LogwrtRqst
  * (protected by info_lck), but we don't need to cache any copies of it.
@@ -326,12 +327,6 @@ typedef struct XLogwrtRqst
 	XLogRecPtr	Flush;			/* last byte + 1 to flush */
 } XLogwrtRqst;
 
-typedef struct XLogwrtResult
-{
-	XLogRecPtr	Write;			/* last byte + 1 written out */
-	XLogRecPtr	Flush;			/* last byte + 1 flushed */
-} XLogwrtResult;
-
 /*
  * Inserting to WAL is protected by a small fixed number of WAL insertion
  * locks. To insert to the WAL, you must hold one of the locks - it doesn't
@@ -623,7 +618,8 @@ static int	UsableBytesInSegment;
  * Private, possibly out-of-date copy of shared XLogCtl->LogwrtResult.
  * See discussion above.
  */
-static XLogwrtResult LogwrtResult = {0, 0};
+static XLogRecPtr LogWriteResult = 0;
+static XLogRecPtr LogFlushResult = 0;
 
 /*
  * openLogFile is -1 or a kernel FD for an open log file segment.
@@ -1807,7 +1803,7 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, TimeLineID tli, bool opportunistic)
 	 * Now that we have the lock, check if someone initialized the page
 	 * already.
 	 */
-	LogwrtResult.Write = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Write);
+	LogWriteResult = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Write);
 	while (upto >= XLogCtl->InitializedUpTo || opportunistic)
 	{
 		nextidx = XLogRecPtrToBufIdx(XLogCtl->InitializedUpTo);
@@ -1818,7 +1814,7 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, TimeLineID tli, bool opportunistic)
 		 * already written out.
 		 */
 		OldPageRqstPtr = XLogCtl->xlblocks[nextidx];
-		if (LogwrtResult.Write < OldPageRqstPtr)
+		if (LogWriteResult < OldPageRqstPtr)
 		{
 			/*
 			 * Nope, got work to do. If we just want to pre-initialize as much
@@ -1834,11 +1830,11 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, TimeLineID tli, bool opportunistic)
 			SpinLockRelease(&XLogCtl->info_lck);
 
 			/*
-			 * Before waiting, update LogwrtResult.Write and see if we still need
+			 * Before waiting, update LogWriteResult and see if we still need
 			 * to write it or if someone else already did.
 			 */
-			LogwrtResult.Write = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Write);
-			if (LogwrtResult.Write < OldPageRqstPtr)
+			LogWriteResult = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Write);
+			if (LogWriteResult < OldPageRqstPtr)
 			{
 				/*
 				 * Must acquire write lock. Release WALBufMappingLock first,
@@ -1852,8 +1848,8 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, TimeLineID tli, bool opportunistic)
 
 				LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
 
-				LogwrtResult.Write = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Write);
-				if (LogwrtResult.Write >= OldPageRqstPtr)
+				LogWriteResult = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Write);
+				if (LogWriteResult >= OldPageRqstPtr)
 				{
 					/* OK, someone wrote it already */
 					LWLockRelease(WALWriteLock);
@@ -2098,7 +2094,7 @@ XLogWrite(XLogwrtRqst WriteRqst, TimeLineID tli, bool flexible)
 	/*
 	 * Update local LogwrtResult (caller probably did this already, but...)
 	 */
-	LogwrtResult.Write = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Write);
+	LogWriteResult = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Write);
 
 	/*
 	 * Since successive pages in the xlog cache are consecutively allocated,
@@ -2118,9 +2114,9 @@ XLogWrite(XLogwrtRqst WriteRqst, TimeLineID tli, bool flexible)
 	 * consider writing.  Begin at the buffer containing the next unwritten
 	 * page, or last partially written page.
 	 */
-	curridx = XLogRecPtrToBufIdx(LogwrtResult.Write);
+	curridx = XLogRecPtrToBufIdx(LogWriteResult);
 
-	while (LogwrtResult.Write < WriteRqst.Write)
+	while (LogWriteResult < WriteRqst.Write)
 	{
 		/*
 		 * Make sure we're not ahead of the insert process.  This could happen
@@ -2129,16 +2125,16 @@ XLogWrite(XLogwrtRqst WriteRqst, TimeLineID tli, bool flexible)
 		 */
 		XLogRecPtr	EndPtr = XLogCtl->xlblocks[curridx];
 
-		if (LogwrtResult.Write >= EndPtr)
+		if (LogWriteResult >= EndPtr)
 			elog(PANIC, "xlog write request %X/%X is past end of log %X/%X",
-				 LSN_FORMAT_ARGS(LogwrtResult.Write),
+				 LSN_FORMAT_ARGS(LogWriteResult),
 				 LSN_FORMAT_ARGS(EndPtr));
 
-		/* Advance LogwrtResult.Write to end of current buffer page */
-		LogwrtResult.Write = EndPtr;
-		ispartialpage = WriteRqst.Write < LogwrtResult.Write;
+		/* Advance LogWriteResult to end of current buffer page */
+		LogWriteResult = EndPtr;
+		ispartialpage = WriteRqst.Write < LogWriteResult;
 
-		if (!XLByteInPrevSeg(LogwrtResult.Write, openLogSegNo,
+		if (!XLByteInPrevSeg(LogWriteResult, openLogSegNo,
 							 wal_segment_size))
 		{
 			/*
@@ -2148,7 +2144,7 @@ XLogWrite(XLogwrtRqst WriteRqst, TimeLineID tli, bool flexible)
 			Assert(npages == 0);
 			if (openLogFile >= 0)
 				XLogFileClose();
-			XLByteToPrevSeg(LogwrtResult.Write, openLogSegNo,
+			XLByteToPrevSeg(LogWriteResult, openLogSegNo,
 							wal_segment_size);
 			openLogTLI = tli;
 
@@ -2160,7 +2156,7 @@ XLogWrite(XLogwrtRqst WriteRqst, TimeLineID tli, bool flexible)
 		/* Make sure we have the current logfile open */
 		if (openLogFile < 0)
 		{
-			XLByteToPrevSeg(LogwrtResult.Write, openLogSegNo,
+			XLByteToPrevSeg(LogWriteResult, openLogSegNo,
 							wal_segment_size);
 			openLogTLI = tli;
 			openLogFile = XLogFileOpen(openLogSegNo, tli);
@@ -2172,7 +2168,7 @@ XLogWrite(XLogwrtRqst WriteRqst, TimeLineID tli, bool flexible)
 		{
 			/* first of group */
 			startidx = curridx;
-			startoffset = XLogSegmentOffset(LogwrtResult.Write - XLOG_BLCKSZ,
+			startoffset = XLogSegmentOffset(LogWriteResult - XLOG_BLCKSZ,
 											wal_segment_size);
 		}
 		npages++;
@@ -2183,7 +2179,7 @@ XLogWrite(XLogwrtRqst WriteRqst, TimeLineID tli, bool flexible)
 		 * contiguous in memory), or if we are at the end of the logfile
 		 * segment.
 		 */
-		last_iteration = WriteRqst.Write <= LogwrtResult.Write;
+		last_iteration = WriteRqst.Write <= LogWriteResult;
 
 		finishing_seg = !ispartialpage &&
 			(startoffset + npages * XLOG_BLCKSZ) >= wal_segment_size;
@@ -2274,13 +2270,13 @@ XLogWrite(XLogwrtRqst WriteRqst, TimeLineID tli, bool flexible)
 				/* signal that we need to wakeup walsenders later */
 				WalSndWakeupRequest();
 
-				LogwrtResult.Flush = LogwrtResult.Write;	/* end of page */
+				LogFlushResult = LogWriteResult;	/* end of page */
 
 				if (XLogArchivingActive())
 					XLogArchiveNotifySeg(openLogSegNo, tli);
 
 				XLogCtl->lastSegSwitchTime = (pg_time_t) time(NULL);
-				XLogCtl->lastSegSwitchLSN = LogwrtResult.Flush;
+				XLogCtl->lastSegSwitchLSN = LogFlushResult;
 
 				/*
 				 * Request a checkpoint if we've consumed too much xlog since
@@ -2301,7 +2297,7 @@ XLogWrite(XLogwrtRqst WriteRqst, TimeLineID tli, bool flexible)
 		if (ispartialpage)
 		{
 			/* Only asked to write a partial page */
-			LogwrtResult.Write = WriteRqst.Write;
+			LogWriteResult = WriteRqst.Write;
 			break;
 		}
 		curridx = NextBufIdx(curridx);
@@ -2314,13 +2310,13 @@ XLogWrite(XLogwrtRqst WriteRqst, TimeLineID tli, bool flexible)
 	Assert(npages == 0);
 
 	/* Publish current write result position */
-	pg_atomic_monotonic_advance_u64(&XLogCtl->LogwrtResult.Write, LogwrtResult.Write);
+	pg_atomic_monotonic_advance_u64(&XLogCtl->LogwrtResult.Write, LogWriteResult);
 
 	/*
 	 * If asked to flush, do so
 	 */
-	if (LogwrtResult.Flush < WriteRqst.Flush &&
-		LogwrtResult.Flush < LogwrtResult.Write)
+	if (LogFlushResult < WriteRqst.Flush &&
+		LogFlushResult < LogWriteResult)
 	{
 		/*
 		 * Could get here without iterating above loop, in which case we might
@@ -2331,12 +2327,12 @@ XLogWrite(XLogwrtRqst WriteRqst, TimeLineID tli, bool flexible)
 			sync_method != SYNC_METHOD_OPEN_DSYNC)
 		{
 			if (openLogFile >= 0 &&
-				!XLByteInPrevSeg(LogwrtResult.Write, openLogSegNo,
+				!XLByteInPrevSeg(LogWriteResult, openLogSegNo,
 								 wal_segment_size))
 				XLogFileClose();
 			if (openLogFile < 0)
 			{
-				XLByteToPrevSeg(LogwrtResult.Write, openLogSegNo,
+				XLByteToPrevSeg(LogWriteResult, openLogSegNo,
 								wal_segment_size);
 				openLogTLI = tli;
 				openLogFile = XLogFileOpen(openLogSegNo, tli);
@@ -2349,11 +2345,11 @@ XLogWrite(XLogwrtRqst WriteRqst, TimeLineID tli, bool flexible)
 		/* signal that we need to wakeup walsenders later */
 		WalSndWakeupRequest();
 
-		LogwrtResult.Flush = LogwrtResult.Write;
+		LogFlushResult = LogWriteResult;
 	}
 
 	/* Publish current flush result position */
-	pg_atomic_monotonic_advance_u64(&XLogCtl->LogwrtResult.Flush, LogwrtResult.Flush);
+	pg_atomic_monotonic_advance_u64(&XLogCtl->LogwrtResult.Flush, LogFlushResult);
 
 	/*
 	 * Make sure that the shared 'request' values do not fall behind the
@@ -2362,10 +2358,10 @@ XLogWrite(XLogwrtRqst WriteRqst, TimeLineID tli, bool flexible)
 	 */
 	{
 		SpinLockAcquire(&XLogCtl->info_lck);
-		if (XLogCtl->LogwrtRqst.Write < LogwrtResult.Write)
-			XLogCtl->LogwrtRqst.Write = LogwrtResult.Write;
-		if (XLogCtl->LogwrtRqst.Flush < LogwrtResult.Flush)
-			XLogCtl->LogwrtRqst.Flush = LogwrtResult.Flush;
+		if (XLogCtl->LogwrtRqst.Write < LogWriteResult)
+			XLogCtl->LogwrtRqst.Write = LogWriteResult;
+		if (XLogCtl->LogwrtRqst.Flush < LogFlushResult)
+			XLogCtl->LogwrtRqst.Flush = LogFlushResult;
 		SpinLockRelease(&XLogCtl->info_lck);
 	}
 }
@@ -2381,7 +2377,7 @@ XLogSetAsyncXactLSN(XLogRecPtr asyncXactLSN)
 	XLogRecPtr	WriteRqstPtr = asyncXactLSN;
 	bool		sleeping;
 
-	LogwrtResult.Flush = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Flush);
+	LogFlushResult = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Flush);
 	SpinLockAcquire(&XLogCtl->info_lck);
 	sleeping = XLogCtl->WalWriterSleeping;
 	if (XLogCtl->asyncXactLSN < asyncXactLSN)
@@ -2399,7 +2395,7 @@ XLogSetAsyncXactLSN(XLogRecPtr asyncXactLSN)
 		WriteRqstPtr -= WriteRqstPtr % XLOG_BLCKSZ;
 
 		/* if we have already flushed that far, we're done */
-		if (WriteRqstPtr <= LogwrtResult.Flush)
+		if (WriteRqstPtr <= LogFlushResult)
 			return;
 	}
 
@@ -2551,15 +2547,15 @@ XLogFlush(XLogRecPtr record)
 	}
 
 	/* Quick exit if already known flushed */
-	if (record <= LogwrtResult.Flush)
+	if (record <= LogFlushResult)
 		return;
 
 #ifdef WAL_DEBUG
 	if (XLOG_DEBUG)
 		elog(LOG, "xlog flush request %X/%X; write %X/%X; flush %X/%X",
 			 LSN_FORMAT_ARGS(record),
-			 LSN_FORMAT_ARGS(LogwrtResult.Write),
-			 LSN_FORMAT_ARGS(LogwrtResult.Flush));
+			 LSN_FORMAT_ARGS(LogWriteResult),
+			 LSN_FORMAT_ARGS(LogFlushResult));
 #endif
 
 	START_CRIT_SECTION();
@@ -2568,8 +2564,8 @@ XLogFlush(XLogRecPtr record)
 	 * Since fsync is usually a horribly expensive operation, we try to
 	 * piggyback as much data as we can on each fsync: if we see any more data
 	 * entered into the xlog buffer, we'll write and fsync that too, so that
-	 * the final value of LogwrtResult.Flush is as large as possible. This
-	 * gives us some chance of avoiding another fsync immediately after.
+	 * the final value of LogFlushResult is as large as possible. This gives
+	 * us some chance of avoiding another fsync immediately after.
 	 */
 
 	/* initialize to given target; may increase below */
@@ -2590,8 +2586,8 @@ XLogFlush(XLogRecPtr record)
 		SpinLockRelease(&XLogCtl->info_lck);
 
 		/* done already? */
-		LogwrtResult.Flush = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Flush);
-		if (record <= LogwrtResult.Flush)
+		LogFlushResult = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Flush);
+		if (record <= LogFlushResult)
 			break;
 
 		/*
@@ -2618,8 +2614,8 @@ XLogFlush(XLogRecPtr record)
 		}
 
 		/* Got the lock; recheck whether request is satisfied */
-		LogwrtResult.Flush = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Flush);
-		if (record <= LogwrtResult.Flush)
+		LogFlushResult = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Flush);
+		if (record <= LogFlushResult)
 		{
 			LWLockRelease(WALWriteLock);
 			break;
@@ -2689,11 +2685,11 @@ XLogFlush(XLogRecPtr record)
 	 * calls from bufmgr.c are not within critical sections and so we will not
 	 * force a restart for a bad LSN on a data page.
 	 */
-	if (LogwrtResult.Flush < record)
+	if (LogFlushResult < record)
 		elog(ERROR,
 			 "xlog flush request %X/%X is not satisfied --- flushed only to %X/%X",
 			 LSN_FORMAT_ARGS(record),
-			 LSN_FORMAT_ARGS(LogwrtResult.Flush));
+			 LSN_FORMAT_ARGS(LogFlushResult));
 }
 
 /*
@@ -2749,8 +2745,8 @@ XLogBackgroundFlush(void)
 	WriteRqst.Write -= WriteRqst.Write % XLOG_BLCKSZ;
 
 	/* if we have already flushed that far, consider async commit records */
-	LogwrtResult.Flush = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Flush);
-	if (WriteRqst.Write <= LogwrtResult.Flush)
+	LogFlushResult = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Flush);
+	if (WriteRqst.Write <= LogFlushResult)
 	{
 		pg_memory_barrier();
 		SpinLockAcquire(&XLogCtl->info_lck);
@@ -2764,12 +2760,12 @@ XLogBackgroundFlush(void)
 	 * holding an open file handle to a logfile that's no longer in use,
 	 * preventing the file from being deleted.
 	 */
-	if (WriteRqst.Write <= LogwrtResult.Flush)
+	if (WriteRqst.Write <= LogFlushResult)
 	{
 		if (openLogFile >= 0)
 		{
-			LogwrtResult.Write = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Write);
-			if (!XLByteInPrevSeg(LogwrtResult.Write, openLogSegNo,
+			LogWriteResult = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Write);
+			if (!XLByteInPrevSeg(LogWriteResult, openLogSegNo,
 								 wal_segment_size))
 			{
 				XLogFileClose();
@@ -2784,7 +2780,7 @@ XLogBackgroundFlush(void)
 	 */
 	now = GetCurrentTimestamp();
 	flushbytes =
-		WriteRqst.Write / XLOG_BLCKSZ - LogwrtResult.Flush / XLOG_BLCKSZ;
+		WriteRqst.Write / XLOG_BLCKSZ - LogFlushResult / XLOG_BLCKSZ;
 
 	if (WalWriterFlushAfter == 0 || lastflush == 0)
 	{
@@ -2819,8 +2815,8 @@ XLogBackgroundFlush(void)
 		elog(LOG, "xlog bg flush request write %X/%X; flush: %X/%X, current is write %X/%X; flush %X/%X",
 			 LSN_FORMAT_ARGS(WriteRqst.Write),
 			 LSN_FORMAT_ARGS(WriteRqst.Flush),
-			 LSN_FORMAT_ARGS(LogwrtResult.Write),
-			 LSN_FORMAT_ARGS(LogwrtResult.Flush));
+			 LSN_FORMAT_ARGS(LogWriteResult),
+			 LSN_FORMAT_ARGS(LogFlushResult));
 #endif
 
 	START_CRIT_SECTION();
@@ -2828,10 +2824,10 @@ XLogBackgroundFlush(void)
 	/* now wait for any in-progress insertions to finish and get write lock */
 	WaitXLogInsertionsToFinish(WriteRqst.Write);
 	LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
-	LogwrtResult.Write = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Write);
-	LogwrtResult.Flush = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Flush);
-	if (WriteRqst.Write > LogwrtResult.Write ||
-		WriteRqst.Flush > LogwrtResult.Flush)
+	LogWriteResult = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Write);
+	LogFlushResult = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Flush);
+	if (WriteRqst.Write > LogWriteResult ||
+		WriteRqst.Flush > LogFlushResult)
 	{
 		XLogWrite(WriteRqst, insertTLI, flexible);
 	}
@@ -2913,14 +2909,14 @@ XLogNeedsFlush(XLogRecPtr record)
 	}
 
 	/* Quick exit if already known flushed */
-	if (record <= LogwrtResult.Flush)
+	if (record <= LogFlushResult)
 		return false;
 
 	/* read LogwrtResult and update local state */
-	LogwrtResult.Flush = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Flush);
+	LogFlushResult = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Flush);
 
 	/* check again */
-	if (record <= LogwrtResult.Flush)
+	if (record <= LogFlushResult)
 		return false;
 
 	return true;
@@ -5488,7 +5484,7 @@ StartupXLOG(void)
 		XLogCtl->InitializedUpTo = EndOfLog;
 	}
 
-	LogwrtResult.Write = LogwrtResult.Flush = EndOfLog;
+	LogWriteResult = LogFlushResult = EndOfLog;
 
 	/* XXX OK to write without WALWriteLock? */
 	pg_atomic_write_u64(&XLogCtl->LogwrtResult.Write, EndOfLog);
@@ -5928,7 +5924,7 @@ GetFlushRecPtr(TimeLineID *insertTLI)
 {
 	Assert(XLogCtl->SharedRecoveryState == RECOVERY_STATE_DONE);
 
-	LogwrtResult.Flush = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Flush);
+	LogFlushResult = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Flush);
 
 	/*
 	 * If we're writing and flushing WAL, the time line can't be changing, so
@@ -5937,7 +5933,7 @@ GetFlushRecPtr(TimeLineID *insertTLI)
 	if (insertTLI)
 		*insertTLI = XLogCtl->InsertTimeLineID;
 
-	return LogwrtResult.Flush;
+	return LogFlushResult;
 }
 
 /*
@@ -9084,9 +9080,9 @@ GetXLogInsertRecPtr(void)
 XLogRecPtr
 GetXLogWriteRecPtr(void)
 {
-	LogwrtResult.Write = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Write);
+	LogWriteResult = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Write);
 
-	return LogwrtResult.Write;
+	return LogWriteResult;
 }
 
 /*
-- 
2.30.2

>From 6e819dddb96d31835662a7f37cd3cd49907bc39a Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvhe...@alvh.no-ip.org>
Date: Mon, 22 Nov 2021 18:13:09 -0300
Subject: [PATCH v7 3/3] Change asyncXactLSN and WalWriterSleeping to atomics

Using atomics for these two members of XLogCtl further reduces use of
the info_lck spinlock.  There's no benchmark study to show that this
change is necessary, however.
---
 src/backend/access/transam/xlog.c | 31 ++++++++++++++-----------------
 1 file changed, 14 insertions(+), 17 deletions(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index de8c3e97a0..151eb145ad 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -479,11 +479,12 @@ typedef struct XLogCtlData
 	XLogCtlInsert Insert;
 
 	XLogwrtAtomic LogwrtResult; /* uses atomics */
+	pg_atomic_uint64 asyncXactLSN;	/* LSN of newest async commit/abort */
+
 	/* Protected by info_lck: */
 	XLogwrtRqst LogwrtRqst;
 	XLogRecPtr	RedoRecPtr;		/* a recent copy of Insert->RedoRecPtr */
 	FullTransactionId ckptFullXid;	/* nextXid of latest checkpoint */
-	XLogRecPtr	asyncXactLSN;	/* LSN of newest async commit/abort */
 	XLogRecPtr	replicationSlotMinLSN;	/* oldest LSN needed by any slot */
 
 	XLogSegNo	lastRemovedSegNo;	/* latest removed/recycled XLOG segment */
@@ -547,9 +548,9 @@ typedef struct XLogCtlData
 	/*
 	 * WalWriterSleeping indicates whether the WAL writer is currently in
 	 * low-power mode (and hence should be nudged if an async commit occurs).
-	 * Protected by info_lck.
+	 * Uses atomics.
 	 */
-	bool		WalWriterSleeping;
+	pg_atomic_uint32 WalWriterSleeping;
 
 	/*
 	 * During recovery, we keep a copy of the latest checkpoint record here.
@@ -2377,12 +2378,8 @@ XLogSetAsyncXactLSN(XLogRecPtr asyncXactLSN)
 	XLogRecPtr	WriteRqstPtr = asyncXactLSN;
 	bool		sleeping;
 
-	LogFlushResult = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Flush);
-	SpinLockAcquire(&XLogCtl->info_lck);
-	sleeping = XLogCtl->WalWriterSleeping;
-	if (XLogCtl->asyncXactLSN < asyncXactLSN)
-		XLogCtl->asyncXactLSN = asyncXactLSN;
-	SpinLockRelease(&XLogCtl->info_lck);
+	sleeping = (bool) pg_atomic_read_u32(&XLogCtl->WalWriterSleeping);
+	pg_atomic_monotonic_advance_u64(&XLogCtl->asyncXactLSN, asyncXactLSN);
 
 	/*
 	 * If the WALWriter is sleeping, we should kick it to make it come out of
@@ -2395,6 +2392,7 @@ XLogSetAsyncXactLSN(XLogRecPtr asyncXactLSN)
 		WriteRqstPtr -= WriteRqstPtr % XLOG_BLCKSZ;
 
 		/* if we have already flushed that far, we're done */
+		LogFlushResult = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Flush);
 		if (WriteRqstPtr <= LogFlushResult)
 			return;
 	}
@@ -2748,10 +2746,8 @@ XLogBackgroundFlush(void)
 	LogFlushResult = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Flush);
 	if (WriteRqst.Write <= LogFlushResult)
 	{
-		pg_memory_barrier();
-		SpinLockAcquire(&XLogCtl->info_lck);
-		WriteRqst.Write = XLogCtl->asyncXactLSN;
-		SpinLockRelease(&XLogCtl->info_lck);
+		pg_read_barrier();
+		WriteRqst.Write = pg_atomic_read_u64(&XLogCtl->asyncXactLSN);
 		flexible = false;		/* ensure it all gets written */
 	}
 
@@ -2826,6 +2822,7 @@ XLogBackgroundFlush(void)
 	LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
 	LogWriteResult = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Write);
 	LogFlushResult = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Flush);
+	/* no barrier needed here */
 	if (WriteRqst.Write > LogWriteResult ||
 		WriteRqst.Flush > LogFlushResult)
 	{
@@ -4494,7 +4491,7 @@ XLOGShmemInit(void)
 	XLogCtl->XLogCacheBlck = XLOGbuffers - 1;
 	XLogCtl->SharedRecoveryState = RECOVERY_STATE_CRASH;
 	XLogCtl->InstallXLogFileSegmentActive = false;
-	XLogCtl->WalWriterSleeping = false;
+	pg_atomic_write_u32(&XLogCtl->WalWriterSleeping, (uint32) false);
 
 	SpinLockInit(&XLogCtl->Insert.insertpos_lck);
 	SpinLockInit(&XLogCtl->info_lck);
@@ -9213,11 +9210,11 @@ IsInstallXLogFileSegmentActive(void)
 
 /*
  * Update the WalWriterSleeping flag.
+ *
+ * Note there is no memory barrier here.  Caller must insert one if needed.
  */
 void
 SetWalWriterSleeping(bool sleeping)
 {
-	SpinLockAcquire(&XLogCtl->info_lck);
-	XLogCtl->WalWriterSleeping = sleeping;
-	SpinLockRelease(&XLogCtl->info_lck);
+	pg_atomic_write_u32(&XLogCtl->WalWriterSleeping, (uint32) sleeping);
 }
-- 
2.30.2

Reply via email to