From cdb447903425317b04088eea1beb2abde0f8fa16 Mon Sep 17 00:00:00 2001
From: Pavel Borisov <pashkin.elfe@gmail.com>
Date: Wed, 7 Sep 2022 19:33:58 +0400
Subject: [PATCH v3] Lockless queue of LWLock waiters

The patch changes the manipulation of the LWLock waiters queue so that
it no longer needs spinlocks or the complex two-step protocol for
adding a waiter to the queue.  Instead, the 64-bit LWLock state is
modified atomically and embeds references to the head and tail of the
waiters queue, which avoids spinlock contention while still handling
all possible concurrent manipulations of the waiters queue.
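
For illustration, the 64-bit state packs the waiters queue and the lock
value roughly as follows (the authoritative definitions are the LW_*
macros in lwlock.c):

    bits 63..46  waiters list head (pgprocno, 18 bits)
    bits 45..28  waiters list tail (pgprocno, 18 bits)
    bits 22..20  flags: LW_FLAG_WAKE_UP_LOCK, LW_FLAG_RELEASE_OK,
                 LW_FLAG_LOCK_VAR
    bits 19..0   lock value: LW_VAL_EXCLUSIVE bit plus shared-lock count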

Authors: Alexander Korotkov <aekorotkov@gmail.com>, Pavel Borisov
<pashkin.elfe@gmail.com>
---
 src/backend/storage/lmgr/lwlock.c | 1260 +++++++++++++++++------------
 src/include/storage/lwlock.h      |    3 +-
 src/include/storage/proc.h        |    3 +-
 3 files changed, 734 insertions(+), 532 deletions(-)

diff --git a/src/backend/storage/lmgr/lwlock.c b/src/backend/storage/lmgr/lwlock.c
index 0fc0cf6ebbd..0917b5874db 100644
--- a/src/backend/storage/lmgr/lwlock.c
+++ b/src/backend/storage/lmgr/lwlock.c
@@ -97,16 +97,41 @@
 /* We use the ShmemLock spinlock to protect LWLockCounter */
 extern slock_t *ShmemLock;
 
-#define LW_FLAG_HAS_WAITERS			((uint32) 1 << 30)
-#define LW_FLAG_RELEASE_OK			((uint32) 1 << 29)
-#define LW_FLAG_LOCKED				((uint32) 1 << 28)
-
-#define LW_VAL_EXCLUSIVE			((uint32) 1 << 24)
+/*
+ * The 64-bit LWLock state layout is as follows:
+ * - 18-bit waiting list head (pgprocno of the backend in the procarray)
+ * - 18-bit waiting list tail (pgprocno of the backend in the procarray)
+ * - LWLock state flags
+ * - 20-bit lock value (LW_LOCK_MASK: LW_VAL_EXCLUSIVE bit plus shared count)
+ */
+#define LW_WAITERS_LIST_MASK 		((uint64) 0xFFFFFFFFF0000000)
+#define LW_WAITERS_HEAD_MASK		((uint64) 0xFFFFC00000000000)
+#define LW_WAITERS_TAIL_MASK		((uint64) 0x00003FFFF0000000)
+#define LW_FLAG_WAKE_UP_LOCK		((uint64) 1 << 22)
+#define LW_FLAG_RELEASE_OK			((uint64) 1 << 21)	/* Wake up waiters only if set */
+#define LW_FLAG_LOCK_VAR			((uint64) 1 << 20)
+
+#define LW_VAL_EXCLUSIVE			((uint64) 1 << 19)
 #define LW_VAL_SHARED				1
 
-#define LW_LOCK_MASK				((uint32) ((1 << 25)-1))
-/* Must be greater than MAX_BACKENDS - which is 2^23-1, so we're fine. */
-#define LW_SHARED_MASK				((uint32) ((1 << 24)-1))
+#define LW_LOCK_MASK				((uint64) ((1 << 20)-1))
+/* Must be greater than MAX_BACKENDS - which is 2^18-1, so we're fine. */
+#define LW_SHARED_MASK				((uint64) ((1 << 19)-1))
+
+/* Access macros for procarray waiters queue */
+#define CurWaitMode(pgprocno) 		(GetPGProcByNumber(pgprocno)->lwWaitMode)
+#define NextWaitLink(pgprocno) 		(GetPGProcByNumber(pgprocno)->lwWaitLink)
+#define NextReleaseLink(pgprocno) 	(GetPGProcByNumber(pgprocno)->lwReleaseLink)
+#define CurIsWaiting(pgprocno) 		(GetPGProcByNumber(pgprocno)->lwWaiting)
+#define CurSem(pgprocno)			(GetPGProcByNumber(pgprocno)->sem)
+
+/* Access macros for LWLock state */
+#define INVALID_LOCK_PROCNO			((uint32) 0x3FFFF)
+#define LW_GET_WAIT_HEAD(state)		((state) >> 46)
+#define LW_SET_WAIT_HEAD(state, waiter) (((uint64)(state) & (~LW_WAITERS_HEAD_MASK)) | ((uint64)(waiter) << 46))
+#define LW_GET_WAIT_TAIL(state)		(((uint64)(state) & LW_WAITERS_TAIL_MASK) >> 28)
+#define LW_SET_WAIT_TAIL(state, waiter) (((uint64)(state) & (~LW_WAITERS_TAIL_MASK)) | ((uint64)(waiter) << 28))
+#define LW_HAS_WAITERS(state)		(LW_GET_WAIT_HEAD((state)) != INVALID_LOCK_PROCNO)
+#define LW_SET_STATE(state, head, tail) (((uint64)(state) & (~LW_WAITERS_LIST_MASK)) | ((uint64)(head) << 46) | ((uint64)(tail) << 28))
 
 /*
  * There are three sorts of LWLock "tranches":
@@ -291,7 +316,7 @@ PRINT_LWDEBUG(const char *where, LWLock *lock, LWLockMode mode)
 								 where, T_NAME(lock), lock,
 								 (state & LW_VAL_EXCLUSIVE) != 0,
 								 state & LW_SHARED_MASK,
-								 (state & LW_FLAG_HAS_WAITERS) != 0,
+								 LW_HAS_WAITERS(state),
 								 pg_atomic_read_u32(&lock->nwaiters),
 								 (state & LW_FLAG_RELEASE_OK) != 0)));
 	}
@@ -733,12 +758,11 @@ RequestNamedLWLockTranche(const char *tranche_name, int num_lwlocks)
 void
 LWLockInitialize(LWLock *lock, int tranche_id)
 {
-	pg_atomic_init_u32(&lock->state, LW_FLAG_RELEASE_OK);
+	pg_atomic_init_u64(&lock->state, LW_SET_STATE(LW_FLAG_RELEASE_OK, INVALID_LOCK_PROCNO, INVALID_LOCK_PROCNO));
 #ifdef LOCK_DEBUG
 	pg_atomic_init_u32(&lock->nwaiters, 0);
 #endif
 	lock->tranche = tranche_id;
-	proclist_init(&lock->waiters);
 }
 
 /*
@@ -812,378 +836,169 @@ GetLWLockIdentifier(uint32 classId, uint16 eventId)
  * Returns true if the lock isn't free and we need to wait.
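+ *
+ * 'mode' is the mode to acquire the lock in (LW_SHARED or LW_EXCLUSIVE).
+ * 'waitMode' is the mode stored in MyProc->lwWaitMode if we have to queue
+ * ourselves; it may be LW_WAIT_UNTIL_FREE.  If 'queue_ok' is true and the
+ * lock is not free, we add ourselves to the wait queue within the same
+ * atomic state change.  If 'wakeup' is true, LW_FLAG_RELEASE_OK is set so
+ * that LWLockRelease may wake up waiters again.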
  */
 static bool
-LWLockAttemptLock(LWLock *lock, LWLockMode mode)
+LWLockAttemptLock(LWLock *lock, LWLockMode mode, LWLockMode waitMode,
+				  bool queue_ok, bool wakeup)
 {
-	uint32		old_state;
+	uint64		old_state;
+	uint64		mask;
+	uint64		increment;
 
-	Assert(mode == LW_EXCLUSIVE || mode == LW_SHARED);
+	if (mode == LW_EXCLUSIVE)
+	{
+		mask = LW_LOCK_MASK;
+		increment = LW_VAL_EXCLUSIVE;
+	}
+	else
+	{
+		mask = LW_VAL_EXCLUSIVE;
+		increment = LW_VAL_SHARED;
+	}
 
 	/*
 	 * Read once outside the loop, later iterations will get the newer value
 	 * via compare & exchange.
 	 */
-	old_state = pg_atomic_read_u32(&lock->state);
+	old_state = pg_atomic_read_u64(&lock->state);
 
 	/* loop until we've determined whether we could acquire the lock or not */
 	while (true)
 	{
-		uint32		desired_state;
+		uint64		desired_state;
 		bool		lock_free;
 
-		desired_state = old_state;
-
-		if (mode == LW_EXCLUSIVE)
-		{
-			lock_free = (old_state & LW_LOCK_MASK) == 0;
-			if (lock_free)
-				desired_state += LW_VAL_EXCLUSIVE;
-		}
-		else
-		{
-			lock_free = (old_state & LW_VAL_EXCLUSIVE) == 0;
-			if (lock_free)
-				desired_state += LW_VAL_SHARED;
-		}
+		lock_free = (old_state & mask) == 0;
 
 		/*
-		 * Attempt to swap in the state we are expecting. If we didn't see
-		 * lock to be free, that's just the old value. If we saw it as free,
-		 * we'll attempt to mark it acquired. The reason that we always swap
-		 * in the value is that this doubles as a memory barrier. We could try
-		 * to be smarter and only swap in values if we saw the lock as free,
-		 * but benchmark haven't shown it as beneficial so far.
-		 *
-		 * Retry if the value changed since we last looked at it.
+		 * Retrying with a short spin-wait helps to avoid falling back to the
+		 * relatively expensive OS semaphore when the lock is freed quickly
+		 * enough by a concurrent process.
+		 * XXX The initial delay and the number of retries were tuned on
+		 * heavy lock concurrency tests; the exact values are still to be
+		 * discussed.
 		 */
-		if (pg_atomic_compare_exchange_u32(&lock->state,
-										   &old_state, desired_state))
-		{
-			if (lock_free)
-			{
-				/* Great! Got the lock. */
-#ifdef LOCK_DEBUG
-				if (mode == LW_EXCLUSIVE)
-					lock->owner = MyProc;
-#endif
-				return false;
-			}
-			else
-				return true;	/* somebody else has the lock */
-		}
-	}
-	pg_unreachable();
-}
-
-/*
- * Lock the LWLock's wait list against concurrent activity.
- *
- * NB: even though the wait list is locked, non-conflicting lock operations
- * may still happen concurrently.
- *
- * Time spent holding mutex should be short!
- */
-static void
-LWLockWaitListLock(LWLock *lock)
-{
-	uint32		old_state;
-#ifdef LWLOCK_STATS
-	lwlock_stats *lwstats;
-	uint32		delays = 0;
-
-	lwstats = get_lwlock_stats_entry(lock);
-#endif
-
-	while (true)
-	{
-		/* always try once to acquire lock directly */
-		old_state = pg_atomic_fetch_or_u32(&lock->state, LW_FLAG_LOCKED);
-		if (!(old_state & LW_FLAG_LOCKED))
-			break;				/* got lock */
-
-		/* and then spin without atomic operations until lock is released */
+		if (!lock_free && queue_ok)
 		{
+			int		i = 0;
 			SpinDelayStatus delayStatus;
 
 			init_local_spin_delay(&delayStatus);
+			delayStatus.cur_delay = 500L;
 
-			while (old_state & LW_FLAG_LOCKED)
+			while (!lock_free && i < 2)
 			{
 				perform_spin_delay(&delayStatus);
-				old_state = pg_atomic_read_u32(&lock->state);
+				old_state = pg_atomic_read_u64(&lock->state);
+				lock_free = (old_state & mask) == 0;
+				i++;
 			}
-#ifdef LWLOCK_STATS
-			delays += delayStatus.delays;
-#endif
-			finish_spin_delay(&delayStatus);
 		}
 
-		/*
-		 * Retry. The lock might obviously already be re-acquired by the time
-		 * we're attempting to get it again.
-		 */
-	}
-
-#ifdef LWLOCK_STATS
-	lwstats->spin_delay_count += delays;
-#endif
-}
-
-/*
- * Unlock the LWLock's wait list.
- *
- * Note that it can be more efficient to manipulate flags and release the
- * locks in a single atomic operation.
- */
-static void
-LWLockWaitListUnlock(LWLock *lock)
-{
-	uint32		old_state PG_USED_FOR_ASSERTS_ONLY;
-
-	old_state = pg_atomic_fetch_and_u32(&lock->state, ~LW_FLAG_LOCKED);
-
-	Assert(old_state & LW_FLAG_LOCKED);
-}
-
-/*
- * Wakeup all the lockers that currently have a chance to acquire the lock.
- */
-static void
-LWLockWakeup(LWLock *lock)
-{
-	bool		new_release_ok;
-	bool		wokeup_somebody = false;
-	proclist_head wakeup;
-	proclist_mutable_iter iter;
-
-	proclist_init(&wakeup);
-
-	new_release_ok = true;
-
-	/* lock wait list while collecting backends to wake up */
-	LWLockWaitListLock(lock);
-
-	proclist_foreach_modify(iter, &lock->waiters, lwWaitLink)
-	{
-		PGPROC	   *waiter = GetPGProcByNumber(iter.cur);
+		desired_state = old_state;
 
-		if (wokeup_somebody && waiter->lwWaitMode == LW_EXCLUSIVE)
-			continue;
+		if (lock_free)
+			desired_state += increment;
 
-		proclist_delete(&lock->waiters, iter.cur, lwWaitLink);
-		proclist_push_tail(&wakeup, iter.cur, lwWaitLink);
+		if (wakeup)
+			desired_state |= LW_FLAG_RELEASE_OK;
 
-		if (waiter->lwWaitMode != LW_WAIT_UNTIL_FREE)
+		if (!lock_free && queue_ok)
 		{
 			/*
-			 * Prevent additional wakeups until retryer gets to run. Backends
-			 * that are just waiting for the lock to become free don't retry
-			 * automatically.
+			 * If we don't have a PGPROC structure, there's no way to wait.
+			 * This should never occur, since MyProc should only be null
+			 * during shared memory initialization.
 			 */
-			new_release_ok = false;
+			if (MyProc == NULL)
+				elog(PANIC, "cannot wait without a PGPROC structure");
 
-			/*
-			 * Don't wakeup (further) exclusive locks.
-			 */
-			wokeup_somebody = true;
-		}
-
-		/*
-		 * Once we've woken up an exclusive lock, there's no point in waking
-		 * up anybody else.
-		 */
-		if (waiter->lwWaitMode == LW_EXCLUSIVE)
-			break;
-	}
+			if (MyProc->lwWaiting)
+				elog(PANIC, "queueing for lock while waiting on another one");
 
-	Assert(proclist_is_empty(&wakeup) || pg_atomic_read_u32(&lock->state) & LW_FLAG_HAS_WAITERS);
+			MyProc->lwWaiting = true;
+			MyProc->lwWaitMode = waitMode;
 
-	/* unset required flags, and release lock, in one fell swoop */
-	{
-		uint32		old_state;
-		uint32		desired_state;
-
-		old_state = pg_atomic_read_u32(&lock->state);
-		while (true)
-		{
-			desired_state = old_state;
-
-			/* compute desired flags */
-
-			if (new_release_ok)
-				desired_state |= LW_FLAG_RELEASE_OK;
+			if (waitMode == LW_WAIT_UNTIL_FREE)
+			{
+				/*
+				 * LW_WAIT_UNTIL_FREE waiters should be woken up at each
+				 * wakeup, so push them to the list head to save time
+				 * iterating the list at unlock.
+				 */
+				desired_state = LW_SET_WAIT_HEAD(desired_state | LW_FLAG_RELEASE_OK, MyProc->pgprocno);
+				/* if list was empty set tail to same value as head */
+				if (!LW_HAS_WAITERS(old_state))
+				{
+					Assert(LW_GET_WAIT_TAIL(old_state) == INVALID_LOCK_PROCNO);
+					desired_state = LW_SET_WAIT_TAIL(desired_state, MyProc->pgprocno);
+				}
+				MyProc->lwWaitLink = LW_GET_WAIT_HEAD(old_state);
+			}
 			else
-				desired_state &= ~LW_FLAG_RELEASE_OK;
-
-			if (proclist_is_empty(&wakeup))
-				desired_state &= ~LW_FLAG_HAS_WAITERS;
-
-			desired_state &= ~LW_FLAG_LOCKED;	/* release lock */
-
-			if (pg_atomic_compare_exchange_u32(&lock->state, &old_state,
-											   desired_state))
-				break;
+			{
+				/*
+				 * Shared and exclusive waiters are added to the tail of the
+				 * list so that we don't need to iterate the whole list at
+				 * each wakeup.
+				 */
+				desired_state = LW_SET_WAIT_TAIL(desired_state, MyProc->pgprocno);
+				/* if list was empty set head to same value as tail */
+				if (!LW_HAS_WAITERS(old_state))
+					desired_state = LW_SET_WAIT_HEAD(desired_state, MyProc->pgprocno);
+				MyProc->lwWaitLink = INVALID_LOCK_PROCNO;
+
+				/*
+				 * Waiters are linked to the tail of the queue only after a
+				 * successful atomic update of the state.  Otherwise we could
+				 * get a queue with tail entries behind the tail pointed to by
+				 * the state, and a concurrent process could wake them up
+				 * before they are properly enqueued.
+				 */
+			}
 		}
-	}
-
-	/* Awaken any waiters I removed from the queue. */
-	proclist_foreach_modify(iter, &wakeup, lwWaitLink)
-	{
-		PGPROC	   *waiter = GetPGProcByNumber(iter.cur);
-
-		LOG_LWDEBUG("LWLockRelease", lock, "release waiter");
-		proclist_delete(&wakeup, iter.cur, lwWaitLink);
 
 		/*
-		 * Guarantee that lwWaiting being unset only becomes visible once the
-		 * unlink from the link has completed. Otherwise the target backend
-		 * could be woken up for other reason and enqueue for a new lock - if
-		 * that happens before the list unlink happens, the list would end up
-		 * being corrupted.
+		 * Attempt to swap in the state we are expecting. If we didn't see
+		 * lock to be free, that's just the old value. If we saw it as free,
+		 * we'll attempt to mark it acquired. The reason that we always swap
+		 * in the value is that this doubles as a memory barrier. We could try
+		 * to be smarter and only swap in values if we saw the lock as free,
+		 * but benchmark haven't shown it as beneficial so far.
 		 *
-		 * The barrier pairs with the LWLockWaitListLock() when enqueuing for
-		 * another lock.
+		 * Retry if the value changed since we last looked at it.
 		 */
-		pg_write_barrier();
-		waiter->lwWaiting = false;
-		PGSemaphoreUnlock(waiter->sem);
-	}
-}
-
-/*
- * Add ourselves to the end of the queue.
- *
- * NB: Mode can be LW_WAIT_UNTIL_FREE here!
- */
-static void
-LWLockQueueSelf(LWLock *lock, LWLockMode mode)
-{
-	/*
-	 * If we don't have a PGPROC structure, there's no way to wait. This
-	 * should never occur, since MyProc should only be null during shared
-	 * memory initialization.
-	 */
-	if (MyProc == NULL)
-		elog(PANIC, "cannot wait without a PGPROC structure");
-
-	if (MyProc->lwWaiting)
-		elog(PANIC, "queueing for lock while waiting on another one");
-
-	LWLockWaitListLock(lock);
-
-	/* setting the flag is protected by the spinlock */
-	pg_atomic_fetch_or_u32(&lock->state, LW_FLAG_HAS_WAITERS);
-
-	MyProc->lwWaiting = true;
-	MyProc->lwWaitMode = mode;
-
-	/* LW_WAIT_UNTIL_FREE waiters are always at the front of the queue */
-	if (mode == LW_WAIT_UNTIL_FREE)
-		proclist_push_head(&lock->waiters, MyProc->pgprocno, lwWaitLink);
-	else
-		proclist_push_tail(&lock->waiters, MyProc->pgprocno, lwWaitLink);
-
-	/* Can release the mutex now */
-	LWLockWaitListUnlock(lock);
-
+		if (pg_atomic_compare_exchange_u64(&lock->state,
+										   &old_state, desired_state))
+		{
+			if (lock_free)
+			{
+				/* Great! Got the lock. */
 #ifdef LOCK_DEBUG
-	pg_atomic_fetch_add_u32(&lock->nwaiters, 1);
+				if (mode == LW_EXCLUSIVE)
+					lock->owner = MyProc;
 #endif
-}
-
-/*
- * Remove ourselves from the waitlist.
- *
- * This is used if we queued ourselves because we thought we needed to sleep
- * but, after further checking, we discovered that we don't actually need to
- * do so.
- */
-static void
-LWLockDequeueSelf(LWLock *lock)
-{
-	bool		found = false;
-	proclist_mutable_iter iter;
-
-#ifdef LWLOCK_STATS
-	lwlock_stats *lwstats;
-
-	lwstats = get_lwlock_stats_entry(lock);
-
-	lwstats->dequeue_self_count++;
+				return false;
+			}
+			else
+			{
+				/*
+				 * If we pushed ourselves onto the tail of a non-empty wait
+				 * queue, relink lwWaitLink of the previous tail item to
+				 * point to us.
+				 */
+				if (queue_ok && waitMode != LW_WAIT_UNTIL_FREE && LW_HAS_WAITERS(old_state))
+				{
+					Assert(LW_GET_WAIT_TAIL(old_state) != INVALID_LOCK_PROCNO);
+					Assert(NextWaitLink(LW_GET_WAIT_TAIL(old_state)) == INVALID_LOCK_PROCNO);
+					Assert(LW_GET_WAIT_TAIL(old_state) != LW_GET_WAIT_TAIL(desired_state));
+					NextWaitLink(LW_GET_WAIT_TAIL(old_state)) = LW_GET_WAIT_TAIL(desired_state);
+				}
+#ifdef LOCK_DEBUG
+				pg_atomic_fetch_add_u32(&lock->nwaiters, 1);
 #endif
-
-	LWLockWaitListLock(lock);
-
-	/*
-	 * Can't just remove ourselves from the list, but we need to iterate over
-	 * all entries as somebody else could have dequeued us.
-	 */
-	proclist_foreach_modify(iter, &lock->waiters, lwWaitLink)
-	{
-		if (iter.cur == MyProc->pgprocno)
-		{
-			found = true;
-			proclist_delete(&lock->waiters, iter.cur, lwWaitLink);
-			break;
-		}
-	}
-
-	if (proclist_is_empty(&lock->waiters) &&
-		(pg_atomic_read_u32(&lock->state) & LW_FLAG_HAS_WAITERS) != 0)
-	{
-		pg_atomic_fetch_and_u32(&lock->state, ~LW_FLAG_HAS_WAITERS);
-	}
-
-	/* XXX: combine with fetch_and above? */
-	LWLockWaitListUnlock(lock);
-
-	/* clear waiting state again, nice for debugging */
-	if (found)
-		MyProc->lwWaiting = false;
-	else
-	{
-		int			extraWaits = 0;
-
-		/*
-		 * Somebody else dequeued us and has or will wake us up. Deal with the
-		 * superfluous absorption of a wakeup.
-		 */
-
-		/*
-		 * Reset RELEASE_OK flag if somebody woke us before we removed
-		 * ourselves - they'll have set it to false.
-		 */
-		pg_atomic_fetch_or_u32(&lock->state, LW_FLAG_RELEASE_OK);
-
-		/*
-		 * Now wait for the scheduled wakeup, otherwise our ->lwWaiting would
-		 * get reset at some inconvenient point later. Most of the time this
-		 * will immediately return.
-		 */
-		for (;;)
-		{
-			PGSemaphoreLock(MyProc->sem);
-			if (!MyProc->lwWaiting)
-				break;
-			extraWaits++;
+				return true;	/* somebody else has the lock */
+			}
 		}
-
-		/*
-		 * Fix the process wait semaphore's count for any absorbed wakeups.
-		 */
-		while (extraWaits-- > 0)
-			PGSemaphoreUnlock(MyProc->sem);
-	}
-
-#ifdef LOCK_DEBUG
-	{
-		/* not waiting anymore */
-		uint32		nwaiters PG_USED_FOR_ASSERTS_ONLY = pg_atomic_fetch_sub_u32(&lock->nwaiters, 1);
-
-		Assert(nwaiters < MAX_BACKENDS);
+		else
+			MyProc->lwWaiting = false;
 	}
-#endif
+	pg_unreachable();
 }
 
+
 /*
  * LWLockAcquire - acquire a lightweight lock in the specified mode
  *
@@ -1197,6 +1012,7 @@ LWLockAcquire(LWLock *lock, LWLockMode mode)
 {
 	PGPROC	   *proc = MyProc;
 	bool		result = true;
+	bool		wakeup = false;
 	int			extraWaits = 0;
 #ifdef LWLOCK_STATS
 	lwlock_stats *lwstats;
@@ -1252,46 +1068,12 @@ LWLockAcquire(LWLock *lock, LWLockMode mode)
 	 */
 	for (;;)
 	{
-		bool		mustwait;
-
-		/*
-		 * Try to grab the lock the first time, we're not in the waitqueue
-		 * yet/anymore.
-		 */
-		mustwait = LWLockAttemptLock(lock, mode);
-
-		if (!mustwait)
+		if (!LWLockAttemptLock(lock, mode, mode, true, wakeup))
 		{
 			LOG_LWDEBUG("LWLockAcquire", lock, "immediately acquired lock");
 			break;				/* got the lock */
 		}
 
-		/*
-		 * Ok, at this point we couldn't grab the lock on the first try. We
-		 * cannot simply queue ourselves to the end of the list and wait to be
-		 * woken up because by now the lock could long have been released.
-		 * Instead add us to the queue and try to grab the lock again. If we
-		 * succeed we need to revert the queuing and be happy, otherwise we
-		 * recheck the lock. If we still couldn't grab it, we know that the
-		 * other locker will see our queue entries when releasing since they
-		 * existed before we checked for the lock.
-		 */
-
-		/* add to the queue */
-		LWLockQueueSelf(lock, mode);
-
-		/* we're now guaranteed to be woken up if necessary */
-		mustwait = LWLockAttemptLock(lock, mode);
-
-		/* ok, grabbed the lock the second time round, need to undo queueing */
-		if (!mustwait)
-		{
-			LOG_LWDEBUG("LWLockAcquire", lock, "acquired, undoing queue");
-
-			LWLockDequeueSelf(lock);
-			break;
-		}
-
 		/*
 		 * Wait until awakened.
 		 *
@@ -1317,9 +1099,7 @@ LWLockAcquire(LWLock *lock, LWLockMode mode)
 				break;
 			extraWaits++;
 		}
-
-		/* Retrying, allow LWLockRelease to release waiters again. */
-		pg_atomic_fetch_or_u32(&lock->state, LW_FLAG_RELEASE_OK);
+		wakeup = true;
 
 #ifdef LOCK_DEBUG
 		{
@@ -1384,7 +1164,7 @@ LWLockConditionalAcquire(LWLock *lock, LWLockMode mode)
 	HOLD_INTERRUPTS();
 
 	/* Check for the lock */
-	mustwait = LWLockAttemptLock(lock, mode);
+	mustwait = LWLockAttemptLock(lock, mode, mode, false, false);
 
 	if (mustwait)
 	{
@@ -1451,64 +1231,45 @@ LWLockAcquireOrWait(LWLock *lock, LWLockMode mode)
 	 * NB: We're using nearly the same twice-in-a-row lock acquisition
 	 * protocol as LWLockAcquire(). Check its comments for details.
 	 */
-	mustwait = LWLockAttemptLock(lock, mode);
+	mustwait = LWLockAttemptLock(lock, mode, LW_WAIT_UNTIL_FREE, true, false);
 
 	if (mustwait)
 	{
-		LWLockQueueSelf(lock, LW_WAIT_UNTIL_FREE);
-
-		mustwait = LWLockAttemptLock(lock, mode);
-
-		if (mustwait)
-		{
-			/*
-			 * Wait until awakened.  Like in LWLockAcquire, be prepared for
-			 * bogus wakeups.
-			 */
-			LOG_LWDEBUG("LWLockAcquireOrWait", lock, "waiting");
+		/*
+		 * Wait until awakened.  Like in LWLockAcquire, be prepared for bogus
+		 * wakeups.
+		 */
+		LOG_LWDEBUG("LWLockAcquireOrWait", lock, "waiting");
 
 #ifdef LWLOCK_STATS
-			lwstats->block_count++;
+		lwstats->block_count++;
 #endif
 
-			LWLockReportWaitStart(lock);
-			if (TRACE_POSTGRESQL_LWLOCK_WAIT_START_ENABLED())
-				TRACE_POSTGRESQL_LWLOCK_WAIT_START(T_NAME(lock), mode);
+		LWLockReportWaitStart(lock);
+		if (TRACE_POSTGRESQL_LWLOCK_WAIT_START_ENABLED())
+			TRACE_POSTGRESQL_LWLOCK_WAIT_START(T_NAME(lock), mode);
 
-			for (;;)
-			{
-				PGSemaphoreLock(proc->sem);
-				if (!proc->lwWaiting)
-					break;
-				extraWaits++;
-			}
+		for (;;)
+		{
+			PGSemaphoreLock(proc->sem);
+			if (!proc->lwWaiting)
+				break;
+			extraWaits++;
+		}
 
 #ifdef LOCK_DEBUG
-			{
-				/* not waiting anymore */
-				uint32		nwaiters PG_USED_FOR_ASSERTS_ONLY = pg_atomic_fetch_sub_u32(&lock->nwaiters, 1);
-
-				Assert(nwaiters < MAX_BACKENDS);
-			}
-#endif
-			if (TRACE_POSTGRESQL_LWLOCK_WAIT_DONE_ENABLED())
-				TRACE_POSTGRESQL_LWLOCK_WAIT_DONE(T_NAME(lock), mode);
-			LWLockReportWaitEnd();
-
-			LOG_LWDEBUG("LWLockAcquireOrWait", lock, "awakened");
-		}
-		else
 		{
-			LOG_LWDEBUG("LWLockAcquireOrWait", lock, "acquired, undoing queue");
+			/* not waiting anymore */
+			uint32		nwaiters PG_USED_FOR_ASSERTS_ONLY = pg_atomic_fetch_sub_u32(&lock->nwaiters, 1);
 
-			/*
-			 * Got lock in the second attempt, undo queueing. We need to treat
-			 * this as having successfully acquired the lock, otherwise we'd
-			 * not necessarily wake up people we've prevented from acquiring
-			 * the lock.
-			 */
-			LWLockDequeueSelf(lock);
+			Assert(nwaiters < MAX_BACKENDS);
 		}
+#endif
+		if (TRACE_POSTGRESQL_LWLOCK_WAIT_DONE_ENABLED())
+			TRACE_POSTGRESQL_LWLOCK_WAIT_DONE(T_NAME(lock), mode);
+		LWLockReportWaitEnd();
+
+		LOG_LWDEBUG("LWLockAcquireOrWait", lock, "awakened");
 	}
 
 	/*
@@ -1552,8 +1313,8 @@ LWLockConflictsWithVar(LWLock *lock,
 					   uint64 *valptr, uint64 oldval, uint64 *newval,
 					   bool *result)
 {
-	bool		mustwait;
 	uint64		value;
+	uint64		old_state;
 
 	/*
 	 * Test first to see if it the slot is free right now.
@@ -1562,36 +1323,106 @@ LWLockConflictsWithVar(LWLock *lock,
 	 * barrier here as far as the current usage is concerned.  But that might
 	 * not be safe in general.
 	 */
-	mustwait = (pg_atomic_read_u32(&lock->state) & LW_VAL_EXCLUSIVE) != 0;
+	old_state = pg_atomic_read_u64(&lock->state);
 
-	if (!mustwait)
+	while (true)
 	{
-		*result = true;
-		return false;
+		if ((old_state & LW_VAL_EXCLUSIVE) == 0)
+		{
+			*result = true;
+			return false;
+		}
+
+		/* and then spin without atomic operations until lock is released */
+		if (old_state & LW_FLAG_LOCK_VAR)
+		{
+			SpinDelayStatus delayStatus;
+
+			init_local_spin_delay(&delayStatus);
+
+			while (old_state & LW_FLAG_LOCK_VAR)
+			{
+				perform_spin_delay(&delayStatus);
+				old_state = pg_atomic_read_u64(&lock->state);
+			}
+#ifdef LWLOCK_STATS
+			delays += delayStatus.delays;
+#endif
+			finish_spin_delay(&delayStatus);
+		}
+
+		if (pg_atomic_compare_exchange_u64(&lock->state, &old_state,
+										   old_state | LW_FLAG_LOCK_VAR))
+			break;
 	}
 
 	*result = false;
 
-	/*
-	 * Read value using the lwlock's wait list lock, as we can't generally
-	 * rely on atomic 64 bit reads/stores.  TODO: On platforms with a way to
-	 * do atomic 64 bit reads/writes the spinlock should be optimized away.
-	 */
-	LWLockWaitListLock(lock);
 	value = *valptr;
-	LWLockWaitListUnlock(lock);
 
 	if (value != oldval)
 	{
-		mustwait = false;
 		*newval = value;
+
+		pg_atomic_fetch_and_u64(&lock->state, ~LW_FLAG_LOCK_VAR);
+		return false;
 	}
-	else
+
+	/* loop until we've determined whether we could acquire the lock or not */
+	while (true)
 	{
-		mustwait = true;
+		uint64		desired_state = old_state & (~LW_FLAG_LOCK_VAR);
+
+		if ((old_state & LW_VAL_EXCLUSIVE) != 0)
+		{
+			/*
+			 * If we don't have a PGPROC structure, there's no way to wait.
+			 * This should never occur, since MyProc should only be null
+			 * during shared memory initialization.
+			 */
+			if (MyProc == NULL)
+				elog(PANIC, "cannot wait without a PGPROC structure");
+
+			if (MyProc->lwWaiting)
+				elog(PANIC, "queueing for lock while waiting on another one");
+
+			MyProc->lwWaiting = true;
+			MyProc->lwWaitMode = LW_WAIT_UNTIL_FREE;
+			MyProc->lwWaitLink = LW_GET_WAIT_HEAD(old_state);
+			Assert(LW_GET_WAIT_HEAD(old_state) != MyProc->pgprocno);
+			desired_state = LW_SET_WAIT_HEAD(desired_state | LW_FLAG_RELEASE_OK, MyProc->pgprocno);
+			/* if list was empty set tail to same value as head */
+			if (!LW_HAS_WAITERS(old_state))
+			{
+				Assert(LW_GET_WAIT_TAIL(old_state) == INVALID_LOCK_PROCNO);
+				desired_state = LW_SET_WAIT_TAIL(desired_state, MyProc->pgprocno);
+			}
+		}
+
+		if (pg_atomic_compare_exchange_u64(&lock->state,
+										   &old_state, desired_state))
+		{
+
+			if (MyProc->lwWaiting)
+			{
+#ifdef LOCK_DEBUG
+				pg_atomic_fetch_add_u32(&lock->nwaiters, 1);
+#endif
+				return true;
+			}
+			else
+			{
+				*result = true;
+				return false;
+			}
+		}
+		else
+		{
+			MyProc->lwWaiting = false;
+		}
 	}
 
-	return mustwait;
+	return true;
 }
 
 /*
@@ -1641,38 +1472,6 @@ LWLockWaitForVar(LWLock *lock, uint64 *valptr, uint64 oldval, uint64 *newval)
 		if (!mustwait)
 			break;				/* the lock was free or value didn't match */
 
-		/*
-		 * Add myself to wait queue. Note that this is racy, somebody else
-		 * could wakeup before we're finished queuing. NB: We're using nearly
-		 * the same twice-in-a-row lock acquisition protocol as
-		 * LWLockAcquire(). Check its comments for details. The only
-		 * difference is that we also have to check the variable's values when
-		 * checking the state of the lock.
-		 */
-		LWLockQueueSelf(lock, LW_WAIT_UNTIL_FREE);
-
-		/*
-		 * Set RELEASE_OK flag, to make sure we get woken up as soon as the
-		 * lock is released.
-		 */
-		pg_atomic_fetch_or_u32(&lock->state, LW_FLAG_RELEASE_OK);
-
-		/*
-		 * We're now guaranteed to be woken up if necessary. Recheck the lock
-		 * and variables state.
-		 */
-		mustwait = LWLockConflictsWithVar(lock, valptr, oldval, newval,
-										  &result);
-
-		/* Ok, no conflict after we queued ourselves. Undo queueing. */
-		if (!mustwait)
-		{
-			LOG_LWDEBUG("LWLockWaitForVar", lock, "free, undoing queue");
-
-			LWLockDequeueSelf(lock);
-			break;
-		}
-
 		/*
 		 * Wait until awakened.
 		 *
@@ -1731,6 +1530,80 @@ LWLockWaitForVar(LWLock *lock, uint64 *valptr, uint64 oldval, uint64 *newval)
 	return result;
 }
 
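+/*
+ * lockVar - acquire the auxiliary variable-lock bit (LW_FLAG_LOCK_VAR) in
+ * the LWLock state, spinning until it becomes free.
+ *
+ * Returns the state value observed just before the bit was acquired.
+ */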
+static uint64
+lockVar(LWLock *lock)
+{
+	uint64		oldState;
+
+	oldState = pg_atomic_fetch_or_u64(&lock->state, LW_FLAG_LOCK_VAR);
+
+	while (oldState & LW_FLAG_LOCK_VAR)
+	{
+		SpinDelayStatus delayStatus;
+
+		init_local_spin_delay(&delayStatus);
+
+		while (oldState & LW_FLAG_LOCK_VAR)
+		{
+			perform_spin_delay(&delayStatus);
+			oldState = pg_atomic_read_u64(&lock->state);
+		}
+#ifdef LWLOCK_STATS
+		delays += delayStatus.delays;
+#endif
+		finish_spin_delay(&delayStatus);
+
+		oldState = pg_atomic_fetch_or_u64(&lock->state, LW_FLAG_LOCK_VAR);
+	}
+
+	return oldState;
+}
+
+/*
+ * waitForQueueRelink - spin until a concurrent locking process links the
+ * new tail into the waiters list.
+ *
+ * This handles the rare case when a concurrent process tries to iterate
+ * the waiters list after the locking process has updated the lock state
+ * but before it has linked the new tail of the queue.
+ *
+ * Doing it the other way round - linking the tail into the waiters list
+ * before updating the lock state - could lead to releasing waiters that
+ * have not been added to the wait queue yet.
+ */
+static inline void
+waitForQueueRelink(uint32 pgprocno)
+{
+	SpinDelayStatus delayStatus;
+
+	init_local_spin_delay(&delayStatus);
+	while (NextWaitLink(pgprocno) == INVALID_LOCK_PROCNO)
+		perform_spin_delay(&delayStatus);
+
+	finish_spin_delay(&delayStatus);
+	return;
+}
+
+/* awakenWaiters - Awaken all waiters on the release (wakeup) list */
+static inline void
+awakenWaiters(uint32 pgprocno, LWLock *lock)
+{
+	while (pgprocno != INVALID_LOCK_PROCNO)
+	{
+		/* Must read this before wakeup! */
+		uint32		nextlink = NextReleaseLink(pgprocno);
+
+		LOG_LWDEBUG("LWLockRelease", lock, "release waiter");
+
+		pg_read_barrier();
+		CurIsWaiting(pgprocno) = false;
+		pg_write_barrier();
+		PGSemaphoreUnlock(CurSem(pgprocno));
+
+		pgprocno = nextlink;
+	}
+}
 
 /*
  * LWLockUpdateVar - Update a variable and wake up waiters atomically
@@ -1745,53 +1618,155 @@ LWLockWaitForVar(LWLock *lock, uint64 *valptr, uint64 oldval, uint64 *newval)
 void
 LWLockUpdateVar(LWLock *lock, uint64 *valptr, uint64 val)
 {
-	proclist_head wakeup;
-	proclist_mutable_iter iter;
+	uint64		oldState;
+	uint32		pgprocno,
+				newHead,
+				newTail,
+				oldHead = INVALID_LOCK_PROCNO,
+				oldTail = INVALID_LOCK_PROCNO,
+				oldReplaceHead = INVALID_LOCK_PROCNO,
+				wakeupTail = INVALID_LOCK_PROCNO;
 
 	PRINT_LWDEBUG("LWLockUpdateVar", lock, LW_EXCLUSIVE);
 
-	proclist_init(&wakeup);
+	oldState = lockVar(lock);
 
-	LWLockWaitListLock(lock);
-
-	Assert(pg_atomic_read_u32(&lock->state) & LW_VAL_EXCLUSIVE);
+	Assert(oldState & LW_VAL_EXCLUSIVE);
 
 	/* Update the lock's value */
 	*valptr = val;
 
 	/*
 	 * See if there are any LW_WAIT_UNTIL_FREE waiters that need to be woken
-	 * up. They are always in the front of the queue.
+	 * up. They are always at the front of the queue. Add them to the
+	 * release queue.
+	 *
+	 * If a concurrent process adds waiters to the wait queue while we are
+	 * processing it, the atomic compare-and-exchange on the lock state will
+	 * not succeed. In that case we build the new lock state again and
+	 * update the existing release queue with the items added at the head
+	 * of the wait queue.
+	 *
+	 * Only after we have succeeded in updating the atomic lock state do we
+	 * actually wake up the processes from the release queue.
 	 */
-	proclist_foreach_modify(iter, &lock->waiters, lwWaitLink)
+	while (true)
 	{
-		PGPROC	   *waiter = GetPGProcByNumber(iter.cur);
+		uint64		newState = oldState;
 
-		if (waiter->lwWaitMode != LW_WAIT_UNTIL_FREE)
-			break;
+		newHead = LW_GET_WAIT_HEAD(newState);
+		newTail = LW_GET_WAIT_TAIL(newState);
 
-		proclist_delete(&lock->waiters, iter.cur, lwWaitLink);
-		proclist_push_tail(&wakeup, iter.cur, lwWaitLink);
-	}
+		/* Move all LW_WAIT_UNTIL_FREE waiters into wakeup list. */
+		if (newHead != oldHead)
+		{
+			pgprocno = newHead;
+		}
+		else
+		{
+			pgprocno = INVALID_LOCK_PROCNO;
+			if (oldReplaceHead != INVALID_LOCK_PROCNO)
+			{
+				newHead = oldReplaceHead;
+			}
+			else if (newTail == oldTail)
+			{
+				newHead = newTail = oldReplaceHead;
+			}
+			else
+			{
+				newHead = NextWaitLink(oldTail);
+				if (newHead == INVALID_LOCK_PROCNO)
+				{
+					waitForQueueRelink(oldTail);
+					newHead = NextWaitLink(oldTail);
+				}
+			}
+		}
 
-	/* We are done updating shared state of the lock itself. */
-	LWLockWaitListUnlock(lock);
+		while (pgprocno != INVALID_LOCK_PROCNO)
+		{
+			uint32		nextPgprocno = (pgprocno != newTail ? NextWaitLink(pgprocno) : INVALID_LOCK_PROCNO),
+						nextStepPgprocno;
 
-	/*
-	 * Awaken any waiters I removed from the queue.
-	 */
-	proclist_foreach_modify(iter, &wakeup, lwWaitLink)
-	{
-		PGPROC	   *waiter = GetPGProcByNumber(iter.cur);
+			/* Lock tail is updated but the queue tail hasn't been relinked yet. */
+			if (pgprocno != newTail &&
+				nextPgprocno == INVALID_LOCK_PROCNO)
+			{
+				waitForQueueRelink(pgprocno);
+				nextPgprocno = NextWaitLink(pgprocno);
+			}
+			nextStepPgprocno = nextPgprocno;
 
-		proclist_delete(&wakeup, iter.cur, lwWaitLink);
-		/* check comment in LWLockWakeup() about this barrier */
-		pg_write_barrier();
-		waiter->lwWaiting = false;
-		PGSemaphoreUnlock(waiter->sem);
+			if (nextPgprocno == oldHead &&
+				nextPgprocno != INVALID_LOCK_PROCNO)
+			{
+				Assert(oldTail != INVALID_LOCK_PROCNO);
+				Assert(newTail != INVALID_LOCK_PROCNO);
+
+				nextStepPgprocno = (oldTail != newTail ? NextWaitLink(oldTail) : INVALID_LOCK_PROCNO);
+				if (nextStepPgprocno == INVALID_LOCK_PROCNO &&
+					oldTail != newTail)
+				{
+					waitForQueueRelink(oldTail);
+					nextStepPgprocno = NextWaitLink(oldTail);
+				}
+
+				if (oldReplaceHead != INVALID_LOCK_PROCNO)
+				{
+					if (oldReplaceHead != nextPgprocno)
+					{
+						Assert(pgprocno != oldReplaceHead);
+						nextPgprocno = NextWaitLink(pgprocno) = oldReplaceHead;
+					}
+				}
+				else
+				{
+					if (nextStepPgprocno != nextPgprocno)
+					{
+						Assert(pgprocno != nextStepPgprocno);
+						nextPgprocno = NextWaitLink(pgprocno) = nextStepPgprocno;
+					}
+				}
+			}
+
+			/* Remove LW_WAIT_UNTIL_FREE from the list head */
+			if (CurWaitMode(pgprocno) == LW_WAIT_UNTIL_FREE)
+			{
+				newHead = nextPgprocno;
+				if (newHead == INVALID_LOCK_PROCNO)
+					newTail = newHead;
+
+				/* Push to the wakeup list */
+				Assert(pgprocno != wakeupTail);
+				NextReleaseLink(pgprocno) = wakeupTail;
+				wakeupTail = pgprocno;
+			}
+			else
+			{
+				/*
+				 * The tail part of the list can only contain LW_EXCLUSIVE or
+				 * LW_SHARED waiters.
+				 */
+				break;
+			}
+
+			pgprocno = nextStepPgprocno;
+		}
+
+		newState = LW_SET_STATE(newState & ~LW_FLAG_LOCK_VAR, newHead, newTail);
+		oldHead = LW_GET_WAIT_HEAD(oldState);
+		oldTail = LW_GET_WAIT_TAIL(oldState);
+		oldReplaceHead = newHead;
+
+		if (pg_atomic_compare_exchange_u64(&lock->state, &oldState, newState))
+		{
+			break;
+		}
 	}
-}
 
+	/* Awaken any waiters removed from the queue. */
+	awakenWaiters(wakeupTail, lock);
+}
 
 /*
  * LWLockRelease - release a previously acquired lock
@@ -1800,9 +1775,20 @@ void
 LWLockRelease(LWLock *lock)
 {
 	LWLockMode	mode;
-	uint32		oldstate;
-	bool		check_waiters;
+	uint64		oldState;
 	int			i;
+	uint32		pgprocno,
+				prevPgprocno,
+				newHead,
+				newTail,
+				oldHead = INVALID_LOCK_PROCNO,
+				oldTail = INVALID_LOCK_PROCNO,
+				oldReplaceHead = INVALID_LOCK_PROCNO,
+				oldReplaceTail = INVALID_LOCK_PROCNO,
+				wakeupTail = INVALID_LOCK_PROCNO;
+	bool		new_release_ok = true;
+	bool		wakeup = false;
+	LWLockMode	lastMode = LW_WAIT_UNTIL_FREE;
 
 	/*
 	 * Remove lock from list of locks held.  Usually, but not always, it will
@@ -1827,42 +1813,264 @@ LWLockRelease(LWLock *lock)
 	 * Release my hold on lock, after that it can immediately be acquired by
 	 * others, even if we still have to wakeup other waiters.
 	 */
-	if (mode == LW_EXCLUSIVE)
-		oldstate = pg_atomic_sub_fetch_u32(&lock->state, LW_VAL_EXCLUSIVE);
-	else
-		oldstate = pg_atomic_sub_fetch_u32(&lock->state, LW_VAL_SHARED);
+	oldState = pg_atomic_read_u64(&lock->state);
 
-	/* nobody else can have that kind of lock */
-	Assert(!(oldstate & LW_VAL_EXCLUSIVE));
+	while (true)
+	{
+		uint64		newState = oldState;
+
+		if (mode == LW_EXCLUSIVE)
+			newState -= LW_VAL_EXCLUSIVE;
+		else
+			newState -= LW_VAL_SHARED;
+
+		if (LW_HAS_WAITERS(oldState) &&
+			(newState & LW_LOCK_MASK) == 0 &&
+			(newState & LW_FLAG_RELEASE_OK) == LW_FLAG_RELEASE_OK)
+			newState |= LW_FLAG_WAKE_UP_LOCK;
+		newState &= ~LW_FLAG_LOCK_VAR;
+
+		if (pg_atomic_compare_exchange_u64(&lock->state, &oldState, newState))
+		{
+			if (!(oldState & LW_FLAG_WAKE_UP_LOCK) &&
+				(newState & LW_FLAG_WAKE_UP_LOCK))
+				wakeup = true;
+			oldState = newState;
+			break;
+		}
+	}
 
 	if (TRACE_POSTGRESQL_LWLOCK_RELEASE_ENABLED())
 		TRACE_POSTGRESQL_LWLOCK_RELEASE(T_NAME(lock));
 
-	/*
-	 * We're still waiting for backends to get scheduled, don't wake them up
-	 * again.
-	 */
-	if ((oldstate & (LW_FLAG_HAS_WAITERS | LW_FLAG_RELEASE_OK)) ==
-		(LW_FLAG_HAS_WAITERS | LW_FLAG_RELEASE_OK) &&
-		(oldstate & LW_LOCK_MASK) == 0)
-		check_waiters = true;
-	else
-		check_waiters = false;
+	if (!wakeup)
+	{
+		RESUME_INTERRUPTS();
+		return;
+	}
+
+	/* XXX: remove before commit? */
+	LOG_LWDEBUG("LWLockRelease", lock, "releasing waiters");
 
 	/*
-	 * As waking up waiters requires the spinlock to be acquired, only do so
-	 * if necessary.
+	 * The waiters moved from the wait queue to the release queue are:
+	 * - all LW_WAIT_UNTIL_FREE waiters;
+	 * and, once all LW_WAIT_UNTIL_FREE waiters have been removed:
+	 * - if the first remaining waiter is LW_EXCLUSIVE, only that one waiter;
+	 * - if the first remaining waiter is LW_SHARED, all LW_SHARED waiters
+	 *   in the queue.
+	 *
+	 * If a concurrent process adds waiters to the wait queue while we are
+	 * processing it, the atomic compare-and-exchange on the lock state will
+	 * not succeed. In that case we build the new lock state again and
+	 * update the existing release queue with the items added at the head
+	 * and tail of the wait queue.
+	 *
+	 * Only after we have succeeded in updating the atomic lock state do we
+	 * actually wake up the processes from the release queue.
 	 */
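+
+	/*
+	 * On retries of the loop below, oldHead/oldTail are the wait queue head
+	 * and tail of the state we last attempted to compare-and-exchange
+	 * against, and oldReplaceHead/oldReplaceTail are the replacement head
+	 * and tail computed during that attempt.  wakeupTail is the entry point
+	 * of the LIFO release list linked via lwReleaseLink.
+	 */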
-	if (check_waiters)
+	while (true)
 	{
-		/* XXX: remove before commit? */
-		LOG_LWDEBUG("LWLockRelease", lock, "releasing waiters");
-		LWLockWakeup(lock);
+		uint64		newState = oldState;
+
+		newHead = LW_GET_WAIT_HEAD(newState);
+		newTail = LW_GET_WAIT_TAIL(newState);
+		prevPgprocno = INVALID_LOCK_PROCNO;
+
+		/* Move all LW_WAIT_UNTIL_FREE waiters into wakeup list. */
+		if (oldHead == INVALID_LOCK_PROCNO ||
+			oldTail == INVALID_LOCK_PROCNO ||
+			newHead != oldHead)
+		{
+			pgprocno = newHead;
+
+		}
+		else if (oldTail != newTail)
+		{
+			Assert(newHead != INVALID_LOCK_PROCNO &&
+					newTail != INVALID_LOCK_PROCNO &&
+					oldHead != INVALID_LOCK_PROCNO &&
+					oldTail != INVALID_LOCK_PROCNO);
+
+			pgprocno = NextWaitLink(oldTail);
+			if (pgprocno == INVALID_LOCK_PROCNO)
+			{
+				waitForQueueRelink(oldTail);
+				pgprocno = NextWaitLink(oldTail);
+			}
+			prevPgprocno = oldReplaceTail;
+
+			if (oldReplaceHead != INVALID_LOCK_PROCNO)
+				newHead = oldReplaceHead;
+			else
+				newHead = pgprocno;
+
+			if (oldTail != oldReplaceTail &&
+				oldReplaceTail != INVALID_LOCK_PROCNO)
+			{
+				Assert(prevPgprocno != pgprocno);
+				NextWaitLink(prevPgprocno) = pgprocno;
+			}
+		}
+		else
+		{
+			Assert(newHead == oldHead);
+			Assert(oldTail == newTail);
+			newHead = oldReplaceHead;
+			newTail = oldReplaceTail;
+			pgprocno = INVALID_LOCK_PROCNO;
+		}
+
+		while (pgprocno != INVALID_LOCK_PROCNO)
+		{
+			LWLockMode	curMode = CurWaitMode(pgprocno);
+			uint32		nextPgprocno = (pgprocno != newTail ? NextWaitLink(pgprocno) : INVALID_LOCK_PROCNO),
+						nextStepPgprocno,
+						nextStepPrevPgprocno = pgprocno;
+
+			/* Lock tail is updated but the queue tail hasn't been relinked yet. */
+			if (pgprocno != newTail &&
+				nextPgprocno == INVALID_LOCK_PROCNO)
+			{
+				waitForQueueRelink(pgprocno);
+				nextPgprocno = NextWaitLink(pgprocno);
+			}
+			nextStepPgprocno = nextPgprocno;
+
+			if (nextPgprocno == oldHead &&
+				nextPgprocno != INVALID_LOCK_PROCNO)
+			{
+				Assert(oldTail != INVALID_LOCK_PROCNO);
+				Assert(newTail != INVALID_LOCK_PROCNO);
+
+				nextStepPrevPgprocno = (oldReplaceTail != INVALID_LOCK_PROCNO ? oldReplaceTail : pgprocno);
+				nextStepPgprocno = (oldTail != newTail ? NextWaitLink(oldTail) : INVALID_LOCK_PROCNO);
+				if (nextStepPgprocno == INVALID_LOCK_PROCNO &&
+					oldTail != newTail)
+				{
+					waitForQueueRelink(oldTail);
+					nextStepPgprocno = NextWaitLink(oldTail);
+				}
+
+				if (oldTail != oldReplaceTail &&
+					oldReplaceTail != INVALID_LOCK_PROCNO)
+				{
+					Assert(oldReplaceTail != nextStepPgprocno);
+					NextWaitLink(oldReplaceTail) = nextStepPgprocno;
+				}
+
+				if (oldReplaceHead != INVALID_LOCK_PROCNO)
+				{
+					if (oldReplaceHead != nextPgprocno)
+					{
+						Assert(pgprocno != oldReplaceHead);
+						nextPgprocno = NextWaitLink(pgprocno) = oldReplaceHead;
+					}
+				}
+				else
+				{
+					if (nextStepPgprocno != nextPgprocno)
+					{
+						Assert(pgprocno != nextStepPgprocno);
+						nextPgprocno = NextWaitLink(pgprocno) = nextStepPgprocno;
+					}
+				}
+
+				if (oldTail == newTail)
+				{
+					if (oldReplaceTail != INVALID_LOCK_PROCNO)
+						newTail = oldReplaceTail;
+					else
+						newTail = pgprocno;
+				}
+			}
+
+			/* Remove LW_WAIT_UNTIL_FREE waiters from the list head */
+			if (curMode == LW_WAIT_UNTIL_FREE)
+			{
+				Assert(prevPgprocno == INVALID_LOCK_PROCNO);
+				newHead = nextPgprocno;
+				if (newHead == INVALID_LOCK_PROCNO)
+					newTail = newHead;
+				if (nextStepPrevPgprocno == pgprocno)
+					nextStepPrevPgprocno = prevPgprocno;
+
+				/* Push to the wakeup list */
+				Assert(pgprocno != wakeupTail);
+				NextReleaseLink(pgprocno) = wakeupTail;
+				wakeupTail = pgprocno;
+			}
+			else
+			{
+				if (lastMode == LW_EXCLUSIVE)
+					break;
+
+				if (lastMode == LW_WAIT_UNTIL_FREE)
+					lastMode = curMode;
+
+				if (lastMode == curMode)
+				{
+					/*
+					 * Prevent additional wakeups until the retryer gets to
+					 * run.  Backends that are just waiting for the lock to
+					 * become free don't retry automatically.
+					 */
+					new_release_ok = false;
+
+					if (prevPgprocno == INVALID_LOCK_PROCNO)
+					{
+						newHead = nextPgprocno;
+					}
+					else
+					{
+						Assert(prevPgprocno != nextPgprocno);
+						NextWaitLink(prevPgprocno) = nextPgprocno;
+						if (nextPgprocno == INVALID_LOCK_PROCNO)
+						{
+							Assert(newTail == pgprocno);
+							newTail = prevPgprocno;
+						}
+					}
+
+					if (nextStepPrevPgprocno == pgprocno)
+						nextStepPrevPgprocno = prevPgprocno;
+
+					if (newHead == INVALID_LOCK_PROCNO ||
+						newTail == INVALID_LOCK_PROCNO)
+					{
+						newHead = INVALID_LOCK_PROCNO;
+						newTail = INVALID_LOCK_PROCNO;
+					}
+
+					/* Push to the wakeup list */
+					Assert(pgprocno != wakeupTail);
+					NextReleaseLink(pgprocno) = wakeupTail;
+					wakeupTail = pgprocno;
+				}
+			}
+
+			prevPgprocno = nextStepPrevPgprocno;
+			pgprocno = nextStepPgprocno;
+		}
+
+		newState = LW_SET_STATE(newState & ~LW_FLAG_WAKE_UP_LOCK, newHead, newTail);
+
+		if (new_release_ok)
+			newState |= LW_FLAG_RELEASE_OK;
+		else
+			newState &= ~LW_FLAG_RELEASE_OK;
+
+		oldHead = LW_GET_WAIT_HEAD(oldState);
+		oldTail = LW_GET_WAIT_TAIL(oldState);
+		oldReplaceHead = newHead;
+		oldReplaceTail = newTail;
+
+		if (pg_atomic_compare_exchange_u64(&lock->state, &oldState, newState))
+			break;
 	}
 
-	/*
-	 * Now okay to allow cancel/die interrupts.
-	 */
+	/* Awaken any waiters removed from the queue. */
+	awakenWaiters(wakeupTail, lock);
+
+	/* Now okay to allow cancel/die interrupts. */
 	RESUME_INTERRUPTS();
 }
 
@@ -1872,15 +2080,9 @@ LWLockRelease(LWLock *lock)
 void
 LWLockReleaseClearVar(LWLock *lock, uint64 *valptr, uint64 val)
 {
-	LWLockWaitListLock(lock);
+	(void) lockVar(lock);
 
-	/*
-	 * Set the variable's value before releasing the lock, that prevents race
-	 * a race condition wherein a new locker acquires the lock, but hasn't yet
-	 * set the variables value.
-	 */
 	*valptr = val;
-	LWLockWaitListUnlock(lock);
 
 	LWLockRelease(lock);
 }
diff --git a/src/include/storage/lwlock.h b/src/include/storage/lwlock.h
index ca4eca76f41..e79f74fea4f 100644
--- a/src/include/storage/lwlock.h
+++ b/src/include/storage/lwlock.h
@@ -31,8 +31,7 @@ struct PGPROC;
 typedef struct LWLock
 {
 	uint16		tranche;		/* tranche ID */
-	pg_atomic_uint32 state;		/* state of exclusive/nonexclusive lockers */
-	proclist_head waiters;		/* list of waiting PGPROCs */
+	pg_atomic_uint64 state;		/* state of exclusive/nonexclusive lockers */
 #ifdef LOCK_DEBUG
 	pg_atomic_uint32 nwaiters;	/* number of waiters */
 	struct PGPROC *owner;		/* last exclusive owner of the lock */
diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h
index 8d096fdeeb1..5cde824b365 100644
--- a/src/include/storage/proc.h
+++ b/src/include/storage/proc.h
@@ -219,7 +219,8 @@ struct PGPROC
 	/* Info about LWLock the process is currently waiting for, if any. */
 	bool		lwWaiting;		/* true if waiting for an LW lock */
 	uint8		lwWaitMode;		/* lwlock mode being waited for */
-	proclist_node lwWaitLink;	/* position in LW lock wait list */
+	uint32		lwWaitLink;		/* next proc number in LW lock wait list */
+	uint32		lwReleaseLink;	/* next proc number in LW lock release list */
 
 	/* Support for condition variables. */
 	proclist_node cvWaitLink;	/* position in CV wait list */
-- 
2.24.3 (Apple Git-128)

