There was an earlier comment by Andres that asyncXactLSN should also be
atomic, to avoid an ugly spinlock interaction with the new atomic-based
LogwrtResult.  The 0002 here is an attempt at doing that; I found that
it also needed to change WalWriterSleeping to use atomics, so that
XLogSetAsyncXactLSN doesn't have to grab the spinlock just for that flag.
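
For anyone reading along without applying the patches, here's a minimal
standalone sketch of the two idioms involved, written with C11
<stdatomic.h> rather than PostgreSQL's pg_atomic_* wrappers; the names
(shared_lsn, writer_sleeping, monotonic_advance) are invented for the
illustration, not taken from the patches:

#include <stdatomic.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

static _Atomic uint64_t shared_lsn;		/* plays the role of asyncXactLSN */
static _Atomic uint32_t writer_sleeping;	/* plays the role of WalWriterSleeping */

/*
 * Advance shared_lsn to at least 'target' using only atomics; concurrent
 * callers can interleave freely and the value never moves backwards.
 */
static void
monotonic_advance(uint64_t target)
{
	uint64_t	cur = atomic_load(&shared_lsn);

	while (cur < target)
	{
		/*
		 * On failure the CAS refreshes 'cur' with whatever another thread
		 * installed; if that is already >= target, nothing remains to do.
		 */
		if (atomic_compare_exchange_weak(&shared_lsn, &cur, target))
			break;
	}
}

int
main(void)
{
	/* what XLogSetAsyncXactLSN would do, sans spinlock */
	bool		sleeping = (bool) atomic_load(&writer_sleeping);

	monotonic_advance(100);
	monotonic_advance(42);		/* no-op: never moves backwards */

	printf("sleeping=%d lsn=%llu\n", (int) sleeping,
		   (unsigned long long) atomic_load(&shared_lsn));
	return 0;
}

Note that 0001's pg_atomic_monotonic_advance_u64() additionally promises
full barrier semantics even on the early-exit path (hence its explicit
pg_memory_barrier()); the default seq_cst ordering of <stdatomic.h> is
enough for this sketch.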


-- 
Álvaro Herrera              Valdivia, Chile  —  https://www.EnterpriseDB.com/
"Learn about compilers. Then everything looks like either a compiler or
a database, and now you have two problems but one of them is fun."
            https://twitter.com/thingskatedid/status/1456027786158776329
From 0d48422df1a0e738721bfba5cd2d3f1ede5db423 Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvhe...@alvh.no-ip.org>
Date: Tue, 2 Feb 2021 14:03:43 -0300
Subject: [PATCH v5 1/2] make LogwrtResult atomic

---
 src/backend/access/transam/xlog.c | 192 +++++++++++++++---------------
 src/include/port/atomics.h        |  29 +++++
 src/tools/pgindent/typedefs.list  |   1 +
 3 files changed, 123 insertions(+), 99 deletions(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 221e4cb34f..225ac8bc2d 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -382,16 +382,14 @@ static XLogRecPtr RedoStartLSN = InvalidXLogRecPtr;
  *
  * LogwrtRqst indicates a byte position that we need to write and/or fsync
  * the log up to (all records before that point must be written or fsynced).
- * LogwrtResult indicates the byte positions we have already written/fsynced.
+ * LogWriteResult, LogFlushResult indicate the byte positions we have already
+ * written/fsynced.
  * These structs are identical but are declared separately to indicate their
  * slightly different functions.
  *
- * To read XLogCtl->LogwrtResult, you must hold either info_lck or
- * WALWriteLock.  To update it, you need to hold both locks.  The point of
- * this arrangement is that the value can be examined by code that already
- * holds WALWriteLock without needing to grab info_lck as well.  In addition
- * to the shared variable, each backend has a private copy of LogwrtResult,
- * which is updated when convenient.
+ * XLogCtl->LogwrtResult is read and written using atomic operations.  In
+ * addition to the shared variable, each backend has private copies of its
+ * members (LogWriteResult and LogFlushResult), updated when convenient.
  *
  * The request bookkeeping is simpler: there is a shared XLogCtl->LogwrtRqst
  * (protected by info_lck), but we don't need to cache any copies of it.
@@ -414,18 +412,18 @@ static XLogRecPtr RedoStartLSN = InvalidXLogRecPtr;
  *----------
  */
 
+typedef struct XLogwrtAtomic
+{
+	pg_atomic_uint64 Write;		/* last byte + 1 of write position */
+	pg_atomic_uint64 Flush;		/* last byte + 1 of flush position */
+} XLogwrtAtomic;
+
 typedef struct XLogwrtRqst
 {
 	XLogRecPtr	Write;			/* last byte + 1 to write out */
 	XLogRecPtr	Flush;			/* last byte + 1 to flush */
 } XLogwrtRqst;
 
-typedef struct XLogwrtResult
-{
-	XLogRecPtr	Write;			/* last byte + 1 written out */
-	XLogRecPtr	Flush;			/* last byte + 1 flushed */
-} XLogwrtResult;
-
 /*
  * Inserting to WAL is protected by a small fixed number of WAL insertion
  * locks. To insert to the WAL, you must hold one of the locks - it doesn't
@@ -577,6 +575,7 @@ typedef struct XLogCtlData
 {
 	XLogCtlInsert Insert;
 
+	XLogwrtAtomic LogwrtResult; /* uses atomics */
 	/* Protected by info_lck: */
 	XLogwrtRqst LogwrtRqst;
 	XLogRecPtr	RedoRecPtr;		/* a recent copy of Insert->RedoRecPtr */
@@ -594,12 +593,6 @@ typedef struct XLogCtlData
 	pg_time_t	lastSegSwitchTime;
 	XLogRecPtr	lastSegSwitchLSN;
 
-	/*
-	 * Protected by info_lck and WALWriteLock (you must hold either lock to
-	 * read it, but both to update)
-	 */
-	XLogwrtResult LogwrtResult;
-
 	/*
 	 * Latest initialized page in the cache (last byte position + 1).
 	 *
@@ -770,10 +763,11 @@ static ControlFileData *ControlFile = NULL;
 static int	UsableBytesInSegment;
 
 /*
- * Private, possibly out-of-date copy of shared LogwrtResult.
+ * Private, possibly out-of-date copies of shared XLogCtl->LogwrtResult members.
  * See discussion above.
  */
-static XLogwrtResult LogwrtResult = {0, 0};
+static XLogRecPtr LogWriteResult = 0;
+static XLogRecPtr LogFlushResult = 0;
 
 /*
  * Codes indicating where we got a WAL file from during recovery, or where
@@ -1201,8 +1195,6 @@ XLogInsertRecord(XLogRecData *rdata,
 		/* advance global request to include new block(s) */
 		if (XLogCtl->LogwrtRqst.Write < EndPos)
 			XLogCtl->LogwrtRqst.Write = EndPos;
-		/* update local result copy while I have the chance */
-		LogwrtResult = XLogCtl->LogwrtResult;
 		SpinLockRelease(&XLogCtl->info_lck);
 	}
 
@@ -2174,6 +2166,7 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, TimeLineID tli, bool opportunistic)
 	 * Now that we have the lock, check if someone initialized the page
 	 * already.
 	 */
+	LogWriteResult = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Write);
 	while (upto >= XLogCtl->InitializedUpTo || opportunistic)
 	{
 		nextidx = XLogRecPtrToBufIdx(XLogCtl->InitializedUpTo);
@@ -2184,7 +2177,7 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, TimeLineID tli, bool opportunistic)
 		 * already written out.
 		 */
 		OldPageRqstPtr = XLogCtl->xlblocks[nextidx];
-		if (LogwrtResult.Write < OldPageRqstPtr)
+		if (LogWriteResult < OldPageRqstPtr)
 		{
 			/*
 			 * Nope, got work to do. If we just want to pre-initialize as much
@@ -2193,18 +2186,18 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, TimeLineID tli, bool opportunistic)
 			if (opportunistic)
 				break;
 
-			/* Before waiting, get info_lck and update LogwrtResult */
+			/* Advance shared memory write request position */
 			SpinLockAcquire(&XLogCtl->info_lck);
 			if (XLogCtl->LogwrtRqst.Write < OldPageRqstPtr)
 				XLogCtl->LogwrtRqst.Write = OldPageRqstPtr;
-			LogwrtResult = XLogCtl->LogwrtResult;
 			SpinLockRelease(&XLogCtl->info_lck);
 
 			/*
-			 * Now that we have an up-to-date LogwrtResult value, see if we
-			 * still need to write it or if someone else already did.
+			 * Before waiting, update LogWriteResult and see if we still need
+			 * to write it or if someone else already did.
 			 */
-			if (LogwrtResult.Write < OldPageRqstPtr)
+			LogWriteResult = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Write);
+			if (LogWriteResult < OldPageRqstPtr)
 			{
 				/*
 				 * Must acquire write lock. Release WALBufMappingLock first,
@@ -2218,8 +2211,8 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, TimeLineID tli, bool opportunistic)
 
 				LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
 
-				LogwrtResult = XLogCtl->LogwrtResult;
-				if (LogwrtResult.Write >= OldPageRqstPtr)
+				LogWriteResult = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Write);
+				if (LogWriteResult >= OldPageRqstPtr)
 				{
 					/* OK, someone wrote it already */
 					LWLockRelease(WALWriteLock);
@@ -2476,7 +2469,7 @@ XLogWrite(XLogwrtRqst WriteRqst, TimeLineID tli, bool flexible)
 	/*
 	 * Update local LogwrtResult (caller probably did this already, but...)
 	 */
-	LogwrtResult = XLogCtl->LogwrtResult;
+	LogWriteResult = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Write);
 
 	/*
 	 * Since successive pages in the xlog cache are consecutively allocated,
@@ -2496,9 +2489,9 @@ XLogWrite(XLogwrtRqst WriteRqst, TimeLineID tli, bool flexible)
 	 * consider writing.  Begin at the buffer containing the next unwritten
 	 * page, or last partially written page.
 	 */
-	curridx = XLogRecPtrToBufIdx(LogwrtResult.Write);
+	curridx = XLogRecPtrToBufIdx(LogWriteResult);
 
-	while (LogwrtResult.Write < WriteRqst.Write)
+	while (LogWriteResult < WriteRqst.Write)
 	{
 		/*
 		 * Make sure we're not ahead of the insert process.  This could happen
@@ -2507,16 +2500,16 @@ XLogWrite(XLogwrtRqst WriteRqst, TimeLineID tli, bool flexible)
 		 */
 		XLogRecPtr	EndPtr = XLogCtl->xlblocks[curridx];
 
-		if (LogwrtResult.Write >= EndPtr)
+		if (LogWriteResult >= EndPtr)
 			elog(PANIC, "xlog write request %X/%X is past end of log %X/%X",
-				 LSN_FORMAT_ARGS(LogwrtResult.Write),
+				 LSN_FORMAT_ARGS(LogWriteResult),
 				 LSN_FORMAT_ARGS(EndPtr));
 
-		/* Advance LogwrtResult.Write to end of current buffer page */
-		LogwrtResult.Write = EndPtr;
-		ispartialpage = WriteRqst.Write < LogwrtResult.Write;
+		/* Advance LogWriteResult to end of current buffer page */
+		LogWriteResult = EndPtr;
+		ispartialpage = WriteRqst.Write < LogWriteResult;
 
-		if (!XLByteInPrevSeg(LogwrtResult.Write, openLogSegNo,
+		if (!XLByteInPrevSeg(LogWriteResult, openLogSegNo,
 							 wal_segment_size))
 		{
 			/*
@@ -2526,7 +2519,7 @@ XLogWrite(XLogwrtRqst WriteRqst, TimeLineID tli, bool flexible)
 			Assert(npages == 0);
 			if (openLogFile >= 0)
 				XLogFileClose();
-			XLByteToPrevSeg(LogwrtResult.Write, openLogSegNo,
+			XLByteToPrevSeg(LogWriteResult, openLogSegNo,
 							wal_segment_size);
 			openLogTLI = tli;
 
@@ -2538,7 +2531,7 @@ XLogWrite(XLogwrtRqst WriteRqst, TimeLineID tli, bool flexible)
 		/* Make sure we have the current logfile open */
 		if (openLogFile < 0)
 		{
-			XLByteToPrevSeg(LogwrtResult.Write, openLogSegNo,
+			XLByteToPrevSeg(LogWriteResult, openLogSegNo,
 							wal_segment_size);
 			openLogTLI = tli;
 			openLogFile = XLogFileOpen(openLogSegNo, tli);
@@ -2550,7 +2543,7 @@ XLogWrite(XLogwrtRqst WriteRqst, TimeLineID tli, bool flexible)
 		{
 			/* first of group */
 			startidx = curridx;
-			startoffset = XLogSegmentOffset(LogwrtResult.Write - XLOG_BLCKSZ,
+			startoffset = XLogSegmentOffset(LogWriteResult - XLOG_BLCKSZ,
 											wal_segment_size);
 		}
 		npages++;
@@ -2561,7 +2554,7 @@ XLogWrite(XLogwrtRqst WriteRqst, TimeLineID tli, bool flexible)
 		 * contiguous in memory), or if we are at the end of the logfile
 		 * segment.
 		 */
-		last_iteration = WriteRqst.Write <= LogwrtResult.Write;
+		last_iteration = WriteRqst.Write <= LogWriteResult;
 
 		finishing_seg = !ispartialpage &&
 			(startoffset + npages * XLOG_BLCKSZ) >= wal_segment_size;
@@ -2652,13 +2645,13 @@ XLogWrite(XLogwrtRqst WriteRqst, TimeLineID tli, bool flexible)
 				/* signal that we need to wakeup walsenders later */
 				WalSndWakeupRequest();
 
-				LogwrtResult.Flush = LogwrtResult.Write;	/* end of page */
+				LogFlushResult = LogWriteResult;	/* end of page */
 
 				if (XLogArchivingActive())
 					XLogArchiveNotifySeg(openLogSegNo, tli);
 
 				XLogCtl->lastSegSwitchTime = (pg_time_t) time(NULL);
-				XLogCtl->lastSegSwitchLSN = LogwrtResult.Flush;
+				XLogCtl->lastSegSwitchLSN = LogFlushResult;
 
 				/*
 				 * Request a checkpoint if we've consumed too much xlog since
@@ -2679,7 +2672,7 @@ XLogWrite(XLogwrtRqst WriteRqst, TimeLineID tli, bool flexible)
 		if (ispartialpage)
 		{
 			/* Only asked to write a partial page */
-			LogwrtResult.Write = WriteRqst.Write;
+			LogWriteResult = WriteRqst.Write;
 			break;
 		}
 		curridx = NextBufIdx(curridx);
@@ -2691,12 +2684,14 @@ XLogWrite(XLogwrtRqst WriteRqst, TimeLineID tli, bool flexible)
 
 	Assert(npages == 0);
 
+	/* Publish current write result position */
+	pg_atomic_monotonic_advance_u64(&XLogCtl->LogwrtResult.Write, LogWriteResult);
+
 	/*
 	 * If asked to flush, do so
 	 */
-	if (LogwrtResult.Flush < WriteRqst.Flush &&
-		LogwrtResult.Flush < LogwrtResult.Write)
-
+	if (LogFlushResult < WriteRqst.Flush &&
+		LogFlushResult < LogWriteResult)
 	{
 		/*
 		 * Could get here without iterating above loop, in which case we might
@@ -2707,12 +2702,12 @@ XLogWrite(XLogwrtRqst WriteRqst, TimeLineID tli, bool flexible)
 			sync_method != SYNC_METHOD_OPEN_DSYNC)
 		{
 			if (openLogFile >= 0 &&
-				!XLByteInPrevSeg(LogwrtResult.Write, openLogSegNo,
+				!XLByteInPrevSeg(LogWriteResult, openLogSegNo,
 								 wal_segment_size))
 				XLogFileClose();
 			if (openLogFile < 0)
 			{
-				XLByteToPrevSeg(LogwrtResult.Write, openLogSegNo,
+				XLByteToPrevSeg(LogWriteResult, openLogSegNo,
 								wal_segment_size);
 				openLogTLI = tli;
 				openLogFile = XLogFileOpen(openLogSegNo, tli);
@@ -2725,23 +2720,23 @@ XLogWrite(XLogwrtRqst WriteRqst, TimeLineID tli, bool flexible)
 		/* signal that we need to wakeup walsenders later */
 		WalSndWakeupRequest();
 
-		LogwrtResult.Flush = LogwrtResult.Write;
+		LogFlushResult = LogWriteResult;
 	}
 
+	/* Publish current flush result position */
+	pg_atomic_monotonic_advance_u64(&XLogCtl->LogwrtResult.Flush, LogFlushResult);
+
 	/*
-	 * Update shared-memory status
-	 *
-	 * We make sure that the shared 'request' values do not fall behind the
+	 * Make sure that the shared 'request' values do not fall behind the
 	 * 'result' values.  This is not absolutely essential, but it saves some
 	 * code in a couple of places.
 	 */
 	{
 		SpinLockAcquire(&XLogCtl->info_lck);
-		XLogCtl->LogwrtResult = LogwrtResult;
-		if (XLogCtl->LogwrtRqst.Write < LogwrtResult.Write)
-			XLogCtl->LogwrtRqst.Write = LogwrtResult.Write;
-		if (XLogCtl->LogwrtRqst.Flush < LogwrtResult.Flush)
-			XLogCtl->LogwrtRqst.Flush = LogwrtResult.Flush;
+		if (XLogCtl->LogwrtRqst.Write < LogWriteResult)
+			XLogCtl->LogwrtRqst.Write = LogWriteResult;
+		if (XLogCtl->LogwrtRqst.Flush < LogFlushResult)
+			XLogCtl->LogwrtRqst.Flush = LogFlushResult;
 		SpinLockRelease(&XLogCtl->info_lck);
 	}
 }
@@ -2757,8 +2752,8 @@ XLogSetAsyncXactLSN(XLogRecPtr asyncXactLSN)
 	XLogRecPtr	WriteRqstPtr = asyncXactLSN;
 	bool		sleeping;
 
+	LogFlushResult = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Flush);
 	SpinLockAcquire(&XLogCtl->info_lck);
-	LogwrtResult = XLogCtl->LogwrtResult;
 	sleeping = XLogCtl->WalWriterSleeping;
 	if (XLogCtl->asyncXactLSN < asyncXactLSN)
 		XLogCtl->asyncXactLSN = asyncXactLSN;
@@ -2775,7 +2770,7 @@ XLogSetAsyncXactLSN(XLogRecPtr asyncXactLSN)
 		WriteRqstPtr -= WriteRqstPtr % XLOG_BLCKSZ;
 
 		/* if we have already flushed that far, we're done */
-		if (WriteRqstPtr <= LogwrtResult.Flush)
+		if (WriteRqstPtr <= LogFlushResult)
 			return;
 	}
 
@@ -2931,15 +2926,15 @@ XLogFlush(XLogRecPtr record)
 	}
 
 	/* Quick exit if already known flushed */
-	if (record <= LogwrtResult.Flush)
+	if (record <= LogFlushResult)
 		return;
 
 #ifdef WAL_DEBUG
 	if (XLOG_DEBUG)
 		elog(LOG, "xlog flush request %X/%X; write %X/%X; flush %X/%X",
 			 LSN_FORMAT_ARGS(record),
-			 LSN_FORMAT_ARGS(LogwrtResult.Write),
-			 LSN_FORMAT_ARGS(LogwrtResult.Flush));
+			 LSN_FORMAT_ARGS(LogWriteResult),
+			 LSN_FORMAT_ARGS(LogFlushResult));
 #endif
 
 	START_CRIT_SECTION();
@@ -2948,8 +2943,8 @@ XLogFlush(XLogRecPtr record)
 	 * Since fsync is usually a horribly expensive operation, we try to
 	 * piggyback as much data as we can on each fsync: if we see any more data
 	 * entered into the xlog buffer, we'll write and fsync that too, so that
-	 * the final value of LogwrtResult.Flush is as large as possible. This
-	 * gives us some chance of avoiding another fsync immediately after.
+	 * the final value of LogFlushResult is as large as possible. This gives
+	 * us some chance of avoiding another fsync immediately after.
 	 */
 
 	/* initialize to given target; may increase below */
@@ -2967,11 +2962,11 @@ XLogFlush(XLogRecPtr record)
 		SpinLockAcquire(&XLogCtl->info_lck);
 		if (WriteRqstPtr < XLogCtl->LogwrtRqst.Write)
 			WriteRqstPtr = XLogCtl->LogwrtRqst.Write;
-		LogwrtResult = XLogCtl->LogwrtResult;
 		SpinLockRelease(&XLogCtl->info_lck);
 
 		/* done already? */
-		if (record <= LogwrtResult.Flush)
+		LogFlushResult = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Flush);
+		if (record <= LogFlushResult)
 			break;
 
 		/*
@@ -2998,8 +2993,8 @@ XLogFlush(XLogRecPtr record)
 		}
 
 		/* Got the lock; recheck whether request is satisfied */
-		LogwrtResult = XLogCtl->LogwrtResult;
-		if (record <= LogwrtResult.Flush)
+		LogFlushResult = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Flush);
+		if (record <= LogFlushResult)
 		{
 			LWLockRelease(WALWriteLock);
 			break;
@@ -3069,11 +3064,11 @@ XLogFlush(XLogRecPtr record)
 	 * calls from bufmgr.c are not within critical sections and so we will not
 	 * force a restart for a bad LSN on a data page.
 	 */
-	if (LogwrtResult.Flush < record)
+	if (LogFlushResult < record)
 		elog(ERROR,
 			 "xlog flush request %X/%X is not satisfied --- flushed only to %X/%X",
 			 LSN_FORMAT_ARGS(record),
-			 LSN_FORMAT_ARGS(LogwrtResult.Flush));
+			 LSN_FORMAT_ARGS(LogFlushResult));
 }
 
 /*
@@ -3122,7 +3117,6 @@ XLogBackgroundFlush(void)
 
 	/* read LogwrtResult and update local state */
 	SpinLockAcquire(&XLogCtl->info_lck);
-	LogwrtResult = XLogCtl->LogwrtResult;
 	WriteRqst = XLogCtl->LogwrtRqst;
 	SpinLockRelease(&XLogCtl->info_lck);
 
@@ -3130,8 +3124,10 @@ XLogBackgroundFlush(void)
 	WriteRqst.Write -= WriteRqst.Write % XLOG_BLCKSZ;
 
 	/* if we have already flushed that far, consider async commit records */
-	if (WriteRqst.Write <= LogwrtResult.Flush)
+	LogFlushResult = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Flush);
+	if (WriteRqst.Write <= LogFlushResult)
 	{
+		pg_memory_barrier();
 		SpinLockAcquire(&XLogCtl->info_lck);
 		WriteRqst.Write = XLogCtl->asyncXactLSN;
 		SpinLockRelease(&XLogCtl->info_lck);
@@ -3143,11 +3139,12 @@ XLogBackgroundFlush(void)
 	 * holding an open file handle to a logfile that's no longer in use,
 	 * preventing the file from being deleted.
 	 */
-	if (WriteRqst.Write <= LogwrtResult.Flush)
+	if (WriteRqst.Write <= LogFlushResult)
 	{
 		if (openLogFile >= 0)
 		{
-			if (!XLByteInPrevSeg(LogwrtResult.Write, openLogSegNo,
+			LogWriteResult = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Write);
+			if (!XLByteInPrevSeg(LogWriteResult, openLogSegNo,
 								 wal_segment_size))
 			{
 				XLogFileClose();
@@ -3162,7 +3159,7 @@ XLogBackgroundFlush(void)
 	 */
 	now = GetCurrentTimestamp();
 	flushbytes =
-		WriteRqst.Write / XLOG_BLCKSZ - LogwrtResult.Flush / XLOG_BLCKSZ;
+		WriteRqst.Write / XLOG_BLCKSZ - LogFlushResult / XLOG_BLCKSZ;
 
 	if (WalWriterFlushAfter == 0 || lastflush == 0)
 	{
@@ -3197,8 +3194,8 @@ XLogBackgroundFlush(void)
 		elog(LOG, "xlog bg flush request write %X/%X; flush: %X/%X, current is write %X/%X; flush %X/%X",
 			 LSN_FORMAT_ARGS(WriteRqst.Write),
 			 LSN_FORMAT_ARGS(WriteRqst.Flush),
-			 LSN_FORMAT_ARGS(LogwrtResult.Write),
-			 LSN_FORMAT_ARGS(LogwrtResult.Flush));
+			 LSN_FORMAT_ARGS(LogWriteResult),
+			 LSN_FORMAT_ARGS(LogFlushResult));
 #endif
 
 	START_CRIT_SECTION();
@@ -3206,9 +3203,10 @@ XLogBackgroundFlush(void)
 	/* now wait for any in-progress insertions to finish and get write lock */
 	WaitXLogInsertionsToFinish(WriteRqst.Write);
 	LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
-	LogwrtResult = XLogCtl->LogwrtResult;
-	if (WriteRqst.Write > LogwrtResult.Write ||
-		WriteRqst.Flush > LogwrtResult.Flush)
+	LogWriteResult = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Write);
+	LogFlushResult = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Flush);
+	if (WriteRqst.Write > LogWriteResult ||
+		WriteRqst.Flush > LogFlushResult)
 	{
 		XLogWrite(WriteRqst, insertTLI, flexible);
 	}
@@ -3290,16 +3288,14 @@ XLogNeedsFlush(XLogRecPtr record)
 	}
 
 	/* Quick exit if already known flushed */
-	if (record <= LogwrtResult.Flush)
+	if (record <= LogFlushResult)
 		return false;
 
 	/* read LogwrtResult and update local state */
-	SpinLockAcquire(&XLogCtl->info_lck);
-	LogwrtResult = XLogCtl->LogwrtResult;
-	SpinLockRelease(&XLogCtl->info_lck);
+	LogFlushResult = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Flush);
 
 	/* check again */
-	if (record <= LogwrtResult.Flush)
+	if (record <= LogFlushResult)
 		return false;
 
 	return true;
@@ -8086,9 +8082,11 @@ StartupXLOG(void)
 		XLogCtl->InitializedUpTo = EndOfLog;
 	}
 
-	LogwrtResult.Write = LogwrtResult.Flush = EndOfLog;
+	LogWriteResult = LogFlushResult = EndOfLog;
 
-	XLogCtl->LogwrtResult = LogwrtResult;
+	/* XXX OK to write without WALWriteLock? */
+	pg_atomic_write_u64(&XLogCtl->LogwrtResult.Write, EndOfLog);
+	pg_atomic_write_u64(&XLogCtl->LogwrtResult.Flush, EndOfLog);
 
 	XLogCtl->LogwrtRqst.Write = EndOfLog;
 	XLogCtl->LogwrtRqst.Flush = EndOfLog;
@@ -8750,9 +8748,7 @@ GetFlushRecPtr(TimeLineID *insertTLI)
 {
 	Assert(XLogCtl->SharedRecoveryState == RECOVERY_STATE_DONE);
 
-	SpinLockAcquire(&XLogCtl->info_lck);
-	LogwrtResult = XLogCtl->LogwrtResult;
-	SpinLockRelease(&XLogCtl->info_lck);
+	LogFlushResult = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Flush);
 
 	/*
 	 * If we're writing and flushing WAL, the time line can't be changing,
@@ -8761,7 +8757,7 @@ GetFlushRecPtr(TimeLineID *insertTLI)
 	if (insertTLI)
 		*insertTLI = XLogCtl->InsertTimeLineID;
 
-	return LogwrtResult.Flush;
+	return LogFlushResult;
 }
 
 /*
@@ -12041,11 +12037,9 @@ GetXLogInsertRecPtr(void)
 XLogRecPtr
 GetXLogWriteRecPtr(void)
 {
-	SpinLockAcquire(&XLogCtl->info_lck);
-	LogwrtResult = XLogCtl->LogwrtResult;
-	SpinLockRelease(&XLogCtl->info_lck);
+	LogWriteResult = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Write);
 
-	return LogwrtResult.Write;
+	return LogWriteResult;
 }
 
 /*
diff --git a/src/include/port/atomics.h b/src/include/port/atomics.h
index 856338f161..4ff9ce9864 100644
--- a/src/include/port/atomics.h
+++ b/src/include/port/atomics.h
@@ -519,6 +519,35 @@ pg_atomic_sub_fetch_u64(volatile pg_atomic_uint64 *ptr, int64 sub_)
 	return pg_atomic_sub_fetch_u64_impl(ptr, sub_);
 }
 
+/*
+ * Monotonically advance the given variable using only atomic operations until
+ * it's at least the target value.
+ *
+ * Full barrier semantics (even when value is unchanged).
+ */
+static inline void
+pg_atomic_monotonic_advance_u64(volatile pg_atomic_uint64 *ptr, uint64 target_)
+{
+	uint64		currval;
+
+#ifndef PG_HAVE_ATOMIC_U64_SIMULATION
+	AssertPointerAlignment(ptr, 8);
+#endif
+
+	currval = pg_atomic_read_u64(ptr);
+	if (currval >= target_)
+	{
+		pg_memory_barrier();
+		return;
+	}
+
+	while (currval < target_)
+	{
+		if (pg_atomic_compare_exchange_u64(ptr, &currval, target_))
+			break;
+	}
+}
+
 #undef INSIDE_ATOMICS_H
 
 #endif							/* ATOMICS_H */
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index da6ac8ed83..edc9bfa5ce 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -2943,6 +2943,7 @@ XLogRecordBuffer
 XLogRedoAction
 XLogSegNo
 XLogSource
+XLogwrtAtomic
 XLogwrtResult
 XLogwrtRqst
 XPVIV
-- 
2.30.2

From 7da32bd3b130cab4b1358f8da0014bde154c9ffc Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvhe...@alvh.no-ip.org>
Date: Mon, 22 Nov 2021 18:13:09 -0300
Subject: [PATCH v5 2/2] more atomics

---
 src/backend/access/transam/xlog.c | 31 ++++++++++++++-----------------
 1 file changed, 14 insertions(+), 17 deletions(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 225ac8bc2d..57885488da 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -576,11 +576,12 @@ typedef struct XLogCtlData
 	XLogCtlInsert Insert;
 
 	XLogwrtAtomic LogwrtResult; /* uses atomics */
+	pg_atomic_uint64 asyncXactLSN;	/* LSN of newest async commit/abort */
+
 	/* Protected by info_lck: */
 	XLogwrtRqst LogwrtRqst;
 	XLogRecPtr	RedoRecPtr;		/* a recent copy of Insert->RedoRecPtr */
 	FullTransactionId ckptFullXid;	/* nextXid of latest checkpoint */
-	XLogRecPtr	asyncXactLSN;	/* LSN of newest async commit/abort */
 	XLogRecPtr	replicationSlotMinLSN;	/* oldest LSN needed by any slot */
 
 	XLogSegNo	lastRemovedSegNo;	/* latest removed/recycled XLOG segment */
@@ -656,9 +657,9 @@ typedef struct XLogCtlData
 	/*
 	 * WalWriterSleeping indicates whether the WAL writer is currently in
 	 * low-power mode (and hence should be nudged if an async commit occurs).
-	 * Protected by info_lck.
+	 * Uses atomics.
 	 */
-	bool		WalWriterSleeping;
+	pg_atomic_uint32 WalWriterSleeping;
 
 	/*
 	 * recoveryWakeupLatch is used to wake up the startup process to continue
@@ -2752,12 +2753,8 @@ XLogSetAsyncXactLSN(XLogRecPtr asyncXactLSN)
 	XLogRecPtr	WriteRqstPtr = asyncXactLSN;
 	bool		sleeping;
 
-	LogFlushResult = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Flush);
-	SpinLockAcquire(&XLogCtl->info_lck);
-	sleeping = XLogCtl->WalWriterSleeping;
-	if (XLogCtl->asyncXactLSN < asyncXactLSN)
-		XLogCtl->asyncXactLSN = asyncXactLSN;
-	SpinLockRelease(&XLogCtl->info_lck);
+	sleeping = (bool) pg_atomic_read_u32(&XLogCtl->WalWriterSleeping);
+	pg_atomic_monotonic_advance_u64(&XLogCtl->asyncXactLSN, asyncXactLSN);
 
 	/*
 	 * If the WALWriter is sleeping, we should kick it to make it come out of
@@ -2770,6 +2767,7 @@ XLogSetAsyncXactLSN(XLogRecPtr asyncXactLSN)
 		WriteRqstPtr -= WriteRqstPtr % XLOG_BLCKSZ;
 
 		/* if we have already flushed that far, we're done */
+		LogFlushResult = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Flush);
 		if (WriteRqstPtr <= LogFlushResult)
 			return;
 	}
@@ -3127,10 +3125,8 @@ XLogBackgroundFlush(void)
 	LogFlushResult = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Flush);
 	if (WriteRqst.Write <= LogFlushResult)
 	{
-		pg_memory_barrier();
-		SpinLockAcquire(&XLogCtl->info_lck);
-		WriteRqst.Write = XLogCtl->asyncXactLSN;
-		SpinLockRelease(&XLogCtl->info_lck);
+		pg_read_barrier();
+		WriteRqst.Write = pg_atomic_read_u64(&XLogCtl->asyncXactLSN);
 		flexible = false;		/* ensure it all gets written */
 	}
 
@@ -3205,6 +3201,7 @@ XLogBackgroundFlush(void)
 	LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
 	LogWriteResult = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Write);
 	LogFlushResult = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Flush);
+	/* no barrier needed here */
 	if (WriteRqst.Write > LogWriteResult ||
 		WriteRqst.Flush > LogFlushResult)
 	{
@@ -5318,7 +5315,7 @@ XLOGShmemInit(void)
 	XLogCtl->SharedHotStandbyActive = false;
 	XLogCtl->InstallXLogFileSegmentActive = false;
 	XLogCtl->SharedPromoteIsTriggered = false;
-	XLogCtl->WalWriterSleeping = false;
+	pg_atomic_write_u32(&XLogCtl->WalWriterSleeping, (uint32) false);
 
 	SpinLockInit(&XLogCtl->Insert.insertpos_lck);
 	SpinLockInit(&XLogCtl->info_lck);
@@ -13256,13 +13253,13 @@ WakeupRecovery(void)
 
 /*
  * Update the WalWriterSleeping flag.
+ *
+ * Note there is no memory barrier here.  Caller must insert one if needed.
  */
 void
 SetWalWriterSleeping(bool sleeping)
 {
-	SpinLockAcquire(&XLogCtl->info_lck);
-	XLogCtl->WalWriterSleeping = sleeping;
-	SpinLockRelease(&XLogCtl->info_lck);
+	pg_atomic_write_u32(&XLogCtl->WalWriterSleeping, (uint32) sleeping);
 }
 
 /*
-- 
2.30.2
