diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c
index f720896e50..e26c869c58 100644
--- a/src/backend/access/transam/parallel.c
+++ b/src/backend/access/transam/parallel.c
@@ -111,6 +111,9 @@ static FixedParallelState *MyFixedParallelState;
 /* List of active parallel contexts. */
 static dlist_head pcxt_list = DLIST_STATIC_INIT(pcxt_list);
 
+/* Backend-local copy of data from FixedParallelState. */
+static pid_t ParallelMasterPid;
+
 /*
  * List of internal parallel worker entry points.  We need this for
  * reasons explained in LookupParallelWorkerFunction(), below.
@@ -131,6 +134,7 @@ static const struct
 static void HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg);
 static void WaitForParallelWorkersToExit(ParallelContext *pcxt);
 static parallel_worker_main_type LookupParallelWorkerFunction(const char *libraryname, const char *funcname);
+static void ParallelWorkerShutdown(int code, Datum arg);
 
 
 /*
@@ -422,6 +426,11 @@ ReinitializeParallelDSM(ParallelContext *pcxt)
 		WaitForParallelWorkersToFinish(pcxt);
 		WaitForParallelWorkersToExit(pcxt);
 		pcxt->nworkers_launched = 0;
+		if (pcxt->any_message_received)
+		{
+			pfree(pcxt->any_message_received);
+			pcxt->any_message_received = NULL;
+		}
 	}
 
 	/* Reset a few bits of fixed parallel state to a clean state. */
@@ -520,6 +529,14 @@ LaunchParallelWorkers(ParallelContext *pcxt)
 		}
 	}
 
+	/*
+	 * Now that nworkers_launched has taken its final value, we can initialize
+	 * any_message_received.
+	 */
+	if (pcxt->nworkers_launched > 0)
+		pcxt->any_message_received =
+			palloc0(sizeof(bool) * pcxt->nworkers_launched);
+
 	/* Restore previous memory context. */
 	MemoryContextSwitchTo(oldcontext);
 }
@@ -541,6 +558,7 @@ WaitForParallelWorkersToFinish(ParallelContext *pcxt)
 	for (;;)
 	{
 		bool		anyone_alive = false;
+		int			nfinished = 0;
 		int			i;
 
 		/*
@@ -552,7 +570,15 @@ WaitForParallelWorkersToFinish(ParallelContext *pcxt)
 
 		for (i = 0; i < pcxt->nworkers_launched; ++i)
 		{
-			if (pcxt->worker[i].error_mqh != NULL)
+			/*
+			 * If error_mqh is NULL, then the worker has already exited
+			 * cleanly.  If we have received a message through error_mqh from
+			 * the worker, we know it started up cleanly, and therefore we're
+			 * certain to be notified when it exits.
+			 */
+			if (pcxt->worker[i].error_mqh == NULL)
+				++nfinished;
+			else if (pcxt->any_message_received[i])
 			{
 				anyone_alive = true;
 				break;
@@ -560,7 +586,62 @@ WaitForParallelWorkersToFinish(ParallelContext *pcxt)
 		}
 
 		if (!anyone_alive)
-			break;
+		{
+			/* If all workers are known to have finished, we're done. */
+			if (nfinished >= pcxt->nworkers_launched)
+			{
+				Assert(nfinished == pcxt->nworkers_launched);
+				break;
+			}
+
+			/*
+			 * We didn't detect any living workers, but not all workers are
+			 * known to have exited cleanly.  Either not all workers have
+			 * launched yet, or maybe some of them failed to start or
+			 * terminated abnormally.
+			 */
+			for (i = 0; i < pcxt->nworkers_launched; ++i)
+			{
+				pid_t		pid;
+				shm_mq	   *mq;
+
+				/*
+				 * If the worker is BGWH_NOT_YET_STARTED or BGWH_STARTED, we
+				 * should just keep waiting.  If it is BGWH_STOPPED, then
+				 * further investigation is needed.
+				 */
+				if (pcxt->worker[i].error_mqh == NULL ||
+					pcxt->worker[i].bgwhandle == NULL ||
+					GetBackgroundWorkerPid(pcxt->worker[i].bgwhandle,
+										   &pid) != BGWH_STOPPED)
+					continue;
+
+				/*
+				 * Check whether the worker ended up stopped without ever
+				 * attaching to the error queue.  If so, the postmaster was
+				 * unable to fork the worker or it exited without initializing
+				 * properly.  We must throw an error, since the caller may
+				 * have been expecting the worker to do some work before
+				 * exiting.
+				 */
+				mq = shm_mq_get_queue(pcxt->worker[i].error_mqh);
+				if (shm_mq_get_sender(mq) == NULL)
+					ereport(ERROR,
+							(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+							 errmsg("parallel worker failed to initialize"),
+							 errhint("More details may be available in the server log.")));
+
+				/*
+				 * The worker is stopped, but is attached to the error queue.
+				 * Unless there's a bug somewhere, this will only happen when
+				 * the worker writes messages and terminates after the
+				 * CHECK_FOR_INTERRUPTS() near the top of this function and
+				 * before the call to GetBackgroundWorkerPid().  In that case,
+				 * or latch should have been set as well and the right things
+				 * will happen on the next pass through the loop.
+				 */
+			}
+		}
 
 		WaitLatch(MyLatch, WL_LATCH_SET, -1,
 				  WAIT_EVENT_PARALLEL_FINISH);
@@ -817,6 +898,9 @@ HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg)
 {
 	char		msgtype;
 
+	if (pcxt->any_message_received != NULL)
+		pcxt->any_message_received[i] = true;
+
 	msgtype = pq_getmsgbyte(msg);
 
 	switch (msgtype)
@@ -1012,11 +1096,16 @@ ParallelWorkerMain(Datum main_arg)
 	fps = shm_toc_lookup(toc, PARALLEL_KEY_FIXED, false);
 	MyFixedParallelState = fps;
 
+	/* Arrange to signal the leader if we exit. */
+	ParallelMasterPid = fps->parallel_master_pid;
+	ParallelMasterBackendId = fps->parallel_master_backend_id;
+	on_shmem_exit(ParallelWorkerShutdown, (Datum) 0);
+
 	/*
-	 * Now that we have a worker number, we can find and attach to the error
-	 * queue provided for us.  That's good, because until we do that, any
-	 * errors that happen here will not be reported back to the process that
-	 * requested that this worker be launched.
+	 * Now we can find and attach to the error queue provided for us.  That's
+	 * good, because until we do that, any errors that happen here will not be
+	 * reported back to the process that requested that this worker be
+	 * launched.
 	 */
 	error_queue_space = shm_toc_lookup(toc, PARALLEL_KEY_ERROR_QUEUE, false);
 	mq = (shm_mq *) (error_queue_space +
@@ -1134,9 +1223,6 @@ ParallelWorkerMain(Datum main_arg)
 	SetTempNamespaceState(fps->temp_namespace_id,
 						  fps->temp_toast_namespace_id);
 
-	/* Set ParallelMasterBackendId so we know how to address temp relations. */
-	ParallelMasterBackendId = fps->parallel_master_backend_id;
-
 	/*
 	 * We've initialized all of our state now; nothing should change
 	 * hereafter.
@@ -1182,6 +1268,20 @@ ParallelWorkerReportLastRecEnd(XLogRecPtr last_xlog_end)
 }
 
 /*
+ * Make sure the leader tries to read from our error queue one more time.
+ * This guards against the case where we exit uncleanly without sending an
+ * ErrorResponse to the leader, for example because some code calls proc_exit
+ * directly.
+ */
+static void
+ParallelWorkerShutdown(int code, Datum arg)
+{
+	SendProcSignal(ParallelMasterPid,
+				   PROCSIG_PARALLEL_MESSAGE,
+				   ParallelMasterBackendId);
+}
+
+/*
  * Look up (and possibly load) a parallel worker entry point function.
  *
  * For functions contained in the core code, we use library name "postgres"
diff --git a/src/include/access/parallel.h b/src/include/access/parallel.h
index 8c6a747ced..32c2e32bea 100644
--- a/src/include/access/parallel.h
+++ b/src/include/access/parallel.h
@@ -43,6 +43,7 @@ typedef struct ParallelContext
 	void	   *private_memory;
 	shm_toc    *toc;
 	ParallelWorkerInfo *worker;
+	bool	   *any_message_received;
 } ParallelContext;
 
 typedef struct ParallelWorkerContext
