Hello,

So I addressed about half of your comments in this version merely by
fixing silly bugs.  The problem I had which I described as
"synchronization fails" was one of those silly bugs.

So on further thought, it seems simpler to make only LogwrtResult
atomic, and leave LogwrtRqst as it currently is, using the spinlock.  This
should solve the contention problem we saw at the customer (but I've
asked Jaime very nicely to do a test run, if possible, to confirm).

For things like logical replication, which call GetFlushRecPtr() very
frequently (causing the spinlock issue we saw) it is good, because we're
no longer hitting the spinlock at all in that case.

I have another (pretty mechanical) patch that renames LogwrtResult.Write
to LogWriteResult and LogwrtResult.Flush to LogFlushResult.  That more
clearly shows that we're no longer updating them in unison.  I didn't want
to attach it here because I haven't rebased it on the current one.  But it seems
logical: there's no longer any point in doing struct assignment, which
is the only thing that stuff was good for.


On 2021-Jan-29, Andres Freund wrote:

> > @@ -1169,6 +1169,8 @@ XLogInsertRecord(XLogRecData *rdata,
> >     {
> >             /* advance global request to include new block(s)
> >             */
> >             pg_atomic_monotonic_advance_u64(&XLogCtl->LogwrtRqst.Write, 
> > EndPos);
> > +           pg_memory_barrier();
> 
> That's not really useful - the path that actually updates already
> implies a barrier. It'd probably be better to add a barrier to a "never
> executed cmpxchg" fastpath.

Got it.  Do you mean in pg_atomic_monotonic_advance_u64() itself?  I'm
not sure which is the nicer semantics.  (If it's got to be at the
caller, then we'll need to return a boolean from there, which sounds
worse.)

> > @@ -2905,6 +2909,7 @@ XLogFlush(XLogRecPtr record)
> >             WriteRqstPtr = pg_atomic_read_u64(&XLogCtl->LogwrtRqst.Write);
> >             LogwrtResult.Write = 
> > pg_atomic_read_u64(&XLogCtl->LogwrtResult.Write);
> >             LogwrtResult.Flush = 
> > pg_atomic_read_u64(&XLogCtl->LogwrtResult.Flush);
> > +           pg_read_barrier();
> 
> I'm not sure you really can get away with just a read barrier in these
> places. We can't e.g. have later updates to other shared memory
> variables be "moved" to before the barrier (which a read barrier
> allows).

Ah, that makes sense.

I have not really studied the barrier locations terribly closely in this
version of the patch.  It probably misses some (eg. in GetFlushRecPtr
and GetXLogWriteRecPtr).  It is passing the tests for me, but that alone
is probably not enough.  I'm gonna try and study the generated
assembly and see if I can make sense of things ...

-- 
Álvaro Herrera                            39°49'30"S 73°17'W
From ca05447f41270d6b3ac3b6fcf816f86aba86d08f Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvhe...@alvh.no-ip.org>
Date: Fri, 29 Jan 2021 22:34:23 -0300
Subject: [PATCH 1/2] add pg_atomic_monotonic_advance_u64

---
 src/include/port/atomics.h | 21 +++++++++++++++++++++
 1 file changed, 21 insertions(+)

diff --git a/src/include/port/atomics.h b/src/include/port/atomics.h
index 856338f161..a1c4764d18 100644
--- a/src/include/port/atomics.h
+++ b/src/include/port/atomics.h
@@ -519,6 +519,27 @@ pg_atomic_sub_fetch_u64(volatile pg_atomic_uint64 *ptr, int64 sub_)
 	return pg_atomic_sub_fetch_u64_impl(ptr, sub_);
 }
 
+/*
+ * Advance *ptr to at least target_ with a compare-exchange retry loop; never
+ * moves it backwards.  NOTE(review): the no-exchange path may lack a barrier.
+ */
+static inline void
+pg_atomic_monotonic_advance_u64(volatile pg_atomic_uint64 *ptr, uint64 target_)
+{
+	uint64		currval;		/* value last observed in *ptr */
+
+#ifndef PG_HAVE_ATOMIC_U64_SIMULATION
+	AssertPointerAlignment(ptr, 8);
+#endif
+
+	currval = pg_atomic_read_u64(ptr);
+	while (currval < target_)	/* presumably a failed CAS refreshes currval -- confirm */
+	{
+		if (pg_atomic_compare_exchange_u64(ptr, &currval, target_))
+			break;				/* exchange reported success; stop retrying */
+	}
+}
+
 #undef INSIDE_ATOMICS_H
 
 #endif							/* ATOMICS_H */
-- 
2.20.1

From d185cf50a47a0f9e346e49ccda038bb016ce246b Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvhe...@alvh.no-ip.org>
Date: Tue, 2 Feb 2021 14:03:43 -0300
Subject: [PATCH 2/2] make LogwrtResult atomic

---
 src/backend/access/transam/xlog.c | 66 +++++++++++++++----------------
 1 file changed, 32 insertions(+), 34 deletions(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index f03bd473e2..10802bd56f 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -405,12 +405,9 @@ static XLogRecPtr RedoStartLSN = InvalidXLogRecPtr;
  * These structs are identical but are declared separately to indicate their
  * slightly different functions.
  *
- * To read XLogCtl->LogwrtResult, you must hold either info_lck or
- * WALWriteLock.  To update it, you need to hold both locks.  The point of
- * this arrangement is that the value can be examined by code that already
- * holds WALWriteLock without needing to grab info_lck as well.  In addition
- * to the shared variable, each backend has a private copy of LogwrtResult,
- * which is updated when convenient.
+ * XLogCtl->LogwrtResult is read and written using atomic operations.
+ * In addition to the shared variable, each backend has a private copy of
+ * LogwrtResult, each member of which is separately updated when convenient.
  *
  * The request bookkeeping is simpler: there is a shared XLogCtl->LogwrtRqst
  * (protected by info_lck), but we don't need to cache any copies of it.
@@ -433,6 +430,12 @@ static XLogRecPtr RedoStartLSN = InvalidXLogRecPtr;
  *----------
  */
 
+typedef struct XLogwrtAtomic
+{
+	pg_atomic_uint64 Write;			/* last byte + 1 of write position */
+	pg_atomic_uint64 Flush;			/* last byte + 1 of flush position */
+} XLogwrtAtomic;					/* members accessed via pg_atomic_*_u64 calls */
+
 typedef struct XLogwrtRqst
 {
 	XLogRecPtr	Write;			/* last byte + 1 to write out */
@@ -596,6 +599,7 @@ typedef struct XLogCtlData
 {
 	XLogCtlInsert Insert;
 
+	XLogwrtAtomic LogwrtResult;
 	/* Protected by info_lck: */
 	XLogwrtRqst LogwrtRqst;
 	XLogRecPtr	RedoRecPtr;		/* a recent copy of Insert->RedoRecPtr */
@@ -613,12 +617,6 @@ typedef struct XLogCtlData
 	pg_time_t	lastSegSwitchTime;
 	XLogRecPtr	lastSegSwitchLSN;
 
-	/*
-	 * Protected by info_lck and WALWriteLock (you must hold either lock to
-	 * read it, but both to update)
-	 */
-	XLogwrtResult LogwrtResult;
-
 	/*
 	 * Latest initialized page in the cache (last byte position + 1).
 	 *
@@ -1172,9 +1170,10 @@ XLogInsertRecord(XLogRecData *rdata,
 		/* advance global request to include new block(s) */
 		if (XLogCtl->LogwrtRqst.Write < EndPos)
 			XLogCtl->LogwrtRqst.Write = EndPos;
-		/* update local result copy while I have the chance */
-		LogwrtResult = XLogCtl->LogwrtResult;
 		SpinLockRelease(&XLogCtl->info_lck);
+		/* update local result copy */
+		LogwrtResult.Write = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Write);
+		LogwrtResult.Flush = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Flush);
 	}
 
 	/*
@@ -2170,13 +2169,13 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, bool opportunistic)
 			SpinLockAcquire(&XLogCtl->info_lck);
 			if (XLogCtl->LogwrtRqst.Write < OldPageRqstPtr)
 				XLogCtl->LogwrtRqst.Write = OldPageRqstPtr;
-			LogwrtResult = XLogCtl->LogwrtResult;
 			SpinLockRelease(&XLogCtl->info_lck);
 
 			/*
 			 * Now that we have an up-to-date LogwrtResult value, see if we
 			 * still need to write it or if someone else already did.
 			 */
+			LogwrtResult.Write = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Write);
 			if (LogwrtResult.Write < OldPageRqstPtr)
 			{
 				/*
@@ -2191,7 +2190,7 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, bool opportunistic)
 
 				LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
 
-				LogwrtResult = XLogCtl->LogwrtResult;
+				LogwrtResult.Write = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Write);
 				if (LogwrtResult.Write >= OldPageRqstPtr)
 				{
 					/* OK, someone wrote it already */
@@ -2438,7 +2437,7 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
 	/*
 	 * Update local LogwrtResult (caller probably did this already, but...)
 	 */
-	LogwrtResult = XLogCtl->LogwrtResult;
+	LogwrtResult.Write = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Write);
 
 	/*
 	 * Since successive pages in the xlog cache are consecutively allocated,
@@ -2636,7 +2635,6 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
 	 */
 	if (LogwrtResult.Flush < WriteRqst.Flush &&
 		LogwrtResult.Flush < LogwrtResult.Write)
-
 	{
 		/*
 		 * Could get here without iterating above loop, in which case we might
@@ -2675,8 +2673,10 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
 	 * code in a couple of places.
 	 */
 	{
+		pg_atomic_monotonic_advance_u64(&XLogCtl->LogwrtResult.Write, LogwrtResult.Write);
+		pg_atomic_monotonic_advance_u64(&XLogCtl->LogwrtResult.Flush, LogwrtResult.Flush);
+		pg_memory_barrier();
 		SpinLockAcquire(&XLogCtl->info_lck);
-		XLogCtl->LogwrtResult = LogwrtResult;
 		if (XLogCtl->LogwrtRqst.Write < LogwrtResult.Write)
 			XLogCtl->LogwrtRqst.Write = LogwrtResult.Write;
 		if (XLogCtl->LogwrtRqst.Flush < LogwrtResult.Flush)
@@ -2696,8 +2696,8 @@ XLogSetAsyncXactLSN(XLogRecPtr asyncXactLSN)
 	XLogRecPtr	WriteRqstPtr = asyncXactLSN;
 	bool		sleeping;
 
+	LogwrtResult.Flush = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Flush);
 	SpinLockAcquire(&XLogCtl->info_lck);
-	LogwrtResult = XLogCtl->LogwrtResult;
 	sleeping = XLogCtl->WalWriterSleeping;
 	if (XLogCtl->asyncXactLSN < asyncXactLSN)
 		XLogCtl->asyncXactLSN = asyncXactLSN;
@@ -2908,10 +2908,10 @@ XLogFlush(XLogRecPtr record)
 		SpinLockAcquire(&XLogCtl->info_lck);
 		if (WriteRqstPtr < XLogCtl->LogwrtRqst.Write)
 			WriteRqstPtr = XLogCtl->LogwrtRqst.Write;
-		LogwrtResult = XLogCtl->LogwrtResult;
 		SpinLockRelease(&XLogCtl->info_lck);
 
 		/* done already? */
+		LogwrtResult.Flush = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Flush);
 		if (record <= LogwrtResult.Flush)
 			break;
 
@@ -2939,7 +2939,7 @@ XLogFlush(XLogRecPtr record)
 		}
 
 		/* Got the lock; recheck whether request is satisfied */
-		LogwrtResult = XLogCtl->LogwrtResult;
+		LogwrtResult.Flush = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Flush);
 		if (record <= LogwrtResult.Flush)
 		{
 			LWLockRelease(WALWriteLock);
@@ -3056,7 +3056,6 @@ XLogBackgroundFlush(void)
 
 	/* read LogwrtResult and update local state */
 	SpinLockAcquire(&XLogCtl->info_lck);
-	LogwrtResult = XLogCtl->LogwrtResult;
 	WriteRqst = XLogCtl->LogwrtRqst;
 	SpinLockRelease(&XLogCtl->info_lck);
 
@@ -3064,8 +3063,10 @@ XLogBackgroundFlush(void)
 	WriteRqst.Write -= WriteRqst.Write % XLOG_BLCKSZ;
 
 	/* if we have already flushed that far, consider async commit records */
+	LogwrtResult.Flush = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Flush);
 	if (WriteRqst.Write <= LogwrtResult.Flush)
 	{
+		pg_memory_barrier();
 		SpinLockAcquire(&XLogCtl->info_lck);
 		WriteRqst.Write = XLogCtl->asyncXactLSN;
 		SpinLockRelease(&XLogCtl->info_lck);
@@ -3081,6 +3082,7 @@ XLogBackgroundFlush(void)
 	{
 		if (openLogFile >= 0)
 		{
+			LogwrtResult.Write = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Write);
 			if (!XLByteInPrevSeg(LogwrtResult.Write, openLogSegNo,
 								 wal_segment_size))
 			{
@@ -3140,7 +3142,8 @@ XLogBackgroundFlush(void)
 	/* now wait for any in-progress insertions to finish and get write lock */
 	WaitXLogInsertionsToFinish(WriteRqst.Write);
 	LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
-	LogwrtResult = XLogCtl->LogwrtResult;
+	LogwrtResult.Write = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Write);
+	LogwrtResult.Flush = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Flush);
 	if (WriteRqst.Write > LogwrtResult.Write ||
 		WriteRqst.Flush > LogwrtResult.Flush)
 	{
@@ -3228,9 +3231,7 @@ XLogNeedsFlush(XLogRecPtr record)
 		return false;
 
 	/* read LogwrtResult and update local state */
-	SpinLockAcquire(&XLogCtl->info_lck);
-	LogwrtResult = XLogCtl->LogwrtResult;
-	SpinLockRelease(&XLogCtl->info_lck);
+	LogwrtResult.Flush = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Flush);
 
 	/* check again */
 	if (record <= LogwrtResult.Flush)
@@ -7779,7 +7780,8 @@ StartupXLOG(void)
 
 	LogwrtResult.Write = LogwrtResult.Flush = EndOfLog;
 
-	XLogCtl->LogwrtResult = LogwrtResult;
+	pg_atomic_write_u64(&XLogCtl->LogwrtResult.Write, EndOfLog);
+	pg_atomic_write_u64(&XLogCtl->LogwrtResult.Flush, EndOfLog);
 
 	XLogCtl->LogwrtRqst.Write = EndOfLog;
 	XLogCtl->LogwrtRqst.Flush = EndOfLog;
@@ -8498,9 +8500,7 @@ GetInsertRecPtr(void)
 XLogRecPtr
 GetFlushRecPtr(void)
 {
-	SpinLockAcquire(&XLogCtl->info_lck);
-	LogwrtResult = XLogCtl->LogwrtResult;
-	SpinLockRelease(&XLogCtl->info_lck);
+	LogwrtResult.Flush = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Flush);
 
 	return LogwrtResult.Flush;
 }
@@ -11649,9 +11649,7 @@ GetXLogInsertRecPtr(void)
 XLogRecPtr
 GetXLogWriteRecPtr(void)
 {
-	SpinLockAcquire(&XLogCtl->info_lck);
-	LogwrtResult = XLogCtl->LogwrtResult;
-	SpinLockRelease(&XLogCtl->info_lck);
+	LogwrtResult.Write = pg_atomic_read_u64(&XLogCtl->LogwrtResult.Write);
 
 	return LogwrtResult.Write;
 }
-- 
2.20.1

Reply via email to