From 2f4a32b1d09419167fe5040465c8a3464010e012 Mon Sep 17 00:00:00 2001
From: Zhiguo Zhou <zhiguo.zhou@intel.com>
Date: Thu, 2 Jan 2025 12:04:02 +0800
Subject: [PATCH] Lock-free XLog Reservation from WAL

Removed PrevBytePos from XLogCtlInsert so that reserving WAL space no
longer requires the insertpos_lck spinlock: CurrBytePos is now advanced
with a single atomic fetch-add. Since a backend no longer learns its
predecessor's start position at reservation time, each backend instead
writes its own start position into the prev-link (xl_prev) of the *next*
record, slightly ahead of the current insert position, and then waits
for its own prev-link to be filled in by its predecessor. This breaks
the strictly sequential write convention, so GetXLogBuffer() now takes
separate "access" and "update" LSNs: the location being written may lie
ahead of the point up to which the caller's insertion progress can be
advertised. Also increased NUM_XLOGINSERT_LOCKS from 8 to 128, since
more insertions can proceed in parallel once the spinlock is gone.
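
A minimal sketch of the resulting reservation path, using the helpers
introduced by this patch (illustrative only; the real logic lives in
ReserveXLogInsertLocation and XLogInsertRecord):

    /* Reserve space with one atomic instruction, no spinlock. */
    startbytepos = pg_atomic_fetch_add_u64(&Insert->CurrBytePos, size);

    /* Publish our start position as the next record's prev-link... */
    SetPrevRecPtr(EndPos, StartPos, tli, false);

    /* ...then spin until our predecessor has published ours. */
    while (!(rechdr->xl_prev = GetPrevRecPtr(StartPos, tli)))
        ;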
---
 src/backend/access/transam/xlog.c | 186 +++++++++++++++++++++---------
 1 file changed, 131 insertions(+), 55 deletions(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 6f58412bca..6215ea6977 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -147,7 +147,7 @@ int			wal_segment_size = DEFAULT_XLOG_SEG_SIZE;
  * to happen concurrently, but adds some CPU overhead to flushing the WAL,
  * which needs to iterate all the locks.
  */
-#define NUM_XLOGINSERT_LOCKS  8
+#define NUM_XLOGINSERT_LOCKS  128
 
 /*
  * Max distance from last checkpoint, before triggering a new xlog-based
@@ -404,8 +404,7 @@ typedef struct XLogCtlInsert
 	 * prev-link of the next record. These are stored as "usable byte
 	 * positions" rather than XLogRecPtrs (see XLogBytePosToRecPtr()).
 	 */
-	uint64		CurrBytePos;
-	uint64		PrevBytePos;
+	pg_atomic_uint64		CurrBytePos;
 
 	/*
 	 * Make sure the above heavily-contended spinlock and byte positions are
@@ -700,12 +699,14 @@ static void CopyXLogRecordToWAL(int write_len, bool isLogSwitch,
 								XLogRecData *rdata,
 								XLogRecPtr StartPos, XLogRecPtr EndPos,
 								TimeLineID tli);
+static XLogRecPtr GetPrevRecPtr(XLogRecPtr hdr, TimeLineID tli);
+static void SetPrevRecPtr(XLogRecPtr hdr, XLogRecPtr PrevPos,
+						  TimeLineID tli, bool in_order);
 static void ReserveXLogInsertLocation(int size, XLogRecPtr *StartPos,
-									  XLogRecPtr *EndPos, XLogRecPtr *PrevPtr);
-static bool ReserveXLogSwitch(XLogRecPtr *StartPos, XLogRecPtr *EndPos,
-							  XLogRecPtr *PrevPtr);
+									  XLogRecPtr *EndPos);
+static bool ReserveXLogSwitch(XLogRecPtr *StartPos, XLogRecPtr *EndPos);
 static XLogRecPtr WaitXLogInsertionsToFinish(XLogRecPtr upto);
-static char *GetXLogBuffer(XLogRecPtr ptr, TimeLineID tli);
+static char *GetXLogBuffer(XLogRecPtr ptr, XLogRecPtr upto, TimeLineID tli);
 static XLogRecPtr XLogBytePosToRecPtr(uint64 bytepos);
 static XLogRecPtr XLogBytePosToEndRecPtr(uint64 bytepos);
 static uint64 XLogRecPtrToBytePos(XLogRecPtr ptr);
@@ -862,8 +863,7 @@ XLogInsertRecord(XLogRecData *rdata,
 		 * Reserve space for the record in the WAL. This also sets the xl_prev
 		 * pointer.
 		 */
-		ReserveXLogInsertLocation(rechdr->xl_tot_len, &StartPos, &EndPos,
-								  &rechdr->xl_prev);
+		ReserveXLogInsertLocation(rechdr->xl_tot_len, &StartPos, &EndPos);
 
 		/* Normal records are always inserted. */
 		inserted = true;
@@ -883,7 +883,7 @@ XLogInsertRecord(XLogRecData *rdata,
 		 */
 		Assert(fpw_lsn == InvalidXLogRecPtr);
 		WALInsertLockAcquireExclusive();
-		inserted = ReserveXLogSwitch(&StartPos, &EndPos, &rechdr->xl_prev);
+		inserted = ReserveXLogSwitch(&StartPos, &EndPos);
 	}
 	else
 	{
@@ -898,14 +898,22 @@ XLogInsertRecord(XLogRecData *rdata,
 		 */
 		Assert(fpw_lsn == InvalidXLogRecPtr);
 		WALInsertLockAcquireExclusive();
-		ReserveXLogInsertLocation(rechdr->xl_tot_len, &StartPos, &EndPos,
-								  &rechdr->xl_prev);
+		ReserveXLogInsertLocation(rechdr->xl_tot_len, &StartPos, &EndPos);
 		RedoRecPtr = Insert->RedoRecPtr = StartPos;
 		inserted = true;
 	}
 
 	if (inserted)
 	{
+		bool		islargerecord;
+
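+		/*
+		 * A record at least as large as the whole WAL buffer pool cannot
+		 * have the page holding its successor's header initialized before
+		 * its own pages are written out, so in that case the prev-link
+		 * update is deferred until after the record has been copied.
+		 */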
+		islargerecord = EndPos - StartPos >= (Size) XLOGbuffers * XLOG_BLCKSZ;
+		if (!islargerecord)
+		{
+			SetPrevRecPtr(EndPos, StartPos, insertTLI, false);
+		}
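+
+		/*
+		 * Our own prev-link is written by whichever backend reserved the
+		 * preceding record; spin until it appears.  A valid xl_prev is
+		 * never InvalidXLogRecPtr (0), which is what a freshly
+		 * initialized page position reads as.
+		 */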
+		while (!(rechdr->xl_prev = GetPrevRecPtr(StartPos, insertTLI)))
+			;
+
 		/*
 		 * Now that xl_prev has been filled in, calculate CRC of the record
 		 * header.
@@ -923,6 +931,11 @@ XLogInsertRecord(XLogRecData *rdata,
 							class == WALINSERT_SPECIAL_SWITCH, rdata,
 							StartPos, EndPos, insertTLI);
 
+		if (islargerecord)
+		{
+			SetPrevRecPtr(EndPos, StartPos, insertTLI, true);
+		}
+
 		/*
 		 * Unless record is flagged as not important, update LSN of last
 		 * important record in the current slot. When holding all locks, just
@@ -1105,13 +1118,11 @@ XLogInsertRecord(XLogRecData *rdata,
  * inline. We use pg_attribute_always_inline here to try to convince it.
  */
 static pg_attribute_always_inline void
-ReserveXLogInsertLocation(int size, XLogRecPtr *StartPos, XLogRecPtr *EndPos,
-						  XLogRecPtr *PrevPtr)
+ReserveXLogInsertLocation(int size, XLogRecPtr *StartPos, XLogRecPtr *EndPos)
 {
 	XLogCtlInsert *Insert = &XLogCtl->Insert;
 	uint64		startbytepos;
 	uint64		endbytepos;
-	uint64		prevbytepos;
 
 	size = MAXALIGN(size);
 
@@ -1128,19 +1139,11 @@ ReserveXLogInsertLocation(int size, XLogRecPtr *StartPos, XLogRecPtr *EndPos,
 	 * because the usable byte position doesn't include any headers, reserving
 	 * X bytes from WAL is almost as simple as "CurrBytePos += X".
 	 */
-	SpinLockAcquire(&Insert->insertpos_lck);
-
-	startbytepos = Insert->CurrBytePos;
+	startbytepos = pg_atomic_fetch_add_u64(&Insert->CurrBytePos, size);
 	endbytepos = startbytepos + size;
-	prevbytepos = Insert->PrevBytePos;
-	Insert->CurrBytePos = endbytepos;
-	Insert->PrevBytePos = startbytepos;
-
-	SpinLockRelease(&Insert->insertpos_lck);
 
 	*StartPos = XLogBytePosToRecPtr(startbytepos);
 	*EndPos = XLogBytePosToEndRecPtr(endbytepos);
-	*PrevPtr = XLogBytePosToRecPtr(prevbytepos);
 
 	/*
 	 * Check that the conversions between "usable byte positions" and
@@ -1148,7 +1151,6 @@ ReserveXLogInsertLocation(int size, XLogRecPtr *StartPos, XLogRecPtr *EndPos,
 	 */
 	Assert(XLogRecPtrToBytePos(*StartPos) == startbytepos);
 	Assert(XLogRecPtrToBytePos(*EndPos) == endbytepos);
-	Assert(XLogRecPtrToBytePos(*PrevPtr) == prevbytepos);
 }
 
 /*
@@ -1161,12 +1163,11 @@ ReserveXLogInsertLocation(int size, XLogRecPtr *StartPos, XLogRecPtr *EndPos,
  * reserving any space, and the function returns false.
 */
 static bool
-ReserveXLogSwitch(XLogRecPtr *StartPos, XLogRecPtr *EndPos, XLogRecPtr *PrevPtr)
+ReserveXLogSwitch(XLogRecPtr *StartPos, XLogRecPtr *EndPos)
 {
 	XLogCtlInsert *Insert = &XLogCtl->Insert;
 	uint64		startbytepos;
 	uint64		endbytepos;
-	uint64		prevbytepos;
 	uint32		size = MAXALIGN(SizeOfXLogRecord);
 	XLogRecPtr	ptr;
 	uint32		segleft;
@@ -1179,7 +1180,7 @@ ReserveXLogSwitch(XLogRecPtr *StartPos, XLogRecPtr *EndPos, XLogRecPtr *PrevPtr)
 	 */
 	SpinLockAcquire(&Insert->insertpos_lck);
 
-	startbytepos = Insert->CurrBytePos;
+	startbytepos = pg_atomic_read_u64(&Insert->CurrBytePos);
 
 	ptr = XLogBytePosToEndRecPtr(startbytepos);
 	if (XLogSegmentOffset(ptr, wal_segment_size) == 0)
@@ -1190,7 +1191,6 @@ ReserveXLogSwitch(XLogRecPtr *StartPos, XLogRecPtr *EndPos, XLogRecPtr *PrevPtr)
 	}
 
 	endbytepos = startbytepos + size;
-	prevbytepos = Insert->PrevBytePos;
 
 	*StartPos = XLogBytePosToRecPtr(startbytepos);
 	*EndPos = XLogBytePosToEndRecPtr(endbytepos);
@@ -1202,17 +1202,13 @@ ReserveXLogSwitch(XLogRecPtr *StartPos, XLogRecPtr *EndPos, XLogRecPtr *PrevPtr)
 		*EndPos += segleft;
 		endbytepos = XLogRecPtrToBytePos(*EndPos);
 	}
-	Insert->CurrBytePos = endbytepos;
-	Insert->PrevBytePos = startbytepos;
+	pg_atomic_write_u64(&Insert->CurrBytePos, endbytepos);
 
 	SpinLockRelease(&Insert->insertpos_lck);
 
-	*PrevPtr = XLogBytePosToRecPtr(prevbytepos);
-
 	Assert(XLogSegmentOffset(*EndPos, wal_segment_size) == 0);
 	Assert(XLogRecPtrToBytePos(*EndPos) == endbytepos);
 	Assert(XLogRecPtrToBytePos(*StartPos) == startbytepos);
-	Assert(XLogRecPtrToBytePos(*PrevPtr) == prevbytepos);
 
 	return true;
 }
@@ -1236,7 +1232,7 @@ CopyXLogRecordToWAL(int write_len, bool isLogSwitch, XLogRecData *rdata,
 	 * inserting to.
 	 */
 	CurrPos = StartPos;
-	currpos = GetXLogBuffer(CurrPos, tli);
+	currpos = GetXLogBuffer(CurrPos, CurrPos, tli);
 	freespace = INSERT_FREESPACE(CurrPos);
 
 	/*
@@ -1273,7 +1269,7 @@ CopyXLogRecordToWAL(int write_len, bool isLogSwitch, XLogRecData *rdata,
 			 * page was initialized, in AdvanceXLInsertBuffer, and we're the
 			 * only backend that needs to set the contrecord flag.
 			 */
-			currpos = GetXLogBuffer(CurrPos, tli);
+			currpos = GetXLogBuffer(CurrPos, CurrPos, tli);
 			pagehdr = (XLogPageHeader) currpos;
 			pagehdr->xlp_rem_len = write_len - written;
 			pagehdr->xlp_info |= XLP_FIRST_IS_CONTRECORD;
@@ -1346,7 +1342,7 @@ CopyXLogRecordToWAL(int write_len, bool isLogSwitch, XLogRecData *rdata,
 			 * (which itself calls the two methods we need) to get the pointer
 			 * and zero most of the page.  Then we just zero the page header.
 			 */
-			currpos = GetXLogBuffer(CurrPos, tli);
+			currpos = GetXLogBuffer(CurrPos, CurrPos, tli);
 			MemSet(currpos, 0, SizeOfXLogShortPHD);
 
 			CurrPos += XLOG_BLCKSZ;
@@ -1364,6 +1360,90 @@ CopyXLogRecordToWAL(int write_len, bool isLogSwitch, XLogRecData *rdata,
 				errmsg_internal("space reserved for WAL record does not match what was written"));
 }
 
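+/*
+ * Read the prev-link (xl_prev) of the record starting at 'hdr' directly
+ * from the WAL buffers.  The link may be split across a page boundary,
+ * in which case it is reassembled around the intervening page header.
+ * Returns InvalidXLogRecPtr (0) if the previous inserter has not written
+ * the link yet.
+ */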
+static XLogRecPtr
+GetPrevRecPtr(XLogRecPtr hdr, TimeLineID tli)
+{
+	uint64		xlprevbytepos;
+	char		*xl_prev_pos;
+	int			freespace;
+
+	XLogRecPtr	res;
+	XLogRecPtr	xl_prev_ptr;
+
+	xlprevbytepos = XLogRecPtrToBytePos(hdr) + offsetof(XLogRecord, xl_prev);
+	xl_prev_ptr = XLogBytePosToRecPtr(xlprevbytepos);
+	xl_prev_pos = GetXLogBuffer(xl_prev_ptr, hdr, tli);
+	freespace = INSERT_FREESPACE(xl_prev_ptr);
+
+	if (freespace >= sizeof(XLogRecPtr))
+	{
+		res = *((XLogRecPtr *) xl_prev_pos);
+	}
+	else
+	{
+		char	*res_data;
+
+		res_data = (char *) &res;
+		memcpy(res_data, xl_prev_pos, freespace);
+
+		if (XLogSegmentOffset(xl_prev_ptr + freespace, wal_segment_size) == 0)
+		{
+			xl_prev_pos += SizeOfXLogLongPHD;
+		}
+		else
+		{
+			xl_prev_pos += SizeOfXLogShortPHD;
+		}
+
+		memcpy(res_data + freespace, xl_prev_pos, sizeof(XLogRecPtr) - freespace);
+	}
+
+	return res;
+}
+
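+/*
+ * Write 'xl_prev_to_insert' into the prev-link (xl_prev) of the record
+ * starting at 'hdr'.  With in_order=false the caller is inserting the
+ * *preceding* record and must not advertise insertion progress beyond its
+ * own record, so GetXLogBuffer() is told to initialize pages only up to
+ * 'xl_prev_to_insert'.  A link split across a page boundary is written
+ * around the intervening page header, mirroring GetPrevRecPtr().
+ */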
+static void
+SetPrevRecPtr(XLogRecPtr hdr, XLogRecPtr xl_prev_to_insert, TimeLineID tli, bool in_order)
+{
+	uint64		xlprevbytepos;
+	char		*xl_prev_pos;
+	int			freespace;
+
+	XLogRecPtr	xl_prev_ptr;
+
+	xlprevbytepos = XLogRecPtrToBytePos(hdr) + offsetof(XLogRecord, xl_prev);
+	xl_prev_ptr = XLogBytePosToRecPtr(xlprevbytepos);
+
+	if (in_order)
+		xl_prev_pos = GetXLogBuffer(xl_prev_ptr, xl_prev_ptr, tli);
+	else
+		xl_prev_pos = GetXLogBuffer(xl_prev_ptr, xl_prev_to_insert, tli);
+
+	freespace = INSERT_FREESPACE(xl_prev_ptr);
+
+	if (freespace >= sizeof(XLogRecPtr))
+	{
+		*((XLogRecPtr *) xl_prev_pos) = xl_prev_to_insert;
+	}
+	else
+	{
+		char	*xl_prev_to_insert_data;
+
+		xl_prev_to_insert_data = (char *) &xl_prev_to_insert;
+		memcpy(xl_prev_pos, xl_prev_to_insert_data, freespace);
+
+		if (XLogSegmentOffset(xl_prev_ptr + freespace, wal_segment_size) == 0)
+		{
+			xl_prev_pos += SizeOfXLogLongPHD;
+		}
+		else
+		{
+			xl_prev_pos += SizeOfXLogShortPHD;
+		}
+
+		memcpy(xl_prev_pos, xl_prev_to_insert_data + freespace, sizeof(XLogRecPtr) - freespace);
+	}
+}
+
 /*
  * Acquire a WAL insertion lock, for inserting to WAL.
  */
@@ -1522,9 +1602,7 @@ WaitXLogInsertionsToFinish(XLogRecPtr upto)
 		return inserted;
 
 	/* Read the current insert position */
-	SpinLockAcquire(&Insert->insertpos_lck);
-	bytepos = Insert->CurrBytePos;
-	SpinLockRelease(&Insert->insertpos_lck);
+	bytepos = pg_atomic_read_u64(&Insert->CurrBytePos);
 	reservedUpto = XLogBytePosToEndRecPtr(bytepos);
 
 	/*
@@ -1629,7 +1707,7 @@ WaitXLogInsertionsToFinish(XLogRecPtr upto)
  * later, because older buffers might be recycled already)
  */
 static char *
-GetXLogBuffer(XLogRecPtr ptr, TimeLineID tli)
+GetXLogBuffer(XLogRecPtr ptr, XLogRecPtr upto, TimeLineID tli)
 {
 	int			idx;
 	XLogRecPtr	endptr;
@@ -1690,14 +1768,14 @@ GetXLogBuffer(XLogRecPtr ptr, TimeLineID tli)
 		 * sure that it's initialized, before we let insertingAt to move past
 		 * the page header.
 		 */
-		if (ptr % XLOG_BLCKSZ == SizeOfXLogShortPHD &&
-			XLogSegmentOffset(ptr, wal_segment_size) > XLOG_BLCKSZ)
-			initializedUpto = ptr - SizeOfXLogShortPHD;
-		else if (ptr % XLOG_BLCKSZ == SizeOfXLogLongPHD &&
-				 XLogSegmentOffset(ptr, wal_segment_size) < XLOG_BLCKSZ)
-			initializedUpto = ptr - SizeOfXLogLongPHD;
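+		/*
+		 * Note the use of 'upto' rather than 'ptr': when the caller is
+		 * updating the next record's prev-link out of order, insertingAt
+		 * must not be advanced past the caller's own record.
+		 */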
+		if (upto % XLOG_BLCKSZ == SizeOfXLogShortPHD &&
+			XLogSegmentOffset(upto, wal_segment_size) > XLOG_BLCKSZ)
+			initializedUpto = upto - SizeOfXLogShortPHD;
+		else if (upto % XLOG_BLCKSZ == SizeOfXLogLongPHD &&
+				 XLogSegmentOffset(upto, wal_segment_size) < XLOG_BLCKSZ)
+			initializedUpto = upto - SizeOfXLogLongPHD;
 		else
-			initializedUpto = ptr;
+			initializedUpto = upto;
 
 		WALInsertLockUpdateInsertingAt(initializedUpto);
 
@@ -6018,8 +6096,7 @@ StartupXLOG(void)
 	 * previous incarnation.
 	 */
 	Insert = &XLogCtl->Insert;
-	Insert->PrevBytePos = XLogRecPtrToBytePos(endOfRecoveryInfo->lastRec);
-	Insert->CurrBytePos = XLogRecPtrToBytePos(EndOfLog);
+	pg_atomic_write_u64(&Insert->CurrBytePos, XLogRecPtrToBytePos(EndOfLog));
 
 	/*
 	 * Tricky point here: lastPage contains the *last* block that the LastRec
@@ -6053,6 +6130,7 @@ StartupXLOG(void)
 		 */
 		XLogCtl->InitializedUpTo = EndOfLog;
 	}
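+
+	/*
+	 * PrevBytePos is gone, so seed the prev-link of the first record to
+	 * be inserted after recovery directly in the WAL buffers.
+	 */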
+	SetPrevRecPtr(EndOfLog, endOfRecoveryInfo->lastRec, newTLI, true);
 
 	/*
 	 * Update local and shared status.  This is OK to do without any locks
@@ -7005,7 +7083,7 @@ CreateCheckPoint(int flags)
 
 	if (shutdown)
 	{
-		XLogRecPtr	curInsert = XLogBytePosToRecPtr(Insert->CurrBytePos);
+		XLogRecPtr	curInsert = XLogBytePosToRecPtr(pg_atomic_read_u64(&Insert->CurrBytePos));
 
 		/*
 		 * Compute new REDO record ptr = location of next XLOG record.
@@ -7473,7 +7551,7 @@ CreateOverwriteContrecordRecord(XLogRecPtr aborted_lsn, XLogRecPtr pagePtr,
 	 * insertion lock is just pro forma.
 	 */
 	WALInsertLockAcquire();
-	pagehdr = (XLogPageHeader) GetXLogBuffer(pagePtr, newTLI);
+	pagehdr = (XLogPageHeader) GetXLogBuffer(pagePtr, pagePtr, newTLI);
 	pagehdr->xlp_info |= XLP_FIRST_IS_OVERWRITE_CONTRECORD;
 	WALInsertLockRelease();
 
@@ -9437,9 +9515,7 @@ GetXLogInsertRecPtr(void)
 	XLogCtlInsert *Insert = &XLogCtl->Insert;
 	uint64		current_bytepos;
 
-	SpinLockAcquire(&Insert->insertpos_lck);
-	current_bytepos = Insert->CurrBytePos;
-	SpinLockRelease(&Insert->insertpos_lck);
+	current_bytepos = pg_atomic_read_u64(&Insert->CurrBytePos);
 
 	return XLogBytePosToRecPtr(current_bytepos);
 }
-- 
2.43.0

