diff --git a/src/backend/storage/ipc/shm_mq.c b/src/backend/storage/ipc/shm_mq.c
index 02a5df8da9..7e1a555094 100644
--- a/src/backend/storage/ipc/shm_mq.c
+++ b/src/backend/storage/ipc/shm_mq.c
@@ -107,6 +107,10 @@ struct shm_mq
  * locally by copying the chunks into a backend-local buffer.  mqh_buffer is
  * the buffer, and mqh_buflen is the number of bytes allocated for it.
  *
+ * mqh_increment_pending is the number of bytes that we have locally consumed
+ * but not yet added to mq_bytes_read, while mqh_signal_pending is the number
+ * of bytes we have consumed since we last signalled the sender.
+ *
  * mqh_partial_bytes, mqh_expected_bytes, and mqh_length_word_complete
  * are used to track the state of non-blocking operations.  When the caller
  * attempts a non-blocking operation that returns SHM_MQ_WOULD_BLOCK, they
@@ -134,7 +138,8 @@ struct shm_mq_handle
 	BackgroundWorkerHandle *mqh_handle;
 	char	   *mqh_buffer;
 	Size		mqh_buflen;
-	Size		mqh_consume_pending;
+	Size		mqh_increment_pending;
+	Size		mqh_signal_pending;
 	Size		mqh_partial_bytes;
 	Size		mqh_expected_bytes;
 	bool		mqh_length_word_complete;
@@ -293,7 +298,8 @@ shm_mq_attach(shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle)
 	mqh->mqh_handle = handle;
 	mqh->mqh_buffer = NULL;
 	mqh->mqh_buflen = 0;
-	mqh->mqh_consume_pending = 0;
+	mqh->mqh_increment_pending = 0;
+	mqh->mqh_signal_pending = 0;
 	mqh->mqh_partial_bytes = 0;
 	mqh->mqh_expected_bytes = 0;
 	mqh->mqh_length_word_complete = false;
@@ -594,10 +600,15 @@ shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
 	 * because SetLatch() is fairly expensive and we don't want to do it too
 	 * often.
 	 */
-	if (mqh->mqh_consume_pending > mq->mq_ring_size / 4)
+	if (mqh->mqh_increment_pending > 0)
+	{
+		shm_mq_inc_bytes_read(mq, mqh->mqh_increment_pending);
+		mqh->mqh_increment_pending = 0;
+	}
+	if (mqh->mqh_signal_pending > mq->mq_ring_size / 4)
 	{
-		shm_mq_inc_bytes_read(mq, mqh->mqh_consume_pending);
-		mqh->mqh_consume_pending = 0;
+		SetLatch(&mq->mq_sender->procLatch);
+		mqh->mqh_signal_pending = 0;
 	}
 
 	/* Try to read, or finish reading, the length word from the buffer. */
@@ -625,7 +636,8 @@ shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
 			needed = MAXALIGN(sizeof(Size)) + MAXALIGN(nbytes);
 			if (rb >= needed)
 			{
-				mqh->mqh_consume_pending += needed;
+				mqh->mqh_increment_pending += needed;
+				mqh->mqh_signal_pending += needed;
 				*nbytesp = nbytes;
 				*datap = ((char *) rawdata) + MAXALIGN(sizeof(Size));
 				return SHM_MQ_SUCCESS;
@@ -637,7 +649,8 @@ shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
 			 */
 			mqh->mqh_expected_bytes = nbytes;
 			mqh->mqh_length_word_complete = true;
-			mqh->mqh_consume_pending += MAXALIGN(sizeof(Size));
+			mqh->mqh_increment_pending += MAXALIGN(sizeof(Size));
+			mqh->mqh_signal_pending += MAXALIGN(sizeof(Size));
 			rb -= MAXALIGN(sizeof(Size));
 		}
 		else
@@ -664,7 +677,8 @@ shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
 			memcpy(&mqh->mqh_buffer[mqh->mqh_partial_bytes], rawdata,
 				   lengthbytes);
 			mqh->mqh_partial_bytes += lengthbytes;
-			mqh->mqh_consume_pending += MAXALIGN(lengthbytes);
+			mqh->mqh_increment_pending += MAXALIGN(lengthbytes);
+			mqh->mqh_signal_pending += MAXALIGN(lengthbytes);
 			rb -= lengthbytes;
 
 			/* If we now have the whole word, we're ready to read payload. */
@@ -692,7 +706,8 @@ shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
 		if (rb >= nbytes)
 		{
 			mqh->mqh_length_word_complete = false;
-			mqh->mqh_consume_pending += MAXALIGN(nbytes);
+			mqh->mqh_increment_pending += MAXALIGN(nbytes);
+			mqh->mqh_signal_pending += MAXALIGN(nbytes);
 			*nbytesp = nbytes;
 			*datap = rawdata;
 			return SHM_MQ_SUCCESS;
@@ -738,7 +753,8 @@ shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
 		 * a multiple of MAXIMUM_ALIGNOF, and each read and write is as well.
 		 */
 		Assert(mqh->mqh_partial_bytes == nbytes || rb == MAXALIGN(rb));
-		mqh->mqh_consume_pending += MAXALIGN(rb);
+		mqh->mqh_increment_pending += MAXALIGN(rb);
+		mqh->mqh_signal_pending += MAXALIGN(rb);
 
 		/* If we got all the data, exit the loop. */
 		if (mqh->mqh_partial_bytes >= nbytes)
@@ -1034,7 +1050,7 @@ shm_mq_receive_bytes(shm_mq_handle *mqh, Size bytes_needed, bool nowait,
 		 * consumed.
 		 */
 		read = pg_atomic_read_u64(&mq->mq_bytes_read) +
-			mqh->mqh_consume_pending;
+			mqh->mqh_increment_pending;
 		used = written - read;
 		Assert(used <= ringsize);
 		offset = read % (uint64) ringsize;
@@ -1067,12 +1083,18 @@ shm_mq_receive_bytes(shm_mq_handle *mqh, Size bytes_needed, bool nowait,
 
 		/*
 		 * We didn't get enough data to satisfy the request, so mark any data
-		 * previously-consumed as read to make more buffer space.
+		 * previously-consumed as read to make more buffer space, and signal
+		 * the sender as necessary to make sure they're not waiting for us.
 		 */
-		if (mqh->mqh_consume_pending > 0)
+		if (mqh->mqh_increment_pending > 0)
+		{
+			shm_mq_inc_bytes_read(mq, mqh->mqh_increment_pending);
+			mqh->mqh_increment_pending = 0;
+		}
+		if (mqh->mqh_signal_pending > 0)
 		{
-			shm_mq_inc_bytes_read(mq, mqh->mqh_consume_pending);
-			mqh->mqh_consume_pending = 0;
+			SetLatch(&mq->mq_sender->procLatch);
+			mqh->mqh_signal_pending = 0;
 		}
 
 		/* Skip manipulation of our latch if nowait = true. */
@@ -1193,8 +1215,6 @@ shm_mq_wait_internal(volatile shm_mq *mq, PGPROC *volatile *ptr,
 static void
 shm_mq_inc_bytes_read(volatile shm_mq *mq, Size n)
 {
-	PGPROC	   *sender;
-
 	/*
 	 * Separate prior reads of mq_ring from the increment of mq_bytes_read
 	 * which follows.  Pairs with the full barrier in shm_mq_send_bytes(). We
@@ -1209,14 +1229,6 @@ shm_mq_inc_bytes_read(volatile shm_mq *mq, Size n)
 	 */
 	pg_atomic_write_u64(&mq->mq_bytes_read,
 						pg_atomic_read_u64(&mq->mq_bytes_read) + n);
-
-	/*
-	 * We shouldn't have any bytes to read without a sender, so we can read
-	 * mq_sender here without a lock.  Once it's initialized, it can't change.
-	 */
-	sender = mq->mq_sender;
-	Assert(sender != NULL);
-	SetLatch(&sender->procLatch);
 }
 
 /*
