From 5c12e8fe83ba0fe2a7f24e1e84263fa112469390 Mon Sep 17 00:00:00 2001
From: Asif Rehman <asif.rehman@highgo.ca>
Date: Mon, 14 Oct 2019 17:28:58 +0500
Subject: [PATCH 3/4] pg_basebackup changes for parallel backup.

---
 src/bin/pg_basebackup/pg_basebackup.c | 583 ++++++++++++++++++++++++--
 1 file changed, 548 insertions(+), 35 deletions(-)

diff --git a/src/bin/pg_basebackup/pg_basebackup.c b/src/bin/pg_basebackup/pg_basebackup.c
index 55ef13926d..311c1f94ca 100644
--- a/src/bin/pg_basebackup/pg_basebackup.c
+++ b/src/bin/pg_basebackup/pg_basebackup.c
@@ -41,6 +41,7 @@
 #include "receivelog.h"
 #include "replication/basebackup.h"
 #include "streamutil.h"
+#include "fe_utils/simple_list.h"
 
 #define ERRCODE_DATA_CORRUPTED	"XX001"
 
@@ -57,6 +58,37 @@ typedef struct TablespaceList
 	TablespaceListCell *tail;
 } TablespaceList;
 
+typedef struct
+{
+	char		name[MAXPGPATH];
+	char		type;
+	int32		size;
+	time_t		mtime;
+} BackupFile;
+
+typedef struct
+{
+	Oid			tblspcOid;
+	char	   *tablespace;	/* tablespace name or NULL if 'base' tablespace */
+	int			numFiles;		/* number of files */
+	BackupFile *backupFiles; /* list of files in tablespace */
+} TablespaceInfo;
+
+typedef struct
+{
+	int 	tablespacecount;
+	int		numWorkers;
+
+	char	xlogstart[64];
+	char   *backup_label;
+	char   *tablespace_map;
+
+	TablespaceInfo *tsInfo;
+	SimpleStringList **worker_files;
+} BackupInfo;
+
+static BackupInfo *backupInfo = NULL;
+
 /*
  * pg_xlog has been renamed to pg_wal in version 10.  This version number
  * should be compared with PQserverVersion().
@@ -110,6 +142,10 @@ static bool found_existing_xlogdir = false;
 static bool made_tablespace_dirs = false;
 static bool found_tablespace_dirs = false;
 
+static int	numWorkers = 1;
+static PGresult *tablespacehdr;
+static SimpleOidList workerspid = {NULL, NULL};
+
 /* Progress counters */
 static uint64 totalsize_kb;
 static uint64 totaldone;
@@ -141,7 +177,7 @@ static void usage(void);
 static void verify_dir_is_empty_or_create(char *dirname, bool *created, bool *found);
 static void progress_report(int tablespacenum, const char *filename, bool force);
 
-static void ReceiveTarFile(PGconn *conn, PGresult *res, int rownum);
+static void ReceiveTarFile(PGconn *conn, PGresult *res, int rownum, int worker);
 static void ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum);
 static void BaseBackup(void);
 
@@ -151,6 +187,16 @@ static bool reached_end_position(XLogRecPtr segendpos, uint32 timeline,
 static const char *get_tablespace_mapping(const char *dir);
 static void tablespace_list_append(const char *arg);
 
+static void ParallelBackupEnd(void);
+static void GetBackupFilesList(PGconn *conn, BackupInfo *binfo);
+static int	ReceiveFiles(BackupInfo *backupInfo, int worker);
+static int	compareFileSize(const void *a, const void *b);
+static void create_workers_and_fetch(BackupInfo *backupInfo);
+static void read_label_tblspcmap(PGconn *conn, char **backup_label, char **tablespace_map);
+static void create_backup_dirs(bool basetablespace, char *tablespace, char *name);
+static void writefile(char *path, char *buf);
+static int	simple_list_length(SimpleStringList *list);
+
 
 static void
 cleanup_directories_atexit(void)
@@ -349,6 +395,7 @@ usage(void)
 	printf(_("      --no-slot          prevent creation of temporary replication slot\n"));
 	printf(_("      --no-verify-checksums\n"
 			 "                         do not verify checksums\n"));
+	printf(_("  -j, --jobs=NUM         use this many parallel jobs to backup\n"));
 	printf(_("  -?, --help             show this help, then exit\n"));
 	printf(_("\nConnection options:\n"));
 	printf(_("  -d, --dbname=CONNSTR   connection string\n"));
@@ -921,7 +968,7 @@ writeTarData(
  * No attempt to inspect or validate the contents of the file is done.
  */
 static void
-ReceiveTarFile(PGconn *conn, PGresult *res, int rownum)
+ReceiveTarFile(PGconn *conn, PGresult *res, int rownum, int worker)
 {
 	char		filename[MAXPGPATH];
 	char	   *copybuf = NULL;
@@ -978,7 +1025,10 @@ ReceiveTarFile(PGconn *conn, PGresult *res, int rownum)
 #ifdef HAVE_LIBZ
 			if (compresslevel != 0)
 			{
-				snprintf(filename, sizeof(filename), "%s/base.tar.gz", basedir);
+				if (numWorkers > 1)
+					snprintf(filename, sizeof(filename), "%s/base.%d.tar.gz", basedir, worker);
+				else
+					snprintf(filename, sizeof(filename), "%s/base.tar.gz", basedir);
 				ztarfile = gzopen(filename, "wb");
 				if (gzsetparams(ztarfile, compresslevel,
 								Z_DEFAULT_STRATEGY) != Z_OK)
@@ -991,7 +1041,10 @@ ReceiveTarFile(PGconn *conn, PGresult *res, int rownum)
 			else
 #endif
 			{
-				snprintf(filename, sizeof(filename), "%s/base.tar", basedir);
+				if (numWorkers > 1)
+					snprintf(filename, sizeof(filename), "%s/base.%d.tar", basedir, worker);
+				else
+					snprintf(filename, sizeof(filename), "%s/base.tar", basedir);
 				tarfile = fopen(filename, "wb");
 			}
 		}
@@ -1004,8 +1057,12 @@ ReceiveTarFile(PGconn *conn, PGresult *res, int rownum)
 #ifdef HAVE_LIBZ
 		if (compresslevel != 0)
 		{
-			snprintf(filename, sizeof(filename), "%s/%s.tar.gz", basedir,
-					 PQgetvalue(res, rownum, 0));
+			if (numWorkers > 1)
+				snprintf(filename, sizeof(filename), "%s/%s.%d.tar.gz", basedir,
+						 PQgetvalue(res, rownum, 0), worker);
+			else
+				snprintf(filename, sizeof(filename), "%s/%s.tar.gz", basedir,
+						 PQgetvalue(res, rownum, 0));
 			ztarfile = gzopen(filename, "wb");
 			if (gzsetparams(ztarfile, compresslevel,
 							Z_DEFAULT_STRATEGY) != Z_OK)
@@ -1018,8 +1075,12 @@ ReceiveTarFile(PGconn *conn, PGresult *res, int rownum)
 		else
 #endif
 		{
-			snprintf(filename, sizeof(filename), "%s/%s.tar", basedir,
-					 PQgetvalue(res, rownum, 0));
+			if (numWorkers > 1)
+				snprintf(filename, sizeof(filename), "%s/%s.%d.tar", basedir,
+						 PQgetvalue(res, rownum, 0), worker);
+			else
+				snprintf(filename, sizeof(filename), "%s/%s.tar", basedir,
+						 PQgetvalue(res, rownum, 0));
 			tarfile = fopen(filename, "wb");
 		}
 	}
@@ -1082,6 +1143,45 @@ ReceiveTarFile(PGconn *conn, PGresult *res, int rownum)
 
 			MemSet(zerobuf, 0, sizeof(zerobuf));
 
+			if (numWorkers > 1 && basetablespace && worker == 0)
+			{
+				char		header[512];
+				int			padding;
+				int			len;
+
+				/* add backup_label and tablespace_map files to the tar */
+				len = strlen(backupInfo->backup_label);
+				tarCreateHeader(header,
+								"backup_label",
+								NULL,
+								len,
+								pg_file_create_mode, 04000, 02000,
+								time(NULL));
+
+				padding = ((len + 511) & ~511) - len;
+				WRITE_TAR_DATA(header, sizeof(header));
+				WRITE_TAR_DATA(backupInfo->backup_label, len);
+				if (padding)
+					WRITE_TAR_DATA(zerobuf, padding);
+
+				if (backupInfo->tablespace_map)
+				{
+					len = strlen(backupInfo->tablespace_map);
+					tarCreateHeader(header,
+									"tablespace_map",
+									NULL,
+									len,
+									pg_file_create_mode, 04000, 02000,
+									time(NULL));
+
+					padding = ((len + 511) & ~511) - len;
+					WRITE_TAR_DATA(header, sizeof(header));
+					WRITE_TAR_DATA(backupInfo->tablespace_map, len);
+					if (padding)
+						WRITE_TAR_DATA(zerobuf, padding);
+				}
+			}
+
 			if (basetablespace && writerecoveryconf)
 			{
 				char		header[512];
@@ -1475,6 +1575,7 @@ ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum)
 			 */
 			snprintf(filename, sizeof(filename), "%s/%s", current_path,
 					 copybuf);
+
 			if (filename[strlen(filename) - 1] == '/')
 			{
 				/*
@@ -1486,21 +1587,14 @@ ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum)
 					 * Directory
 					 */
 					filename[strlen(filename) - 1] = '\0';	/* Remove trailing slash */
+
+					/*
+					 * In parallel mode, we create directories before fetching
+					 * files so its Ok if a directory already exist.
+					 */
 					if (mkdir(filename, pg_dir_create_mode) != 0)
 					{
-						/*
-						 * When streaming WAL, pg_wal (or pg_xlog for pre-9.6
-						 * clusters) will have been created by the wal
-						 * receiver process. Also, when the WAL directory
-						 * location was specified, pg_wal (or pg_xlog) has
-						 * already been created as a symbolic link before
-						 * starting the actual backup. So just ignore creation
-						 * failures on related directories.
-						 */
-						if (!((pg_str_endswith(filename, "/pg_wal") ||
-							   pg_str_endswith(filename, "/pg_xlog") ||
-							   pg_str_endswith(filename, "/archive_status")) &&
-							  errno == EEXIST))
+						if (errno != EEXIST)
 						{
 							pg_log_error("could not create directory \"%s\": %m",
 										 filename);
@@ -1528,8 +1622,8 @@ ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum)
 					 * can map them too.)
 					 */
 					filename[strlen(filename) - 1] = '\0';	/* Remove trailing slash */
-
 					mapped_tblspc_path = get_tablespace_mapping(&copybuf[157]);
+
 					if (symlink(mapped_tblspc_path, filename) != 0)
 					{
 						pg_log_error("could not create symbolic link from \"%s\" to \"%s\": %m",
@@ -1716,7 +1810,8 @@ BaseBackup(void)
 	}
 
 	basebkp =
-		psprintf("BASE_BACKUP LABEL '%s' %s %s %s %s %s %s %s",
+		psprintf("%s LABEL '%s' %s %s %s %s %s %s %s",
+				 (numWorkers > 1) ? "START_BACKUP" : "BASE_BACKUP",
 				 escaped_label,
 				 showprogress ? "PROGRESS" : "",
 				 includewal == FETCH_WAL ? "WAL" : "",
@@ -1774,7 +1869,7 @@ BaseBackup(void)
 	/*
 	 * Get the header
 	 */
-	res = PQgetResult(conn);
+	tablespacehdr = res = PQgetResult(conn);
 	if (PQresultStatus(res) != PGRES_TUPLES_OK)
 	{
 		pg_log_error("could not get backup header: %s",
@@ -1830,20 +1925,62 @@ BaseBackup(void)
 		StartLogStreamer(xlogstart, starttli, sysidentifier);
 	}
 
-	/*
-	 * Start receiving chunks
-	 */
-	for (i = 0; i < PQntuples(res); i++)
+	if (numWorkers > 1)
 	{
-		if (format == 't')
-			ReceiveTarFile(conn, res, i);
-		else
-			ReceiveAndUnpackTarFile(conn, res, i);
-	}							/* Loop over all tablespaces */
+		backupInfo = palloc0(sizeof(BackupInfo));
+
+		backupInfo->tablespacecount = tablespacecount;
+		backupInfo->numWorkers = numWorkers;
+		strlcpy(backupInfo->xlogstart, xlogstart, sizeof(backupInfo->xlogstart));
+		read_label_tblspcmap(conn, &backupInfo->backup_label, &backupInfo->tablespace_map);
+
+		/* retrive backup files from server. **/
+		GetBackupFilesList(conn, backupInfo);
+
+		/*
+		 * add backup_label in backup, (for tar format, ReceiveTarFile() will
+		 * takecare of it).
+		 */
+		if (format == 'p')
+			writefile("backup_label", backupInfo->backup_label);
+
+		/*
+		 * The backup files list is already in descending order, distribute it
+		 * to workers.
+		 */
+		backupInfo->worker_files = palloc0(sizeof(SimpleStringList) * tablespacecount);
+		for (i = 0; i < backupInfo->tablespacecount; i++)
+		{
+			TablespaceInfo *curTsInfo = &backupInfo->tsInfo[i];
+
+			backupInfo->worker_files[i] = palloc0(sizeof(SimpleStringList) * numWorkers);
+			for (int j = 0; j < curTsInfo->numFiles; j++)
+			{
+				simple_string_list_append(&backupInfo->worker_files[i][j % numWorkers],
+										  curTsInfo->backupFiles[j].name);
+			}
+		}
+
+		create_workers_and_fetch(backupInfo);
+		ParallelBackupEnd();
+	}
+	else
+	{
+		/*
+		 * Start receiving chunks
+		 */
+		for (i = 0; i < PQntuples(res); i++)
+		{
+			if (format == 't')
+				ReceiveTarFile(conn, res, i, 0);
+			else
+				ReceiveAndUnpackTarFile(conn, res, i);
+		}						/* Loop over all tablespaces */
+	}
 
 	if (showprogress)
 	{
-		progress_report(PQntuples(res), NULL, true);
+		progress_report(PQntuples(tablespacehdr), NULL, true);
 		if (isatty(fileno(stderr)))
 			fprintf(stderr, "\n");	/* Need to move to next line */
 	}
@@ -2043,6 +2180,7 @@ main(int argc, char **argv)
 		{"waldir", required_argument, NULL, 1},
 		{"no-slot", no_argument, NULL, 2},
 		{"no-verify-checksums", no_argument, NULL, 3},
+		{"jobs", required_argument, NULL, 'j'},
 		{NULL, 0, NULL, 0}
 	};
 	int			c;
@@ -2070,7 +2208,7 @@ main(int argc, char **argv)
 
 	atexit(cleanup_directories_atexit);
 
-	while ((c = getopt_long(argc, argv, "CD:F:r:RS:T:X:l:nNzZ:d:c:h:p:U:s:wWkvP",
+	while ((c = getopt_long(argc, argv, "CD:F:r:RS:T:X:l:nNzZ:d:c:h:p:U:s:wWkvPj:",
 							long_options, &option_index)) != -1)
 	{
 		switch (c)
@@ -2211,6 +2349,9 @@ main(int argc, char **argv)
 			case 3:
 				verify_checksums = false;
 				break;
+			case 'j':			/* number of jobs */
+				numWorkers = atoi(optarg);
+				break;
 			default:
 
 				/*
@@ -2325,6 +2466,14 @@ main(int argc, char **argv)
 		}
 	}
 
+	if (numWorkers <= 0)
+	{
+		pg_log_error("invalid number of parallel jobs");
+		fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
+				progname);
+		exit(1);
+	}
+
 #ifndef HAVE_LIBZ
 	if (compresslevel != 0)
 	{
@@ -2397,3 +2546,367 @@ main(int argc, char **argv)
 	success = true;
 	return 0;
 }
+
+static void
+ParallelBackupEnd(void)
+{
+	PGresult   *res = NULL;
+	char	   *basebkp;
+
+	basebkp = psprintf("STOP_BACKUP LABEL '%s' %s %s",
+					   backupInfo->backup_label,
+					   includewal == FETCH_WAL ? "WAL" : "",
+					   includewal == NO_WAL ? "" : "NOWAIT");
+	if (PQsendQuery(conn, basebkp) == 0)
+	{
+		pg_log_error("could not execute STOP BACKUP \"%s\"",
+					 PQerrorMessage(conn));
+		exit(1);
+	}
+
+	/* receive pg_control and wal files */
+	if (format == 't')
+		ReceiveTarFile(conn, res, tablespacecount, numWorkers);
+	else
+		ReceiveAndUnpackTarFile(conn, res, tablespacecount);
+
+	PQclear(res);
+}
+
+static void
+GetBackupFilesList(PGconn *conn, BackupInfo *backupInfo)
+{
+	int			i;
+	PGresult   *res = NULL;
+	char	   *basebkp;
+
+	backupInfo->tsInfo = palloc0(sizeof(TablespaceInfo) * backupInfo->tablespacecount);
+	TablespaceInfo *tsInfo = backupInfo->tsInfo;
+
+	/*
+	 * Get list of files.
+	 */
+	basebkp = psprintf("SEND_FILE_LIST %s",
+					   format == 't' ? "TABLESPACE_MAP" : "");
+	if (PQsendQuery(conn, basebkp) == 0)
+	{
+		pg_log_error("could not send replication command \"%s\": %s",
+					 "SEND_FILE_LIST", PQerrorMessage(conn));
+		exit(1);
+	}
+
+	/*
+	 * The list of files is grouped by tablespaces, and we want to fetch them
+	 * in the same order.
+	 */
+	for (i = 0; i < backupInfo->tablespacecount; i++)
+	{
+		bool		basetablespace;
+
+		res = PQgetResult(conn);
+		if (PQresultStatus(res) != PGRES_TUPLES_OK)
+		{
+			pg_log_error("could not get backup header: %s",
+						 PQerrorMessage(conn));
+			exit(1);
+		}
+		if (PQntuples(res) < 1)
+		{
+			pg_log_error("no data returned from server");
+			exit(1);
+		}
+
+		basetablespace = PQgetisnull(tablespacehdr, i, 0);
+		tsInfo[i].tblspcOid = atol(PQgetvalue(tablespacehdr, i, 0));
+		tsInfo[i].tablespace = PQgetvalue(tablespacehdr, i, 1);
+		tsInfo[i].numFiles = PQntuples(res);
+		tsInfo[i].backupFiles =
+			palloc0(sizeof(BackupFile) * tsInfo[i].numFiles);
+
+		for (int j = 0; j < tsInfo[i].numFiles; j++)
+		{
+			char	   *name = PQgetvalue(res, j, 0);
+			char		type = PQgetvalue(res, j, 1)[0];
+			int32		size = atol(PQgetvalue(res, j, 2));
+			time_t		mtime = atol(PQgetvalue(res, j, 3));
+
+			/*
+			 * In 'plain' format, create backup directories first.
+			 */
+			if (format == 'p' && type == 'd')
+				create_backup_dirs(basetablespace, tsInfo[i].tablespace, name);
+
+			strlcpy(tsInfo[i].backupFiles[j].name, name, MAXPGPATH);
+			tsInfo[i].backupFiles[j].type = type;
+			tsInfo[i].backupFiles[j].size = size;
+			tsInfo[i].backupFiles[j].mtime = mtime;
+		}
+
+		/* sort files in descending order, based on size */
+		qsort(tsInfo[i].backupFiles, tsInfo[i].numFiles,
+			  sizeof(BackupFile), &compareFileSize);
+		PQclear(res);
+	}
+
+	res = PQgetResult(conn);
+	if (PQresultStatus(res) != PGRES_COMMAND_OK)
+	{
+		pg_log_error("could not get data: %s", PQerrorMessage(conn));
+		exit(1);
+	}
+	res = PQgetResult(conn);
+}
+
+static int
+ReceiveFiles(BackupInfo *backupInfo, int worker)
+{
+	SimpleStringListCell *cell;
+	PGresult   *res = NULL;
+	PGconn	   *worker_conn;
+	int			i;
+
+	worker_conn = GetConnection();
+	for (i = 0; i < backupInfo->tablespacecount; i++)
+	{
+		TablespaceInfo *curTsInfo = &backupInfo->tsInfo[i];
+		SimpleStringList *files = &backupInfo->worker_files[i][worker];
+		PQExpBuffer buf = createPQExpBuffer();
+
+		if (simple_list_length(files) <= 0)
+			continue;
+
+
+		/*
+		 * build query in form of: SEND_FILES_CONTENT ('base/1/1245/32683',
+		 * 'base/1/1245/32683', ...) [options]
+		 */
+		appendPQExpBuffer(buf, "SEND_FILES_CONTENT (");
+		for (cell = files->head; cell; cell = cell->next)
+		{
+			if (cell != files->tail)
+				appendPQExpBuffer(buf, "'%s' ,", cell->val);
+			else
+				appendPQExpBuffer(buf, "'%s'", cell->val);
+		}
+		appendPQExpBufferStr(buf, ")");
+
+		/*
+		 * Add backup options to the command. we are reusing the LABEL here to
+		 * keep the original tablespace path on the server.
+		 */
+		appendPQExpBuffer(buf, " LABEL '%s' LSN '%s' %s %s",
+						  curTsInfo->tablespace,
+						  backupInfo->xlogstart,
+						  format == 't' ? "TABLESPACE_MAP" : "",
+						  verify_checksums ? "" : "NOVERIFY_CHECKSUMS");
+		if (maxrate > 0)
+			appendPQExpBuffer(buf, " MAX_RATE %u", maxrate);
+
+		if (!worker_conn)
+			return 1;
+
+		if (PQsendQuery(worker_conn, buf->data) == 0)
+		{
+			pg_log_error("could not send files list \"%s\"",
+						 PQerrorMessage(worker_conn));
+			return 1;
+		}
+
+		destroyPQExpBuffer(buf);
+		if (format == 't')
+			ReceiveTarFile(worker_conn, tablespacehdr, i, worker);
+		else
+			ReceiveAndUnpackTarFile(worker_conn, tablespacehdr, i);
+
+		res = PQgetResult(worker_conn);
+		if (PQresultStatus(res) != PGRES_COMMAND_OK)
+		{
+			pg_log_error("could not get data stream: %s",
+						 PQerrorMessage(worker_conn));
+			exit(1);
+		}
+
+		res = PQgetResult(worker_conn);
+	}
+
+	PQclear(res);
+	PQfinish(worker_conn);
+
+	return 0;
+}
+
+/* qsort comparator for BackupFile (sort descending order)  */
+static int
+compareFileSize(const void *a, const void *b)
+{
+	const		BackupFile *v1 = (BackupFile *) a;
+	const		BackupFile *v2 = (BackupFile *) b;
+
+	if (v1->size > v2->size)
+		return -1;
+	if (v1->size < v2->size)
+		return 1;
+
+	return 0;
+}
+
+static void
+create_workers_and_fetch(BackupInfo *backupInfo)
+{
+	int			status;
+	int			pid,
+				i;
+
+	for (i = 0; i < numWorkers; i++)
+	{
+		pid = fork();
+		if (pid == 0)
+		{
+			/* in child process */
+			_exit(ReceiveFiles(backupInfo, i));
+		}
+		else if (pid < 0)
+		{
+			pg_log_error("could not create backup worker: %m");
+			exit(1);
+		}
+
+		simple_oid_list_append(&workerspid, pid);
+		if (verbose)
+			pg_log_info("backup worker (%d) created", pid);
+
+		/*
+		 * Else we are in the parent process and all is well.
+		 */
+	}
+
+	for (i = 0; i < numWorkers; i++)
+	{
+		pid = waitpid(-1, &status, 0);
+
+		if (WIFEXITED(status) && WEXITSTATUS(status) == EXIT_FAILURE)
+		{
+			SimpleOidListCell *cell;
+
+			pg_log_error("backup worker (%d) failed with code %d", pid, WEXITSTATUS(status));
+
+			/* error. kill other workers and exit. */
+			for (cell = workerspid.head; cell; cell = cell->next)
+			{
+				if (pid != cell->val)
+				{
+					kill(cell->val, SIGTERM);
+					pg_log_error("backup worker killed %d", cell->val);
+				}
+			}
+
+			exit(1);
+		}
+	}
+}
+
+static void
+read_label_tblspcmap(PGconn *conn, char **backuplabel, char **tblspc_map)
+{
+	PGresult   *res = NULL;
+
+	Assert(backuplabel != NULL);
+	Assert(tblspc_map != NULL);
+
+	/*
+	 * Get Backup label and tablespace map data.
+	 */
+	res = PQgetResult(conn);
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+	{
+		pg_log_error("could not get data: %s",
+					 PQerrorMessage(conn));
+		exit(1);
+	}
+	if (PQntuples(res) < 1)
+	{
+		pg_log_error("no data returned from server");
+		exit(1);
+	}
+
+	*backuplabel = PQgetvalue(res, 0, 0);	/* backup_label */
+	if (!PQgetisnull(res, 0, 1))
+		*tblspc_map = PQgetvalue(res, 0, 1);	/* tablespae_map */
+
+	res = PQgetResult(conn);
+	if (PQresultStatus(res) != PGRES_COMMAND_OK)
+	{
+		pg_log_error("could not get data: %s",
+					 PQerrorMessage(conn));
+		exit(1);
+	}
+
+	res = PQgetResult(conn);
+	PQclear(res);
+}
+
+static void
+create_backup_dirs(bool basetablespace, char *tablespace, char *name)
+{
+	char	dirpath[MAXPGPATH];
+
+	Assert(name != NULL);
+
+	if (basetablespace)
+		snprintf(dirpath, sizeof(dirpath), "%s/%s", basedir, name);
+	else
+	{
+		Assert(tablespace != NULL);
+		snprintf(dirpath, sizeof(dirpath), "%s/%s",
+				 get_tablespace_mapping(tablespace), (name + strlen(tablespace) + 1));
+	}
+
+	if (pg_mkdir_p(dirpath, pg_dir_create_mode) != 0)
+	{
+		if (errno != EEXIST)
+		{
+			pg_log_error("could not create directory \"%s\": %m",
+						 dirpath);
+			exit(1);
+		}
+	}
+}
+
+static void
+writefile(char *path, char *buf)
+{
+	FILE   *f;
+	char	pathbuf[MAXPGPATH];
+
+	snprintf(pathbuf, MAXPGPATH, "%s/%s", basedir, path);
+	f = fopen(pathbuf, "w");
+	if (f == NULL)
+	{
+		pg_log_error("could not open file \"%s\": %m", pathbuf);
+		exit(1);
+	}
+
+	if (fwrite(buf, strlen(buf), 1, f) != 1)
+	{
+		pg_log_error("could not write to file \"%s\": %m", pathbuf);
+		exit(1);
+	}
+
+	if (fclose(f))
+	{
+		pg_log_error("could not write to file \"%s\": %m", pathbuf);
+		exit(1);
+	}
+}
+
+static int
+simple_list_length(SimpleStringList *list)
+{
+	int			len = 0;
+	SimpleStringListCell *cell;
+
+	for (cell = list->head; cell; cell = cell->next, len++)
+		;
+
+	return len;
+}
-- 
2.21.0 (Apple Git-122)

