From c41f7b72d57b4fa4211079f7637f0a8470ec9348 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=D0=AE=D1=80=D0=B8=D0=B9=20=D0=A1=D0=BE=D0=BA=D0=BE=D0=BB?=
 =?UTF-8?q?=D0=BE=D0=B2?= <yura@Urijs-MacBook-Air.local>
Date: Mon, 6 Jan 2025 21:54:06 +0300
Subject: [PATCH v1] Lock-free XLog Reservation using lock-free hash-table

Remove PrevBytePos from XLogCtlInsert to eliminate spinlock contention and
let CurrBytePos be advanced with a single atomic fetch-add. The prev-link of
each record is instead published through a lock-free hash table based on
4-way Cuckoo Hashing.
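
The reservation fast path then becomes, roughly (a sketch of the idea; the
actual code is in ReserveXLogInsertLocation() and LinkAndFindPrevPos()):

    startbytepos = pg_atomic_fetch_add_u64(&Insert->CurrBytePos, size);
    endbytepos = startbytepos + size;
    /* publish our link and consume the one left by the previous inserter */
    LinkAndFindPrevPos(startbytepos, endbytepos, &prevbytepos);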
---
 src/backend/access/transam/xlog.c | 346 ++++++++++++++++++++++++------
 1 file changed, 285 insertions(+), 61 deletions(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index b9ea92a542..2f35e1645a 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -68,6 +68,8 @@
 #include "catalog/pg_database.h"
 #include "common/controldata_utils.h"
 #include "common/file_utils.h"
+#include "common/hashfn.h"
+#include "common/pg_prng.h"
 #include "executor/instrument.h"
 #include "miscadmin.h"
 #include "pg_trace.h"
@@ -147,7 +149,7 @@ int			wal_segment_size = DEFAULT_XLOG_SEG_SIZE;
  * to happen concurrently, but adds some CPU overhead to flushing the WAL,
  * which needs to iterate all the locks.
  */
-#define NUM_XLOGINSERT_LOCKS  8
+#define NUM_XLOGINSERT_LOCKS  128
 
 /*
  * Max distance from last checkpoint, before triggering a new xlog-based
@@ -384,6 +386,36 @@ typedef union WALInsertLockPadded
 	char		pad[PG_CACHE_LINE_SIZE];
 } WALInsertLockPadded;
 
+/*
+ * Links the current reserved position with the previous one.
+ * - CurrPosId is (CurrBytePos ^ (CurrBytePos >> 32)).
+ *   Since CurrBytePos grows monotonically and is MAXALIGN-ed, CurrPosId
+ *   identifies CurrBytePos unambiguously within at least MAXALIGN * 2^32
+ *   bytes of WAL (32GB with 8-byte MAXALIGN).
+ * - PrevSize is the difference between CurrBytePos and PrevBytePos.
+ */
+typedef struct
+{
+	uint32		CurrPosId;
+	uint32		PrevSize;
+}			WALPrevPosLinkVal;
+
+/*
+ * An element of the lock-free hash table.  A PrevSize of zero marks the slot
+ * as empty; the lowest bit of PrevSize is used as a lock bit, relying on the
+ * fact that record sizes are MAXALIGN-ed.
+ */
+typedef struct
+{
+	pg_atomic_uint32 CurrPosId;
+	pg_atomic_uint32 PrevSize;
+}			WALPrevPosLink;
+
+#define PREV_LINKS_HASH_CAPA 256
+StaticAssertDecl(!(PREV_LINKS_HASH_CAPA & (PREV_LINKS_HASH_CAPA - 1)),
+				 "PREV_LINKS_HASH_CAPA should be power of two");
+
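+/*
+ * On average one of every SWAP_ONCE_IN failed insertion rounds kicks an
+ * occupied bucket (cuckoo-style) instead of simply retrying, which lets the
+ * hash table rebalance.
+ */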
+#define SWAP_ONCE_IN 128
+
 /*
  * Session status of running backup, used for sanity checks in SQL-callable
  * functions to start and stop backups.
@@ -395,26 +427,31 @@ static SessionBackupState sessionBackupState = SESSION_BACKUP_NONE;
  */
 typedef struct XLogCtlInsert
 {
-	slock_t		insertpos_lck;	/* protects CurrBytePos and PrevBytePos */
-
 	/*
 	 * CurrBytePos is the end of reserved WAL. The next record will be
-	 * inserted at that position. PrevBytePos is the start position of the
-	 * previously inserted (or rather, reserved) record - it is copied to the
-	 * prev-link of the next record. These are stored as "usable byte
-	 * positions" rather than XLogRecPtrs (see XLogBytePosToRecPtr()).
+	 * inserted at that position.
+	 *
+	 * The start position of the previously inserted (or rather, reserved)
+	 * record, which is copied into the prev-link of the next record, is now
+	 * kept in PrevLinksHash.
+	 *
+	 * Both positions are stored as "usable byte positions" rather than
+	 * XLogRecPtrs (see XLogBytePosToRecPtr()).
 	 */
-	uint64		CurrBytePos;
-	uint64		PrevBytePos;
+	pg_atomic_uint64 CurrBytePos pg_attribute_aligned(PG_CACHE_LINE_SIZE);
 
 	/*
-	 * Make sure the above heavily-contended spinlock and byte positions are
-	 * on their own cache line. In particular, the RedoRecPtr and full page
-	 * write variables below should be on a different cache line. They are
-	 * read on every WAL insertion, but updated rarely, and we don't want
-	 * those reads to steal the cache line containing Curr/PrevBytePos.
+	 * PrevLinksHash is a lock-free hash table based on the cuckoo hashing
+	 * algorithm.  It is mostly 4-way: for every element two positions h1 and
+	 * h2 are computed, and their neighbours h1+1 and h2+2 are used as well.
+	 * This way, even on a collision we have at least 3 distinct positions,
+	 * which gives us a ~75% fill rate without unsolvable cycles (per cuckoo
+	 * hashing theory).
+	 *
+	 * We also rely on the fact that elements are deleted at the same rate as
+	 * they are inserted, so even unsolvable cycles are soon broken by
+	 * concurrent deletions.
 	 */
-	char		pad[PG_CACHE_LINE_SIZE];
+	WALPrevPosLink PrevLinksHash[PREV_LINKS_HASH_CAPA];
 
 	/*
 	 * fullPageWrites is the authoritative value used by all backends to
@@ -700,6 +737,15 @@ static void CopyXLogRecordToWAL(int write_len, bool isLogSwitch,
 								XLogRecData *rdata,
 								XLogRecPtr StartPos, XLogRecPtr EndPos,
 								TimeLineID tli);
+
+static bool WALPrevPosLinkInsert(WALPrevPosLink * link, WALPrevPosLinkVal val);
+static bool WALPrevPosLinkConsume(WALPrevPosLink * link, WALPrevPosLinkVal * val);
+static bool WALPrevPosLinkSwap(WALPrevPosLink * link, WALPrevPosLinkVal * val);
+static void LinkAndFindPrevPos(XLogRecPtr StartPos, XLogRecPtr EndPos,
+							   XLogRecPtr *PrevPtr);
+static void LinkStartPrevPos(XLogRecPtr EndOfLog, XLogRecPtr LastRec);
+static XLogRecPtr ReadInsertCurrBytePos(void);
+
 static void ReserveXLogInsertLocation(int size, XLogRecPtr *StartPos,
 									  XLogRecPtr *EndPos, XLogRecPtr *PrevPtr);
 static bool ReserveXLogSwitch(XLogRecPtr *StartPos, XLogRecPtr *EndPos,
@@ -1086,6 +1132,192 @@ XLogInsertRecord(XLogRecData *rdata,
 	return EndPos;
 }
 
+/*
+ * Attempt to write into an empty link.
+ */
+static pg_attribute_always_inline bool
+WALPrevPosLinkInsert(WALPrevPosLink * link, WALPrevPosLinkVal val)
+{
+	uint32		empty = 0;
+
+	/* cheap read-only check first: non-zero means occupied or locked */
+	if (pg_atomic_read_u32(&link->PrevSize) != 0)
+		return false;
+	if (!pg_atomic_compare_exchange_u32(&link->PrevSize, &empty, 1))
+		/* someone else occupied the entry */
+		return false;
+
+	pg_atomic_write_u32(&link->CurrPosId, val.CurrPosId);
+	/* This write acts as unlock as well. */
+	pg_atomic_write_membarrier_u32(&link->PrevSize, val.PrevSize);
+	return true;
+}
+
+/*
+ * Attempt to consume a matching link; on success val->PrevSize is set to the
+ * stored value.
+ */
+static pg_attribute_always_inline bool
+WALPrevPosLinkConsume(WALPrevPosLink * link, WALPrevPosLinkVal * val)
+{
+	if (pg_atomic_read_u32(&link->CurrPosId) != val->CurrPosId)
+		return false;
+
+	/* Try lock */
+	val->PrevSize = pg_atomic_fetch_or_u32(&link->PrevSize, 1);
+	if (val->PrevSize & 1)
+		/* Lock failed */
+		return false;
+
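+	/*
+	 * Re-check the key now that the entry is locked: a concurrent swap or
+	 * consume may have changed it between the unlocked read above and the
+	 * lock.
+	 */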
+	if (pg_atomic_read_u32(&link->CurrPosId) != val->CurrPosId)
+	{
+		/* unlock with old value */
+		pg_atomic_write_u32(&link->PrevSize, val->PrevSize);
+		return false;
+	}
+
+	pg_atomic_write_u32(&link->CurrPosId, 0);
+	/* This write acts as unlock as well. */
+	pg_atomic_write_membarrier_u32(&link->PrevSize, 0);
+	return true;
+}
+
+/*
+ * Attempt to swap the entry: remember the existing link and write ours.
+ * It may happen that we consume an empty entry; the caller detects this by
+ * checking the remembered value.
+ */
+static pg_attribute_always_inline bool
+WALPrevPosLinkSwap(WALPrevPosLink * link, WALPrevPosLinkVal * val)
+{
+	uint32		oldCur;
+	uint32		oldPrev;
+
+	/* Attempt to lock entry against concurrent consumer or swapper */
+	oldPrev = pg_atomic_fetch_or_u32(&link->PrevSize, 1);
+	if (oldPrev & 1)
+		/* Lock failed */
+		return false;
+
+	oldCur = pg_atomic_read_u32(&link->CurrPosId);
+	pg_atomic_write_u32(&link->CurrPosId, val->CurrPosId);
+	/* This write acts as unlock as well. */
+	pg_atomic_write_membarrier_u32(&link->PrevSize, val->PrevSize);
+
+	val->CurrPosId = oldCur;
+	val->PrevSize = oldPrev;
+	return true;
+}
+
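+/*
+ * Compute the four candidate buckets for a position id: two hash-derived
+ * positions plus a close neighbour of each ("mostly 4-way" cuckoo hashing).
+ */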
+static pg_attribute_always_inline void
+CalcCuckooPositions(uint32 ptr, uint32 pos[4])
+{
+	uint32		hash = murmurhash32(ptr);
+
+	pos[0] = hash % PREV_LINKS_HASH_CAPA;
+	pos[1] = (pos[0] + 1) % PREV_LINKS_HASH_CAPA;
+	pos[2] = (hash >> 16) % PREV_LINKS_HASH_CAPA;
+	pos[3] = (pos[2] + 2) % PREV_LINKS_HASH_CAPA;
+}
+
+/*
+ * Write the new link (EndPos, StartPos) and find PrevPtr for StartPos.
+ *
+ * Links are stored in a lock-free cuckoo-based hash table.
+ * We use mostly-4-way cuckoo hashing, which provides a high fill rate
+ * without hard cycle collisions.  We also rely on concurrent consumption of
+ * existing entries, so any cycles are broken in the meantime.
+ *
+ * Cuckoo hashing relies on re-insertion for balancing, so occasionally we
+ * swap out an entry and try to insert the swapped value instead of ours.
+ */
+static void
+LinkAndFindPrevPos(XLogRecPtr StartPos, XLogRecPtr EndPos, XLogRecPtr *PrevPtr)
+{
+	WALPrevPosLink *hashtable = XLogCtl->Insert.PrevLinksHash;
+	WALPrevPosLinkVal lookup = {
+		.CurrPosId = StartPos ^ (StartPos >> 32),
+	};
+	WALPrevPosLinkVal insert = {
+		.CurrPosId = EndPos ^ (EndPos >> 32),
+		.PrevSize = EndPos - StartPos
+	};
+	pg_prng_state prng;
+	uint32		lookup_pos[4];
+	uint32		insert_pos[4];
+	uint32		i;
+	bool		inserted = false;
+	bool		found = false;
+
+	CalcCuckooPositions(lookup.CurrPosId, lookup_pos);
+	CalcCuckooPositions(insert.CurrPosId, insert_pos);
+	pg_prng_seed(&prng, StartPos);
+
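+	/*
+	 * Loop until we have both consumed the prev-link left for StartPos and
+	 * inserted our own link for EndPos.  Each round first tries to consume,
+	 * then to insert, and occasionally kicks (swaps out) one of our target
+	 * buckets so the cuckoo table can rebalance.
+	 */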
+	while (!inserted || !found)
+	{
+		for (i = 0; !found && i < 4; i++)
+			found = WALPrevPosLinkConsume(&hashtable[lookup_pos[i]], &lookup);
+
+		if (inserted)
+			goto next;
+
+		for (i = 0; !inserted && i < 4; i++)
+			inserted = WALPrevPosLinkInsert(&hashtable[insert_pos[i]], insert);
+
+		if (inserted)
+			goto next;
+
+		if (pg_prng_uint32(&prng) % SWAP_ONCE_IN != 0)
+			goto next;
+
+		i = pg_prng_uint32(&prng) % 4;
+		if (!WALPrevPosLinkSwap(&hashtable[insert_pos[i]], &insert))
+			goto next;
+
+		if (insert.PrevSize == 0)
+			/* Lucky case: the entry was empty, so we effectively inserted ours */
+			inserted = true;
+		else if (insert.CurrPosId == lookup.CurrPosId)
+		{
+			/*
+			 * We happened to swap out the very entry we were looking for, so
+			 * there is no need to insert it again.
+			 */
+			inserted = true;
+			Assert(!found);
+			found = true;
+			lookup.PrevSize = insert.PrevSize;
+			break;
+		}
+		else
+			CalcCuckooPositions(insert.CurrPosId, insert_pos);
+
+next:
+		pg_spin_delay();
+	}
+
+	*PrevPtr = StartPos - lookup.PrevSize;
+}
+
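+/*
+ * Seed PrevLinksHash with the prev-link of the first record to be inserted
+ * after recovery.  Plain writes are enough here: as with the pre-existing
+ * direct assignment of PrevBytePos, this runs from StartupXLOG() before
+ * concurrent WAL insertions are possible.
+ */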
+static pg_attribute_always_inline void
+LinkStartPrevPos(XLogRecPtr EndOfLog, XLogRecPtr LastRec)
+{
+	WALPrevPosLink *hashtable = XLogCtl->Insert.PrevLinksHash;
+	uint32		insert_pos[4];
+
+	CalcCuckooPositions(EndOfLog ^ (EndOfLog >> 32), insert_pos);
+	pg_atomic_write_u32(&hashtable[insert_pos[0]].CurrPosId,
+						EndOfLog ^ (EndOfLog >> 32));
+	pg_atomic_write_u32(&hashtable[insert_pos[0]].PrevSize,
+						EndOfLog - LastRec);
+}
+
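+/*
+ * Read the current end of reserved WAL as a "usable byte position".
+ */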
+static pg_attribute_always_inline XLogRecPtr
+ReadInsertCurrBytePos(void)
+{
+	return pg_atomic_read_u64(&XLogCtl->Insert.CurrBytePos);
+}
+
 /*
  * Reserves the right amount of space for a record of given size from the WAL.
  * *StartPos is set to the beginning of the reserved section, *EndPos to
@@ -1118,25 +1350,9 @@ ReserveXLogInsertLocation(int size, XLogRecPtr *StartPos, XLogRecPtr *EndPos,
 	/* All (non xlog-switch) records should contain data. */
 	Assert(size > SizeOfXLogRecord);
 
-	/*
-	 * The duration the spinlock needs to be held is minimized by minimizing
-	 * the calculations that have to be done while holding the lock. The
-	 * current tip of reserved WAL is kept in CurrBytePos, as a byte position
-	 * that only counts "usable" bytes in WAL, that is, it excludes all WAL
-	 * page headers. The mapping between "usable" byte positions and physical
-	 * positions (XLogRecPtrs) can be done outside the locked region, and
-	 * because the usable byte position doesn't include any headers, reserving
-	 * X bytes from WAL is almost as simple as "CurrBytePos += X".
-	 */
-	SpinLockAcquire(&Insert->insertpos_lck);
-
-	startbytepos = Insert->CurrBytePos;
+	startbytepos = pg_atomic_fetch_add_u64(&Insert->CurrBytePos, size);
 	endbytepos = startbytepos + size;
-	prevbytepos = Insert->PrevBytePos;
-	Insert->CurrBytePos = endbytepos;
-	Insert->PrevBytePos = startbytepos;
-
-	SpinLockRelease(&Insert->insertpos_lck);
+	LinkAndFindPrevPos(startbytepos, endbytepos, &prevbytepos);
 
 	*StartPos = XLogBytePosToRecPtr(startbytepos);
 	*EndPos = XLogBytePosToEndRecPtr(endbytepos);
@@ -1172,26 +1388,24 @@ ReserveXLogSwitch(XLogRecPtr *StartPos, XLogRecPtr *EndPos, XLogRecPtr *PrevPtr)
 	uint32		segleft;
 
 	/*
-	 * These calculations are a bit heavy-weight to be done while holding a
-	 * spinlock, but since we're holding all the WAL insertion locks, there
-	 * are no other inserters competing for it. GetXLogInsertRecPtr() does
-	 * compete for it, but that's not called very frequently.
+	 * This function is called while all WAL insertion locks are held
+	 * exclusively, so no concurrent ReserveXLogInsertLocation() can compete
+	 * for CurrBytePos.  We still use a CAS loop to keep the code uniform.
+	 *
+	 * We will probably get rid of the exclusive locking in the future.
 	 */
-	SpinLockAcquire(&Insert->insertpos_lck);
 
-	startbytepos = Insert->CurrBytePos;
+repeat:
+	startbytepos = pg_atomic_read_u64(&Insert->CurrBytePos);
 
 	ptr = XLogBytePosToEndRecPtr(startbytepos);
 	if (XLogSegmentOffset(ptr, wal_segment_size) == 0)
 	{
-		SpinLockRelease(&Insert->insertpos_lck);
 		*EndPos = *StartPos = ptr;
 		return false;
 	}
 
 	endbytepos = startbytepos + size;
-	prevbytepos = Insert->PrevBytePos;
-
 	*StartPos = XLogBytePosToRecPtr(startbytepos);
 	*EndPos = XLogBytePosToEndRecPtr(endbytepos);
 
@@ -1202,10 +1416,19 @@ ReserveXLogSwitch(XLogRecPtr *StartPos, XLogRecPtr *EndPos, XLogRecPtr *PrevPtr)
 		*EndPos += segleft;
 		endbytepos = XLogRecPtrToBytePos(*EndPos);
 	}
-	Insert->CurrBytePos = endbytepos;
-	Insert->PrevBytePos = startbytepos;
 
-	SpinLockRelease(&Insert->insertpos_lck);
+	if (!pg_atomic_compare_exchange_u64(&Insert->CurrBytePos,
+										&startbytepos,
+										endbytepos))
+	{
+		/*
+		 * Don't use a spin delay here: perform_spin_delay() primarily helps
+		 * with single-core contention, but on a single core we will succeed
+		 * on the next attempt anyway.
+		 */
+		goto repeat;
+	}
+	LinkAndFindPrevPos(startbytepos, endbytepos, &prevbytepos);
 
 	*PrevPtr = XLogBytePosToRecPtr(prevbytepos);
 
@@ -1507,7 +1730,6 @@ WaitXLogInsertionsToFinish(XLogRecPtr upto)
 	XLogRecPtr	inserted;
 	XLogRecPtr	reservedUpto;
 	XLogRecPtr	finishedUpto;
-	XLogCtlInsert *Insert = &XLogCtl->Insert;
 	int			i;
 
 	if (MyProc == NULL)
@@ -1522,9 +1744,7 @@ WaitXLogInsertionsToFinish(XLogRecPtr upto)
 		return inserted;
 
 	/* Read the current insert position */
-	SpinLockAcquire(&Insert->insertpos_lck);
-	bytepos = Insert->CurrBytePos;
-	SpinLockRelease(&Insert->insertpos_lck);
+	bytepos = ReadInsertCurrBytePos();
 	reservedUpto = XLogBytePosToEndRecPtr(bytepos);
 
 	/*
@@ -5017,12 +5237,18 @@ XLOGShmemInit(void)
 	XLogCtl->InstallXLogFileSegmentActive = false;
 	XLogCtl->WalWriterSleeping = false;
 
-	SpinLockInit(&XLogCtl->Insert.insertpos_lck);
 	SpinLockInit(&XLogCtl->info_lck);
+	pg_atomic_init_u64(&XLogCtl->Insert.CurrBytePos, 0);
 	pg_atomic_init_u64(&XLogCtl->logInsertResult, InvalidXLogRecPtr);
 	pg_atomic_init_u64(&XLogCtl->logWriteResult, InvalidXLogRecPtr);
 	pg_atomic_init_u64(&XLogCtl->logFlushResult, InvalidXLogRecPtr);
 	pg_atomic_init_u64(&XLogCtl->unloggedLSN, InvalidXLogRecPtr);
+
+	for (i = 0; i < PREV_LINKS_HASH_CAPA; i++)
+	{
+		pg_atomic_init_u32(&XLogCtl->Insert.PrevLinksHash[i].CurrPosId, 0);
+		pg_atomic_init_u32(&XLogCtl->Insert.PrevLinksHash[i].PrevSize, 0);
+	}
 }
 
 /*
@@ -6018,8 +6244,13 @@ StartupXLOG(void)
 	 * previous incarnation.
 	 */
 	Insert = &XLogCtl->Insert;
-	Insert->PrevBytePos = XLogRecPtrToBytePos(endOfRecoveryInfo->lastRec);
-	Insert->CurrBytePos = XLogRecPtrToBytePos(EndOfLog);
+	{
+		XLogRecPtr	endOfLog = XLogRecPtrToBytePos(EndOfLog);
+		XLogRecPtr	lastRec = XLogRecPtrToBytePos(endOfRecoveryInfo->lastRec);
+
+		pg_atomic_write_u64(&Insert->CurrBytePos, endOfLog);
+		LinkStartPrevPos(endOfLog, lastRec);
+	}
 
 	/*
 	 * Tricky point here: lastPage contains the *last* block that the LastRec
@@ -7005,7 +7236,7 @@ CreateCheckPoint(int flags)
 
 	if (shutdown)
 	{
-		XLogRecPtr	curInsert = XLogBytePosToRecPtr(Insert->CurrBytePos);
+		XLogRecPtr	curInsert = XLogBytePosToRecPtr(ReadInsertCurrBytePos());
 
 		/*
 		 * Compute new REDO record ptr = location of next XLOG record.
@@ -9434,14 +9665,7 @@ register_persistent_abort_backup_handler(void)
 XLogRecPtr
 GetXLogInsertRecPtr(void)
 {
-	XLogCtlInsert *Insert = &XLogCtl->Insert;
-	uint64		current_bytepos;
-
-	SpinLockAcquire(&Insert->insertpos_lck);
-	current_bytepos = Insert->CurrBytePos;
-	SpinLockRelease(&Insert->insertpos_lck);
-
-	return XLogBytePosToRecPtr(current_bytepos);
+	return XLogBytePosToRecPtr(ReadInsertCurrBytePos());
 }
 
 /*
-- 
2.39.3 (Apple Git-146)

