Hi,

I found a crash in pg_restore. It occurs when there is a failure before
all the child workers are created. The backtrace is given below:
#0  0x00007f9c6d31e337 in raise () from /lib64/libc.so.6
#1  0x00007f9c6d31fa28 in abort () from /lib64/libc.so.6
#2  0x00007f9c6d317156 in __assert_fail_base () from /lib64/libc.so.6
#3  0x00007f9c6d317202 in __assert_fail () from /lib64/libc.so.6
#4  0x0000000000407c9e in WaitForTerminatingWorkers (pstate=0x14af7f0) at parallel.c:515
#5  0x0000000000407bf9 in ShutdownWorkersHard (pstate=0x14af7f0) at parallel.c:451
#6  0x0000000000407ae9 in archive_close_connection (code=1, arg=0x6315a0 <shutdown_info>) at parallel.c:368
#7  0x000000000041a7c7 in exit_nicely (code=1) at pg_backup_utils.c:99
#8  0x0000000000408180 in ParallelBackupStart (AH=0x14972e0) at parallel.c:967
#9  0x000000000040a3dd in RestoreArchive (AHX=0x14972e0) at pg_backup_archiver.c:661
#10 0x0000000000404125 in main (argc=6, argv=0x7ffd5146f308) at pg_restore.c:443

The problem is as follows:

   - ParallelBackupStart initially sets pstate->numWorkers to the requested
   number of workers.
   - The workers are then created one by one.
   - A failure occurs before all of the processes have been created.
   - The parent then terminates the child processes and waits for all of
   them to exit.
   - WaitForTerminatingWorkers checks whether every process has terminated
   by calling HasEveryWorkerTerminated.
   - HasEveryWorkerTerminated always returns false, because it checks
   numWorkers slots rather than the number of processes actually forked,
   and the code then hits the next assert, "Assert(j < pstate->numWorkers);".


The attached patch fixes this by setting pstate->numWorkers to the actual
worker count, incrementing it as each child process is created.
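
The failure mode and the fix described above can be modelled with a small
standalone sketch. This is not the parallel.c code: PoolState, Slot,
SlotStatus, has_every_worker_terminated and shutdown_completes are
hypothetical names made up for illustration, and the model assumes that a
slot which was never started is not counted as terminated.

```c
#include <assert.h>
#include <stdbool.h>

#define MAX_SLOTS 8

/* Hypothetical slot states; a zeroed slot means "never started". */
typedef enum { SLOT_NOT_STARTED = 0, SLOT_RUNNING, SLOT_TERMINATED } SlotStatus;

typedef struct
{
	int			pid;
	SlotStatus	status;
} Slot;

typedef struct
{
	int			numWorkers;		/* number of slots the wait loop inspects */
	Slot		slots[MAX_SLOTS];
} PoolState;

/* True only if each of the first numWorkers slots has terminated. */
static bool
has_every_worker_terminated(const PoolState *p)
{
	for (int i = 0; i < p->numWorkers; i++)
		if (p->slots[i].status != SLOT_TERMINATED)
			return false;
	return true;
}

/*
 * Simulate a startup that fails after `started` of `requested` workers
 * exist, then reap every worker that was really started.  Returns whether
 * the parent would consider the shutdown complete.
 */
static bool
shutdown_completes(int requested, int started, bool count_up_front)
{
	PoolState	p = {0};

	assert(started <= requested && requested <= MAX_SLOTS);

	p.numWorkers = count_up_front ? requested : 0;
	for (int i = 0; i < started; i++)
	{
		p.slots[i].pid = 100 + i;	/* made-up pid */
		p.slots[i].status = SLOT_RUNNING;
		if (!count_up_front)
			p.numWorkers++;		/* count only workers that exist */
	}

	/* Startup failed here: every real child is terminated and reaped. */
	for (int i = 0; i < started; i++)
		p.slots[i].status = SLOT_TERMINATED;

	return has_every_worker_terminated(&p);
}
```

In this model, shutdown_completes(4, 2, true) returns false, because two of
the four counted slots never had a process to reap. With the per-worker
count, shutdown_completes(4, 2, false) returns true.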

Thoughts?

Regards,
Vignesh
EnterpriseDB: http://www.enterprisedb.com
From 5cded66879e74f6351b44856eef2d66fab172e95 Mon Sep 17 00:00:00 2001
From: Vignesh C<vignesh21@gmail.com>
Date: Wed, 1 Jan 2020 08:48:44 +0530
Subject: [PATCH] pg_restore crash when there is a failure before all parallel
 workers are created.

A failure can occur before all the parallel workers are created, but
pstate->numWorkers is set to the requested number of workers before any fork
has succeeded. In the error case, while terminating and waiting for the child
processes, the code waits for that initial count instead of the child
processes that actually exist. Fix this by setting pstate->numWorkers to the
number of child processes actually forked.
---
 src/bin/pg_dump/parallel.c | 21 ++++++++++++++-------
 1 file changed, 14 insertions(+), 7 deletions(-)

diff --git a/src/bin/pg_dump/parallel.c b/src/bin/pg_dump/parallel.c
index 24239fa..88d8ee2 100644
--- a/src/bin/pg_dump/parallel.c
+++ b/src/bin/pg_dump/parallel.c
@@ -907,22 +907,22 @@ ParallelBackupStart(ArchiveHandle *AH)
 {
 	ParallelState *pstate;
 	int			i;
+	int			numWorkers = AH->public.numWorkers;
 
-	Assert(AH->public.numWorkers > 0);
+	Assert(numWorkers > 0);
 
 	pstate = (ParallelState *) pg_malloc(sizeof(ParallelState));
 
-	pstate->numWorkers = AH->public.numWorkers;
+	pstate->numWorkers = (numWorkers == 1) ? numWorkers : 0;
 	pstate->te = NULL;
 	pstate->parallelSlot = NULL;
 
-	if (AH->public.numWorkers == 1)
+	if (numWorkers == 1)
 		return pstate;
 
-	pstate->te = (TocEntry **)
-		pg_malloc0(pstate->numWorkers * sizeof(TocEntry *));
+	pstate->te = (TocEntry **) pg_malloc0(numWorkers * sizeof(TocEntry *));
 	pstate->parallelSlot = (ParallelSlot *)
-		pg_malloc0(pstate->numWorkers * sizeof(ParallelSlot));
+		pg_malloc0(numWorkers * sizeof(ParallelSlot));
 
 #ifdef WIN32
 	/* Make fmtId() and fmtQualifiedId() use thread-local storage */
@@ -950,7 +950,7 @@ ParallelBackupStart(ArchiveHandle *AH)
 	fflush(NULL);
 
 	/* Create desired number of workers */
-	for (i = 0; i < pstate->numWorkers; i++)
+	for (i = 0; i < numWorkers; i++)
 	{
 #ifdef WIN32
 		WorkerInfo *wi;
@@ -980,6 +980,11 @@ ParallelBackupStart(ArchiveHandle *AH)
 		slot->pipeRevRead = pipeMW[PIPE_READ];
 		slot->pipeRevWrite = pipeWM[PIPE_WRITE];
 
+		/*
+		 * The number of workers must be incremented before fork, because the
+		 * workers use numWorkers to iterate and identify their slot.
+		 */
+		pstate->numWorkers++;
 #ifdef WIN32
 		/* Create transient structure to pass args to worker function */
 		wi = (WorkerInfo *) pg_malloc(sizeof(WorkerInfo));
@@ -1026,6 +1031,8 @@ ParallelBackupStart(ArchiveHandle *AH)
 		}
 		else if (pid < 0)
 		{
+			pstate->numWorkers--;
+
 			/* fork failed */
 			fatal("could not create worker process: %m");
 		}
-- 
1.8.3.1
