diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c
index 54d9ea7be0..5b45b07e7c 100644
--- a/src/backend/access/transam/parallel.c
+++ b/src/backend/access/transam/parallel.c
@@ -437,10 +437,11 @@ ReinitializeParallelDSM(ParallelContext *pcxt)
 		WaitForParallelWorkersToFinish(pcxt);
 		WaitForParallelWorkersToExit(pcxt);
 		pcxt->nworkers_launched = 0;
-		if (pcxt->any_message_received)
+		if (pcxt->known_attached_workers)
 		{
-			pfree(pcxt->any_message_received);
-			pcxt->any_message_received = NULL;
+			pfree(pcxt->known_attached_workers);
+			pcxt->known_attached_workers = NULL;
+			pcxt->nknown_attached_workers = 0;
 		}
 	}
 
@@ -542,17 +543,148 @@ LaunchParallelWorkers(ParallelContext *pcxt)
 
 	/*
 	 * Now that nworkers_launched has taken its final value, we can initialize
-	 * any_message_received.
+	 * known_attached_workers.
 	 */
 	if (pcxt->nworkers_launched > 0)
-		pcxt->any_message_received =
+	{
+		pcxt->known_attached_workers =
 			palloc0(sizeof(bool) * pcxt->nworkers_launched);
+		pcxt->nknown_attached_workers = 0;
+	}
 
 	/* Restore previous memory context. */
 	MemoryContextSwitchTo(oldcontext);
 }
 
 /*
+ * Wait for all workers to attach to their error queues, and throw an error if
+ * any worker fails to do this.
+ *
+ * Callers can assume that if this function returns successfully, then the
+ * number of workers given by pcxt->nworkers_launched have initialized and
+ * attached to their error queues.  Whether or not these workers are guaranteed
+ * to still be running depends on what code the caller asked them to run;
+ * this function does not guarantee that they have not exited.  However, it
+ * does guarantee that any workers which exited must have done so cleanly and
+ * after successfully performing the work with which they were tasked.
+ *
+ * If this function is not called, then some of the workers that were launched
+ * may not have been started due to a fork() failure, or may have exited during
+ * early startup prior to attaching to the error queue, so nworkers_launched
+ * cannot be viewed as completely reliable.  It will never be less than the
+ * number of workers which actually started, but it might be more.  Any workers
+ * that failed to start will still be discovered by
+ * WaitForParallelWorkersToFinish and an error will be thrown at that time,
+ * provided that function is eventually reached.
+ *
+ * In general, the leader process should do as much work as possible before
+ * calling this function.  fork() failures and other early-startup failures
+ * are very uncommon, and having the leader sit idle when it could be doing
+ * useful work is undesirable.  However, if the leader needs to wait for
+ * all of its workers or for a specific worker, it may want to call this
+ * function before doing so.  If not, it must make some other provision for
+ * the failure-to-start case, lest it wait forever.  On the other hand, a
+ * leader which never waits for a worker that might not be started yet, or
+ * at least never does so prior to WaitForParallelWorkersToFinish(), need not
+ * call this function at all.
+ */
+void
+WaitForParallelWorkersToAttach(ParallelContext *pcxt)
+{
+	int			i;
+
+	/* Skip this if we have no launched workers. */
+	if (pcxt->nworkers_launched == 0)
+		return;
+
+	for (;;)
+	{
+		/*
+		 * This will process any parallel messages that are pending and it may
+		 * also throw an error propagated from a worker.
+		 */
+		CHECK_FOR_INTERRUPTS();
+
+		for (i = 0; i < pcxt->nworkers_launched; ++i)
+		{
+			BgwHandleStatus status;
+			shm_mq	   *mq;
+			int			rc;
+			pid_t		pid;
+
+			if (pcxt->known_attached_workers[i])
+				continue;
+
+			/*
+			 * If error_mqh is NULL, then the worker has already exited
+			 * cleanly.
+			 */
+			if (pcxt->worker[i].error_mqh == NULL)
+			{
+				pcxt->known_attached_workers[i] = true;
+				++pcxt->nknown_attached_workers;
+				continue;
+			}
+
+			status = GetBackgroundWorkerPid(pcxt->worker[i].bgwhandle, &pid);
+			if (status == BGWH_STARTED)
+			{
+				/* Has the worker attached to the error queue? */
+				mq = shm_mq_get_queue(pcxt->worker[i].error_mqh);
+				if (shm_mq_get_sender(mq) != NULL)
+				{
+					/* Yes, so it is known to be attached. */
+					pcxt->known_attached_workers[i] = true;
+					++pcxt->nknown_attached_workers;
+				}
+			}
+			else if (status == BGWH_STOPPED)
+			{
+				/*
+				 * If the worker stopped without attaching to the error queue,
+				 * throw an error.
+				 */
+				mq = shm_mq_get_queue(pcxt->worker[i].error_mqh);
+				if (shm_mq_get_sender(mq) == NULL)
+					ereport(ERROR,
+							(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+							 errmsg("parallel worker failed to initialize"),
+							 errhint("More details may be available in the server log.")));
+
+				pcxt->known_attached_workers[i] = true;
+				++pcxt->nknown_attached_workers;
+			}
+			else
+			{
+				/*
+				 * Worker not yet started, so we must wait.  The postmaster
+				 * will notify us if the worker's state changes.  Our latch
+				 * might also get set for some other reason, but if so we'll
+				 * just end up waiting for the same worker again.
+				 */
+				rc = WaitLatch(MyLatch,
+							   WL_LATCH_SET | WL_POSTMASTER_DEATH,
+							   -1, WAIT_EVENT_BGWORKER_STARTUP);
+
+				/* emergency bailout if postmaster has died */
+				if (rc & WL_POSTMASTER_DEATH)
+					proc_exit(1);
+
+				if (rc & WL_LATCH_SET)
+					ResetLatch(MyLatch);
+			}
+		}
+
+		/* If all workers are known to have started, we're done. */
+		if (pcxt->nknown_attached_workers >= pcxt->nworkers_launched)
+		{
+			Assert(pcxt->nknown_attached_workers == pcxt->nworkers_launched);
+			break;
+		}
+	}
+}
+
+/*
  * Wait for all workers to finish computing.
  *
  * Even if the parallel operation seems to have completed successfully, it's
@@ -589,7 +721,7 @@ WaitForParallelWorkersToFinish(ParallelContext *pcxt)
 			 */
 			if (pcxt->worker[i].error_mqh == NULL)
 				++nfinished;
-			else if (pcxt->any_message_received[i])
+			else if (pcxt->known_attached_workers[i])
 			{
 				anyone_alive = true;
 				break;
@@ -909,8 +1041,12 @@ HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg)
 {
 	char		msgtype;
 
-	if (pcxt->any_message_received != NULL)
-		pcxt->any_message_received[i] = true;
+	if (pcxt->known_attached_workers != NULL &&
+		!pcxt->known_attached_workers[i])
+	{
+		pcxt->known_attached_workers[i] = true;
+		pcxt->nknown_attached_workers++;
+	}
 
 	msgtype = pq_getmsgbyte(msg);
 
diff --git a/src/backend/executor/nodeGather.c b/src/backend/executor/nodeGather.c
index 89266b5371..58eadd45b8 100644
--- a/src/backend/executor/nodeGather.c
+++ b/src/backend/executor/nodeGather.c
@@ -312,7 +312,14 @@ gather_readnext(GatherState *gatherstate)
 		/* Check for async events, particularly messages from workers. */
 		CHECK_FOR_INTERRUPTS();
 
-		/* Attempt to read a tuple, but don't block if none is available. */
+		/*
+		 * Attempt to read a tuple, but don't block if none is available.
+		 *
+		 * Note that TupleQueueReaderNext will just return NULL for a worker
+		 * which fails to initialize.  We'll treat that worker as having
+		 * produced no tuples; WaitForParallelWorkersToFinish will error out
+		 * when we get there.
+		 */
 		Assert(gatherstate->nextreader < gatherstate->nreaders);
 		reader = gatherstate->reader[gatherstate->nextreader];
 		tup = TupleQueueReaderNext(reader, true, &readerdone);
diff --git a/src/backend/executor/nodeGatherMerge.c b/src/backend/executor/nodeGatherMerge.c
index a3e34c6980..6858c91e8c 100644
--- a/src/backend/executor/nodeGatherMerge.c
+++ b/src/backend/executor/nodeGatherMerge.c
@@ -710,7 +710,14 @@ gm_readnext_tuple(GatherMergeState *gm_state, int nreader, bool nowait,
 	/* Check for async events, particularly messages from workers. */
 	CHECK_FOR_INTERRUPTS();
 
-	/* Attempt to read a tuple. */
+	/*
+	 * Attempt to read a tuple.
+	 *
+	 * Note that TupleQueueReaderNext will just return NULL for a worker which
+	 * fails to initialize.  We'll treat that worker as having produced no
+	 * tuples; WaitForParallelWorkersToFinish will error out when we get
+	 * there.
+	 */
 	reader = gm_state->reader[nreader - 1];
 	tup = TupleQueueReaderNext(reader, nowait, done);
 
diff --git a/src/include/access/parallel.h b/src/include/access/parallel.h
index 32c2e32bea..d0c218b185 100644
--- a/src/include/access/parallel.h
+++ b/src/include/access/parallel.h
@@ -43,7 +43,8 @@ typedef struct ParallelContext
 	void	   *private_memory;
 	shm_toc    *toc;
 	ParallelWorkerInfo *worker;
-	bool	   *any_message_received;
+	int			nknown_attached_workers;
+	bool	   *known_attached_workers;
 } ParallelContext;
 
 typedef struct ParallelWorkerContext
@@ -62,6 +63,7 @@ extern ParallelContext *CreateParallelContext(const char *library_name, const ch
 extern void InitializeParallelDSM(ParallelContext *pcxt);
 extern void ReinitializeParallelDSM(ParallelContext *pcxt);
 extern void LaunchParallelWorkers(ParallelContext *pcxt);
+extern void WaitForParallelWorkersToAttach(ParallelContext *pcxt);
 extern void WaitForParallelWorkersToFinish(ParallelContext *pcxt);
 extern void DestroyParallelContext(ParallelContext *pcxt);
 extern bool ParallelContextActive(void);
