On 02/17/2014 10:36 PM, Andres Freund wrote:
On 2014-02-17 22:30:54 +0200, Heikki Linnakangas wrote:
This is what I came up with. I like it; I didn't have to contort lwlocks as
much as I feared. I added one field to the LWLock structure, which is used to
store how far a WAL inserter has progressed. The LWLock code
calls it just "value", without caring what's stored in it, and it's used by
the new functions LWLockWait and LWLockWakeup to implement the behavior the WAL
insertion slots have: waking up other processes waiting for the slot
without releasing it.
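
A toy model of those semantics, for illustration only. It uses a pthread mutex and condition variable rather than the real lwlock.c machinery, and every name in it (ToyLock, toy_wait, toy_wakeup and so on) is hypothetical, not part of the patch:

```c
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>
#include <stdint.h>

/* Toy stand-in for an LWLock carrying an extra "value" field. */
typedef struct ToyLock
{
	pthread_mutex_t mutex;		/* protects held and value */
	pthread_cond_t	changed;	/* signaled on release or value update */
	bool		held;
	uint64_t	value;
} ToyLock;

static void
toy_init(ToyLock *l)
{
	pthread_mutex_init(&l->mutex, NULL);
	pthread_cond_init(&l->changed, NULL);
	l->held = false;
	l->value = 0;
}

/* Acquire exclusively; the value starts at 0, meaning "position unknown". */
static void
toy_acquire(ToyLock *l)
{
	pthread_mutex_lock(&l->mutex);
	while (l->held)
		pthread_cond_wait(&l->changed, &l->mutex);
	l->held = true;
	l->value = 0;
	pthread_mutex_unlock(&l->mutex);
}

/* Release the lock and wake every waiter. */
static void
toy_release(ToyLock *l)
{
	pthread_mutex_lock(&l->mutex);
	assert(l->held);
	l->held = false;
	l->value = 0;
	pthread_cond_broadcast(&l->changed);
	pthread_mutex_unlock(&l->mutex);
}

/* Publish progress and wake waiters WITHOUT releasing the lock. */
static void
toy_wakeup(ToyLock *l, uint64_t val)
{
	pthread_mutex_lock(&l->mutex);
	l->value = val;
	pthread_cond_broadcast(&l->changed);
	pthread_mutex_unlock(&l->mutex);
}

/*
 * Returns true if the lock is free. Otherwise sleeps until the lock is
 * released or its value differs from oldval; if the lock is still held,
 * stores the current value in *newval and returns false.
 */
static bool
toy_wait(ToyLock *l, uint64_t oldval, uint64_t *newval)
{
	bool		lock_free;

	pthread_mutex_lock(&l->mutex);
	while (l->held && l->value == oldval)
		pthread_cond_wait(&l->changed, &l->mutex);
	lock_free = !l->held;
	if (!lock_free)
		*newval = l->value;
	pthread_mutex_unlock(&l->mutex);
	return lock_free;
}
```

A holder would call toy_acquire(), then toy_wakeup() each time it advances, then toy_release(). A flusher would call toy_wait() in a loop, passing back the last value it saw, until the lock is free or the reported value is past the point it cares about.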

This passes regression tests, but I'll have to re-run the performance tests
with this. One worry is that if the padded size of the LWLock struct is
smaller than a cache line, neighboring WAL insertion locks will compete for
the cache line. Another worry is that since I added a field to the LWLock
struct, it might now take 64 bytes on platforms where it used to take 32
bytes. That wastes some memory.

Why don't you allocate them in a separate tranche, from xlog.c? Then you
can store them inside whatever bigger object you want, guaranteeing
exactly the alignment you need. Possibly you can even have the extra
value in the enclosing object?

Good idea. New patch attached, doing that.
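
A minimal sketch of that layout, assuming a 64-byte cache line and a made-up stand-in for the lock struct (DemoLock and the other Demo* names are hypothetical; the real LWLock has different fields, and the real cache line size is platform-specific):

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

#define DEMO_CACHE_LINE_SIZE 64	/* assumed value, for illustration */

/* Stand-in for LWLock; not the real definition. */
typedef struct DemoLock
{
	int			state;
	void	   *head;
	void	   *tail;
} DemoLock;

/* The lock plus the extra value, kept together in the enclosing object. */
typedef struct DemoInsertLock
{
	DemoLock	lock;
	uint64_t	insertingAt;
} DemoInsertLock;

/* Pad each array element out to a full cache line. */
typedef union DemoInsertLockPadded
{
	DemoInsertLock l;
	char		pad[DEMO_CACHE_LINE_SIZE];
} DemoInsertLockPadded;

/*
 * Returns 1 if element i of an array that starts on a cache line boundary
 * lies entirely within one cache line, 0 otherwise.
 */
static int
demo_fits_in_one_line(size_t i)
{
	size_t		start = i * sizeof(DemoInsertLockPadded);
	size_t		end = start + sizeof(DemoInsertLock) - 1;

	/* The padding only helps if the payload is no bigger than a line. */
	assert(sizeof(DemoInsertLock) <= DEMO_CACHE_LINE_SIZE);
	return start / DEMO_CACHE_LINE_SIZE == end / DEMO_CACHE_LINE_SIZE;
}
```

With the union, the array stride is a whole cache line regardless of how large the lock struct itself is, so neighboring locks never share a line as long as the array's start address is aligned.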

I'll try to find time on some multi-CPU hardware to performance test this against current master, to make sure there's no regression.

- Heikki
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 508970a..3eef968 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -86,7 +86,7 @@ int			sync_method = DEFAULT_SYNC_METHOD;
 int			wal_level = WAL_LEVEL_MINIMAL;
 int			CommitDelay = 0;	/* precommit delay in microseconds */
 int			CommitSiblings = 5; /* # concurrent xacts needed to sleep */
-int			num_xloginsert_slots = 8;
+int			num_xloginsert_locks = 8;
 
 #ifdef WAL_DEBUG
 bool		XLOG_DEBUG = false;
@@ -289,7 +289,7 @@ XLogRecPtr	XactLastRecEnd = InvalidXLogRecPtr;
  * (which is almost but not quite the same as a pointer to the most recent
  * CHECKPOINT record).	We update this from the shared-memory copy,
  * XLogCtl->Insert.RedoRecPtr, whenever we can safely do so (ie, when we
- * hold an insertion slot).  See XLogInsert for details.  We are also allowed
+ * hold an insertion lock).  See XLogInsert for details.  We are also allowed
  * to update from XLogCtl->RedoRecPtr if we hold the info_lck;
  * see GetRedoRecPtr.  A freshly spawned backend obtains the value during
  * InitXLOGAccess.
@@ -361,63 +361,45 @@ typedef struct XLogwrtResult
 	XLogRecPtr	Flush;			/* last byte + 1 flushed */
 } XLogwrtResult;
 
-
 /*
- * A slot for inserting to the WAL. This is similar to an LWLock, the main
- * difference is that there is an extra xlogInsertingAt field that is protected
- * by the same mutex. Unlike an LWLock, a slot can only be acquired in
- * exclusive mode.
- *
- * The xlogInsertingAt field is used to advertise to other processes how far
- * the slot owner has progressed in inserting the record. When a backend
- * acquires a slot, it initializes xlogInsertingAt to 1, because it doesn't
- * yet know where it's going to insert the record. That's conservative
- * but correct; the new insertion is certainly going to go to a byte position
- * greater than 1. If another backend needs to flush the WAL, it will have to
- * wait for the new insertion. xlogInsertingAt is updated after finishing the
- * insert or when crossing a page boundary, which will wake up anyone waiting
- * for it, whether the wait was necessary in the first place or not.
- *
- * A process can wait on a slot in two modes: LW_EXCLUSIVE or
- * LW_WAIT_UNTIL_FREE. LW_EXCLUSIVE works like in an lwlock; when the slot is
- * released, the first LW_EXCLUSIVE waiter in the queue is woken up. Processes
- * waiting in LW_WAIT_UNTIL_FREE mode are woken up whenever the slot is
- * released, or xlogInsertingAt is updated. In other words, a process in
- * LW_WAIT_UNTIL_FREE mode is woken up whenever the inserter makes any progress
- * copying the record in place. LW_WAIT_UNTIL_FREE waiters are always added to
- * the front of the queue, while LW_EXCLUSIVE waiters are appended to the end.
- *
- * To join the wait queue, a process must set MyProc->lwWaitMode to the mode
- * it wants to wait in, MyProc->lwWaiting to true, and link MyProc to the head
- * or tail of the wait queue. The same mechanism is used to wait on an LWLock,
- * see lwlock.c for details.
+ * Inserting to WAL is protected by a bunch of WALInsertLocks. Each WAL
+ * insertion lock consists of a lightweight lock, plus an indicator of how
+ * far the insertion has progressed (insertingAt).
+ *
+ * The insertingAt value is used when writing the WAL to disk, to avoid
+ * waiting unnecessarily for an insertion that's still in-progress, but has
+ * already finished inserting all WAL beyond the point you're going to write
+ * the WAL up to. This isn't just to optimize, it's necessary to avoid
+ * deadlocks when an inserter has to switch to a new WAL buffer. An inserter
+ * that's holding a WAL insert lock might need to flush the WAL, to evict an
+ * old WAL buffer, to make room for the new record. If it's possible for an
+ * inserter to wait for another inserter unnecessarily, that can lead to
+ * a deadlock if two inserters holding a WAL insert lock wait for each other
+ * to finish their insertion.
+ *
+ * Small WAL records that don't cross a page boundary never update the value;
+ * the WAL record is just copied to the page and the lock is released. But
+ * when crossing a page boundary, it's updated to let others know that the
+ * backend has finished modifying the previous page.
  */
 typedef struct
 {
-	slock_t		mutex;			/* protects the below fields */
-	XLogRecPtr	xlogInsertingAt; /* insert has completed up to this point */
-
-	PGPROC	   *owner;			/* for debugging purposes */
-
-	bool		releaseOK;		/* T if ok to release waiters */
-	char		exclusive;		/* # of exclusive holders (0 or 1) */
-	PGPROC	   *head;			/* head of list of waiting PGPROCs */
-	PGPROC	   *tail;			/* tail of list of waiting PGPROCs */
-	/* tail is undefined when head is NULL */
-} XLogInsertSlot;
+	LWLock		lock;
+	XLogRecPtr	insertingAt;
+} WALInsertLock;
 
 /*
- * All the slots are allocated as an array in shared memory. We force the
- * array stride to be a power of 2, which saves a few cycles in indexing, but
- * more importantly also ensures that individual slots don't cross cache line
- * boundaries.	(Of course, we have to also ensure that the array start
- * address is suitably aligned.)
+ * All the WAL insertion locks are allocated as an array in shared memory. We
+ * force the array stride to be a power of 2, which saves a few cycles in
+ * indexing, but more importantly also ensures that individual locks don't
+ * cross cache line boundaries. (Of course, we have to also ensure that the
+ * array start address is suitably aligned.)
  */
-typedef union XLogInsertSlotPadded
+typedef union WALInsertLockPadded
 {
-	XLogInsertSlot slot;
+	WALInsertLock l;
 	char		pad[CACHE_LINE_SIZE];
-} XLogInsertSlotPadded;
+} WALInsertLockPadded;
 
 /*
  * Shared state data for XLogInsert.
@@ -452,8 +434,8 @@ typedef struct XLogCtlInsert
 	 * we must WAL-log it before it actually affects WAL-logging by backends.
 	 * Checkpointer sets at startup or after SIGHUP.
 	 *
-	 * To read these fields, you must hold an insertion slot. To modify them,
-	 * you must hold ALL the slots.
+	 * To read these fields, you must hold an insertion lock. To modify them,
+	 * you must hold ALL the locks.
 	 */
 	XLogRecPtr	RedoRecPtr;		/* current redo point for insertions */
 	bool		forcePageWrites;	/* forcing full-page writes for PITR? */
@@ -470,8 +452,16 @@ typedef struct XLogCtlInsert
 	int			nonExclusiveBackups;
 	XLogRecPtr	lastBackupStart;
 
-	/* insertion slots, see XLogInsertSlot struct above for details */
-	XLogInsertSlotPadded *insertSlots;
+	/*
+	 * To insert a new WAL record, you must hold a WAL insertion lock. Before
+	 * 9.4, there was a single WALInsertLock, but that became a bottleneck
+	 * on multi-core systems that insert a lot of WAL. Now, there are several
+	 * WAL insertion locks, and to insert WAL, you must hold one of them (in
+	 * exclusive mode). It doesn't matter which one.
+	 */
+	WALInsertLockPadded	*WALInsertLocks;
+	LWLockTranche WALInsertLockTranche;
+	int			WALInsertLockTrancheId;
 } XLogCtlInsert;
 
 /*
@@ -609,6 +599,9 @@ typedef struct XLogCtlData
 
 static XLogCtlData *XLogCtl = NULL;
 
+/* a private copy of XLogCtl->Insert.WALInsertLocks, for convenience */
+static WALInsertLockPadded *WALInsertLocks = NULL;
+
 /*
  * We maintain an image of pg_control in shared memory.
  */
@@ -732,9 +725,9 @@ static bool InRedo = false;
 /* Have we launched bgwriter during recovery? */
 static bool bgwriterLaunched = false;
 
-/* For WALInsertSlotAcquire/Release functions */
-static int	MySlotNo = 0;
-static bool holdingAllSlots = false;
+/* For WALInsertLockAcquire/Release functions */
+static int	MyLockNo = 0;
+static bool holdingAllLocks = false;
 
 static void readRecoveryCommandFile(void);
 static void exitArchiveRecovery(TimeLineID endTLI, XLogSegNo endLogSegNo);
@@ -808,16 +801,14 @@ static void ReserveXLogInsertLocation(int size, XLogRecPtr *StartPos,
 static bool ReserveXLogSwitch(XLogRecPtr *StartPos, XLogRecPtr *EndPos,
 				  XLogRecPtr *PrevPtr);
 static XLogRecPtr WaitXLogInsertionsToFinish(XLogRecPtr upto);
-static void WakeupWaiters(XLogRecPtr EndPos);
 static char *GetXLogBuffer(XLogRecPtr ptr);
 static XLogRecPtr XLogBytePosToRecPtr(uint64 bytepos);
 static XLogRecPtr XLogBytePosToEndRecPtr(uint64 bytepos);
 static uint64 XLogRecPtrToBytePos(XLogRecPtr ptr);
 
-static void WALInsertSlotAcquire(bool exclusive);
-static void WALInsertSlotAcquireOne(int slotno);
-static void WALInsertSlotRelease(void);
-static void WALInsertSlotReleaseOne(int slotno);
+static void WALInsertLockAcquire(bool exclusive);
+static void WALInsertLockRelease(void);
+static void WALInsertLockWakeup(XLogRecPtr insertingAt);
 
 /*
  * Insert an XLOG record having the specified RMID and info bytes,
@@ -894,7 +885,7 @@ XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata)
 	 *
 	 * We may have to loop back to here if a race condition is detected below.
 	 * We could prevent the race by doing all this work while holding an
-	 * insertion slot, but it seems better to avoid doing CRC calculations
+	 * insertion lock, but it seems better to avoid doing CRC calculations
 	 * while holding one.
 	 *
 	 * We add entries for backup blocks to the chain, so that they don't need
@@ -912,8 +903,8 @@ begin:;
 	/*
 	 * Decide if we need to do full-page writes in this XLOG record: true if
 	 * full_page_writes is on or we have a PITR request for it.  Since we
-	 * don't yet have an insertion slot, fullPageWrites and forcePageWrites
-	 * could change under us, but we'll recheck them once we have a slot.
+	 * don't yet have an insertion lock, fullPageWrites and forcePageWrites
+	 * could change under us, but we'll recheck them once we have a lock.
 	 */
 	doPageWrites = Insert->fullPageWrites || Insert->forcePageWrites;
 
@@ -1087,16 +1078,15 @@ begin:;
 	 *    record in place. This can be done concurrently in multiple processes.
 	 *
 	 * To keep track of which insertions are still in-progress, each concurrent
-	 * inserter allocates an "insertion slot", which tells others how far the
+	 * inserter acquires an insertion lock. In addition to just indicating that
+	 * an insertion is in progress, the lock tells others how far the
 	 * inserter has progressed. There is a small fixed number of insertion
-	 * slots, determined by the num_xloginsert_slots GUC. When an inserter
-	 * finishes, it updates the xlogInsertingAt of its slot to the end of the
-	 * record it inserted, to let others know that it's done. xlogInsertingAt
-	 * is also updated when crossing over to a new WAL buffer, to allow the
-	 * the previous buffer to be flushed.
+	 * locks, determined by the num_xloginsert_locks GUC. When an inserter
+	 * crosses a page boundary, it updates the value stored in the lock to
+	 * how far it has inserted, to allow the previous buffer to be flushed.
 	 *
-	 * Holding onto a slot also protects RedoRecPtr and fullPageWrites from
-	 * changing until the insertion is finished.
+	 * Holding onto an insertion lock also protects RedoRecPtr and
+	 * fullPageWrites from changing until the insertion is finished.
 	 *
 	 * Step 2 can usually be done completely in parallel. If the required WAL
 	 * page is not initialized yet, you have to grab WALBufMappingLock to
@@ -1106,7 +1096,7 @@ begin:;
 	 *----------
 	 */
 	START_CRIT_SECTION();
-	WALInsertSlotAcquire(isLogSwitch);
+	WALInsertLockAcquire(isLogSwitch);
 
 	/*
 	 * Check to see if my RedoRecPtr is out of date.  If so, may have to go
@@ -1135,7 +1125,7 @@ begin:;
 					 * Oops, this buffer now needs to be backed up, but we
 					 * didn't think so above.  Start over.
 					 */
-					WALInsertSlotRelease();
+					WALInsertLockRelease();
 					END_CRIT_SECTION();
 					rdt_lastnormal->next = NULL;
 					info = info_orig;
@@ -1154,7 +1144,7 @@ begin:;
 	if ((Insert->fullPageWrites || Insert->forcePageWrites) && !doPageWrites)
 	{
 		/* Oops, must redo it with full-page data. */
-		WALInsertSlotRelease();
+		WALInsertLockRelease();
 		END_CRIT_SECTION();
 		rdt_lastnormal->next = NULL;
 		info = info_orig;
@@ -1202,7 +1192,7 @@ begin:;
 	/*
 	 * Done! Let others know that we're finished.
 	 */
-	WALInsertSlotRelease();
+	WALInsertLockRelease();
 
 	MarkCurrentTransactionIdLoggedIfAny();
 
@@ -1363,7 +1353,7 @@ ReserveXLogSwitch(XLogRecPtr *StartPos, XLogRecPtr *EndPos, XLogRecPtr *PrevPtr)
 
 	/*
 	 * These calculations are a bit heavy-weight to be done while holding a
-	 * spinlock, but since we're holding all the WAL insertion slots, there
+	 * spinlock, but since we're holding all the WAL insertion locks, there
 	 * are no other inserters competing for it. GetXLogInsertRecPtr() does
 	 * compete for it, but that's not called very frequently.
 	 */
@@ -1523,7 +1513,7 @@ CopyXLogRecordToWAL(int write_len, bool isLogSwitch, XLogRecData *rdata,
 		while (CurrPos < EndPos)
 		{
 			/* initialize the next page (if not initialized already) */
-			WakeupWaiters(CurrPos);
+			WALInsertLockWakeup(CurrPos);
 			AdvanceXLInsertBuffer(CurrPos, false);
 			CurrPos += XLOG_BLCKSZ;
 		}
@@ -1534,452 +1524,116 @@ CopyXLogRecordToWAL(int write_len, bool isLogSwitch, XLogRecData *rdata,
 }
 
 /*
- * Allocate a slot for insertion.
+ * Acquire a WAL insertion lock.
  *
- * In exclusive mode, all slots are reserved for the current process. That
- * blocks all concurrent insertions.
+ * In exclusive mode, all locks are acquired. That blocks all concurrent
+ * insertions.
  */
 static void
-WALInsertSlotAcquire(bool exclusive)
+WALInsertLockAcquire(bool exclusive)
 {
 	int			i;
 
 	if (exclusive)
 	{
-		for (i = 0; i < num_xloginsert_slots; i++)
-			WALInsertSlotAcquireOne(i);
-		holdingAllSlots = true;
+		for (i = 0; i < num_xloginsert_locks; i++)
+			LWLockAcquire(&WALInsertLocks[i].l.lock, LW_EXCLUSIVE);
+		holdingAllLocks = true;
 	}
 	else
-		WALInsertSlotAcquireOne(-1);
-}
-
-/*
- * Workhorse of WALInsertSlotAcquire. Acquires the given slot, or an arbitrary
- * one if slotno == -1. The index of the slot that was acquired is stored in
- * MySlotNo.
- *
- * This is more or less equivalent to LWLockAcquire().
- */
-static void
-WALInsertSlotAcquireOne(int slotno)
-{
-	volatile XLogInsertSlot *slot;
-	PGPROC	   *proc = MyProc;
-	bool		retry = false;
-	int			extraWaits = 0;
-	static int	slotToTry = -1;
-
-	/*
-	 * Try to use the slot we used last time. If the system isn't particularly
-	 * busy, it's a good bet that it's available, and it's good to have some
-	 * affinity to a particular slot so that you don't unnecessarily bounce
-	 * cache lines between processes when there is no contention.
-	 *
-	 * If this is the first time through in this backend, pick a slot
-	 * (semi-)randomly. This allows the slots to be used evenly if you have a
-	 * lot of very short connections.
-	 */
-	if (slotno != -1)
-		MySlotNo = slotno;
-	else
 	{
-		if (slotToTry == -1)
-			slotToTry = MyProc->pgprocno % num_xloginsert_slots;
-		MySlotNo = slotToTry;
-	}
-
-	/*
-	 * We can't wait if we haven't got a PGPROC.  This should only occur
-	 * during bootstrap or shared memory initialization.  Put an Assert here
-	 * to catch unsafe coding practices.
-	 */
-	Assert(MyProc != NULL);
-
-	/*
-	 * Lock out cancel/die interrupts until we exit the code section protected
-	 * by the slot.  This ensures that interrupts will not interfere with
-	 * manipulations of data structures in shared memory. There is no cleanup
-	 * mechanism to release the slot if the backend dies while holding one,
-	 * so make this a critical section.
-	 */
-	START_CRIT_SECTION();
-
-	/*
-	 * Loop here to try to acquire slot after each time we are signaled by
-	 * WALInsertSlotRelease.
-	 */
-	for (;;)
-	{
-		bool		mustwait;
-
-		slot = &XLogCtl->Insert.insertSlots[MySlotNo].slot;
-
-		/* Acquire mutex.  Time spent holding mutex should be short! */
-		SpinLockAcquire(&slot->mutex);
-
-		/* If retrying, allow WALInsertSlotRelease to release waiters again */
-		if (retry)
-			slot->releaseOK = true;
-
-		/* If I can get the slot, do so quickly. */
-		if (slot->exclusive == 0)
-		{
-			slot->exclusive++;
-			mustwait = false;
-		}
-		else
-			mustwait = true;
-
-		if (!mustwait)
-			break;				/* got the lock */
-
-		Assert(slot->owner != MyProc);
-
-		/*
-		 * Add myself to wait queue.
-		 */
-		proc->lwWaiting = true;
-		proc->lwWaitMode = LW_EXCLUSIVE;
-		proc->lwWaitLink = NULL;
-		if (slot->head == NULL)
-			slot->head = proc;
-		else
-			slot->tail->lwWaitLink = proc;
-		slot->tail = proc;
-
-		/* Can release the mutex now */
-		SpinLockRelease(&slot->mutex);
+		bool		immed;
 
 		/*
-		 * Wait until awakened.
+		 * Acquire one of the locks. It doesn't matter which one, but try to
+		 * use the lock we used last time. If the system isn't particularly
+		 * busy, it's a good bet that it's available, and it's good to have
+		 * some affinity to a particular lock so that you don't unnecessarily
+		 * bounce cache lines between processes when there is no contention.
 		 *
-		 * Since we share the process wait semaphore with the regular lock
-		 * manager and ProcWaitForSignal, and we may need to acquire a slot
-		 * while one of those is pending, it is possible that we get awakened
-		 * for a reason other than being signaled by WALInsertSlotRelease. If
-		 * so, loop back and wait again.  Once we've gotten the slot,
-		 * re-increment the sema by the number of additional signals received,
-		 * so that the lock manager or signal manager will see the received
-		 * signal when it next waits.
+		 * If this is the first time through in this backend, pick a lock
+		 * (semi-)randomly. This allows the locks to be used evenly if you have
+		 * a lot of very short connections.
 		 */
-		for (;;)
-		{
-			/* "false" means cannot accept cancel/die interrupt here. */
-			PGSemaphoreLock(&proc->sem, false);
-			if (!proc->lwWaiting)
-				break;
-			extraWaits++;
-		}
+		static int	lockToTry = -1;
+		if (lockToTry == -1)
+			lockToTry = MyProc->pgprocno % num_xloginsert_locks;
+		MyLockNo = lockToTry;
 
-		/* Now loop back and try to acquire lock again. */
-		retry = true;
-	}
+		immed = LWLockAcquire(&WALInsertLocks[MyLockNo].l.lock, LW_EXCLUSIVE);
 
-	slot->owner = proc;
-
-	/*
-	 * Normally, we initialize the xlogInsertingAt value of the slot to 1,
-	 * because we don't yet know where in the WAL we're going to insert. It's
-	 * not critical what it points to right now - leaving it to a too small
-	 * value just means that WaitXlogInsertionsToFinish() might wait on us
-	 * unnecessarily, until we update the value (when we finish the insert or
-	 * move to next page).
-	 *
-	 * If we're grabbing all the slots, however, stamp all but the last one
-	 * with InvalidXLogRecPtr, meaning there is no insert in progress. The last
-	 * slot is the one that we will update as we proceed with the insert, the
-	 * rest are held just to keep off other inserters.
-	 */
-	if (slotno != -1 && slotno != num_xloginsert_slots - 1)
-		slot->xlogInsertingAt = InvalidXLogRecPtr;
-	else
-		slot->xlogInsertingAt = 1;
-
-	/* We are done updating shared state of the slot itself. */
-	SpinLockRelease(&slot->mutex);
-
-	/*
-	 * Fix the process wait semaphore's count for any absorbed wakeups.
-	 */
-	while (extraWaits-- > 0)
-		PGSemaphoreUnlock(&proc->sem);
-
-	/*
-	 * If we couldn't get the slot immediately, try another slot next time.
-	 * On a system with more insertion slots than concurrent inserters, this
-	 * causes all the inserters to eventually migrate to a slot that no-one
-	 * else is using. On a system with more inserters than slots, it still
-	 * causes the inserters to be distributed quite evenly across the slots.
-	 */
-	if (slotno != -1 && retry)
-		slotToTry = (slotToTry + 1) % num_xloginsert_slots;
-}
-
-/*
- * Wait for the given slot to become free, or for its xlogInsertingAt location
- * to change to something else than 'waitptr'. In other words, wait for the
- * inserter using the given slot to finish its insertion, or to at least make
- * some progress.
- */
-static void
-WaitOnSlot(volatile XLogInsertSlot *slot, XLogRecPtr waitptr)
-{
-	PGPROC	   *proc = MyProc;
-	int			extraWaits = 0;
-
-	/*
-	 * Lock out cancel/die interrupts while we sleep on the slot. There is
-	 * no cleanup mechanism to remove us from the wait queue if we got
-	 * interrupted.
-	 */
-	HOLD_INTERRUPTS();
-
-	/*
-	 * Loop here to try to acquire lock after each time we are signaled.
-	 */
-	for (;;)
-	{
-		bool		mustwait;
-
-		/* Acquire mutex.  Time spent holding mutex should be short! */
-		SpinLockAcquire(&slot->mutex);
-
-		/* If I can get the lock, do so quickly. */
-		if (slot->exclusive == 0 || slot->xlogInsertingAt != waitptr)
-			mustwait = false;
-		else
-			mustwait = true;
-
-		if (!mustwait)
-			break;				/* the lock was free */
-
-		Assert(slot->owner != MyProc);
-
-		/*
-		 * Add myself to wait queue.
-		 */
-		proc->lwWaiting = true;
-		proc->lwWaitMode = LW_WAIT_UNTIL_FREE;
-		proc->lwWaitLink = NULL;
-
-		/* waiters are added to the front of the queue */
-		proc->lwWaitLink = slot->head;
-		if (slot->head == NULL)
-			slot->tail = proc;
-		slot->head = proc;
-
-		/* Can release the mutex now */
-		SpinLockRelease(&slot->mutex);
-
-		/*
-		 * Wait until awakened.
-		 *
-		 * Since we share the process wait semaphore with other things, like
-		 * the regular lock manager and ProcWaitForSignal, and we may need to
-		 * acquire an LWLock while one of those is pending, it is possible that
-		 * we get awakened for a reason other than being signaled by
-		 * LWLockRelease. If so, loop back and wait again.  Once we've gotten
-		 * the LWLock, re-increment the sema by the number of additional
-		 * signals received, so that the lock manager or signal manager will
-		 * see the received signal when it next waits.
-		 */
-		for (;;)
+		if (!immed)
 		{
-			/* "false" means cannot accept cancel/die interrupt here. */
-			PGSemaphoreLock(&proc->sem, false);
-			if (!proc->lwWaiting)
-				break;
-			extraWaits++;
+			/*
+			 * If we couldn't get the lock immediately, try another lock next
+			 * time.  On a system with more insertion locks than concurrent
+			 * inserters, this causes all the inserters to eventually migrate
+			 * to a lock that no-one else is using.  On a system with more
+			 * inserters than locks, it still helps to distribute the inserters
+			 * quite evenly across the locks.
+			 */
+			lockToTry = (lockToTry + 1) % num_xloginsert_locks;
 		}
-
-		/* Now loop back and try to acquire lock again. */
 	}
-
-	/* We are done updating shared state of the lock itself. */
-	SpinLockRelease(&slot->mutex);
-
-	/*
-	 * Fix the process wait semaphore's count for any absorbed wakeups.
-	 */
-	while (extraWaits-- > 0)
-		PGSemaphoreUnlock(&proc->sem);
-
-	/*
-	 * Now okay to allow cancel/die interrupts.
-	 */
-	RESUME_INTERRUPTS();
 }
 
 /*
- * Wake up all processes waiting for us with WaitOnSlot(). Sets our
- * xlogInsertingAt value to EndPos, without releasing the slot.
+ * Release our insertion lock (or locks, if we're holding them all).
  */
 static void
-WakeupWaiters(XLogRecPtr EndPos)
+WALInsertLockRelease(void)
 {
-	volatile XLogInsertSlot *slot = &XLogCtl->Insert.insertSlots[MySlotNo].slot;
-	PGPROC	   *head;
-	PGPROC	   *proc;
-	PGPROC	   *next;
-
-	/*
-	 * If we have already reported progress up to the same point, do nothing.
-	 * No other process can modify xlogInsertingAt, so we can check this before
-	 * grabbing the spinlock.
-	 */
-	if (slot->xlogInsertingAt == EndPos)
-		return;
-	/* xlogInsertingAt should not go backwards */
-	Assert(slot->xlogInsertingAt < EndPos);
-
-	/* Acquire mutex.  Time spent holding mutex should be short! */
-	SpinLockAcquire(&slot->mutex);
-
-	/* we should own the slot */
-	Assert(slot->exclusive == 1 && slot->owner == MyProc);
-
-	slot->xlogInsertingAt = EndPos;
-
-	/*
-	 * See if there are any LW_WAIT_UNTIL_FREE waiters that need to be woken
-	 * up. They are always in the front of the queue.
-	 */
-	head = slot->head;
+	int			i;
 
-	if (head != NULL && head->lwWaitMode == LW_WAIT_UNTIL_FREE)
+	if (holdingAllLocks)
 	{
-		proc = head;
-		next = proc->lwWaitLink;
-		while (next && next->lwWaitMode == LW_WAIT_UNTIL_FREE)
+		for (i = 0; i < num_xloginsert_locks; i++)
 		{
-			proc = next;
-			next = next->lwWaitLink;
+			/* see below */
+			WALInsertLocks[i].l.insertingAt = 0;
+			LWLockRelease(&WALInsertLocks[i].l.lock);
 		}
-
-		/* proc is now the last PGPROC to be released */
-		slot->head = next;
-		proc->lwWaitLink = NULL;
+		holdingAllLocks = false;
 	}
 	else
-		head = NULL;
-
-	/* We are done updating shared state of the lock itself. */
-	SpinLockRelease(&slot->mutex);
-
-	/*
-	 * Awaken any waiters I removed from the queue.
-	 */
-	while (head != NULL)
 	{
-		proc = head;
-		head = proc->lwWaitLink;
-		proc->lwWaitLink = NULL;
-		proc->lwWaiting = false;
-		PGSemaphoreUnlock(&proc->sem);
+		/*
+		 * Reset insertingAt value to ease debugging. It would be OK to let
+		 * it point to any old value as long as it's smaller than the current
+		 * end of reserved WAL, but seems tidier to reset it to zero.
+		 *
+		 * No need for holding a spinlock while we reset it, because we don't
+		 * care if someone transiently sees a bogus value. We're just about to
+		 * release the lock anyway, waking up anyone who might wait for us.
+		 */
+		WALInsertLocks[MyLockNo].l.insertingAt = 0;
+		LWLockRelease(&WALInsertLocks[MyLockNo].l.lock);
 	}
 }
 
 /*
- * Release our insertion slot (or slots, if we're holding them all).
+ * Update our insertingAt value, to let others know that we've finished
+ * inserting up to that point.
  */
 static void
-WALInsertSlotRelease(void)
+WALInsertLockWakeup(XLogRecPtr insertingAt)
 {
 	int			i;
 
-	if (holdingAllSlots)
+	if (holdingAllLocks)
 	{
-		for (i = 0; i < num_xloginsert_slots; i++)
-			WALInsertSlotReleaseOne(i);
-		holdingAllSlots = false;
+		for (i = 0; i < num_xloginsert_locks; i++)
+			LWLockWakeup(&WALInsertLocks[i].l.lock,
+						 &WALInsertLocks[i].l.insertingAt,
+						 insertingAt);
+		/* holdingAllLocks stays set; WALInsertLockRelease() clears it */
 	}
 	else
-		WALInsertSlotReleaseOne(MySlotNo);
-}
-
-static void
-WALInsertSlotReleaseOne(int slotno)
-{
-	volatile XLogInsertSlot *slot = &XLogCtl->Insert.insertSlots[slotno].slot;
-	PGPROC	   *head;
-	PGPROC	   *proc;
-
-	/* Acquire mutex.  Time spent holding mutex should be short! */
-	SpinLockAcquire(&slot->mutex);
-
-	/* we must be holding it */
-	Assert(slot->exclusive == 1 && slot->owner == MyProc);
-
-	slot->xlogInsertingAt = InvalidXLogRecPtr;
-
-	/* Release my hold on the slot */
-	slot->exclusive = 0;
-	slot->owner = NULL;
-
-	/*
-	 * See if I need to awaken any waiters..
-	 */
-	head = slot->head;
-	if (head != NULL)
-	{
-		if (slot->releaseOK)
-		{
-			/*
-			 * Remove the to-be-awakened PGPROCs from the queue.
-			 */
-			bool		releaseOK = true;
-
-			proc = head;
-
-			/*
-			 * First wake up any backends that want to be woken up without
-			 * acquiring the lock. These are always in the front of the queue.
-			 */
-			while (proc->lwWaitMode == LW_WAIT_UNTIL_FREE && proc->lwWaitLink)
-				proc = proc->lwWaitLink;
-
-			/*
-			 * Awaken the first exclusive-waiter, if any.
-			 */
-			if (proc->lwWaitLink)
-			{
-				Assert(proc->lwWaitLink->lwWaitMode == LW_EXCLUSIVE);
-				proc = proc->lwWaitLink;
-				releaseOK = false;
-			}
-			/* proc is now the last PGPROC to be released */
-			slot->head = proc->lwWaitLink;
-			proc->lwWaitLink = NULL;
-
-			slot->releaseOK = releaseOK;
-		}
-		else
-			head = NULL;
-	}
-
-	/* We are done updating shared state of the slot itself. */
-	SpinLockRelease(&slot->mutex);
-
-	/*
-	 * Awaken any waiters I removed from the queue.
-	 */
-	while (head != NULL)
-	{
-		proc = head;
-		head = proc->lwWaitLink;
-		proc->lwWaitLink = NULL;
-		proc->lwWaiting = false;
-		PGSemaphoreUnlock(&proc->sem);
-	}
-
-	/*
-	 * Now okay to allow cancel/die interrupts.
-	 */
-	END_CRIT_SECTION();
+		LWLockWakeup(&WALInsertLocks[MyLockNo].l.lock,
+					 &WALInsertLocks[MyLockNo].l.insertingAt,
+					 insertingAt);
 }
 
-
 /*
  * Wait for any WAL insertions < upto to finish.
  *
@@ -2029,79 +1683,48 @@ WaitXLogInsertionsToFinish(XLogRecPtr upto)
 	}
 
 	/*
+	 * Loop through all the locks, sleeping on any in-progress insert older
+	 * than 'upto'.
+	 *
 	 * finishedUpto is our return value, indicating the point upto which
 	 * all the WAL insertions have been finished. Initialize it to the head
-	 * of reserved WAL, and as we iterate through the insertion slots, back it
+	 * of reserved WAL, and as we iterate through the insertion locks, back it
 	 * out for any insertion that's still in progress.
 	 */
 	finishedUpto = reservedUpto;
-
-	/*
-	 * Loop through all the slots, sleeping on any in-progress insert older
-	 * than 'upto'.
-	 */
-	for (i = 0; i < num_xloginsert_slots; i++)
+	for (i = 0; i < num_xloginsert_locks; i++)
 	{
-		volatile XLogInsertSlot *slot = &XLogCtl->Insert.insertSlots[i].slot;
-		XLogRecPtr insertingat;
-
-	retry:
-		/*
-		 * We can check if the slot is in use without grabbing the spinlock.
-		 * The spinlock acquisition of insertpos_lck before this loop acts
-		 * as a memory barrier. If someone acquires the slot after that, it
-		 * can't possibly be inserting to anything < reservedUpto. If it was
-		 * acquired before that, an unlocked test will return true.
-		 */
-		if (!slot->exclusive)
-			continue;
-
-		SpinLockAcquire(&slot->mutex);
-		/* re-check now that we have the lock */
-		if (!slot->exclusive)
-		{
-			SpinLockRelease(&slot->mutex);
-			continue;
-		}
-		insertingat = slot->xlogInsertingAt;
-		SpinLockRelease(&slot->mutex);
-
-		if (insertingat == InvalidXLogRecPtr)
+		XLogRecPtr insertingat = InvalidXLogRecPtr;
+		do
 		{
 			/*
-			 * slot is reserved just to hold off other inserters, there is no
-			 * actual insert in progress.
+			 * See if this insertion is in progress. LWLockWait will wait for
+			 * the lock to be released, or for the 'value' to be set by a
+			 * the lock to be released, or for the 'value' to be set by an
+			 * value is 0 (InvalidXLogRecPtr), which means that we don't know
+			 * where it's inserting yet. We will have to wait for it. If it's
+			 * a small insertion, the record will most likely fit on the same
+			 * page and the inserter will release the lock without ever
+			 * calling LWLockWakeup. But if it has to cross a page, it will
+			 * advertise the insertion point with LWLockWakeup.
 			 */
-			continue;
-		}
+			if (LWLockWait(&WALInsertLocks[i].l.lock,
+						   &WALInsertLocks[i].l.insertingAt,
+						   insertingat, &insertingat))
+			{
+				/* the lock was free, so no insertion in progress */
+				insertingat = InvalidXLogRecPtr;
+				break;
+			}
 
-		/*
-		 * This insertion is still in progress. Do we need to wait for it?
-		 *
-		 * When an inserter acquires a slot, it doesn't reset 'insertingat', so
-		 * it will initially point to the old value of some already-finished
-		 * insertion. The inserter will update the value as soon as it finishes
-		 * the insertion, moves to the next page, or has to do I/O to flush an
-		 * old dirty buffer. That means that when we see a slot with
-		 * insertingat value < upto, we don't know if that insertion is still
-		 * truly in progress, or if the slot is reused by a new inserter that
-		 * hasn't updated the insertingat value yet. We have to assume it's the
-		 * latter, and wait.
-		 */
-		if (insertingat < upto)
-		{
-			WaitOnSlot(slot, insertingat);
-			goto retry;
-		}
-		else
-		{
 			/*
-			 * We don't need to wait for this insertion, but update the
-			 * return value.
+			 * This insertion is still in progress. Have to wait, unless the
+			 * inserter has proceeded past 'upto'.
 			 */
-			if (insertingat < finishedUpto)
-				finishedUpto = insertingat;
-		}
+		} while (insertingat < upto);
+
+		if (insertingat != InvalidXLogRecPtr && insertingat < finishedUpto)
+			finishedUpto = insertingat;
 	}
 	return finishedUpto;
 }
@@ -2115,8 +1738,8 @@ WaitXLogInsertionsToFinish(XLogRecPtr upto)
  *
  * The caller must ensure that the page containing the requested location
  * isn't evicted yet, and won't be evicted. The way to ensure that is to
- * hold onto an XLogInsertSlot with the xlogInsertingAt position set to
- * something <= ptr. GetXLogBuffer() will update xlogInsertingAt if it needs
+ * hold onto a WAL insertion lock with the insertingAt position set to
+ * something <= ptr. GetXLogBuffer() will update insertingAt if it needs
  * to evict an old page from the buffer. (This means that once you call
  * GetXLogBuffer() with a given 'ptr', you must not access anything before
  * that point anymore, and must not call GetXLogBuffer() with an older 'ptr'
@@ -2176,7 +1799,7 @@ GetXLogBuffer(XLogRecPtr ptr)
 		 * Let others know that we're finished inserting the record up
 		 * to the page boundary.
 		 */
-		WakeupWaiters(expectedEndPtr - XLOG_BLCKSZ);
+		WALInsertLockWakeup(expectedEndPtr - XLOG_BLCKSZ);
 
 		AdvanceXLInsertBuffer(ptr, false);
 		endptr = XLogCtl->xlblocks[idx];
@@ -5071,8 +4694,8 @@ XLOGShmemSize(void)
 	/* XLogCtl */
 	size = sizeof(XLogCtlData);
 
-	/* xlog insertion slots, plus alignment */
-	size = add_size(size, mul_size(sizeof(XLogInsertSlotPadded), num_xloginsert_slots + 1));
+	/* WAL insertion locks, plus alignment */
+	size = add_size(size, mul_size(sizeof(WALInsertLockPadded), num_xloginsert_locks + 1));
 	/* xlblocks array */
 	size = add_size(size, mul_size(sizeof(XLogRecPtr), XLOGbuffers));
 	/* extra alignment padding for XLOG I/O buffers */
@@ -5120,11 +4743,27 @@ XLOGShmemInit(void)
 	memset(XLogCtl->xlblocks, 0, sizeof(XLogRecPtr) * XLOGbuffers);
 	allocptr += sizeof(XLogRecPtr) * XLOGbuffers;
 
-	/* Xlog insertion slots. Ensure they're aligned to the full padded size */
-	allocptr += sizeof(XLogInsertSlotPadded) -
-		((uintptr_t) allocptr) % sizeof(XLogInsertSlotPadded);
-	XLogCtl->Insert.insertSlots = (XLogInsertSlotPadded *) allocptr;
-	allocptr += sizeof(XLogInsertSlotPadded) * num_xloginsert_slots;
+
+	/* WAL insertion locks. Ensure they're aligned to the full padded size */
+	allocptr += sizeof(WALInsertLockPadded) -
+		((uintptr_t) allocptr) % sizeof(WALInsertLockPadded);
+	WALInsertLocks = XLogCtl->Insert.WALInsertLocks =
+		(WALInsertLockPadded *) allocptr;
+	allocptr += sizeof(WALInsertLockPadded) * num_xloginsert_locks;
+
+	XLogCtl->Insert.WALInsertLockTrancheId = LWLockNewTrancheId();
+
+	XLogCtl->Insert.WALInsertLockTranche.name = "WALInsertLocks";
+	XLogCtl->Insert.WALInsertLockTranche.array_base = WALInsertLocks;
+	XLogCtl->Insert.WALInsertLockTranche.array_stride = sizeof(WALInsertLockPadded);
+
+	LWLockRegisterTranche(XLogCtl->Insert.WALInsertLockTrancheId, &XLogCtl->Insert.WALInsertLockTranche);
+	for (i = 0; i < num_xloginsert_locks; i++)
+	{
+		LWLockInitialize(&WALInsertLocks[i].l.lock,
+						 XLogCtl->Insert.WALInsertLockTrancheId);
+		WALInsertLocks[i].l.insertingAt = InvalidXLogRecPtr;
+	}
 
 	/*
 	 * Align the start of the page buffers to a full xlog block size boundary.
@@ -5144,19 +4783,6 @@ XLOGShmemInit(void)
 	XLogCtl->SharedHotStandbyActive = false;
 	XLogCtl->WalWriterSleeping = false;
 
-	for (i = 0; i < num_xloginsert_slots; i++)
-	{
-		XLogInsertSlot *slot = &XLogCtl->Insert.insertSlots[i].slot;
-		SpinLockInit(&slot->mutex);
-		slot->xlogInsertingAt = InvalidXLogRecPtr;
-		slot->owner = NULL;
-
-		slot->releaseOK = true;
-		slot->exclusive = 0;
-		slot->head = NULL;
-		slot->tail = NULL;
-	}
-
 	SpinLockInit(&XLogCtl->Insert.insertpos_lck);
 	SpinLockInit(&XLogCtl->info_lck);
 	SpinLockInit(&XLogCtl->ulsn_lck);
@@ -7883,6 +7509,11 @@ InitXLOGAccess(void)
 	ThisTimeLineID = XLogCtl->ThisTimeLineID;
 	Assert(ThisTimeLineID != 0 || IsBootstrapProcessingMode());
 
+	/* Initialize our copy of WALInsertLocks and register the tranche */
+	WALInsertLocks = XLogCtl->Insert.WALInsertLocks;
+	LWLockRegisterTranche(XLogCtl->Insert.WALInsertLockTrancheId,
+						  &XLogCtl->Insert.WALInsertLockTranche);
+
 	/* Use GetRedoRecPtr to copy the RedoRecPtr safely */
 	(void) GetRedoRecPtr();
 }
@@ -7901,7 +7532,7 @@ GetRedoRecPtr(void)
 
 	/*
 	 * The possibly not up-to-date copy in XlogCtl is enough. Even if we
-	 * grabbed a WAL insertion slot to read the master copy, someone might
+	 * grabbed a WAL insertion lock to read the master copy, someone might
 	 * update it just after we've released the lock.
 	 */
 	SpinLockAcquire(&xlogctl->info_lck);
@@ -7919,7 +7550,7 @@ GetRedoRecPtr(void)
  *
  * NOTE: The value *actually* returned is the position of the last full
  * xlog page. It lags behind the real insert position by at most 1 page.
- * For that, we don't need to scan through WAL insertion slots, and an
+ * For that, we don't need to scan through WAL insertion locks, and an
  * approximation is enough for the current usage of this function.
  */
 XLogRecPtr
@@ -8280,7 +7911,7 @@ CreateCheckPoint(int flags)
 	 * We must block concurrent insertions while examining insert state to
 	 * determine the checkpoint REDO pointer.
 	 */
-	WALInsertSlotAcquire(true);
+	WALInsertLockAcquire(true);
 	curInsert = XLogBytePosToRecPtr(Insert->CurrBytePos);
 
 	/*
@@ -8305,7 +7936,7 @@ CreateCheckPoint(int flags)
 			MAXALIGN(SizeOfXLogRecord + sizeof(CheckPoint)) &&
 			ControlFile->checkPoint == ControlFile->checkPointCopy.redo)
 		{
-			WALInsertSlotRelease();
+			WALInsertLockRelease();
 			LWLockRelease(CheckpointLock);
 			END_CRIT_SECTION();
 			return;
@@ -8349,7 +7980,7 @@ CreateCheckPoint(int flags)
 
 	/*
 	 * Here we update the shared RedoRecPtr for future XLogInsert calls; this
-	 * must be done while holding the insertion slots.
+	 * must be done while holding all the insertion locks.
 	 *
 	 * Note: if we fail to complete the checkpoint, RedoRecPtr will be left
 	 * pointing past where it really needs to point.  This is okay; the only
@@ -8361,10 +7992,10 @@ CreateCheckPoint(int flags)
 	RedoRecPtr = xlogctl->Insert.RedoRecPtr = checkPoint.redo;
 
 	/*
-	 * Now we can release the WAL insertion slots, allowing other xacts to
+	 * Now we can release the WAL insertion locks, allowing other xacts to
 	 * proceed while we are flushing disk buffers.
 	 */
-	WALInsertSlotRelease();
+	WALInsertLockRelease();
 
 	/* Update the info_lck-protected copy of RedoRecPtr as well */
 	SpinLockAcquire(&xlogctl->info_lck);
@@ -8394,7 +8025,7 @@ CreateCheckPoint(int flags)
 	 * we wait till he's out of his commit critical section before proceeding.
 	 * See notes in RecordTransactionCommit().
 	 *
-	 * Because we've already released the insertion slots, this test is a bit
+	 * Because we've already released the insertion locks, this test is a bit
 	 * fuzzy: it is possible that we will wait for xacts we didn't really need
 	 * to wait for.  But the delay should be short and it seems better to make
 	 * checkpoint take a bit longer than to hold off insertions longer than
@@ -8625,10 +8256,10 @@ CreateEndOfRecoveryRecord(void)
 
 	xlrec.end_time = time(NULL);
 
-	WALInsertSlotAcquire(true);
+	WALInsertLockAcquire(true);
 	xlrec.ThisTimeLineID = ThisTimeLineID;
 	xlrec.PrevTimeLineID = XLogCtl->PrevTimeLineID;
-	WALInsertSlotRelease();
+	WALInsertLockRelease();
 
 	LocalSetXLogInsertAllowed();
 
@@ -8834,9 +8465,9 @@ CreateRestartPoint(int flags)
 	 * during recovery this is just pro forma, because no WAL insertions are
 	 * happening.
 	 */
-	WALInsertSlotAcquire(true);
+	WALInsertLockAcquire(true);
 	xlogctl->Insert.RedoRecPtr = lastCheckPoint.redo;
-	WALInsertSlotRelease();
+	WALInsertLockRelease();
 
 	/* Also update the info_lck-protected copy */
 	SpinLockAcquire(&xlogctl->info_lck);
@@ -9296,9 +8927,9 @@ UpdateFullPageWrites(void)
 	 */
 	if (fullPageWrites)
 	{
-		WALInsertSlotAcquire(true);
+		WALInsertLockAcquire(true);
 		Insert->fullPageWrites = true;
-		WALInsertSlotRelease();
+		WALInsertLockRelease();
 	}
 
 	/*
@@ -9319,9 +8950,9 @@ UpdateFullPageWrites(void)
 
 	if (!fullPageWrites)
 	{
-		WALInsertSlotAcquire(true);
+		WALInsertLockAcquire(true);
 		Insert->fullPageWrites = false;
-		WALInsertSlotRelease();
+		WALInsertLockRelease();
 	}
 	END_CRIT_SECTION();
 }
@@ -9952,15 +9583,15 @@ do_pg_start_backup(const char *backupidstr, bool fast, TimeLineID *starttli_p,
 	 * Note that forcePageWrites has no effect during an online backup from
 	 * the standby.
 	 *
-	 * We must hold all the insertion slots to change the value of
+	 * We must hold all the insertion locks to change the value of
 	 * forcePageWrites, to ensure adequate interlocking against XLogInsert().
 	 */
-	WALInsertSlotAcquire(true);
+	WALInsertLockAcquire(true);
 	if (exclusive)
 	{
 		if (XLogCtl->Insert.exclusiveBackup)
 		{
-			WALInsertSlotRelease();
+			WALInsertLockRelease();
 			ereport(ERROR,
 					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 					 errmsg("a backup is already in progress"),
@@ -9971,7 +9602,7 @@ do_pg_start_backup(const char *backupidstr, bool fast, TimeLineID *starttli_p,
 	else
 		XLogCtl->Insert.nonExclusiveBackups++;
 	XLogCtl->Insert.forcePageWrites = true;
-	WALInsertSlotRelease();
+	WALInsertLockRelease();
 
 	/* Ensure we release forcePageWrites if fail below */
 	PG_ENSURE_ERROR_CLEANUP(pg_start_backup_callback, (Datum) BoolGetDatum(exclusive));
@@ -10086,13 +9717,13 @@ do_pg_start_backup(const char *backupidstr, bool fast, TimeLineID *starttli_p,
 			 * taking a checkpoint right after another is not that expensive
 			 * either because only few buffers have been dirtied yet.
 			 */
-			WALInsertSlotAcquire(true);
+			WALInsertLockAcquire(true);
 			if (XLogCtl->Insert.lastBackupStart < startpoint)
 			{
 				XLogCtl->Insert.lastBackupStart = startpoint;
 				gotUniqueStartpoint = true;
 			}
-			WALInsertSlotRelease();
+			WALInsertLockRelease();
 		} while (!gotUniqueStartpoint);
 
 		XLByteToSeg(startpoint, _logSegNo);
@@ -10182,7 +9813,7 @@ pg_start_backup_callback(int code, Datum arg)
 	bool		exclusive = DatumGetBool(arg);
 
 	/* Update backup counters and forcePageWrites on failure */
-	WALInsertSlotAcquire(true);
+	WALInsertLockAcquire(true);
 	if (exclusive)
 	{
 		Assert(XLogCtl->Insert.exclusiveBackup);
@@ -10199,7 +9830,7 @@ pg_start_backup_callback(int code, Datum arg)
 	{
 		XLogCtl->Insert.forcePageWrites = false;
 	}
-	WALInsertSlotRelease();
+	WALInsertLockRelease();
 }
 
 /*
@@ -10268,7 +9899,7 @@ do_pg_stop_backup(char *labelfile, bool waitforarchive, TimeLineID *stoptli_p)
 	/*
 	 * OK to update backup counters and forcePageWrites
 	 */
-	WALInsertSlotAcquire(true);
+	WALInsertLockAcquire(true);
 	if (exclusive)
 		XLogCtl->Insert.exclusiveBackup = false;
 	else
@@ -10288,7 +9919,7 @@ do_pg_stop_backup(char *labelfile, bool waitforarchive, TimeLineID *stoptli_p)
 	{
 		XLogCtl->Insert.forcePageWrites = false;
 	}
-	WALInsertSlotRelease();
+	WALInsertLockRelease();
 
 	if (exclusive)
 	{
@@ -10573,7 +10204,7 @@ do_pg_stop_backup(char *labelfile, bool waitforarchive, TimeLineID *stoptli_p)
 void
 do_pg_abort_backup(void)
 {
-	WALInsertSlotAcquire(true);
+	WALInsertLockAcquire(true);
 	Assert(XLogCtl->Insert.nonExclusiveBackups > 0);
 	XLogCtl->Insert.nonExclusiveBackups--;
 
@@ -10582,7 +10213,7 @@ do_pg_abort_backup(void)
 	{
 		XLogCtl->Insert.forcePageWrites = false;
 	}
-	WALInsertSlotRelease();
+	WALInsertLockRelease();
 }
 
 /*
diff --git a/src/backend/storage/lmgr/lwlock.c b/src/backend/storage/lmgr/lwlock.c
index 82ef440..f88bf76 100644
--- a/src/backend/storage/lmgr/lwlock.c
+++ b/src/backend/storage/lmgr/lwlock.c
@@ -10,6 +10,14 @@
  * locking should be done with the full lock manager --- which depends on
  * LWLocks to protect its shared state.
  *
+ * In addition to exclusive and shared modes, lightweight locks can be used
+ * to wait until a variable changes value. The variable is set with
+ * LWLockWakeup, and waited for with LWLockWait.  LWLockWait waits until the
+ * lock is free, or the variable changes.  LWLockWakeup assigns to the
+ * variable, waking up any LWLockWait() callers, without releasing the lock.
+ * The meaning of the value assigned is up to the caller; the lightweight
+ * lock code just assigns and compares it.
+ *
  *
  * Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
@@ -443,16 +451,18 @@ LWLockInitialize(LWLock *lock, int tranche_id)
 /*
  * LWLockAcquire - acquire a lightweight lock in the specified mode
  *
- * If the lock is not available, sleep until it is.
+ * If the lock is not available, sleep until it is.  Returns true if the lock
+ * was available immediately, false if we had to sleep.
  *
  * Side effect: cancel/die interrupts are held off until lock release.
  */
-void
+bool
 LWLockAcquire(LWLock *l, LWLockMode mode)
 {
 	volatile LWLock *lock = l;
 	PGPROC	   *proc = MyProc;
 	bool		retry = false;
+	bool		result = true;
 	int			extraWaits = 0;
 #ifdef LWLOCK_STATS
 	lwlock_stats *lwstats;
@@ -601,6 +611,7 @@ LWLockAcquire(LWLock *l, LWLockMode mode)
 
 		/* Now loop back and try to acquire lock again. */
 		retry = true;
+		result = false;
 	}
 
 	/* We are done updating shared state of the lock itself. */
@@ -616,6 +627,8 @@ LWLockAcquire(LWLock *l, LWLockMode mode)
 	 */
 	while (extraWaits-- > 0)
 		PGSemaphoreUnlock(&proc->sem);
+
+	return result;
 }
 
 /*
@@ -835,6 +848,226 @@ LWLockAcquireOrWait(LWLock *l, LWLockMode mode)
 }
 
 /*
+ * LWLockWait - Wait until lock is free, or a variable is updated.
+ *
+ * If the lock is held, and the *valptr equals oldval, waits until the lock
+ * is either freed, or the lock holder updates *valptr by calling
+ * LWLockWakeup. If the lock is free on exit (immediately or after waiting),
+ * returns true. If the lock is still held, but *valptr no longer matches
+ * oldval, returns false and sets *newval to the current value in *valptr.
+ *
+ * It's possible that the lock holder releases the lock, but another backend
+ * acquires it again before we get a chance to observe that the lock was
+ * momentarily released. We wouldn't need to wait for the new lock holder, but
+ * we cannot distinguish that case, so we will have to wait.
+ *
+ * Note: this function ignores shared lock holders; if the lock is held
+ * in shared mode, returns 'true'.
+ */
+bool
+LWLockWait(LWLock *l, uint64 *valptr, uint64 oldval, uint64 *newval)
+{
+	volatile LWLock *lock = l;
+	volatile uint64 *valp = valptr;
+	PGPROC	   *proc = MyProc;
+	int			extraWaits = 0;
+	bool		result = false;
+
+	/*
+	 * Quick test first to see if the lock is free right now.
+	 *
+	 * XXX: the caller uses a spinlock before this, so we don't need a memory
+	 * barrier here as far as the current usage is concerned. But that might
+	 * not be safe in general.
+	 */
+	if (lock->exclusive == 0)
+		return true;
+
+	/*
+	 * Lock out cancel/die interrupts while we sleep on the lock. There is
+	 * no cleanup mechanism to remove us from the wait queue if we got
+	 * interrupted.
+	 */
+	HOLD_INTERRUPTS();
+
+	/*
+	 * Loop here to check the lock's status after each time we are signaled.
+	 */
+	for (;;)
+	{
+		bool		mustwait;
+		uint64		value;
+
+		/* Acquire mutex.  Time spent holding mutex should be short! */
+#ifdef LWLOCK_STATS
+		lwstats->spin_delay_count += SpinLockAcquire(&lock->mutex);
+#else
+		SpinLockAcquire(&lock->mutex);
+#endif
+
+		/* Is the lock now free, and if not, does the value match? */
+		if (lock->exclusive == 0)
+		{
+			result = true;
+			mustwait = false;
+		}
+		else
+		{
+			value = *valp;
+			if (value != oldval)
+			{
+				result = false;
+				mustwait = false;
+				*newval = value;
+			}
+			else
+				mustwait = true;
+		}
+
+		if (!mustwait)
+			break;				/* the lock was free or value didn't match */
+
+		/*
+		 * Add myself to wait queue.
+		 */
+		proc->lwWaiting = true;
+		proc->lwWaitMode = LW_WAIT_UNTIL_FREE;
+		proc->lwWaitLink = NULL;
+
+		/* waiters are added to the front of the queue */
+		proc->lwWaitLink = lock->head;
+		if (lock->head == NULL)
+			lock->tail = proc;
+		lock->head = proc;
+
+		/* Can release the mutex now */
+		SpinLockRelease(&lock->mutex);
+
+		/*
+		 * Wait until awakened.
+		 *
+		 * Since we share the process wait semaphore with other things, like
+		 * the regular lock manager and ProcWaitForSignal, and we may need to
+		 * acquire an LWLock while one of those is pending, it is possible that
+		 * we get awakened for a reason other than being signaled by
+		 * LWLockRelease. If so, loop back and wait again.  Once we've gotten
+		 * the LWLock, re-increment the sema by the number of additional
+		 * signals received, so that the lock manager or signal manager will
+		 * see the received signal when it next waits.
+		 */
+		LOG_LWDEBUG("LWLockWait", T_NAME(l), T_ID(l), "waiting");
+
+#ifdef LWLOCK_STATS
+		lwstats->block_count++;
+#endif
+
+		TRACE_POSTGRESQL_LWLOCK_WAIT_START(T_NAME(l), T_ID(l), LW_EXCLUSIVE);
+
+		for (;;)
+		{
+			/* "false" means cannot accept cancel/die interrupt here. */
+			PGSemaphoreLock(&proc->sem, false);
+			if (!proc->lwWaiting)
+				break;
+			extraWaits++;
+		}
+
+		TRACE_POSTGRESQL_LWLOCK_WAIT_DONE(T_NAME(l), T_ID(l), LW_EXCLUSIVE);
+
+		LOG_LWDEBUG("LWLockWait", T_NAME(l), T_ID(l), "awakened");
+
+		/* Now loop back and check the status of the lock again. */
+	}
+
+	/* We are done updating shared state of the lock itself. */
+	SpinLockRelease(&lock->mutex);
+
+	TRACE_POSTGRESQL_LWLOCK_ACQUIRE(T_NAME(l), T_ID(l), LW_EXCLUSIVE);
+
+	/*
+	 * Fix the process wait semaphore's count for any absorbed wakeups.
+	 */
+	while (extraWaits-- > 0)
+		PGSemaphoreUnlock(&proc->sem);
+
+	/*
+	 * Now okay to allow cancel/die interrupts.
+	 */
+	RESUME_INTERRUPTS();
+
+	return result;
+}
+
+
+/*
+ * LWLockWakeup - Update a variable and wake up waiters atomically
+ *
+ * Sets *valptr to 'val', and wakes up all processes waiting for us with
+ * LWLockWait(). Setting the value and waking up the processes happen
+ * atomically so that any process calling LWLockWait() on the same lock is
+ * guaranteed to see the new value, and act accordingly.
+ *
+ * The caller must be holding the lock in exclusive mode.
+ */
+void
+LWLockWakeup(LWLock *l, uint64 *valptr, uint64 val)
+{
+	volatile LWLock *lock = l;
+	volatile uint64 *valp = valptr;
+	PGPROC	   *head;
+	PGPROC	   *proc;
+	PGPROC	   *next;
+
+	/* Acquire mutex.  Time spent holding mutex should be short! */
+	SpinLockAcquire(&lock->mutex);
+
+	/* we should hold the lock */
+	Assert(lock->exclusive == 1);
+
+	/* Update the lock's value */
+	*valp = val;
+
+	/*
+	 * See if there are any LW_WAIT_UNTIL_FREE waiters that need to be woken
+	 * up. They are always in the front of the queue.
+	 */
+	head = lock->head;
+
+	if (head != NULL && head->lwWaitMode == LW_WAIT_UNTIL_FREE)
+	{
+		proc = head;
+		next = proc->lwWaitLink;
+		while (next && next->lwWaitMode == LW_WAIT_UNTIL_FREE)
+		{
+			proc = next;
+			next = next->lwWaitLink;
+		}
+
+		/* proc is now the last PGPROC to be released */
+		lock->head = next;
+		proc->lwWaitLink = NULL;
+	}
+	else
+		head = NULL;
+
+	/* We are done updating shared state of the lock itself. */
+	SpinLockRelease(&lock->mutex);
+
+	/*
+	 * Awaken any waiters I removed from the queue.
+	 */
+	while (head != NULL)
+	{
+		proc = head;
+		head = proc->lwWaitLink;
+		proc->lwWaitLink = NULL;
+		proc->lwWaiting = false;
+		PGSemaphoreUnlock(&proc->sem);
+	}
+}
+
+
+/*
  * LWLockRelease - release a previously acquired lock
  */
 void
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 2812a73..3f9b366 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -2103,12 +2103,12 @@ static struct config_int ConfigureNamesInt[] =
 	},
 
 	{
-		{"xloginsert_slots", PGC_POSTMASTER, WAL_SETTINGS,
-			gettext_noop("Sets the number of slots for concurrent xlog insertions."),
+		{"xloginsert_locks", PGC_POSTMASTER, WAL_SETTINGS,
+			gettext_noop("Sets the number of locks used for concurrent xlog insertions."),
 			NULL,
 			GUC_NOT_IN_SAMPLE
 		},
-		&num_xloginsert_slots,
+		&num_xloginsert_locks,
 		8, 1, 1000,
 		NULL, NULL, NULL
 	},
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 11ab277..f5152f4 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -192,7 +192,7 @@ extern bool EnableHotStandby;
 extern bool fullPageWrites;
 extern bool wal_log_hints;
 extern bool log_checkpoints;
-extern int	num_xloginsert_slots;
+extern int	num_xloginsert_locks;
 
 /* WAL levels */
 typedef enum WalLevel
diff --git a/src/include/storage/lwlock.h b/src/include/storage/lwlock.h
index c8ff4eb..ecebd4d 100644
--- a/src/include/storage/lwlock.h
+++ b/src/include/storage/lwlock.h
@@ -169,9 +169,11 @@ typedef enum LWLockMode
 extern bool Trace_lwlocks;
 #endif
 
-extern void LWLockAcquire(LWLock *lock, LWLockMode mode);
+extern bool LWLockAcquire(LWLock *lock, LWLockMode mode);
 extern bool LWLockConditionalAcquire(LWLock *lock, LWLockMode mode);
 extern bool LWLockAcquireOrWait(LWLock *lock, LWLockMode mode);
+extern bool LWLockWait(LWLock *lock, uint64 *valptr, uint64 oldval, uint64 *newval);
+extern void LWLockWakeup(LWLock *lock, uint64 *valptr, uint64 value);
 extern void LWLockRelease(LWLock *lock);
 extern void LWLockReleaseAll(void);
 extern bool LWLockHeldByMe(LWLock *lock);
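
For anyone who wants the intended semantics of the two new primitives without the wait-queue details, here is a rough standalone model. It is only a sketch under stated assumptions: it uses a pthread mutex and condition variable in place of the spinlock, PGPROC wait queue and semaphores in the patch, and the names ToyLock, toy_init, toy_acquire, toy_release, toy_wakeup and toy_wait are made up for illustration. Resetting the value to 0 on acquire is my assumption, based on the comment in WaitXLogInsertionsToFinish that a freshly acquired lock has value 0.

```c
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical stand-in for an LWLock plus its attached "value". */
typedef struct ToyLock
{
	pthread_mutex_t mutex;		/* protects the fields below */
	pthread_cond_t	changed;	/* signaled on release and on value update */
	bool			exclusive;	/* is the lock held exclusively? */
	uint64_t		value;		/* opaque to the lock code, like insertingAt */
} ToyLock;

void
toy_init(ToyLock *lk)
{
	pthread_mutex_init(&lk->mutex, NULL);
	pthread_cond_init(&lk->changed, NULL);
	lk->exclusive = false;
	lk->value = 0;
}

/* Acquire exclusively; assumption: the value starts over at 0. */
void
toy_acquire(ToyLock *lk)
{
	pthread_mutex_lock(&lk->mutex);
	while (lk->exclusive)
		pthread_cond_wait(&lk->changed, &lk->mutex);
	lk->exclusive = true;
	lk->value = 0;
	pthread_mutex_unlock(&lk->mutex);
}

/* Release the lock and wake everyone sleeping on it. */
void
toy_release(ToyLock *lk)
{
	pthread_mutex_lock(&lk->mutex);
	assert(lk->exclusive);
	lk->exclusive = false;
	pthread_cond_broadcast(&lk->changed);
	pthread_mutex_unlock(&lk->mutex);
}

/* Like LWLockWakeup: publish a new value without releasing the lock. */
void
toy_wakeup(ToyLock *lk, uint64_t val)
{
	pthread_mutex_lock(&lk->mutex);
	assert(lk->exclusive);		/* caller must hold the lock */
	lk->value = val;
	pthread_cond_broadcast(&lk->changed);
	pthread_mutex_unlock(&lk->mutex);
}

/*
 * Like LWLockWait: sleep while the lock is held and the value still equals
 * oldval.  Returns true if the lock is free on exit; otherwise returns
 * false and stores the current value in *newval.
 */
bool
toy_wait(ToyLock *lk, uint64_t oldval, uint64_t *newval)
{
	bool		result;

	pthread_mutex_lock(&lk->mutex);
	while (lk->exclusive && lk->value == oldval)
		pthread_cond_wait(&lk->changed, &lk->mutex);
	if (!lk->exclusive)
		result = true;
	else
	{
		*newval = lk->value;
		result = false;
	}
	pthread_mutex_unlock(&lk->mutex);
	return result;
}
```

A caller like WaitXLogInsertionsToFinish would loop on toy_wait(), feeding the last value it saw back in as oldval, until the call returns true or the reported value reaches 'upto'. Because the value is set and the waiters are woken under the same mutex, a waiter cannot miss an update that happens between its check and its sleep.
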
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
