From 60c887db70cfb900eef8c5131eb1bb3796d44444 Mon Sep 17 00:00:00 2001
From: Robert Haas <rhaas@postgresql.org>
Date: Thu, 19 Sep 2019 13:56:23 -0400
Subject: [PATCH 1/2] Refactor some pg_basebackup code.

Reduce code duplication and eliminate weird macro tricks.
---
 src/bin/pg_basebackup/pg_basebackup.c | 1004 +++++++++++++------------
 1 file changed, 507 insertions(+), 497 deletions(-)

diff --git a/src/bin/pg_basebackup/pg_basebackup.c b/src/bin/pg_basebackup/pg_basebackup.c
index 7986872f10..f7f6aad0f4 100644
--- a/src/bin/pg_basebackup/pg_basebackup.c
+++ b/src/bin/pg_basebackup/pg_basebackup.c
@@ -56,6 +56,40 @@ typedef struct TablespaceList
 	TablespaceListCell *tail;
 } TablespaceList;
 
+typedef struct WriteTarState
+{
+	int			tablespacenum;
+	char		filename[MAXPGPATH];
+	FILE	   *tarfile;
+	char		tarhdr[512];
+	bool		basetablespace;
+	bool		in_tarhdr;
+	bool		skip_file;
+	bool		is_recovery_guc_supported;
+	bool		is_postgresql_auto_conf;
+	bool		found_postgresql_auto_conf;
+	int			file_padding_len;
+	size_t		tarhdrsz;
+	pgoff_t		filesz;
+#ifdef HAVE_LIBZ
+	gzFile		ztarfile;
+#endif
+}			WriteTarState;
+
+typedef struct UnpackTarState
+{
+	int			tablespacenum;
+	char		current_path[MAXPGPATH];
+	char		filename[MAXPGPATH];
+	const char *mapped_tblspc_path;
+	pgoff_t		current_len_left;
+	int			current_padding;
+	FILE	   *file;
+}			UnpackTarState;
+
+typedef void (*WriteDataCallback) (size_t nbytes, char *buf,
+								   void *callback_data);
+
 /*
  * pg_xlog has been renamed to pg_wal in version 10.  This version number
  * should be compared with PQserverVersion().
@@ -146,7 +180,10 @@ static void verify_dir_is_empty_or_create(char *dirname, bool *created, bool *fo
 static void progress_report(int tablespacenum, const char *filename, bool force);
 
 static void ReceiveTarFile(PGconn *conn, PGresult *res, int rownum);
+static void ReceiveTarCopyChunk(size_t r, char *copybuf, void *callback_data);
 static void ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum);
+static void ReceiveTarAndUnpackCopyChunk(size_t r, char *copybuf,
+										 void *callback_data);
 static void GenerateRecoveryConf(PGconn *conn);
 static void WriteRecoveryConf(void);
 static void BaseBackup(void);
@@ -879,43 +916,79 @@ parse_max_rate(char *src)
 	return (int32) result;
 }
 
+/*
+ * Read a stream of COPY data and invoke the provided callback for each
+ * chunk.
+ */
+static void
+ReceiveCopyData(PGconn *conn, WriteDataCallback callback,
+				void *callback_data)
+{
+	PGresult   *res;
+
+	/* Get the COPY data stream. */
+	res = PQgetResult(conn);
+	if (PQresultStatus(res) != PGRES_COPY_OUT)
+	{
+		pg_log_error("could not get COPY data stream: %s",
+					 PQerrorMessage(conn));
+		exit(1);
+	}
+	PQclear(res);
+
+	/* Loop over chunks until done. */
+	while (1)
+	{
+		int			r;
+		char	   *copybuf;
+
+		r = PQgetCopyData(conn, &copybuf, 0);
+		if (r == -1)
+		{
+			/* End of chunk. */
+			break;
+		}
+		else if (r == -2)
+		{
+			pg_log_error("could not read COPY data: %s",
+						 PQerrorMessage(conn));
+			exit(1);
+		}
+
+		(*callback) (r, copybuf, callback_data);
+
+		PQfreemem(copybuf);
+	}
+}
+
 /*
  * Write a piece of tar data
  */
 static void
-writeTarData(
-#ifdef HAVE_LIBZ
-			 gzFile ztarfile,
-#endif
-			 FILE *tarfile, char *buf, int r, char *current_file)
+writeTarData(WriteTarState * state, char *buf, int r)
 {
 #ifdef HAVE_LIBZ
-	if (ztarfile != NULL)
+	if (state->ztarfile != NULL)
 	{
-		if (gzwrite(ztarfile, buf, r) != r)
+		if (gzwrite(state->ztarfile, buf, r) != r)
 		{
 			pg_log_error("could not write to compressed file \"%s\": %s",
-						 current_file, get_gz_error(ztarfile));
+						 state->filename, get_gz_error(state->ztarfile));
 			exit(1);
 		}
 	}
 	else
 #endif
 	{
-		if (fwrite(buf, r, 1, tarfile) != 1)
+		if (fwrite(buf, r, 1, state->tarfile) != 1)
 		{
-			pg_log_error("could not write to file \"%s\": %m", current_file);
+			pg_log_error("could not write to file \"%s\": %m",
+						 state->filename);
 			exit(1);
 		}
 	}
 }
 
-#ifdef HAVE_LIBZ
-#define WRITE_TAR_DATA(buf, sz) writeTarData(ztarfile, tarfile, buf, sz, filename)
-#else
-#define WRITE_TAR_DATA(buf, sz) writeTarData(tarfile, buf, sz, filename)
-#endif
-
 /*
  * Receive a tar format file from the connection to the server, and write
  * the data from this file directly into a tar file. If compression is
@@ -929,29 +1002,19 @@ writeTarData(
 static void
 ReceiveTarFile(PGconn *conn, PGresult *res, int rownum)
 {
-	char		filename[MAXPGPATH];
-	char	   *copybuf = NULL;
-	FILE	   *tarfile = NULL;
-	char		tarhdr[512];
-	bool		basetablespace = PQgetisnull(res, rownum, 0);
-	bool		in_tarhdr = true;
-	bool		skip_file = false;
-	bool		is_recovery_guc_supported = true;
-	bool		is_postgresql_auto_conf = false;
-	bool		found_postgresql_auto_conf = false;
-	int			file_padding_len = 0;
-	size_t		tarhdrsz = 0;
-	pgoff_t		filesz = 0;
+	char		zerobuf[1024];
+	WriteTarState state;
 
-#ifdef HAVE_LIBZ
-	gzFile		ztarfile = NULL;
-#endif
+	memset(&state, 0, sizeof(state));
+	state.tablespacenum = rownum;
+	state.basetablespace = PQgetisnull(res, rownum, 0);
+	state.in_tarhdr = true;
 
 	/* recovery.conf is integrated into postgresql.conf in 12 and newer */
-	if (PQserverVersion(conn) < MINIMUM_VERSION_FOR_RECOVERY_GUC)
-		is_recovery_guc_supported = false;
+	if (PQserverVersion(conn) >= MINIMUM_VERSION_FOR_RECOVERY_GUC)
+		state.is_recovery_guc_supported = true;
 
-	if (basetablespace)
+	if (state.basetablespace)
 	{
 		/*
 		 * Base tablespaces
@@ -965,40 +1028,42 @@ ReceiveTarFile(PGconn *conn, PGresult *res, int rownum)
 #ifdef HAVE_LIBZ
 			if (compresslevel != 0)
 			{
-				ztarfile = gzdopen(dup(fileno(stdout)), "wb");
-				if (gzsetparams(ztarfile, compresslevel,
+				state.ztarfile = gzdopen(dup(fileno(stdout)), "wb");
+				if (gzsetparams(state.ztarfile, compresslevel,
 								Z_DEFAULT_STRATEGY) != Z_OK)
 				{
 					pg_log_error("could not set compression level %d: %s",
-								 compresslevel, get_gz_error(ztarfile));
+								 compresslevel, get_gz_error(state.ztarfile));
 					exit(1);
 				}
 			}
 			else
 #endif
-				tarfile = stdout;
-			strcpy(filename, "-");
+				state.tarfile = stdout;
+			strcpy(state.filename, "-");
 		}
 		else
 		{
 #ifdef HAVE_LIBZ
 			if (compresslevel != 0)
 			{
-				snprintf(filename, sizeof(filename), "%s/base.tar.gz", basedir);
-				ztarfile = gzopen(filename, "wb");
-				if (gzsetparams(ztarfile, compresslevel,
+				snprintf(state.filename, sizeof(state.filename),
+						 "%s/base.tar.gz", basedir);
+				state.ztarfile = gzopen(state.filename, "wb");
+				if (gzsetparams(state.ztarfile, compresslevel,
 								Z_DEFAULT_STRATEGY) != Z_OK)
 				{
 					pg_log_error("could not set compression level %d: %s",
-								 compresslevel, get_gz_error(ztarfile));
+								 compresslevel, get_gz_error(state.ztarfile));
 					exit(1);
 				}
 			}
 			else
 #endif
 			{
-				snprintf(filename, sizeof(filename), "%s/base.tar", basedir);
-				tarfile = fopen(filename, "wb");
+				snprintf(state.filename, sizeof(state.filename),
+						 "%s/base.tar", basedir);
+				state.tarfile = fopen(state.filename, "wb");
 			}
 		}
 	}
@@ -1010,34 +1075,35 @@ ReceiveTarFile(PGconn *conn, PGresult *res, int rownum)
 #ifdef HAVE_LIBZ
 		if (compresslevel != 0)
 		{
-			snprintf(filename, sizeof(filename), "%s/%s.tar.gz", basedir,
-					 PQgetvalue(res, rownum, 0));
-			ztarfile = gzopen(filename, "wb");
-			if (gzsetparams(ztarfile, compresslevel,
+			snprintf(state.filename, sizeof(state.filename),
+					 "%s/%s.tar.gz",
+					 basedir, PQgetvalue(res, rownum, 0));
+			state.ztarfile = gzopen(state.filename, "wb");
+			if (gzsetparams(state.ztarfile, compresslevel,
 							Z_DEFAULT_STRATEGY) != Z_OK)
 			{
 				pg_log_error("could not set compression level %d: %s",
-							 compresslevel, get_gz_error(ztarfile));
+							 compresslevel, get_gz_error(state.ztarfile));
 				exit(1);
 			}
 		}
 		else
 #endif
 		{
-			snprintf(filename, sizeof(filename), "%s/%s.tar", basedir,
-					 PQgetvalue(res, rownum, 0));
-			tarfile = fopen(filename, "wb");
+			snprintf(state.filename, sizeof(state.filename), "%s/%s.tar",
+					 basedir, PQgetvalue(res, rownum, 0));
+			state.tarfile = fopen(state.filename, "wb");
 		}
 	}
 
 #ifdef HAVE_LIBZ
 	if (compresslevel != 0)
 	{
-		if (!ztarfile)
+		if (!state.ztarfile)
 		{
 			/* Compression is in use */
 			pg_log_error("could not create compressed file \"%s\": %s",
-						 filename, get_gz_error(ztarfile));
+						 state.filename, get_gz_error(state.ztarfile));
 			exit(1);
 		}
 	}
@@ -1045,314 +1111,292 @@ ReceiveTarFile(PGconn *conn, PGresult *res, int rownum)
 #endif
 	{
 		/* Either no zlib support, or zlib support but compresslevel = 0 */
-		if (!tarfile)
+		if (!state.tarfile)
 		{
-			pg_log_error("could not create file \"%s\": %m", filename);
+			pg_log_error("could not create file \"%s\": %m", state.filename);
 			exit(1);
 		}
 	}
 
+	ReceiveCopyData(conn, ReceiveTarCopyChunk, &state);
+
 	/*
-	 * Get the COPY data stream
+	 * End of copy data. If requested, and this is the base tablespace, write
+	 * configuration file into the tarfile. When done, close the file (but not
+	 * stdout).
+	 *
+	 * Also, write two completely empty blocks at the end of the tar file, as
+	 * required by some tar programs.
 	 */
-	res = PQgetResult(conn);
-	if (PQresultStatus(res) != PGRES_COPY_OUT)
-	{
-		pg_log_error("could not get COPY data stream: %s",
-					 PQerrorMessage(conn));
-		exit(1);
-	}
 
-	while (1)
+	MemSet(zerobuf, 0, sizeof(zerobuf));
+
+	if (state.basetablespace && writerecoveryconf)
 	{
-		int			r;
+		char		header[512];
 
-		if (copybuf != NULL)
+		/*
+		 * If postgresql.auto.conf has not been found in the streamed data,
+		 * add recovery configuration to postgresql.auto.conf if recovery
+		 * parameters are GUCs.  If the instance connected to is older than
+		 * 12, create recovery.conf with this data otherwise.
+		 */
+		if (!state.found_postgresql_auto_conf || !state.is_recovery_guc_supported)
 		{
-			PQfreemem(copybuf);
-			copybuf = NULL;
+			int			padding;
+
+			tarCreateHeader(header,
+							state.is_recovery_guc_supported ? "postgresql.auto.conf" : "recovery.conf",
+							NULL,
+							recoveryconfcontents->len,
+							pg_file_create_mode, 04000, 02000,
+							time(NULL));
+
+			padding = ((recoveryconfcontents->len + 511) & ~511) - recoveryconfcontents->len;
+
+			writeTarData(&state, header, sizeof(header));
+			writeTarData(&state, recoveryconfcontents->data,
+						 recoveryconfcontents->len);
+			if (padding)
+				writeTarData(&state, zerobuf, padding);
 		}
 
-		r = PQgetCopyData(conn, &copybuf, 0);
-		if (r == -1)
+		/*
+		 * standby.signal is supported only if recovery parameters are GUCs.
+		 */
+		if (state.is_recovery_guc_supported)
 		{
-			/*
-			 * End of chunk. If requested, and this is the base tablespace,
-			 * write configuration file into the tarfile. When done, close the
-			 * file (but not stdout).
-			 *
-			 * Also, write two completely empty blocks at the end of the tar
-			 * file, as required by some tar programs.
-			 */
-			char		zerobuf[1024];
+			tarCreateHeader(header, "standby.signal", NULL,
+							0,	/* zero-length file */
+							pg_file_create_mode, 04000, 02000,
+							time(NULL));
 
-			MemSet(zerobuf, 0, sizeof(zerobuf));
+			writeTarData(&state, header, sizeof(header));
+			writeTarData(&state, zerobuf, 511);
+		}
+	}
 
-			if (basetablespace && writerecoveryconf)
+	/* 2 * 512 bytes empty data at end of file */
+	writeTarData(&state, zerobuf, sizeof(zerobuf));
+
+#ifdef HAVE_LIBZ
+	if (state.ztarfile != NULL)
+	{
+		if (gzclose(state.ztarfile) != 0)
+		{
+			pg_log_error("could not close compressed file \"%s\": %s",
+						 state.filename, get_gz_error(state.ztarfile));
+			exit(1);
+		}
+	}
+	else
+#endif
+	{
+		if (strcmp(basedir, "-") != 0)
+		{
+			if (fclose(state.tarfile) != 0)
 			{
-				char		header[512];
+				pg_log_error("could not close file \"%s\": %m",
+							 state.filename);
+				exit(1);
+			}
+		}
+	}
 
-				/*
-				 * If postgresql.auto.conf has not been found in the streamed
-				 * data, add recovery configuration to postgresql.auto.conf if
-				 * recovery parameters are GUCs.  If the instance connected to
-				 * is older than 12, create recovery.conf with this data
-				 * otherwise.
-				 */
-				if (!found_postgresql_auto_conf || !is_recovery_guc_supported)
-				{
-					int			padding;
+	progress_report(rownum, state.filename, true);
 
-					tarCreateHeader(header,
-									is_recovery_guc_supported ? "postgresql.auto.conf" : "recovery.conf",
-									NULL,
-									recoveryconfcontents->len,
-									pg_file_create_mode, 04000, 02000,
-									time(NULL));
+	/*
+	 * Do not sync the resulting tar file yet, all files are synced once at
+	 * the end.
+	 */
+}
 
-					padding = ((recoveryconfcontents->len + 511) & ~511) - recoveryconfcontents->len;
+/*
+ * Receive one chunk of tar-format data from the server.
+ */
+static void
+ReceiveTarCopyChunk(size_t r, char *copybuf, void *callback_data)
+{
+	WriteTarState *state = callback_data;
 
-					WRITE_TAR_DATA(header, sizeof(header));
-					WRITE_TAR_DATA(recoveryconfcontents->data,
-								   recoveryconfcontents->len);
-					if (padding)
-						WRITE_TAR_DATA(zerobuf, padding);
-				}
+	if (!writerecoveryconf || !state->basetablespace)
+	{
+		/*
+		 * When not writing config file, or when not working on the base
+		 * tablespace, we never have to look for an existing configuration
+		 * file in the stream.
+		 */
+		writeTarData(state, copybuf, r);
+	}
+	else
+	{
+		/*
+		 * Look for a config file in the existing tar stream. If it's there,
+		 * we must skip it so we can later overwrite it with our own version
+		 * of the file.
+		 *
+		 * To do this, we have to process the individual files inside the TAR
+		 * stream. The stream consists of a header and zero or more chunks,
+		 * all 512 bytes long. The stream from the server is broken up into
+		 * smaller pieces, so we have to track the size of the files to find
+		 * the next header structure.
+		 */
+		int			rr = r;
+		int			pos = 0;
 
+		while (rr > 0)
+		{
+			if (state->in_tarhdr)
+			{
 				/*
-				 * standby.signal is supported only if recovery parameters are
-				 * GUCs.
+				 * We're currently reading a header structure inside the TAR
+				 * stream, i.e. the file metadata.
 				 */
-				if (is_recovery_guc_supported)
+				if (state->tarhdrsz < 512)
 				{
-					tarCreateHeader(header, "standby.signal", NULL,
-									0,	/* zero-length file */
-									pg_file_create_mode, 04000, 02000,
-									time(NULL));
+					/*
+					 * Copy the header structure into tarhdr in case the
+					 * header is not aligned to 512 bytes or it's not returned
+					 * in whole by the last PQgetCopyData call.
+					 */
+					int			hdrleft;
+					int			bytes2copy;
 
-					WRITE_TAR_DATA(header, sizeof(header));
-					WRITE_TAR_DATA(zerobuf, 511);
-				}
-			}
+					hdrleft = 512 - state->tarhdrsz;
+					bytes2copy = (rr > hdrleft ? hdrleft : rr);
 
-			/* 2 * 512 bytes empty data at end of file */
-			WRITE_TAR_DATA(zerobuf, sizeof(zerobuf));
+					memcpy(&state->tarhdr[state->tarhdrsz], copybuf + pos,
+						   bytes2copy);
 
-#ifdef HAVE_LIBZ
-			if (ztarfile != NULL)
-			{
-				if (gzclose(ztarfile) != 0)
-				{
-					pg_log_error("could not close compressed file \"%s\": %s",
-								 filename, get_gz_error(ztarfile));
-					exit(1);
-				}
-			}
-			else
-#endif
-			{
-				if (strcmp(basedir, "-") != 0)
-				{
-					if (fclose(tarfile) != 0)
-					{
-						pg_log_error("could not close file \"%s\": %m",
-									 filename);
-						exit(1);
-					}
+					rr -= bytes2copy;
+					pos += bytes2copy;
+					state->tarhdrsz += bytes2copy;
 				}
-			}
-
-			break;
-		}
-		else if (r == -2)
-		{
-			pg_log_error("could not read COPY data: %s",
-						 PQerrorMessage(conn));
-			exit(1);
-		}
-
-		if (!writerecoveryconf || !basetablespace)
-		{
-			/*
-			 * When not writing config file, or when not working on the base
-			 * tablespace, we never have to look for an existing configuration
-			 * file in the stream.
-			 */
-			WRITE_TAR_DATA(copybuf, r);
-		}
-		else
-		{
-			/*
-			 * Look for a config file in the existing tar stream. If it's
-			 * there, we must skip it so we can later overwrite it with our
-			 * own version of the file.
-			 *
-			 * To do this, we have to process the individual files inside the
-			 * TAR stream. The stream consists of a header and zero or more
-			 * chunks, all 512 bytes long. The stream from the server is
-			 * broken up into smaller pieces, so we have to track the size of
-			 * the files to find the next header structure.
-			 */
-			int			rr = r;
-			int			pos = 0;
-
-			while (rr > 0)
-			{
-				if (in_tarhdr)
+				else
 				{
 					/*
-					 * We're currently reading a header structure inside the
-					 * TAR stream, i.e. the file metadata.
+					 * We have the complete header structure in tarhdr, look
+					 * at the file metadata: we may want append recovery info
+					 * into postgresql.auto.conf and skip standby.signal file
+					 * if recovery parameters are integrated as GUCs, and
+					 * recovery.conf otherwise. In both cases we must
+					 * calculate tar padding.
 					 */
-					if (tarhdrsz < 512)
+					if (state->is_recovery_guc_supported)
 					{
-						/*
-						 * Copy the header structure into tarhdr in case the
-						 * header is not aligned to 512 bytes or it's not
-						 * returned in whole by the last PQgetCopyData call.
-						 */
-						int			hdrleft;
-						int			bytes2copy;
-
-						hdrleft = 512 - tarhdrsz;
-						bytes2copy = (rr > hdrleft ? hdrleft : rr);
-
-						memcpy(&tarhdr[tarhdrsz], copybuf + pos, bytes2copy);
-
-						rr -= bytes2copy;
-						pos += bytes2copy;
-						tarhdrsz += bytes2copy;
+						state->skip_file =
+							(strcmp(&state->tarhdr[0], "standby.signal") == 0);
+						state->is_postgresql_auto_conf =
+							(strcmp(&state->tarhdr[0], "postgresql.auto.conf") == 0);
 					}
 					else
-					{
-						/*
-						 * We have the complete header structure in tarhdr,
-						 * look at the file metadata: we may want append
-						 * recovery info into postgresql.auto.conf and skip
-						 * standby.signal file if recovery parameters are
-						 * integrated as GUCs, and recovery.conf otherwise. In
-						 * both cases we must calculate tar padding.
-						 */
-						if (is_recovery_guc_supported)
-						{
-							skip_file = (strcmp(&tarhdr[0], "standby.signal") == 0);
-							is_postgresql_auto_conf = (strcmp(&tarhdr[0], "postgresql.auto.conf") == 0);
-						}
-						else
-							skip_file = (strcmp(&tarhdr[0], "recovery.conf") == 0);
+						state->skip_file =
+							(strcmp(&state->tarhdr[0], "recovery.conf") == 0);
 
-						filesz = read_tar_number(&tarhdr[124], 12);
-						file_padding_len = ((filesz + 511) & ~511) - filesz;
+					state->filesz = read_tar_number(&state->tarhdr[124], 12);
+					state->file_padding_len =
+						((state->filesz + 511) & ~511) - state->filesz;
 
-						if (is_recovery_guc_supported &&
-							is_postgresql_auto_conf &&
-							writerecoveryconf)
-						{
-							/* replace tar header */
-							char		header[512];
+					if (state->is_recovery_guc_supported &&
+						state->is_postgresql_auto_conf &&
+						writerecoveryconf)
+					{
+						/* replace tar header */
+						char		header[512];
 
-							tarCreateHeader(header, "postgresql.auto.conf", NULL,
-											filesz + recoveryconfcontents->len,
-											pg_file_create_mode, 04000, 02000,
-											time(NULL));
+						tarCreateHeader(header, "postgresql.auto.conf", NULL,
+										state->filesz + recoveryconfcontents->len,
+										pg_file_create_mode, 04000, 02000,
+										time(NULL));
 
-							WRITE_TAR_DATA(header, sizeof(header));
-						}
-						else
+						writeTarData(state, header, sizeof(header));
+					}
+					else
+					{
+						/* copy stream with padding */
+						state->filesz += state->file_padding_len;
+
+						if (!state->skip_file)
 						{
-							/* copy stream with padding */
-							filesz += file_padding_len;
-
-							if (!skip_file)
-							{
-								/*
-								 * If we're not skipping the file, write the
-								 * tar header unmodified.
-								 */
-								WRITE_TAR_DATA(tarhdr, 512);
-							}
+							/*
+							 * If we're not skipping the file, write the tar
+							 * header unmodified.
+							 */
+							writeTarData(state, state->tarhdr, 512);
 						}
-
-						/* Next part is the file, not the header */
-						in_tarhdr = false;
 					}
+
+					/* Next part is the file, not the header */
+					state->in_tarhdr = false;
 				}
-				else
+			}
+			else
+			{
+				/*
+				 * We're processing a file's contents.
+				 */
+				if (state->filesz > 0)
 				{
 					/*
-					 * We're processing a file's contents.
+					 * We still have data to read (and possibly write).
 					 */
-					if (filesz > 0)
-					{
-						/*
-						 * We still have data to read (and possibly write).
-						 */
-						int			bytes2write;
+					int			bytes2write;
 
-						bytes2write = (filesz > rr ? rr : filesz);
+					bytes2write = (state->filesz > rr ? rr : state->filesz);
 
-						if (!skip_file)
-							WRITE_TAR_DATA(copybuf + pos, bytes2write);
+					if (!state->skip_file)
+						writeTarData(state, copybuf + pos, bytes2write);
 
-						rr -= bytes2write;
-						pos += bytes2write;
-						filesz -= bytes2write;
-					}
-					else if (is_recovery_guc_supported &&
-							 is_postgresql_auto_conf &&
-							 writerecoveryconf)
-					{
-						/* append recovery config to postgresql.auto.conf */
-						int			padding;
-						int			tailsize;
+					rr -= bytes2write;
+					pos += bytes2write;
+					state->filesz -= bytes2write;
+				}
+				else if (state->is_recovery_guc_supported &&
+						 state->is_postgresql_auto_conf &&
+						 writerecoveryconf)
+				{
+					/* append recovery config to postgresql.auto.conf */
+					int			padding;
+					int			tailsize;
 
-						tailsize = (512 - file_padding_len) + recoveryconfcontents->len;
-						padding = ((tailsize + 511) & ~511) - tailsize;
+					tailsize = (512 - state->file_padding_len) + recoveryconfcontents->len;
+					padding = ((tailsize + 511) & ~511) - tailsize;
 
-						WRITE_TAR_DATA(recoveryconfcontents->data, recoveryconfcontents->len);
+					writeTarData(state, recoveryconfcontents->data,
+								 recoveryconfcontents->len);
 
-						if (padding)
-						{
-							char		zerobuf[512];
+					if (padding)
+					{
+						char		zerobuf[512];
 
-							MemSet(zerobuf, 0, sizeof(zerobuf));
-							WRITE_TAR_DATA(zerobuf, padding);
-						}
+						MemSet(zerobuf, 0, sizeof(zerobuf));
+						writeTarData(state, zerobuf, padding);
+					}
 
-						/* skip original file padding */
-						is_postgresql_auto_conf = false;
-						skip_file = true;
-						filesz += file_padding_len;
+					/* skip original file padding */
+					state->is_postgresql_auto_conf = false;
+					state->skip_file = true;
+					state->filesz += state->file_padding_len;
 
-						found_postgresql_auto_conf = true;
-					}
-					else
-					{
-						/*
-						 * No more data in the current file, the next piece of
-						 * data (if any) will be a new file header structure.
-						 */
-						in_tarhdr = true;
-						skip_file = false;
-						is_postgresql_auto_conf = false;
-						tarhdrsz = 0;
-						filesz = 0;
-					}
+					state->found_postgresql_auto_conf = true;
+				}
+				else
+				{
+					/*
+					 * No more data in the current file, the next piece of
+					 * data (if any) will be a new file header structure.
+					 */
+					state->in_tarhdr = true;
+					state->skip_file = false;
+					state->is_postgresql_auto_conf = false;
+					state->tarhdrsz = 0;
+					state->filesz = 0;
 				}
 			}
 		}
-		totaldone += r;
-		progress_report(rownum, filename, false);
-	}							/* while (1) */
-	progress_report(rownum, filename, true);
-
-	if (copybuf != NULL)
-		PQfreemem(copybuf);
-
-	/*
-	 * Do not sync the resulting tar file yet, all files are synced once at
-	 * the end.
-	 */
+	}
+	totaldone += r;
+	progress_report(state->tablespacenum, state->filename, false);
 }
 
 
@@ -1390,251 +1434,217 @@ get_tablespace_mapping(const char *dir)
 static void
 ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum)
 {
-	char		current_path[MAXPGPATH];
-	char		filename[MAXPGPATH];
-	const char *mapped_tblspc_path;
-	pgoff_t		current_len_left = 0;
-	int			current_padding = 0;
+	UnpackTarState state;
 	bool		basetablespace;
-	char	   *copybuf = NULL;
-	FILE	   *file = NULL;
+
+	memset(&state, 0, sizeof(state));
+	state.tablespacenum = rownum;
 
 	basetablespace = PQgetisnull(res, rownum, 0);
 	if (basetablespace)
-		strlcpy(current_path, basedir, sizeof(current_path));
+		strlcpy(state.current_path, basedir, sizeof(state.current_path));
 	else
-		strlcpy(current_path,
+		strlcpy(state.current_path,
 				get_tablespace_mapping(PQgetvalue(res, rownum, 1)),
-				sizeof(current_path));
+				sizeof(state.current_path));
 
-	/*
-	 * Get the COPY data
-	 */
-	res = PQgetResult(conn);
-	if (PQresultStatus(res) != PGRES_COPY_OUT)
+	ReceiveCopyData(conn, ReceiveTarAndUnpackCopyChunk, &state);
+
+
+	if (state.file)
+		fclose(state.file);
+
+	progress_report(rownum, state.filename, true);
+
+	if (state.file != NULL)
 	{
-		pg_log_error("could not get COPY data stream: %s",
-					 PQerrorMessage(conn));
+		pg_log_error("COPY stream ended before last file was finished");
 		exit(1);
 	}
 
-	while (1)
-	{
-		int			r;
+	if (basetablespace && writerecoveryconf)
+		WriteRecoveryConf();
 
-		if (copybuf != NULL)
-		{
-			PQfreemem(copybuf);
-			copybuf = NULL;
-		}
+	/*
+	 * No data is synced here, everything is done for all tablespaces at the
+	 * end.
+	 */
+}
 
-		r = PQgetCopyData(conn, &copybuf, 0);
+static void
+ReceiveTarAndUnpackCopyChunk(size_t r, char *copybuf, void *callback_data)
+{
+	UnpackTarState *state = callback_data;
 
-		if (r == -1)
-		{
-			/*
-			 * End of chunk
-			 */
-			if (file)
-				fclose(file);
+	if (state->file == NULL)
+	{
+#ifndef WIN32
+		int			filemode;
+#endif
 
-			break;
-		}
-		else if (r == -2)
+		/*
+		 * No current file, so this must be the header for a new file
+		 */
+		if (r != 512)
 		{
-			pg_log_error("could not read COPY data: %s",
-						 PQerrorMessage(conn));
+			pg_log_error("invalid tar block header size: %zu", r);
 			exit(1);
 		}
+		totaldone += 512;
 
-		if (file == NULL)
-		{
-#ifndef WIN32
-			int			filemode;
-#endif
-
-			/*
-			 * No current file, so this must be the header for a new file
-			 */
-			if (r != 512)
-			{
-				pg_log_error("invalid tar block header size: %d", r);
-				exit(1);
-			}
-			totaldone += 512;
-
-			current_len_left = read_tar_number(&copybuf[124], 12);
+		state->current_len_left = read_tar_number(&copybuf[124], 12);
 
 #ifndef WIN32
-			/* Set permissions on the file */
-			filemode = read_tar_number(&copybuf[100], 8);
+		/* Set permissions on the file */
+		filemode = read_tar_number(&copybuf[100], 8);
 #endif
 
-			/*
-			 * All files are padded up to 512 bytes
-			 */
-			current_padding =
-				((current_len_left + 511) & ~511) - current_len_left;
+		/*
+		 * All files are padded up to 512 bytes
+		 */
+		state->current_padding =
+			((state->current_len_left + 511) & ~511) - state->current_len_left;
 
+		/*
+		 * First part of header is zero terminated filename
+		 */
+		snprintf(state->filename, sizeof(state->filename),
+				 "%s/%s", state->current_path, copybuf);
+		if (state->filename[strlen(state->filename) - 1] == '/')
+		{
 			/*
-			 * First part of header is zero terminated filename
+			 * Ends in a slash means directory or symlink to directory
 			 */
-			snprintf(filename, sizeof(filename), "%s/%s", current_path,
-					 copybuf);
-			if (filename[strlen(filename) - 1] == '/')
+			if (copybuf[156] == '5')
 			{
 				/*
-				 * Ends in a slash means directory or symlink to directory
+				 * Directory. Remove trailing slash first.
 				 */
-				if (copybuf[156] == '5')
+				state->filename[strlen(state->filename) - 1] = '\0';
+				if (mkdir(state->filename, pg_dir_create_mode) != 0)
 				{
 					/*
-					 * Directory
+					 * When streaming WAL, pg_wal (or pg_xlog for pre-9.6
+					 * clusters) will have been created by the wal receiver
+					 * process. Also, when the WAL directory location was
+					 * specified, pg_wal (or pg_xlog) has already been created
+					 * as a symbolic link before starting the actual backup.
+					 * So just ignore creation failures on related
+					 * directories.
 					 */
-					filename[strlen(filename) - 1] = '\0';	/* Remove trailing slash */
-					if (mkdir(filename, pg_dir_create_mode) != 0)
+					if (!((pg_str_endswith(state->filename, "/pg_wal") ||
+						   pg_str_endswith(state->filename, "/pg_xlog") ||
+						   pg_str_endswith(state->filename, "/archive_status")) &&
+						  errno == EEXIST))
 					{
-						/*
-						 * When streaming WAL, pg_wal (or pg_xlog for pre-9.6
-						 * clusters) will have been created by the wal
-						 * receiver process. Also, when the WAL directory
-						 * location was specified, pg_wal (or pg_xlog) has
-						 * already been created as a symbolic link before
-						 * starting the actual backup. So just ignore creation
-						 * failures on related directories.
-						 */
-						if (!((pg_str_endswith(filename, "/pg_wal") ||
-							   pg_str_endswith(filename, "/pg_xlog") ||
-							   pg_str_endswith(filename, "/archive_status")) &&
-							  errno == EEXIST))
-						{
-							pg_log_error("could not create directory \"%s\": %m",
-										 filename);
-							exit(1);
-						}
+						pg_log_error("could not create directory \"%s\": %m",
+									 state->filename);
+						exit(1);
 					}
+				}
 #ifndef WIN32
-					if (chmod(filename, (mode_t) filemode))
-						pg_log_error("could not set permissions on directory \"%s\": %m",
-									 filename);
+				if (chmod(state->filename, (mode_t) filemode))
+					pg_log_error("could not set permissions on directory \"%s\": %m",
+								 state->filename);
 #endif
-				}
-				else if (copybuf[156] == '2')
-				{
-					/*
-					 * Symbolic link
-					 *
-					 * It's most likely a link in pg_tblspc directory, to the
-					 * location of a tablespace. Apply any tablespace mapping
-					 * given on the command line (--tablespace-mapping). (We
-					 * blindly apply the mapping without checking that the
-					 * link really is inside pg_tblspc. We don't expect there
-					 * to be other symlinks in a data directory, but if there
-					 * are, you can call it an undocumented feature that you
-					 * can map them too.)
-					 */
-					filename[strlen(filename) - 1] = '\0';	/* Remove trailing slash */
+			}
+			else if (copybuf[156] == '2')
+			{
+				/*
+				 * Symbolic link
+				 *
+				 * It's most likely a link in pg_tblspc directory, to the
+				 * location of a tablespace. Apply any tablespace mapping
+				 * given on the command line (--tablespace-mapping). (We
+				 * blindly apply the mapping without checking that the link
+				 * really is inside pg_tblspc. We don't expect there to be
+				 * other symlinks in a data directory, but if there are, you
+				 * can call it an undocumented feature that you can map them
+				 * too.)
+				 */
+				state->filename[strlen(state->filename) - 1] = '\0';	/* Remove trailing slash */
 
-					mapped_tblspc_path = get_tablespace_mapping(&copybuf[157]);
-					if (symlink(mapped_tblspc_path, filename) != 0)
-					{
-						pg_log_error("could not create symbolic link from \"%s\" to \"%s\": %m",
-									 filename, mapped_tblspc_path);
-						exit(1);
-					}
-				}
-				else
+				state->mapped_tblspc_path =
+					get_tablespace_mapping(&copybuf[157]);
+				if (symlink(state->mapped_tblspc_path, state->filename) != 0)
 				{
-					pg_log_error("unrecognized link indicator \"%c\"",
-								 copybuf[156]);
+					pg_log_error("could not create symbolic link from \"%s\" to \"%s\": %m",
+								 state->filename, state->mapped_tblspc_path);
 					exit(1);
 				}
-				continue;		/* directory or link handled */
 			}
-
-			/*
-			 * regular file
-			 */
-			file = fopen(filename, "wb");
-			if (!file)
+			else
 			{
-				pg_log_error("could not create file \"%s\": %m", filename);
+				pg_log_error("unrecognized link indicator \"%c\"",
+							 copybuf[156]);
 				exit(1);
 			}
+			return;				/* directory or link handled */
+		}
+
+		/*
+		 * regular file
+		 */
+		state->file = fopen(state->filename, "wb");
+		if (!state->file)
+		{
+			pg_log_error("could not create file \"%s\": %m", state->filename);
+			exit(1);
+		}
 
 #ifndef WIN32
-			if (chmod(filename, (mode_t) filemode))
-				pg_log_error("could not set permissions on file \"%s\": %m",
-							 filename);
+		if (chmod(state->filename, (mode_t) filemode))
+			pg_log_error("could not set permissions on file \"%s\": %m",
+						 state->filename);
 #endif
 
-			if (current_len_left == 0)
-			{
-				/*
-				 * Done with this file, next one will be a new tar header
-				 */
-				fclose(file);
-				file = NULL;
-				continue;
-			}
-		}						/* new file */
-		else
+		if (state->current_len_left == 0)
 		{
 			/*
-			 * Continuing blocks in existing file
+			 * Done with this file, next one will be a new tar header
 			 */
-			if (current_len_left == 0 && r == current_padding)
-			{
-				/*
-				 * Received the padding block for this file, ignore it and
-				 * close the file, then move on to the next tar header.
-				 */
-				fclose(file);
-				file = NULL;
-				totaldone += r;
-				continue;
-			}
-
-			if (fwrite(copybuf, r, 1, file) != 1)
-			{
-				pg_log_error("could not write to file \"%s\": %m", filename);
-				exit(1);
-			}
-			totaldone += r;
-			progress_report(rownum, filename, false);
-
-			current_len_left -= r;
-			if (current_len_left == 0 && current_padding == 0)
-			{
-				/*
-				 * Received the last block, and there is no padding to be
-				 * expected. Close the file and move on to the next tar
-				 * header.
-				 */
-				fclose(file);
-				file = NULL;
-				continue;
-			}
-		}						/* continuing data in existing file */
-	}							/* loop over all data blocks */
-	progress_report(rownum, filename, true);
-
-	if (file != NULL)
+			fclose(state->file);
+			state->file = NULL;
+			return;
+		}
+	}							/* new file */
+	else
 	{
-		pg_log_error("COPY stream ended before last file was finished");
-		exit(1);
-	}
-
-	if (copybuf != NULL)
-		PQfreemem(copybuf);
+		/*
+		 * Continuing blocks in existing file
+		 */
+		if (state->current_len_left == 0 && r == state->current_padding)
+		{
+			/*
+			 * Received the padding block for this file, ignore it and close
+			 * the file, then move on to the next tar header.
+			 */
+			fclose(state->file);
+			state->file = NULL;
+			totaldone += r;
+			return;
+		}
 
-	if (basetablespace && writerecoveryconf)
-		WriteRecoveryConf();
+		if (fwrite(copybuf, r, 1, state->file) != 1)
+		{
+			pg_log_error("could not write to file \"%s\": %m", state->filename);
+			exit(1);
+		}
+		totaldone += r;
+		progress_report(state->tablespacenum, state->filename, false);
 
-	/*
-	 * No data is synced here, everything is done for all tablespaces at the
-	 * end.
-	 */
+		state->current_len_left -= r;
+		if (state->current_len_left == 0 && state->current_padding == 0)
+		{
+			/*
+			 * Received the last block, and there is no padding to be
+			 * expected. Close the file and move on to the next tar header.
+			 */
+			fclose(state->file);
+			state->file = NULL;
+			return;
+		}
+	}							/* continuing data in existing file */
 }
 
 /*
-- 
2.17.2 (Apple Git-113)

