So I tried this, but -- perhaps not surprisingly -- I can't get it to
work properly; the synchronization fails.  I suspect I need some
barriers, so I tried adding a few (probably including some that aren't
strictly necessary), but that didn't have the expected effect.
Strangely, all other tests pass for me; it's the pg_upgrade one in
particular that fails.

(The attached is of course POC quality at best.)

I'll have another look next week.

-- 
Álvaro Herrera                            39°49'30"S 73°17'W
From 90507185357391a661cd856fc231b140079c8d78 Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvhe...@alvh.no-ip.org>
Date: Fri, 29 Jan 2021 22:34:23 -0300
Subject: [PATCH v2 1/3] add pg_atomic_monotonic_advance_u64

---
 src/include/port/atomics.h | 19 +++++++++++++++++++
 1 file changed, 19 insertions(+)

diff --git a/src/include/port/atomics.h b/src/include/port/atomics.h
index 856338f161..752f39d66e 100644
--- a/src/include/port/atomics.h
+++ b/src/include/port/atomics.h
@@ -519,6 +519,25 @@ pg_atomic_sub_fetch_u64(volatile pg_atomic_uint64 *ptr, int64 sub_)
 	return pg_atomic_sub_fetch_u64_impl(ptr, sub_);
 }
 
+static inline void
+pg_atomic_monotonic_advance_u64(volatile pg_atomic_uint64 *ptr, uint64 target_)
+{
+#ifndef PG_HAVE_ATOMIC_U64_SIMULATION
+	AssertPointerAlignment(ptr, 8);
+#endif
+	/* FIXME is this algorithm correct if we have u64 simulation? */
+	while (true)
+	{
+		uint64		currval;
+
+		currval = pg_atomic_read_u64(ptr);
+		if (currval > target_)
+			break;	/* already done by somebody else */
+		if (pg_atomic_compare_exchange_u64(ptr, &currval, target_))
+			break;	/* successfully done */
+	}
+}
+
 #undef INSIDE_ATOMICS_H
 
 #endif							/* ATOMICS_H */
-- 
2.20.1
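
(An aside on 0001: the loop's semantics can be poked at outside the
tree.  Here's a throwaway standalone C11 sketch -- purely
illustrative, not PostgreSQL code, all names invented -- in which
several threads advance a shared 64-bit position and assert that it
never moves backwards:)

/*
 * Standalone C11 illustration (not PostgreSQL code) of the CAS loop in
 * pg_atomic_monotonic_advance_u64: several threads advance a shared
 * 64-bit position and the stored value never moves backwards.
 * Build with: cc -O2 -pthread monotonic.c
 */
#include <assert.h>
#include <pthread.h>
#include <stdatomic.h>
#include <stdint.h>
#include <stdio.h>

static _Atomic uint64_t shared_pos;

/* Same shape as the patch's loop: CAS until the stored value is >= target */
static void
monotonic_advance(_Atomic uint64_t *ptr, uint64_t target)
{
	uint64_t	cur = atomic_load(ptr);

	while (cur < target)
	{
		/* on failure, "cur" is refreshed with the current stored value */
		if (atomic_compare_exchange_weak(ptr, &cur, target))
			break;
	}
}

static void *
worker(void *arg)
{
	uint64_t	base = (uint64_t) (uintptr_t) arg;

	for (uint64_t i = 0; i < 100000; i++)
	{
		uint64_t	before = atomic_load(&shared_pos);

		monotonic_advance(&shared_pos, base + i);
		/* the shared position may only have grown */
		assert(atomic_load(&shared_pos) >= before);
	}
	return NULL;
}

int
main(void)
{
	pthread_t	th[4];

	for (int i = 0; i < 4; i++)
		pthread_create(&th[i], NULL, worker, (void *) (uintptr_t) (i * 50000));
	for (int i = 0; i < 4; i++)
		pthread_join(th[i], NULL);

	printf("final position: %llu\n",
		   (unsigned long long) atomic_load(&shared_pos));
	return 0;
}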

From 84a62ac0594f8f0b13c06958cf6e94e2a2723082 Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvhe...@alvh.no-ip.org>
Date: Fri, 29 Jan 2021 22:34:04 -0300
Subject: [PATCH v2 2/3] atomics in xlog.c

---
 src/backend/access/transam/xlog.c | 129 ++++++++++++++----------------
 1 file changed, 61 insertions(+), 68 deletions(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index f03bd473e2..8073a92ceb 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -413,7 +413,8 @@ static XLogRecPtr RedoStartLSN = InvalidXLogRecPtr;
  * which is updated when convenient.
  *
  * The request bookkeeping is simpler: there is a shared XLogCtl->LogwrtRqst
- * (protected by info_lck), but we don't need to cache any copies of it.
+ * (which uses atomic access, so no locks are needed), but we don't need to
+ * cache any copies of it.
  *
  * info_lck is only held long enough to read/update the protected variables,
  * so it's a plain spinlock.  The other locks are held longer (potentially
@@ -433,17 +434,19 @@ static XLogRecPtr RedoStartLSN = InvalidXLogRecPtr;
  *----------
  */
 
-typedef struct XLogwrtRqst
+/* Write/Flush position to be used in shared memory */
+typedef struct XLogwrtAtomic
 {
-	XLogRecPtr	Write;			/* last byte + 1 to write out */
-	XLogRecPtr	Flush;			/* last byte + 1 to flush */
-} XLogwrtRqst;
+	pg_atomic_uint64 Write;			/* last byte + 1 of write position */
+	pg_atomic_uint64 Flush;			/* last byte + 1 of flush position */
+} XLogwrtAtomic;
 
-typedef struct XLogwrtResult
+/* Write/Flush position to be used in process-local memory */
+typedef struct XLogwrt
 {
 	XLogRecPtr	Write;			/* last byte + 1 written out */
 	XLogRecPtr	Flush;			/* last byte + 1 flushed */
-} XLogwrtResult;
+} XLogwrt;
 
 /*
  * Inserting to WAL is protected by a small fixed number of WAL insertion
@@ -596,8 +599,10 @@ typedef struct XLogCtlData
 {
 	XLogCtlInsert Insert;
 
+	XLogwrtAtomic LogwrtRqst;
+	XLogwrtAtomic LogwrtResult;
+
 	/* Protected by info_lck: */
-	XLogwrtRqst LogwrtRqst;
 	XLogRecPtr	RedoRecPtr;		/* a recent copy of Insert->RedoRecPtr */
 	FullTransactionId ckptFullXid;	/* nextXid of latest checkpoint */
 	XLogRecPtr	asyncXactLSN;	/* LSN of newest async commit/abort */
@@ -613,12 +618,6 @@ typedef struct XLogCtlData
 	pg_time_t	lastSegSwitchTime;
 	XLogRecPtr	lastSegSwitchLSN;
 
-	/*
-	 * Protected by info_lck and WALWriteLock (you must hold either lock to
-	 * read it, but both to update)
-	 */
-	XLogwrtResult LogwrtResult;
-
 	/*
 	 * Latest initialized page in the cache (last byte position + 1).
 	 *
@@ -779,7 +778,7 @@ static int	UsableBytesInSegment;
  * Private, possibly out-of-date copy of shared LogwrtResult.
  * See discussion above.
  */
-static XLogwrtResult LogwrtResult = {0, 0};
+static XLogwrt LogwrtResult = {0, 0};
 
 /*
  * Codes indicating where we got a WAL file from during recovery, or where
@@ -910,7 +909,7 @@ static XLogRecPtr XLogGetReplicationSlotMinimumLSN(void);
 
 static void AdvanceXLInsertBuffer(XLogRecPtr upto, bool opportunistic);
 static bool XLogCheckpointNeeded(XLogSegNo new_segno);
-static void XLogWrite(XLogwrtRqst WriteRqst, bool flexible);
+static void XLogWrite(XLogwrt WriteRqst, bool flexible);
 static bool InstallXLogFileSegment(XLogSegNo *segno, char *tmppath,
 								   bool find_free, XLogSegNo max_segno,
 								   bool use_lock);
@@ -1168,13 +1167,11 @@ XLogInsertRecord(XLogRecData *rdata,
 	 */
 	if (StartPos / XLOG_BLCKSZ != EndPos / XLOG_BLCKSZ)
 	{
-		SpinLockAcquire(&XLogCtl->info_lck);
 		/* advance global request to include new block(s) */
-		if (XLogCtl->LogwrtRqst.Write < EndPos)
-			XLogCtl->LogwrtRqst.Write = EndPos;
-		/* update local result copy while I have the chance */
-		LogwrtResult = XLogCtl->LogwrtResult;
-		SpinLockRelease(&XLogCtl->info_lck);
+		pg_atomic_monotonic_advance_u64(&XLogCtl->LogwrtRqst.Write, EndPos);
+		/* update local result copy */
+		LogwrtResult.Write = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Write);
+		LogwrtResult.Flush = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Flush);
 	}
 
 	/*
@@ -2135,7 +2132,7 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, bool opportunistic)
 	XLogCtlInsert *Insert = &XLogCtl->Insert;
 	int			nextidx;
 	XLogRecPtr	OldPageRqstPtr;
-	XLogwrtRqst WriteRqst;
+	XLogwrt		WriteRqst;
 	XLogRecPtr	NewPageEndPtr = InvalidXLogRecPtr;
 	XLogRecPtr	NewPageBeginPtr;
 	XLogPageHeader NewPage;
@@ -2166,12 +2163,11 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, bool opportunistic)
 			if (opportunistic)
 				break;
 
-			/* Before waiting, get info_lck and update LogwrtResult */
-			SpinLockAcquire(&XLogCtl->info_lck);
-			if (XLogCtl->LogwrtRqst.Write < OldPageRqstPtr)
-				XLogCtl->LogwrtRqst.Write = OldPageRqstPtr;
-			LogwrtResult = XLogCtl->LogwrtResult;
-			SpinLockRelease(&XLogCtl->info_lck);
+			/* Before waiting, update LogwrtResult */
+			pg_atomic_monotonic_advance_u64(&XLogCtl->LogwrtRqst.Write, OldPageRqstPtr);
+
+			LogwrtResult.Write = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Write);
+			LogwrtResult.Flush = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Flush);
 
 			/*
 			 * Now that we have an up-to-date LogwrtResult value, see if we
@@ -2191,7 +2187,8 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, bool opportunistic)
 
 				LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
 
-				LogwrtResult = XLogCtl->LogwrtResult;
+				LogwrtResult.Write = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Write);
+				LogwrtResult.Flush = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Flush);
 				if (LogwrtResult.Write >= OldPageRqstPtr)
 				{
 					/* OK, someone wrote it already */
@@ -2421,7 +2418,7 @@ XLogCheckpointNeeded(XLogSegNo new_segno)
  * write.
  */
 static void
-XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
+XLogWrite(XLogwrt WriteRqst, bool flexible)
 {
 	bool		ispartialpage;
 	bool		last_iteration;
@@ -2438,7 +2435,8 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
 	/*
 	 * Update local LogwrtResult (caller probably did this already, but...)
 	 */
-	LogwrtResult = XLogCtl->LogwrtResult;
+	LogwrtResult.Write = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Write);
+	LogwrtResult.Flush = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Flush);
 
 	/*
 	 * Since successive pages in the xlog cache are consecutively allocated,
@@ -2675,13 +2673,11 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
 	 * code in a couple of places.
 	 */
 	{
-		SpinLockAcquire(&XLogCtl->info_lck);
-		XLogCtl->LogwrtResult = LogwrtResult;
-		if (XLogCtl->LogwrtRqst.Write < LogwrtResult.Write)
-			XLogCtl->LogwrtRqst.Write = LogwrtResult.Write;
-		if (XLogCtl->LogwrtRqst.Flush < LogwrtResult.Flush)
-			XLogCtl->LogwrtRqst.Flush = LogwrtResult.Flush;
-		SpinLockRelease(&XLogCtl->info_lck);
+		pg_atomic_monotonic_advance_u64(&XLogCtl->LogwrtResult.Write, LogwrtResult.Write);
+		pg_atomic_monotonic_advance_u64(&XLogCtl->LogwrtResult.Flush, LogwrtResult.Flush);
+
+		pg_atomic_monotonic_advance_u64(&XLogCtl->LogwrtRqst.Write, LogwrtResult.Write);
+		pg_atomic_monotonic_advance_u64(&XLogCtl->LogwrtRqst.Flush, LogwrtResult.Flush);
 	}
 }
 
@@ -2696,8 +2692,9 @@ XLogSetAsyncXactLSN(XLogRecPtr asyncXactLSN)
 	XLogRecPtr	WriteRqstPtr = asyncXactLSN;
 	bool		sleeping;
 
+	LogwrtResult.Write = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Write);
+	LogwrtResult.Flush = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Flush);
 	SpinLockAcquire(&XLogCtl->info_lck);
-	LogwrtResult = XLogCtl->LogwrtResult;
 	sleeping = XLogCtl->WalWriterSleeping;
 	if (XLogCtl->asyncXactLSN < asyncXactLSN)
 		XLogCtl->asyncXactLSN = asyncXactLSN;
@@ -2856,7 +2853,7 @@ void
 XLogFlush(XLogRecPtr record)
 {
 	XLogRecPtr	WriteRqstPtr;
-	XLogwrtRqst WriteRqst;
+	XLogwrt		WriteRqst;
 
 	/*
 	 * During REDO, we are reading not writing WAL.  Therefore, instead of
@@ -2905,11 +2902,9 @@ XLogFlush(XLogRecPtr record)
 		XLogRecPtr	insertpos;
 
 		/* read LogwrtResult and update local state */
-		SpinLockAcquire(&XLogCtl->info_lck);
-		if (WriteRqstPtr < XLogCtl->LogwrtRqst.Write)
-			WriteRqstPtr = XLogCtl->LogwrtRqst.Write;
-		LogwrtResult = XLogCtl->LogwrtResult;
-		SpinLockRelease(&XLogCtl->info_lck);
+		WriteRqstPtr = pg_atomic_read_u64(&XLogCtl->LogwrtRqst.Write);
+		LogwrtResult.Write = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Write);
+		LogwrtResult.Flush = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Flush);
 
 		/* done already? */
 		if (record <= LogwrtResult.Flush)
@@ -2939,7 +2934,8 @@ XLogFlush(XLogRecPtr record)
 		}
 
 		/* Got the lock; recheck whether request is satisfied */
-		LogwrtResult = XLogCtl->LogwrtResult;
+		LogwrtResult.Write = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Write);
+		LogwrtResult.Flush = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Flush);
 		if (record <= LogwrtResult.Flush)
 		{
 			LWLockRelease(WALWriteLock);
@@ -3044,7 +3040,7 @@ XLogFlush(XLogRecPtr record)
 bool
 XLogBackgroundFlush(void)
 {
-	XLogwrtRqst WriteRqst;
+	XLogwrt		WriteRqst;
 	bool		flexible = true;
 	static TimestampTz lastflush;
 	TimestampTz now;
@@ -3055,10 +3051,10 @@ XLogBackgroundFlush(void)
 		return false;
 
 	/* read LogwrtResult and update local state */
-	SpinLockAcquire(&XLogCtl->info_lck);
-	LogwrtResult = XLogCtl->LogwrtResult;
-	WriteRqst = XLogCtl->LogwrtRqst;
-	SpinLockRelease(&XLogCtl->info_lck);
+	LogwrtResult.Write = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Write);
+	LogwrtResult.Flush = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Flush);
+	WriteRqst.Write = pg_atomic_read_u64(&XLogCtl->LogwrtRqst.Write);
+	WriteRqst.Flush = pg_atomic_read_u64(&XLogCtl->LogwrtRqst.Flush);
 
 	/* back off to last completed page boundary */
 	WriteRqst.Write -= WriteRqst.Write % XLOG_BLCKSZ;
@@ -3140,7 +3136,8 @@ XLogBackgroundFlush(void)
 	/* now wait for any in-progress insertions to finish and get write lock */
 	WaitXLogInsertionsToFinish(WriteRqst.Write);
 	LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
-	LogwrtResult = XLogCtl->LogwrtResult;
+	LogwrtResult.Write = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Write);
+	LogwrtResult.Flush = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Flush);
 	if (WriteRqst.Write > LogwrtResult.Write ||
 		WriteRqst.Flush > LogwrtResult.Flush)
 	{
@@ -3228,9 +3225,8 @@ XLogNeedsFlush(XLogRecPtr record)
 		return false;
 
 	/* read LogwrtResult and update local state */
-	SpinLockAcquire(&XLogCtl->info_lck);
-	LogwrtResult = XLogCtl->LogwrtResult;
-	SpinLockRelease(&XLogCtl->info_lck);
+	LogwrtResult.Write = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Write);
+	LogwrtResult.Flush = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Flush);
 
 	/* check again */
 	if (record <= LogwrtResult.Flush)
@@ -7779,10 +7775,11 @@ StartupXLOG(void)
 
 	LogwrtResult.Write = LogwrtResult.Flush = EndOfLog;
 
-	XLogCtl->LogwrtResult = LogwrtResult;
+	pg_atomic_write_u64(&XLogCtl->LogwrtResult.Write, LogwrtResult.Write);
+	pg_atomic_write_u64(&XLogCtl->LogwrtResult.Flush, LogwrtResult.Flush);
 
-	XLogCtl->LogwrtRqst.Write = EndOfLog;
-	XLogCtl->LogwrtRqst.Flush = EndOfLog;
+	pg_atomic_write_u64(&XLogCtl->LogwrtRqst.Write, EndOfLog);
+	pg_atomic_write_u64(&XLogCtl->LogwrtRqst.Flush, EndOfLog);
 
 	/*
 	 * Update full_page_writes in shared memory and write an XLOG_FPW_CHANGE
@@ -8484,9 +8481,7 @@ GetInsertRecPtr(void)
 {
 	XLogRecPtr	recptr;
 
-	SpinLockAcquire(&XLogCtl->info_lck);
-	recptr = XLogCtl->LogwrtRqst.Write;
-	SpinLockRelease(&XLogCtl->info_lck);
+	recptr = pg_atomic_read_u64(&XLogCtl->LogwrtRqst.Write);
 
 	return recptr;
 }
@@ -8498,9 +8493,8 @@ GetInsertRecPtr(void)
 XLogRecPtr
 GetFlushRecPtr(void)
 {
-	SpinLockAcquire(&XLogCtl->info_lck);
-	LogwrtResult = XLogCtl->LogwrtResult;
-	SpinLockRelease(&XLogCtl->info_lck);
+	LogwrtResult.Write = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Write);
+	LogwrtResult.Flush = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Flush);
 
 	return LogwrtResult.Flush;
 }
@@ -11649,9 +11643,8 @@ GetXLogInsertRecPtr(void)
 XLogRecPtr
 GetXLogWriteRecPtr(void)
 {
-	SpinLockAcquire(&XLogCtl->info_lck);
-	LogwrtResult = XLogCtl->LogwrtResult;
-	SpinLockRelease(&XLogCtl->info_lck);
+	LogwrtResult.Write = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Write);
+	LogwrtResult.Flush = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Flush);
 
 	return LogwrtResult.Write;
 }
-- 
2.20.1
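
(A note on 0002: one thing the spinlock used to provide that two
separate atomic reads do not is a consistent Write/Flush pair -- a
reader can now observe a local Flush that is ahead of its local Write
if a writer advances both fields between the two reads.  I don't know
yet whether that matters here, but if it does, a helper along these
lines might be enough.  This is a sketch only; ReadLogwrtResult is a
made-up name, and it leans on the invariant that the shared Flush
never passes the shared Write:)

/*
 * Hypothetical helper (name made up), assuming xlog.c's XLogCtl and the
 * XLogwrt struct from 0002: read a Write/Flush pair whose local copy
 * preserves Flush <= Write.  Relies on the shared Flush never being
 * advanced past the shared Write, and on reading Flush first.
 */
static void
ReadLogwrtResult(XLogwrt *copy)
{
	copy->Flush = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Flush);
	pg_read_barrier();		/* order the Flush read before the Write read */
	copy->Write = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Write);
}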

From b487979a362f90b866a86b1600791178b5574a75 Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvhe...@alvh.no-ip.org>
Date: Fri, 29 Jan 2021 22:34:58 -0300
Subject: [PATCH v2 3/3] add barriers

---
 src/backend/access/transam/xlog.c | 10 ++++++++++
 1 file changed, 10 insertions(+)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 8073a92ceb..458efc5a55 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -1169,6 +1169,8 @@ XLogInsertRecord(XLogRecData *rdata,
 	{
 		/* advance global request to include new block(s) */
 		pg_atomic_monotonic_advance_u64(&XLogCtl->LogwrtRqst.Write, EndPos);
+		pg_memory_barrier();
+
 		/* update local result copy */
 		LogwrtResult.Write = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Write);
 		LogwrtResult.Flush = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Flush);
@@ -2165,6 +2167,7 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, bool opportunistic)
 
 			/* Before waiting, update LogwrtResult */
 			pg_atomic_monotonic_advance_u64(&XLogCtl->LogwrtRqst.Write, OldPageRqstPtr);
+			pg_memory_barrier();
 
 			LogwrtResult.Write = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Write);
 			LogwrtResult.Flush = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Flush);
@@ -2678,6 +2681,7 @@ XLogWrite(XLogwrt WriteRqst, bool flexible)
 
 		pg_atomic_monotonic_advance_u64(&XLogCtl->LogwrtRqst.Write, LogwrtResult.Write);
 		pg_atomic_monotonic_advance_u64(&XLogCtl->LogwrtRqst.Flush, LogwrtResult.Flush);
+		pg_write_barrier();
 	}
 }
 
@@ -2905,6 +2909,7 @@ XLogFlush(XLogRecPtr record)
 		WriteRqstPtr = pg_atomic_read_u64(&XLogCtl->LogwrtRqst.Write);
 		LogwrtResult.Write = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Write);
 		LogwrtResult.Flush = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Flush);
+		pg_read_barrier();
 
 		/* done already? */
 		if (record <= LogwrtResult.Flush)
@@ -2936,6 +2941,7 @@ XLogFlush(XLogRecPtr record)
 		/* Got the lock; recheck whether request is satisfied */
 		LogwrtResult.Write = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Write);
 		LogwrtResult.Flush = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Flush);
+		pg_read_barrier();
 		if (record <= LogwrtResult.Flush)
 		{
 			LWLockRelease(WALWriteLock);
@@ -3055,6 +3061,7 @@ XLogBackgroundFlush(void)
 	LogwrtResult.Flush = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Flush);
 	WriteRqst.Write = pg_atomic_read_u64(&XLogCtl->LogwrtRqst.Write);
 	WriteRqst.Flush = pg_atomic_read_u64(&XLogCtl->LogwrtRqst.Flush);
+	pg_read_barrier();
 
 	/* back off to last completed page boundary */
 	WriteRqst.Write -= WriteRqst.Write % XLOG_BLCKSZ;
@@ -3138,6 +3145,7 @@ XLogBackgroundFlush(void)
 	LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
 	LogwrtResult.Write = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Write);
 	LogwrtResult.Flush = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Flush);
+	pg_read_barrier();
 	if (WriteRqst.Write > LogwrtResult.Write ||
 		WriteRqst.Flush > LogwrtResult.Flush)
 	{
@@ -3227,6 +3235,7 @@ XLogNeedsFlush(XLogRecPtr record)
 	/* read LogwrtResult and update local state */
 	LogwrtResult.Write = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Write);
 	LogwrtResult.Flush = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Flush);
+	pg_read_barrier();
 
 	/* check again */
 	if (record <= LogwrtResult.Flush)
@@ -7780,6 +7789,7 @@ StartupXLOG(void)
 
 	pg_atomic_write_u64(&XLogCtl->LogwrtRqst.Write, EndOfLog);
 	pg_atomic_write_u64(&XLogCtl->LogwrtRqst.Flush, EndOfLog);
+	pg_write_barrier();
 
 	/*
 	 * Update full_page_writes in shared memory and write an XLOG_FPW_CHANGE
-- 
2.20.1
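
(For 0003, the pairing rule the barriers are meant to follow is the
usual publish/consume one: the writer puts a write barrier between
storing the data and storing whatever announces it, and the reader
puts a read barrier between loading the announcement and loading the
data.  A trivial sketch with invented fields, just to spell the
pattern out -- not a claim that the barriers above are in the right
places:)

/* publisher: make "datum" visible before the flag announcing it */
pg_atomic_write_u64(&shared->datum, value);
pg_write_barrier();
pg_atomic_write_u64(&shared->flag, 1);

/* consumer: order the flag check before reading the datum */
if (pg_atomic_read_u64(&shared->flag) == 1)
{
	pg_read_barrier();
	value = pg_atomic_read_u64(&shared->datum);
}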
