It's been discussed before that it would be cool if you could stream a new base backup from the master server, via libpq. That way you would not need low-level filesystem access to initialize a new standby.

Magnus mentioned today that he started hacking on that, and coincidentally I just started experimenting with it yesterday as well :-). So let's get this out on the mailing list.

Here's a WIP patch. It adds a new "TAKE_BACKUP" command to the replication command set. Upon receiving that command, the master starts a COPY, and streams a tarred copy of the data directory to the client. The patch includes a simple command-line tool, pg_streambackup (built as "streambackup" in this patch), that connects to a server, requests a backup, and writes it to stdout, so you can redirect it to a .tar file or pipe it to "tar x".

TODO:

* We need a smarter way to do pg_start/stop_backup() with this. At the moment, you can only have one backup running at a time, but we shouldn't have that limitation with this built-in mechanism.

* The streamed backup archive should contain all the necessary WAL files too, so that you don't need to set up archiving to use this. You could just point the tiny client tool to the server, and get a backup archive containing everything that's necessary to restore correctly.

--
  Heikki Linnakangas
  EnterpriseDB   http://www.enterprisedb.com
diff --git a/src/backend/replication/Makefile b/src/backend/replication/Makefile
index 113dc3f..5bb4159 100644
--- a/src/backend/replication/Makefile
+++ b/src/backend/replication/Makefile
@@ -12,6 +12,6 @@ subdir = src/backend/replication
 top_builddir = ../../..
 include $(top_builddir)/src/Makefile.global
 
-OBJS = walsender.o walreceiverfuncs.o walreceiver.o
+OBJS = walsender.o walreceiverfuncs.o walreceiver.o basebackup.o
 
 include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/replication/basebackup.c b/src/backend/replication/basebackup.c
new file mode 100644
index 0000000..9be9bdf
--- /dev/null
+++ b/src/backend/replication/basebackup.c
@@ -0,0 +1,276 @@
+/*-------------------------------------------------------------------------
+ *
+ * basebackup.c
+ *	  code for taking a base backup and streaming it to a standby
+ *
+ * Portions Copyright (c) 2010-2010, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *	  $PostgreSQL$
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <unistd.h>
+#include <dirent.h>
+#include <time.h>
+
+#include "access/xlog_internal.h" /* for pg_start/stop_backup */
+#include "utils/builtins.h"
+#include "lib/stringinfo.h"
+#include "libpq/libpq.h"
+#include "libpq/pqformat.h"
+#include "replication/basebackup.h"
+#include "storage/fd.h"
+
+static void sendDir(char *path);
+static void sendFile(char *path);
+static void _tarWriteHeader(char *filename, uint64 fileLen);
+
+/*
+ * SendBaseBackup
+ *
+ * Stream a base backup to the client as a tar archive carried in CopyData
+ * messages.
+ *
+ * The sequence is: pg_start_backup(), a CopyOutResponse, the tarred contents
+ * of the current directory, the tar end-of-archive marker, CopyDone, and
+ * finally pg_stop_backup().
+ *
+ * NOTE(review): if anything between the start and stop calls raises an
+ * ERROR, pg_stop_backup() is never reached from here; nothing in this
+ * function undoes the pg_start_backup() call in that case.
+ */
+void
+SendBaseBackup(void)
+{
+	StringInfoData buf;
+	char		eoa[2 * 512];
+
+	DirectFunctionCall2(&pg_start_backup, CStringGetTextDatum("basebackup"),
+						BoolGetDatum(true));
+
+	/* Send CopyOutResponse message */
+	pq_beginmessage(&buf, 'H');
+	pq_sendbyte(&buf, 0);		/* overall format */
+	pq_sendint(&buf, 0, 2);			/* natts */
+	pq_endmessage(&buf);
+
+	/* tar up the data directory */
+	sendDir(".");
+
+	/*
+	 * Terminate the archive.  The tar format marks end-of-archive with two
+	 * consecutive 512-byte blocks of zeros; without them a reader cannot
+	 * tell a complete archive from one that was cut off at a member
+	 * boundary.
+	 */
+	MemSet(eoa, 0, sizeof(eoa));
+	pq_putmessage('d', eoa, sizeof(eoa));
+
+	/* Send CopyDone message */
+	pq_putemptymessage('c');
+
+	/* XXX: there seems to be no DirectFunctionCall0, so pass a dummy Datum */
+	DirectFunctionCall1(&pg_stop_backup, (Datum) 0);
+}
+
+/*
+ * sendDir
+ *
+ * Walk the directory "path" and send its contents to the client: regular
+ * files go through sendFile(), subdirectories are descended into
+ * recursively, and anything else is skipped with a WARNING.
+ *
+ * "./pg_xlog" and "./postmaster.pid" are left out of the backup.  An entry
+ * that disappears before it can be lstat'ed is skipped silently; any other
+ * lstat failure is reported with a WARNING and the entry is skipped.
+ *
+ * Because lstat() is used, symbolic links are not followed; they fall into
+ * the "special file" case.
+ */
+static void
+sendDir(char *path)
+{
+	DIR		   *dirdesc = AllocateDir(path);
+	struct dirent *entry;
+
+	for (;;)
+	{
+		char		child[MAXPGPATH];
+		struct stat st;
+
+		entry = ReadDir(dirdesc, path);
+		if (entry == NULL)
+			break;
+
+		/* Ignore the self and parent links */
+		if (strcmp(entry->d_name, ".") == 0 ||
+			strcmp(entry->d_name, "..") == 0)
+			continue;
+
+		snprintf(child, MAXPGPATH, "%s/%s", path, entry->d_name);
+
+		/* Entries that must not be part of the backup */
+		if (strcmp(child, "./pg_xlog") == 0 ||
+			strcmp(child, "./postmaster.pid") == 0)
+			continue;
+
+		if (lstat(child, &st) != 0)
+		{
+			/* A file that vanished underneath us is not worth a warning */
+			if (errno != ENOENT)
+				elog(WARNING, "could not stat file or directory \"%s\": %m",
+					 child);
+			continue;
+		}
+
+		if (S_ISREG(st.st_mode))
+			sendFile(child);
+		else if (S_ISDIR(st.st_mode))
+			sendDir(child);		/* descend into the subdirectory */
+		else
+			elog(WARNING, "skipping special file \"%s\"", child);
+	}
+
+	FreeDir(dirdesc);
+}
+
+
+/***** Functions for handling tar file format, copied from pg_dump ******/
+
+/*
+ * Write "val" into s[0 .. len-1] as exactly "len" digits in the given base,
+ * least significant digit last and zero-filled on the left.  Works for
+ * values wider than 32 bits.  No terminating NUL is written, and high-order
+ * digits that do not fit in "len" positions are dropped.
+ */
+static void
+print_val(char *s, uint64 val, unsigned int base, size_t len)
+{
+	int			pos = len;
+
+	/* fill from the rightmost position towards the left */
+	while (pos > 0)
+	{
+		int			d = val % base;
+
+		pos--;
+		s[pos] = '0' + d;
+		val /= base;
+	}
+}
+
+/*
+ * Maximum file size for a tar member: the header's size field is written
+ * as 11 octal digits (33 bits), so the limit inherent in the format is
+ * 2^33-1 bytes (nearly 8 GB).  But we don't want to exceed what we can
+ * represent in pgoff_t either, hence the Min() against its width less the
+ * sign bit.
+ */
+#define MAX_TAR_MEMBER_FILELEN (((int64) 1 << Min(33, sizeof(pgoff_t)*8 - 1)) - 1)
+
+/*
+ * Compute the checksum of a 512-byte tar header block: the sum of all
+ * bytes, each taken as an unsigned value, with the 8-byte checksum field
+ * itself (offsets 148..155) counted as if it were filled with blanks.
+ */
+static int
+_tarChecksum(char *header)
+{
+	int			total = 256;	/* the checksum field, read as 8 blanks */
+	int			pos;
+
+	/* everything before the checksum field */
+	for (pos = 0; pos < 148; pos++)
+		total += header[pos] & 0xFF;
+
+	/* everything after the checksum field */
+	for (pos = 156; pos < 512; pos++)
+		total += header[pos] & 0xFF;
+
+	return total;
+}
+
+/*
+ * sendFile
+ *
+ * Send one regular file to the client as a tar member: a header written by
+ * _tarWriteHeader(), the file contents in CopyData messages, and zero
+ * padding up to the next 512-byte boundary.
+ *
+ * The size recorded in the header is the size found when the file is
+ * opened.  Exactly that many bytes are sent: data appended afterwards is
+ * ignored, and if the file shrinks while being sent the remainder is
+ * filled with zeros.
+ *
+ * Raises an ERROR if the file cannot be opened, sized or read, or if it is
+ * too large to be represented in a tar header.
+ */
+static void
+sendFile(char *filename)
+{
+	FILE	   *fp;
+	char		buf[32768];
+	size_t		cnt;
+	pgoff_t		len = 0;
+	size_t		pad;
+	pgoff_t		fileLen;
+
+	fp = AllocateFile(filename, "rb");
+	if (fp == NULL)
+		elog(ERROR, "could not open file \"%s\": %m", filename);
+
+	/*
+	 * Find file len & go back to start.  A failure here must not go
+	 * unnoticed: a negative length would otherwise end up in the header as
+	 * a huge unsigned size.
+	 */
+	if (fseeko(fp, 0, SEEK_END) != 0 ||
+		(fileLen = ftello(fp)) < 0 ||
+		fseeko(fp, 0, SEEK_SET) != 0)
+		elog(ERROR, "could not determine size of file \"%s\": %m", filename);
+
+	/*
+	 * Some compilers will throw a warning knowing this test can never be true
+	 * because pgoff_t can't exceed the compared maximum on their platform.
+	 */
+	if (fileLen > MAX_TAR_MEMBER_FILELEN)
+		elog(ERROR, "archive member too large for tar format");
+
+	_tarWriteHeader(filename, fileLen);
+
+	/*
+	 * Send at most fileLen bytes.  The file could be longer by now, if it
+	 * was extended while we were sending it, but for a base backup we can
+	 * ignore such extended data.  It will be restored from WAL.
+	 */
+	while (len < fileLen)
+	{
+		size_t		want = (size_t) Min((pgoff_t) sizeof(buf), fileLen - len);
+
+		cnt = fread(buf, 1, want, fp);
+		if (cnt == 0)
+			break;				/* EOF or read error, sorted out below */
+
+		/* Send the chunk as a CopyData message */
+		pq_putmessage('d', buf, cnt);
+		len += cnt;
+	}
+
+	/* A short read caused by an I/O error must not be papered over */
+	if (ferror(fp))
+		elog(ERROR, "could not read file \"%s\": %m", filename);
+
+	/* If the file was truncated while we were sending it, pad it with zeros */
+	if (len < fileLen)
+	{
+		MemSet(buf, 0, sizeof(buf));
+		while (len < fileLen)
+		{
+			cnt = (size_t) Min((pgoff_t) sizeof(buf), fileLen - len);
+			pq_putmessage('d', buf, cnt);
+			len += cnt;
+		}
+	}
+
+	/* Pad to 512 byte boundary; nothing to send if already aligned */
+	pad = ((len + 511) & ~511) - len;
+	if (pad > 0)
+	{
+		MemSet(buf, 0, pad);
+		pq_putmessage('d', buf, pad);
+	}
+
+	FreeFile(fp);
+}
+
+
+/*
+ * _tarWriteHeader
+ *
+ * Build the 512-byte tar header for a regular file named "filename" of
+ * "fileLen" bytes and send it to the client as one CopyData message.
+ *
+ * The mode, user ID and group ID fields are fixed values, and the
+ * modification time is the current time rather than the file's own.  The
+ * user name, group name and device fields are left zeroed.
+ *
+ * Raises an ERROR if the name does not fit in the 100-byte name field;
+ * silently truncating it would produce an archive that restores the file
+ * under the wrong path.
+ */
+static void
+_tarWriteHeader(char *filename, uint64 fileLen)
+{
+	char		h[512];
+
+	/* the name field holds at most 99 characters plus the terminating NUL */
+	if (strlen(filename) > 99)
+		elog(ERROR, "file name too long for tar format: \"%s\"", filename);
+
+	memset(h, 0, sizeof(h));
+
+	/* Name 100 */
+	sprintf(&h[0], "%.99s", filename);
+
+	/* Mode 8 */
+	sprintf(&h[100], "100600 ");
+
+	/* User ID 8 */
+	sprintf(&h[108], "004000 ");
+
+	/* Group 8 */
+	sprintf(&h[116], "002000 ");
+
+	/* File size 12 - 11 digits, 1 space, no NUL */
+	print_val(&h[124], fileLen, 8, 11);
+	sprintf(&h[135], " ");
+
+	/* Mod Time 12 */
+	sprintf(&h[136], "%011o ", (unsigned int) time(NULL));
+
+	/* Type - regular file */
+	sprintf(&h[156], "0");
+
+	/* Link tag 100 (NULL) */
+
+	/*
+	 * Magic 6: "ustar" plus a terminating NUL, which is already in place
+	 * from the memset above.
+	 */
+	memcpy(&h[257], "ustar", 5);
+
+	/* Version 2: the two characters "00", not NUL-terminated */
+	memcpy(&h[263], "00", 2);
+
+	/*
+	 * Checksum 8.  _tarChecksum() ignores the checksum field itself, so a
+	 * single pass over the otherwise complete header is enough.
+	 */
+	sprintf(&h[148], "%06o ", (unsigned int) _tarChecksum(h));
+
+	pq_putmessage('d', h, 512);
+}
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 53c2581..291ec0b 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -42,6 +42,7 @@
 #include "libpq/pqformat.h"
 #include "libpq/pqsignal.h"
 #include "miscadmin.h"
+#include "replication/basebackup.h"
 #include "replication/walprotocol.h"
 #include "replication/walsender.h"
 #include "storage/fd.h"
@@ -294,6 +295,14 @@ WalSndHandshake(void)
 						/* break out of the loop */
 						replication_started = true;
 					}
+					else if (strcmp(query_string, "TAKE_BACKUP") == 0)
+					{
+						SendBaseBackup();
+						/* Send CommandComplete and ReadyForQuery messages */
+						EndCommand("SELECT", DestRemote);
+						ReadyForQuery(DestRemote);
+						/* ReadyForQuery did pq_flush for us */
+					}
 					else
 					{
 						ereport(FATAL,
diff --git a/src/bin/scripts/Makefile b/src/bin/scripts/Makefile
index d82c067..99f62a2 100644
--- a/src/bin/scripts/Makefile
+++ b/src/bin/scripts/Makefile
@@ -16,7 +16,7 @@ subdir = src/bin/scripts
 top_builddir = ../../..
 include $(top_builddir)/src/Makefile.global
 
-PROGRAMS = createdb createlang createuser dropdb droplang dropuser clusterdb vacuumdb reindexdb
+PROGRAMS = createdb createlang createuser dropdb droplang dropuser clusterdb vacuumdb reindexdb streambackup
 
 override CPPFLAGS := -I$(top_srcdir)/src/bin/pg_dump -I$(top_srcdir)/src/bin/psql -I$(libpq_srcdir) $(CPPFLAGS)
 
@@ -34,6 +34,7 @@ dropuser: dropuser.o common.o dumputils.o kwlookup.o keywords.o
 clusterdb: clusterdb.o common.o dumputils.o kwlookup.o keywords.o
 vacuumdb: vacuumdb.o common.o
 reindexdb: reindexdb.o common.o dumputils.o kwlookup.o keywords.o
+streambackup: streambackup.o common.o
 
 dumputils.c keywords.c: % : $(top_srcdir)/src/bin/pg_dump/%
 	rm -f $@ && $(LN_S) $< .
@@ -54,6 +55,7 @@ install: all installdirs
 	$(INSTALL_PROGRAM) clusterdb$(X)  '$(DESTDIR)$(bindir)'/clusterdb$(X)
 	$(INSTALL_PROGRAM) vacuumdb$(X)   '$(DESTDIR)$(bindir)'/vacuumdb$(X)
 	$(INSTALL_PROGRAM) reindexdb$(X)  '$(DESTDIR)$(bindir)'/reindexdb$(X)
+	$(INSTALL_PROGRAM) streambackup$(X)  '$(DESTDIR)$(bindir)'/streambackup$(X)
 
 installdirs:
 	$(MKDIR_P) '$(DESTDIR)$(bindir)'
diff --git a/src/bin/scripts/streambackup.c b/src/bin/scripts/streambackup.c
new file mode 100644
index 0000000..28255fd
--- /dev/null
+++ b/src/bin/scripts/streambackup.c
@@ -0,0 +1,182 @@
+/*-------------------------------------------------------------------------
+ *
+ * streambackup
+ *
+ * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group
+ *
+ * $PostgreSQL$
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres_fe.h"
+#include "common.h"
+#include "dumputils.h"
+
+
+static void help(const char *progname);
+
+/*
+ * main
+ *
+ * Parse the connection options, open a replication connection, issue the
+ * TAKE_BACKUP command, and copy every CopyData chunk received from the
+ * server to standard output.
+ *
+ * Exits with status 0 once the server reports successful completion, and
+ * with status 1 on any usage, connection, protocol or output error.
+ *
+ * NOTE(review): -w/-W are accepted and recorded in prompt_password, but
+ * nothing below acts on it; the "password" connection parameter is always
+ * passed as NULL.
+ */
+int
+main(int argc, char *argv[])
+{
+	static struct option long_options[] = {
+		{"host", required_argument, NULL, 'h'},
+		{"port", required_argument, NULL, 'p'},
+		{"username", required_argument, NULL, 'U'},
+		{"no-password", no_argument, NULL, 'w'},
+		{"password", no_argument, NULL, 'W'},
+		{NULL, 0, NULL, 0}
+	};
+
+	const char *progname;
+	int			optindex;
+	int			c;
+
+	const char *host = NULL;
+	const char *port = NULL;
+	const char *username = NULL;
+	enum trivalue prompt_password = TRI_DEFAULT;
+	PGconn	   *conn;
+	PGresult   *res;
+
+	progname = get_progname(argv[0]);
+	set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pgscripts"));
+
+	handle_help_version_opts(argc, argv, "streambackup", help);
+
+	/* process command-line options */
+	while ((c = getopt_long(argc, argv, "h:p:U:wW", long_options, &optindex)) != -1)
+	{
+		switch (c)
+		{
+			case 'h':
+				host = optarg;
+				break;
+			case 'p':
+				port = optarg;
+				break;
+			case 'U':
+				username = optarg;
+				break;
+			case 'w':
+				prompt_password = TRI_NO;
+				break;
+			case 'W':
+				prompt_password = TRI_YES;
+				break;
+			default:
+				fprintf(stderr, _("Try \"%s --help\" for more information.\n"), progname);
+				exit(1);
+		}
+	}
+
+	/*
+	 * No non-option arguments are accepted.  The first offending one is
+	 * argv[optind]; argv[optind + 1] would be NULL when there is exactly
+	 * one extra argument.
+	 */
+	if (optind < argc)
+	{
+		fprintf(stderr, _("%s: too many command-line arguments (first is \"%s\")\n"), progname, argv[optind]);
+		fprintf(stderr, _("Try \"%s --help\" for more information.\n"), progname);
+		exit(1);
+	}
+
+	{
+#define PARAMS_ARRAY_SIZE	7
+		const char *keywords[PARAMS_ARRAY_SIZE];
+		const char *values[PARAMS_ARRAY_SIZE];
+
+		keywords[0] = "host";
+		values[0] = host;
+		keywords[1] = "port";
+		values[1] = port;
+		keywords[2] = "user";
+		values[2] = username;
+		keywords[3] = "password";
+		values[3] = NULL;
+		keywords[4] = "replication";
+		values[4] = "true";
+		keywords[5] = "fallback_application_name";
+		values[5] = progname;
+		keywords[6] = NULL;
+		values[6] = NULL;
+
+		conn = PQconnectdbParams(keywords, values, true);
+	}
+
+	/* check to see that the backend connection was successfully made */
+	if (PQstatus(conn) == CONNECTION_BAD)
+	{
+		fprintf(stderr, _("%s: could not open a replication connection: %s"),
+				progname, PQerrorMessage(conn));
+		exit(1);
+	}
+
+	/* Main workhorse */
+	res = PQexec(conn, "TAKE_BACKUP");
+	if (PQresultStatus(res) != PGRES_COPY_OUT)
+	{
+		PQclear(res);
+		fprintf(stderr, _("%s: could not start streaming backup: %s"),
+				progname, PQerrorMessage(conn));
+		/* there is no copy stream to read, so do not fall into the loop */
+		exit(1);
+	}
+	PQclear(res);
+
+	for (;;)
+	{
+		char	   *recvBuf = NULL;
+		int			rawlen = PQgetCopyData(conn, &recvBuf, 0);
+
+		if (rawlen == -1)			/* end-of-streaming or error */
+		{
+			res = PQgetResult(conn);
+			if (PQresultStatus(res) != PGRES_COMMAND_OK)
+			{
+				PQclear(res);
+				fprintf(stderr, _("%s: backup terminated by server: %s"),
+								  progname, PQerrorMessage(conn));
+				exit(1);
+			}
+			PQclear(res);
+
+			/* Success! */
+			break;
+		}
+		if (rawlen < -1)
+		{
+			fprintf(stderr, _("%s: backup terminated by server: %s"),
+							  progname, PQerrorMessage(conn));
+			exit(1);
+		}
+
+		/* a short write would silently corrupt the backup, so check it */
+		if (rawlen > 0 &&
+			fwrite(recvBuf, 1, rawlen, stdout) != (size_t) rawlen)
+		{
+			fprintf(stderr, _("%s: could not write backup data to standard output\n"),
+					progname);
+			exit(1);
+		}
+		PQfreemem(recvBuf);
+	}
+
+	/* make sure buffered output really made it out before claiming success */
+	if (fflush(stdout) != 0)
+	{
+		fprintf(stderr, _("%s: could not write backup data to standard output\n"),
+				progname);
+		exit(1);
+	}
+
+	PQfinish(conn);
+
+	exit(0);
+}
+
+/*
+ * help
+ *
+ * Print the usage message to standard output.  Only options that main()
+ * actually accepts are listed: the program takes no database name and has
+ * no --index or --quiet option.
+ */
+static void
+help(const char *progname)
+{
+	printf(_("%s streams a physical backup of a PostgreSQL cluster.\n\n"), progname);
+	printf(_("Usage:\n"));
+	printf(_("  %s [OPTION]...\n"), progname);
+	printf(_("\nOptions:\n"));
+	printf(_("  --help                    show this help, then exit\n"));
+	printf(_("  --version                 output version information, then exit\n"));
+	printf(_("\nConnection options:\n"));
+	printf(_("  -h, --host=HOSTNAME       database server host or socket directory\n"));
+	printf(_("  -p, --port=PORT           database server port\n"));
+	printf(_("  -U, --username=USERNAME   user name to connect as\n"));
+	printf(_("  -w, --no-password         never prompt for password\n"));
+	printf(_("  -W, --password            force password prompt\n"));
+	printf(_("\n"));
+	printf(_("\nReport bugs to <pgsql-bugs@postgresql.org>.\n"));
+}
diff --git a/src/include/replication/basebackup.h b/src/include/replication/basebackup.h
new file mode 100644
index 0000000..2714663
--- /dev/null
+++ b/src/include/replication/basebackup.h
@@ -0,0 +1,17 @@
+/*-------------------------------------------------------------------------
+ *
+ * basebackup.h
+ *	  Exports from replication/basebackup.c.
+ *
+ * Portions Copyright (c) 2010-2010, PostgreSQL Global Development Group
+ *
+ * $PostgreSQL$
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef _BASEBACKUP_H
+#define _BASEBACKUP_H
+
+/*
+ * Stream a tar-format copy of the data directory to the client over the
+ * COPY protocol.  Defined in replication/basebackup.c.
+ */
+extern void SendBaseBackup(void);
+
+#endif   /* _BASEBACKUP_H */
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to