On Thu, Jan 6, 2011 at 23:57, Heikki Linnakangas
<heikki.linnakan...@enterprisedb.com> wrote:
>
> Looks like pg_streamrecv creates the pg_xlog and pg_tblspc directories,
> because they're not included in the streamed tar. Wouldn't it be better to
> include them in the tar as empty directories at the server-side? Otherwise
> if you write the tar file to disk and untar it later, you have to manually
> create them.

Attached is an updated patch that does this.

It also collects all the header records as a single resultset at the
beginning. This made for cleaner code, but more importantly makes it
possible to get the total size of the backup even if there are
multiple tablespaces.
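
For illustration only (this is not part of the patch), a client could total the header rows along these lines. The sketch assumes the size column arrives as text in kilobytes, as the server code in this patch sends it, and that a missing value means PROGRESS was not requested. The function name total_backup_kb is hypothetical.

```c
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>

/*
 * Sum the "size" column of the BASE_BACKUP header rows.
 *
 * sizes[i] is the text value of the size field in row i, or NULL when the
 * server sent SQL NULL (assumed to mean PROGRESS was not requested).
 * Values are taken to be in kB. Returns the total in kB, or -1 if any
 * row carries no size.
 */
int64_t
total_backup_kb(const char *const *sizes, int nrows)
{
	int64_t		total = 0;
	int			i;

	assert(sizes != NULL || nrows == 0);

	for (i = 0; i < nrows; i++)
	{
		if (sizes[i] == NULL)
			return -1;			/* no progress information available */
		total += strtoll(sizes[i], NULL, 10);
	}
	return total;
}
```

Since all rows arrive before the first tar stream, the total is known up front and can serve as the denominator of a progress display.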

It also changes the tar members to use relative paths instead of
absolute ones - since we send the root of the directory in the header
anyway. That also takes away the "./" portion in all tar members.
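
A minimal sketch of how the relative member name is derived, mirroring the `pathbuf + strlen(basepath) + 1` expression used in the patch. The helper name tar_member_name is hypothetical, and it assumes the path always starts with the base path followed by a slash, which holds for the paths built in sendDir().

```c
#include <assert.h>
#include <string.h>

/*
 * Return the tar member name for "path" relative to "basepath".
 *
 * Assumes path begins with basepath followed by a '/' separator.
 * The result points into "path"; nothing is allocated.
 */
const char *
tar_member_name(const char *path, const char *basepath)
{
	size_t		baselen = strlen(basepath);

	assert(strncmp(path, basepath, baselen) == 0);
	assert(path[baselen] == '/');

	/* Skip the base path and the separator that follows it */
	return path + baselen + 1;
}
```

With a base path of "." this turns "./base/1/1259" into "base/1/1259", and the same expression strips an absolute tablespace location from its files.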

The git branch on GitHub has been updated as well, of course.

-- 
 Magnus Hagander
 Me: http://www.hagander.net/
 Work: http://www.redpill-linpro.com/
*** a/doc/src/sgml/protocol.sgml
--- b/doc/src/sgml/protocol.sgml
***************
*** 1458,1463 **** The commands accepted in walsender mode are:
--- 1458,1555 ----
       </para>
      </listitem>
    </varlistentry>
+ 
+   <varlistentry>
+     <term>BASE_BACKUP <replaceable>options</><literal>;</><replaceable>label</></term>
+     <listitem>
+      <para>
+       Instructs the server to start streaming a base backup.
+       The system will automatically be put in backup mode with the label
+       specified in <replaceable>label</> before the backup is started, and
+       taken out of it when the backup is complete. The following options
+       are accepted:
+       <variablelist>
+        <varlistentry>
+         <term><literal>PROGRESS</></term>
+         <listitem>
+          <para>
+           Request information required to generate a progress report. This will
+           send back an approximate size in the header row of each tablespace,
+           which can be used to calculate how far along the stream is. The size
+           is calculated by enumerating all the file sizes once before the
+           transfer is started, and may as such have a negative impact on
+           performance - in particular it may take longer before the first data
+           is streamed. Since the database files can change during the backup,
+           the size is only approximate and may both grow and shrink between
+           the time of approximation and the sending of the actual files.
+          </para>
+         </listitem>
+        </varlistentry>
+       </variablelist>
+      </para>
+      <para>
+       When the backup is started, the server will first send a header in
+       ordinary result set format, followed by one or more CopyResponse
+       results, one for PGDATA and one for each additional tablespace other
+       than <literal>pg_default</> and <literal>pg_global</>. The data in
+       the CopyResponse results will be a tar format (using ustar00
+       extensions) dump of the tablespace contents.
+      </para>
+      <para>
+       The header is an ordinary resultset with one row for each tablespace.
+       The fields in this row are:
+       <variablelist>
+        <varlistentry>
+         <term>spcoid</term>
+         <listitem>
+          <para>
+           The oid of the tablespace, or <literal>NULL</> if it's the base
+           directory.
+          </para>
+         </listitem>
+        </varlistentry>
+        <varlistentry>
+         <term>spclocation</term>
+         <listitem>
+          <para>
+           The full path of the tablespace directory, or <literal>NULL</>
+           if it's the base directory.
+          </para>
+         </listitem>
+        </varlistentry>
+        <varlistentry>
+         <term>size</term>
+         <listitem>
+          <para>
+           The approximate size of the tablespace in kilobytes, if a progress
+           report has been requested; otherwise it's <literal>NULL</>.
+          </para>
+         </listitem>
+        </varlistentry>
+       </variablelist>
+      </para>
+      <para>
+       The tar archive for the data directory and each tablespace will contain
+       all files in the directories, regardless of whether they are
+       <productname>PostgreSQL</> files or other files added to the same
+       directory. The only excluded files are:
+       <itemizedlist spacing="compact" mark="bullet">
+        <listitem>
+         <para>
+          <filename>postmaster.pid</>
+         </para>
+        </listitem>
+        <listitem>
+         <para>
+          The contents of <filename>pg_xlog</>, including subdirectories (the directory itself is sent as an empty entry)
+         </para>
+        </listitem>
+       </itemizedlist>
+       Owner, group and file mode are set if the underlying filesystem on
+       the server supports it.
+      </para>
+     </listitem>
+   </varlistentry>
  </variablelist>
  
  </para>
*** a/src/backend/access/transam/xlog.c
--- b/src/backend/access/transam/xlog.c
***************
*** 8308,8313 **** pg_start_backup(PG_FUNCTION_ARGS)
--- 8308,8328 ----
  	text	   *backupid = PG_GETARG_TEXT_P(0);
  	bool		fast = PG_GETARG_BOOL(1);
  	char	   *backupidstr;
+ 	XLogRecPtr  startpoint;
+ 	char		startxlogstr[MAXFNAMELEN];
+ 
+ 	backupidstr = text_to_cstring(backupid);
+ 
+ 	startpoint = do_pg_start_backup(backupidstr, fast);
+ 
+ 	snprintf(startxlogstr, sizeof(startxlogstr), "%X/%X",
+ 			 startpoint.xlogid, startpoint.xrecoff);
+ 	PG_RETURN_TEXT_P(cstring_to_text(startxlogstr));
+ }
+ 
+ XLogRecPtr
+ do_pg_start_backup(const char *backupidstr, bool fast)
+ {
  	XLogRecPtr	checkpointloc;
  	XLogRecPtr	startpoint;
  	pg_time_t	stamp_time;
***************
*** 8335,8342 **** pg_start_backup(PG_FUNCTION_ARGS)
  			  errmsg("WAL level not sufficient for making an online backup"),
  				 errhint("wal_level must be set to \"archive\" or \"hot_standby\" at server start.")));
  
- 	backupidstr = text_to_cstring(backupid);
- 
  	/*
  	 * Mark backup active in shared memory.  We must do full-page WAL writes
  	 * during an on-line backup even if not doing so at other times, because
--- 8350,8355 ----
***************
*** 8459,8467 **** pg_start_backup(PG_FUNCTION_ARGS)
  	/*
  	 * We're done.  As a convenience, return the starting WAL location.
  	 */
! 	snprintf(xlogfilename, sizeof(xlogfilename), "%X/%X",
! 			 startpoint.xlogid, startpoint.xrecoff);
! 	PG_RETURN_TEXT_P(cstring_to_text(xlogfilename));
  }
  
  /* Error cleanup callback for pg_start_backup */
--- 8472,8478 ----
  	/*
  	 * We're done.  As a convenience, return the starting WAL location.
  	 */
! 	return startpoint;
  }
  
  /* Error cleanup callback for pg_start_backup */
***************
*** 8490,8495 **** pg_start_backup_callback(int code, Datum arg)
--- 8501,8519 ----
  Datum
  pg_stop_backup(PG_FUNCTION_ARGS)
  {
+ 	XLogRecPtr	stoppoint;
+ 	char		stopxlogstr[MAXFNAMELEN];
+ 
+ 	stoppoint = do_pg_stop_backup();
+ 
+ 	snprintf(stopxlogstr, sizeof(stopxlogstr), "%X/%X",
+ 			 stoppoint.xlogid, stoppoint.xrecoff);
+ 	PG_RETURN_TEXT_P(cstring_to_text(stopxlogstr));
+ }
+ 
+ XLogRecPtr
+ do_pg_stop_backup(void)
+ {
  	XLogRecPtr	startpoint;
  	XLogRecPtr	stoppoint;
  	XLogRecData rdata;
***************
*** 8699,8707 **** pg_stop_backup(PG_FUNCTION_ARGS)
  	/*
  	 * We're done.  As a convenience, return the ending WAL location.
  	 */
! 	snprintf(stopxlogfilename, sizeof(stopxlogfilename), "%X/%X",
! 			 stoppoint.xlogid, stoppoint.xrecoff);
! 	PG_RETURN_TEXT_P(cstring_to_text(stopxlogfilename));
  }
  
  /*
--- 8723,8757 ----
  	/*
  	 * We're done.  As a convenience, return the ending WAL location.
  	 */
! 	return stoppoint;
! }
! 
! 
! /*
!  * do_pg_abort_backup: abort a running backup
!  *
!  * This does just the most basic steps of pg_stop_backup(), by taking the
!  * system out of backup mode, thus making it a lot more safe to call from
!  * an error handler.
!  */
! void
! do_pg_abort_backup(void)
! {
! 	/*
! 	 * OK to clear forcePageWrites
! 	 */
! 	LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
! 	XLogCtl->Insert.forcePageWrites = false;
! 	LWLockRelease(WALInsertLock);
! 
! 	/*
! 	 * Remove backup label file
! 	 */
! 	if (unlink(BACKUP_LABEL_FILE) != 0)
! 		ereport(ERROR,
! 				(errcode_for_file_access(),
! 				 errmsg("could not remove file \"%s\": %m",
! 						BACKUP_LABEL_FILE)));
  }
  
  /*
*** a/src/backend/replication/Makefile
--- b/src/backend/replication/Makefile
***************
*** 12,17 **** subdir = src/backend/replication
  top_builddir = ../../..
  include $(top_builddir)/src/Makefile.global
  
! OBJS = walsender.o walreceiverfuncs.o walreceiver.o
  
  include $(top_srcdir)/src/backend/common.mk
--- 12,17 ----
  top_builddir = ../../..
  include $(top_builddir)/src/Makefile.global
  
! OBJS = walsender.o walreceiverfuncs.o walreceiver.o basebackup.o
  
  include $(top_srcdir)/src/backend/common.mk
*** /dev/null
--- b/src/backend/replication/basebackup.c
***************
*** 0 ****
--- 1,534 ----
+ /*-------------------------------------------------------------------------
+  *
+  * basebackup.c
+  *	  code for taking a base backup and streaming it to a standby
+  *
+  * Portions Copyright (c) 2010-2011, PostgreSQL Global Development Group
+  *
+  * IDENTIFICATION
+  *	  $PostgreSQL$
+  *
+  *-------------------------------------------------------------------------
+  */
+ #include "postgres.h"
+ 
+ #include <sys/types.h>
+ #include <sys/stat.h>
+ #include <unistd.h>
+ #include <time.h>
+ 
+ #include "access/xlog_internal.h" /* for pg_start/stop_backup */
+ #include "catalog/pg_type.h"
+ #include "lib/stringinfo.h"
+ #include "libpq/libpq.h"
+ #include "libpq/pqformat.h"
+ #include "nodes/pg_list.h"
+ #include "replication/basebackup.h"
+ #include "storage/fd.h"
+ #include "storage/ipc.h"
+ #include "utils/builtins.h"
+ #include "utils/elog.h"
+ 
+ static int64 sendDir(char *path, char *basepath, bool sizeonly);
+ static void sendFile(char *path, char *basepath, struct stat *statbuf);
+ static void _tarWriteHeader(char *filename, char *linktarget,
+ 							struct stat *statbuf);
+ static void SendBackupHeader(List *tablespaces);
+ static void SendBackupDirectory(char *location, char *spcoid);
+ static void base_backup_cleanup(int code, Datum arg);
+ 
+ /*
+  * Called when ERROR or FATAL happens in SendBaseBackup() after
+  * we have started the backup - make sure we end it!
+  */
+ static void
+ base_backup_cleanup(int code, Datum arg)
+ {
+ 	do_pg_abort_backup();
+ }
+ 
+ 
+ typedef struct
+ {
+ 	char	   *oid;
+ 	char	   *path;
+ 	int64		size;
+ } tablespaceinfo;
+ 
+ /*
+  * SendBaseBackup() - send a complete base backup.
+  *
+  * The function will take care of running pg_start_backup() and
+  * pg_stop_backup() for the user.
+  *
+  * The stream starts with a single header in normal result format, with
+  * one row per tablespace, followed by one tar format dump in CopyOut
+  * format for each tablespace.
+  */
+ void
+ SendBaseBackup(const char *options)
+ {
+ 	DIR 		   *dir;
+ 	struct dirent  *de;
+ 	char   		   *backup_label = strchr(options, ';');
+ 	bool			progress = false;
+ 	List		   *tablespaces = NIL;
+ 	tablespaceinfo *ti;
+ 
+ 	if (backup_label == NULL)
+ 		ereport(FATAL,
+ 				(errcode(ERRCODE_PROTOCOL_VIOLATION),
+ 				 errmsg("invalid base backup options: %s", options)));
+ 	backup_label++; /* Walk past the semicolon */
+ 
+ 	/* Currently the only option string supported is PROGRESS */
+ 	if (strncmp(options, "PROGRESS", 8) == 0)
+ 		progress = true;
+ 	else if (options[0] != ';')
+ 		ereport(FATAL,
+ 				(errcode(ERRCODE_PROTOCOL_VIOLATION),
+ 				 errmsg("invalid base backup options: %s", options)));
+ 
+ 	/* Make sure we can open the directory with tablespaces in it */
+ 	dir = AllocateDir("pg_tblspc");
+ 	if (!dir)
+ 		ereport(ERROR,
+ 				(errmsg("unable to open directory pg_tblspc: %m")));
+ 
+ 	/* Add a node for the base directory */
+ 	ti = palloc0(sizeof(tablespaceinfo));
+ 	ti->size = progress ? sendDir(".", ".", true) : -1;
+ 	tablespaces = lappend(tablespaces, ti);
+ 
+ 	/* Collect information about all tablespaces */
+ 	while ((de = ReadDir(dir, "pg_tblspc")) != NULL)
+ 	{
+ 		char fullpath[MAXPGPATH];
+ 		char linkpath[MAXPGPATH];
+ 
+ 		if (de->d_name[0] == '.')
+ 			continue;
+ 
+ 		snprintf(fullpath, sizeof(fullpath), "pg_tblspc/%s", de->d_name);
+ 
+ 		MemSet(linkpath, 0, sizeof(linkpath));
+ 		if (readlink(fullpath, linkpath, sizeof(linkpath) - 1) == -1)
+ 		{
+ 			ereport(WARNING,
+ 					(errmsg("unable to read symbolic link \"%s\": %m", fullpath)));
+ 			continue;
+ 		}
+ 
+ 		ti = palloc(sizeof(tablespaceinfo));
+ 		ti->oid = pstrdup(de->d_name);
+ 		ti->path = pstrdup(linkpath);
+ 		ti->size = progress ? sendDir(linkpath, linkpath, true) : -1;
+ 		tablespaces = lappend(tablespaces, ti);
+ 	}
+ 	FreeDir(dir);
+ 
+ 	do_pg_start_backup(backup_label, true);
+ 
+ 	PG_ENSURE_ERROR_CLEANUP(base_backup_cleanup, (Datum) 0);
+ 	{
+ 		ListCell   *lc;
+ 
+ 		/* Send tablespace header */
+ 		SendBackupHeader(tablespaces);
+ 
+ 		/* Send off our tablespaces one by one */
+ 		foreach(lc, tablespaces)
+ 		{
+ 			ti = (tablespaceinfo *) lfirst(lc);
+ 
+ 			SendBackupDirectory(ti->path, ti->oid);
+ 		}
+ 	}
+ 	PG_END_ENSURE_ERROR_CLEANUP(base_backup_cleanup, (Datum) 0);
+ 
+ 	do_pg_stop_backup();
+ }
+ 
+ static void
+ send_int8_string(StringInfoData *buf, int64 intval)
+ {
+ 	char is[32];
+ 	sprintf(is, INT64_FORMAT, intval);
+ 	pq_sendint(buf, strlen(is), 4);
+ 	pq_sendbytes(buf, is, strlen(is));
+ }
+ 
+ static void
+ SendBackupHeader(List *tablespaces)
+ {
+ 	StringInfoData buf;
+ 	ListCell *lc;
+ 
+ 	/* Construct and send the directory information */
+ 	pq_beginmessage(&buf, 'T'); /* RowDescription */
+ 	pq_sendint(&buf, 3, 2); /* 3 fields */
+ 
+ 	/* First field - spcoid */
+ 	pq_sendstring(&buf, "spcoid");
+ 	pq_sendint(&buf, 0, 4); /* table oid */
+ 	pq_sendint(&buf, 0, 2); /* attnum */
+ 	pq_sendint(&buf, OIDOID, 4); /* type oid */
+ 	pq_sendint(&buf, 4, 2); /* typlen */
+ 	pq_sendint(&buf, 0, 4); /* typmod */
+ 	pq_sendint(&buf, 0, 2); /* format code */
+ 
+ 	/* Second field - spclocation */
+ 	pq_sendstring(&buf, "spclocation");
+ 	pq_sendint(&buf, 0, 4);
+ 	pq_sendint(&buf, 0, 2);
+ 	pq_sendint(&buf, TEXTOID, 4);
+ 	pq_sendint(&buf, -1, 2);
+ 	pq_sendint(&buf, 0, 4);
+ 	pq_sendint(&buf, 0, 2);
+ 
+ 	/* Third field - size */
+ 	pq_sendstring(&buf, "size");
+ 	pq_sendint(&buf, 0, 4);
+ 	pq_sendint(&buf, 0, 2);
+ 	pq_sendint(&buf, INT8OID, 4);
+ 	pq_sendint(&buf, 8, 2);
+ 	pq_sendint(&buf, 0, 4);
+ 	pq_sendint(&buf, 0, 2);
+ 	pq_endmessage(&buf);
+ 
+ 	foreach(lc, tablespaces)
+ 	{
+ 		tablespaceinfo *ti = lfirst(lc);
+ 
+ 		/* Send one datarow message */
+ 		pq_beginmessage(&buf, 'D');
+ 		pq_sendint(&buf, 3, 2); /* number of columns */
+ 		if (ti->path == NULL)
+ 		{
+ 			pq_sendint(&buf, -1, 4); /* Length = -1 ==> NULL */
+ 			pq_sendint(&buf, -1, 4);
+ 		}
+ 		else
+ 		{
+ 			pq_sendint(&buf, strlen(ti->oid), 4);  /* length */
+ 			pq_sendbytes(&buf, ti->oid, strlen(ti->oid));
+ 			pq_sendint(&buf, strlen(ti->path), 4); /* length */
+ 			pq_sendbytes(&buf, ti->path, strlen(ti->path));
+ 		}
+ 		if (ti->size >= 0)
+ 			send_int8_string(&buf, ti->size/1024);
+ 		else
+ 			pq_sendint(&buf, -1, 4); /* NULL */
+ 
+ 		pq_endmessage(&buf);
+ 	}
+ 
+ 	/* Send a CommandComplete message */
+ 	pq_puttextmessage('C', "SELECT");
+ }
+ 
+ static void
+ SendBackupDirectory(char *location, char *spcoid)
+ {
+ 	StringInfoData buf;
+ 
+ 	/* Send CopyOutResponse message */
+ 	pq_beginmessage(&buf, 'H');
+ 	pq_sendbyte(&buf, 0);			/* overall format */
+ 	pq_sendint(&buf, 0, 2);			/* natts */
+ 	pq_endmessage(&buf);
+ 
+ 	/* tar up the data directory if NULL, otherwise the tablespace */
+ 	sendDir(location == NULL ? "." : location,
+ 			location == NULL ? "." : location,
+ 			false);
+ 
+ 	/* Send CopyDone message */
+ 	pq_putemptymessage('c');
+ }
+ 
+ 
+ static int64
+ sendDir(char *path, char *basepath, bool sizeonly)
+ {
+ 	DIR		   *dir;
+ 	struct dirent *de;
+ 	char		pathbuf[MAXPGPATH];
+ 	struct stat statbuf;
+ 	int64		size = 0;
+ 
+ 	dir = AllocateDir(path);
+ 	while ((de = ReadDir(dir, path)) != NULL)
+ 	{
+ 		/* Skip special stuff */
+ 		if (strcmp(de->d_name, ".") == 0 || strcmp(de->d_name, "..") == 0)
+ 			continue;
+ 
+ 		snprintf(pathbuf, MAXPGPATH, "%s/%s", path, de->d_name);
+ 
+ 		if (lstat(pathbuf, &statbuf) != 0)
+ 		{
+ 			if (errno != ENOENT)
+ 				elog(WARNING, "could not stat file or directory \"%s\": %m",
+ 					 pathbuf);
+ 
+ 			/* If the file went away while scanning, it's no error. */
+ 			continue;
+ 		}
+ 
+ 		/* Skip pg_xlog and postmaster.pid in PGDATA */
+ 		if (strcmp(pathbuf, "./postmaster.pid") == 0)
+ 			continue;
+ 		if (strcmp(pathbuf, "./pg_xlog") == 0)
+ 		{
+ 			/*
+ 			 * Write an empty directory for pg_xlog in the tar though,
+ 			 * so we get permissions right. But don't recurse to get the
+ 			 * contents. statbuf was filled in by lstat() above.
+ 			 */
+ 			if (!sizeonly)
+ 				_tarWriteHeader(pathbuf + strlen(basepath) + 1, NULL, &statbuf);
+ 			size += 512; /* Size of the header just added */
+ 			continue;
+ 		}
+ 
+ #ifndef WIN32
+ 		if (S_ISLNK(statbuf.st_mode) && strcmp(path, "./pg_tblspc") == 0)
+ #else
+ 		if (pgwin32_is_junction(pathbuf) && strcmp(path, "./pg_tblspc") == 0)
+ #endif
+ 		{
+ 			/* Allow symbolic links in pg_tblspc */
+ 			char	linkpath[MAXPGPATH];
+ 
+ 			MemSet(linkpath, 0, sizeof(linkpath));
+ 			if (readlink(pathbuf, linkpath, sizeof(linkpath)-1) == -1)
+ 			{
+ 				elog(WARNING, "unable to read symbolic link \"%s\": %m",
+ 					 pathbuf);
+ 			}
+ 			if (!sizeonly)
+ 				_tarWriteHeader(pathbuf + strlen(basepath) + 1, linkpath, &statbuf);
+ 			size += 512; /* Size of the header just added */
+ 		}
+ 		else if (S_ISDIR(statbuf.st_mode))
+ 		{
+ 			/*
+ 			 * Store a directory entry in the tar file so we can get
+ 			 * the permissions right.
+ 			 */
+ 			if (!sizeonly)
+ 				_tarWriteHeader(pathbuf + strlen(basepath) + 1, NULL, &statbuf);
+ 			size += 512; /* Size of the header just added */
+ 
+ 			/* call ourselves recursively for a directory */
+ 			size += sendDir(pathbuf, basepath, sizeonly);
+ 		}
+ 		else if (S_ISREG(statbuf.st_mode))
+ 		{
+ 			size += statbuf.st_size;
+ 			if (!sizeonly)
+ 				sendFile(pathbuf, basepath, &statbuf);
+ 			size += 512; /* Size of the header of the file */
+ 		}
+ 		else
+ 			elog(WARNING, "skipping special file \"%s\"", pathbuf);
+ 	}
+ 	FreeDir(dir);
+ 	return size;
+ }
+ 
+ /*****
+  * Functions for handling tar file format
+  *
+  * Copied from pg_dump, but modified to work with libpq for sending
+  */
+ 
+ 
+ /*
+  * Utility routine to print possibly larger than 32 bit integers in a
+  * portable fashion.  Filled with zeros.
+  */
+ static void
+ print_val(char *s, uint64 val, unsigned int base, size_t len)
+ {
+ 	int			i;
+ 
+ 	for (i = len; i > 0; i--)
+ 	{
+ 		int			digit = val % base;
+ 
+ 		s[i - 1] = '0' + digit;
+ 		val = val / base;
+ 	}
+ }
+ 
+ /*
+  * Maximum file size for a tar member: The limit inherent in the
+  * format is 2^33-1 bytes (nearly 8 GB).  But we don't want to exceed
+  * what we can represent in pgoff_t.
+  */
+ #define MAX_TAR_MEMBER_FILELEN (((int64) 1 << Min(33, sizeof(pgoff_t)*8 - 1)) - 1)
+ 
+ static int
+ _tarChecksum(char *header)
+ {
+ 	int			i,
+ 				sum;
+ 
+ 	sum = 0;
+ 	for (i = 0; i < 512; i++)
+ 		if (i < 148 || i >= 156)
+ 			sum += 0xFF & header[i];
+ 	return sum + 256;			/* Assume 8 blanks in checksum field */
+ }
+ 
+ /* Given the member, write the TAR header & send the file */
+ static void
+ sendFile(char *filename, char *basepath, struct stat *statbuf)
+ {
+ 	FILE	   *fp;
+ 	char		buf[32768];
+ 	size_t		cnt;
+ 	pgoff_t		len = 0;
+ 	size_t		pad;
+ 
+ 	fp = AllocateFile(filename, "rb");
+ 	if (fp == NULL)
+ 		elog(ERROR, "could not open file \"%s\": %m", filename);
+ 
+ 	/*
+ 	 * Some compilers will throw a warning knowing this test can never be true
+ 	 * because pgoff_t can't exceed the compared maximum on their platform.
+ 	 */
+ 	if (statbuf->st_size > MAX_TAR_MEMBER_FILELEN)
+ 		elog(ERROR, "archive member too large for tar format");
+ 
+ 	_tarWriteHeader(filename + strlen(basepath) + 1, NULL, statbuf);
+ 
+ 	while ((cnt = fread(buf, 1, Min(sizeof(buf), statbuf->st_size - len), fp)) > 0)
+ 	{
+ 		/* Send the chunk as a CopyData message */
+ 		pq_putmessage('d', buf, cnt);
+ 		len += cnt;
+ 
+ 		if (len >= statbuf->st_size)
+ 		{
+ 			/*
+ 			 * Reached end of file. The file could be longer, if it was
+ 			 * extended while we were sending it, but for a base backup we
+ 			 * can ignore such extended data. It will be restored from WAL.
+ 			 */
+ 			break;
+ 		}
+ 	}
+ 
+ 	/* If the file was truncated while we were sending it, pad it with zeros */
+ 	if (len < statbuf->st_size)
+ 	{
+ 		MemSet(buf, 0, sizeof(buf));
+ 		while(len < statbuf->st_size)
+ 		{
+ 			cnt = Min(sizeof(buf), statbuf->st_size - len);
+ 			pq_putmessage('d', buf, cnt);
+ 			len += cnt;
+ 		}
+ 	}
+ 
+ 	/* Pad to 512 byte boundary, per tar format requirements */
+ 	pad = ((len + 511) & ~511) - len;
+ 	if (pad > 0)
+ 	{
+ 		MemSet(buf, 0, pad);
+ 		pq_putmessage('d', buf, pad);
+ 	}
+ 
+ 	FreeFile(fp);
+ }
+ 
+ 
+ static void
+ _tarWriteHeader(char *filename, char *linktarget, struct stat *statbuf)
+ {
+ 	char		h[512];
+ 	int			lastSum = 0;
+ 	int			sum;
+ 
+ 	memset(h, 0, sizeof(h));
+ 
+ 	/* Name 100 */
+ 	sprintf(&h[0], "%.99s", filename);
+ 	if (linktarget != NULL || S_ISDIR(statbuf->st_mode))
+ 	{
+ 		/*
+ 		 * We only support symbolic links to directories, and this is indicated
+ 		 * in the tar format by adding a slash at the end of the name, the same
+ 		 * as for regular directories.
+ 		 */
+ 		h[strlen(filename)] = '/';
+ 		h[strlen(filename)+1] = '\0';
+ 	}
+ 
+ 	/* Mode 8 */
+ 	sprintf(&h[100], "%07o ", statbuf->st_mode);
+ 
+ 	/* User ID 8 */
+ 	sprintf(&h[108], "%07o ", statbuf->st_uid);
+ 
+ 	/* Group 8 */
+ 	sprintf(&h[116], "%07o ", statbuf->st_gid);
+ 
+ 	/* File size 12 - 11 digits, 1 space, no NUL */
+ 	if (linktarget != NULL || S_ISDIR(statbuf->st_mode))
+ 		/* Symbolic link or directory has size zero */
+ 		print_val(&h[124], 0, 8, 11);
+ 	else
+ 		print_val(&h[124], statbuf->st_size, 8, 11);
+ 	sprintf(&h[135], " ");
+ 
+ 	/* Mod Time 12 */
+ 	sprintf(&h[136], "%011o ", (int) statbuf->st_mtime);
+ 
+ 	/* Checksum 8 */
+ 	sprintf(&h[148], "%06o ", lastSum);
+ 
+ 	if (linktarget != NULL)
+ 	{
+ 		/* Type - Symbolic link */
+ 		sprintf(&h[156], "2");
+ 		sprintf(&h[157], "%.99s", linktarget);
+ 	}
+ 	else if (S_ISDIR(statbuf->st_mode))
+ 		/* Type - directory */
+ 		sprintf(&h[156], "5");
+ 	else
+ 		/* Type - regular file */
+ 		sprintf(&h[156], "0");
+ 
+ 	/* Link tag 100 (NULL) */
+ 
+ 	/* Magic 6 + Version 2 */
+ 	sprintf(&h[257], "ustar00");
+ 
+ 	/* User 32 */
+ 	/* XXX: Do we need to care about setting correct username? */
+ 	sprintf(&h[265], "%.31s", "postgres");
+ 
+ 	/* Group 32 */
+ 	/* XXX: Do we need to care about setting correct group name? */
+ 	sprintf(&h[297], "%.31s", "postgres");
+ 
+ 	/* Maj Dev 8 */
+ 	sprintf(&h[329], "%6o ", 0);
+ 
+ 	/* Min Dev 8 */
+ 	sprintf(&h[337], "%6o ", 0);
+ 
+ 	while ((sum = _tarChecksum(h)) != lastSum)
+ 	{
+ 		sprintf(&h[148], "%06o ", sum);
+ 		lastSum = sum;
+ 	}
+ 
+ 	pq_putmessage('d', h, 512);
+ }
*** a/src/backend/replication/walsender.c
--- b/src/backend/replication/walsender.c
***************
*** 44,49 ****
--- 44,50 ----
  #include "libpq/pqformat.h"
  #include "libpq/pqsignal.h"
  #include "miscadmin.h"
+ #include "replication/basebackup.h"
  #include "replication/walprotocol.h"
  #include "replication/walsender.h"
  #include "storage/fd.h"
***************
*** 54,59 ****
--- 55,61 ----
  #include "utils/guc.h"
  #include "utils/memutils.h"
  #include "utils/ps_status.h"
+ #include "utils/resowner.h"
  
  
  /* Array of WalSnds in shared memory */
***************
*** 136,141 **** WalSenderMain(void)
--- 138,146 ----
  										   ALLOCSET_DEFAULT_MAXSIZE);
  	MemoryContextSwitchTo(walsnd_context);
  
+ 	/* Set up resource owner */
+ 	CurrentResourceOwner = ResourceOwnerCreate(NULL, "walsender top-level resource owner");
+ 
  	/* Unblock signals (they were blocked when the postmaster forked us) */
  	PG_SETMASK(&UnBlockSig);
  
***************
*** 305,310 **** WalSndHandshake(void)
--- 310,324 ----
  						/* break out of the loop */
  						replication_started = true;
  					}
+ 					else if (strncmp(query_string, "BASE_BACKUP ", 12) == 0)
+ 					{
+ 						/* Command is BASE_BACKUP <options>;<label> */
+ 						SendBaseBackup(query_string + strlen("BASE_BACKUP "));
+ 						/* Send CommandComplete and ReadyForQuery messages */
+ 						EndCommand("SELECT", DestRemote);
+ 						ReadyForQuery(DestRemote);
+ 						/* ReadyForQuery did pq_flush for us */
+ 					}
  					else
  					{
  						ereport(FATAL,
*** a/src/include/access/xlog.h
--- b/src/include/access/xlog.h
***************
*** 312,315 **** extern void HandleStartupProcInterrupts(void);
--- 312,319 ----
  extern void StartupProcessMain(void);
  extern void WakeupRecovery(void);
  
+ extern XLogRecPtr do_pg_start_backup(const char *backupidstr, bool fast);
+ extern XLogRecPtr do_pg_stop_backup(void);
+ extern void do_pg_abort_backup(void);
+ 
  #endif   /* XLOG_H */
*** /dev/null
--- b/src/include/replication/basebackup.h
***************
*** 0 ****
--- 1,17 ----
+ /*-------------------------------------------------------------------------
+  *
+  * basebackup.h
+  *	  Exports from replication/basebackup.c.
+  *
+  * Portions Copyright (c) 2010-2011, PostgreSQL Global Development Group
+  *
+  * $PostgreSQL$
+  *
+  *-------------------------------------------------------------------------
+  */
+ #ifndef _BASEBACKUP_H
+ #define _BASEBACKUP_H
+ 
+ extern void SendBaseBackup(const char *options);
+ 
+ #endif   /* _BASEBACKUP_H */
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers