Jaime Casanova recently reported a situation where pglogical, replicating
from 64 POS sites (each with two replication sets) to a single central
64-core node, causes XLog's info_lck to become highly contended because
LogwrtResult is read so frequently.  We tested the simple fix of adding a
new LWLock that protects LogwrtResult and LogwrtRqst; that seems to solve
the problem easily enough.

At first I wanted to make the new LWLock cover only LogwrtResult proper
and leave LogwrtRqst alone.  However, once I tried that, it seemed that
doing so might change the locking protocol in a nontrivial way, so I
decided to make the lock cover both and call it a day.  We did verify
that the patch solves the reported problem, at any rate.
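
To illustrate the shape of the change (this only shows the read path,
simplified from the attached patch, so it's a sketch rather than
standalone code): where readers of the shared LogwrtResult previously
had to take the spinlock,

    SpinLockAcquire(&XLogCtl->info_lck);
    LogwrtResult = XLogCtl->LogwrtResult;
    SpinLockRelease(&XLogCtl->info_lck);

they now take the new lock in shared mode, so concurrent readers no
longer serialize against one another:

    LWLockAcquire(WALWriteResultLock, LW_SHARED);
    LogwrtResult = XLogCtl->LogwrtResult;
    LWLockRelease(WALWriteResultLock);

Updaters of the shared copy (XLogWrite, StartupXLOG) take the lock in
LW_EXCLUSIVE mode instead.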

-- 
Álvaro Herrera                PostgreSQL Expert, https://www.2ndQuadrant.com/
From 3eb871f235c1b6005ff5dc88561173e4e92c1314 Mon Sep 17 00:00:00 2001
From: Alvaro Herrera <alvhe...@alvh.no-ip.org>
Date: Mon, 17 Aug 2020 19:05:22 -0400
Subject: [PATCH] use an LWLock rather than spinlock for Logwrt{Result,Rqst}

---
 src/backend/access/transam/xlog.c        | 76 +++++++++++++-----------
 src/backend/storage/lmgr/lwlocknames.txt |  2 +-
 2 files changed, 43 insertions(+), 35 deletions(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 09c01ed4ae..d838739bc4 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -403,15 +403,13 @@ static XLogRecPtr RedoStartLSN = InvalidXLogRecPtr;
  * These structs are identical but are declared separately to indicate their
  * slightly different functions.
  *
- * To read XLogCtl->LogwrtResult, you must hold either info_lck or
- * WALWriteLock.  To update it, you need to hold both locks.  The point of
- * this arrangement is that the value can be examined by code that already
- * holds WALWriteLock without needing to grab info_lck as well.  In addition
- * to the shared variable, each backend has a private copy of LogwrtResult,
- * which is updated when convenient.
+ * To read/write XLogCtl->LogwrtResult, you must hold WALWriteResultLock.
+ * In addition to the shared variable, each backend has a private copy of
+ * LogwrtResult, which is updated when convenient.
  *
  * The request bookkeeping is simpler: there is a shared XLogCtl->LogwrtRqst
- * (protected by info_lck), but we don't need to cache any copies of it.
+ * (also protected by WALWriteResultLock), but we don't need to cache any
+ * copies of it.
  *
  * info_lck is only held long enough to read/update the protected variables,
  * so it's a plain spinlock.  The other locks are held longer (potentially
@@ -615,10 +613,7 @@ typedef struct XLogCtlData
 	pg_time_t	lastSegSwitchTime;
 	XLogRecPtr	lastSegSwitchLSN;
 
-	/*
-	 * Protected by info_lck and WALWriteLock (you must hold either lock to
-	 * read it, but both to update)
-	 */
+	/* Protected by WALWriteResultLock. */
 	XLogwrtResult LogwrtResult;
 
 	/*
@@ -1158,13 +1153,12 @@ XLogInsertRecord(XLogRecData *rdata,
 	 */
 	if (StartPos / XLOG_BLCKSZ != EndPos / XLOG_BLCKSZ)
 	{
-		SpinLockAcquire(&XLogCtl->info_lck);
 		/* advance global request to include new block(s) */
+		LWLockAcquire(WALWriteResultLock, LW_EXCLUSIVE);
 		if (XLogCtl->LogwrtRqst.Write < EndPos)
 			XLogCtl->LogwrtRqst.Write = EndPos;
-		/* update local result copy while I have the chance */
 		LogwrtResult = XLogCtl->LogwrtResult;
-		SpinLockRelease(&XLogCtl->info_lck);
+		LWLockRelease(WALWriteResultLock);
 	}
 
 	/*
@@ -2155,12 +2149,12 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, bool opportunistic)
 			if (opportunistic)
 				break;
 
-			/* Before waiting, get info_lck and update LogwrtResult */
-			SpinLockAcquire(&XLogCtl->info_lck);
+			/* Before waiting, update LogwrtResult */
+			LWLockAcquire(WALWriteResultLock, LW_EXCLUSIVE);
 			if (XLogCtl->LogwrtRqst.Write < OldPageRqstPtr)
 				XLogCtl->LogwrtRqst.Write = OldPageRqstPtr;
 			LogwrtResult = XLogCtl->LogwrtResult;
-			SpinLockRelease(&XLogCtl->info_lck);
+			LWLockRelease(WALWriteResultLock);
 
 			/*
 			 * Now that we have an up-to-date LogwrtResult value, see if we
@@ -2178,9 +2172,11 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, bool opportunistic)
 
 				WaitXLogInsertionsToFinish(OldPageRqstPtr);
 
-				LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
-
+				LWLockAcquire(WALWriteResultLock, LW_SHARED);
 				LogwrtResult = XLogCtl->LogwrtResult;
+				LWLockRelease(WALWriteResultLock);
+
+				LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
 				if (LogwrtResult.Write >= OldPageRqstPtr)
 				{
 					/* OK, someone wrote it already */
@@ -2426,7 +2422,9 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
 	/*
 	 * Update local LogwrtResult (caller probably did this already, but...)
 	 */
+	LWLockAcquire(WALWriteResultLock, LW_SHARED);
 	LogwrtResult = XLogCtl->LogwrtResult;
+	LWLockRelease(WALWriteResultLock);
 
 	/*
 	 * Since successive pages in the xlog cache are consecutively allocated,
@@ -2663,13 +2661,14 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
 	 * code in a couple of places.
 	 */
 	{
-		SpinLockAcquire(&XLogCtl->info_lck);
+		LWLockAcquire(WALWriteResultLock, LW_EXCLUSIVE);
 		XLogCtl->LogwrtResult = LogwrtResult;
+
 		if (XLogCtl->LogwrtRqst.Write < LogwrtResult.Write)
 			XLogCtl->LogwrtRqst.Write = LogwrtResult.Write;
 		if (XLogCtl->LogwrtRqst.Flush < LogwrtResult.Flush)
 			XLogCtl->LogwrtRqst.Flush = LogwrtResult.Flush;
-		SpinLockRelease(&XLogCtl->info_lck);
+		LWLockRelease(WALWriteResultLock);
 	}
 }
 
@@ -2684,8 +2683,10 @@ XLogSetAsyncXactLSN(XLogRecPtr asyncXactLSN)
 	XLogRecPtr	WriteRqstPtr = asyncXactLSN;
 	bool		sleeping;
 
-	SpinLockAcquire(&XLogCtl->info_lck);
+	LWLockAcquire(WALWriteResultLock, LW_SHARED);
 	LogwrtResult = XLogCtl->LogwrtResult;
+	LWLockRelease(WALWriteResultLock);
+	SpinLockAcquire(&XLogCtl->info_lck);
 	sleeping = XLogCtl->WalWriterSleeping;
 	if (XLogCtl->asyncXactLSN < asyncXactLSN)
 		XLogCtl->asyncXactLSN = asyncXactLSN;
@@ -2893,11 +2894,11 @@ XLogFlush(XLogRecPtr record)
 		XLogRecPtr	insertpos;
 
 		/* read LogwrtResult and update local state */
-		SpinLockAcquire(&XLogCtl->info_lck);
+		LWLockAcquire(WALWriteResultLock, LW_SHARED);
 		if (WriteRqstPtr < XLogCtl->LogwrtRqst.Write)
 			WriteRqstPtr = XLogCtl->LogwrtRqst.Write;
 		LogwrtResult = XLogCtl->LogwrtResult;
-		SpinLockRelease(&XLogCtl->info_lck);
+		LWLockRelease(WALWriteResultLock);
 
 		/* done already? */
 		if (record <= LogwrtResult.Flush)
@@ -2927,12 +2928,15 @@ XLogFlush(XLogRecPtr record)
 		}
 
 		/* Got the lock; recheck whether request is satisfied */
+		LWLockAcquire(WALWriteResultLock, LW_SHARED);
 		LogwrtResult = XLogCtl->LogwrtResult;
 		if (record <= LogwrtResult.Flush)
 		{
+			LWLockRelease(WALWriteResultLock);
 			LWLockRelease(WALWriteLock);
 			break;
 		}
+		LWLockRelease(WALWriteResultLock);
 
 		/*
 		 * Sleep before flush! By adding a delay here, we may give further
@@ -3043,10 +3047,10 @@ XLogBackgroundFlush(void)
 		return false;
 
 	/* read LogwrtResult and update local state */
-	SpinLockAcquire(&XLogCtl->info_lck);
+	LWLockAcquire(WALWriteResultLock, LW_SHARED);
 	LogwrtResult = XLogCtl->LogwrtResult;
 	WriteRqst = XLogCtl->LogwrtRqst;
-	SpinLockRelease(&XLogCtl->info_lck);
+	LWLockRelease(WALWriteResultLock);
 
 	/* back off to last completed page boundary */
 	WriteRqst.Write -= WriteRqst.Write % XLOG_BLCKSZ;
@@ -3128,7 +3132,9 @@ XLogBackgroundFlush(void)
 	/* now wait for any in-progress insertions to finish and get write lock */
 	WaitXLogInsertionsToFinish(WriteRqst.Write);
 	LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
+	LWLockAcquire(WALWriteResultLock, LW_SHARED);
 	LogwrtResult = XLogCtl->LogwrtResult;
+	LWLockRelease(WALWriteResultLock);
 	if (WriteRqst.Write > LogwrtResult.Write ||
 		WriteRqst.Flush > LogwrtResult.Flush)
 	{
@@ -3216,9 +3222,9 @@ XLogNeedsFlush(XLogRecPtr record)
 		return false;
 
 	/* read LogwrtResult and update local state */
-	SpinLockAcquire(&XLogCtl->info_lck);
+	LWLockAcquire(WALWriteResultLock, LW_SHARED);
 	LogwrtResult = XLogCtl->LogwrtResult;
-	SpinLockRelease(&XLogCtl->info_lck);
+	LWLockRelease(WALWriteResultLock);
 
 	/* check again */
 	if (record <= LogwrtResult.Flush)
@@ -7698,10 +7704,12 @@ StartupXLOG(void)
 
 	LogwrtResult.Write = LogwrtResult.Flush = EndOfLog;
 
+	LWLockAcquire(WALWriteResultLock, LW_EXCLUSIVE);
 	XLogCtl->LogwrtResult = LogwrtResult;
 
 	XLogCtl->LogwrtRqst.Write = EndOfLog;
 	XLogCtl->LogwrtRqst.Flush = EndOfLog;
+	LWLockRelease(WALWriteResultLock);
 
 	/*
 	 * Update full_page_writes in shared memory and write an XLOG_FPW_CHANGE
@@ -8406,9 +8414,9 @@ GetInsertRecPtr(void)
 {
 	XLogRecPtr	recptr;
 
-	SpinLockAcquire(&XLogCtl->info_lck);
+	LWLockAcquire(WALWriteResultLock, LW_SHARED);
 	recptr = XLogCtl->LogwrtRqst.Write;
-	SpinLockRelease(&XLogCtl->info_lck);
+	LWLockRelease(WALWriteResultLock);
 
 	return recptr;
 }
@@ -8420,9 +8428,9 @@ GetInsertRecPtr(void)
 XLogRecPtr
 GetFlushRecPtr(void)
 {
-	SpinLockAcquire(&XLogCtl->info_lck);
+	LWLockAcquire(WALWriteResultLock, LW_SHARED);
 	LogwrtResult = XLogCtl->LogwrtResult;
-	SpinLockRelease(&XLogCtl->info_lck);
+	LWLockRelease(WALWriteResultLock);
 
 	return LogwrtResult.Flush;
 }
@@ -11509,9 +11517,9 @@ GetXLogInsertRecPtr(void)
 XLogRecPtr
 GetXLogWriteRecPtr(void)
 {
-	SpinLockAcquire(&XLogCtl->info_lck);
+	LWLockAcquire(WALWriteResultLock, LW_SHARED);
 	LogwrtResult = XLogCtl->LogwrtResult;
-	SpinLockRelease(&XLogCtl->info_lck);
+	LWLockRelease(WALWriteResultLock);
 
 	return LogwrtResult.Write;
 }
diff --git a/src/backend/storage/lmgr/lwlocknames.txt b/src/backend/storage/lmgr/lwlocknames.txt
index 774292fd94..b4d114cfd5 100644
--- a/src/backend/storage/lmgr/lwlocknames.txt
+++ b/src/backend/storage/lmgr/lwlocknames.txt
@@ -50,6 +50,6 @@ MultiXactTruncationLock				41
 OldSnapshotTimeMapLock				42
 LogicalRepWorkerLock				43
 XactTruncationLock					44
-# 45 was XactTruncationLock until removal of BackendRandomLock
+WALWriteResultLock					45
 WrapLimitsVacuumLock				46
 NotifyQueueTailLock					47
-- 
2.20.1
