On Mon, Mar 17, 2014 at 1:26 PM, Robert Haas <robertmh...@gmail.com> wrote:
> Oh, yeah.  Duh.  Clearly my brain isn't working today.  Hmm, so maybe
> this will be fairly simple... will try it out.

OK, I tried this out.  The major complication that cropped up was
that, if we make the length word always a Size but align the buffer to
MAXIMUM_ALIGNOF, then the length word might get split if
sizeof(Size) > MAXIMUM_ALIGNOF.  That doesn't look too bad, but it
required changing a couple of if statements into while loops, and
changing around the
structure of a shm_mq_handle a bit.  See attached.

-- 
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
diff --git a/src/backend/storage/ipc/shm_mq.c b/src/backend/storage/ipc/shm_mq.c
index 2d298a3..b31f4fb 100644
--- a/src/backend/storage/ipc/shm_mq.c
+++ b/src/backend/storage/ipc/shm_mq.c
@@ -72,7 +72,7 @@ struct shm_mq
 	PGPROC	   *mq_sender;
 	uint64		mq_bytes_read;
 	uint64		mq_bytes_written;
-	uint64		mq_ring_size;
+	Size		mq_ring_size;
 	bool		mq_detached;
 	uint8		mq_ring_offset;
 	char		mq_ring[FLEXIBLE_ARRAY_MEMBER];
@@ -103,15 +103,16 @@ struct shm_mq
  * locally by copying the chunks into a backend-local buffer.  mqh_buffer is
  * the buffer, and mqh_buflen is the number of bytes allocated for it.
  *
- * mqh_partial_message_bytes, mqh_expected_bytes, and mqh_did_length_word
+ * mqh_partial_bytes, mqh_expected_bytes, and mqh_length_word_complete
  * are used to track the state of non-blocking operations.  When the caller
  * attempts a non-blocking operation that returns SHM_MQ_WOULD_BLOCK, they
  * are expected to retry the call at a later time with the same argument;
  * we need to retain enough state to pick up where we left off.
- * mqh_did_length_word tracks whether we read or wrote the length word,
- * mqh_partial_message_bytes tracks the number of payload bytes read or
- * written, and mqh_expected_bytes - which is used only for reads - tracks
- * the expected total size of the payload.
+ * mqh_length_word_complete tracks whether we are done sending or receiving
+ * (whichever we're doing) the entire length word.  mqh_partial_bytes tracks
+ * the number of bytes read or written for either the length word or the
+ * message itself, and mqh_expected_bytes - which is used only for reads -
+ * tracks the expected total size of the payload.
  *
  * mqh_counterparty_attached tracks whether we know the counterparty to have
  * attached to the queue at some previous point.  This lets us avoid some
@@ -128,25 +129,25 @@ struct shm_mq_handle
 	dsm_segment *mqh_segment;
 	BackgroundWorkerHandle *mqh_handle;
 	char	   *mqh_buffer;
-	uint64		mqh_buflen;
-	uint64		mqh_consume_pending;
-	uint64		mqh_partial_message_bytes;
-	uint64		mqh_expected_bytes;
-	bool		mqh_did_length_word;
+	Size		mqh_buflen;
+	Size		mqh_consume_pending;
+	Size		mqh_partial_bytes;
+	Size		mqh_expected_bytes;
+	bool		mqh_length_word_complete;
 	bool		mqh_counterparty_attached;
 	MemoryContext mqh_context;
 };
 
-static shm_mq_result shm_mq_send_bytes(shm_mq_handle *mq, uint64 nbytes,
-				  void *data, bool nowait, uint64 *bytes_written);
-static shm_mq_result shm_mq_receive_bytes(shm_mq *mq, uint64 bytes_needed,
-					 bool nowait, uint64 *nbytesp, void **datap);
+static shm_mq_result shm_mq_send_bytes(shm_mq_handle *mq, Size nbytes,
+				  void *data, bool nowait, Size *bytes_written);
+static shm_mq_result shm_mq_receive_bytes(shm_mq *mq, Size bytes_needed,
+					 bool nowait, Size *nbytesp, void **datap);
 static bool shm_mq_wait_internal(volatile shm_mq *mq, PGPROC * volatile *ptr,
 					 BackgroundWorkerHandle *handle);
 static uint64 shm_mq_get_bytes_read(volatile shm_mq *mq, bool *detached);
-static void shm_mq_inc_bytes_read(volatile shm_mq *mq, uint64 n);
+static void shm_mq_inc_bytes_read(volatile shm_mq *mq, Size n);
 static uint64 shm_mq_get_bytes_written(volatile shm_mq *mq, bool *detached);
-static void shm_mq_inc_bytes_written(volatile shm_mq *mq, uint64 n);
+static void shm_mq_inc_bytes_written(volatile shm_mq *mq, Size n);
 static shm_mq_result shm_mq_notify_receiver(volatile shm_mq *mq);
 static void shm_mq_detach_callback(dsm_segment *seg, Datum arg);
 
@@ -163,7 +164,7 @@ shm_mq *
 shm_mq_create(void *address, Size size)
 {
 	shm_mq	   *mq = address;
-	uint64		data_offset = MAXALIGN(offsetof(shm_mq, mq_ring));
+	Size		data_offset = MAXALIGN(offsetof(shm_mq, mq_ring));
 
 	/* If the size isn't MAXALIGN'd, just discard the odd bytes. */
 	size = MAXALIGN_DOWN(size);
@@ -289,8 +290,8 @@ shm_mq_attach(shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle)
 	mqh->mqh_buflen = 0;
 	mqh->mqh_consume_pending = 0;
 	mqh->mqh_context = CurrentMemoryContext;
-	mqh->mqh_partial_message_bytes = 0;
-	mqh->mqh_did_length_word = false;
+	mqh->mqh_partial_bytes = 0;
+	mqh->mqh_length_word_complete = false;
 	mqh->mqh_counterparty_attached = false;
 
 	if (seg != NULL)
@@ -314,41 +315,48 @@ shm_mq_attach(shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle)
  * the length or payload will corrupt the queue.)
  */
 shm_mq_result
-shm_mq_send(shm_mq_handle *mqh, uint64 nbytes, void *data, bool nowait)
+shm_mq_send(shm_mq_handle *mqh, Size nbytes, void *data, bool nowait)
 {
 	shm_mq_result	res;
 	shm_mq		   *mq = mqh->mqh_queue;
-	uint64			bytes_written;
+	Size			bytes_written;
 
 	Assert(mq->mq_sender == MyProc);
 
-	/* Write the message length into the buffer. */
-	if (!mqh->mqh_did_length_word)
+	/* Try to write, or finish writing, the length word into the buffer. */
+	while (!mqh->mqh_length_word_complete)
 	{
-		res = shm_mq_send_bytes(mqh, sizeof(uint64), &nbytes, nowait,
-								&bytes_written);
+		Assert(mqh->mqh_partial_bytes < sizeof(Size));
+		res = shm_mq_send_bytes(mqh, sizeof(Size) - mqh->mqh_partial_bytes,
+								((char *) &nbytes) + mqh->mqh_partial_bytes,
+								nowait, &bytes_written);
+		mqh->mqh_partial_bytes += bytes_written;
 		if (res != SHM_MQ_SUCCESS)
 			return res;
 
-		/*
-		 * We're sure to have sent the length in full, since we always
-		 * write a MAXALIGN'd chunk.
-		 */
-		Assert(bytes_written == MAXALIGN64(sizeof(uint64)));
-		mqh->mqh_did_length_word = true;
+		if (mqh->mqh_partial_bytes >= sizeof(Size))
+		{
+			Assert(mqh->mqh_partial_bytes == sizeof(Size));
+
+			mqh->mqh_partial_bytes = 0;
+			mqh->mqh_length_word_complete = true;
+		}
+
+		/* Length word can't be split unless bigger than required alignment. */
+		Assert(mqh->mqh_length_word_complete || sizeof(Size) > MAXIMUM_ALIGNOF);
 	}
 
 	/* Write the actual data bytes into the buffer. */
-	Assert(mqh->mqh_partial_message_bytes <= nbytes);
-	res = shm_mq_send_bytes(mqh, nbytes - mqh->mqh_partial_message_bytes,
-							((char *) data) + mqh->mqh_partial_message_bytes,
+	Assert(mqh->mqh_partial_bytes <= nbytes);
+	res = shm_mq_send_bytes(mqh, nbytes - mqh->mqh_partial_bytes,
+							((char *) data) + mqh->mqh_partial_bytes,
 							nowait, &bytes_written);
 	if (res == SHM_MQ_WOULD_BLOCK)
-		mqh->mqh_partial_message_bytes += bytes_written;
+		mqh->mqh_partial_bytes += bytes_written;
 	else
 	{
-		mqh->mqh_partial_message_bytes = 0;
-		mqh->mqh_did_length_word = false;
+		mqh->mqh_partial_bytes = 0;
+		mqh->mqh_length_word_complete = false;
 	}
 	if (res != SHM_MQ_SUCCESS)
 		return res;
@@ -380,13 +388,12 @@ shm_mq_send(shm_mq_handle *mqh, uint64 nbytes, void *data, bool nowait)
  * function again after the process latch has been set.
  */
 shm_mq_result
-shm_mq_receive(shm_mq_handle *mqh, uint64 *nbytesp, void **datap, bool nowait)
+shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
 {
 	shm_mq		   *mq = mqh->mqh_queue;
 	shm_mq_result	res;
-	uint64			rb = 0;
-	uint64			nbytes;
-	uint64			needed;
+	Size			rb = 0;
+	Size			nbytes;
 	void		   *rawdata;
 
 	Assert(mq->mq_receiver == MyProc);
@@ -414,44 +421,91 @@ shm_mq_receive(shm_mq_handle *mqh, uint64 *nbytesp, void **datap, bool nowait)
 		mqh->mqh_consume_pending = 0;
 	}
 
-	/* Determine the message length. */
-	if (mqh->mqh_did_length_word)
-	{
-		/* We've partially received a message; recall expected length. */
-		nbytes = mqh->mqh_expected_bytes;
-	}
-	else
+	/* Try to read, or finish reading, the length word from the buffer. */
+	while (!mqh->mqh_length_word_complete)
 	{
 		/* Try to receive the message length word. */
-		res = shm_mq_receive_bytes(mq, sizeof(uint64), nowait, &rb, &rawdata);
+		Assert(mqh->mqh_partial_bytes < sizeof(Size));
+		res = shm_mq_receive_bytes(mq, sizeof(Size) - mqh->mqh_partial_bytes,
+								   nowait, &rb, &rawdata);
 		if (res != SHM_MQ_SUCCESS)
 			return res;
-		Assert(rb >= sizeof(uint64));
-		memcpy(&nbytes, rawdata, sizeof(uint64));
-		mqh->mqh_expected_bytes = nbytes;
 
-		/* If we've already got the whole message, we're done. */
-		needed = MAXALIGN64(sizeof(uint64)) + MAXALIGN64(nbytes);
-		if (rb >= needed)
+		/*
+		 * Hopefully, we'll receive the entire message length word at once.
+		 * But if sizeof(Size) > MAXIMUM_ALIGNOF, then it might be split over
+		 * multiple reads.
+		 */
+		if (mqh->mqh_partial_bytes == 0 && rb >= sizeof(Size))
 		{
+			Size			needed;
+
+			nbytes = * (Size *) rawdata;
+
+			/* If we've already got the whole message, we're done. */
+			needed = MAXALIGN(sizeof(Size)) + MAXALIGN(nbytes);
+			if (rb >= needed)
+			{
+				/*
+				 * Technically, we could consume the message length information
+				 * at this point, but the extra write to shared memory wouldn't
+				 * be free and in most cases we would reap no benefit.
+				 */
+				mqh->mqh_consume_pending = needed;
+				*nbytesp = nbytes;
+				*datap = ((char *) rawdata) + MAXALIGN(sizeof(Size));
+				return SHM_MQ_SUCCESS;
+			}
+
 			/*
-			 * Technically, we could consume the message length information at
-			 * this point, but the extra write to shared memory wouldn't be
-			 * free and in most cases we would reap no benefit.
+			 * We don't have the whole message, but we at least have the whole
+			 * length word.
 			 */
-			mqh->mqh_consume_pending = needed;
-			*nbytesp = nbytes;
-			*datap = ((char *) rawdata) + MAXALIGN64(sizeof(uint64));
-			return SHM_MQ_SUCCESS;
+			mqh->mqh_expected_bytes = nbytes;
+			mqh->mqh_length_word_complete = true;
+			shm_mq_inc_bytes_read(mq, MAXALIGN(sizeof(Size)));
+			rb -= MAXALIGN(sizeof(Size));
 		}
+		else
+		{
+			Size	lengthbytes;
+
+			/* Can't be split unless bigger than required alignment. */
+			Assert(sizeof(Size) > MAXIMUM_ALIGNOF);
 
-		/* Consume the length word. */
-		shm_mq_inc_bytes_read(mq, MAXALIGN64(sizeof(uint64)));
-		mqh->mqh_did_length_word = true;
-		rb -= MAXALIGN64(sizeof(uint64));
+			/* Length word is split; need buffer to reassemble. */
+			if (mqh->mqh_buffer == NULL)
+			{
+				mqh->mqh_buffer = MemoryContextAlloc(mqh->mqh_context,
+													 MQH_INITIAL_BUFSIZE);
+				mqh->mqh_buflen = MQH_INITIAL_BUFSIZE;
+			}
+			Assert(mqh->mqh_buflen >= sizeof(Size));
+
+			/* Copy and consume partial length word. */
+			if (mqh->mqh_partial_bytes + rb > sizeof(Size))
+				lengthbytes = sizeof(Size) - mqh->mqh_partial_bytes;
+			else
+				lengthbytes = rb - mqh->mqh_partial_bytes;
+			memcpy(&mqh->mqh_buffer[mqh->mqh_partial_bytes], rawdata,
+				   lengthbytes);
+			mqh->mqh_partial_bytes += lengthbytes;
+			shm_mq_inc_bytes_read(mq, MAXALIGN(lengthbytes));
+			rb -= lengthbytes;
+
+			/* If we now have the whole word, we're ready to read payload. */
+			if (mqh->mqh_partial_bytes >= sizeof(Size))
+			{
+				Assert(mqh->mqh_partial_bytes == sizeof(Size));
+				mqh->mqh_expected_bytes = * (Size *) mqh->mqh_buffer;
+				mqh->mqh_length_word_complete = true;
+				mqh->mqh_partial_bytes = 0;
+			}
+		}
 	}
+	nbytes = mqh->mqh_expected_bytes;
 
-	if (mqh->mqh_partial_message_bytes == 0)
+	if (mqh->mqh_partial_bytes == 0)
 	{
 		/*
 		 * Try to obtain the whole message in a single chunk.  If this works,
@@ -463,8 +517,8 @@ shm_mq_receive(shm_mq_handle *mqh, uint64 *nbytesp, void **datap, bool nowait)
 			return res;
 		if (rb >= nbytes)
 		{
-			mqh->mqh_did_length_word = false;
-			mqh->mqh_consume_pending = MAXALIGN64(nbytes);
+			mqh->mqh_length_word_complete = false;
+			mqh->mqh_consume_pending = MAXALIGN(nbytes);
 			*nbytesp = nbytes;
 			*datap = rawdata;
 			return SHM_MQ_SUCCESS;
@@ -477,7 +531,7 @@ shm_mq_receive(shm_mq_handle *mqh, uint64 *nbytesp, void **datap, bool nowait)
 		 */
 		if (mqh->mqh_buflen < nbytes)
 		{
-			uint64		newbuflen = Max(mqh->mqh_buflen, MQH_INITIAL_BUFSIZE);
+			Size	newbuflen = Max(mqh->mqh_buflen, MQH_INITIAL_BUFSIZE);
 
 			while (newbuflen < nbytes)
 				newbuflen *= 2;
@@ -496,12 +550,12 @@ shm_mq_receive(shm_mq_handle *mqh, uint64 *nbytesp, void **datap, bool nowait)
 	/* Loop until we've copied the entire message. */
 	for (;;)
 	{
-		uint64	still_needed;
+		Size	still_needed;
 
 		/* Copy as much as we can. */
-		Assert(mqh->mqh_partial_message_bytes + rb <= nbytes);
-		memcpy(&mqh->mqh_buffer[mqh->mqh_partial_message_bytes], rawdata, rb);
-		mqh->mqh_partial_message_bytes += rb;
+		Assert(mqh->mqh_partial_bytes + rb <= nbytes);
+		memcpy(&mqh->mqh_buffer[mqh->mqh_partial_bytes], rawdata, rb);
+		mqh->mqh_partial_bytes += rb;
 
 		/*
 		 * Update count of bytes read, with alignment padding.  Note
@@ -509,16 +563,15 @@ shm_mq_receive(shm_mq_handle *mqh, uint64 *nbytesp, void **datap, bool nowait)
 		 * end of a message, because the buffer size is a multiple of
 		 * MAXIMUM_ALIGNOF, and each read and write is as well.
 		 */
-		Assert(mqh->mqh_partial_message_bytes == nbytes ||
-				rb == MAXALIGN64(rb));
-		shm_mq_inc_bytes_read(mq, MAXALIGN64(rb));
+		Assert(mqh->mqh_partial_bytes == nbytes || rb == MAXALIGN(rb));
+		shm_mq_inc_bytes_read(mq, MAXALIGN(rb));
 
 		/* If we got all the data, exit the loop. */
-		if (mqh->mqh_partial_message_bytes >= nbytes)
+		if (mqh->mqh_partial_bytes >= nbytes)
 			break;
 
 		/* Wait for some more data. */
-		still_needed = nbytes - mqh->mqh_partial_message_bytes;
+		still_needed = nbytes - mqh->mqh_partial_bytes;
 		res = shm_mq_receive_bytes(mq, still_needed, nowait, &rb, &rawdata);
 		if (res != SHM_MQ_SUCCESS)
 			return res;
@@ -529,8 +582,8 @@ shm_mq_receive(shm_mq_handle *mqh, uint64 *nbytesp, void **datap, bool nowait)
 	/* Return the complete message, and reset for next message. */
 	*nbytesp = nbytes;
 	*datap = mqh->mqh_buffer;
-	mqh->mqh_did_length_word = false;
-	mqh->mqh_partial_message_bytes = 0;
+	mqh->mqh_length_word_complete = false;
+	mqh->mqh_partial_bytes = 0;
 	return SHM_MQ_SUCCESS;
 }
 
@@ -598,14 +651,14 @@ shm_mq_detach(shm_mq *mq)
  * Write bytes into a shared message queue.
  */
 static shm_mq_result
-shm_mq_send_bytes(shm_mq_handle *mqh, uint64 nbytes, void *data, bool nowait,
-				  uint64 *bytes_written)
+shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, void *data, bool nowait,
+				  Size *bytes_written)
 {
 	shm_mq	   *mq = mqh->mqh_queue;
-	uint64		sent = 0;
+	Size		sent = 0;
 	uint64		used;
-	uint64		ringsize = mq->mq_ring_size;
-	uint64		available;
+	Size		ringsize = mq->mq_ring_size;
+	Size		available;
 
 	while (sent < nbytes)
 	{
@@ -651,7 +704,7 @@ shm_mq_send_bytes(shm_mq_handle *mqh, uint64 nbytes, void *data, bool nowait,
 			res = shm_mq_notify_receiver(mq);
 			if (res != SHM_MQ_SUCCESS)
 			{
-				*bytes_written = res;
+				*bytes_written = sent;
 				return res;
 			}
 
@@ -679,8 +732,8 @@ shm_mq_send_bytes(shm_mq_handle *mqh, uint64 nbytes, void *data, bool nowait,
 		}
 		else
 		{
-			uint64	offset = mq->mq_bytes_written % ringsize;
-			uint64	sendnow = Min(available, ringsize - offset);
+			Size	offset = mq->mq_bytes_written % (uint64) ringsize;
+			Size	sendnow = Min(available, ringsize - offset);
 
 			/* Write as much data as we can via a single memcpy(). */
 			memcpy(&mq->mq_ring[mq->mq_ring_offset + offset],
@@ -693,8 +746,8 @@ shm_mq_send_bytes(shm_mq_handle *mqh, uint64 nbytes, void *data, bool nowait,
 			 * end of a run of bytes, because the buffer size is a multiple of
 			 * MAXIMUM_ALIGNOF, and each read is as well.
 			 */
-			Assert(sent == nbytes || sendnow == MAXALIGN64(sendnow));
-			shm_mq_inc_bytes_written(mq, MAXALIGN64(sendnow));
+			Assert(sent == nbytes || sendnow == MAXALIGN(sendnow));
+			shm_mq_inc_bytes_written(mq, MAXALIGN(sendnow));
 
 			/*
 			 * For efficiency, we don't set the reader's latch here.  We'll
@@ -717,23 +770,23 @@ shm_mq_send_bytes(shm_mq_handle *mqh, uint64 nbytes, void *data, bool nowait,
  * bytes_needed.
  */
 static shm_mq_result
-shm_mq_receive_bytes(shm_mq *mq, uint64 bytes_needed, bool nowait,
-					 uint64 *nbytesp, void **datap)
+shm_mq_receive_bytes(shm_mq *mq, Size bytes_needed, bool nowait,
+					 Size *nbytesp, void **datap)
 {
+	Size		ringsize = mq->mq_ring_size;
 	uint64		used;
-	uint64		ringsize = mq->mq_ring_size;
 	uint64		written;
 
 	for (;;)
 	{
-		uint64		offset;
+		Size		offset;
 		bool		detached;
 
 		/* Get bytes written, so we can compute what's available to read. */
 		written = shm_mq_get_bytes_written(mq, &detached);
 		used = written - mq->mq_bytes_read;
 		Assert(used <= ringsize);
-		offset = mq->mq_bytes_read % ringsize;
+		offset = mq->mq_bytes_read % (uint64) ringsize;
 
 		/* If we have enough data or buffer has wrapped, we're done. */
 		if (used >= bytes_needed || offset + used >= ringsize)
@@ -872,7 +925,7 @@ shm_mq_get_bytes_read(volatile shm_mq *mq, bool *detached)
  * Increment the number of bytes read.
  */
 static void
-shm_mq_inc_bytes_read(volatile shm_mq *mq, uint64 n)
+shm_mq_inc_bytes_read(volatile shm_mq *mq, Size n)
 {
 	PGPROC	   *sender;
 
@@ -907,7 +960,7 @@ shm_mq_get_bytes_written(volatile shm_mq *mq, bool *detached)
  * Increment the number of bytes written.
  */
 static void
-shm_mq_inc_bytes_written(volatile shm_mq *mq, uint64 n)
+shm_mq_inc_bytes_written(volatile shm_mq *mq, Size n)
 {
 	SpinLockAcquire(&mq->mq_mutex);
 	mq->mq_bytes_written += n;
diff --git a/src/include/storage/shm_mq.h b/src/include/storage/shm_mq.h
index 1bc1f56..c7dd905 100644
--- a/src/include/storage/shm_mq.h
+++ b/src/include/storage/shm_mq.h
@@ -57,9 +57,9 @@ extern void shm_mq_detach(shm_mq *);
 
 /* Send or receive messages. */
 extern shm_mq_result shm_mq_send(shm_mq_handle *mqh,
-			uint64 nbytes, void *data, bool nowait);
+			Size nbytes, void *data, bool nowait);
 extern shm_mq_result shm_mq_receive(shm_mq_handle *mqh,
-			   uint64 *nbytesp, void **datap, bool nowait);
+			   Size *nbytesp, void **datap, bool nowait);
 
 /* Wait for our counterparty to attach to the queue. */
 extern shm_mq_result shm_mq_wait_for_attach(shm_mq_handle *mqh);
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to