On Sat, Aug 28, 2021 at 5:04 PM Zhihong Yu <z...@yugabyte.com> wrote:
>

>>
>>  * mqh_partial_bytes, mqh_expected_bytes, and mqh_length_word_complete
>>
>> +   Size        mqh_send_pending;
>>     bool        mqh_length_word_complete;
>>     bool        mqh_counterparty_attached;
>>
>> I wonder if mqh_send_pending should be declared after 
>> mqh_length_word_complete - this way, the order of fields matches the order 
>> of explanation for the fields.

Moved it after mqh_consume_pending, and moved the comment as well so that
the fields and their explanations appear in the same order.
>
> There was a typo in the suggested code above. It should be:
>
> +   if (force_flush || mqh->mqh_send_pending > (mq->mq_ring_size >> 2))

Done
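
For anyone following along, here is a minimal standalone sketch of the
batching idea with the corrected condition above.  The toy struct and
function names are illustrative only, not the real shm_mq fields; the
real code uses an atomic counter and wakes the receiver with SetLatch():

#include <stdbool.h>
#include <stddef.h>

/* Toy stand-ins for shm_mq / shm_mq_handle; not the real field names. */
typedef struct
{
	size_t		ring_size;		/* total size of the ring buffer */
	size_t		bytes_written;	/* shared counter (atomic in the real code) */
} toy_mq;

typedef struct
{
	toy_mq	   *mq;
	size_t		send_pending;	/* written locally but not yet published */
} toy_mq_handle;

void
toy_mq_note_written(toy_mq_handle *mqh, size_t nbytes, bool force_flush)
{
	/* Assume nbytes have already been copied into the ring at this point. */
	mqh->send_pending += nbytes;

	/*
	 * Publish only when forced, or when more than 1/4 of the ring is
	 * pending (ring_size >> 2 is ring_size / 4).  The real code also calls
	 * SetLatch(&receiver->procLatch) here.
	 */
	if (force_flush || mqh->send_pending > (mqh->mq->ring_size >> 2))
	{
		mqh->mq->bytes_written += mqh->send_pending;
		mqh->send_pending = 0;
	}
}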


-- 
Regards,
Dilip Kumar
EnterpriseDB: http://www.enterprisedb.com
From b111756f7136f3e0065a089a8616ad77b9963935 Mon Sep 17 00:00:00 2001
From: Dilip Kumar <dilipkumar@localhost.localdomain>
Date: Wed, 4 Aug 2021 16:51:01 +0530
Subject: [PATCH v2] Optimize parallel tuple send (shm_mq_send_bytes)

Do not update shm_mq's mq_bytes_written until we have written
an amount of data greater than 1/4th of the ring size.  This
will prevent frequent CPU cache misses, and it will also avoid
frequent SetLatch() calls, which are quite expensive.
---
 src/backend/executor/tqueue.c         |  2 +-
 src/backend/libpq/pqmq.c              |  7 +++-
 src/backend/storage/ipc/shm_mq.c      | 64 +++++++++++++++++++++++++++++------
 src/include/storage/shm_mq.h          |  8 +++--
 src/test/modules/test_shm_mq/test.c   |  7 ++--
 src/test/modules/test_shm_mq/worker.c |  2 +-
 6 files changed, 71 insertions(+), 19 deletions(-)

diff --git a/src/backend/executor/tqueue.c b/src/backend/executor/tqueue.c
index 7af9fbe..eb0cbd7 100644
--- a/src/backend/executor/tqueue.c
+++ b/src/backend/executor/tqueue.c
@@ -60,7 +60,7 @@ tqueueReceiveSlot(TupleTableSlot *slot, DestReceiver *self)
 
 	/* Send the tuple itself. */
 	tuple = ExecFetchSlotMinimalTuple(slot, &should_free);
-	result = shm_mq_send(tqueue->queue, tuple->t_len, tuple, false);
+	result = shm_mq_send(tqueue->queue, tuple->t_len, tuple, false, false);
 
 	if (should_free)
 		pfree(tuple);
diff --git a/src/backend/libpq/pqmq.c b/src/backend/libpq/pqmq.c
index d1a1f47..846494b 100644
--- a/src/backend/libpq/pqmq.c
+++ b/src/backend/libpq/pqmq.c
@@ -154,7 +154,12 @@ mq_putmessage(char msgtype, const char *s, size_t len)
 
 	for (;;)
 	{
-		result = shm_mq_sendv(pq_mq_handle, iov, 2, true);
+		/*
+		 * Pass force_flush as true so that the shared memory value is
+		 * updated and the receiver is notified immediately, before we send
+		 * the parallel message signal right after this.
+		 */
+		result = shm_mq_sendv(pq_mq_handle, iov, 2, true, true);
 
 		if (pq_mq_parallel_leader_pid != 0)
 			SendProcSignal(pq_mq_parallel_leader_pid,
diff --git a/src/backend/storage/ipc/shm_mq.c b/src/backend/storage/ipc/shm_mq.c
index 91a7093..4493fc1 100644
--- a/src/backend/storage/ipc/shm_mq.c
+++ b/src/backend/storage/ipc/shm_mq.c
@@ -109,6 +109,12 @@ struct shm_mq
  * locally by copying the chunks into a backend-local buffer.  mqh_buffer is
  * the buffer, and mqh_buflen is the number of bytes allocated for it.
  *
+ * mqh_send_pending is the number of bytes that have been written to the
+ * queue but not yet reflected in shared memory.  We do not update the shared
+ * counter until the pending data exceeds 1/4th of the ring size or the queue
+ * is full.  This prevents frequent CPU cache misses, and it also avoids
+ * frequent SetLatch() calls, which are quite expensive.
+ *
  * mqh_partial_bytes, mqh_expected_bytes, and mqh_length_word_complete
  * are used to track the state of non-blocking operations.  When the caller
  * attempts a non-blocking operation that returns SHM_MQ_WOULD_BLOCK, they
@@ -137,6 +143,7 @@ struct shm_mq_handle
 	char	   *mqh_buffer;
 	Size		mqh_buflen;
 	Size		mqh_consume_pending;
+	Size		mqh_send_pending;
 	Size		mqh_partial_bytes;
 	Size		mqh_expected_bytes;
 	bool		mqh_length_word_complete;
@@ -292,6 +299,7 @@ shm_mq_attach(shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle)
 	mqh->mqh_buffer = NULL;
 	mqh->mqh_buflen = 0;
 	mqh->mqh_consume_pending = 0;
+	mqh->mqh_send_pending = 0;
 	mqh->mqh_partial_bytes = 0;
 	mqh->mqh_expected_bytes = 0;
 	mqh->mqh_length_word_complete = false;
@@ -317,16 +325,22 @@ shm_mq_set_handle(shm_mq_handle *mqh, BackgroundWorkerHandle *handle)
 
 /*
  * Write a message into a shared message queue.
+ *
+ * When force_flush = true, we immediately update the shm_mq's mq_bytes_written
+ * and notify the receiver if it is already attached.  Otherwise, we don't
+ * update it until we have written an amount of data greater than 1/4th of the
+ * ring size.
  */
 shm_mq_result
-shm_mq_send(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait)
+shm_mq_send(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait,
+			bool force_flush)
 {
 	shm_mq_iovec iov;
 
 	iov.data = data;
 	iov.len = nbytes;
 
-	return shm_mq_sendv(mqh, &iov, 1, nowait);
+	return shm_mq_sendv(mqh, &iov, 1, nowait, force_flush);
 }
 
 /*
@@ -343,9 +357,12 @@ shm_mq_send(shm_mq_handle *mqh, Size nbytes, const void *data, bool nowait)
  * arguments, each time the process latch is set.  (Once begun, the sending
  * of a message cannot be aborted except by detaching from the queue; changing
  * the length or payload will corrupt the queue.)
+ *
+ * For force_flush, refer to the comments atop shm_mq_send.
  */
 shm_mq_result
-shm_mq_sendv(shm_mq_handle *mqh, shm_mq_iovec *iov, int iovcnt, bool nowait)
+shm_mq_sendv(shm_mq_handle *mqh, shm_mq_iovec *iov, int iovcnt, bool nowait,
+			 bool force_flush)
 {
 	shm_mq_result res;
 	shm_mq	   *mq = mqh->mqh_queue;
@@ -518,8 +535,18 @@ shm_mq_sendv(shm_mq_handle *mqh, shm_mq_iovec *iov, int iovcnt, bool nowait)
 		mqh->mqh_counterparty_attached = true;
 	}
 
-	/* Notify receiver of the newly-written data, and return. */
-	SetLatch(&receiver->procLatch);
+	/*
+	 * If the caller has requested a force flush, or we have written more than
+	 * 1/4 of the ring size, mark the bytes as written in shared memory and
+	 * notify the receiver.  For details, see the comments atop shm_mq_handle.
+	 */
+	if (force_flush || mqh->mqh_send_pending > (mq->mq_ring_size >> 2))
+	{
+		shm_mq_inc_bytes_written(mq, mqh->mqh_send_pending);
+		SetLatch(&receiver->procLatch);
+		mqh->mqh_send_pending = 0;
+	}
+
 	return SHM_MQ_SUCCESS;
 }
 
@@ -816,6 +843,13 @@ shm_mq_wait_for_attach(shm_mq_handle *mqh)
 void
 shm_mq_detach(shm_mq_handle *mqh)
 {
+	/* Before detaching, notify the receiver of any already-written data. */
+	if (mqh->mqh_send_pending > 0)
+	{
+		shm_mq_inc_bytes_written(mqh->mqh_queue, mqh->mqh_send_pending);
+		mqh->mqh_send_pending = 0;
+	}
+
 	/* Notify counterparty that we're outta here. */
 	shm_mq_detach_internal(mqh->mqh_queue);
 
@@ -894,7 +928,7 @@ shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, const void *data,
 
 		/* Compute number of ring buffer bytes used and available. */
 		rb = pg_atomic_read_u64(&mq->mq_bytes_read);
-		wb = pg_atomic_read_u64(&mq->mq_bytes_written);
+		wb = pg_atomic_read_u64(&mq->mq_bytes_written) + mqh->mqh_send_pending;
 		Assert(wb >= rb);
 		used = wb - rb;
 		Assert(used <= ringsize);
@@ -951,6 +985,9 @@ shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, const void *data,
 		}
 		else if (available == 0)
 		{
+			/* Update shared memory with the pending send bytes. */
+			shm_mq_inc_bytes_written(mq, mqh->mqh_send_pending);
+
 			/*
 			 * Since mq->mqh_counterparty_attached is known to be true at this
 			 * point, mq_receiver has been set, and it can't change once set.
@@ -959,6 +996,12 @@ shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, const void *data,
 			Assert(mqh->mqh_counterparty_attached);
 			SetLatch(&mq->mq_receiver->procLatch);
 
+			/*
+			 * We have just flushed the mqh_send_pending bytes into shared
+			 * memory, so reset the counter.
+			 */
+			mqh->mqh_send_pending = 0;
+
 			/* Skip manipulation of our latch if nowait = true. */
 			if (nowait)
 			{
@@ -1009,13 +1052,14 @@ shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, const void *data,
 			 * MAXIMUM_ALIGNOF, and each read is as well.
 			 */
 			Assert(sent == nbytes || sendnow == MAXALIGN(sendnow));
-			shm_mq_inc_bytes_written(mq, MAXALIGN(sendnow));
 
 			/*
-			 * For efficiency, we don't set the reader's latch here.  We'll do
-			 * that only when the buffer fills up or after writing an entire
-			 * message.
+			 * For efficiency, we neither update the bytes-written count in
+			 * shared memory nor set the reader's latch here.  Refer to the
+			 * comments atop the shm_mq_handle structure for more
+			 * information.
 			 */
+			mqh->mqh_send_pending += MAXALIGN(sendnow);
 		}
 	}
 
diff --git a/src/include/storage/shm_mq.h b/src/include/storage/shm_mq.h
index e693f3f..cb1c555 100644
--- a/src/include/storage/shm_mq.h
+++ b/src/include/storage/shm_mq.h
@@ -70,11 +70,13 @@ extern shm_mq *shm_mq_get_queue(shm_mq_handle *mqh);
 
 /* Send or receive messages. */
 extern shm_mq_result shm_mq_send(shm_mq_handle *mqh,
-								 Size nbytes, const void *data, bool nowait);
-extern shm_mq_result shm_mq_sendv(shm_mq_handle *mqh,
-								  shm_mq_iovec *iov, int iovcnt, bool nowait);
+								 Size nbytes, const void *data, bool nowait,
+								 bool force_flush);
+extern shm_mq_result shm_mq_sendv(shm_mq_handle *mqh, shm_mq_iovec *iov,
+								  int iovcnt, bool nowait, bool force_flush);
 extern shm_mq_result shm_mq_receive(shm_mq_handle *mqh,
 									Size *nbytesp, void **datap, bool nowait);
+extern void shm_mq_flush(shm_mq_handle *mqh);
 
 /* Wait for our counterparty to attach to the queue. */
 extern shm_mq_result shm_mq_wait_for_attach(shm_mq_handle *mqh);
diff --git a/src/test/modules/test_shm_mq/test.c b/src/test/modules/test_shm_mq/test.c
index 2d8d695..be074f0 100644
--- a/src/test/modules/test_shm_mq/test.c
+++ b/src/test/modules/test_shm_mq/test.c
@@ -73,7 +73,7 @@ test_shm_mq(PG_FUNCTION_ARGS)
 	test_shm_mq_setup(queue_size, nworkers, &seg, &outqh, &inqh);
 
 	/* Send the initial message. */
-	res = shm_mq_send(outqh, message_size, message_contents, false);
+	res = shm_mq_send(outqh, message_size, message_contents, false, true);
 	if (res != SHM_MQ_SUCCESS)
 		ereport(ERROR,
 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
@@ -97,7 +97,7 @@ test_shm_mq(PG_FUNCTION_ARGS)
 			break;
 
 		/* Send it back out. */
-		res = shm_mq_send(outqh, len, data, false);
+		res = shm_mq_send(outqh, len, data, false, true);
 		if (res != SHM_MQ_SUCCESS)
 			ereport(ERROR,
 					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
@@ -177,7 +177,8 @@ test_shm_mq_pipelined(PG_FUNCTION_ARGS)
 		 */
 		if (send_count < loop_count)
 		{
-			res = shm_mq_send(outqh, message_size, message_contents, true);
+			res = shm_mq_send(outqh, message_size, message_contents, true,
+							  true);
 			if (res == SHM_MQ_SUCCESS)
 			{
 				++send_count;
diff --git a/src/test/modules/test_shm_mq/worker.c b/src/test/modules/test_shm_mq/worker.c
index 2180776..9b037b9 100644
--- a/src/test/modules/test_shm_mq/worker.c
+++ b/src/test/modules/test_shm_mq/worker.c
@@ -190,7 +190,7 @@ copy_messages(shm_mq_handle *inqh, shm_mq_handle *outqh)
 			break;
 
 		/* Send it back out. */
-		res = shm_mq_send(outqh, len, data, false);
+		res = shm_mq_send(outqh, len, data, false, true);
 		if (res != SHM_MQ_SUCCESS)
 			break;
 	}
-- 
1.8.3.1
