From 3e62b74a0e8d22df942f625a343d1d6254ad1b08 Mon Sep 17 00:00:00 2001
From: Asif Rehman <asif.rehman@highgo.ca>
Date: Sun, 13 Oct 2019 22:59:28 +0500
Subject: [PATCH 2/4] backend changes for parallel backup

---
 src/backend/replication/basebackup.c   | 589 ++++++++++++++++++++++++-
 src/backend/replication/repl_gram.y    |  72 +++
 src/backend/replication/repl_scanner.l |   7 +
 src/include/nodes/replnodes.h          |  10 +
 src/include/replication/basebackup.h   |   2 +-
 5 files changed, 670 insertions(+), 10 deletions(-)

diff --git a/src/backend/replication/basebackup.c b/src/backend/replication/basebackup.c
index a05a97ded2..cc262e49b8 100644
--- a/src/backend/replication/basebackup.c
+++ b/src/backend/replication/basebackup.c
@@ -41,6 +41,7 @@
 #include "utils/ps_status.h"
 #include "utils/relcache.h"
 #include "utils/timestamp.h"
+#include "utils/pg_lsn.h"
 
 
 typedef struct
@@ -52,11 +53,34 @@ typedef struct
 	bool		includewal;
 	uint32		maxrate;
 	bool		sendtblspcmapfile;
+	bool		exclusive;
+	XLogRecPtr	lsn;
 } basebackup_options;
 
+typedef struct
+{
+	char		name[MAXPGPATH];
+	char		type;
+	int32		size;
+	time_t		mtime;
+} BackupFile;
+
+#define STORE_BACKUPFILE(_backupfiles, _name, _type, _size, _mtime) \
+	do { \
+		if (_backupfiles != NULL) { \
+			BackupFile *file = palloc0(sizeof(BackupFile)); \
+			strlcpy(file->name, _name, sizeof(file->name)); \
+			file->type = _type; \
+			file->size = _size; \
+			file->mtime = _mtime; \
+			*_backupfiles = lappend(*_backupfiles, file); \
+		} \
+	} while(0)
 
 static int64 sendDir(const char *path, int basepathlen, bool sizeonly,
 					 List *tablespaces, bool sendtblspclinks);
+static int64 sendDir_(const char *path, int basepathlen, bool sizeonly,
+					  List *tablespaces, bool sendtblspclinks, List **files);
 static bool sendFile(const char *readfilename, const char *tarfilename,
 					 struct stat *statbuf, bool missing_ok, Oid dboid);
 static void sendFileWithContent(const char *filename, const char *content);
@@ -76,6 +100,12 @@ static void throttle(size_t increment);
 static void setup_throttle(int maxrate);
 static bool is_checksummed_file(const char *fullpath, const char *filename);
 
+static void StartBackup(basebackup_options *opt);
+static void StopBackup(basebackup_options *opt);
+static void SendFileList(basebackup_options *opt);
+static void SendFilesContents(basebackup_options *opt, List *filenames, bool missing_ok);
+static char *readfile(const char *readfilename, bool missing_ok);
+
 /* Was the backup currently in-progress initiated in recovery mode? */
 static bool backup_started_in_recovery = false;
 
@@ -338,7 +368,7 @@ perform_base_backup(basebackup_options *opt)
 				sendFile(XLOG_CONTROL_FILE, XLOG_CONTROL_FILE, &statbuf, false, InvalidOid);
 			}
 			else
-				sendTablespace(ti->path, false);
+				sendTablespace(ti->path, false, NULL);
 
 			/*
 			 * If we're including WAL, and this is the main data directory we
@@ -413,6 +443,8 @@ parse_basebackup_options(List *options, basebackup_options *opt)
 	bool		o_maxrate = false;
 	bool		o_tablespace_map = false;
 	bool		o_noverify_checksums = false;
+	bool		o_exclusive = false;
+	bool		o_lsn = false;
 
 	MemSet(opt, 0, sizeof(*opt));
 	foreach(lopt, options)
@@ -501,6 +533,30 @@ parse_basebackup_options(List *options, basebackup_options *opt)
 			noverify_checksums = true;
 			o_noverify_checksums = true;
 		}
+		else if (strcmp(defel->defname, "exclusive") == 0)
+		{
+			if (o_exclusive)
+				ereport(ERROR,
+						(errcode(ERRCODE_SYNTAX_ERROR),
+						 errmsg("duplicate option \"%s\"", defel->defname)));
+
+			opt->exclusive = intVal(defel->arg);
+			o_exclusive = true;
+		}
+		else if (strcmp(defel->defname, "lsn") == 0)
+		{
+			bool		have_error = false;
+			char	   *lsn;
+
+			if (o_lsn)
+				ereport(ERROR,
+						(errcode(ERRCODE_SYNTAX_ERROR),
+						 errmsg("duplicate option \"%s\"", defel->defname)));
+
+			lsn = strVal(defel->arg);
+			opt->lsn = pg_lsn_in_internal(lsn, &have_error);
+			o_lsn = true;
+		}
 		else
 			elog(ERROR, "option \"%s\" not recognized",
 				 defel->defname);
@@ -535,7 +591,29 @@ SendBaseBackup(BaseBackupCmd *cmd)
 		set_ps_display(activitymsg, false);
 	}
 
-	perform_base_backup(&opt);
+	switch (cmd->cmdtag)
+	{
+		case BASE_BACKUP:
+			perform_base_backup(&opt);
+			break;
+		case START_BACKUP:
+			StartBackup(&opt);
+			break;
+		case SEND_FILE_LIST:
+			SendFileList(&opt);
+			break;
+		case SEND_FILES_CONTENT:
+			SendFilesContents(&opt, cmd->backupfiles, true);
+			break;
+		case STOP_BACKUP:
+			StopBackup(&opt);
+			break;
+
+		default:
+			elog(ERROR, "unrecognized replication command tag: %u",
+				 cmd->cmdtag);
+			break;
+	}
 }
 
 static void
@@ -678,6 +756,61 @@ SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli)
 	pq_puttextmessage('C', "SELECT");
 }
 
+/*
+ * Send a single resultset containing backup label and tablespace map
+ */
+static void
+SendStartBackupResult(StringInfo labelfile, StringInfo tblspc_map_file)
+{
+	StringInfoData buf;
+	Size		len;
+
+	pq_beginmessage(&buf, 'T'); /* RowDescription */
+	pq_sendint16(&buf, 2);		/* 2 fields */
+
+	/* Field headers */
+	pq_sendstring(&buf, "label");
+	pq_sendint32(&buf, 0);		/* table oid */
+	pq_sendint16(&buf, 0);		/* attnum */
+	pq_sendint32(&buf, TEXTOID);	/* type oid */
+	pq_sendint16(&buf, -1);
+	pq_sendint32(&buf, 0);
+	pq_sendint16(&buf, 0);
+
+	pq_sendstring(&buf, "tablespacemap");
+	pq_sendint32(&buf, 0);		/* table oid */
+	pq_sendint16(&buf, 0);		/* attnum */
+	pq_sendint32(&buf, TEXTOID);	/* type oid */
+	pq_sendint16(&buf, -1);
+	pq_sendint32(&buf, 0);
+	pq_sendint16(&buf, 0);
+	pq_endmessage(&buf);
+
+	/* Data row */
+	pq_beginmessage(&buf, 'D');
+	pq_sendint16(&buf, 2);		/* number of columns */
+
+	len = labelfile->len;
+	pq_sendint32(&buf, len);
+	pq_sendbytes(&buf, labelfile->data, len);
+
+	if (tblspc_map_file)
+	{
+		len = tblspc_map_file->len;
+		pq_sendint32(&buf, len);
+		pq_sendbytes(&buf, tblspc_map_file->data, len);
+	}
+	else
+	{
+		pq_sendint32(&buf, -1); /* Length = -1 ==> NULL */
+	}
+
+	pq_endmessage(&buf);
+
+	/* Send a CommandComplete message */
+	pq_puttextmessage('C', "SELECT");
+}
+
 /*
  * Inject a file with given name and content in the output tar stream.
  */
@@ -729,7 +862,7 @@ sendFileWithContent(const char *filename, const char *content)
  * Only used to send auxiliary tablespaces, not PGDATA.
  */
 int64
-sendTablespace(char *path, bool sizeonly)
+sendTablespace(char *path, bool sizeonly, List **files)
 {
 	int64		size;
 	char		pathbuf[MAXPGPATH];
@@ -758,11 +891,11 @@ sendTablespace(char *path, bool sizeonly)
 		return 0;
 	}
 
+	STORE_BACKUPFILE(files, pathbuf, 'd', -1, statbuf.st_mtime);
 	size = _tarWriteHeader(TABLESPACE_VERSION_DIRECTORY, NULL, &statbuf,
 						   sizeonly);
-
 	/* Send all the files in the tablespace version directory */
-	size += sendDir(pathbuf, strlen(path), sizeonly, NIL, true);
+	size += sendDir_(pathbuf, strlen(path), sizeonly, NIL, true, files);
 
 	return size;
 }
@@ -780,8 +913,16 @@ sendTablespace(char *path, bool sizeonly)
  * as it will be sent separately in the tablespace_map file.
  */
 static int64
-sendDir(const char *path, int basepathlen, bool sizeonly, List *tablespaces,
-		bool sendtblspclinks)
+sendDir(const char *path, int basepathlen, bool sizeonly,
+		List *tablespaces, bool sendtblspclinks)
+{
+	return sendDir_(path, basepathlen, sizeonly, tablespaces, sendtblspclinks, NULL);
+}
+
+/* Same as sendDir(), except that it also returns a list of filenames in PGDATA */
+static int64
+sendDir_(const char *path, int basepathlen, bool sizeonly, List *tablespaces,
+		 bool sendtblspclinks, List **files)
 {
 	DIR		   *dir;
 	struct dirent *de;
@@ -935,6 +1076,8 @@ sendDir(const char *path, int basepathlen, bool sizeonly, List *tablespaces,
 			if (strcmp(de->d_name, excludeDirContents[excludeIdx]) == 0)
 			{
 				elog(DEBUG1, "contents of directory \"%s\" excluded from backup", de->d_name);
+
+				STORE_BACKUPFILE(files, pathbuf, 'd', -1, statbuf.st_mtime);
 				size += _tarWriteDir(pathbuf, basepathlen, &statbuf, sizeonly);
 				excludeFound = true;
 				break;
@@ -951,6 +1094,8 @@ sendDir(const char *path, int basepathlen, bool sizeonly, List *tablespaces,
 		if (statrelpath != NULL && strcmp(pathbuf, statrelpath) == 0)
 		{
 			elog(DEBUG1, "contents of directory \"%s\" excluded from backup", statrelpath);
+
+			STORE_BACKUPFILE(files, pathbuf, 'd', -1, statbuf.st_mtime);
 			size += _tarWriteDir(pathbuf, basepathlen, &statbuf, sizeonly);
 			continue;
 		}
@@ -972,6 +1117,9 @@ sendDir(const char *path, int basepathlen, bool sizeonly, List *tablespaces,
 			size += _tarWriteHeader("./pg_wal/archive_status", NULL, &statbuf,
 									sizeonly);
 
+			STORE_BACKUPFILE(files, pathbuf, 'd', -1, statbuf.st_mtime);
+			STORE_BACKUPFILE(files, "./pg_wal/archive_status", 'd', -1, statbuf.st_mtime);
+
 			continue;			/* don't recurse into pg_wal */
 		}
 
@@ -1001,6 +1149,7 @@ sendDir(const char *path, int basepathlen, bool sizeonly, List *tablespaces,
 								pathbuf)));
 			linkpath[rllen] = '\0';
 
+			STORE_BACKUPFILE(files, pathbuf, 'l', statbuf.st_size, statbuf.st_mtime);
 			size += _tarWriteHeader(pathbuf + basepathlen + 1, linkpath,
 									&statbuf, sizeonly);
 #else
@@ -1027,6 +1176,8 @@ sendDir(const char *path, int basepathlen, bool sizeonly, List *tablespaces,
 			 */
 			size += _tarWriteHeader(pathbuf + basepathlen + 1, NULL, &statbuf,
 									sizeonly);
+			STORE_BACKUPFILE(files, pathbuf, 'd', -1, statbuf.st_mtime);
+
 
 			/*
 			 * Call ourselves recursively for a directory, unless it happens
@@ -1057,13 +1208,15 @@ sendDir(const char *path, int basepathlen, bool sizeonly, List *tablespaces,
 				skip_this_dir = true;
 
 			if (!skip_this_dir)
-				size += sendDir(pathbuf, basepathlen, sizeonly, tablespaces, sendtblspclinks);
+				size += sendDir_(pathbuf, basepathlen, sizeonly, tablespaces, sendtblspclinks, files);
 		}
 		else if (S_ISREG(statbuf.st_mode))
 		{
 			bool		sent = false;
 
-			if (!sizeonly)
+			STORE_BACKUPFILE(files, pathbuf, 'f', statbuf.st_size, statbuf.st_mtime);
+
+			if (!sizeonly && files == NULL)
 				sent = sendFile(pathbuf, pathbuf + basepathlen + 1, &statbuf,
 								true, isDbDir ? pg_atoi(lastDir + 1, sizeof(Oid), 0) : InvalidOid);
 
@@ -1769,3 +1922,421 @@ setup_throttle(int maxrate)
 		throttling_counter = -1;
 	}
 }
+
+/*
+ * StartBackup - prepare to start an online backup.
+ *
+ * This function calls do_pg_start_backup() and sends back starting checkpoint,
+ * available tablespaces, content of backup_label and tablespace_map files.
+ */
+static void
+StartBackup(basebackup_options *opt)
+{
+	TimeLineID	starttli;
+	StringInfo	labelfile;
+	StringInfo	tblspc_map_file = NULL;
+	int			datadirpathlen;
+	List	   *tablespaces = NIL;
+
+	datadirpathlen = strlen(DataDir);
+
+	backup_started_in_recovery = RecoveryInProgress();
+
+	labelfile = makeStringInfo();
+	tblspc_map_file = makeStringInfo();
+
+	total_checksum_failures = 0;
+
+	startptr = do_pg_start_backup(opt->label, opt->fastcheckpoint, &starttli,
+								  opt->exclusive? NULL : labelfile, &tablespaces,
+								  tblspc_map_file,
+								  opt->progress, opt->sendtblspcmapfile);
+
+	/*
+	 * Once do_pg_start_backup has been called, ensure that any failure causes
+	 * us to abort the backup so we don't "leak" a backup counter. For this
+	 * reason, *all* functionality between do_pg_start_backup() and the end of
+	 * do_pg_stop_backup() should be inside the error cleanup block!
+	 */
+
+	PG_ENSURE_ERROR_CLEANUP(base_backup_cleanup, (Datum) 0);
+	{
+		tablespaceinfo *ti;
+
+		SendXlogRecPtrResult(startptr, starttli);
+
+		/*
+		 * Calculate the relative path of temporary statistics directory in
+		 * order to skip the files which are located in that directory later.
+		 */
+		if (is_absolute_path(pgstat_stat_directory) &&
+			strncmp(pgstat_stat_directory, DataDir, datadirpathlen) == 0)
+			statrelpath = psprintf("./%s", pgstat_stat_directory + datadirpathlen + 1);
+		else if (strncmp(pgstat_stat_directory, "./", 2) != 0)
+			statrelpath = psprintf("./%s", pgstat_stat_directory);
+		else
+			statrelpath = pgstat_stat_directory;
+
+		/* Add a node for the base directory at the end */
+		ti = palloc0(sizeof(tablespaceinfo));
+		ti->size = opt->progress ? sendDir(".", 1, true, tablespaces, true) : -1;
+		tablespaces = lappend(tablespaces, ti);
+
+		/* Send tablespace header */
+		SendBackupHeader(tablespaces);
+
+		/* Setup and activate network throttling, if client requested it */
+		setup_throttle(opt->maxrate);
+
+		/*
+		 * In exclusive mode, pg_start_backup creates backup_label and
+		 * tablespace_map files and does not their contents in *labelfile
+		 * and *tblspcmapfile. So we read them from these files to return
+		 * to frontend.
+		 *
+		 * In non-exlusive mode, contents of these files are available in
+		 * *labelfile and *tblspcmapfile and are retured directly.
+		 */
+		if (opt->exclusive)
+		{
+			resetStringInfo(labelfile);
+			resetStringInfo(tblspc_map_file);
+
+			appendStringInfoString(labelfile, readfile(BACKUP_LABEL_FILE, false));
+			if (opt->sendtblspcmapfile)
+				appendStringInfoString(tblspc_map_file, readfile(TABLESPACE_MAP, false));
+		}
+
+		if ((tblspc_map_file && tblspc_map_file->len <= 0) ||
+			!opt->sendtblspcmapfile)
+			tblspc_map_file = NULL;
+
+		/* send backup_label and tablespace_map to frontend */
+		SendStartBackupResult(labelfile, tblspc_map_file);
+	}
+	PG_END_ENSURE_ERROR_CLEANUP(base_backup_cleanup, (Datum) 0);
+}
+
+/*
+ * StopBackup() - ends an online backup
+ *
+ * The function is called at the end of an online backup. It sends out pg_control
+ * file, optionaly WAL segments and ending WAL location.
+ */
+static void
+StopBackup(basebackup_options *opt)
+{
+	TimeLineID	endtli;
+	XLogRecPtr	endptr;
+	struct stat statbuf;
+	StringInfoData buf;
+	char	   *labelfile = NULL;
+
+	/* Setup and activate network throttling, if client requested it */
+	setup_throttle(opt->maxrate);
+
+	pq_beginmessage(&buf, 'H');
+	pq_sendbyte(&buf, 0);		/* overall format */
+	pq_sendint16(&buf, 0);		/* natts */
+	pq_endmessage(&buf);
+
+	/* ... and pg_control after everything else. */
+	if (lstat(XLOG_CONTROL_FILE, &statbuf) != 0)
+		ereport(ERROR,
+				(errcode_for_file_access(),
+				 errmsg("could not stat file \"%s\": %m",
+						XLOG_CONTROL_FILE)));
+	sendFile(XLOG_CONTROL_FILE, XLOG_CONTROL_FILE, &statbuf, false, InvalidOid);
+
+	/* stop backup */
+	if (!opt->exclusive)
+		labelfile = (char *) opt->label;
+	endptr = do_pg_stop_backup(labelfile, !opt->nowait, &endtli);
+
+	if (opt->includewal)
+		include_wal_files(endptr, endtli);
+
+	pq_putemptymessage('c');	/* CopyDone */
+	SendXlogRecPtrResult(endptr, endtli);
+}
+
+/*
+ * SendFileList() - sends a list of filenames to frontend
+ *
+ * The function collects a list of filenames, nessery for a complete backup and
+ * sends this list to the client.
+ */
+static void
+SendFileList(basebackup_options *opt)
+{
+	StringInfoData buf;
+	ListCell   *lc;
+	List	   *tablespaces = NIL;
+	StringInfo	tblspc_map_file = NULL;
+
+	tblspc_map_file = makeStringInfo();
+	collectTablespaces(&tablespaces, tblspc_map_file, false, false);
+
+	/* Add a node for the base directory at the end */
+	tablespaceinfo *ti = palloc0(sizeof(tablespaceinfo));
+	tablespaces = lappend(tablespaces, ti);
+
+	foreach(lc, tablespaces)
+	{
+		List	   *backupFiles = NULL;
+		tablespaceinfo *ti = (tablespaceinfo *) lfirst(lc);
+
+		if (ti->path == NULL)
+			sendDir_(".", 1, false, NIL, !opt->sendtblspcmapfile, &backupFiles);
+		else
+			sendTablespace(ti->path, false, &backupFiles);
+
+		/* Construct and send the list of filenames */
+		pq_beginmessage(&buf, 'T'); /* RowDescription */
+		pq_sendint16(&buf, 4);	/* n field */
+
+		/* First field - file name */
+		pq_sendstring(&buf, "filename");
+		pq_sendint32(&buf, 0);
+		pq_sendint16(&buf, 0);
+		pq_sendint32(&buf, TEXTOID);
+		pq_sendint16(&buf, -1);
+		pq_sendint32(&buf, 0);
+		pq_sendint16(&buf, 0);
+
+		/* Second field - is_dir */
+		pq_sendstring(&buf, "type");
+		pq_sendint32(&buf, 0);
+		pq_sendint16(&buf, 0);
+		pq_sendint32(&buf, CHAROID);
+		pq_sendint16(&buf, 1);
+		pq_sendint32(&buf, 0);
+		pq_sendint16(&buf, 0);
+
+		/* Third field - size */
+		pq_sendstring(&buf, "size");
+		pq_sendint32(&buf, 0);
+		pq_sendint16(&buf, 0);
+		pq_sendint32(&buf, INT8OID);
+		pq_sendint16(&buf, 8);
+		pq_sendint32(&buf, 0);
+		pq_sendint16(&buf, 0);
+
+		/* Third field - mtime */
+		pq_sendstring(&buf, "mtime");
+		pq_sendint32(&buf, 0);
+		pq_sendint16(&buf, 0);
+		pq_sendint32(&buf, INT8OID);
+		pq_sendint16(&buf, 8);
+		pq_sendint32(&buf, 0);
+		pq_sendint16(&buf, 0);
+		pq_endmessage(&buf);
+
+		foreach(lc, backupFiles)
+		{
+			BackupFile *backupFile = (BackupFile *) lfirst(lc);
+			Size		len;
+
+			/* Send one datarow message */
+			pq_beginmessage(&buf, 'D');
+			pq_sendint16(&buf, 4);	/* number of columns */
+
+			/* send file name */
+			len = strlen(backupFile->name);
+			pq_sendint32(&buf, len);
+			pq_sendbytes(&buf, backupFile->name, len);
+
+			/* send type */
+			pq_sendint32(&buf, 1);
+			pq_sendbyte(&buf, backupFile->type);
+
+			/* send size */
+			send_int8_string(&buf, backupFile->size);
+
+			/* send mtime */
+			send_int8_string(&buf, backupFile->mtime);
+
+			pq_endmessage(&buf);
+		}
+
+		pfree(backupFiles);
+	}
+
+	/* Send a CommandComplete message */
+	pq_puttextmessage('C', "SELECT");
+}
+
+/*
+ * SendFilesContents() - sends the actual files to the caller
+ *
+ * The function sends out the given file(s) over to the caller using the COPY
+ * protocol.
+ */
+static void
+SendFilesContents(basebackup_options *opt, List *filenames, bool missing_ok)
+{
+	StringInfoData buf;
+	ListCell   *lc;
+	bool		basetablespace = true;
+	int			basepathlen = 1;
+
+	if (list_length(filenames) <= 0)
+		return;
+
+	total_checksum_failures = 0;
+
+	/* Setup and activate network throttling, if client requested it */
+	setup_throttle(opt->maxrate);
+
+	/*
+	 * LABEL is reused here to identify the tablespace path on server. Its empty
+	 * in case of 'base' tablespace.
+	 */
+	if (is_absolute_path(opt->label))
+	{
+		basepathlen = strlen(opt->label);
+		basetablespace = false;
+	}
+
+	/* set backup start location. */
+	startptr = opt->lsn;
+
+	/* Send CopyOutResponse message */
+	pq_beginmessage(&buf, 'H');
+	pq_sendbyte(&buf, 0);		/* overall format */
+	pq_sendint16(&buf, 0);		/* natts */
+	pq_endmessage(&buf);
+
+	foreach(lc, filenames)
+	{
+		struct stat statbuf;
+		char	   *pathbuf;
+
+		pathbuf = (char *) strVal(lfirst(lc));
+		if (lstat(pathbuf, &statbuf) != 0)
+		{
+			if (errno != ENOENT)
+				ereport(ERROR,
+						(errcode_for_file_access(),
+						 errmsg("could not stat file or directory \"%s\": %m",
+								pathbuf)));
+
+			/* If the file went away while scanning, it's not an error. */
+			continue;
+		}
+
+		/* Allow symbolic links in pg_tblspc only */
+		if (strstr(pathbuf, "./pg_tblspc") != NULL &&
+#ifndef WIN32
+			S_ISLNK(statbuf.st_mode)
+#else
+			pgwin32_is_junction(pathbuf)
+#endif
+			)
+		{
+			char		linkpath[MAXPGPATH];
+			int			rllen;
+
+			rllen = readlink(pathbuf, linkpath, sizeof(linkpath));
+			if (rllen < 0)
+				ereport(ERROR,
+						(errcode_for_file_access(),
+						 errmsg("could not read symbolic link \"%s\": %m",
+								pathbuf)));
+			if (rllen >= sizeof(linkpath))
+				ereport(ERROR,
+						(errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
+						 errmsg("symbolic link \"%s\" target is too long",
+								pathbuf)));
+			linkpath[rllen] = '\0';
+
+			_tarWriteHeader(pathbuf, linkpath, &statbuf, false);
+		}
+		else if (S_ISDIR(statbuf.st_mode))
+		{
+			_tarWriteHeader(pathbuf + basepathlen + 1, NULL, &statbuf, false);
+		}
+		else if (
+#ifndef WIN32
+				 S_ISLNK(statbuf.st_mode)
+#else
+				 pgwin32_is_junction(pathbuf)
+#endif
+			)
+		{
+			/*
+			 * If symlink, write it as a directory. file symlinks only allowed
+			 * in pg_tblspc
+			 */
+			statbuf.st_mode = S_IFDIR | pg_dir_create_mode;
+			_tarWriteHeader(pathbuf + basepathlen + 1, NULL, &statbuf, false);
+		}
+		else
+		{
+			/* send file to client */
+			sendFile(pathbuf, pathbuf + basepathlen + 1, &statbuf, true, InvalidOid);
+		}
+	}
+
+	pq_putemptymessage('c');	/* CopyDone */
+
+	/*
+	 * Check for checksum failures. If there are failures across multiple
+	 * processes it may not report totoal checksum count, but it will error
+	 * out,terminating the backup.
+	 */
+	if (total_checksum_failures)
+	{
+		if (total_checksum_failures > 1)
+			ereport(WARNING,
+					(errmsg("%lld total checksum verification failures", total_checksum_failures)));
+
+		ereport(ERROR,
+				(errcode(ERRCODE_DATA_CORRUPTED),
+				 errmsg("checksum verification failure during base backup")));
+	}
+}
+
+static char *
+readfile(const char *readfilename, bool missing_ok)
+{
+	struct stat statbuf;
+	FILE	   *fp;
+	char	   *data;
+	int			r;
+
+	if (stat(readfilename, &statbuf))
+	{
+		if (errno == ENOENT && missing_ok)
+			return NULL;
+
+		ereport(ERROR,
+				(errcode_for_file_access(),
+				 errmsg("could not stat file \"%s\": %m",
+						readfilename)));
+	}
+
+	fp = AllocateFile(readfilename, "r");
+	if (!fp)
+	{
+		if (errno == ENOENT && missing_ok)
+			return NULL;
+
+		ereport(ERROR,
+				(errcode_for_file_access(),
+				 errmsg("could not open file \"%s\": %m", readfilename)));
+	}
+
+	data = palloc(statbuf.st_size + 1);
+	r = fread(data, statbuf.st_size, 1, fp);
+	data[statbuf.st_size] = '\0';
+
+	/* Close the file */
+	if (r != 1 || ferror(fp) || FreeFile(fp))
+		ereport(ERROR,
+				(errcode_for_file_access(),
+				 errmsg("could not read file \"%s\": %m",
+						readfilename)));
+
+	return data;
+}
diff --git a/src/backend/replication/repl_gram.y b/src/backend/replication/repl_gram.y
index c4e11cc4e8..bba437c785 100644
--- a/src/backend/replication/repl_gram.y
+++ b/src/backend/replication/repl_gram.y
@@ -87,6 +87,12 @@ static SQLCmd *make_sqlcmd(void);
 %token K_EXPORT_SNAPSHOT
 %token K_NOEXPORT_SNAPSHOT
 %token K_USE_SNAPSHOT
+%token K_START_BACKUP
+%token K_SEND_FILE_LIST
+%token K_SEND_FILES_CONTENT
+%token K_STOP_BACKUP
+%token K_EXCLUSIVE
+%token K_LSN
 
 %type <node>	command
 %type <node>	base_backup start_replication start_logical_replication
@@ -102,6 +108,8 @@ static SQLCmd *make_sqlcmd(void);
 %type <boolval>	opt_temporary
 %type <list>	create_slot_opt_list
 %type <defelt>	create_slot_opt
+%type <list>	backup_files backup_files_list
+%type <node>	backup_file
 
 %%
 
@@ -162,6 +170,36 @@ base_backup:
 				{
 					BaseBackupCmd *cmd = makeNode(BaseBackupCmd);
 					cmd->options = $2;
+					cmd->cmdtag = BASE_BACKUP;
+					$$ = (Node *) cmd;
+				}
+			| K_START_BACKUP base_backup_opt_list
+				{
+					BaseBackupCmd *cmd = makeNode(BaseBackupCmd);
+					cmd->options = $2;
+					cmd->cmdtag = START_BACKUP;
+					$$ = (Node *) cmd;
+				}
+			| K_SEND_FILE_LIST base_backup_opt_list
+				{
+					BaseBackupCmd *cmd = makeNode(BaseBackupCmd);
+					cmd->options = $2;
+					cmd->cmdtag = SEND_FILE_LIST;
+					$$ = (Node *) cmd;
+				}
+			| K_SEND_FILES_CONTENT backup_files base_backup_opt_list
+				{
+					BaseBackupCmd *cmd = makeNode(BaseBackupCmd);
+					cmd->options = $3;
+					cmd->cmdtag = SEND_FILES_CONTENT;
+					cmd->backupfiles = $2;
+					$$ = (Node *) cmd;
+				}
+			| K_STOP_BACKUP base_backup_opt_list
+				{
+					BaseBackupCmd *cmd = makeNode(BaseBackupCmd);
+					cmd->options = $2;
+					cmd->cmdtag = STOP_BACKUP;
 					$$ = (Node *) cmd;
 				}
 			;
@@ -214,6 +252,40 @@ base_backup_opt:
 				  $$ = makeDefElem("noverify_checksums",
 								   (Node *)makeInteger(true), -1);
 				}
+			| K_EXCLUSIVE
+				{
+				  $$ = makeDefElem("exclusive",
+								   (Node *)makeInteger(true), -1);
+				}
+			| K_LSN SCONST
+				{
+				  $$ = makeDefElem("lsn",
+								   (Node *)makeString($2), -1);
+				}
+			;
+
+backup_files:
+			'(' backup_files_list ')'
+				{
+					$$ = $2;
+				}
+			| /* EMPTY */
+				{ $$ = NIL; }
+			;
+
+backup_files_list:
+			backup_file
+				{
+					$$ = list_make1($1);
+				}
+			| backup_files_list ',' backup_file
+				{
+					$$ = lappend($1, $3);
+				}
+			;
+
+backup_file:
+			SCONST							{ $$ = (Node *) makeString($1); }
 			;
 
 create_replication_slot:
diff --git a/src/backend/replication/repl_scanner.l b/src/backend/replication/repl_scanner.l
index 380faeb5f6..f97fe804ff 100644
--- a/src/backend/replication/repl_scanner.l
+++ b/src/backend/replication/repl_scanner.l
@@ -107,6 +107,13 @@ EXPORT_SNAPSHOT		{ return K_EXPORT_SNAPSHOT; }
 NOEXPORT_SNAPSHOT	{ return K_NOEXPORT_SNAPSHOT; }
 USE_SNAPSHOT		{ return K_USE_SNAPSHOT; }
 WAIT				{ return K_WAIT; }
+START_BACKUP		{ return K_START_BACKUP; }
+SEND_FILE_LIST		{ return K_SEND_FILE_LIST; }
+SEND_FILES_CONTENT	{ return K_SEND_FILES_CONTENT; }
+STOP_BACKUP			{ return K_STOP_BACKUP; }
+EXCLUSIVE			{ return K_EXCLUSIVE; }
+LSN					{ return K_LSN; }
+
 
 ","				{ return ','; }
 ";"				{ return ';'; }
diff --git a/src/include/nodes/replnodes.h b/src/include/nodes/replnodes.h
index 1e3ed4e19f..1a224122a2 100644
--- a/src/include/nodes/replnodes.h
+++ b/src/include/nodes/replnodes.h
@@ -23,6 +23,14 @@ typedef enum ReplicationKind
 	REPLICATION_KIND_LOGICAL
 } ReplicationKind;
 
+typedef enum BackupCmdTag
+{
+	BASE_BACKUP,
+	START_BACKUP,
+	SEND_FILE_LIST,
+	SEND_FILES_CONTENT,
+	STOP_BACKUP
+} BackupCmdTag;
 
 /* ----------------------
  *		IDENTIFY_SYSTEM command
@@ -42,6 +50,8 @@ typedef struct BaseBackupCmd
 {
 	NodeTag		type;
 	List	   *options;
+	BackupCmdTag cmdtag;
+	List	   *backupfiles;
 } BaseBackupCmd;
 
 
diff --git a/src/include/replication/basebackup.h b/src/include/replication/basebackup.h
index 503a5b9f0b..9e792af99d 100644
--- a/src/include/replication/basebackup.h
+++ b/src/include/replication/basebackup.h
@@ -31,6 +31,6 @@ typedef struct
 
 extern void SendBaseBackup(BaseBackupCmd *cmd);
 
-extern int64 sendTablespace(char *path, bool sizeonly);
+extern int64 sendTablespace(char *path, bool sizeonly, List **files);
 
 #endif							/* _BASEBACKUP_H */
-- 
2.21.0 (Apple Git-122)

