On Thu, Sep 4, 2014 at 11:33 PM, Michael Paquier
<michael.paqu...@gmail.com> wrote:
> Thoughts?

I have been poking at that during the long flight back from Chicago
and created the attached patch, which lets pg_dump create a logical
replication slot (and so get its hands on a synchronized snapshot
describing the data as of slot creation), then take a dump using the
exported snapshot while keeping the replication connection used for
slot creation alive for the duration of the dump.
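
To make the sequence concrete, here is a minimal sketch in C of the two
commands involved: the replication command that creates the slot, and the
statement the regular connection then runs to adopt the exported snapshot.
The helper names are hypothetical and not part of the patch, and the quoting
is deliberately simplistic (names containing a quote character are rejected
rather than escaped, whereas the patch relies on appendStringLiteralConn for
the snapshot name). The snapshot name itself is read from the third column
(index 2) of the CREATE_REPLICATION_SLOT result, as the patch does.

```c
#include <assert.h>
#include <stddef.h>
#include <stdio.h>
#include <string.h>

/*
 * Hypothetical helper: build the replication-protocol command creating a
 * logical slot. Returns 0 on success, -1 if a name contains a double quote
 * (no escaping is attempted in this sketch) or the buffer is too small.
 */
int
build_create_slot_command(char *buf, size_t buflen,
                          const char *slot_name, const char *plugin_name)
{
    int n;

    assert(buf != NULL && slot_name != NULL && plugin_name != NULL);

    if (strchr(slot_name, '"') != NULL || strchr(plugin_name, '"') != NULL)
        return -1;

    n = snprintf(buf, buflen, "CREATE_REPLICATION_SLOT \"%s\" LOGICAL \"%s\"",
                 slot_name, plugin_name);
    return (n < 0 || (size_t) n >= buflen) ? -1 : 0;
}

/*
 * Hypothetical helper: build the statement run on the regular connection,
 * inside its transaction, to adopt the snapshot exported at slot creation.
 * Returns 0 on success, -1 if the id contains a single quote or the buffer
 * is too small.
 */
int
build_set_snapshot_command(char *buf, size_t buflen, const char *snapshot_id)
{
    int n;

    assert(buf != NULL && snapshot_id != NULL);

    if (strchr(snapshot_id, '\'') != NULL)
        return -1;

    n = snprintf(buf, buflen, "SET TRANSACTION SNAPSHOT '%s'", snapshot_id);
    return (n < 0 || (size_t) n >= buflen) ? -1 : 0;
}
```

The first string would be sent on the replication connection, the second
on the regular dump connection; the replication connection has to stay open
in between so that the exported snapshot remains valid.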

Taking a dump consistent with a replication slot is useful first of all
for online upgrades: you can simply run pg_dump, have a slot created,
and get a state of the database consistent with the slot creation
before replaying changes in one way or another. Combining that with a
decoder that generates raw queries and a receiver able to apply changes
on a remote Postgres server, it is possible to get a kind of live
migration solution from one Postgres instance to another for a single
database, as long as the origin server runs 9.4. Making the receiver
report write and flush positions also makes it possible for the origin
server to use synchronous replication to be sure that changes have been
applied on the remote before switching from the origin to the remote
(that may actually explain why multi-syncrep would be useful here for
multiple databases). Also, I imagine that users could even use this
option of pg_dump to do, for example, some post-processing on the dumped
data in accordance with the decoder plugin before applying changes to a
remote server.

Now, this is done by adding two options to pg_dump to control the
logical slot creation:
- --slot, to define the name of the slot being created
- --plugin-name, to define the name of the decoder plugin
And then you can of course do things like this:
# Raw data dump on a slot
$ pg_dump --slot bar --plugin-name test_decoding
# Existing parallel dump not changed:
$ pg_dump -j 4 -f data -F d
# Parallel dump on a slot
$ pg_dump -j 4 --slot bar --plugin-name test_decoding -f data -F d
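
The rules tying these two options together can be summarized by the
following sketch in C. It mirrors the sanity checks done in main() by the
patch (a slot needs a plugin name, and the server must be at least 9.4), but
the function name and the return convention are hypothetical, made up for
illustration only.

```c
#include <assert.h>
#include <stddef.h>

/* First server version supporting logical replication slots (9.4) */
#define MIN_LOGICAL_SLOT_VERSION 90400

/*
 * Hypothetical helper: return NULL if the option combination is usable,
 * or a message describing why it is not. remote_version uses the integer
 * format reported by PQserverVersion(), e.g. 90400 for 9.4.0.
 */
const char *
check_slot_options(const char *slot_name, const char *plugin_name,
                   int remote_version)
{
    /* PQserverVersion() reports 0 only for a bad connection */
    assert(remote_version > 0);

    /* No slot requested: plain dump, the plugin name is simply unused */
    if (slot_name == NULL)
        return NULL;

    if (plugin_name == NULL)
        return "slot name is defined but plugin name is missing";

    if (remote_version < MIN_LOGICAL_SLOT_VERSION)
        return "logical slot creation is not supported by this server version";

    return NULL;
}
```

Note that, as in the patch, passing --plugin-name without --slot is not
treated as an error here.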

This patch does not solve the existing problem of the window between
the moment a snapshot is exported and the moment LOCK is taken on
tables (that's actually a different problem), but similarly to
parallel pg_dump it reduces the window of exposure to schema changes to
a minimum. This required adding some logic to make pg_dump aware of the
replication connection. Parallel dumps are supported as well, the trick
being to make sure that the existing parallel dump facility still uses
the snapshot from the main db connection, and not the replication
connection, while parallel dumps on a slot use the snapshot exported
at slot creation.
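
The decision of whether a given connection exports a snapshot, imports one,
or does nothing can be sketched as below. This is only an illustration that
follows the condition added to setup_connection() in the patch; the enum and
the function name are hypothetical.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

typedef enum
{
    SNAPSHOT_NONE,      /* no synchronized snapshot involved */
    SNAPSHOT_IMPORT,    /* run SET TRANSACTION SNAPSHOT with a known id */
    SNAPSHOT_EXPORT     /* export a snapshot from this regular connection */
} SnapshotAction;

/*
 * Hypothetical helper: decide what a regular (non-replication) connection
 * does about synchronized snapshots while it is being set up.
 */
SnapshotAction
choose_snapshot_action(int num_workers, int remote_version,
                       bool no_synchronized_snapshots,
                       const char *slot_name, const char *sync_snapshot_id)
{
    bool parallel_sync;
    bool slot_sync;

    assert(num_workers >= 1);

    /* Existing parallel dump facility */
    parallel_sync = num_workers > 1 &&
        remote_version >= 90200 &&
        !no_synchronized_snapshots;

    /* Dump taken on a logical slot */
    slot_sync = remote_version >= 90400 && slot_name != NULL;

    if (!parallel_sync && !slot_sync)
        return SNAPSHOT_NONE;

    /* A snapshot already exists (from the leader or from the slot) */
    if (sync_snapshot_id != NULL)
        return SNAPSHOT_IMPORT;

    /* Only a dump without a slot exports from the regular connection */
    if (slot_name == NULL)
        return SNAPSHOT_EXPORT;

    return SNAPSHOT_NONE;
}
```

With a slot, the snapshot id is already known by the time the main
connection is set up, so both the leader and the workers import it. Without
a slot, the leader of a parallel dump exports a snapshot and the workers
import it, as before.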

The first patch attached is the feature itself. The second patch, which
can be applied on top of the first one, outputs some useful logs to track
the snapshot creation depending on the code paths taken. I used that
for debugging purposes only, and am just posting it here for reference. I'll
add this to the next commit fest (the patch contains docs as well).

Regards,
-- 
Michael
From aeb75d82acc875252dab800009bba540ab89fec6 Mon Sep 17 00:00:00 2001
From: Michael Paquier <mich...@otacoo.com>
Date: Sun, 21 Sep 2014 18:16:22 +0900
Subject: [PATCH] Support dump from replication slot creation state

pg_dump logic now incorporates a couple of things to be able to manage
a replication connection for the creation of a logical slot, the only
easy way to get a snapshot giving a database state consistent with the
creation of this slot. This is a piece of the puzzle for online
upgrades, and is still useful by itself.
---
 doc/src/sgml/ref/pg_dump.sgml        | 29 ++++++++++++
 src/bin/pg_dump/pg_backup.h          |  6 ++-
 src/bin/pg_dump/pg_backup_archiver.c |  9 ++--
 src/bin/pg_dump/pg_backup_archiver.h |  3 ++
 src/bin/pg_dump/pg_backup_db.c       | 86 ++++++++++++++++++++++++++---------
 src/bin/pg_dump/pg_dump.c            | 88 +++++++++++++++++++++++++++++++++---
 6 files changed, 188 insertions(+), 33 deletions(-)

diff --git a/doc/src/sgml/ref/pg_dump.sgml b/doc/src/sgml/ref/pg_dump.sgml
index eabdc62..c2d1097 100644
--- a/doc/src/sgml/ref/pg_dump.sgml
+++ b/doc/src/sgml/ref/pg_dump.sgml
@@ -801,6 +801,16 @@ PostgreSQL documentation
      </varlistentry>
 
      <varlistentry>
+       <term><option>--plugin-name=<replaceable class="parameter">plugin_name</replaceable></option></term>
+       <listitem>
+         <para>
+          Define the name of the decoder plugin (see <xref linkend="logicaldecoding-output-plugin">)
+          used for the creation of the slot when <option>--slot</> is specified.
+         </para>
+       </listitem>
+     </varlistentry>
+
+     <varlistentry>
       <term><option>--quote-all-identifiers</></term>
       <listitem>
        <para>
@@ -866,6 +876,25 @@ PostgreSQL documentation
      </varlistentry>
 
      <varlistentry>
+       <term><option>--slot=<replaceable class="parameter">slot_name</replaceable></option></term>
+       <listitem>
+         <para>
+          Create a logical replication slot (see
+          <xref linkend="streaming-replication-slots"> for more details) and
+          take a dump of data consistent with the point of the slot creation,
+          using the synchronized snapshot exported when the slot was created.
+          This is useful to get a consistent image of a database before
+          beginning to apply changes from the slot to it, as the dumped data
+          matches the state of the database when the slot was created.
+         </para>
+         <para>
+          This option requires a decoder plugin (see
+          <xref linkend="logicaldecoding-output-plugin">), defined using <option>--plugin-name</>.
+         </para>
+       </listitem>
+     </varlistentry>
+
+     <varlistentry>
       <term><option>--use-set-session-authorization</></term>
       <listitem>
        <para>
diff --git a/src/bin/pg_dump/pg_backup.h b/src/bin/pg_dump/pg_backup.h
index 921bc1b..b1628e1 100644
--- a/src/bin/pg_dump/pg_backup.h
+++ b/src/bin/pg_dump/pg_backup.h
@@ -85,6 +85,8 @@ struct Archive
 	int			numWorkers;		/* number of parallel processes */
 	char	   *sync_snapshot_id;		/* sync snapshot id for parallel
 										 * operation */
+	char	   *slot_name;		/* Slot used for dump */
+	char	   *plugin_name;	/* Plugin name for slot creation */
 
 	/* info needed for string escaping */
 	int			encoding;		/* libpq code for client_encoding */
@@ -164,9 +166,11 @@ extern void ConnectDatabase(Archive *AH,
 				const char *pghost,
 				const char *pgport,
 				const char *username,
-				enum trivalue prompt_password);
+				enum trivalue prompt_password,
+				bool is_replication);
 extern void DisconnectDatabase(Archive *AHX);
 extern PGconn *GetConnection(Archive *AHX);
+extern PGconn *GetReplicationConnection(Archive *AHX);
 
 /* Called to add a TOC entry */
 extern void ArchiveEntry(Archive *AHX,
diff --git a/src/bin/pg_dump/pg_backup_archiver.c b/src/bin/pg_dump/pg_backup_archiver.c
index 5476a1e..f3e6abf 100644
--- a/src/bin/pg_dump/pg_backup_archiver.c
+++ b/src/bin/pg_dump/pg_backup_archiver.c
@@ -307,7 +307,7 @@ RestoreArchive(Archive *AHX)
 
 		ConnectDatabase(AHX, ropt->dbname,
 						ropt->pghost, ropt->pgport, ropt->username,
-						ropt->promptPassword);
+						ropt->promptPassword, false);
 
 		/*
 		 * If we're talking to the DB directly, don't send comments since they
@@ -3730,7 +3730,7 @@ restore_toc_entries_postfork(ArchiveHandle *AH, TocEntry *pending_list)
 	 */
 	ConnectDatabase((Archive *) AH, ropt->dbname,
 					ropt->pghost, ropt->pgport, ropt->username,
-					ropt->promptPassword);
+					ropt->promptPassword, false);
 
 	_doSetFixedOutputState(AH);
 
@@ -4282,7 +4282,7 @@ CloneArchive(ArchiveHandle *AH)
 		/* this also sets clone->connection */
 		ConnectDatabase((Archive *) clone, ropt->dbname,
 						ropt->pghost, ropt->pgport, ropt->username,
-						ropt->promptPassword);
+						ropt->promptPassword, false);
 	}
 	else
 	{
@@ -4307,7 +4307,8 @@ CloneArchive(ArchiveHandle *AH)
 		encname = pg_encoding_to_char(AH->public.encoding);
 
 		/* this also sets clone->connection */
-		ConnectDatabase((Archive *) clone, dbname, pghost, pgport, username, TRI_NO);
+		ConnectDatabase((Archive *) clone, dbname, pghost, pgport, username,
+						TRI_NO, false);
 
 		/*
 		 * Set the same encoding, whatever we set here is what we got from
diff --git a/src/bin/pg_dump/pg_backup_archiver.h b/src/bin/pg_dump/pg_backup_archiver.h
index c163f29..832d956 100644
--- a/src/bin/pg_dump/pg_backup_archiver.h
+++ b/src/bin/pg_dump/pg_backup_archiver.h
@@ -293,6 +293,9 @@ typedef struct _archiveHandle
 	ArchiverOutput outputKind;	/* Flag for what we're currently writing */
 	bool		pgCopyIn;		/* Currently in libpq 'COPY IN' mode. */
 
+	/* Replication connection */
+	PGconn	   *repConnection;	/* Connection using replication protocol */
+
 	int			loFd;			/* BLOB fd */
 	int			writingBlob;	/* Flag */
 	int			blobCount;		/* # of blobs restored */
diff --git a/src/bin/pg_dump/pg_backup_db.c b/src/bin/pg_dump/pg_backup_db.c
index 4d1d14f..d7ce0c5 100644
--- a/src/bin/pg_dump/pg_backup_db.c
+++ b/src/bin/pg_dump/pg_backup_db.c
@@ -27,18 +27,19 @@
 /* translator: this is a module name */
 static const char *modulename = gettext_noop("archiver (db)");
 
-static void _check_database_version(ArchiveHandle *AH);
+static void _check_database_version(ArchiveHandle *AH, bool is_replication);
 static PGconn *_connectDB(ArchiveHandle *AH, const char *newdbname, const char *newUser);
 static void notice_processor(void *arg, const char *message);
 
 static void
-_check_database_version(ArchiveHandle *AH)
+_check_database_version(ArchiveHandle *AH, bool is_replication)
 {
 	const char *remoteversion_str;
 	int			remoteversion;
+	PGconn	   *conn = is_replication ? AH->repConnection : AH->connection;
 
-	remoteversion_str = PQparameterStatus(AH->connection, "server_version");
-	remoteversion = PQserverVersion(AH->connection);
+	remoteversion_str = PQparameterStatus(conn, "server_version");
+	remoteversion = PQserverVersion(conn);
 	if (remoteversion == 0 || !remoteversion_str)
 		exit_horribly(modulename, "could not get server_version from libpq\n");
 
@@ -194,7 +195,7 @@ _connectDB(ArchiveHandle *AH, const char *reqdb, const char *requser)
 	AH->savedPassword = password;
 
 	/* check for version mismatch */
-	_check_database_version(AH);
+	_check_database_version(AH, false);
 
 	PQsetNoticeProcessor(newConn, notice_processor, NULL);
 
@@ -217,13 +218,21 @@ ConnectDatabase(Archive *AHX,
 				const char *pghost,
 				const char *pgport,
 				const char *username,
-				enum trivalue prompt_password)
+				enum trivalue prompt_password,
+				bool is_replication)
 {
 	ArchiveHandle *AH = (ArchiveHandle *) AHX;
 	char	   *password = AH->savedPassword;
 	bool		new_pass;
+	PGconn	   *conn = is_replication ? AH->repConnection : AH->connection;
 
-	if (AH->connection)
+	/*
+	 * Replication connection cannot be established before a normal connection
+	 * as a check on the remote server version is necessary for compatibility.
+	 */
+	Assert((is_replication && AH->connection) || !is_replication);
+
+	if (conn)
 		exit_horribly(modulename, "already connected to a database\n");
 
 	if (prompt_password == TRI_YES && password == NULL)
@@ -240,9 +249,9 @@ ConnectDatabase(Archive *AHX,
 	 */
 	do
 	{
-#define PARAMS_ARRAY_SIZE	7
-		const char **keywords = pg_malloc(PARAMS_ARRAY_SIZE * sizeof(*keywords));
-		const char **values = pg_malloc(PARAMS_ARRAY_SIZE * sizeof(*values));
+#define PARAMS_CONNECT_SIZE	8
+		const char **keywords = pg_malloc(PARAMS_CONNECT_SIZE * sizeof(*keywords));
+		const char **values = pg_malloc(PARAMS_CONNECT_SIZE * sizeof(*values));
 
 		keywords[0] = "host";
 		values[0] = pghost;
@@ -259,21 +268,31 @@ ConnectDatabase(Archive *AHX,
 		keywords[6] = NULL;
 		values[6] = NULL;
 
+		/* Process replication connection for logical slot */
+		if (is_replication)
+		{
+			keywords[6] = "replication";
+			values[6] = "database";
+			keywords[7] = NULL;
+			values[7] = NULL;
+		}
+
+		/* Process regular connection */
 		new_pass = false;
-		AH->connection = PQconnectdbParams(keywords, values, true);
+		conn = PQconnectdbParams(keywords, values, true);
 
 		free(keywords);
 		free(values);
 
-		if (!AH->connection)
+		if (!conn)
 			exit_horribly(modulename, "failed to connect to database\n");
 
-		if (PQstatus(AH->connection) == CONNECTION_BAD &&
-			PQconnectionNeedsPassword(AH->connection) &&
+		if (PQstatus(conn) == CONNECTION_BAD &&
+			PQconnectionNeedsPassword(conn) &&
 			password == NULL &&
 			prompt_password != TRI_NO)
 		{
-			PQfinish(AH->connection);
+			PQfinish(conn);
 			password = simple_prompt("Password: ", 100, false);
 			if (password == NULL)
 				exit_horribly(modulename, "out of memory\n");
@@ -281,18 +300,25 @@ ConnectDatabase(Archive *AHX,
 		}
 	} while (new_pass);
 
-	AH->savedPassword = password;
-
 	/* check to see that the backend connection was successfully made */
-	if (PQstatus(AH->connection) == CONNECTION_BAD)
+	if (PQstatus(conn) == CONNECTION_BAD)
 		exit_horribly(modulename, "connection to database \"%s\" failed: %s",
-					  PQdb(AH->connection) ? PQdb(AH->connection) : "",
-					  PQerrorMessage(AH->connection));
+					  PQdb(conn) ? PQdb(conn) : "",
+					  PQerrorMessage(conn));
+
+	/* Save obtained connection to correct slot */
+	if (is_replication)
+		AH->repConnection = conn;
+	else
+		AH->connection = conn;
+
+	AH->savedPassword = password;
 
 	/* check for version mismatch */
-	_check_database_version(AH);
+	_check_database_version(AH, is_replication);
 
-	PQsetNoticeProcessor(AH->connection, notice_processor, NULL);
+	if (!is_replication)
+		PQsetNoticeProcessor(AH->connection, notice_processor, NULL);
 }
 
 /*
@@ -306,6 +332,14 @@ DisconnectDatabase(Archive *AHX)
 	PGcancel   *cancel;
 	char		errbuf[1];
 
+	/* Disconnect replication connection if there is one */
+	if (AH->repConnection)
+	{
+		PQfinish(AH->repConnection);
+		AH->repConnection = NULL;
+	}
+
+	/* Leave if no connection */
 	if (!AH->connection)
 		return;
 
@@ -330,6 +364,14 @@ GetConnection(Archive *AHX)
 	return AH->connection;
 }
 
+PGconn *
+GetReplicationConnection(Archive *AHX)
+{
+	ArchiveHandle *AH = (ArchiveHandle *) AHX;
+
+	return AH->repConnection;
+}
+
 static void
 notice_processor(void *arg, const char *message)
 {
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index 2915329..fdbdcaa 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -142,7 +142,8 @@ static int	enable_row_security = 0;
 
 static void help(const char *progname);
 static void setup_connection(Archive *AH, const char *dumpencoding,
-				 char *use_role);
+							 char *use_role);
+static void setup_replication_connection(Archive *AH);
 static ArchiveFormat parseArchiveFormat(const char *format, ArchiveMode *mode);
 static void expand_schema_name_patterns(Archive *fout,
 							SimpleStringList *patterns,
@@ -280,6 +281,8 @@ main(int argc, char **argv)
 	const char *pgport = NULL;
 	const char *username = NULL;
 	const char *dumpencoding = NULL;
+	char	   *slot_name = NULL;
+	char	   *plugin_name = NULL;
 	bool		oids = false;
 	TableInfo  *tblinfo;
 	int			numTables;
@@ -361,6 +364,8 @@ main(int argc, char **argv)
 		{"no-security-labels", no_argument, &no_security_labels, 1},
 		{"no-synchronized-snapshots", no_argument, &no_synchronized_snapshots, 1},
 		{"no-unlogged-table-data", no_argument, &no_unlogged_table_data, 1},
+		{"slot", required_argument, NULL, 6},
+		{"plugin-name", required_argument, NULL, 7},
 
 		{NULL, 0, NULL, 0}
 	};
@@ -538,6 +543,14 @@ main(int argc, char **argv)
 				set_dump_section(optarg, &dumpSections);
 				break;
 
+			case 6:				/* Name of slot to be created for the dump */
+				slot_name = pg_strdup(optarg);
+				break;
+
+			case 7:				/* Plugin associated with slot created */
+				plugin_name = pg_strdup(optarg);
+				break;
+
 			default:
 				fprintf(stderr, _("Try \"%s --help\" for more information.\n"), progname);
 				exit_nicely(1);
@@ -645,10 +658,41 @@ main(int argc, char **argv)
 	 * Open the database using the Archiver, so it knows about it. Errors mean
 	 * death.
 	 */
-	ConnectDatabase(fout, dbname, pghost, pgport, username, prompt_password);
+	ConnectDatabase(fout, dbname, pghost, pgport, username,
+					prompt_password, false);
+
+	/* Sanity check for replication connection */
+	if (slot_name && !plugin_name)
+		exit_horribly(NULL, "Slot name is defined but plugin name is missing.\n");
+	fout->slot_name = slot_name;
+	fout->plugin_name = plugin_name;
+
+	/* Establish replication connection if necessary for logical slot creation */
+	if (fout->remoteVersion < 90400 && slot_name)
+	{
+		exit_horribly(NULL,
+		  "Logical slot creation is not supported by this server version.\n");
+	}
+	else if (slot_name)
+	{
+		ConnectDatabase(fout, dbname, pghost, pgport, username,
+						prompt_password, true);
+		setup_replication_connection(fout);
+	}
+
+	/*
+	 * Any synchronized snapshot needed for a dump may have been taken using
+	 * the replication connection so be sure to setup connection used for the
+	 * dump with a consistent set of parameters.
+	 */
 	setup_connection(fout, dumpencoding, use_role);
 
 	/*
+	 * Setup connection for dump. It may be possible that it uses a snapshot from
+	 * a replication slot.
+	 */
+
+	/*
 	 * Disable security label support if server version < v9.1.x (prevents
 	 * access to nonexistent pg_seclabel catalog)
 	 */
@@ -916,6 +960,10 @@ help(const char *progname)
 			 "                               use SET SESSION AUTHORIZATION commands instead of\n"
 			 "                               ALTER OWNER commands to set ownership\n"));
 
+	printf(_("\nReplication slot options:\n"));
+	printf(_("  --plugin-name=PLUGIN_NAME    output plugin used for the slot created by --slot\n"));
+	printf(_("  --slot=SLOT_NAME             slot created and used for the dump\n"));
+
 	printf(_("\nConnection options:\n"));
 	printf(_("  -d, --dbname=DBNAME      database to dump\n"));
 	printf(_("  -h, --host=HOSTNAME      database server host or socket directory\n"));
@@ -1039,9 +1087,16 @@ setup_connection(Archive *AH, const char *dumpencoding, char *use_role)
 		ExecuteSqlStatement(AH,
 							"SET TRANSACTION ISOLATION LEVEL SERIALIZABLE");
 
-
-
-	if (AH->numWorkers > 1 && AH->remoteVersion >= 90200 && !no_synchronized_snapshots)
+	/*
+	 * In this code path, a transaction snapshot can be exported from a
+	 * non-replication connection, something that can be done only if
+	 * slot connection is not set for this archive handler.
+	 */
+	if ((AH->numWorkers > 1 &&
+		 AH->remoteVersion >= 90200 &&
+		 !no_synchronized_snapshots) ||
+		(AH->remoteVersion >= 90400 &&
+		 AH->slot_name))
 	{
 		if (AH->sync_snapshot_id)
 		{
@@ -1052,7 +1107,7 @@ setup_connection(Archive *AH, const char *dumpencoding, char *use_role)
 			ExecuteSqlStatement(AH, query->data);
 			destroyPQExpBuffer(query);
 		}
-		else
+		else if (!AH->slot_name)
 			AH->sync_snapshot_id = get_synchronized_snapshot(AH);
 	}
 
@@ -1066,6 +1121,27 @@ setup_connection(Archive *AH, const char *dumpencoding, char *use_role)
 }
 
 static void
+setup_replication_connection(Archive *AH)
+{
+	char		query[256];
+	PGresult   *res;
+	PGconn	   *conn = GetReplicationConnection(AH);
+
+	/* Create a slot and obtain an exported snapshot from it for the dump */
+	snprintf(query, sizeof(query), "CREATE_REPLICATION_SLOT \"%s\" LOGICAL \"%s\"",
+			 AH->slot_name, AH->plugin_name);
+
+	res = PQexec(conn, query);
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+		exit_horribly(NULL, "%s: could not send replication command \"%s\": %s",
+				progname, query, PQerrorMessage(conn));
+
+	AH->sync_snapshot_id = pg_strdup(PQgetvalue(res, 0, 2));
+	PQclear(res);
+}
+
+
+static void
 setupDumpWorker(Archive *AHX, RestoreOptions *ropt)
 {
 	setup_connection(AHX, NULL, NULL);
-- 
2.1.0

diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index fdbdcaa..ab1e425 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -1101,6 +1101,8 @@ setup_connection(Archive *AH, const char *dumpencoding, char *use_role)
 		if (AH->sync_snapshot_id)
 		{
 			PQExpBuffer query = createPQExpBuffer();
+			fprintf(stderr, "Snapshot used for connection: %s\n",
+					AH->sync_snapshot_id);
 
 			appendPQExpBufferStr(query, "SET TRANSACTION SNAPSHOT ");
 			appendStringLiteralConn(query, AH->sync_snapshot_id, conn);
@@ -1108,7 +1110,11 @@ setup_connection(Archive *AH, const char *dumpencoding, char *use_role)
 			destroyPQExpBuffer(query);
 		}
 		else if (!AH->slot_name)
+		{
 			AH->sync_snapshot_id = get_synchronized_snapshot(AH);
+			fprintf(stderr, "Snapshot created for non-replication connection: %s\n",
+					AH->sync_snapshot_id);
+		}
 	}
 
 	if (AH->remoteVersion >= 90500)
@@ -1137,6 +1143,9 @@ setup_replication_connection(Archive *AH)
 				progname, query, PQerrorMessage(conn));
 
 	AH->sync_snapshot_id = pg_strdup(PQgetvalue(res, 0, 2));
+	fprintf(stderr, "Snapshot created for replication connection: %s\n",
+			AH->sync_snapshot_id);
+
 	PQclear(res);
 }
 
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
