From 96134e48301c71a4ece4340229dac4e8100cfc1c Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.munro@enterprisedb.com>
Date: Thu, 25 Jan 2018 09:54:41 +1300
Subject: [PATCH 3/3] Pessimistic fork failure detector.

---
 src/backend/access/transam/parallel.c | 30 +++++++++++++++++++++++++++++-
 src/backend/executor/tqueue.c         |  3 ++-
 src/backend/libpq/pqmq.c              |  3 ++-
 src/backend/storage/ipc/shm_mq.c      |  2 +-
 src/include/storage/shm_mq.h          |  1 +
 5 files changed, 35 insertions(+), 4 deletions(-)

diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c
index 54d9ea7be05..195d2110000 100644
--- a/src/backend/access/transam/parallel.c
+++ b/src/backend/access/transam/parallel.c
@@ -550,6 +550,14 @@ LaunchParallelWorkers(ParallelContext *pcxt)
 
 	/* Restore previous memory context. */
 	MemoryContextSwitchTo(oldcontext);
+
+	/*
+	 * Cause the next CHECK_FOR_INTERRUPTS() to behave as if we've received
+	 * PROCSIG_PARALLEL_MESSAGE, as part of the protocol for detecting
+	 * failed forks.
+	 */
+	ParallelMessagePending = true;
+	InterruptPending = true;
 }
 
 /*
@@ -874,7 +882,27 @@ HandleParallelMessages(void)
 
 				res = shm_mq_receive(pcxt->worker[i].error_mqh, &nbytes,
 									 &data, true);
-				if (res == SHM_MQ_WOULD_BLOCK)
+				if (res == SHM_MQ_WOULD_BLOCK_NOT_YET_ATTACHED)
+				{
+					/*
+					 * Until we've heard from every single worker, we'll
+					 * keep behaving as if we've received PROCSIG_PARALLEL_MESSAGE.
+					 * This means that every CHECK_FOR_INTERRUPTS() will cause
+					 * us to check for backends that never started up.
+					 * This is necessary because the postmaster isn't allowed
+					 * to set PROCSIG_PARALLEL_MESSAGE to tell us about fork
+					 * failure, but it will signal us, causing any correctly
+					 * coded latch wait loop to return here so that we can
+					 * poll for workers that failed to fork.  In other words,
+					 * we'll take the extremely pessimistic view that every
+					 * latch-set might be such a failure, and keep checking
+					 * for it until we know it's definitely not the case.
+					 */
+					InterruptPending = true;
+					ParallelMessagePending = true;
+					break;
+				}
+				else if (res == SHM_MQ_WOULD_BLOCK)
 					break;
 				else if (res == SHM_MQ_SUCCESS)
 				{
diff --git a/src/backend/executor/tqueue.c b/src/backend/executor/tqueue.c
index ecdbe7f79f6..feaf999a2f0 100644
--- a/src/backend/executor/tqueue.c
+++ b/src/backend/executor/tqueue.c
@@ -191,7 +191,8 @@ TupleQueueReaderNext(TupleQueueReader *reader, bool nowait, bool *done)
 	}
 
 	/* In non-blocking mode, bail out if no message ready yet. */
-	if (result == SHM_MQ_WOULD_BLOCK)
+	if (result == SHM_MQ_WOULD_BLOCK ||
+		result == SHM_MQ_WOULD_BLOCK_NOT_YET_ATTACHED)
 		return NULL;
 	Assert(result == SHM_MQ_SUCCESS);
 
diff --git a/src/backend/libpq/pqmq.c b/src/backend/libpq/pqmq.c
index 201075dd477..90cded1e425 100644
--- a/src/backend/libpq/pqmq.c
+++ b/src/backend/libpq/pqmq.c
@@ -165,7 +165,8 @@ mq_putmessage(char msgtype, const char *s, size_t len)
 						   PROCSIG_PARALLEL_MESSAGE,
 						   pq_mq_parallel_master_backend_id);
 
-		if (result != SHM_MQ_WOULD_BLOCK)
+		if (result != SHM_MQ_WOULD_BLOCK &&
+			result != SHM_MQ_WOULD_BLOCK_NOT_YET_ATTACHED)
 			break;
 
 		WaitLatch(MyLatch, WL_LATCH_SET, 0,
diff --git a/src/backend/storage/ipc/shm_mq.c b/src/backend/storage/ipc/shm_mq.c
index 1131e27e2e7..66448b5f315 100644
--- a/src/backend/storage/ipc/shm_mq.c
+++ b/src/backend/storage/ipc/shm_mq.c
@@ -553,7 +553,7 @@ shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
 				if (counterparty_gone)
 					return SHM_MQ_DETACHED;
 				else
-					return SHM_MQ_WOULD_BLOCK;
+					return SHM_MQ_WOULD_BLOCK_NOT_YET_ATTACHED;
 			}
 		}
 		else if (!shm_mq_wait_internal(mq, &mq->mq_sender, mqh->mqh_handle)
diff --git a/src/include/storage/shm_mq.h b/src/include/storage/shm_mq.h
index f85f2eb7d17..f9800c7edec 100644
--- a/src/include/storage/shm_mq.h
+++ b/src/include/storage/shm_mq.h
@@ -37,6 +37,7 @@ typedef enum
 {
 	SHM_MQ_SUCCESS,				/* Sent or received a message. */
 	SHM_MQ_WOULD_BLOCK,			/* Not completed; retry later. */
+	SHM_MQ_WOULD_BLOCK_NOT_YET_ATTACHED,
 	SHM_MQ_DETACHED				/* Other process has detached queue. */
 } shm_mq_result;
 
-- 
2.15.1

