Hi,

Attached you can find the next version of my LW_SHARED patchset. Now
that atomics are committed, it seems like a good idea to also add their
raison d'être.

Since the last public version I have:
* Addressed lots of Amit's comments. Thanks!
* Performed a fair amount of testing.
* Rebased the code. The volatile removal made that not entirely
  trivial...
* Significantly cleaned up and simplified the code.
* Updated comments and such
* Fixed a minor bug (unpaired HOLD/RESUME_INTERRUPTS in a corner case)

The feature currently consists of two patches:
1) Convert PGPROC->lwWaitLink into a dlist. The old code was frail and
   verbose. This also does:
    * changes the logic in LWLockRelease() to release all shared lockers
      when waking up any. This can yield some significant performance
      improvements - and the fairness isn't really much worse than
      before,
      as we always allowed new shared lockers to jump the queue.

    * adds a memory pg_write_barrier() in the wakeup paths between
      dequeuing and unsetting ->lwWaiting. That was always required on
      weakly ordered machines, but f4077cda2 made it more urgent. I can
      reproduce crashes without it.
2) Implement the wait free LW_SHARED algorithm.

Personally I'm quite happy with the new state. I think it needs more
review, but I personally don't know of anything that needs
changing. There's lots of further improvements that could be done, but
let's get this in first.

Greetings,

Andres Freund

--
 Andres Freund                     http://www.2ndQuadrant.com/
 PostgreSQL Development, 24x7 Support, Training & Services
>From 6885a15cc6f2e193ff575a4463d90ad252d74f5e Mon Sep 17 00:00:00 2001
From: Andres Freund <and...@anarazel.de>
Date: Tue, 7 Oct 2014 15:32:34 +0200
Subject: [PATCH 1/2] Convert the PGPROC->lwWaitLink list into a dlist instead
 of open coding it.

Besides being shorter and much easier to read it:

* changes the logic in LWLockRelease() to release all shared lockers
  when waking up any. This can yield some significant performance
  improvements - and the fairness isn't really much worse than before,
  as we always allowed new shared lockers to jump the queue.

* adds a memory pg_write_barrier() in the wakeup paths between
  dequeuing and unsetting ->lwWaiting. That was always required on
  weakly ordered machines, but f4077cda2 made it more urgent.

Author: Andres Freund
---
 src/backend/access/transam/twophase.c |   1 -
 src/backend/storage/lmgr/lwlock.c     | 151 +++++++++++++---------------------
 src/backend/storage/lmgr/proc.c       |   2 -
 src/include/storage/lwlock.h          |   5 +-
 src/include/storage/proc.h            |   3 +-
 5 files changed, 60 insertions(+), 102 deletions(-)

diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c
index d5409a6..6401943 100644
--- a/src/backend/access/transam/twophase.c
+++ b/src/backend/access/transam/twophase.c
@@ -389,7 +389,6 @@ MarkAsPreparing(TransactionId xid, const char *gid,
 	proc->roleId = owner;
 	proc->lwWaiting = false;
 	proc->lwWaitMode = 0;
-	proc->lwWaitLink = NULL;
 	proc->waitLock = NULL;
 	proc->waitProcLock = NULL;
 	for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
diff --git a/src/backend/storage/lmgr/lwlock.c b/src/backend/storage/lmgr/lwlock.c
index 9fe6855..e6f9158 100644
--- a/src/backend/storage/lmgr/lwlock.c
+++ b/src/backend/storage/lmgr/lwlock.c
@@ -35,6 +35,7 @@
 #include "miscadmin.h"
 #include "pg_trace.h"
 #include "replication/slot.h"
+#include "storage/barrier.h"
 #include "storage/ipc.h"
 #include "storage/predicate.h"
 #include "storage/proc.h"
@@ -115,9 +116,9 @@ inline static void
 PRINT_LWDEBUG(const char *where, const LWLock *lock)
 {
 	if (Trace_lwlocks)
-		elog(LOG, "%s(%s %d): excl %d shared %d head %p rOK %d",
+		elog(LOG, "%s(%s %d): excl %d shared %d rOK %d",
 			 where, T_NAME(lock), T_ID(lock),
-			 (int) lock->exclusive, lock->shared, lock->head,
+			 (int) lock->exclusive, lock->shared,
 			 (int) lock->releaseOK);
 }
 
@@ -475,8 +476,7 @@ LWLockInitialize(LWLock *lock, int tranche_id)
 	lock->exclusive = 0;
 	lock->shared = 0;
 	lock->tranche = tranche_id;
-	lock->head = NULL;
-	lock->tail = NULL;
+	dlist_init(&lock->waiters);
 }
 
 
@@ -615,12 +615,7 @@ LWLockAcquireCommon(LWLock *lock, LWLockMode mode, uint64 *valptr, uint64 val)
 
 		proc->lwWaiting = true;
 		proc->lwWaitMode = mode;
-		proc->lwWaitLink = NULL;
-		if (lock->head == NULL)
-			lock->head = proc;
-		else
-			lock->tail->lwWaitLink = proc;
-		lock->tail = proc;
+		dlist_push_head(&lock->waiters, &proc->lwWaitLink);
 
 		/* Can release the mutex now */
 		SpinLockRelease(&lock->mutex);
@@ -836,12 +831,7 @@ LWLockAcquireOrWait(LWLock *lock, LWLockMode mode)
 
 		proc->lwWaiting = true;
 		proc->lwWaitMode = LW_WAIT_UNTIL_FREE;
-		proc->lwWaitLink = NULL;
-		if (lock->head == NULL)
-			lock->head = proc;
-		else
-			lock->tail->lwWaitLink = proc;
-		lock->tail = proc;
+		dlist_push_head(&lock->waiters, &proc->lwWaitLink);
 
 		/* Can release the mutex now */
 		SpinLockRelease(&lock->mutex);
@@ -997,13 +987,8 @@ LWLockWaitForVar(LWLock *lock, uint64 *valptr, uint64 oldval, uint64 *newval)
 		 */
 		proc->lwWaiting = true;
 		proc->lwWaitMode = LW_WAIT_UNTIL_FREE;
-		proc->lwWaitLink = NULL;
-
 		/* waiters are added to the front of the queue */
-		proc->lwWaitLink = lock->head;
-		if (lock->head == NULL)
-			lock->tail = proc;
-		lock->head = proc;
+		dlist_push_head(&lock->waiters, &proc->lwWaitLink);
 
 		/* Can release the mutex now */
 		SpinLockRelease(&lock->mutex);
@@ -1079,9 +1064,10 @@ LWLockWaitForVar(LWLock *lock, uint64 *valptr, uint64 oldval, uint64 *newval)
 void
 LWLockUpdateVar(LWLock *lock, uint64 *valptr, uint64 val)
 {
-	PGPROC	   *head;
-	PGPROC	   *proc;
-	PGPROC	   *next;
+	dlist_head	wakeup;
+	dlist_mutable_iter iter;
+
+	dlist_init(&wakeup);
 
 	/* Acquire mutex.  Time spent holding mutex should be short! */
 	SpinLockAcquire(&lock->mutex);
@@ -1096,24 +1082,16 @@ LWLockUpdateVar(LWLock *lock, uint64 *valptr, uint64 val)
 	 * See if there are any LW_WAIT_UNTIL_FREE waiters that need to be woken
 	 * up. They are always in the front of the queue.
 	 */
-	head = lock->head;
-
-	if (head != NULL && head->lwWaitMode == LW_WAIT_UNTIL_FREE)
+	dlist_foreach_modify(iter, &lock->waiters)
 	{
-		proc = head;
-		next = proc->lwWaitLink;
-		while (next && next->lwWaitMode == LW_WAIT_UNTIL_FREE)
-		{
-			proc = next;
-			next = next->lwWaitLink;
-		}
+		PGPROC *waiter = dlist_container(PGPROC, lwWaitLink, iter.cur);
 
-		/* proc is now the last PGPROC to be released */
-		lock->head = next;
-		proc->lwWaitLink = NULL;
+		if (waiter->lwWaitMode != LW_WAIT_UNTIL_FREE)
+			break;
+
+		dlist_delete(&waiter->lwWaitLink);
+		dlist_push_tail(&wakeup, &waiter->lwWaitLink);
 	}
-	else
-		head = NULL;
 
 	/* We are done updating shared state of the lock itself. */
 	SpinLockRelease(&lock->mutex);
@@ -1121,13 +1099,13 @@ LWLockUpdateVar(LWLock *lock, uint64 *valptr, uint64 val)
 	/*
 	 * Awaken any waiters I removed from the queue.
 	 */
-	while (head != NULL)
+	dlist_foreach_modify(iter, &wakeup)
 	{
-		proc = head;
-		head = proc->lwWaitLink;
-		proc->lwWaitLink = NULL;
-		proc->lwWaiting = false;
-		PGSemaphoreUnlock(&proc->sem);
+		PGPROC *waiter = dlist_container(PGPROC, lwWaitLink, iter.cur);
+		dlist_delete(&waiter->lwWaitLink);
+		pg_write_barrier();
+		waiter->lwWaiting = false;
+		PGSemaphoreUnlock(&waiter->sem);
 	}
 }
 
@@ -1138,10 +1116,12 @@ LWLockUpdateVar(LWLock *lock, uint64 *valptr, uint64 val)
 void
 LWLockRelease(LWLock *lock)
 {
-	PGPROC	   *head;
-	PGPROC	   *proc;
+	dlist_head	wakeup;
+	dlist_mutable_iter iter;
 	int			i;
 
+	dlist_init(&wakeup);
+
 	PRINT_LWDEBUG("LWLockRelease", lock);
 
 	/*
@@ -1177,58 +1157,39 @@ LWLockRelease(LWLock *lock)
 	 * if someone has already awakened waiters that haven't yet acquired the
 	 * lock.
 	 */
-	head = lock->head;
-	if (head != NULL)
+	if (lock->exclusive == 0 && lock->shared == 0 && lock->releaseOK)
 	{
-		if (lock->exclusive == 0 && lock->shared == 0 && lock->releaseOK)
+		/*
+		 * Remove the to-be-awakened PGPROCs from the queue.
+		 */
+		bool		releaseOK = true;
+		bool		wokeup_somebody = false;
+
+		dlist_foreach_modify(iter, &lock->waiters)
 		{
-			/*
-			 * Remove the to-be-awakened PGPROCs from the queue.
-			 */
-			bool		releaseOK = true;
+			PGPROC *waiter = dlist_container(PGPROC, lwWaitLink, iter.cur);
 
-			proc = head;
+			if (wokeup_somebody && waiter->lwWaitMode == LW_EXCLUSIVE)
+				continue;
 
-			/*
-			 * First wake up any backends that want to be woken up without
-			 * acquiring the lock.
-			 */
-			while (proc->lwWaitMode == LW_WAIT_UNTIL_FREE && proc->lwWaitLink)
-				proc = proc->lwWaitLink;
+			dlist_delete(&waiter->lwWaitLink);
+			dlist_push_tail(&wakeup, &waiter->lwWaitLink);
 
 			/*
-			 * If the front waiter wants exclusive lock, awaken him only.
-			 * Otherwise awaken as many waiters as want shared access.
+			 * Prevent additional wakeups until retryer gets to
+			 * run. Backends that are just waiting for the lock to become
+			 * free don't retry automatically.
 			 */
-			if (proc->lwWaitMode != LW_EXCLUSIVE)
+			if (waiter->lwWaitMode != LW_WAIT_UNTIL_FREE)
 			{
-				while (proc->lwWaitLink != NULL &&
-					   proc->lwWaitLink->lwWaitMode != LW_EXCLUSIVE)
-				{
-					if (proc->lwWaitMode != LW_WAIT_UNTIL_FREE)
-						releaseOK = false;
-					proc = proc->lwWaitLink;
-				}
-			}
-			/* proc is now the last PGPROC to be released */
-			lock->head = proc->lwWaitLink;
-			proc->lwWaitLink = NULL;
-
-			/*
-			 * Prevent additional wakeups until retryer gets to run. Backends
-			 * that are just waiting for the lock to become free don't retry
-			 * automatically.
-			 */
-			if (proc->lwWaitMode != LW_WAIT_UNTIL_FREE)
 				releaseOK = false;
+				wokeup_somebody = true;
+			}
 
-			lock->releaseOK = releaseOK;
-		}
-		else
-		{
-			/* lock is still held, can't awaken anything */
-			head = NULL;
+			if(waiter->lwWaitMode == LW_EXCLUSIVE)
+				break;
 		}
+		lock->releaseOK = releaseOK;
 	}
 
 	/* We are done updating shared state of the lock itself. */
@@ -1239,15 +1200,15 @@ LWLockRelease(LWLock *lock)
 	/*
 	 * Awaken any waiters I removed from the queue.
 	 */
-	while (head != NULL)
+	dlist_foreach_modify(iter, &wakeup)
 	{
+		PGPROC *waiter = dlist_container(PGPROC, lwWaitLink, iter.cur);
 		LOG_LWDEBUG("LWLockRelease", T_NAME(lock), T_ID(lock),
 					"release waiter");
-		proc = head;
-		head = proc->lwWaitLink;
-		proc->lwWaitLink = NULL;
-		proc->lwWaiting = false;
-		PGSemaphoreUnlock(&proc->sem);
+		dlist_delete(&waiter->lwWaitLink);
+		pg_write_barrier();
+		waiter->lwWaiting = false;
+		PGSemaphoreUnlock(&waiter->sem);
 	}
 
 	/*
diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c
index ea88a24..a4789fc 100644
--- a/src/backend/storage/lmgr/proc.c
+++ b/src/backend/storage/lmgr/proc.c
@@ -372,7 +372,6 @@ InitProcess(void)
 		MyPgXact->vacuumFlags |= PROC_IS_AUTOVACUUM;
 	MyProc->lwWaiting = false;
 	MyProc->lwWaitMode = 0;
-	MyProc->lwWaitLink = NULL;
 	MyProc->waitLock = NULL;
 	MyProc->waitProcLock = NULL;
 #ifdef USE_ASSERT_CHECKING
@@ -535,7 +534,6 @@ InitAuxiliaryProcess(void)
 	MyPgXact->vacuumFlags = 0;
 	MyProc->lwWaiting = false;
 	MyProc->lwWaitMode = 0;
-	MyProc->lwWaitLink = NULL;
 	MyProc->waitLock = NULL;
 	MyProc->waitProcLock = NULL;
 #ifdef USE_ASSERT_CHECKING
diff --git a/src/include/storage/lwlock.h b/src/include/storage/lwlock.h
index 02c8f1a..fea5d33 100644
--- a/src/include/storage/lwlock.h
+++ b/src/include/storage/lwlock.h
@@ -14,6 +14,7 @@
 #ifndef LWLOCK_H
 #define LWLOCK_H
 
+#include "lib/ilist.h"
 #include "storage/s_lock.h"
 
 struct PGPROC;
@@ -50,9 +51,7 @@ typedef struct LWLock
 	char		exclusive;		/* # of exclusive holders (0 or 1) */
 	int			shared;			/* # of shared holders (0..MaxBackends) */
 	int			tranche;		/* tranche ID */
-	struct PGPROC *head;		/* head of list of waiting PGPROCs */
-	struct PGPROC *tail;		/* tail of list of waiting PGPROCs */
-	/* tail is undefined when head is NULL */
+	dlist_head	waiters;		/* list of waiting PGPROCs */
 } LWLock;
 
 /*
diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h
index c23f4da..38758d3 100644
--- a/src/include/storage/proc.h
+++ b/src/include/storage/proc.h
@@ -15,6 +15,7 @@
 #define _PROC_H_
 
 #include "access/xlogdefs.h"
+#include "lib/ilist.h"
 #include "storage/latch.h"
 #include "storage/lock.h"
 #include "storage/pg_sema.h"
@@ -104,7 +105,7 @@ struct PGPROC
 	/* Info about LWLock the process is currently waiting for, if any. */
 	bool		lwWaiting;		/* true if waiting for an LW lock */
 	uint8		lwWaitMode;		/* lwlock mode being waited for */
-	struct PGPROC *lwWaitLink;	/* next waiter for same LW lock */
+	dlist_node	lwWaitLink;		/* position in LW lock wait list */
 
 	/* Info about lock the process is currently waiting for, if any. */
 	/* waitLock and waitProcLock are NULL if not currently waiting. */
-- 
1.8.3.251.g1462b67

>From cbd80574f5dfe631f492dd497700dd05b211b3e3 Mon Sep 17 00:00:00 2001
From: Andres Freund <and...@anarazel.de>
Date: Thu, 18 Sep 2014 16:14:16 +0200
Subject: [PATCH 2/2] Wait free LW_SHARED LWLock acquisition.

The old LWLock implementation had the problem that concurrent shared
lock acquisitions required exclusively acquiring a spinlock. Often
that could lead to acquirers waiting behind the spinlock, even if the
actual LWLock was free.

The new implementation doesn't acquire the spinlock when acquiring the
lock itself. Instead the new atomic operations are used to atomically
manipulate the state. Only the waitqueue, only used in the slow path,
is still protected by the spinlock. Check lwlock.c's header for an
explanation about the used algorithm.

For some common workloads on larger machines this can yield
significant performance improvements. Particularly in read mostly
workloads.

Reviewed-By: Amit Kapila
Author: Andres Freund

Discussion: 20130926225545.gb26...@awork2.anarazel.de
---
 src/backend/storage/lmgr/lwlock.c | 968 +++++++++++++++++++++++++++-----------
 src/include/storage/lwlock.h      |   9 +-
 2 files changed, 704 insertions(+), 273 deletions(-)

diff --git a/src/backend/storage/lmgr/lwlock.c b/src/backend/storage/lmgr/lwlock.c
index e6f9158..b10c121 100644
--- a/src/backend/storage/lmgr/lwlock.c
+++ b/src/backend/storage/lmgr/lwlock.c
@@ -24,6 +24,78 @@
  * IDENTIFICATION
  *	  src/backend/storage/lmgr/lwlock.c
  *
+ * NOTES:
+ *
+ * This used to be a pretty straight forward reader-writer lock
+ * implementation, in which the internal state was protected by a
+ * spinlock. Unfortunately the overhead of taking the spinlock proved to be
+ * too high for workloads/locks that were locked in shared mode very
+ * frequently. Often we were spinning in the (obviously exclusive) spinlock,
+ * while trying to acquire a shared lock that was actually free.
+ *
+ * Thus a new implementation was devised that provides wait-free shared lock
+ * acquisition for locks that aren't exclusively locked.
+ *
+ * The basic idea is to have a single atomic variable 'lockcount' instead of
+ * the formerly separate shared and exclusive counters and to use an atomic
+ * increment to acquire the lock. That's fairly easy to do for rw-spinlocks,
+ * but a lot harder for something like LWLocks that want to wait in the OS.
+ *
+ * For exclusive lock acquisition we use an atomic compare-and-exchange on the
+ * lockcount variable swapping in EXCLUSIVE_LOCK (1 << 30, i.e. 0x40000000) if
+ * and only if the current value of lockcount is 0. If the swap was not
+ * successful, we have to wait.
+ *
+ * For shared lock acquisition we use an atomic add to the lockcount variable
+ * to add 1. If the value is bigger than EXCLUSIVE_LOCK we know that somebody
+ * actually has an exclusive lock, and we back out by atomically decrementing
+ * by 1 again. If so, we have to wait for the exclusive locker to release the
+ * lock.
+ *
+ * To release the lock we use an atomic decrement to release the lock. If the
+ * new value is zero (we get that atomically), we know we have to release
+ * waiters.
+ *
+ * The attentive reader probably might have noticed that naively doing the
+ * above has two glaring race conditions:
+ *
+ * 1) too-quick-for-queueing: We try to lock using the atomic operations and
+ * notice that we have to wait. Unfortunately until we have finished queuing,
+ * the former locker very well might have already finished its work. That's
+ * problematic because we're now stuck waiting inside the OS.
+ *
+ * 2) spurious failed locks: Due to the logic of backing out of shared
+ * locks after we unconditionally added a 1 to lockcount, we might have
+ * prevented another exclusive locker from getting the lock:
+ *   1) Session A: LWLockAcquire(LW_EXCLUSIVE) - success
+ *   2) Session B: LWLockAcquire(LW_SHARED) - lockcount += 1
+ *   3) Session B: LWLockAcquire(LW_SHARED) - oops, bigger than EXCLUSIVE_LOCK
+ *   4) Session A: LWLockRelease()
+ *   5) Session C: LWLockAcquire(LW_EXCLUSIVE) - check if lockcount = 0, no. wait.
+ *   6) Session B: LWLockAcquire(LW_SHARED) - lockcount -= 1
+ *   7) Session B: LWLockAcquire(LW_SHARED) - wait
+ *
+ * So we'd now have both B) and C) waiting on a lock that nobody is holding
+ * anymore. Not good.
+ *
+ * To mitigate those races we use a two phased attempt at locking:
+ *   Phase 1: Try to do it atomically, if we succeed, nice
+ *   Phase 2: Add us to the waitqueue of the lock
+ *   Phase 3: Try to grab the lock again, if we succeed, remove ourselves from
+ *            the queue
+ *   Phase 4: Sleep till wakeup, goto Phase 1
+ *
+ * This protects us against both problems from above:
+ * 1) Nobody can release too quick, before we're queued, since after Phase 2
+ *    we're already queued.
+ * 2) If somebody spuriously got blocked from acquiring the lock, they will
+ *    get queued in Phase 2 and we can wake them up if necessary or they will
+ *    have gotten the lock in Phase 3.
+ *
+ * The above algorithm only works for LWLockAcquire, not directly for
+ * LWLockAcquireConditional where we don't want to wait. In that case we just
+ * need to retry acquiring the lock until we're sure we didn't disturb anybody
+ * in doing so.
  *-------------------------------------------------------------------------
  */
 #include "postgres.h"
@@ -35,7 +107,6 @@
 #include "miscadmin.h"
 #include "pg_trace.h"
 #include "replication/slot.h"
-#include "storage/barrier.h"
 #include "storage/ipc.h"
 #include "storage/predicate.h"
 #include "storage/proc.h"
@@ -50,6 +121,11 @@
 /* We use the ShmemLock spinlock to protect LWLockAssign */
 extern slock_t *ShmemLock;
 
+#define EXCLUSIVE_LOCK (((uint32) 1) << (31 - 1))
+
+/* Must be greater than MAX_BACKENDS - which is 2^23-1, so we're fine. */
+#define SHARED_LOCK_MASK (~EXCLUSIVE_LOCK)
+
 /*
  * This is indexed by tranche ID and stores metadata for all tranches known
  * to the current backend.
@@ -80,8 +156,14 @@ static LWLockTranche MainLWLockTranche;
  */
 #define MAX_SIMUL_LWLOCKS	200
 
+typedef struct LWLockHandle
+{
+	LWLock *lock;
+	LWLockMode	mode;
+} LWLockHandle;
+
 static int	num_held_lwlocks = 0;
-static LWLock *held_lwlocks[MAX_SIMUL_LWLOCKS];
+static LWLockHandle held_lwlocks[MAX_SIMUL_LWLOCKS];
 
 static int	lock_addin_request = 0;
 static bool lock_addin_request_allowed = true;
@@ -100,8 +182,10 @@ typedef struct lwlock_stats
 {
 	lwlock_stats_key key;
 	int			sh_acquire_count;
+	int			sh_attempt_backout;
 	int			ex_acquire_count;
 	int			block_count;
+	int			dequeue_self_count;
 	int			spin_delay_count;
 }	lwlock_stats;
 
@@ -113,24 +197,32 @@ static lwlock_stats lwlock_stats_dummy;
 bool		Trace_lwlocks = false;
 
 inline static void
-PRINT_LWDEBUG(const char *where, const LWLock *lock)
+PRINT_LWDEBUG(const char *where, LWLock *lock, LWLockMode mode)
 {
 	if (Trace_lwlocks)
-		elog(LOG, "%s(%s %d): excl %d shared %d rOK %d",
+	{
+		uint32 lockcount = pg_atomic_read_u32(&lock->lockcount);
+
+		elog(LOG, "%d: %s(%s %d): excl %u shared %u waiters %u rOK %d\n",
+			 MyProcPid,
 			 where, T_NAME(lock), T_ID(lock),
-			 (int) lock->exclusive, lock->shared,
+			 lockcount >= EXCLUSIVE_LOCK,
+			 lockcount & SHARED_LOCK_MASK,
+			 pg_atomic_read_u32(&lock->nwaiters),
 			 (int) lock->releaseOK);
+	}
 }
 
 inline static void
-LOG_LWDEBUG(const char *where, const char *name, int index, const char *msg)
+LOG_LWDEBUG(const char *where, LWLock *lock, const char *msg)
 {
 	if (Trace_lwlocks)
-		elog(LOG, "%s(%s %d): %s", where, name, index, msg);
+		elog(LOG, "%s(%s %d): %s", where, T_NAME(lock), T_ID(lock), msg);
 }
+
 #else							/* not LOCK_DEBUG */
-#define PRINT_LWDEBUG(a,b)
-#define LOG_LWDEBUG(a,b,c,d)
+#define PRINT_LWDEBUG(a,b,c)
+#define LOG_LWDEBUG(a,b,c)
 #endif   /* LOCK_DEBUG */
 
 #ifdef LWLOCK_STATS
@@ -192,11 +284,12 @@ print_lwlock_stats(int code, Datum arg)
 	while ((lwstats = (lwlock_stats *) hash_seq_search(&scan)) != NULL)
 	{
 		fprintf(stderr,
-			  "PID %d lwlock %s %d: shacq %u exacq %u blk %u spindelay %u\n",
+				"PID %d lwlock %s %d: shacq %u exacq %u blk %u spindelay %u backout %u dequeue self %u\n",
 				MyProcPid, LWLockTrancheArray[lwstats->key.tranche]->name,
 				lwstats->key.instance, lwstats->sh_acquire_count,
 				lwstats->ex_acquire_count, lwstats->block_count,
-				lwstats->spin_delay_count);
+				lwstats->spin_delay_count, lwstats->sh_attempt_backout,
+				lwstats->dequeue_self_count);
 	}
 
 	LWLockRelease(&MainLWLockArray[0].lock);
@@ -224,8 +317,10 @@ get_lwlock_stats_entry(LWLock *lock)
 	if (!found)
 	{
 		lwstats->sh_acquire_count = 0;
+		lwstats->sh_attempt_backout = 0;
 		lwstats->ex_acquire_count = 0;
 		lwstats->block_count = 0;
+		lwstats->dequeue_self_count = 0;
 		lwstats->spin_delay_count = 0;
 	}
 	return lwstats;
@@ -473,12 +568,309 @@ LWLockInitialize(LWLock *lock, int tranche_id)
 {
 	SpinLockInit(&lock->mutex);
 	lock->releaseOK = true;
-	lock->exclusive = 0;
-	lock->shared = 0;
+	pg_atomic_init_u32(&lock->lockcount, 0);
+	pg_atomic_init_u32(&lock->nwaiters, 0);
 	lock->tranche = tranche_id;
 	dlist_init(&lock->waiters);
 }
 
+/*
+ * Internal function that tries to atomically acquire the lwlock in the passed
+ * in mode.
+ *
+ * This function will not block waiting for a lock to become free - that's the
+ * callers job.
+ *
+ * Returns true if the lock isn't free and we need to wait.
+ *
+ * When acquiring shared locks it's possible that we disturb an exclusive
+ * waiter. If that's a problem for the specific user, pass in a valid pointer
+ * for 'potentially_spurious'. Its value will be set to true if we possibly
+ * did so. The caller then has to handle that scenario.
+ */
+static bool
+LWLockAttemptLock(LWLock* lock, LWLockMode mode, bool *potentially_spurious)
+{
+	uint32		oldstate;
+#ifdef LWLOCK_STATS
+	lwlock_stats *lwstats;
+
+	lwstats = get_lwlock_stats_entry(lock);
+#endif
+
+	AssertArg(mode == LW_EXCLUSIVE || mode == LW_SHARED);
+
+	if (potentially_spurious != NULL)
+		*potentially_spurious = false;
+
+	if (mode == LW_EXCLUSIVE)
+	{
+		uint32 expected;
+
+		/*
+		 * First check whether the variable is free without an atomic
+		 * operation; it's often quite a bit cheaper for contended
+		 * locks. Doing so can cause a superfluous shared-exclusive cacheline
+		 * transition, but benchmarks show that it's still worth doing so.
+		 */
+		expected = pg_atomic_read_u32(&lock->lockcount);
+
+		if (expected != 0)
+			return true;
+		else if (!pg_atomic_compare_exchange_u32(&lock->lockcount,
+												 &expected, EXCLUSIVE_LOCK))
+		{
+			/*
+			 * No can do. Between the pg_atomic_read() above and the CAS
+			 * somebody else acquired the lock.
+			 */
+			return true;
+		}
+		else
+		{
+			/* yipeyyahee */
+#ifdef LOCK_DEBUG
+			lock->owner = MyProc;
+#endif
+			Assert(expected == 0);
+			return false;
+		}
+	}
+	else
+	{
+		/*
+		 * If the caller is interested in spurious locks, do an unlocked check
+		 * first.  This is useful if potentially spurious results have a
+		 * noticeable cost.
+		 */
+		if (potentially_spurious != NULL &&
+			pg_atomic_read_u32(&lock->lockcount) >= EXCLUSIVE_LOCK)
+			return true;
+
+		/*
+		 * Acquire the share lock unconditionally using an atomic addition. We
+		 * might have to back out again if it turns out somebody else has an
+		 * exclusive lock.
+		 */
+		oldstate = pg_atomic_fetch_add_u32(&lock->lockcount, 1);
+
+		if (oldstate >= EXCLUSIVE_LOCK)
+		{
+			/*
+			 * Ok, somebody else holds the lock exclusively. We need to back
+			 * away from the shared lock, since we don't actually hold it right
+			 * now.  Since there's a window between lockcount += 1 and lockcount
+			 * -= 1, the previous exclusive locker could have released and
+			 * another exclusive locker could have seen our +1. We need to
+			 * signal that to the upper layers so they can deal with the race
+			 * condition.
+			 */
+
+			/*
+			 * XXX: We could check the return value if (double_check), it's
+			 * not spurious if still exclusively locked. Should work for the
+			 * current callers. There might be some cases where ABA like
+			 * problems exist.
+			 */
+			pg_atomic_fetch_sub_u32(&lock->lockcount, 1);
+
+
+			if (potentially_spurious != NULL)
+				*potentially_spurious = true;
+#ifdef LWLOCK_STATS
+			lwstats->sh_attempt_backout++;
+#endif
+			return true;
+		}
+		else
+		{
+			/* yipeyyahee */
+			return false;
+		}
+	}
+
+	pg_unreachable();
+}
+
+/*
+ * Wakeup all the lockers that currently have a chance to run.
+ */
+static void
+LWLockWakeup(LWLock *lock, LWLockMode released_mode)
+{
+	bool		releaseOK;
+	bool		wokeup_somebody = false;
+	dlist_head	wakeup;
+	dlist_mutable_iter iter;
+#ifdef LWLOCK_STATS
+	lwlock_stats *lwstats;
+
+	lwstats = get_lwlock_stats_entry(lock);
+#endif
+
+	dlist_init(&wakeup);
+
+	/* remove the to-be-awakened PGPROCs from the queue */
+	releaseOK = true;
+
+	/* Acquire mutex.  Time spent holding mutex should be short! */
+#ifdef LWLOCK_STATS
+	lwstats->spin_delay_count += SpinLockAcquire(&lock->mutex);
+#else
+	SpinLockAcquire(&lock->mutex);
+#endif
+
+	/*
+	 * We're still waiting for backends to get scheduled, don't wake them up
+	 * again.
+	 */
+	if (!lock->releaseOK)
+	{
+		SpinLockRelease(&lock->mutex);
+		LOG_LWDEBUG("LWLockRelease", lock, "skipping due to releaseOK");
+		return;
+	}
+
+	dlist_foreach_modify(iter, &lock->waiters)
+	{
+		PGPROC *waiter = dlist_container(PGPROC, lwWaitLink, iter.cur);
+
+		if (wokeup_somebody && waiter->lwWaitMode == LW_EXCLUSIVE)
+			continue;
+
+		dlist_delete(&waiter->lwWaitLink);
+		dlist_push_tail(&wakeup, &waiter->lwWaitLink);
+
+		if (waiter->lwWaitMode != LW_WAIT_UNTIL_FREE)
+		{
+			/*
+			 * Prevent additional wakeups until retryer gets to run. Backends
+			 * that are just waiting for the lock to become free don't retry
+			 * automatically.
+			 */
+			releaseOK = false;
+			/*
+			 * Don't wakeup (further) exclusive locks.
+			 */
+			wokeup_somebody = true;
+		}
+
+		/*
+		 * Once we've woken up an exclusive lock, there's no point in waking
+		 * up anybody else.
+		 */
+		if(waiter->lwWaitMode == LW_EXCLUSIVE)
+			break;
+	}
+	lock->releaseOK = releaseOK;
+
+
+	/* We are done updating shared state of the lock queue. */
+	SpinLockRelease(&lock->mutex);
+
+	/*
+	 * Awaken any waiters I removed from the queue.
+	 */
+	dlist_foreach_modify(iter, &wakeup)
+	{
+		PGPROC *waiter = dlist_container(PGPROC, lwWaitLink, iter.cur);
+
+		LOG_LWDEBUG("LWLockRelease", lock, "release waiter");
+		dlist_delete(&waiter->lwWaitLink);
+		pg_write_barrier();
+		waiter->lwWaiting = false;
+		PGSemaphoreUnlock(&waiter->sem);
+	}
+}
+
+/*
+ * Queue ourselves: LW_WAIT_UNTIL_FREE waiters at the front, others at the end.
+ */
+static void
+LWLockQueueSelf(LWLock *lock, LWLockMode mode)
+{
+#ifdef LWLOCK_STATS
+	lwlock_stats *lwstats;
+
+	lwstats = get_lwlock_stats_entry(lock);
+#endif
+
+	/*
+	 * If we don't have a PGPROC structure, there's no way to wait. This
+	 * should never occur, since MyProc should only be null during shared
+	 * memory initialization.
+	 */
+	if (MyProc == NULL)
+		elog(PANIC, "cannot wait without a PGPROC structure");
+
+	pg_atomic_fetch_add_u32(&lock->nwaiters, 1);
+
+#ifdef LWLOCK_STATS
+	lwstats->spin_delay_count += SpinLockAcquire(&lock->mutex);
+#else
+	SpinLockAcquire(&lock->mutex);
+#endif
+
+	if (MyProc->lwWaiting)
+		elog(PANIC, "queueing for lock while waiting on another one");
+
+	MyProc->lwWaiting = true;
+	MyProc->lwWaitMode = mode;
+
+	/* LW_WAIT_UNTIL_FREE waiters are always at the front of the queue */
+	if (mode == LW_WAIT_UNTIL_FREE)
+		dlist_push_head(&lock->waiters, &MyProc->lwWaitLink);
+	else
+		dlist_push_tail(&lock->waiters, &MyProc->lwWaitLink);
+
+	/* Can release the mutex now */
+	SpinLockRelease(&lock->mutex);
+}
+
+/*
+ * Remove ourselves from the waitlist.  This is used if we queued ourselves
+ * because we thought we needed to sleep but, after further checking, we
+ * discovered that we don't actually need to do so. Somebody else might have
+ * already woken us up though, in that case return false.
+ */
+static bool
+LWLockDequeueSelf(LWLock *lock)
+{
+	bool	found = false;
+	dlist_mutable_iter iter;
+#ifdef LWLOCK_STATS
+	lwlock_stats *lwstats;
+
+	lwstats = get_lwlock_stats_entry(lock);
+#endif
+
+#ifdef LWLOCK_STATS
+	lwstats->spin_delay_count += SpinLockAcquire(&lock->mutex);
+#else
+	SpinLockAcquire(&lock->mutex);
+#endif
+
+	/* need to iterate, somebody else could have unqueued us */
+	dlist_foreach_modify(iter, &lock->waiters)
+	{
+		PGPROC *proc = dlist_container(PGPROC, lwWaitLink, iter.cur);
+		if (proc == MyProc)
+		{
+			found = true;
+			dlist_delete(&proc->lwWaitLink);
+			break;
+		}
+	}
+
+	/* clear waiting state again, nice for debugging */
+	if (found)
+		MyProc->lwWaiting = false;
+
+	SpinLockRelease(&lock->mutex);
+
+	pg_atomic_fetch_sub_u32(&lock->nwaiters, 1);
+	return found;
+}
 
 /*
  * LWLockAcquire - acquire a lightweight lock in the specified mode
@@ -510,18 +902,19 @@ static inline bool
 LWLockAcquireCommon(LWLock *lock, LWLockMode mode, uint64 *valptr, uint64 val)
 {
 	PGPROC	   *proc = MyProc;
-	bool		retry = false;
 	bool		result = true;
 	int			extraWaits = 0;
 #ifdef LWLOCK_STATS
 	lwlock_stats *lwstats;
+
+	lwstats = get_lwlock_stats_entry(lock);
 #endif
 
-	PRINT_LWDEBUG("LWLockAcquire", lock);
+	AssertArg(mode == LW_SHARED || mode == LW_EXCLUSIVE);
 
-#ifdef LWLOCK_STATS
-	lwstats = get_lwlock_stats_entry(lock);
+	PRINT_LWDEBUG("LWLockAcquire", lock, mode);
 
+#ifdef LWLOCK_STATS
 	/* Count lock acquisition attempts */
 	if (mode == LW_EXCLUSIVE)
 		lwstats->ex_acquire_count++;
@@ -567,58 +960,78 @@ LWLockAcquireCommon(LWLock *lock, LWLockMode mode, uint64 *valptr, uint64 val)
 	{
 		bool		mustwait;
 
-		/* Acquire mutex.  Time spent holding mutex should be short! */
-#ifdef LWLOCK_STATS
-		lwstats->spin_delay_count += SpinLockAcquire(&lock->mutex);
-#else
-		SpinLockAcquire(&lock->mutex);
-#endif
-
-		/* If retrying, allow LWLockRelease to release waiters again */
-		if (retry)
-			lock->releaseOK = true;
+		/*
+		 * try to grab the lock the first time, we're not in the waitqueue yet.
+		 */
+		mustwait = LWLockAttemptLock(lock, mode, NULL);
 
-		/* If I can get the lock, do so quickly. */
-		if (mode == LW_EXCLUSIVE)
+		if (!mustwait)
 		{
-			if (lock->exclusive == 0 && lock->shared == 0)
-			{
-				lock->exclusive++;
-				mustwait = false;
-			}
-			else
-				mustwait = true;
+			/* XXX: remove before commit? */
+			LOG_LWDEBUG("LWLockAcquire", lock, "immediately acquired lock");
+			break;				/* got the lock */
 		}
-		else
+
+		/*
+		 * Ok, at this point we couldn't grab the lock on the first try. We
+		 * cannot simply queue ourselves to the end of the list and wait to be
+		 * woken up because by now the lock could long have been released.
+		 * Instead add us to the queue and try to grab the lock again. If we
+		 * succeed we need to revert the queuing and be happy, otherwise we
+		 * recheck the lock. If we still couldn't grab it, we know that the
+		 * lock holder will see our queue entries when releasing since they
+		 * existed before we checked for the lock.
+		 */
+
+		/* add to the queue */
+		LWLockQueueSelf(lock, mode);
+
+		/* we're now guaranteed to be woken up if necessary */
+		mustwait = LWLockAttemptLock(lock, mode, NULL);
+
+		/* ok, grabbed the lock the second time round, need to undo queueing */
+		if (!mustwait)
 		{
-			if (lock->exclusive == 0)
+			LOG_LWDEBUG("LWLockAcquire", lock, "acquired, undoing queue");
+
+#ifdef LWLOCK_STATS
+			lwstats->dequeue_self_count++;
+#endif
+			if (!LWLockDequeueSelf(lock))
 			{
-				lock->shared++;
-				mustwait = false;
+				/*
+				 * Somebody else dequeued us and has or will wake us up. Wait
+				 * for the correct wakeup, otherwise our ->lwWaiting would get
+				 * reset at some inconvenient point later, and releaseOK
+				 * wouldn't be managed correctly.
+				 */
+				for (;;)
+				{
+					PGSemaphoreLock(&proc->sem, false);
+					if (!proc->lwWaiting)
+						break;
+					extraWaits++;
+				}
+				/*
+				 * Reset releaseOK - if somebody woke us they'll have set it
+				 * to false.
+				 */
+#ifdef LWLOCK_STATS
+				lwstats->spin_delay_count += SpinLockAcquire(&lock->mutex);
+#else
+				SpinLockAcquire(&lock->mutex);
+#endif
+				lock->releaseOK = true;
+				SpinLockRelease(&lock->mutex);
 			}
-			else
-				mustwait = true;
+			break;
 		}
 
-		if (!mustwait)
-			break;				/* got the lock */
-
 		/*
-		 * Add myself to wait queue.
-		 *
-		 * If we don't have a PGPROC structure, there's no way to wait. This
-		 * should never occur, since MyProc should only be null during shared
-		 * memory initialization.
+		 * NB: There's no need to deal with spurious lock attempts
+		 * here. Anyone we prevented from acquiring the lock will
+		 * enqueue themselves using the same protocol we used here.
 		 */
-		if (proc == NULL)
-			elog(PANIC, "cannot wait without a PGPROC structure");
-
-		proc->lwWaiting = true;
-		proc->lwWaitMode = mode;
-		dlist_push_head(&lock->waiters, &proc->lwWaitLink);
-
-		/* Can release the mutex now */
-		SpinLockRelease(&lock->mutex);
 
 		/*
 		 * Wait until awakened.
@@ -632,7 +1045,7 @@ LWLockAcquireCommon(LWLock *lock, LWLockMode mode, uint64 *valptr, uint64 val)
 		 * so that the lock manager or signal manager will see the received
 		 * signal when it next waits.
 		 */
-		LOG_LWDEBUG("LWLockAcquire", T_NAME(lock), T_ID(lock), "waiting");
+		LOG_LWDEBUG("LWLockAcquire", lock, "waiting");
 
 #ifdef LWLOCK_STATS
 		lwstats->block_count++;
@@ -649,12 +1062,22 @@ LWLockAcquireCommon(LWLock *lock, LWLockMode mode, uint64 *valptr, uint64 val)
 			extraWaits++;
 		}
 
+		/* not waiting anymore */
+		pg_atomic_fetch_sub_u32(&lock->nwaiters, 1);
+
 		TRACE_POSTGRESQL_LWLOCK_WAIT_DONE(T_NAME(lock), T_ID(lock), mode);
 
-		LOG_LWDEBUG("LWLockAcquire", T_NAME(lock), T_ID(lock), "awakened");
+		LOG_LWDEBUG("LWLockAcquire", lock, "awakened");
 
 		/* Now loop back and try to acquire lock again. */
-		retry = true;
+#ifdef LWLOCK_STATS
+		lwstats->spin_delay_count += SpinLockAcquire(&lock->mutex);
+#else
+		SpinLockAcquire(&lock->mutex);
+#endif
+		lock->releaseOK = true;
+		SpinLockRelease(&lock->mutex);
+
 		result = false;
 	}
 
@@ -662,13 +1085,11 @@ LWLockAcquireCommon(LWLock *lock, LWLockMode mode, uint64 *valptr, uint64 val)
 	if (valptr)
 		*valptr = val;
 
-	/* We are done updating shared state of the lock itself. */
-	SpinLockRelease(&lock->mutex);
-
 	TRACE_POSTGRESQL_LWLOCK_ACQUIRE(T_NAME(lock), T_ID(lock), mode);
 
 	/* Add lock to list of locks held by this backend */
-	held_lwlocks[num_held_lwlocks++] = lock;
+	held_lwlocks[num_held_lwlocks].lock = lock;
+	held_lwlocks[num_held_lwlocks++].mode = mode;
 
 	/*
 	 * Fix the process wait semaphore's count for any absorbed wakeups.
@@ -690,8 +1111,11 @@ bool
 LWLockConditionalAcquire(LWLock *lock, LWLockMode mode)
 {
 	bool		mustwait;
+	bool		potentially_spurious;
 
-	PRINT_LWDEBUG("LWLockConditionalAcquire", lock);
+	AssertArg(mode == LW_SHARED || mode == LW_EXCLUSIVE);
+
+	PRINT_LWDEBUG("LWLockConditionalAcquire", lock, mode);
 
 	/* Ensure we will have room to remember the lock */
 	if (num_held_lwlocks >= MAX_SIMUL_LWLOCKS)
@@ -704,50 +1128,44 @@ LWLockConditionalAcquire(LWLock *lock, LWLockMode mode)
 	 */
 	HOLD_INTERRUPTS();
 
-	/* Acquire mutex.  Time spent holding mutex should be short! */
-	SpinLockAcquire(&lock->mutex);
+retry:
+	/*
+	 * We need to check whether it's possible that we prevented somebody else
+	 * from acquiring the lock. If so, potentially_spurious will be set, and
+	 * we'll retry.
+	 */
+	mustwait = LWLockAttemptLock(lock, mode, &potentially_spurious);
 
-	/* If I can get the lock, do so quickly. */
-	if (mode == LW_EXCLUSIVE)
-	{
-		if (lock->exclusive == 0 && lock->shared == 0)
-		{
-			lock->exclusive++;
-			mustwait = false;
-		}
-		else
-			mustwait = true;
-	}
-	else
+	if (mustwait)
 	{
-		if (lock->exclusive == 0)
+		/*
+		 * We ran into an exclusive lock and might have blocked another
+		 * exclusive lock from taking a shot because it took some time to back
+		 * off. Retry till we are either sure we didn't block somebody (because
+		 * somebody else certainly has the lock) or till we got it.
+		 *
+		 * We cannot rely on the two-step lock-acquisition protocol as in
+		 * LWLockAcquire because we're not using it.
+		 */
+		if (potentially_spurious)
 		{
-			lock->shared++;
-			mustwait = false;
+			SPIN_DELAY();
+			goto retry;
 		}
-		else
-			mustwait = true;
-	}
 
-	/* We are done updating shared state of the lock itself. */
-	SpinLockRelease(&lock->mutex);
-
-	if (mustwait)
-	{
 		/* Failed to get lock, so release interrupt holdoff */
 		RESUME_INTERRUPTS();
-		LOG_LWDEBUG("LWLockConditionalAcquire",
-					T_NAME(lock), T_ID(lock), "failed");
-		TRACE_POSTGRESQL_LWLOCK_CONDACQUIRE_FAIL(T_NAME(lock),
-												 T_ID(lock), mode);
+
+		LOG_LWDEBUG("LWLockConditionalAcquire", lock, "failed");
+		TRACE_POSTGRESQL_LWLOCK_CONDACQUIRE_FAIL(T_NAME(lock), T_ID(lock), mode);
 	}
 	else
 	{
 		/* Add lock to list of locks held by this backend */
-		held_lwlocks[num_held_lwlocks++] = lock;
+		held_lwlocks[num_held_lwlocks].lock = lock;
+		held_lwlocks[num_held_lwlocks++].mode = mode;
 		TRACE_POSTGRESQL_LWLOCK_CONDACQUIRE(T_NAME(lock), T_ID(lock), mode);
 	}
-
 	return !mustwait;
 }
 
@@ -773,14 +1191,14 @@ LWLockAcquireOrWait(LWLock *lock, LWLockMode mode)
 	int			extraWaits = 0;
 #ifdef LWLOCK_STATS
 	lwlock_stats *lwstats;
-#endif
-
-	PRINT_LWDEBUG("LWLockAcquireOrWait", lock);
 
-#ifdef LWLOCK_STATS
 	lwstats = get_lwlock_stats_entry(lock);
 #endif
 
+	Assert(mode == LW_SHARED || mode == LW_EXCLUSIVE);
+
+	PRINT_LWDEBUG("LWLockAcquireOrWait", lock, mode);
+
 	/* Ensure we will have room to remember the lock */
 	if (num_held_lwlocks >= MAX_SIMUL_LWLOCKS)
 		elog(ERROR, "too many LWLocks taken");
@@ -792,81 +1210,61 @@ LWLockAcquireOrWait(LWLock *lock, LWLockMode mode)
 	 */
 	HOLD_INTERRUPTS();
 
-	/* Acquire mutex.  Time spent holding mutex should be short! */
-	SpinLockAcquire(&lock->mutex);
-
-	/* If I can get the lock, do so quickly. */
-	if (mode == LW_EXCLUSIVE)
-	{
-		if (lock->exclusive == 0 && lock->shared == 0)
-		{
-			lock->exclusive++;
-			mustwait = false;
-		}
-		else
-			mustwait = true;
-	}
-	else
-	{
-		if (lock->exclusive == 0)
-		{
-			lock->shared++;
-			mustwait = false;
-		}
-		else
-			mustwait = true;
-	}
+	/*
+	 * NB: We're using nearly the same twice-in-a-row lock acquisition
+	 * protocol as LWLockAcquire(). Check its comments for details.
+	 */
+	mustwait = LWLockAttemptLock(lock, mode, NULL);
 
 	if (mustwait)
 	{
-		/*
-		 * Add myself to wait queue.
-		 *
-		 * If we don't have a PGPROC structure, there's no way to wait.  This
-		 * should never occur, since MyProc should only be null during shared
-		 * memory initialization.
-		 */
-		if (proc == NULL)
-			elog(PANIC, "cannot wait without a PGPROC structure");
+		LWLockQueueSelf(lock, LW_WAIT_UNTIL_FREE);
 
-		proc->lwWaiting = true;
-		proc->lwWaitMode = LW_WAIT_UNTIL_FREE;
-		dlist_push_head(&lock->waiters, &proc->lwWaitLink);
+		mustwait = LWLockAttemptLock(lock, mode, NULL);
 
-		/* Can release the mutex now */
-		SpinLockRelease(&lock->mutex);
-
-		/*
-		 * Wait until awakened.  Like in LWLockAcquire, be prepared for bogus
-		 * wakups, because we share the semaphore with ProcWaitForSignal.
-		 */
-		LOG_LWDEBUG("LWLockAcquireOrWait", T_NAME(lock), T_ID(lock),
-					"waiting");
+		if (mustwait)
+		{
+			/*
+			 * Wait until awakened.  Like in LWLockAcquire, be prepared for bogus
+			 * wakeups, because we share the semaphore with ProcWaitForSignal.
+			 */
+			LOG_LWDEBUG("LWLockAcquireOrWait", lock, "waiting");
 
 #ifdef LWLOCK_STATS
-		lwstats->block_count++;
+			lwstats->block_count++;
 #endif
+			TRACE_POSTGRESQL_LWLOCK_WAIT_START(T_NAME(l), T_ID(l), mode);
 
-		TRACE_POSTGRESQL_LWLOCK_WAIT_START(T_NAME(lock), T_ID(lock), mode);
+			for (;;)
+			{
+				/* "false" means cannot accept cancel/die interrupt here. */
+				PGSemaphoreLock(&proc->sem, false);
+				if (!proc->lwWaiting)
+					break;
+				extraWaits++;
+			}
+			pg_atomic_fetch_sub_u32(&lock->nwaiters, 1);
 
-		for (;;)
-		{
-			/* "false" means cannot accept cancel/die interrupt here. */
-			PGSemaphoreLock(&proc->sem, false);
-			if (!proc->lwWaiting)
-				break;
-			extraWaits++;
-		}
+			TRACE_POSTGRESQL_LWLOCK_WAIT_DONE(T_NAME(lock), T_ID(lock), mode);
 
-		TRACE_POSTGRESQL_LWLOCK_WAIT_DONE(T_NAME(lock), T_ID(lock), mode);
+			LOG_LWDEBUG("LWLockAcquireOrWait", lock, "awakened");
+		}
+		else
+		{
+			LOG_LWDEBUG("LWLockAcquireOrWait", lock, "acquired, undoing queue");
 
-		LOG_LWDEBUG("LWLockAcquireOrWait", T_NAME(lock), T_ID(lock),
-					"awakened");
-	}
-	else
-	{
-		/* We are done updating shared state of the lock itself. */
-		SpinLockRelease(&lock->mutex);
+			/* got lock in the second attempt, undo queueing */
+			if (!LWLockDequeueSelf(lock))
+			{
+				for (;;)
+				{
+					PGSemaphoreLock(&proc->sem, false);
+					if (!proc->lwWaiting)
+						break;
+					extraWaits++;
+				}
+			}
+		}
 	}
 
 	/*
@@ -879,16 +1277,17 @@ LWLockAcquireOrWait(LWLock *lock, LWLockMode mode)
 	{
 		/* Failed to get lock, so release interrupt holdoff */
 		RESUME_INTERRUPTS();
-		LOG_LWDEBUG("LWLockAcquireOrWait", T_NAME(lock), T_ID(lock), "failed");
+		LOG_LWDEBUG("LWLockAcquireOrWait", lock, "failed");
 		TRACE_POSTGRESQL_LWLOCK_ACQUIRE_OR_WAIT_FAIL(T_NAME(lock), T_ID(lock),
 													 mode);
 	}
 	else
 	{
+		LOG_LWDEBUG("LWLockAcquireOrWait", lock, "succeeded");
 		/* Add lock to list of locks held by this backend */
-		held_lwlocks[num_held_lwlocks++] = lock;
-		TRACE_POSTGRESQL_LWLOCK_ACQUIRE_OR_WAIT(T_NAME(lock), T_ID(lock),
-												mode);
+		held_lwlocks[num_held_lwlocks].lock = lock;
+		held_lwlocks[num_held_lwlocks++].mode = mode;
+		TRACE_POSTGRESQL_LWLOCK_ACQUIRE_OR_WAIT(T_NAME(lock), T_ID(lock), mode);
 	}
 
 	return !mustwait;
@@ -920,13 +1319,11 @@ LWLockWaitForVar(LWLock *lock, uint64 *valptr, uint64 oldval, uint64 *newval)
 	bool		result = false;
 #ifdef LWLOCK_STATS
 	lwlock_stats *lwstats;
-#endif
 
-	PRINT_LWDEBUG("LWLockWaitForVar", lock);
-
-#ifdef LWLOCK_STATS
 	lwstats = get_lwlock_stats_entry(lock);
-#endif   /* LWLOCK_STATS */
+#endif
+
+	PRINT_LWDEBUG("LWLockWaitForVar", lock, LW_WAIT_UNTIL_FREE);
 
 	/*
 	 * Quick test first to see if it the slot is free right now.
@@ -935,7 +1332,7 @@ LWLockWaitForVar(LWLock *lock, uint64 *valptr, uint64 oldval, uint64 *newval)
 	 * barrier here as far as the current usage is concerned.  But that might
 	 * not be safe in general.
 	 */
-	if (lock->exclusive == 0)
+	if (pg_atomic_read_u32(&lock->lockcount) == 0)
 		return true;
 
 	/*
@@ -953,21 +1350,24 @@ LWLockWaitForVar(LWLock *lock, uint64 *valptr, uint64 oldval, uint64 *newval)
 		bool		mustwait;
 		uint64		value;
 
-		/* Acquire mutex.  Time spent holding mutex should be short! */
+		mustwait = pg_atomic_read_u32(&lock->lockcount) != 0;
+
+		if (mustwait)
+		{
+			/*
+			 * Perform comparison using spinlock as we can't rely on atomic 64
+			 * bit reads/stores.
+			 */
 #ifdef LWLOCK_STATS
-		lwstats->spin_delay_count += SpinLockAcquire(&lock->mutex);
+			lwstats->spin_delay_count += SpinLockAcquire(&lock->mutex);
 #else
-		SpinLockAcquire(&lock->mutex);
+			SpinLockAcquire(&lock->mutex);
 #endif
 
-		/* Is the lock now free, and if not, does the value match? */
-		if (lock->exclusive == 0)
-		{
-			result = true;
-			mustwait = false;
-		}
-		else
-		{
+			/*
+			 * XXX: We can significantly optimize this on platforms with 64bit
+			 * atomics.
+			 */
 			value = *valptr;
 			if (value != oldval)
 			{
@@ -977,21 +1377,58 @@ LWLockWaitForVar(LWLock *lock, uint64 *valptr, uint64 oldval, uint64 *newval)
 			}
 			else
 				mustwait = true;
+			SpinLockRelease(&lock->mutex);
 		}
+		else
+			mustwait = false;
 
 		if (!mustwait)
 			break;				/* the lock was free or value didn't match */
 
 		/*
-		 * Add myself to wait queue.
+		 * Add myself to wait queue. Note that this is racy, somebody else
+		 * could wakeup before we're finished queuing.
+		 * NB: We're using nearly the same twice-in-a-row lock acquisition
+		 * protocol as LWLockAcquire(). Check its comments for details.
 		 */
-		proc->lwWaiting = true;
-		proc->lwWaitMode = LW_WAIT_UNTIL_FREE;
-		/* waiters are added to the front of the queue */
-		dlist_push_head(&lock->waiters, &proc->lwWaitLink);
+		LWLockQueueSelf(lock, LW_WAIT_UNTIL_FREE);
 
-		/* Can release the mutex now */
-		SpinLockRelease(&lock->mutex);
+		/*
+		 * We're now guaranteed to be woken up if necessary. Recheck the
+		 * lock's state.
+		 */
+		mustwait = pg_atomic_read_u32(&lock->lockcount) != 0;
+
+		/* ok, the lock is free the second time round, need to undo queueing */
+		if (!mustwait)
+		{
+#ifdef LWLOCK_STATS
+			lwstats->dequeue_self_count++;
+#endif
+			LOG_LWDEBUG("LWLockWaitForVar", lock, "free, undoing queue");
+
+			if (!LWLockDequeueSelf(lock))
+			{
+				/*
+				 * Somebody else dequeued us and has or will wake us up. Wait
+				 * for the correct wakeup, otherwise our ->lwWaiting would get
+				 * reset at some inconvenient point later.
+				 */
+				for (;;)
+				{
+					PGSemaphoreLock(&proc->sem, false);
+					if (!proc->lwWaiting)
+						break;
+					extraWaits++;
+				}
+			}
+			break;
+		}
+
+		/*
+		 * NB: Just as in LWLockAcquireCommon() there's no need to deal with
+		 * spurious lock attempts here.
+		 */
 
 		/*
 		 * Wait until awakened.
@@ -1005,7 +1442,7 @@ LWLockWaitForVar(LWLock *lock, uint64 *valptr, uint64 oldval, uint64 *newval)
 		 * so that the lock manager or signal manager will see the received
 		 * signal when it next waits.
 		 */
-		LOG_LWDEBUG("LWLockWaitForVar", T_NAME(lock), T_ID(lock), "waiting");
+		LOG_LWDEBUG("LWLockWaitForVar", lock, "waiting");
 
 #ifdef LWLOCK_STATS
 		lwstats->block_count++;
@@ -1023,17 +1460,16 @@ LWLockWaitForVar(LWLock *lock, uint64 *valptr, uint64 oldval, uint64 *newval)
 			extraWaits++;
 		}
 
+		pg_atomic_fetch_sub_u32(&lock->nwaiters, 1);
+
 		TRACE_POSTGRESQL_LWLOCK_WAIT_DONE(T_NAME(lock), T_ID(lock),
 										  LW_EXCLUSIVE);
 
-		LOG_LWDEBUG("LWLockWaitForVar", T_NAME(lock), T_ID(lock), "awakened");
+		LOG_LWDEBUG("LWLockWaitForVar", lock, "awakened");
 
 		/* Now loop back and check the status of the lock again. */
 	}
 
-	/* We are done updating shared state of the lock itself. */
-	SpinLockRelease(&lock->mutex);
-
 	TRACE_POSTGRESQL_LWLOCK_ACQUIRE(T_NAME(lock), T_ID(lock), LW_EXCLUSIVE);
 
 	/*
@@ -1066,14 +1502,24 @@ LWLockUpdateVar(LWLock *lock, uint64 *valptr, uint64 val)
 {
 	dlist_head	wakeup;
 	dlist_mutable_iter iter;
+#ifdef LWLOCK_STATS
+	lwlock_stats *lwstats;
+
+	lwstats = get_lwlock_stats_entry(lock);
+#endif
+
+	PRINT_LWDEBUG("LWLockUpdateVar", lock, LW_EXCLUSIVE);
 
 	dlist_init(&wakeup);
 
 	/* Acquire mutex.  Time spent holding mutex should be short! */
+#ifdef LWLOCK_STATS
+	lwstats->spin_delay_count += SpinLockAcquire(&lock->mutex);
+#else
 	SpinLockAcquire(&lock->mutex);
+#endif
 
-	/* we should hold the lock */
-	Assert(lock->exclusive == 1);
+	Assert(pg_atomic_read_u32(&lock->lockcount) >= EXCLUSIVE_LOCK);
 
 	/* Update the lock's value */
 	*valptr = val;
@@ -1116,22 +1562,23 @@ LWLockUpdateVar(LWLock *lock, uint64 *valptr, uint64 val)
 void
 LWLockRelease(LWLock *lock)
 {
-	dlist_head	wakeup;
-	dlist_mutable_iter iter;
+	LWLockMode	mode;
+	uint32		lockcount;
+	bool		check_waiters;
+	bool		have_waiters = false;
 	int			i;
 
-	dlist_init(&wakeup);
-
-	PRINT_LWDEBUG("LWLockRelease", lock);
-
 	/*
 	 * Remove lock from list of locks held.  Usually, but not always, it will
 	 * be the latest-acquired lock; so search array backwards.
 	 */
 	for (i = num_held_lwlocks; --i >= 0;)
 	{
-		if (lock == held_lwlocks[i])
+		if (lock == held_lwlocks[i].lock)
+		{
+			mode = held_lwlocks[i].mode;
 			break;
+		}
 	}
 	if (i < 0)
 		elog(ERROR, "lock %s %d is not held", T_NAME(lock), T_ID(lock));
@@ -1139,78 +1586,57 @@ LWLockRelease(LWLock *lock)
 	for (; i < num_held_lwlocks; i++)
 		held_lwlocks[i] = held_lwlocks[i + 1];
 
-	/* Acquire mutex.  Time spent holding mutex should be short! */
-	SpinLockAcquire(&lock->mutex);
-
-	/* Release my hold on lock */
-	if (lock->exclusive > 0)
-		lock->exclusive--;
-	else
-	{
-		Assert(lock->shared > 0);
-		lock->shared--;
-	}
+	PRINT_LWDEBUG("LWLockRelease", lock, mode);
 
 	/*
-	 * See if I need to awaken any waiters.  If I released a non-last shared
-	 * hold, there cannot be anything to do.  Also, do not awaken any waiters
-	 * if someone has already awakened waiters that haven't yet acquired the
-	 * lock.
+	 * Release my hold on lock, after that it can immediately be acquired by
+	 * others, even if we still have to wakeup other waiters.
 	 */
-	if (lock->exclusive == 0 && lock->shared == 0 && lock->releaseOK)
-	{
-		/*
-		 * Remove the to-be-awakened PGPROCs from the queue.
-		 */
-		bool		releaseOK = true;
-		bool		wokeup_somebody = false;
-
-		dlist_foreach_modify(iter, &lock->waiters)
-		{
-			PGPROC *waiter = dlist_container(PGPROC, lwWaitLink, iter.cur);
-
-			if (wokeup_somebody && waiter->lwWaitMode == LW_EXCLUSIVE)
-				continue;
-
-			dlist_delete(&waiter->lwWaitLink);
-			dlist_push_tail(&wakeup, &waiter->lwWaitLink);
-
-			/*
-			 * Prevent additional wakeups until retryer gets to
-			 * run. Backends that are just waiting for the lock to become
-			 * free don't retry automatically.
-			 */
-			if (waiter->lwWaitMode != LW_WAIT_UNTIL_FREE)
-			{
-				releaseOK = false;
-				wokeup_somebody = true;
-			}
+	if (mode == LW_EXCLUSIVE)
+		lockcount = pg_atomic_sub_fetch_u32(&lock->lockcount, EXCLUSIVE_LOCK);
+	else
+		lockcount = pg_atomic_sub_fetch_u32(&lock->lockcount, 1);
 
-			if(waiter->lwWaitMode == LW_EXCLUSIVE)
-				break;
-		}
-		lock->releaseOK = releaseOK;
-	}
+	/* nobody else can have that kind of lock */
+	Assert(lockcount < EXCLUSIVE_LOCK);
 
-	/* We are done updating shared state of the lock itself. */
-	SpinLockRelease(&lock->mutex);
+	/*
+	 * Anybody we need to wake up must have started queueing before we
+	 * released our hold on the lock, and the atomic operations above are
+	 * full barriers. So we can just do a plain read.
+	 */
+	if (pg_atomic_read_u32(&lock->nwaiters) > 0)
+		have_waiters = true;
 
-	TRACE_POSTGRESQL_LWLOCK_RELEASE(T_NAME(lock), T_ID(lock));
+	/*
+	 * If we just released an exclusive lock always wakeup waiters, even if
+	 * lockcount is still > 0. A shared acquisition temporarily (and
+	 * spuriously) might have increased the value.
+	 */
+	if (mode == LW_EXCLUSIVE && have_waiters)
+		check_waiters = true;
+	/*
+	 * nobody has this locked anymore, potential exclusive lockers get a chance
+	 */
+	else if (lockcount == 0 && have_waiters)
+		check_waiters = true;
+	/* nobody queued or not free */
+	else
+		check_waiters = false;
 
 	/*
-	 * Awaken any waiters I removed from the queue.
+	 * As waking up waiters requires the spinlock to be acquired, only do so
+	 * if necessary.
 	 */
-	dlist_foreach_modify(iter, &wakeup)
+	if (check_waiters)
 	{
-		PGPROC *waiter = dlist_container(PGPROC, lwWaitLink, iter.cur);
-		LOG_LWDEBUG("LWLockRelease", T_NAME(lock), T_ID(lock),
-					"release waiter");
-		dlist_delete(&waiter->lwWaitLink);
-		pg_write_barrier();
-		waiter->lwWaiting = false;
-		PGSemaphoreUnlock(&waiter->sem);
+		/* XXX: remove before commit? */
+		LOG_LWDEBUG("LWLockRelease", lock, "releasing waiters");
+		LWLockWakeup(lock, mode);
 	}
 
+	TRACE_POSTGRESQL_LWLOCK_RELEASE(T_NAME(lock), T_ID(lock));
+
 	/*
 	 * Now okay to allow cancel/die interrupts.
 	 */
@@ -1234,7 +1660,7 @@ LWLockReleaseAll(void)
 	{
 		HOLD_INTERRUPTS();		/* match the upcoming RESUME_INTERRUPTS */
 
-		LWLockRelease(held_lwlocks[num_held_lwlocks - 1]);
+		LWLockRelease(held_lwlocks[num_held_lwlocks - 1].lock);
 	}
 }
 
@@ -1242,8 +1668,8 @@ LWLockReleaseAll(void)
 /*
  * LWLockHeldByMe - test whether my process currently holds a lock
  *
- * This is meant as debug support only.  We do not distinguish whether the
- * lock is held shared or exclusive.
+ * This is meant as debug support only.  We currently do not distinguish
+ * whether the lock is held shared or exclusive.
  */
 bool
 LWLockHeldByMe(LWLock *l)
@@ -1252,7 +1678,7 @@ LWLockHeldByMe(LWLock *l)
 
 	for (i = 0; i < num_held_lwlocks; i++)
 	{
-		if (held_lwlocks[i] == l)
+		if (held_lwlocks[i].lock == l)
 			return true;
 	}
 	return false;
diff --git a/src/include/storage/lwlock.h b/src/include/storage/lwlock.h
index fea5d33..595e69d 100644
--- a/src/include/storage/lwlock.h
+++ b/src/include/storage/lwlock.h
@@ -16,6 +16,7 @@
 
 #include "lib/ilist.h"
 #include "storage/s_lock.h"
+#include "port/atomics.h"
 
 struct PGPROC;
 
@@ -48,10 +49,14 @@ typedef struct LWLock
 {
 	slock_t		mutex;			/* Protects LWLock and queue of PGPROCs */
 	bool		releaseOK;		/* T if ok to release waiters */
-	char		exclusive;		/* # of exclusive holders (0 or 1) */
-	int			shared;			/* # of shared holders (0..MaxBackends) */
+
+	pg_atomic_uint32 lockcount;	/* state of exclusive/nonexclusive lockers */
+	pg_atomic_uint32 nwaiters;	/* number of waiters */
 	int			tranche;		/* tranche ID */
 	dlist_head	waiters;		/* list of waiting PGPROCs */
+#ifdef LOCK_DEBUG
+	struct PGPROC *owner;		/* last exclusive owner of the lock */
+#endif
 } LWLock;
 
 /*
-- 
1.8.3.251.g1462b67

-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to