From 8c29c68ff24413d8d01478080d9741b0b231d848 Mon Sep 17 00:00:00 2001
From: Asif Rehman <asif.rehman@highgo.ca>
Date: Thu, 3 Oct 2019 23:41:55 +0500
Subject: [PATCH] parallel backup

---
 src/backend/access/transam/xlog.c             |    2 +-
 src/backend/replication/basebackup.c          | 1078 +++++++++++++----
 src/backend/replication/repl_gram.y           |   58 +
 src/backend/replication/repl_scanner.l        |    5 +
 src/bin/pg_basebackup/pg_basebackup.c         |  360 +++++-
 .../t/040_pg_basebackup_parallel.pl           |  571 +++++++++
 src/include/nodes/replnodes.h                 |    9 +
 src/include/replication/basebackup.h          |    2 +-
 8 files changed, 1797 insertions(+), 288 deletions(-)
 create mode 100644 src/bin/pg_basebackup/t/040_pg_basebackup_parallel.pl

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 790e2c8714..3dc2ebd7dc 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -10477,7 +10477,7 @@ do_pg_start_backup(const char *backupidstr, bool fast, TimeLineID *starttli_p,
 			ti->oid = pstrdup(de->d_name);
 			ti->path = pstrdup(buflinkpath.data);
 			ti->rpath = relpath ? pstrdup(relpath) : NULL;
-			ti->size = infotbssize ? sendTablespace(fullpath, true) : -1;
+			ti->size = infotbssize ? sendTablespace(fullpath, true, NULL) : -1;
 
 			if (tablespaces)
 				*tablespaces = lappend(*tablespaces, ti);
diff --git a/src/backend/replication/basebackup.c b/src/backend/replication/basebackup.c
index d0f210de8c..fe906dbfdf 100644
--- a/src/backend/replication/basebackup.c
+++ b/src/backend/replication/basebackup.c
@@ -52,11 +52,31 @@ typedef struct
 	bool		includewal;
 	uint32		maxrate;
 	bool		sendtblspcmapfile;
+	int32		worker;
 } basebackup_options;
 
+typedef struct
+{
+	char		path[MAXPGPATH];
+	bool		isdir;
+	int64		size;		/* int64: st_size may exceed 2GB */
+}			pathinfo;
+
+#define STORE_PATHINFO(_filenames, _path, _isdir, _size) \
+	do { \
+		if ((_filenames) != NULL) { \
+			pathinfo *pi = palloc0(sizeof(pathinfo)); \
+			strlcpy(pi->path, _path, sizeof(pi->path)); \
+			pi->isdir = _isdir; \
+			pi->size = _size; \
+			*(_filenames) = lappend(*(_filenames), pi); \
+		} \
+	} while(0)
 
 static int64 sendDir(const char *path, int basepathlen, bool sizeonly,
 					 List *tablespaces, bool sendtblspclinks);
+static int64 sendDir_(const char *path, int basepathlen, bool sizeonly,
+					  List *tablespaces, bool sendtblspclinks, List **files);
 static bool sendFile(const char *readfilename, const char *tarfilename,
 					 struct stat *statbuf, bool missing_ok, Oid dboid);
 static void sendFileWithContent(const char *filename, const char *content);
@@ -71,15 +91,26 @@ static void perform_base_backup(basebackup_options *opt);
 static void parse_basebackup_options(List *options, basebackup_options *opt);
 static void SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli);
 static int	compareWalFileNames(const ListCell *a, const ListCell *b);
+static int	compareFileSize(const ListCell *a, const ListCell *b);
 static void throttle(size_t increment);
 static bool is_checksummed_file(const char *fullpath, const char *filename);
 
+static void StartBackup(basebackup_options *opt);
+static void StopBackup(basebackup_options *opt);
+static void SendBackupFileList(basebackup_options *opt, List *tablespaces);
+static void SendFilesContents(basebackup_options *opt, List *filenames, bool missing_ok);
+static void include_wal_files(XLogRecPtr endptr, TimeLineID endtli);
+static void setup_throttle(int maxrate);
+static char *readfile(const char *readfilename, bool missing_ok);
+
 /* Was the backup currently in-progress initiated in recovery mode? */
 static bool backup_started_in_recovery = false;
 
 /* Relative path of temporary statistics directory */
 static char *statrelpath = NULL;
 
+#define BACKUP_LABEL_FILE_TMP BACKUP_LABEL_FILE ".tmp"
+#define TABLESPACE_MAP_TMP TABLESPACE_MAP ".tmp"
 /*
  * Size of each block sent into the tar stream for larger files.
  */
@@ -192,6 +223,14 @@ static const char *const excludeFiles[] =
 	BACKUP_LABEL_FILE,
 	TABLESPACE_MAP,
 
+	/*
+	 * Skip backup_label.tmp and tablespace_map.tmp files. These are temporary
+	 * files created at backup start and consumed by SendBackupFileList and
+	 * SendFilesContents; they are removed again afterwards.
+	 */
+	BACKUP_LABEL_FILE_TMP,
+	TABLESPACE_MAP_TMP,
+
 	"postmaster.pid",
 	"postmaster.opts",
 
@@ -294,28 +333,7 @@ perform_base_backup(basebackup_options *opt)
 		SendBackupHeader(tablespaces);
 
 		/* Setup and activate network throttling, if client requested it */
-		if (opt->maxrate > 0)
-		{
-			throttling_sample =
-				(int64) opt->maxrate * (int64) 1024 / THROTTLING_FREQUENCY;
-
-			/*
-			 * The minimum amount of time for throttling_sample bytes to be
-			 * transferred.
-			 */
-			elapsed_min_unit = USECS_PER_SEC / THROTTLING_FREQUENCY;
-
-			/* Enable throttling. */
-			throttling_counter = 0;
-
-			/* The 'real data' starts now (header was ignored). */
-			throttled_last = GetCurrentTimestamp();
-		}
-		else
-		{
-			/* Disable throttling. */
-			throttling_counter = -1;
-		}
+		setup_throttle(opt->maxrate);
 
 		/* Send off our tablespaces one by one */
 		foreach(lc, tablespaces)
@@ -357,7 +375,7 @@ perform_base_backup(basebackup_options *opt)
 				sendFile(XLOG_CONTROL_FILE, XLOG_CONTROL_FILE, &statbuf, false, InvalidOid);
 			}
 			else
-				sendTablespace(ti->path, false);
+				sendTablespace(ti->path, false, NULL);
 
 			/*
 			 * If we're including WAL, and this is the main data directory we
@@ -384,227 +402,7 @@ perform_base_backup(basebackup_options *opt)
 		 * We've left the last tar file "open", so we can now append the
 		 * required WAL files to it.
 		 */
-		char		pathbuf[MAXPGPATH];
-		XLogSegNo	segno;
-		XLogSegNo	startsegno;
-		XLogSegNo	endsegno;
-		struct stat statbuf;
-		List	   *historyFileList = NIL;
-		List	   *walFileList = NIL;
-		char		firstoff[MAXFNAMELEN];
-		char		lastoff[MAXFNAMELEN];
-		DIR		   *dir;
-		struct dirent *de;
-		ListCell   *lc;
-		TimeLineID	tli;
-
-		/*
-		 * I'd rather not worry about timelines here, so scan pg_wal and
-		 * include all WAL files in the range between 'startptr' and 'endptr',
-		 * regardless of the timeline the file is stamped with. If there are
-		 * some spurious WAL files belonging to timelines that don't belong in
-		 * this server's history, they will be included too. Normally there
-		 * shouldn't be such files, but if there are, there's little harm in
-		 * including them.
-		 */
-		XLByteToSeg(startptr, startsegno, wal_segment_size);
-		XLogFileName(firstoff, ThisTimeLineID, startsegno, wal_segment_size);
-		XLByteToPrevSeg(endptr, endsegno, wal_segment_size);
-		XLogFileName(lastoff, ThisTimeLineID, endsegno, wal_segment_size);
-
-		dir = AllocateDir("pg_wal");
-		while ((de = ReadDir(dir, "pg_wal")) != NULL)
-		{
-			/* Does it look like a WAL segment, and is it in the range? */
-			if (IsXLogFileName(de->d_name) &&
-				strcmp(de->d_name + 8, firstoff + 8) >= 0 &&
-				strcmp(de->d_name + 8, lastoff + 8) <= 0)
-			{
-				walFileList = lappend(walFileList, pstrdup(de->d_name));
-			}
-			/* Does it look like a timeline history file? */
-			else if (IsTLHistoryFileName(de->d_name))
-			{
-				historyFileList = lappend(historyFileList, pstrdup(de->d_name));
-			}
-		}
-		FreeDir(dir);
-
-		/*
-		 * Before we go any further, check that none of the WAL segments we
-		 * need were removed.
-		 */
-		CheckXLogRemoved(startsegno, ThisTimeLineID);
-
-		/*
-		 * Sort the WAL filenames.  We want to send the files in order from
-		 * oldest to newest, to reduce the chance that a file is recycled
-		 * before we get a chance to send it over.
-		 */
-		list_sort(walFileList, compareWalFileNames);
-
-		/*
-		 * There must be at least one xlog file in the pg_wal directory, since
-		 * we are doing backup-including-xlog.
-		 */
-		if (walFileList == NIL)
-			ereport(ERROR,
-					(errmsg("could not find any WAL files")));
-
-		/*
-		 * Sanity check: the first and last segment should cover startptr and
-		 * endptr, with no gaps in between.
-		 */
-		XLogFromFileName((char *) linitial(walFileList),
-						 &tli, &segno, wal_segment_size);
-		if (segno != startsegno)
-		{
-			char		startfname[MAXFNAMELEN];
-
-			XLogFileName(startfname, ThisTimeLineID, startsegno,
-						 wal_segment_size);
-			ereport(ERROR,
-					(errmsg("could not find WAL file \"%s\"", startfname)));
-		}
-		foreach(lc, walFileList)
-		{
-			char	   *walFileName = (char *) lfirst(lc);
-			XLogSegNo	currsegno = segno;
-			XLogSegNo	nextsegno = segno + 1;
-
-			XLogFromFileName(walFileName, &tli, &segno, wal_segment_size);
-			if (!(nextsegno == segno || currsegno == segno))
-			{
-				char		nextfname[MAXFNAMELEN];
-
-				XLogFileName(nextfname, ThisTimeLineID, nextsegno,
-							 wal_segment_size);
-				ereport(ERROR,
-						(errmsg("could not find WAL file \"%s\"", nextfname)));
-			}
-		}
-		if (segno != endsegno)
-		{
-			char		endfname[MAXFNAMELEN];
-
-			XLogFileName(endfname, ThisTimeLineID, endsegno, wal_segment_size);
-			ereport(ERROR,
-					(errmsg("could not find WAL file \"%s\"", endfname)));
-		}
-
-		/* Ok, we have everything we need. Send the WAL files. */
-		foreach(lc, walFileList)
-		{
-			char	   *walFileName = (char *) lfirst(lc);
-			FILE	   *fp;
-			char		buf[TAR_SEND_SIZE];
-			size_t		cnt;
-			pgoff_t		len = 0;
-
-			snprintf(pathbuf, MAXPGPATH, XLOGDIR "/%s", walFileName);
-			XLogFromFileName(walFileName, &tli, &segno, wal_segment_size);
-
-			fp = AllocateFile(pathbuf, "rb");
-			if (fp == NULL)
-			{
-				int			save_errno = errno;
-
-				/*
-				 * Most likely reason for this is that the file was already
-				 * removed by a checkpoint, so check for that to get a better
-				 * error message.
-				 */
-				CheckXLogRemoved(segno, tli);
-
-				errno = save_errno;
-				ereport(ERROR,
-						(errcode_for_file_access(),
-						 errmsg("could not open file \"%s\": %m", pathbuf)));
-			}
-
-			if (fstat(fileno(fp), &statbuf) != 0)
-				ereport(ERROR,
-						(errcode_for_file_access(),
-						 errmsg("could not stat file \"%s\": %m",
-								pathbuf)));
-			if (statbuf.st_size != wal_segment_size)
-			{
-				CheckXLogRemoved(segno, tli);
-				ereport(ERROR,
-						(errcode_for_file_access(),
-						 errmsg("unexpected WAL file size \"%s\"", walFileName)));
-			}
-
-			/* send the WAL file itself */
-			_tarWriteHeader(pathbuf, NULL, &statbuf, false);
-
-			while ((cnt = fread(buf, 1,
-								Min(sizeof(buf), wal_segment_size - len),
-								fp)) > 0)
-			{
-				CheckXLogRemoved(segno, tli);
-				/* Send the chunk as a CopyData message */
-				if (pq_putmessage('d', buf, cnt))
-					ereport(ERROR,
-							(errmsg("base backup could not send data, aborting backup")));
-
-				len += cnt;
-				throttle(cnt);
-
-				if (len == wal_segment_size)
-					break;
-			}
-
-			CHECK_FREAD_ERROR(fp, pathbuf);
-
-			if (len != wal_segment_size)
-			{
-				CheckXLogRemoved(segno, tli);
-				ereport(ERROR,
-						(errcode_for_file_access(),
-						 errmsg("unexpected WAL file size \"%s\"", walFileName)));
-			}
-
-			/* wal_segment_size is a multiple of 512, so no need for padding */
-
-			FreeFile(fp);
-
-			/*
-			 * Mark file as archived, otherwise files can get archived again
-			 * after promotion of a new node. This is in line with
-			 * walreceiver.c always doing an XLogArchiveForceDone() after a
-			 * complete segment.
-			 */
-			StatusFilePath(pathbuf, walFileName, ".done");
-			sendFileWithContent(pathbuf, "");
-		}
-
-		/*
-		 * Send timeline history files too. Only the latest timeline history
-		 * file is required for recovery, and even that only if there happens
-		 * to be a timeline switch in the first WAL segment that contains the
-		 * checkpoint record, or if we're taking a base backup from a standby
-		 * server and the target timeline changes while the backup is taken.
-		 * But they are small and highly useful for debugging purposes, so
-		 * better include them all, always.
-		 */
-		foreach(lc, historyFileList)
-		{
-			char	   *fname = lfirst(lc);
-
-			snprintf(pathbuf, MAXPGPATH, XLOGDIR "/%s", fname);
-
-			if (lstat(pathbuf, &statbuf) != 0)
-				ereport(ERROR,
-						(errcode_for_file_access(),
-						 errmsg("could not stat file \"%s\": %m", pathbuf)));
-
-			sendFile(pathbuf, pathbuf, &statbuf, false, InvalidOid);
-
-			/* unconditionally mark file as archived */
-			StatusFilePath(pathbuf, fname, ".done");
-			sendFileWithContent(pathbuf, "");
-		}
+		include_wal_files(endptr, endtli);
 
 		/* Send CopyDone message for the last tar file */
 		pq_putemptymessage('c');
@@ -637,6 +435,24 @@ compareWalFileNames(const ListCell *a, const ListCell *b)
 	return strcmp(fna + 8, fnb + 8);
 }
 
+/*
+ * list_sort comparison function: orders pathinfo entries by their size
+ * attribute, largest first (descending).
+ */
+static int
+compareFileSize(const ListCell *a, const ListCell *b)
+{
+	pathinfo   *fna = (pathinfo *) lfirst(a);
+	pathinfo   *fnb = (pathinfo *) lfirst(b);
+
+	if (fna->size > fnb->size)
+		return -1;
+	if (fna->size < fnb->size)
+		return 1;
+	return 0;
+
+}
+
 /*
  * Parse the base backup options passed down by the parser
  */
@@ -652,8 +468,10 @@ parse_basebackup_options(List *options, basebackup_options *opt)
 	bool		o_maxrate = false;
 	bool		o_tablespace_map = false;
 	bool		o_noverify_checksums = false;
+	bool		o_worker = false;
 
 	MemSet(opt, 0, sizeof(*opt));
+	opt->worker = -1;
 	foreach(lopt, options)
 	{
 		DefElem    *defel = (DefElem *) lfirst(lopt);
@@ -740,6 +558,16 @@ parse_basebackup_options(List *options, basebackup_options *opt)
 			noverify_checksums = true;
 			o_noverify_checksums = true;
 		}
+		else if (strcmp(defel->defname, "worker") == 0)
+		{
+			if (o_worker)
+				ereport(ERROR,
+						(errcode(ERRCODE_SYNTAX_ERROR),
+						 errmsg("duplicate option \"%s\"", defel->defname)));
+
+			opt->worker = intVal(defel->arg);
+			o_worker = true;
+		}
 		else
 			elog(ERROR, "option \"%s\" not recognized",
 				 defel->defname);
@@ -774,7 +602,26 @@ SendBaseBackup(BaseBackupCmd *cmd)
 		set_ps_display(activitymsg, false);
 	}
 
-	perform_base_backup(&opt);
+	switch (cmd->cmdtag)
+	{
+		case BASE_BACKUP:
+			perform_base_backup(&opt);
+			break;
+		case START_BACKUP:
+			StartBackup(&opt);
+			break;
+		case SEND_FILES_CONTENT:
+			SendFilesContents(&opt, cmd->backupfiles, true);
+			break;
+		case STOP_BACKUP:
+			StopBackup(&opt);
+			break;
+
+		default:
+			elog(ERROR, "unrecognized replication command tag: %u",
+				 cmd->cmdtag);
+			break;
+	}
 }
 
 static void
@@ -968,7 +815,7 @@ sendFileWithContent(const char *filename, const char *content)
  * Only used to send auxiliary tablespaces, not PGDATA.
  */
 int64
-sendTablespace(char *path, bool sizeonly)
+sendTablespace(char *path, bool sizeonly, List **files)
 {
 	int64		size;
 	char		pathbuf[MAXPGPATH];
@@ -997,11 +844,11 @@ sendTablespace(char *path, bool sizeonly)
 		return 0;
 	}
 
+	STORE_PATHINFO(files, pathbuf, true, -1);
 	size = _tarWriteHeader(TABLESPACE_VERSION_DIRECTORY, NULL, &statbuf,
 						   sizeonly);
-
 	/* Send all the files in the tablespace version directory */
-	size += sendDir(pathbuf, strlen(path), sizeonly, NIL, true);
+	size += sendDir_(pathbuf, strlen(path), sizeonly, NIL, true, files);
 
 	return size;
 }
@@ -1019,8 +866,16 @@ sendTablespace(char *path, bool sizeonly)
  * as it will be sent separately in the tablespace_map file.
  */
 static int64
-sendDir(const char *path, int basepathlen, bool sizeonly, List *tablespaces,
-		bool sendtblspclinks)
+sendDir(const char *path, int basepathlen, bool sizeonly,
+		List *tablespaces, bool sendtblspclinks)
+{
+	return sendDir_(path, basepathlen, sizeonly, tablespaces, sendtblspclinks, NULL);
+}
+
+/* Same as sendDir(), except that it also returns a list of filenames in PGDATA */
+static int64
+sendDir_(const char *path, int basepathlen, bool sizeonly, List *tablespaces,
+		 bool sendtblspclinks, List **files)
 {
 	DIR		   *dir;
 	struct dirent *de;
@@ -1174,6 +1029,8 @@ sendDir(const char *path, int basepathlen, bool sizeonly, List *tablespaces,
 			if (strcmp(de->d_name, excludeDirContents[excludeIdx]) == 0)
 			{
 				elog(DEBUG1, "contents of directory \"%s\" excluded from backup", de->d_name);
+
+				STORE_PATHINFO(files, pathbuf, true, -1);
 				size += _tarWriteDir(pathbuf, basepathlen, &statbuf, sizeonly);
 				excludeFound = true;
 				break;
@@ -1190,6 +1047,8 @@ sendDir(const char *path, int basepathlen, bool sizeonly, List *tablespaces,
 		if (statrelpath != NULL && strcmp(pathbuf, statrelpath) == 0)
 		{
 			elog(DEBUG1, "contents of directory \"%s\" excluded from backup", statrelpath);
+
+			STORE_PATHINFO(files, pathbuf, true, -1);
 			size += _tarWriteDir(pathbuf, basepathlen, &statbuf, sizeonly);
 			continue;
 		}
@@ -1211,6 +1070,9 @@ sendDir(const char *path, int basepathlen, bool sizeonly, List *tablespaces,
 			size += _tarWriteHeader("./pg_wal/archive_status", NULL, &statbuf,
 									sizeonly);
 
+			STORE_PATHINFO(files, pathbuf, true, -1);
+			STORE_PATHINFO(files, "./pg_wal/archive_status", true, -1);
+
 			continue;			/* don't recurse into pg_wal */
 		}
 
@@ -1240,6 +1102,7 @@ sendDir(const char *path, int basepathlen, bool sizeonly, List *tablespaces,
 								pathbuf)));
 			linkpath[rllen] = '\0';
 
+			STORE_PATHINFO(files, pathbuf, false, statbuf.st_size);
 			size += _tarWriteHeader(pathbuf + basepathlen + 1, linkpath,
 									&statbuf, sizeonly);
 #else
@@ -1266,6 +1129,8 @@ sendDir(const char *path, int basepathlen, bool sizeonly, List *tablespaces,
 			 */
 			size += _tarWriteHeader(pathbuf + basepathlen + 1, NULL, &statbuf,
 									sizeonly);
+			STORE_PATHINFO(files, pathbuf, true, -1);
+
 
 			/*
 			 * Call ourselves recursively for a directory, unless it happens
@@ -1296,13 +1161,15 @@ sendDir(const char *path, int basepathlen, bool sizeonly, List *tablespaces,
 				skip_this_dir = true;
 
 			if (!skip_this_dir)
-				size += sendDir(pathbuf, basepathlen, sizeonly, tablespaces, sendtblspclinks);
+				size += sendDir_(pathbuf, basepathlen, sizeonly, tablespaces, sendtblspclinks, files);
 		}
 		else if (S_ISREG(statbuf.st_mode))
 		{
 			bool		sent = false;
 
-			if (!sizeonly)
+			STORE_PATHINFO(files, pathbuf, false, statbuf.st_size);
+
+			if (!sizeonly && files == NULL)
 				sent = sendFile(pathbuf, pathbuf + basepathlen + 1, &statbuf,
 								true, isDbDir ? pg_atoi(lastDir + 1, sizeof(Oid), 0) : InvalidOid);
 
@@ -1743,3 +1610,710 @@ throttle(size_t increment)
 	 */
 	throttled_last = GetCurrentTimestamp();
 }
+
+/*
+ * In parallel mode, pg_stop_backup() is not called here, nor are the files
+ * sent right away. Upon receiving the START_BACKUP command, this sends out
+ * a list of the files in $PGDATA to the client.
+ */
+static void
+StartBackup(basebackup_options *opt)
+{
+	TimeLineID	starttli;
+	StringInfo	labelfile;
+	StringInfo	tblspc_map_file = NULL;
+	int			datadirpathlen;
+	List	   *tablespaces = NIL;
+
+	datadirpathlen = strlen(DataDir);
+
+	backup_started_in_recovery = RecoveryInProgress();
+
+	labelfile = makeStringInfo();
+	tblspc_map_file = makeStringInfo();
+
+	total_checksum_failures = 0;
+
+	startptr = do_pg_start_backup(opt->label, opt->fastcheckpoint, &starttli,
+								  labelfile, &tablespaces,
+								  tblspc_map_file,
+								  opt->progress, opt->sendtblspcmapfile);
+
+	/*
+	 * Once do_pg_start_backup has been called, ensure that any failure causes
+	 * us to abort the backup so we don't "leak" a backup counter. For this
+	 * reason, *all* functionality between do_pg_start_backup() and the end of
+	 * do_pg_stop_backup() should be inside the error cleanup block!
+	 */
+
+	PG_ENSURE_ERROR_CLEANUP(base_backup_cleanup, (Datum) 0);
+	{
+		tablespaceinfo *ti;
+		FILE	   *fp;
+
+		SendXlogRecPtrResult(startptr, starttli);
+
+		/*
+		 * Calculate the relative path of temporary statistics directory in
+		 * order to skip the files which are located in that directory later.
+		 */
+		if (is_absolute_path(pgstat_stat_directory) &&
+			strncmp(pgstat_stat_directory, DataDir, datadirpathlen) == 0)
+			statrelpath = psprintf("./%s", pgstat_stat_directory + datadirpathlen + 1);
+		else if (strncmp(pgstat_stat_directory, "./", 2) != 0)
+			statrelpath = psprintf("./%s", pgstat_stat_directory);
+		else
+			statrelpath = pgstat_stat_directory;
+
+		/* Add a node for the base directory at the end */
+		ti = palloc0(sizeof(tablespaceinfo));
+		ti->size = opt->progress ? sendDir(".", 1, true, tablespaces, true) : -1;
+		tablespaces = lappend(tablespaces, ti);
+
+		/* Send tablespace header */
+		SendBackupHeader(tablespaces);
+
+		/* Setup and activate network throttling, if client requested it */
+		setup_throttle(opt->maxrate);
+
+		/*
+		 * backup_label and tablespace_map are stored into temp files for
+		 * their use at a later stage, i.e. during STOP_BACKUP or while
+		 * transferring files to the client.
+		 */
+		fp = AllocateFile(BACKUP_LABEL_FILE_TMP, "w");
+		if (!fp)
+			ereport(ERROR,
+					(errcode_for_file_access(),
+					 errmsg("could not create file \"%s\": %m",
+							BACKUP_LABEL_FILE_TMP)));
+		if (fwrite(labelfile->data, labelfile->len, 1, fp) != 1 ||
+			fflush(fp) != 0 ||
+			pg_fsync(fileno(fp)) != 0 ||
+			ferror(fp) ||
+			FreeFile(fp))
+			ereport(ERROR,
+					(errcode_for_file_access(),
+					 errmsg("could not write file \"%s\": %m",
+							BACKUP_LABEL_FILE_TMP)));
+
+		if (opt->sendtblspcmapfile && tblspc_map_file->len > 0)
+		{
+			fp = AllocateFile(TABLESPACE_MAP_TMP, "w");
+			if (!fp)
+				ereport(ERROR,
+						(errcode_for_file_access(),
+						 errmsg("could not create file \"%s\": %m",
+								TABLESPACE_MAP_TMP)));
+			if (fwrite(tblspc_map_file->data, tblspc_map_file->len, 1, fp) != 1 ||
+				fflush(fp) != 0 ||
+				pg_fsync(fileno(fp)) != 0 ||
+				ferror(fp) ||
+				FreeFile(fp))
+				ereport(ERROR,
+						(errcode_for_file_access(),
+						 errmsg("could not write file \"%s\": %m",
+								TABLESPACE_MAP_TMP)));
+		}
+
+		/* send out the list of files in $PGDATA */
+		SendBackupFileList(opt, tablespaces);
+	}
+	PG_END_ENSURE_ERROR_CLEANUP(base_backup_cleanup, (Datum) 0);
+}
+
+/*
+ * StopBackup() - ends a parallel backup
+ *
+ * The function is called in parallel mode. It ends a parallel backup session
+ * established earlier by the START_BACKUP command.
+ */
+static void
+StopBackup(basebackup_options *opt)
+{
+	TimeLineID	endtli;
+	XLogRecPtr	endptr;
+	struct stat statbuf;
+	StringInfoData buf;
+	char	   *labelfile;
+
+	/* Setup and activate network throttling, if client requested it */
+	setup_throttle(opt->maxrate);
+
+	/* read backup_label file into buffer, we need it for do_pg_stop_backup */
+	labelfile = readfile(BACKUP_LABEL_FILE_TMP, false);
+
+	pq_beginmessage(&buf, 'H');
+	pq_sendbyte(&buf, 0);		/* overall format */
+	pq_sendint16(&buf, 0);		/* natts */
+	pq_endmessage(&buf);
+
+	/* ... and pg_control after everything else. */
+	if (lstat(XLOG_CONTROL_FILE, &statbuf) != 0)
+		ereport(ERROR,
+				(errcode_for_file_access(),
+				 errmsg("could not stat file \"%s\": %m",
+						XLOG_CONTROL_FILE)));
+	sendFile(XLOG_CONTROL_FILE, XLOG_CONTROL_FILE, &statbuf, false, InvalidOid);
+
+	/* stop backup */
+	endptr = do_pg_stop_backup(labelfile, !opt->nowait, &endtli);
+
+	if (opt->includewal)
+		include_wal_files(endptr, endtli);
+
+	pq_putemptymessage('c');	/* CopyDone */
+	SendXlogRecPtrResult(endptr, endtli);
+
+	unlink(BACKUP_LABEL_FILE_TMP);
+	unlink(TABLESPACE_MAP_TMP);
+}
+
+/*
+ * SendBackupFileList() - sends a list of filenames of PGDATA
+ *
+ * The function collects a list of filenames, necessary for a full backup,
+ * and sends this list to the client.
+ */
+static void
+SendBackupFileList(basebackup_options *opt, List *tablespaces)
+{
+	StringInfoData buf;
+	ListCell   *lc,
+			   *cell;			/* inner cursor; must not clobber lc */
+	foreach(lc, tablespaces)
+	{
+		List	   *filenames = NIL;
+		tablespaceinfo *ti = (tablespaceinfo *) lfirst(lc);
+
+		if (ti->path == NULL)
+			sendDir_(".", 1, false, NIL, !opt->sendtblspcmapfile, &filenames);
+		else
+			sendTablespace(ti->path, false, &filenames);
+
+		/* sort the files in descending order, based on file size */
+		list_sort(filenames, compareFileSize);
+
+		/* Construct and send the list of filenames */
+		pq_beginmessage(&buf, 'T'); /* RowDescription */
+		pq_sendint16(&buf, 3);	/* 3 fields */
+
+		/* First field - file path */
+		pq_sendstring(&buf, "path");
+		pq_sendint32(&buf, 0);
+		pq_sendint16(&buf, 0);
+		pq_sendint32(&buf, TEXTOID);
+		pq_sendint16(&buf, -1);
+		pq_sendint32(&buf, 0);
+		pq_sendint16(&buf, 0);
+
+		/* Second field - is_dir */
+		pq_sendstring(&buf, "isdir");
+		pq_sendint32(&buf, 0);
+		pq_sendint16(&buf, 0);
+		pq_sendint32(&buf, BOOLOID);
+		pq_sendint16(&buf, 1);
+		pq_sendint32(&buf, 0);
+		pq_sendint16(&buf, 0);
+
+		/* Third field - size */
+		pq_sendstring(&buf, "size");
+		pq_sendint32(&buf, 0);
+		pq_sendint16(&buf, 0);
+		pq_sendint32(&buf, INT8OID);
+		pq_sendint16(&buf, 8);
+		pq_sendint32(&buf, 0);
+		pq_sendint16(&buf, 0);
+		pq_endmessage(&buf);
+
+		foreach(cell, filenames)
+		{
+			pathinfo   *pi = (pathinfo *) lfirst(cell);
+			Size		len;
+
+			/* Send one datarow message */
+			pq_beginmessage(&buf, 'D');
+			pq_sendint16(&buf, 3);	/* number of columns */
+
+			/* send file name */
+			len = strlen(pi->path);
+			pq_sendint32(&buf, len);
+			pq_sendbytes(&buf, pi->path, len);
+
+			/* send isdir */
+			pq_sendint32(&buf, 1);
+			pq_sendbytes(&buf, pi->isdir ? "t" : "f", 1);
+
+			/* send size */
+			send_int8_string(&buf, pi->size);
+
+			pq_endmessage(&buf);
+		}
+
+		list_free_deep(filenames);
+	}
+
+	/* Send a CommandComplete message */
+	pq_puttextmessage('C', "SELECT");
+}
+
+/*
+ * SendFilesContents() - sends the actual files to the caller
+ *
+ * The function sends the files over to the caller using the COPY protocol.
+ */
+static void
+SendFilesContents(basebackup_options *opt, List *filenames, bool missing_ok)
+{
+	StringInfoData buf;
+	char	   *labelfile;
+	ListCell   *lc;
+	char		startxlogfilename[MAXFNAMELEN];
+	bool		basetablespace = true;
+	int			basepathlen = 1;
+	char		ch;
+	uint32		hi,
+				lo;
+
+	if (list_length(filenames) <= 0)
+		return;
+
+	total_checksum_failures = 0;
+
+	/* Setup and activate network throttling, if client requested it */
+	setup_throttle(opt->maxrate);
+
+	/*
+	 * LABEL is reused here to identify the tablespace path on the server.  It
+	 * is empty in the case of the 'base' tablespace.
+	 */
+	if (is_absolute_path(opt->label))
+	{
+		basepathlen = strlen(opt->label);
+		basetablespace = false;
+	}
+
+	/* retrieve the backup start location from the backup_label file. */
+	labelfile = readfile(BACKUP_LABEL_FILE_TMP, false);
+	if (sscanf(labelfile, "START WAL LOCATION: %X/%X (file %24s)%c",
+			   &hi, &lo, startxlogfilename,
+			   &ch) != 4 || ch != '\n')
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("invalid data in file \"%s\"", BACKUP_LABEL_FILE_TMP)));
+	startptr = ((uint64) hi) << 32 | lo;
+
+	/* Send CopyOutResponse message */
+	pq_beginmessage(&buf, 'H');
+	pq_sendbyte(&buf, 0);		/* overall format */
+	pq_sendint16(&buf, 0);		/* natts */
+	pq_endmessage(&buf);
+
+	if (opt->worker == 0 && basetablespace) /* 'base' tablespace */
+	{
+		/* Send BACKUP_LABEL_FILE file */
+		sendFileWithContent(BACKUP_LABEL_FILE, labelfile);
+
+		/* Send TABLESPACE_MAP file */
+		if (opt->sendtblspcmapfile)
+		{
+			char	   *mapfile = readfile(TABLESPACE_MAP_TMP, true);
+
+			if (mapfile)
+			{
+				sendFileWithContent(TABLESPACE_MAP, mapfile);
+				pfree(mapfile);
+			}
+		}
+	}
+
+	foreach(lc, filenames)
+	{
+		struct stat statbuf;
+		char	   *pathbuf;
+
+		pathbuf = (char *) strVal(lfirst(lc));
+		if (lstat(pathbuf, &statbuf) != 0)
+		{
+			if (errno != ENOENT)
+				ereport(ERROR,
+						(errcode_for_file_access(),
+						 errmsg("could not stat file or directory \"%s\": %m",
+								pathbuf)));
+
+			/* If the file went away while scanning, it's not an error. */
+			continue;
+		}
+
+		/* Allow symbolic links in pg_tblspc only */
+		if (strstr(pathbuf, "./pg_tblspc") != NULL &&
+#ifndef WIN32
+			S_ISLNK(statbuf.st_mode)
+#else
+			pgwin32_is_junction(pathbuf)
+#endif
+			)
+		{
+			char		linkpath[MAXPGPATH];
+			int			rllen;
+
+			rllen = readlink(pathbuf, linkpath, sizeof(linkpath));
+			if (rllen < 0)
+				ereport(ERROR,
+						(errcode_for_file_access(),
+						 errmsg("could not read symbolic link \"%s\": %m",
+								pathbuf)));
+			if (rllen >= sizeof(linkpath))
+				ereport(ERROR,
+						(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
+						 errmsg("symbolic link \"%s\" target is too long",
+								pathbuf)));
+			linkpath[rllen] = '\0';
+
+			_tarWriteHeader(pathbuf, linkpath, &statbuf, false);
+		}
+		else if (S_ISDIR(statbuf.st_mode))
+		{
+			_tarWriteHeader(pathbuf + basepathlen + 1, NULL, &statbuf, false);
+		}
+		else if (
+#ifndef WIN32
+				 S_ISLNK(statbuf.st_mode)
+#else
+				 pgwin32_is_junction(pathbuf)
+#endif
+			)
+		{
+			/*
+			 * If symlink, write it as a directory. file symlinks only allowed
+			 * in pg_tblspc
+			 */
+			statbuf.st_mode = S_IFDIR | pg_dir_create_mode;
+			_tarWriteHeader(pathbuf + basepathlen + 1, NULL, &statbuf, false);
+		}
+		else
+		{
+			/* send file to client */
+			sendFile(pathbuf, pathbuf + basepathlen + 1, &statbuf, true, InvalidOid);
+		}
+	}
+
+	pq_putemptymessage('c');	/* CopyDone */
+
+	/*
+	 * Check for checksum failures. If there are failures across multiple
+	 * processes, it may not report the total checksum count, but it will
+	 * error out, terminating the backup.
+	 */
+	if (total_checksum_failures)
+	{
+		if (total_checksum_failures > 1)
+			ereport(WARNING,
+					(errmsg("%lld total checksum verification failures", total_checksum_failures)));
+
+		ereport(ERROR,
+				(errcode(ERRCODE_DATA_CORRUPTED),
+				 errmsg("checksum verification failure during base backup")));
+	}
+}
+
+static void
+include_wal_files(XLogRecPtr endptr, TimeLineID endtli)
+{
+	/*
+	 * We've left the last tar file "open", so we can now append the required
+	 * WAL files to it.
+	 */
+	char		pathbuf[MAXPGPATH];
+	XLogSegNo	segno;
+	XLogSegNo	startsegno;
+	XLogSegNo	endsegno;
+	struct stat statbuf;
+	List	   *historyFileList = NIL;
+	List	   *walFileList = NIL;
+	char		firstoff[MAXFNAMELEN];
+	char		lastoff[MAXFNAMELEN];
+	DIR		   *dir;
+	struct dirent *de;
+	ListCell   *lc;
+	TimeLineID	tli;
+
+	/*
+	 * I'd rather not worry about timelines here, so scan pg_wal and include
+	 * all WAL files in the range between 'startptr' and 'endptr', regardless
+	 * of the timeline the file is stamped with. If there are some spurious
+	 * WAL files belonging to timelines that don't belong in this server's
+	 * history, they will be included too. Normally there shouldn't be such
+	 * files, but if there are, there's little harm in including them.
+	 */
+	XLByteToSeg(startptr, startsegno, wal_segment_size);
+	XLogFileName(firstoff, ThisTimeLineID, startsegno, wal_segment_size);
+	XLByteToPrevSeg(endptr, endsegno, wal_segment_size);
+	XLogFileName(lastoff, ThisTimeLineID, endsegno, wal_segment_size);
+
+	dir = AllocateDir("pg_wal");
+	while ((de = ReadDir(dir, "pg_wal")) != NULL)
+	{
+		/* Does it look like a WAL segment, and is it in the range? */
+		if (IsXLogFileName(de->d_name) &&
+			strcmp(de->d_name + 8, firstoff + 8) >= 0 &&
+			strcmp(de->d_name + 8, lastoff + 8) <= 0)
+		{
+			walFileList = lappend(walFileList, pstrdup(de->d_name));
+		}
+		/* Does it look like a timeline history file? */
+		else if (IsTLHistoryFileName(de->d_name))
+		{
+			historyFileList = lappend(historyFileList, pstrdup(de->d_name));
+		}
+	}
+	FreeDir(dir);
+
+	/*
+	 * Before we go any further, check that none of the WAL segments we need
+	 * were removed.
+	 */
+	CheckXLogRemoved(startsegno, ThisTimeLineID);
+
+	/*
+	 * Sort the WAL filenames.  We want to send the files in order from oldest
+	 * to newest, to reduce the chance that a file is recycled before we get a
+	 * chance to send it over.
+	 */
+	list_sort(walFileList, compareWalFileNames);
+
+	/*
+	 * There must be at least one xlog file in the pg_wal directory, since we
+	 * are doing backup-including-xlog.
+	 */
+	if (walFileList == NIL)
+		ereport(ERROR,
+				(errmsg("could not find any WAL files")));
+
+	/*
+	 * Sanity check: the first and last segment should cover startptr and
+	 * endptr, with no gaps in between.
+	 */
+	XLogFromFileName((char *) linitial(walFileList),
+					 &tli, &segno, wal_segment_size);
+	if (segno != startsegno)
+	{
+		char		startfname[MAXFNAMELEN];
+
+		XLogFileName(startfname, ThisTimeLineID, startsegno,
+					 wal_segment_size);
+		ereport(ERROR,
+				(errmsg("could not find WAL file \"%s\"", startfname)));
+	}
+	foreach(lc, walFileList)
+	{
+		char	   *walFileName = (char *) lfirst(lc);
+		XLogSegNo	currsegno = segno;
+		XLogSegNo	nextsegno = segno + 1;
+
+		XLogFromFileName(walFileName, &tli, &segno, wal_segment_size);
+		if (!(nextsegno == segno || currsegno == segno))
+		{
+			char		nextfname[MAXFNAMELEN];
+
+			XLogFileName(nextfname, ThisTimeLineID, nextsegno,
+						 wal_segment_size);
+			ereport(ERROR,
+					(errmsg("could not find WAL file \"%s\"", nextfname)));
+		}
+	}
+	if (segno != endsegno)
+	{
+		char		endfname[MAXFNAMELEN];
+
+		XLogFileName(endfname, ThisTimeLineID, endsegno, wal_segment_size);
+		ereport(ERROR,
+				(errmsg("could not find WAL file \"%s\"", endfname)));
+	}
+
+	/* Ok, we have everything we need. Send the WAL files. */
+	foreach(lc, walFileList)
+	{
+		char	   *walFileName = (char *) lfirst(lc);
+		FILE	   *fp;
+		char		buf[TAR_SEND_SIZE];
+		size_t		cnt;
+		pgoff_t		len = 0;
+
+		snprintf(pathbuf, MAXPGPATH, XLOGDIR "/%s", walFileName);
+		XLogFromFileName(walFileName, &tli, &segno, wal_segment_size);
+
+		fp = AllocateFile(pathbuf, "rb");
+		if (fp == NULL)
+		{
+			int			save_errno = errno;
+
+			/*
+			 * Most likely reason for this is that the file was already
+			 * removed by a checkpoint, so check for that to get a better
+			 * error message.
+			 */
+			CheckXLogRemoved(segno, tli);
+
+			errno = save_errno;
+			ereport(ERROR,
+					(errcode_for_file_access(),
+					 errmsg("could not open file \"%s\": %m", pathbuf)));
+		}
+
+		if (fstat(fileno(fp), &statbuf) != 0)
+			ereport(ERROR,
+					(errcode_for_file_access(),
+					 errmsg("could not stat file \"%s\": %m",
+							pathbuf)));
+		if (statbuf.st_size != wal_segment_size)
+		{
+			CheckXLogRemoved(segno, tli);
+			ereport(ERROR,
+					(errcode_for_file_access(),
+					 errmsg("unexpected WAL file size \"%s\"", walFileName)));
+		}
+
+		/* send the WAL file itself */
+		_tarWriteHeader(pathbuf, NULL, &statbuf, false);
+
+		while ((cnt = fread(buf, 1,
+							Min(sizeof(buf), wal_segment_size - len),
+							fp)) > 0)
+		{
+			CheckXLogRemoved(segno, tli);
+			/* Send the chunk as a CopyData message */
+			if (pq_putmessage('d', buf, cnt))
+				ereport(ERROR,
+						(errmsg("base backup could not send data, aborting backup")));
+
+			len += cnt;
+			throttle(cnt);
+
+			if (len == wal_segment_size)
+				break;
+		}
+
+		if (len != wal_segment_size)
+		{
+			CheckXLogRemoved(segno, tli);
+			ereport(ERROR,
+					(errcode_for_file_access(),
+					 errmsg("unexpected WAL file size \"%s\"", walFileName)));
+		}
+
+		/* wal_segment_size is a multiple of 512, so no need for padding */
+
+		FreeFile(fp);
+
+		/*
+		 * Mark file as archived, otherwise files can get archived again after
+		 * promotion of a new node. This is in line with walreceiver.c always
+		 * doing an XLogArchiveForceDone() after a complete segment.
+		 */
+		StatusFilePath(pathbuf, walFileName, ".done");
+		sendFileWithContent(pathbuf, "");
+	}
+
+	/*
+	 * Send timeline history files too. Only the latest timeline history file
+	 * is required for recovery, and even that only if there happens to be a
+	 * timeline switch in the first WAL segment that contains the checkpoint
+	 * record, or if we're taking a base backup from a standby server and the
+	 * target timeline changes while the backup is taken. But they are small
+	 * and highly useful for debugging purposes, so better include them all,
+	 * always.
+	 */
+	foreach(lc, historyFileList)
+	{
+		char	   *fname = lfirst(lc);
+
+		snprintf(pathbuf, MAXPGPATH, XLOGDIR "/%s", fname);
+
+		if (lstat(pathbuf, &statbuf) != 0)
+			ereport(ERROR,
+					(errcode_for_file_access(),
+					 errmsg("could not stat file \"%s\": %m", pathbuf)));
+
+		sendFile(pathbuf, pathbuf, &statbuf, false, InvalidOid);
+
+		/* unconditionally mark file as archived */
+		StatusFilePath(pathbuf, fname, ".done");
+		sendFileWithContent(pathbuf, "");
+	}
+}
+
+/*
+ * Setup and activate network throttling, if client requested it
+ */
+static void
+setup_throttle(int maxrate)
+{
+	/* Setup and activate network throttling, if client requested it */
+	if (maxrate > 0)
+	{
+		throttling_sample =
+			(int64) maxrate * (int64) 1024 / THROTTLING_FREQUENCY;
+
+		/*
+		 * The minimum amount of time for throttling_sample bytes to be
+		 * transferred.
+		 */
+		elapsed_min_unit = USECS_PER_SEC / THROTTLING_FREQUENCY;
+
+		/* Enable throttling. */
+		throttling_counter = 0;
+
+		/* The 'real data' starts now (header was ignored). */
+		throttled_last = GetCurrentTimestamp();
+	}
+	else
+	{
+		/* Disable throttling. */
+		throttling_counter = -1;
+	}
+}
+
+static char *
+readfile(const char *readfilename, bool missing_ok)
+{
+	struct stat statbuf;
+	FILE	   *fp;
+	char	   *data;
+	int			r;
+
+	if (stat(readfilename, &statbuf))
+	{
+		if (errno == ENOENT && missing_ok)
+			return NULL;
+
+		ereport(ERROR,
+				(errcode_for_file_access(),
+				 errmsg("could not stat file \"%s\": %m",
+						readfilename)));
+	}
+
+	fp = AllocateFile(readfilename, "r");
+	if (!fp)
+	{
+		if (errno == ENOENT && missing_ok)
+			return NULL;
+
+		ereport(ERROR,
+				(errcode_for_file_access(),
+				 errmsg("could not open file \"%s\": %m", readfilename)));
+	}
+
+	data = palloc(statbuf.st_size + 1);
+	r = fread(data, statbuf.st_size, 1, fp);
+	data[statbuf.st_size] = '\0';
+
+	/* Close the file */
+	if (r != 1 || ferror(fp) || FreeFile(fp))
+		ereport(ERROR,
+				(errcode_for_file_access(),
+				 errmsg("could not read file \"%s\": %m",
+						readfilename)));
+
+	return data;
+}
diff --git a/src/backend/replication/repl_gram.y b/src/backend/replication/repl_gram.y
index c4e11cc4e8..88e384bf3c 100644
--- a/src/backend/replication/repl_gram.y
+++ b/src/backend/replication/repl_gram.y
@@ -87,6 +87,10 @@ static SQLCmd *make_sqlcmd(void);
 %token K_EXPORT_SNAPSHOT
 %token K_NOEXPORT_SNAPSHOT
 %token K_USE_SNAPSHOT
+%token K_START_BACKUP
+%token K_SEND_FILES_CONTENT
+%token K_STOP_BACKUP
+%token K_WORKER
 
 %type <node>	command
 %type <node>	base_backup start_replication start_logical_replication
@@ -102,6 +106,8 @@ static SQLCmd *make_sqlcmd(void);
 %type <boolval>	opt_temporary
 %type <list>	create_slot_opt_list
 %type <defelt>	create_slot_opt
+%type <list>	backup_files backup_files_list
+%type <node>	backup_file
 
 %%
 
@@ -162,6 +168,29 @@ base_backup:
 				{
 					BaseBackupCmd *cmd = makeNode(BaseBackupCmd);
 					cmd->options = $2;
+					cmd->cmdtag = BASE_BACKUP;
+					$$ = (Node *) cmd;
+				}
+			| K_START_BACKUP base_backup_opt_list
+				{
+					BaseBackupCmd *cmd = makeNode(BaseBackupCmd);
+					cmd->options = $2;
+					cmd->cmdtag = START_BACKUP;
+					$$ = (Node *) cmd;
+				}
+			| K_SEND_FILES_CONTENT backup_files base_backup_opt_list
+				{
+					BaseBackupCmd *cmd = makeNode(BaseBackupCmd);
+					cmd->options = $3;
+					cmd->cmdtag = SEND_FILES_CONTENT;
+					cmd->backupfiles = $2;
+					$$ = (Node *) cmd;
+				}
+			| K_STOP_BACKUP base_backup_opt_list
+				{
+					BaseBackupCmd *cmd = makeNode(BaseBackupCmd);
+					cmd->options = $2;
+					cmd->cmdtag = STOP_BACKUP;
 					$$ = (Node *) cmd;
 				}
 			;
@@ -214,6 +243,35 @@ base_backup_opt:
 				  $$ = makeDefElem("noverify_checksums",
 								   (Node *)makeInteger(true), -1);
 				}
+			| K_WORKER UCONST
+				{
+				  $$ = makeDefElem("worker",
+								   (Node *)makeInteger($2), -1);
+				}
+			;
+
+backup_files:
+			'(' backup_files_list ')'
+				{
+					$$ = $2;
+				}
+			| /* EMPTY */
+				{ $$ = NIL; }
+			;
+
+backup_files_list:
+			backup_file
+				{
+					$$ = list_make1($1);
+				}
+			| backup_files_list ',' backup_file
+				{
+					$$ = lappend($1, $3);
+				}
+			;
+
+backup_file:
+			SCONST							{ $$ = (Node *) makeString($1); }
 			;
 
 create_replication_slot:
diff --git a/src/backend/replication/repl_scanner.l b/src/backend/replication/repl_scanner.l
index 380faeb5f6..4836828c39 100644
--- a/src/backend/replication/repl_scanner.l
+++ b/src/backend/replication/repl_scanner.l
@@ -107,6 +107,11 @@ EXPORT_SNAPSHOT		{ return K_EXPORT_SNAPSHOT; }
 NOEXPORT_SNAPSHOT	{ return K_NOEXPORT_SNAPSHOT; }
 USE_SNAPSHOT		{ return K_USE_SNAPSHOT; }
 WAIT				{ return K_WAIT; }
+START_BACKUP		{ return K_START_BACKUP; }
+SEND_FILES_CONTENT	{ return K_SEND_FILES_CONTENT; }
+STOP_BACKUP			{ return K_STOP_BACKUP; }
+WORKER				{ return K_WORKER; }
+
 
 ","				{ return ','; }
 ";"				{ return ';'; }
diff --git a/src/bin/pg_basebackup/pg_basebackup.c b/src/bin/pg_basebackup/pg_basebackup.c
index 55ef13926d..5139dcbe03 100644
--- a/src/bin/pg_basebackup/pg_basebackup.c
+++ b/src/bin/pg_basebackup/pg_basebackup.c
@@ -41,6 +41,7 @@
 #include "receivelog.h"
 #include "replication/basebackup.h"
 #include "streamutil.h"
+#include "fe_utils/simple_list.h"
 
 #define ERRCODE_DATA_CORRUPTED	"XX001"
 
@@ -57,6 +58,15 @@ typedef struct TablespaceList
 	TablespaceListCell *tail;
 } TablespaceList;
 
+typedef struct WorkerFiles
+{
+	int			num_files;
+	char	   *tspath;
+	SimpleStringList *worker_files;
+
+}			WorkerFiles;
+
+
 /*
  * pg_xlog has been renamed to pg_wal in version 10.  This version number
  * should be compared with PQserverVersion().
@@ -110,6 +120,10 @@ static bool found_existing_xlogdir = false;
 static bool made_tablespace_dirs = false;
 static bool found_tablespace_dirs = false;
 
+static int	numWorkers = 1;
+static PGresult *tablespacehdr;
+static SimpleOidList workerspid = {NULL, NULL};
+
 /* Progress counters */
 static uint64 totalsize_kb;
 static uint64 totaldone;
@@ -141,7 +155,7 @@ static void usage(void);
 static void verify_dir_is_empty_or_create(char *dirname, bool *created, bool *found);
 static void progress_report(int tablespacenum, const char *filename, bool force);
 
-static void ReceiveTarFile(PGconn *conn, PGresult *res, int rownum);
+static void ReceiveTarFile(PGconn *conn, PGresult *res, int rownum, int worker);
 static void ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum);
 static void BaseBackup(void);
 
@@ -151,6 +165,10 @@ static bool reached_end_position(XLogRecPtr segendpos, uint32 timeline,
 static const char *get_tablespace_mapping(const char *dir);
 static void tablespace_list_append(const char *arg);
 
+static void ParallelBackupEnd(void);
+static int	ReceiveFiles(WorkerFiles * workerFiles, int worker);
+static void create_workers_and_fetch(WorkerFiles * workerFiles);
+static int	simple_list_length(SimpleStringList *list);
 
 static void
 cleanup_directories_atexit(void)
@@ -349,6 +367,7 @@ usage(void)
 	printf(_("      --no-slot          prevent creation of temporary replication slot\n"));
 	printf(_("      --no-verify-checksums\n"
 			 "                         do not verify checksums\n"));
+	printf(_("  -j, --jobs=NUM         use this many parallel jobs to backup\n"));
 	printf(_("  -?, --help             show this help, then exit\n"));
 	printf(_("\nConnection options:\n"));
 	printf(_("  -d, --dbname=CONNSTR   connection string\n"));
@@ -921,7 +940,7 @@ writeTarData(
  * No attempt to inspect or validate the contents of the file is done.
  */
 static void
-ReceiveTarFile(PGconn *conn, PGresult *res, int rownum)
+ReceiveTarFile(PGconn *conn, PGresult *res, int rownum, int worker)
 {
 	char		filename[MAXPGPATH];
 	char	   *copybuf = NULL;
@@ -978,7 +997,10 @@ ReceiveTarFile(PGconn *conn, PGresult *res, int rownum)
 #ifdef HAVE_LIBZ
 			if (compresslevel != 0)
 			{
-				snprintf(filename, sizeof(filename), "%s/base.tar.gz", basedir);
+				if (numWorkers > 1)
+					snprintf(filename, sizeof(filename), "%s/base.%d.tar.gz", basedir, worker);
+				else
+					snprintf(filename, sizeof(filename), "%s/base.tar.gz", basedir);
 				ztarfile = gzopen(filename, "wb");
 				if (gzsetparams(ztarfile, compresslevel,
 								Z_DEFAULT_STRATEGY) != Z_OK)
@@ -991,7 +1013,10 @@ ReceiveTarFile(PGconn *conn, PGresult *res, int rownum)
 			else
 #endif
 			{
-				snprintf(filename, sizeof(filename), "%s/base.tar", basedir);
+				if (numWorkers > 1)
+					snprintf(filename, sizeof(filename), "%s/base.%d.tar", basedir, worker);
+				else
+					snprintf(filename, sizeof(filename), "%s/base.tar", basedir);
 				tarfile = fopen(filename, "wb");
 			}
 		}
@@ -1004,8 +1029,12 @@ ReceiveTarFile(PGconn *conn, PGresult *res, int rownum)
 #ifdef HAVE_LIBZ
 		if (compresslevel != 0)
 		{
-			snprintf(filename, sizeof(filename), "%s/%s.tar.gz", basedir,
-					 PQgetvalue(res, rownum, 0));
+			if (numWorkers > 1)
+				snprintf(filename, sizeof(filename), "%s/%s.%d.tar.gz", basedir,
+						 PQgetvalue(res, rownum, 0), worker);
+			else
+				snprintf(filename, sizeof(filename), "%s/%s.tar.gz", basedir,
+						 PQgetvalue(res, rownum, 0));
 			ztarfile = gzopen(filename, "wb");
 			if (gzsetparams(ztarfile, compresslevel,
 							Z_DEFAULT_STRATEGY) != Z_OK)
@@ -1018,8 +1047,12 @@ ReceiveTarFile(PGconn *conn, PGresult *res, int rownum)
 		else
 #endif
 		{
-			snprintf(filename, sizeof(filename), "%s/%s.tar", basedir,
-					 PQgetvalue(res, rownum, 0));
+			if (numWorkers > 1)
+				snprintf(filename, sizeof(filename), "%s/%s.%d.tar", basedir,
+						 PQgetvalue(res, rownum, 0), worker);
+			else
+				snprintf(filename, sizeof(filename), "%s/%s.tar", basedir,
+						 PQgetvalue(res, rownum, 0));
 			tarfile = fopen(filename, "wb");
 		}
 	}
@@ -1475,6 +1508,7 @@ ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum)
 			 */
 			snprintf(filename, sizeof(filename), "%s/%s", current_path,
 					 copybuf);
+
 			if (filename[strlen(filename) - 1] == '/')
 			{
 				/*
@@ -1486,21 +1520,14 @@ ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum)
 					 * Directory
 					 */
 					filename[strlen(filename) - 1] = '\0';	/* Remove trailing slash */
+
+					/*
+					 * In parallel mode, we create directories before fetching
+					 * files, so it's OK if a directory already exists.
+					 */
 					if (mkdir(filename, pg_dir_create_mode) != 0)
 					{
-						/*
-						 * When streaming WAL, pg_wal (or pg_xlog for pre-9.6
-						 * clusters) will have been created by the wal
-						 * receiver process. Also, when the WAL directory
-						 * location was specified, pg_wal (or pg_xlog) has
-						 * already been created as a symbolic link before
-						 * starting the actual backup. So just ignore creation
-						 * failures on related directories.
-						 */
-						if (!((pg_str_endswith(filename, "/pg_wal") ||
-							   pg_str_endswith(filename, "/pg_xlog") ||
-							   pg_str_endswith(filename, "/archive_status")) &&
-							  errno == EEXIST))
+						if (errno != EEXIST)
 						{
 							pg_log_error("could not create directory \"%s\": %m",
 										 filename);
@@ -1528,8 +1555,8 @@ ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum)
 					 * can map them too.)
 					 */
 					filename[strlen(filename) - 1] = '\0';	/* Remove trailing slash */
-
 					mapped_tblspc_path = get_tablespace_mapping(&copybuf[157]);
+
 					if (symlink(mapped_tblspc_path, filename) != 0)
 					{
 						pg_log_error("could not create symbolic link from \"%s\" to \"%s\": %m",
@@ -1716,7 +1743,8 @@ BaseBackup(void)
 	}
 
 	basebkp =
-		psprintf("BASE_BACKUP LABEL '%s' %s %s %s %s %s %s %s",
+		psprintf("%s LABEL '%s' %s %s %s %s %s %s %s",
+				 (numWorkers > 1) ? "START_BACKUP" : "BASE_BACKUP",
 				 escaped_label,
 				 showprogress ? "PROGRESS" : "",
 				 includewal == FETCH_WAL ? "WAL" : "",
@@ -1830,20 +1858,102 @@ BaseBackup(void)
 		StartLogStreamer(xlogstart, starttli, sysidentifier);
 	}
 
-	/*
-	 * Start receiving chunks
-	 */
-	for (i = 0; i < PQntuples(res); i++)
+	if (numWorkers > 1)
 	{
-		if (format == 't')
-			ReceiveTarFile(conn, res, i);
-		else
-			ReceiveAndUnpackTarFile(conn, res, i);
-	}							/* Loop over all tablespaces */
+		WorkerFiles *workerFiles = palloc0(sizeof(WorkerFiles) * tablespacecount);
+
+		tablespacehdr = res;
+
+		for (i = 0; i < tablespacecount; i++)
+		{
+			bool		basetablespace;
+
+			workerFiles[i].worker_files = palloc0(sizeof(SimpleStringList) * numWorkers);
+
+			/*
+			 * Get the header
+			 */
+			res = PQgetResult(conn);
+			if (PQresultStatus(res) != PGRES_TUPLES_OK)
+			{
+				pg_log_error("could not get backup header: %s",
+							 PQerrorMessage(conn));
+				exit(1);
+			}
+			if (PQntuples(res) < 1)
+			{
+				pg_log_error("no data returned from server");
+				exit(1);
+			}
+
+			basetablespace = PQgetisnull(tablespacehdr, i, 0);
+			workerFiles[i].tspath = PQgetvalue(tablespacehdr, i, 1);
+			workerFiles[i].num_files = 0;
+
+			for (int j = 0; j < PQntuples(res); j++)
+			{
+				const char *path = PQgetvalue(res, j, 0);
+				bool		isdir = PQgetvalue(res, j, 1)[0] == 't';
+
+				if (format == 'p' && isdir)
+				{
+					char		dirpath[MAXPGPATH];
+
+					if (basetablespace)
+						snprintf(dirpath, sizeof(dirpath), "%s/%s", basedir, path);
+					else
+					{
+						const char *tspath = PQgetvalue(tablespacehdr, i, 1);
+
+						snprintf(dirpath, sizeof(dirpath), "%s/%s",
+								 get_tablespace_mapping(tspath), (path + strlen(tspath) + 1));
+					}
+
+					if (pg_mkdir_p(dirpath, pg_dir_create_mode) != 0)
+					{
+						if (errno != EEXIST)
+						{
+							pg_log_error("could not create directory \"%s\": %m",
+										 dirpath);
+							exit(1);
+						}
+					}
+				}
+
+				workerFiles[i].num_files++;
+				simple_string_list_append(&workerFiles[i].worker_files[j % numWorkers], path);
+			}
+			PQclear(res);
+		}
+
+		res = PQgetResult(conn);
+		if (PQresultStatus(res) != PGRES_COMMAND_OK)
+		{
+			pg_log_error("could not get data: %s", PQerrorMessage(conn));
+			exit(1);
+		}
+
+		res = PQgetResult(conn);
+		create_workers_and_fetch(workerFiles);
+		ParallelBackupEnd();
+	}
+	else
+	{
+		/*
+		 * Start receiving chunks
+		 */
+		for (i = 0; i < PQntuples(res); i++)
+		{
+			if (format == 't')
+				ReceiveTarFile(conn, res, i, 0);
+			else
+				ReceiveAndUnpackTarFile(conn, res, i);
+		}						/* Loop over all tablespaces */
+	}
 
 	if (showprogress)
 	{
-		progress_report(PQntuples(res), NULL, true);
+		progress_report(PQntuples(tablespacehdr), NULL, true);
 		if (isatty(fileno(stderr)))
 			fprintf(stderr, "\n");	/* Need to move to next line */
 	}
@@ -2043,6 +2153,7 @@ main(int argc, char **argv)
 		{"waldir", required_argument, NULL, 1},
 		{"no-slot", no_argument, NULL, 2},
 		{"no-verify-checksums", no_argument, NULL, 3},
+		{"jobs", required_argument, NULL, 'j'},
 		{NULL, 0, NULL, 0}
 	};
 	int			c;
@@ -2070,7 +2181,7 @@ main(int argc, char **argv)
 
 	atexit(cleanup_directories_atexit);
 
-	while ((c = getopt_long(argc, argv, "CD:F:r:RS:T:X:l:nNzZ:d:c:h:p:U:s:wWkvP",
+	while ((c = getopt_long(argc, argv, "CD:F:r:RS:T:X:l:nNzZ:d:c:h:p:U:s:wWkvPj:",
 							long_options, &option_index)) != -1)
 	{
 		switch (c)
@@ -2211,6 +2322,9 @@ main(int argc, char **argv)
 			case 3:
 				verify_checksums = false;
 				break;
+			case 'j':			/* number of jobs */
+				numWorkers = atoi(optarg);
+				break;
 			default:
 
 				/*
@@ -2325,6 +2439,14 @@ main(int argc, char **argv)
 		}
 	}
 
+	if (numWorkers <= 0)
+	{
+		pg_log_error("invalid number of parallel jobs");
+		fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
+				progname);
+		exit(1);
+	}
+
 #ifndef HAVE_LIBZ
 	if (compresslevel != 0)
 	{
@@ -2397,3 +2519,173 @@ main(int argc, char **argv)
 	success = true;
 	return 0;
 }
+
+static void
+ParallelBackupEnd(void)
+{
+	PGresult   *res = NULL;
+	char	   *basebkp;
+
+	basebkp = psprintf("STOP_BACKUP %s %s",
+					   includewal == FETCH_WAL ? "WAL" : "",
+					   includewal == NO_WAL ? "" : "NOWAIT");
+	if (PQsendQuery(conn, basebkp) == 0)
+	{
+		pg_log_error("could not execute STOP BACKUP \"%s\"",
+					 PQerrorMessage(conn));
+		exit(1);
+	}
+
+	/* receive pg_control and wal files */
+	if (format == 't')
+		ReceiveTarFile(conn, res, tablespacecount, numWorkers);
+	else
+		ReceiveAndUnpackTarFile(conn, res, tablespacecount);
+
+	PQclear(res);
+}
+
+static int
+ReceiveFiles(WorkerFiles * workerFiles, int worker)
+{
+	SimpleStringListCell *cell;
+	PGresult   *res = NULL;
+	PGconn	   *worker_conn;
+	int			i;
+
+	worker_conn = GetConnection();
+	for (i = 0; i < tablespacecount; i++)
+	{
+		SimpleStringList *files = &workerFiles[i].worker_files[worker];
+		PQExpBuffer buf = createPQExpBuffer();
+
+		if (simple_list_length(files) <= 0)
+			continue;
+
+
+		/*
+		 * Build the query in the form: SEND_FILES_CONTENT ('base/1/1245/32683',
+		 * 'base/1/1245/32684', ...) [options]
+		 */
+		appendPQExpBuffer(buf, "SEND_FILES_CONTENT (");
+		for (cell = files->head; cell; cell = cell->next)
+		{
+			if (cell != files->tail)
+				appendPQExpBuffer(buf, "'%s' ,", cell->val);
+			else
+				appendPQExpBuffer(buf, "'%s'", cell->val);
+		}
+		appendPQExpBufferStr(buf, " )");
+
+		/*
+		 * Add backup options to the command. We are reusing the LABEL here to
+		 * keep the original tablespace path on the server.
+		 */
+		appendPQExpBuffer(buf, " LABEL '%s' WORKER %u %s %s",
+						  workerFiles[i].tspath,
+						  worker,
+						  format == 't' ? "TABLESPACE_MAP" : "",
+						  verify_checksums ? "" : "NOVERIFY_CHECKSUMS");
+		if (maxrate > 0)
+			appendPQExpBuffer(buf, " MAX_RATE %u", maxrate);
+
+		if (!worker_conn)
+			return 1;
+
+		if (PQsendQuery(worker_conn, buf->data) == 0)
+		{
+			pg_log_error("could not send files list \"%s\"",
+						 PQerrorMessage(worker_conn));
+			return 1;
+		}
+		destroyPQExpBuffer(buf);
+
+		if (format == 't')
+			ReceiveTarFile(worker_conn, tablespacehdr, i, worker);
+		else
+			ReceiveAndUnpackTarFile(worker_conn, tablespacehdr, i);
+
+		res = PQgetResult(worker_conn);
+		if (PQresultStatus(res) != PGRES_COMMAND_OK)
+		{
+			pg_log_error("could not get data stream: %s",
+						 PQerrorMessage(worker_conn));
+			exit(1);
+		}
+
+		res = PQgetResult(worker_conn);
+	}
+
+	PQclear(res);
+	PQfinish(worker_conn);
+
+	return 0;
+}
+
+static void
+create_workers_and_fetch(WorkerFiles * workerFiles)
+{
+	int			status;
+	int			pid,
+				i;
+
+	for (i = 0; i < numWorkers; i++)
+	{
+		pid = fork();
+		if (pid == 0)
+		{
+			/* in child process */
+			_exit(ReceiveFiles(workerFiles, i));
+		}
+		else if (pid < 0)
+		{
+			pg_log_error("could not create backup worker: %m");
+			exit(1);
+		}
+
+		simple_oid_list_append(&workerspid, pid);
+		if (verbose)
+			pg_log_info("backup worker (%d) created", pid);
+
+		/*
+		 * Else we are in the parent process and all is well.
+		 */
+	}
+
+	for (i = 0; i < numWorkers; i++)
+	{
+		pid = waitpid(-1, &status, 0);
+
+		if (WIFEXITED(status) && WEXITSTATUS(status) == EXIT_FAILURE)
+		{
+			SimpleOidListCell *cell;
+
+			pg_log_error("backup worker (%d) failed with code %d", pid, WEXITSTATUS(status));
+
+			/* error. kill other workers and exit. */
+			for (cell = workerspid.head; cell; cell = cell->next)
+			{
+				if (pid != cell->val)
+				{
+					kill(cell->val, SIGTERM);
+					pg_log_error("backup worker killed %d", cell->val);
+				}
+			}
+
+			exit(1);
+		}
+	}
+}
+
+
+static int
+simple_list_length(SimpleStringList *list)
+{
+	int			len = 0;
+	SimpleStringListCell *cell;
+
+	for (cell = list->head; cell; cell = cell->next, len++)
+		;
+
+	return len;
+}
diff --git a/src/bin/pg_basebackup/t/040_pg_basebackup_parallel.pl b/src/bin/pg_basebackup/t/040_pg_basebackup_parallel.pl
new file mode 100644
index 0000000000..6c31214f3d
--- /dev/null
+++ b/src/bin/pg_basebackup/t/040_pg_basebackup_parallel.pl
@@ -0,0 +1,571 @@
+use strict;
+use warnings;
+use Cwd;
+use Config;
+use File::Basename qw(basename dirname);
+use File::Path qw(rmtree);
+use PostgresNode;
+use TestLib;
+use Test::More tests => 106;
+
+program_help_ok('pg_basebackup');
+program_version_ok('pg_basebackup');
+program_options_handling_ok('pg_basebackup');
+
+my $tempdir = TestLib::tempdir;
+
+my $node = get_new_node('main');
+
+# Set umask so test directories and files are created with default permissions
+umask(0077);
+
+# Initialize node without replication settings
+$node->init(extra => ['--data-checksums']);
+$node->start;
+my $pgdata = $node->data_dir;
+
+$node->command_fails(['pg_basebackup'],
+	'pg_basebackup needs target directory specified');
+
+# Some Windows ANSI code pages may reject this filename, in which case we
+# quietly proceed without this bit of test coverage.
+if (open my $badchars, '>>', "$tempdir/pgdata/FOO\xe0\xe0\xe0BAR")
+{
+	print $badchars "test backup of file with non-UTF8 name\n";
+	close $badchars;
+}
+
+$node->set_replication_conf();
+system_or_bail 'pg_ctl', '-D', $pgdata, 'reload';
+
+$node->command_fails(
+	[ 'pg_basebackup', '-D', "$tempdir/backup" ],
+	'pg_basebackup fails because of WAL configuration');
+
+ok(!-d "$tempdir/backup", 'backup directory was cleaned up');
+
+# Create a backup directory that is not empty so the next command will fail
+# but leave the data directory behind
+mkdir("$tempdir/backup")
+  or BAIL_OUT("unable to create $tempdir/backup");
+append_to_file("$tempdir/backup/dir-not-empty.txt", "Some data");
+
+$node->command_fails([ 'pg_basebackup', '-D', "$tempdir/backup", '-n' ],
+	'failing run with no-clean option');
+
+ok(-d "$tempdir/backup", 'backup directory was created and left behind');
+rmtree("$tempdir/backup");
+
+open my $conf, '>>', "$pgdata/postgresql.conf";
+print $conf "max_replication_slots = 10\n";
+print $conf "max_wal_senders = 10\n";
+print $conf "wal_level = replica\n";
+close $conf;
+$node->restart;
+
+# Write some files to test that they are not copied.
+foreach my $filename (
+	qw(backup_label tablespace_map postgresql.auto.conf.tmp current_logfiles.tmp)
+  )
+{
+	open my $file, '>>', "$pgdata/$filename";
+	print $file "DONOTCOPY";
+	close $file;
+}
+
+# Connect to a database to create global/pg_internal.init.  If this is removed
+# the test to ensure global/pg_internal.init is not copied will return a false
+# positive.
+$node->safe_psql('postgres', 'SELECT 1;');
+
+# Create an unlogged table to test that forks other than init are not copied.
+$node->safe_psql('postgres', 'CREATE UNLOGGED TABLE base_unlogged (id int)');
+
+my $baseUnloggedPath = $node->safe_psql('postgres',
+	q{select pg_relation_filepath('base_unlogged')});
+
+# Make sure main and init forks exist
+ok(-f "$pgdata/${baseUnloggedPath}_init", 'unlogged init fork in base');
+ok(-f "$pgdata/$baseUnloggedPath",        'unlogged main fork in base');
+
+# Create files that look like temporary relations to ensure they are ignored.
+my $postgresOid = $node->safe_psql('postgres',
+	q{select oid from pg_database where datname = 'postgres'});
+
+my @tempRelationFiles =
+  qw(t999_999 t9999_999.1 t999_9999_vm t99999_99999_vm.1);
+
+foreach my $filename (@tempRelationFiles)
+{
+	append_to_file("$pgdata/base/$postgresOid/$filename", 'TEMP_RELATION');
+}
+
+# Run base backup in parallel mode.
+$node->command_ok([ 'pg_basebackup', '-D', "$tempdir/backup", '-X', 'none', "-j 4" ],
+	'pg_basebackup runs');
+ok(-f "$tempdir/backup/PG_VERSION", 'backup was created');
+
+# Permissions on backup should be default
+SKIP:
+{
+	skip "unix-style permissions not supported on Windows", 1
+	  if ($windows_os);
+
+	ok(check_mode_recursive("$tempdir/backup", 0700, 0600),
+		"check backup dir permissions");
+}
+
+# Only archive_status directory should be copied in pg_wal/.
+is_deeply(
+	[ sort(slurp_dir("$tempdir/backup/pg_wal/")) ],
+	[ sort qw(. .. archive_status) ],
+	'no WAL files copied');
+
+# Contents of these directories should not be copied.
+foreach my $dirname (
+	qw(pg_dynshmem pg_notify pg_replslot pg_serial pg_snapshots pg_stat_tmp pg_subtrans)
+  )
+{
+	is_deeply(
+		[ sort(slurp_dir("$tempdir/backup/$dirname/")) ],
+		[ sort qw(. ..) ],
+		"contents of $dirname/ not copied");
+}
+
+# These files should not be copied.
+foreach my $filename (
+	qw(postgresql.auto.conf.tmp postmaster.opts postmaster.pid tablespace_map current_logfiles.tmp
+	global/pg_internal.init))
+{
+	ok(!-f "$tempdir/backup/$filename", "$filename not copied");
+}
+
+# Unlogged relation forks other than init should not be copied
+ok(-f "$tempdir/backup/${baseUnloggedPath}_init",
+	'unlogged init fork in backup');
+ok( !-f "$tempdir/backup/$baseUnloggedPath",
+	'unlogged main fork not in backup');
+
+# Temp relations should not be copied.
+foreach my $filename (@tempRelationFiles)
+{
+	ok( !-f "$tempdir/backup/base/$postgresOid/$filename",
+		"base/$postgresOid/$filename not copied");
+}
+
+# Make sure existing backup_label was ignored.
+isnt(slurp_file("$tempdir/backup/backup_label"),
+	'DONOTCOPY', 'existing backup_label not copied');
+rmtree("$tempdir/backup");
+
+$node->command_ok(
+	[
+		'pg_basebackup', '-D', "$tempdir/backup2", '--waldir',
+		"$tempdir/xlog2", "-j 4"
+	],
+	'separate xlog directory');
+ok(-f "$tempdir/backup2/PG_VERSION", 'backup was created');
+ok(-d "$tempdir/xlog2/",             'xlog directory was created');
+rmtree("$tempdir/backup2");
+rmtree("$tempdir/xlog2");
+
+$node->command_ok([ 'pg_basebackup', '-D', "$tempdir/tarbackup", '-Ft', "-j 4"],
+	'tar format');
+foreach my $filename (
+	qw(base.0.tar base.1.tar base.2.tar base.3.tar))
+{
+	ok(!-f "$tempdir/backup/$filename", "backup $filename tar created");
+}
+
+rmtree("$tempdir/tarbackup");
+
+$node->command_fails(
+	[ 'pg_basebackup', '-D', "$tempdir/backup_foo", '-Fp', "-j 4", "-T=/foo" ],
+	'-T with empty old directory fails');
+$node->command_fails(
+	[ 'pg_basebackup', '-D', "$tempdir/backup_foo", '-Fp', "-j 4", "-T/foo=" ],
+	'-T with empty new directory fails');
+$node->command_fails(
+	[
+		'pg_basebackup', '-D', "$tempdir/backup_foo", '-Fp', "-j 4",
+		"-T/foo=/bar=/baz"
+	],
+	'-T with multiple = fails');
+$node->command_fails(
+	[ 'pg_basebackup', '-D', "$tempdir/backup_foo", '-Fp', "-j 4", "-Tfoo=/bar" ],
+	'-T with old directory not absolute fails');
+$node->command_fails(
+	[ 'pg_basebackup', '-D', "$tempdir/backup_foo", '-Fp', "-j 4", "-T/foo=bar" ],
+	'-T with new directory not absolute fails');
+$node->command_fails(
+	[ 'pg_basebackup', '-D', "$tempdir/backup_foo", '-Fp', "-j 4", "-Tfoo" ],
+	'-T with invalid format fails');
+
+# Tar format doesn't support filenames longer than 100 bytes.
+my $superlongname = "superlongname_" . ("x" x 100);
+my $superlongpath = "$pgdata/$superlongname";
+
+open my $file, '>', "$superlongpath"
+  or die "unable to create file $superlongpath";
+close $file;
+$node->command_fails(
+	[ 'pg_basebackup', '-D', "$tempdir/tarbackup_l1", '-Ft', "-j 4" ],
+	'pg_basebackup tar with long name fails');
+unlink "$pgdata/$superlongname";
+
+
+# The following tests test symlinks. Windows doesn't have symlinks, so
+# skip on Windows.
+SKIP:
+{
+	skip "symlinks not supported on Windows", 18 if ($windows_os);
+
+	# Move pg_replslot out of $pgdata and create a symlink to it.
+	$node->stop;
+
+	# Set umask so test directories and files are created with group permissions
+	umask(0027);
+
+	# Enable group permissions on PGDATA
+	chmod_recursive("$pgdata", 0750, 0640);
+
+	rename("$pgdata/pg_replslot", "$tempdir/pg_replslot")
+	  or BAIL_OUT "could not move $pgdata/pg_replslot";
+	symlink("$tempdir/pg_replslot", "$pgdata/pg_replslot")
+	  or BAIL_OUT "could not symlink to $pgdata/pg_replslot";
+
+	$node->start;
+
+	# Create a temporary directory in the system location and symlink it
+	# to our physical temp location.  That way we can use shorter names
+	# for the tablespace directories, which hopefully won't run afoul of
+	# the 99 character length limit.
+	my $shorter_tempdir = TestLib::tempdir_short . "/tempdir";
+	symlink "$tempdir", $shorter_tempdir;
+
+	mkdir "$tempdir/tblspc1";
+	$node->safe_psql('postgres',
+		"CREATE TABLESPACE tblspc1 LOCATION '$shorter_tempdir/tblspc1';");
+	$node->safe_psql('postgres',
+		"CREATE TABLE test1 (a int) TABLESPACE tblspc1;");
+	$node->command_ok([ 'pg_basebackup', '-D', "$tempdir/tarbackup2", '-Ft', "-j 4" ],
+		'tar format with tablespaces');
+	ok(-f "$tempdir/tarbackup2/base.0.tar", 'backup tar was created');
+	my @tblspc_tars = glob "$tempdir/tarbackup2/[0-9]*.tar";
+	is(scalar(@tblspc_tars), 3, 'three tablespace tars were created');
+	rmtree("$tempdir/tarbackup2");
+
+	# Create an unlogged table to test that forks other than init are not copied.
+	$node->safe_psql('postgres',
+		'CREATE UNLOGGED TABLE tblspc1_unlogged (id int) TABLESPACE tblspc1;'
+	);
+
+	my $tblspc1UnloggedPath = $node->safe_psql('postgres',
+		q{select pg_relation_filepath('tblspc1_unlogged')});
+
+	# Make sure main and init forks exist
+	ok( -f "$pgdata/${tblspc1UnloggedPath}_init",
+		'unlogged init fork in tablespace');
+	ok(-f "$pgdata/$tblspc1UnloggedPath", 'unlogged main fork in tablespace');
+
+	# Create files that look like temporary relations to ensure they are ignored
+	# in a tablespace.
+	my @tempRelationFiles = qw(t888_888 t888888_888888_vm.1);
+	my $tblSpc1Id         = basename(
+		dirname(
+			dirname(
+				$node->safe_psql(
+					'postgres', q{select pg_relation_filepath('test1')}))));
+
+	foreach my $filename (@tempRelationFiles)
+	{
+		append_to_file(
+			"$shorter_tempdir/tblspc1/$tblSpc1Id/$postgresOid/$filename",
+			'TEMP_RELATION');
+	}
+
+	$node->command_fails(
+		[ 'pg_basebackup', '-D', "$tempdir/backup1", '-Fp', "-j 4" ],
+		'plain format with tablespaces fails without tablespace mapping');
+
+	$node->command_ok(
+		[
+			'pg_basebackup', '-D', "$tempdir/backup1", '-Fp', "-j 4",
+			"-T$shorter_tempdir/tblspc1=$tempdir/tbackup/tblspc1"
+		],
+		'plain format with tablespaces succeeds with tablespace mapping');
+	ok(-d "$tempdir/tbackup/tblspc1", 'tablespace was relocated');
+	opendir(my $dh, "$pgdata/pg_tblspc") or die;
+	ok( (   grep {
+				-l "$tempdir/backup1/pg_tblspc/$_"
+				  and readlink "$tempdir/backup1/pg_tblspc/$_" eq
+				  "$tempdir/tbackup/tblspc1"
+			} readdir($dh)),
+		"tablespace symlink was updated");
+	closedir $dh;
+
+	# Group access should be enabled on all backup files
+	ok(check_mode_recursive("$tempdir/backup1", 0750, 0640),
+		"check backup dir permissions");
+
+	# Unlogged relation forks other than init should not be copied
+	my ($tblspc1UnloggedBackupPath) =
+	  $tblspc1UnloggedPath =~ /[^\/]*\/[^\/]*\/[^\/]*$/g;
+
+	ok(-f "$tempdir/tbackup/tblspc1/${tblspc1UnloggedBackupPath}_init",
+		'unlogged init fork in tablespace backup');
+	ok(!-f "$tempdir/tbackup/tblspc1/$tblspc1UnloggedBackupPath",
+		'unlogged main fork not in tablespace backup');
+
+	# Temp relations should not be copied.
+	foreach my $filename (@tempRelationFiles)
+	{
+		ok( !-f "$tempdir/tbackup/tblspc1/$tblSpc1Id/$postgresOid/$filename",
+			"[tblspc1]/$postgresOid/$filename not copied");
+
+		# Also remove temp relation files or tablespace drop will fail.
+		my $filepath =
+		  "$shorter_tempdir/tblspc1/$tblSpc1Id/$postgresOid/$filename";
+
+		unlink($filepath)
+		  or BAIL_OUT("unable to unlink $filepath");
+	}
+
+	ok( -d "$tempdir/backup1/pg_replslot",
+		'pg_replslot symlink copied as directory');
+	rmtree("$tempdir/backup1");
+
+	mkdir "$tempdir/tbl=spc2";
+	$node->safe_psql('postgres', "DROP TABLE test1;");
+	$node->safe_psql('postgres', "DROP TABLE tblspc1_unlogged;");
+	$node->safe_psql('postgres', "DROP TABLESPACE tblspc1;");
+	$node->safe_psql('postgres',
+		"CREATE TABLESPACE tblspc2 LOCATION '$shorter_tempdir/tbl=spc2';");
+	$node->command_ok(
+		[
+			'pg_basebackup', '-D', "$tempdir/backup3", '-Fp', "-j 4",
+			"-T$shorter_tempdir/tbl\\=spc2=$tempdir/tbackup/tbl\\=spc2"
+		],
+		'mapping tablespace with = sign in path');
+	ok(-d "$tempdir/tbackup/tbl=spc2",
+		'tablespace with = sign was relocated');
+	$node->safe_psql('postgres', "DROP TABLESPACE tblspc2;");
+	rmtree("$tempdir/backup3");
+
+	mkdir "$tempdir/$superlongname";
+	$node->safe_psql('postgres',
+		"CREATE TABLESPACE tblspc3 LOCATION '$tempdir/$superlongname';");
+	$node->command_ok(
+		[ 'pg_basebackup', '-D', "$tempdir/tarbackup_l3", '-Ft' , '-j 4'],
+		'pg_basebackup tar with long symlink target');
+	$node->safe_psql('postgres', "DROP TABLESPACE tblspc3;");
+	rmtree("$tempdir/tarbackup_l3");
+}
+
+$node->command_ok([ 'pg_basebackup', '-D', "$tempdir/backupR", '-R' , '-j 4'],
+	'pg_basebackup -R runs');
+ok(-f "$tempdir/backupR/postgresql.auto.conf", 'postgresql.auto.conf exists');
+ok(-f "$tempdir/backupR/standby.signal",       'standby.signal was created');
+my $recovery_conf = slurp_file "$tempdir/backupR/postgresql.auto.conf";
+rmtree("$tempdir/backupR");
+
+my $port = $node->port;
+like(
+	$recovery_conf,
+	qr/^primary_conninfo = '.*port=$port.*'\n/m,
+	'postgresql.auto.conf sets primary_conninfo');
+
+$node->command_ok(
+	[ 'pg_basebackup', '-D', "$tempdir/backupxd" , "-j 4"],
+	'pg_basebackup runs in default xlog mode');
+ok(grep(/^[0-9A-F]{24}$/, slurp_dir("$tempdir/backupxd/pg_wal")),
+	'WAL files copied');
+rmtree("$tempdir/backupxd");
+
+$node->command_ok(
+	[ 'pg_basebackup', '-D', "$tempdir/backupxf", '-X', 'fetch' , "-j 4"],
+	'pg_basebackup -X fetch runs');
+ok(grep(/^[0-9A-F]{24}$/, slurp_dir("$tempdir/backupxf/pg_wal")),
+	'WAL files copied');
+rmtree("$tempdir/backupxf");
+$node->command_ok(
+	[ 'pg_basebackup', '-D', "$tempdir/backupxs", '-X', 'stream' , "-j 4"],
+	'pg_basebackup -X stream runs');
+ok(grep(/^[0-9A-F]{24}$/, slurp_dir("$tempdir/backupxs/pg_wal")),
+	'WAL files copied');
+rmtree("$tempdir/backupxs");
+$node->command_ok(
+	[ 'pg_basebackup', '-D', "$tempdir/backupxst", '-X', 'stream', '-Ft' , "-j 4"],
+	'pg_basebackup -X stream runs in tar mode');
+ok(-f "$tempdir/backupxst/pg_wal.tar", "tar file was created");
+rmtree("$tempdir/backupxst");
+$node->command_ok(
+	[
+		'pg_basebackup',         '-D',
+		"$tempdir/backupnoslot", '-X',
+		'stream',                '--no-slot',
+		'-j 4'
+	],
+	'pg_basebackup -X stream runs with --no-slot');
+rmtree("$tempdir/backupnoslot");
+
+$node->command_fails(
+	[
+		'pg_basebackup',             '-D',
+		"$tempdir/backupxs_sl_fail", '-X',
+		'stream',                    '-S',
+		'slot0',
+		'-j 4'
+	],
+	'pg_basebackup fails with nonexistent replication slot');
+#
+$node->command_fails(
+	[ 'pg_basebackup', '-D', "$tempdir/backupxs_slot", '-C' , '-j 4'],
+	'pg_basebackup -C fails without slot name');
+
+$node->command_fails(
+	[
+		'pg_basebackup',          '-D',
+		"$tempdir/backupxs_slot", '-C',
+		'-S',                     'slot0',
+		'--no-slot',
+		'-j 4'
+	],
+	'pg_basebackup fails with -C -S --no-slot');
+
+$node->command_ok(
+	[ 'pg_basebackup', '-D', "$tempdir/backupxs_slot", '-C', '-S', 'slot0', '-j 4'],
+	'pg_basebackup -C runs');
+rmtree("$tempdir/backupxs_slot");
+
+is( $node->safe_psql(
+		'postgres',
+		q{SELECT slot_name FROM pg_replication_slots WHERE slot_name = 'slot0'}
+	),
+	'slot0',
+	'replication slot was created');
+isnt(
+	$node->safe_psql(
+		'postgres',
+		q{SELECT restart_lsn FROM pg_replication_slots WHERE slot_name = 'slot0'}
+	),
+	'',
+	'restart LSN of new slot is not null');
+
+$node->command_fails(
+	[ 'pg_basebackup', '-D', "$tempdir/backupxs_slot1", '-C', '-S', 'slot0', '-j 4'],
+	'pg_basebackup fails with -C -S and a previously existing slot');
+
+$node->safe_psql('postgres',
+	q{SELECT * FROM pg_create_physical_replication_slot('slot1')});
+my $lsn = $node->safe_psql('postgres',
+	q{SELECT restart_lsn FROM pg_replication_slots WHERE slot_name = 'slot1'}
+);
+is($lsn, '', 'restart LSN of new slot is null');
+$node->command_fails(
+	[ 'pg_basebackup', '-D', "$tempdir/fail", '-S', 'slot1', '-X', 'none', '-j 4'],
+	'pg_basebackup with replication slot fails without WAL streaming');
+$node->command_ok(
+	[
+		'pg_basebackup', '-D', "$tempdir/backupxs_sl", '-X',
+		'stream',        '-S', 'slot1', '-j 4'
+	],
+	'pg_basebackup -X stream with replication slot runs');
+$lsn = $node->safe_psql('postgres',
+	q{SELECT restart_lsn FROM pg_replication_slots WHERE slot_name = 'slot1'}
+);
+like($lsn, qr!^0/[0-9A-Z]{7,8}$!, 'restart LSN of slot has advanced');
+rmtree("$tempdir/backupxs_sl");
+
+$node->command_ok(
+	[
+		'pg_basebackup', '-D', "$tempdir/backupxs_sl_R", '-X',
+		'stream',        '-S', 'slot1',                  '-R',
+		'-j 4'
+	],
+	'pg_basebackup with replication slot and -R runs');
+like(
+	slurp_file("$tempdir/backupxs_sl_R/postgresql.auto.conf"),
+	qr/^primary_slot_name = 'slot1'\n/m,
+	'recovery conf file sets primary_slot_name');
+
+my $checksum = $node->safe_psql('postgres', 'SHOW data_checksums;');
+is($checksum, 'on', 'checksums are enabled');
+rmtree("$tempdir/backupxs_sl_R");
+
+# create tables to corrupt and get their relfilenodes
+my $file_corrupt1 = $node->safe_psql('postgres',
+	q{SELECT a INTO corrupt1 FROM generate_series(1,10000) AS a; ALTER TABLE corrupt1 SET (autovacuum_enabled=false); SELECT pg_relation_filepath('corrupt1')}
+);
+my $file_corrupt2 = $node->safe_psql('postgres',
+	q{SELECT b INTO corrupt2 FROM generate_series(1,2) AS b; ALTER TABLE corrupt2 SET (autovacuum_enabled=false); SELECT pg_relation_filepath('corrupt2')}
+);
+
+# set page header and block sizes
+my $pageheader_size = 24;
+my $block_size = $node->safe_psql('postgres', 'SHOW block_size;');
+
+# induce corruption
+system_or_bail 'pg_ctl', '-D', $pgdata, 'stop';
+open $file, '+<', "$pgdata/$file_corrupt1";
+seek($file, $pageheader_size, 0);
+syswrite($file, "\0\0\0\0\0\0\0\0\0");
+close $file;
+system_or_bail 'pg_ctl', '-D', $pgdata, 'start';
+
+$node->command_checks_all(
+	[ 'pg_basebackup', '-D', "$tempdir/backup_corrupt", '-j 4'],
+	1,
+	[qr{^$}],
+	[qr/^WARNING.*checksum verification failed/s],
+	'pg_basebackup reports checksum mismatch');
+rmtree("$tempdir/backup_corrupt");
+
+# induce further corruption in 5 more blocks
+system_or_bail 'pg_ctl', '-D', $pgdata, 'stop';
+open $file, '+<', "$pgdata/$file_corrupt1";
+for my $i (1 .. 5)
+{
+	my $offset = $pageheader_size + $i * $block_size;
+	seek($file, $offset, 0);
+	syswrite($file, "\0\0\0\0\0\0\0\0\0");
+}
+close $file;
+system_or_bail 'pg_ctl', '-D', $pgdata, 'start';
+
+$node->command_checks_all(
+	[ 'pg_basebackup', '-D', "$tempdir/backup_corrupt2", '-j 4'],
+	1,
+	[qr{^$}],
+	[qr/^WARNING.*further.*failures.*will.not.be.reported/s],
+	'pg_basebackup does not report more than 5 checksum mismatches');
+rmtree("$tempdir/backup_corrupt2");
+
+# induce corruption in a second file
+system_or_bail 'pg_ctl', '-D', $pgdata, 'stop';
+open $file, '+<', "$pgdata/$file_corrupt2";
+seek($file, $pageheader_size, 0);
+syswrite($file, "\0\0\0\0\0\0\0\0\0");
+close $file;
+system_or_bail 'pg_ctl', '-D', $pgdata, 'start';
+
+#$node->command_checks_all(
+#	[ 'pg_basebackup', '-D', "$tempdir/backup_corrupt3", '-j 4'],
+#	1,
+#	[qr{^$}],
+#	[qr/^WARNING.*checksum verification failed/s],
+#	'pg_basebackup correctly report the total number of checksum mismatches');
+#rmtree("$tempdir/backup_corrupt3");
+
+# do not verify checksums, should return ok
+$node->command_ok(
+	[
+		'pg_basebackup',            '-D',
+		"$tempdir/backup_corrupt4", '--no-verify-checksums',
+		'-j 4'
+	],
+	'pg_basebackup with -k does not report checksum mismatch');
+rmtree("$tempdir/backup_corrupt4");
+
+$node->safe_psql('postgres', "DROP TABLE corrupt1;");
+$node->safe_psql('postgres', "DROP TABLE corrupt2;");
diff --git a/src/include/nodes/replnodes.h b/src/include/nodes/replnodes.h
index 1e3ed4e19f..f92d593e2e 100644
--- a/src/include/nodes/replnodes.h
+++ b/src/include/nodes/replnodes.h
@@ -23,6 +23,19 @@ typedef enum ReplicationKind
 	REPLICATION_KIND_LOGICAL
 } ReplicationKind;
 
+/*
+ * Sub-commands of the BASE_BACKUP replication command.  BASE_BACKUP is the
+ * traditional single-pass backup; the other tags appear to implement the
+ * multi-step protocol used by parallel backup (start the session, fetch
+ * file contents, stop the session).
+ */
+typedef enum BackupCmdTag
+{
+	BASE_BACKUP,
+	START_BACKUP,
+	SEND_FILES_CONTENT,
+	STOP_BACKUP
+} BackupCmdTag;
 
 /* ----------------------
  *		IDENTIFY_SYSTEM command
@@ -42,6 +49,8 @@ typedef struct BaseBackupCmd
 {
 	NodeTag		type;
 	List	   *options;
+	BackupCmdTag cmdtag;	/* which backup sub-command (see BackupCmdTag) */
+	List	   *backupfiles;	/* file paths requested (parallel backup) */
 } BaseBackupCmd;
 
 
diff --git a/src/include/replication/basebackup.h b/src/include/replication/basebackup.h
index 503a5b9f0b..9e792af99d 100644
--- a/src/include/replication/basebackup.h
+++ b/src/include/replication/basebackup.h
@@ -31,6 +31,7 @@ typedef struct
 
 extern void SendBaseBackup(BaseBackupCmd *cmd);
 
-extern int64 sendTablespace(char *path, bool sizeonly);
+/* files: when non-NULL, collect per-path info instead of streaming data */
+extern int64 sendTablespace(char *path, bool sizeonly, List **files);
 
 #endif							/* _BASEBACKUP_H */
-- 
2.21.0 (Apple Git-122)

