From 666d33a363036a647dde83cb28b9d7ad0b31f76c Mon Sep 17 00:00:00 2001
From: Robert Haas <rhaas@postgresql.org>
Date: Sat, 4 Nov 2017 19:03:03 +0100
Subject: [PATCH 2/2] shm-mq-reduce-receiver-latch-set-v1

---
 src/backend/storage/ipc/shm_mq.c | 69 +++++++++++++++++++++++++---------------
 1 file changed, 43 insertions(+), 26 deletions(-)

diff --git a/src/backend/storage/ipc/shm_mq.c b/src/backend/storage/ipc/shm_mq.c
index 75c6bbd4fb..ceeb487390 100644
--- a/src/backend/storage/ipc/shm_mq.c
+++ b/src/backend/storage/ipc/shm_mq.c
@@ -142,10 +142,10 @@ struct shm_mq_handle
 };
 
 static void shm_mq_detach_internal(shm_mq *mq);
-static shm_mq_result shm_mq_send_bytes(shm_mq_handle *mq, Size nbytes,
+static shm_mq_result shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes,
 				  const void *data, bool nowait, Size *bytes_written);
-static shm_mq_result shm_mq_receive_bytes(shm_mq *mq, Size bytes_needed,
-					 bool nowait, Size *nbytesp, void **datap);
+static shm_mq_result shm_mq_receive_bytes(shm_mq_handle *mqh,
+				  Size bytes_needed, bool nowait, Size *nbytesp, void **datap);
 static bool shm_mq_counterparty_gone(volatile shm_mq *mq,
 						 BackgroundWorkerHandle *handle);
 static bool shm_mq_wait_internal(volatile shm_mq *mq, PGPROC *volatile *ptr,
@@ -585,8 +585,14 @@ shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
 		mqh->mqh_counterparty_attached = true;
 	}
 
-	/* Consume any zero-copy data from previous receive operation. */
-	if (mqh->mqh_consume_pending > 0)
+	/*
+	 * If we've consumed an amount of data greater than 1/4th of the ring
+	 * size, mark it consumed in shared memory.  We try to avoid doing this
+	 * unnecessarily when only a small amount of data has been consumed,
+	 * because SetLatch() is fairly expensive and we don't want to do it
+	 * too often.
+	 */
+	if (mqh->mqh_consume_pending > mq->mq_ring_size / 4)
 	{
 		shm_mq_inc_bytes_read(mq, mqh->mqh_consume_pending);
 		mqh->mqh_consume_pending = 0;
@@ -597,7 +603,7 @@ shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
 	{
 		/* Try to receive the message length word. */
 		Assert(mqh->mqh_partial_bytes < sizeof(Size));
-		res = shm_mq_receive_bytes(mq, sizeof(Size) - mqh->mqh_partial_bytes,
+		res = shm_mq_receive_bytes(mqh, sizeof(Size) - mqh->mqh_partial_bytes,
 								   nowait, &rb, &rawdata);
 		if (res != SHM_MQ_SUCCESS)
 			return res;
@@ -617,13 +623,7 @@ shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
 			needed = MAXALIGN(sizeof(Size)) + MAXALIGN(nbytes);
 			if (rb >= needed)
 			{
-				/*
-				 * Technically, we could consume the message length
-				 * information at this point, but the extra write to shared
-				 * memory wouldn't be free and in most cases we would reap no
-				 * benefit.
-				 */
-				mqh->mqh_consume_pending = needed;
+				mqh->mqh_consume_pending += needed;
 				*nbytesp = nbytes;
 				*datap = ((char *) rawdata) + MAXALIGN(sizeof(Size));
 				return SHM_MQ_SUCCESS;
@@ -635,7 +635,7 @@ shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
 			 */
 			mqh->mqh_expected_bytes = nbytes;
 			mqh->mqh_length_word_complete = true;
-			shm_mq_inc_bytes_read(mq, MAXALIGN(sizeof(Size)));
+			mqh->mqh_consume_pending += MAXALIGN(sizeof(Size));
 			rb -= MAXALIGN(sizeof(Size));
 		}
 		else
@@ -654,7 +654,7 @@ shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
 			}
 			Assert(mqh->mqh_buflen >= sizeof(Size));
 
-			/* Copy and consume partial length word. */
+			/* Copy partial length word; remember to consume it. */
 			if (mqh->mqh_partial_bytes + rb > sizeof(Size))
 				lengthbytes = sizeof(Size) - mqh->mqh_partial_bytes;
 			else
@@ -662,7 +662,7 @@ shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
 			memcpy(&mqh->mqh_buffer[mqh->mqh_partial_bytes], rawdata,
 				   lengthbytes);
 			mqh->mqh_partial_bytes += lengthbytes;
-			shm_mq_inc_bytes_read(mq, MAXALIGN(lengthbytes));
+			mqh->mqh_consume_pending += MAXALIGN(lengthbytes);
 			rb -= lengthbytes;
 
 			/* If we now have the whole word, we're ready to read payload. */
@@ -684,13 +684,13 @@ shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
 		 * we need not copy the data and can return a pointer directly into
 		 * shared memory.
 		 */
-		res = shm_mq_receive_bytes(mq, nbytes, nowait, &rb, &rawdata);
+		res = shm_mq_receive_bytes(mqh, nbytes, nowait, &rb, &rawdata);
 		if (res != SHM_MQ_SUCCESS)
 			return res;
 		if (rb >= nbytes)
 		{
 			mqh->mqh_length_word_complete = false;
-			mqh->mqh_consume_pending = MAXALIGN(nbytes);
+			mqh->mqh_consume_pending += MAXALIGN(nbytes);
 			*nbytesp = nbytes;
 			*datap = rawdata;
 			return SHM_MQ_SUCCESS;
@@ -730,13 +730,13 @@ shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
 		mqh->mqh_partial_bytes += rb;
 
 		/*
-		 * Update count of bytes read, with alignment padding.  Note that this
-		 * will never actually insert any padding except at the end of a
-		 * message, because the buffer size is a multiple of MAXIMUM_ALIGNOF,
-		 * and each read and write is as well.
+		 * Update count of bytes that can be consumed, accounting for
+		 * alignment padding.  Note that this will never actually insert any
+		 * padding except at the end of a message, because the buffer size is
+		 * a multiple of MAXIMUM_ALIGNOF, and each read and write is as well.
 		 */
 		Assert(mqh->mqh_partial_bytes == nbytes || rb == MAXALIGN(rb));
-		shm_mq_inc_bytes_read(mq, MAXALIGN(rb));
+		mqh->mqh_consume_pending += MAXALIGN(rb);
 
 		/* If we got all the data, exit the loop. */
 		if (mqh->mqh_partial_bytes >= nbytes)
@@ -744,7 +744,7 @@ shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
 
 		/* Wait for some more data. */
 		still_needed = nbytes - mqh->mqh_partial_bytes;
-		res = shm_mq_receive_bytes(mq, still_needed, nowait, &rb, &rawdata);
+		res = shm_mq_receive_bytes(mqh, still_needed, nowait, &rb, &rawdata);
 		if (res != SHM_MQ_SUCCESS)
 			return res;
 		if (rb > still_needed)
@@ -1000,9 +1000,10 @@ shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, const void *data,
  * is SHM_MQ_SUCCESS.
  */
 static shm_mq_result
-shm_mq_receive_bytes(shm_mq *mq, Size bytes_needed, bool nowait,
+shm_mq_receive_bytes(shm_mq_handle *mqh, Size bytes_needed, bool nowait,
 					 Size *nbytesp, void **datap)
 {
+	shm_mq	   *mq = mqh->mqh_queue;
 	Size		ringsize = mq->mq_ring_size;
 	uint64		used;
 	uint64		written;
@@ -1014,7 +1015,13 @@ shm_mq_receive_bytes(shm_mq *mq, Size bytes_needed, bool nowait,
 
 		/* Get bytes written, so we can compute what's available to read. */
 		written = pg_atomic_read_u64(&mq->mq_bytes_written);
-		read = pg_atomic_read_u64(&mq->mq_bytes_read);
+
+		/*
+		 * Get bytes read.  Include bytes we could consume but have not yet
+		 * consumed.
+		 */
+		read = pg_atomic_read_u64(&mq->mq_bytes_read) +
+			mqh->mqh_consume_pending;
 		used = written - read;
 		Assert(used <= ringsize);
 		offset = read % (uint64) ringsize;
@@ -1045,6 +1052,16 @@ shm_mq_receive_bytes(shm_mq *mq, Size bytes_needed, bool nowait,
 		if (mq->mq_detached)
 			return SHM_MQ_DETACHED;
 
+		/*
+		 * We didn't get enough data to satisfy the request, so mark any
+		 * data previously-consumed as read to make more buffer space.
+		 */
+		if (mqh->mqh_consume_pending > 0)
+		{
+			shm_mq_inc_bytes_read(mq, mqh->mqh_consume_pending);
+			mqh->mqh_consume_pending = 0;
+		}
+
 		/* Skip manipulation of our latch if nowait = true. */
 		if (nowait)
 			return SHM_MQ_WOULD_BLOCK;
-- 
2.13.5 (Apple Git-94)

