From bae1bea8d1af58683a62203019b78a1541116747 Mon Sep 17 00:00:00 2001
From: Robert Haas <rhaas@postgresql.org>
Date: Sat, 4 Nov 2017 17:42:53 +0100
Subject: [PATCH 1/2] shm-mq-less-spinlocks-v2

---
 src/backend/storage/ipc/shm_mq.c | 237 +++++++++++++++++++--------------------
 1 file changed, 116 insertions(+), 121 deletions(-)

diff --git a/src/backend/storage/ipc/shm_mq.c b/src/backend/storage/ipc/shm_mq.c
index 770559a03e..75c6bbd4fb 100644
--- a/src/backend/storage/ipc/shm_mq.c
+++ b/src/backend/storage/ipc/shm_mq.c
@@ -31,27 +31,27 @@
  * Some notes on synchronization:
  *
  * mq_receiver and mq_bytes_read can only be changed by the receiver; and
- * mq_sender and mq_bytes_written can only be changed by the sender.  However,
- * because most of these fields are 8 bytes and we don't assume that 8 byte
- * reads and writes are atomic, the spinlock must be taken whenever the field
- * is updated, and whenever it is read by a process other than the one allowed
- * to modify it. But the process that is allowed to modify it is also allowed
- * to read it without the lock.  On architectures where 8-byte writes are
- * atomic, we could replace these spinlocks with memory barriers, but
- * testing found no performance benefit, so it seems best to keep things
- * simple for now.
+ * mq_sender and mq_bytes_written can only be changed by the sender.
+ * mq_receiver and mq_sender are protected by mq_mutex, although, importantly,
+ * they cannot change once set, and thus may be read without a lock once this
+ * is known to be the case.
  *
- * mq_detached can be set by either the sender or the receiver, so the mutex
- * must be held to read or write it.  Memory barriers could be used here as
- * well, if needed.
+ * mq_bytes_read and mq_bytes_written are not protected by the mutex.  Instead,
+ * they are read and written atomically, using 8-byte loads and stores.
+ * Memory barriers must be carefully used to synchronize reads and writes of
+ * these values with reads and writes of the actual data in mq_ring.
+ *
+ * mq_detached needs no locking.  It can be set by either the sender or the
+ * receiver, but only ever from false to true, so redundant writes don't
+ * matter.  It is important that if we set mq_detached and then set the
+ * counterparty's latch, the counterparty must be certain to see the change
+ * after waking up.  Since SetLatch begins with a memory barrier and ResetLatch
+ * ends with one, this should be OK.
  *
  * mq_ring_size and mq_ring_offset never change after initialization, and
  * can therefore be read without the lock.
  *
- * Importantly, mq_ring can be safely read and written without a lock.  Were
- * this not the case, we'd have to hold the spinlock for much longer
- * intervals, and performance might suffer.  Fortunately, that's not
- * necessary.  At any given time, the difference between mq_bytes_read and
+ * At any given time, the difference between mq_bytes_read and
  * mq_bytes_written defines the number of bytes within mq_ring that contain
  * unread data, and mq_bytes_read defines the position where those bytes
  * begin.  The sender can increase the number of unread bytes at any time,
@@ -71,8 +71,8 @@ struct shm_mq
 	slock_t		mq_mutex;
 	PGPROC	   *mq_receiver;
 	PGPROC	   *mq_sender;
-	uint64		mq_bytes_read;
-	uint64		mq_bytes_written;
+	pg_atomic_uint64 mq_bytes_read;
+	pg_atomic_uint64 mq_bytes_written;
 	Size		mq_ring_size;
 	bool		mq_detached;
 	uint8		mq_ring_offset;
@@ -150,11 +150,8 @@ static bool shm_mq_counterparty_gone(volatile shm_mq *mq,
 						 BackgroundWorkerHandle *handle);
 static bool shm_mq_wait_internal(volatile shm_mq *mq, PGPROC *volatile *ptr,
 					 BackgroundWorkerHandle *handle);
-static uint64 shm_mq_get_bytes_read(volatile shm_mq *mq, bool *detached);
 static void shm_mq_inc_bytes_read(volatile shm_mq *mq, Size n);
-static uint64 shm_mq_get_bytes_written(volatile shm_mq *mq, bool *detached);
 static void shm_mq_inc_bytes_written(volatile shm_mq *mq, Size n);
-static shm_mq_result shm_mq_notify_receiver(volatile shm_mq *mq);
 static void shm_mq_detach_callback(dsm_segment *seg, Datum arg);
 
 /* Minimum queue size is enough for header and at least one chunk of data. */
@@ -182,8 +179,8 @@ shm_mq_create(void *address, Size size)
 	SpinLockInit(&mq->mq_mutex);
 	mq->mq_receiver = NULL;
 	mq->mq_sender = NULL;
-	mq->mq_bytes_read = 0;
-	mq->mq_bytes_written = 0;
+	pg_atomic_init_u64(&mq->mq_bytes_read, 0);
+	pg_atomic_init_u64(&mq->mq_bytes_written, 0);
 	mq->mq_ring_size = size - data_offset;
 	mq->mq_detached = false;
 	mq->mq_ring_offset = data_offset - offsetof(shm_mq, mq_ring);
@@ -352,6 +349,7 @@ shm_mq_sendv(shm_mq_handle *mqh, shm_mq_iovec *iov, int iovcnt, bool nowait)
 {
 	shm_mq_result res;
 	shm_mq	   *mq = mqh->mqh_queue;
+	PGPROC	   *receiver;
 	Size		nbytes = 0;
 	Size		bytes_written;
 	int			i;
@@ -492,8 +490,30 @@ shm_mq_sendv(shm_mq_handle *mqh, shm_mq_iovec *iov, int iovcnt, bool nowait)
 	mqh->mqh_partial_bytes = 0;
 	mqh->mqh_length_word_complete = false;
 
+	/* If queue has been detached, let caller know. */
+	if (mq->mq_detached)
+		return SHM_MQ_DETACHED;
+
+	/*
+	 * If the counterparty is known to have attached, we can read mq_receiver
+	 * without acquiring the spinlock and assume it isn't NULL.  Otherwise,
+	 * more caution is needed.
+	 */
+	if (mqh->mqh_counterparty_attached)
+		receiver = mq->mq_receiver;
+	else
+	{
+		SpinLockAcquire(&mq->mq_mutex);
+		receiver = mq->mq_receiver;
+		SpinLockRelease(&mq->mq_mutex);
+		if (receiver == NULL)
+			return SHM_MQ_SUCCESS;
+		mqh->mqh_counterparty_attached = true;
+	}
+
 	/* Notify receiver of the newly-written data, and return. */
-	return shm_mq_notify_receiver(mq);
+	SetLatch(&receiver->procLatch);
+	return SHM_MQ_SUCCESS;
 }
 
 /*
@@ -848,18 +868,19 @@ shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, const void *data,
 
 	while (sent < nbytes)
 	{
-		bool		detached;
 		uint64		rb;
+		uint64		wb;
 
 		/* Compute number of ring buffer bytes used and available. */
-		rb = shm_mq_get_bytes_read(mq, &detached);
-		Assert(mq->mq_bytes_written >= rb);
-		used = mq->mq_bytes_written - rb;
+		rb = pg_atomic_read_u64(&mq->mq_bytes_read);
+		wb = pg_atomic_read_u64(&mq->mq_bytes_written);
+		Assert(wb >= rb);
+		used = wb - rb;
 		Assert(used <= ringsize);
 		available = Min(ringsize - used, nbytes - sent);
 
 		/* Bail out if the queue has been detached. */
-		if (detached)
+		if (mq->mq_detached)
 		{
 			*bytes_written = sent;
 			return SHM_MQ_DETACHED;
@@ -900,15 +921,13 @@ shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, const void *data,
 		}
 		else if (available == 0)
 		{
-			shm_mq_result res;
-
-			/* Let the receiver know that we need them to read some data. */
-			res = shm_mq_notify_receiver(mq);
-			if (res != SHM_MQ_SUCCESS)
-			{
-				*bytes_written = sent;
-				return res;
-			}
+			/*
+			 * Since mqh->mqh_counterparty_attached is known to be true at this
+			 * point, mq_receiver has been set, and it can't change once set.
+			 * Therefore, we can read it without acquiring the spinlock.
+			 */
+			Assert(mqh->mqh_counterparty_attached);
+			SetLatch(&mq->mq_receiver->procLatch);
 
 			/* Skip manipulation of our latch if nowait = true. */
 			if (nowait)
@@ -934,10 +953,18 @@ shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, const void *data,
 		}
 		else
 		{
-			Size		offset = mq->mq_bytes_written % (uint64) ringsize;
-			Size		sendnow = Min(available, ringsize - offset);
+			Size		offset;
+			Size		sendnow;
+
+			offset = wb % (uint64) ringsize;
+			sendnow = Min(available, ringsize - offset);
 
-			/* Write as much data as we can via a single memcpy(). */
+			/*
+			 * Write as much data as we can via a single memcpy(). Make sure
+			 * these writes happen after the read of mq_bytes_read, above.
+			 * This barrier pairs with the one in shm_mq_inc_bytes_read.
+			 */
+			pg_memory_barrier();
 			memcpy(&mq->mq_ring[mq->mq_ring_offset + offset],
 				   (char *) data + sent, sendnow);
 			sent += sendnow;
@@ -983,19 +1010,27 @@ shm_mq_receive_bytes(shm_mq *mq, Size bytes_needed, bool nowait,
 	for (;;)
 	{
 		Size		offset;
-		bool		detached;
+		uint64		read;
 
 		/* Get bytes written, so we can compute what's available to read. */
-		written = shm_mq_get_bytes_written(mq, &detached);
-		used = written - mq->mq_bytes_read;
+		written = pg_atomic_read_u64(&mq->mq_bytes_written);
+		read = pg_atomic_read_u64(&mq->mq_bytes_read);
+		used = written - read;
 		Assert(used <= ringsize);
-		offset = mq->mq_bytes_read % (uint64) ringsize;
+		offset = read % (uint64) ringsize;
 
 		/* If we have enough data or buffer has wrapped, we're done. */
 		if (used >= bytes_needed || offset + used >= ringsize)
 		{
 			*nbytesp = Min(used, ringsize - offset);
 			*datap = &mq->mq_ring[mq->mq_ring_offset + offset];
+
+			/*
+			 * Separate the read of mq_bytes_written, above, from caller's
+			 * attempt to read the data itself.  Pairs with the barrier in
+			 * shm_mq_inc_bytes_written.
+			 */
+			pg_read_barrier();
 			return SHM_MQ_SUCCESS;
 		}
 
@@ -1007,7 +1042,7 @@ shm_mq_receive_bytes(shm_mq *mq, Size bytes_needed, bool nowait,
 		 * receiving a message stored in the buffer even after the sender has
 		 * detached.
 		 */
-		if (detached)
+		if (mq->mq_detached)
 			return SHM_MQ_DETACHED;
 
 		/* Skip manipulation of our latch if nowait = true. */
@@ -1037,16 +1072,10 @@ shm_mq_receive_bytes(shm_mq *mq, Size bytes_needed, bool nowait,
 static bool
 shm_mq_counterparty_gone(volatile shm_mq *mq, BackgroundWorkerHandle *handle)
 {
-	bool		detached;
 	pid_t		pid;
 
-	/* Acquire the lock just long enough to check the pointer. */
-	SpinLockAcquire(&mq->mq_mutex);
-	detached = mq->mq_detached;
-	SpinLockRelease(&mq->mq_mutex);
-
 	/* If the queue has been detached, counterparty is definitely gone. */
-	if (detached)
+	if (mq->mq_detached)
 		return true;
 
 	/* If there's a handle, check worker status. */
@@ -1059,9 +1088,7 @@ shm_mq_counterparty_gone(volatile shm_mq *mq, BackgroundWorkerHandle *handle)
 		if (status != BGWH_STARTED && status != BGWH_NOT_YET_STARTED)
 		{
 			/* Mark it detached, just to make it official. */
-			SpinLockAcquire(&mq->mq_mutex);
 			mq->mq_detached = true;
-			SpinLockRelease(&mq->mq_mutex);
 			return true;
 		}
 	}
@@ -1091,16 +1118,14 @@ shm_mq_wait_internal(volatile shm_mq *mq, PGPROC *volatile *ptr,
 	{
 		BgwHandleStatus status;
 		pid_t		pid;
-		bool		detached;
 
 		/* Acquire the lock just long enough to check the pointer. */
 		SpinLockAcquire(&mq->mq_mutex);
-		detached = mq->mq_detached;
 		result = (*ptr != NULL);
 		SpinLockRelease(&mq->mq_mutex);
 
 		/* Fail if detached; else succeed if initialized. */
-		if (detached)
+		if (mq->mq_detached)
 		{
 			result = false;
 			break;
@@ -1133,23 +1158,6 @@ shm_mq_wait_internal(volatile shm_mq *mq, PGPROC *volatile *ptr,
 }
 
 /*
- * Get the number of bytes read.  The receiver need not use this to access
- * the count of bytes read, but the sender must.
- */
-static uint64
-shm_mq_get_bytes_read(volatile shm_mq *mq, bool *detached)
-{
-	uint64		v;
-
-	SpinLockAcquire(&mq->mq_mutex);
-	v = mq->mq_bytes_read;
-	*detached = mq->mq_detached;
-	SpinLockRelease(&mq->mq_mutex);
-
-	return v;
-}
-
-/*
  * Increment the number of bytes read.
  */
 static void
@@ -1157,63 +1165,50 @@ shm_mq_inc_bytes_read(volatile shm_mq *mq, Size n)
 {
 	PGPROC	   *sender;
 
-	SpinLockAcquire(&mq->mq_mutex);
-	mq->mq_bytes_read += n;
+	/*
+	 * Separate prior reads of mq_ring from the increment of mq_bytes_read
+	 * which follows.  Pairs with the full barrier in shm_mq_send_bytes().
+	 * We only need a read barrier here because the increment of mq_bytes_read
+	 * is actually a read followed by a dependent write.
+	 */
+	pg_read_barrier();
+
+	/*
+	 * There's no need to use pg_atomic_fetch_add_u64 here, because nobody
+	 * else can be changing this value.  This method should be cheaper.
+	 */
+	pg_atomic_write_u64(&mq->mq_bytes_read,
+						pg_atomic_read_u64(&mq->mq_bytes_read) + n);
+
+	/*
+	 * We shouldn't have any bytes to read without a sender, so we can read
+	 * mq_sender here without a lock.  Once it's initialized, it can't change.
+	 */
 	sender = mq->mq_sender;
-	SpinLockRelease(&mq->mq_mutex);
-
-	/* We shouldn't have any bytes to read without a sender. */
 	Assert(sender != NULL);
 	SetLatch(&sender->procLatch);
 }
 
 /*
- * Get the number of bytes written.  The sender need not use this to access
- * the count of bytes written, but the receiver must.
- */
-static uint64
-shm_mq_get_bytes_written(volatile shm_mq *mq, bool *detached)
-{
-	uint64		v;
-
-	SpinLockAcquire(&mq->mq_mutex);
-	v = mq->mq_bytes_written;
-	*detached = mq->mq_detached;
-	SpinLockRelease(&mq->mq_mutex);
-
-	return v;
-}
-
-/*
  * Increment the number of bytes written.
  */
 static void
 shm_mq_inc_bytes_written(volatile shm_mq *mq, Size n)
 {
-	SpinLockAcquire(&mq->mq_mutex);
-	mq->mq_bytes_written += n;
-	SpinLockRelease(&mq->mq_mutex);
-}
-
-/*
- * Set receiver's latch, unless queue is detached.
- */
-static shm_mq_result
-shm_mq_notify_receiver(volatile shm_mq *mq)
-{
-	PGPROC	   *receiver;
-	bool		detached;
-
-	SpinLockAcquire(&mq->mq_mutex);
-	detached = mq->mq_detached;
-	receiver = mq->mq_receiver;
-	SpinLockRelease(&mq->mq_mutex);
-
-	if (detached)
-		return SHM_MQ_DETACHED;
-	if (receiver)
-		SetLatch(&receiver->procLatch);
-	return SHM_MQ_SUCCESS;
+	/*
+	 * Separate prior writes to mq_ring from the write of mq_bytes_written
+	 * which we're about to do.  Pairs with the read barrier found in
+	 * shm_mq_receive_bytes.
+	 */
+	pg_write_barrier();
+
+	/*
+	 * There's no need to use pg_atomic_fetch_add_u64 here, because nobody
+	 * else can be changing this value.  This method avoids taking the bus
+	 * lock unnecessarily.
+	 */
+	pg_atomic_write_u64(&mq->mq_bytes_written,
+						pg_atomic_read_u64(&mq->mq_bytes_written) + n);
 }
 
 /* Shim for on_dsm_callback. */
-- 
2.13.5 (Apple Git-94)
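
For readers who want the whole synchronization scheme in one place, here is a
minimal, self-contained sketch of the single-producer/single-consumer protocol
the patch adopts: each byte counter has exactly one writer and is advanced with
a plain atomic 8-byte store, barriers pair the counter updates with the ring
traffic, and the detached flag needs no lock because it only ever goes from
false to true.  This is illustrative only: it uses C11 <stdatomic.h>
acquire/release operations in place of pg_atomic_uint64 and the
pg_memory_barrier()/pg_read_barrier()/pg_write_barrier() calls in the patch,
the names spsc_ring, ring_send, ring_receive and ring_detach are invented for
the sketch and exist nowhere in PostgreSQL, and latch signalling and blocking
are left out.

#include <stdatomic.h>
#include <stdbool.h>
#include <stdint.h>
#include <string.h>

#define RING_SIZE 8192			/* fixed ring size keeps the sketch simple */

typedef struct spsc_ring
{
	_Atomic uint64_t bytes_written;	/* advanced only by the sender */
	_Atomic uint64_t bytes_read;	/* advanced only by the receiver */
	atomic_bool detached;			/* flips false -> true at most once */
	char		ring[RING_SIZE];
} spsc_ring;

/* Sender: copy as much as fits without wrapping, then publish the new count. */
size_t
ring_send(spsc_ring *q, const char *data, size_t nbytes)
{
	uint64_t	rb, wb;
	size_t		used, avail, offset, sendnow;

	if (atomic_load_explicit(&q->detached, memory_order_relaxed))
		return 0;				/* counterparty is gone; nothing will drain */

	/*
	 * The acquire load of bytes_read stands in for the pg_memory_barrier()
	 * in shm_mq_send_bytes: our stores into ring[] below must not be
	 * reordered before we learn how far the receiver has read, or we could
	 * clobber bytes it is still copying out.
	 */
	rb = atomic_load_explicit(&q->bytes_read, memory_order_acquire);
	wb = atomic_load_explicit(&q->bytes_written, memory_order_relaxed);
	used = (size_t) (wb - rb);
	avail = RING_SIZE - used;
	offset = (size_t) (wb % RING_SIZE);
	sendnow = nbytes < avail ? nbytes : avail;
	if (sendnow > RING_SIZE - offset)
		sendnow = RING_SIZE - offset;	/* stop at the wrap point */

	memcpy(&q->ring[offset], data, sendnow);

	/*
	 * The release store stands in for the pg_write_barrier() in
	 * shm_mq_inc_bytes_written: the data must become visible before the new
	 * count does.  It pairs with the acquire load in ring_receive, just as
	 * the write barrier pairs with the read barrier in shm_mq_receive_bytes.
	 */
	atomic_store_explicit(&q->bytes_written, wb + sendnow, memory_order_release);
	return sendnow;				/* the real sender would SetLatch the receiver */
}

/* Receiver: copy out what is available up to the wrap point, then publish. */
size_t
ring_receive(spsc_ring *q, char *out, size_t outsize)
{
	uint64_t	wb, rb;
	size_t		used, offset, readnow;

	/*
	 * Acquire load pairs with the sender's release store, like the
	 * pg_read_barrier() in shm_mq_receive_bytes: once we observe the count,
	 * the data behind it is guaranteed to be visible as well.
	 */
	wb = atomic_load_explicit(&q->bytes_written, memory_order_acquire);
	rb = atomic_load_explicit(&q->bytes_read, memory_order_relaxed);
	used = (size_t) (wb - rb);
	offset = (size_t) (rb % RING_SIZE);
	readnow = used < outsize ? used : outsize;
	if (readnow > RING_SIZE - offset)
		readnow = RING_SIZE - offset;	/* stop at the wrap point */

	memcpy(out, &q->ring[offset], readnow);

	/*
	 * The release store stands in for the pg_read_barrier() in
	 * shm_mq_inc_bytes_read: our reads of ring[] must complete before we
	 * tell the sender it may reuse that space.
	 */
	atomic_store_explicit(&q->bytes_read, rb + readnow, memory_order_release);
	return readnow;				/* the real receiver would SetLatch the sender */
}

/* Either side may detach; redundant stores are harmless since it only goes one way. */
void
ring_detach(spsc_ring *q)
{
	atomic_store_explicit(&q->detached, true, memory_order_relaxed);
}

Because each counter has exactly one writer, storing "old value + n" is all the
update ever needs, which is why the patch can use pg_atomic_write_u64 rather
than pg_atomic_fetch_add_u64 and skip the bus lock a fetch-and-add would take.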


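And a single-process smoke test for the sketch above, purely illustrative as
well: it assumes the spsc_ring code is in the same translation unit and
exercises only the data flow; the real queue runs the two sides in different
processes attached to the same dynamic shared memory segment and sleeps on
latches instead of looping.

#include <assert.h>
#include <stdio.h>

int
main(void)
{
	static spsc_ring q;			/* static storage: counters 0, not detached */
	const char	msg[] = "hello, shm_mq";
	char		buf[sizeof(msg)];
	size_t		sent = 0;
	size_t		rcvd = 0;

	/* Push the whole message, then drain it; the loops absorb wraparound. */
	while (sent < sizeof(msg))
		sent += ring_send(&q, msg + sent, sizeof(msg) - sent);
	while (rcvd < sizeof(msg))
		rcvd += ring_receive(&q, buf + rcvd, sizeof(buf) - rcvd);

	assert(memcmp(buf, msg, sizeof(msg)) == 0);
	printf("round-tripped %zu bytes\n", rcvd);

	/* After a detach, the sender gives up immediately instead of blocking. */
	ring_detach(&q);
	assert(ring_send(&q, msg, sizeof(msg)) == 0);
	return 0;
}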