On Thu, Sep 4, 2014 at 11:33 PM, Michael Paquier <michael.paqu...@gmail.com> wrote:
> Thoughts?
I have been poking at that during the long flight back from Chicago and created the attached patch, which makes pg_dump able to create a replication slot (and hence get its hands on a synchronized snapshot describing the data as of slot creation), then take a dump using the exported snapshot while keeping the replication connection used for slot creation alive for the duration of the dump.

Taking a dump consistent with a replication slot is useful first of all for online upgrades: you can simply run pg_dump, have a slot created, and get a state of the database consistent with the slot creation before replaying changes in one way or another. Combining that with a decoder that generates raw queries and a receiver able to apply changes on a remote Postgres server, it is possible to get a kind of live migration solution from one Postgres instance to another for a single database, as long as the origin server uses 9.4. Making the receiver report write and flush positions also makes it possible for the origin server to use the synchronous replication protocol to be sure that changes got applied on the remote before performing a switch from the origin to the remote (that may actually explain why multi-syncrep would be useful here for multiple databases). Also, I imagine that users could even use this feature in pg_dump to, for example, do some post-processing on the dumped data in accordance with the decoder plugin before applying changes to a remote source.
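As a rough illustration of the mechanism described above, the two commands involved can be sketched as follows. This is not code from the attached patch: the helper names build_create_slot_command and build_set_snapshot_command are made up for the example, and the sketch simply rejects names containing quote characters instead of escaping them. The first command is meant for the replication connection, the second for the regular connection that performs the dump.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/*
 * Hypothetical helper: build the replication-protocol command that creates
 * a logical slot. Returns 0 on success, -1 if the buffer is too small or if
 * a name contains a double quote (rejected here rather than escaped).
 */
int
build_create_slot_command(char *buf, size_t buflen,
                          const char *slot_name, const char *plugin_name)
{
    int n;

    assert(buf != NULL && slot_name != NULL && plugin_name != NULL);

    if (strchr(slot_name, '"') != NULL || strchr(plugin_name, '"') != NULL)
        return -1;

    n = snprintf(buf, buflen,
                 "CREATE_REPLICATION_SLOT \"%s\" LOGICAL \"%s\"",
                 slot_name, plugin_name);

    /* snprintf reports the length it wanted; treat truncation as an error */
    if (n < 0 || (size_t) n >= buflen)
        return -1;
    return 0;
}

/*
 * Hypothetical helper: build the SQL statement that makes a regular
 * connection adopt the snapshot exported at slot creation. Returns 0 on
 * success, -1 on truncation or if the identifier contains a single quote.
 */
int
build_set_snapshot_command(char *buf, size_t buflen, const char *snapshot_id)
{
    int n;

    assert(buf != NULL && snapshot_id != NULL);

    if (strchr(snapshot_id, '\'') != NULL)
        return -1;

    n = snprintf(buf, buflen, "SET TRANSACTION SNAPSHOT '%s'", snapshot_id);
    if (n < 0 || (size_t) n >= buflen)
        return -1;
    return 0;
}
```

Checking the return value of snprintf is a choice of this sketch, so that an overly long slot or plugin name is reported instead of being silently truncated into a different command.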
Now, this is done with the addition of two options in pg_dump to control the logical slot creation:
- --slot, to define the name of the slot being created
- --plugin-name, to define the name of the decoder plugin

And then you can of course do things like that:
# Raw data dump on a slot
$ pg_dump --slot bar --plugin-name test_decoding
# Existing parallel dump not changed:
$ pg_dump -j 4 -f data -F d
# Parallel dump on a slot
$ pg_dump -j 4 --slot bar --plugin-name test_decoding -f data -F d

This patch does not solve the existing problems related to relation locking between the LOCK taken on tables and the moment a snapshot is exported (actually that's a different problem), but similarly to parallel pg_dump it reduces the window of exposure to schema changes to a minimum. This needed the addition of some logic to make pg_dump aware of the replication connection. Parallel dumps are supported as well, the trick being to make sure that the existing parallel dump facility still uses snapshots from the main db connection, and not the replication connection, while parallel dumps on a slot use the snapshot from the slot created.

The first patch attached is the feature itself. The second patch, which can be applied on top of the first one, outputs some useful logs to track the snapshot creation depending on the code paths taken. I used that for debugging purposes only, and am just posting it here for reference. I'll add that to the next commit fest (patch contains docs as well).

Regards,
-- 
Michael
From aeb75d82acc875252dab800009bba540ab89fec6 Mon Sep 17 00:00:00 2001 From: Michael Paquier <mich...@otacoo.com> Date: Sun, 21 Sep 2014 18:16:22 +0900 Subject: [PATCH] Support dump from replication slot creation state pg_dump logic now incorporates a couple of things to be able to manage a replication connection for the creation of a logical slot, only way to easily get a snapshot to get a consistent database state based on the creation of this slot. This is a piece of the puzzle for online upgrades, and is still useful by itself. --- doc/src/sgml/ref/pg_dump.sgml | 29 ++++++++++++ src/bin/pg_dump/pg_backup.h | 6 ++- src/bin/pg_dump/pg_backup_archiver.c | 9 ++-- src/bin/pg_dump/pg_backup_archiver.h | 3 ++ src/bin/pg_dump/pg_backup_db.c | 86 ++++++++++++++++++++++++++--------- src/bin/pg_dump/pg_dump.c | 88 +++++++++++++++++++++++++++++++++--- 6 files changed, 188 insertions(+), 33 deletions(-) diff --git a/doc/src/sgml/ref/pg_dump.sgml b/doc/src/sgml/ref/pg_dump.sgml index eabdc62..c2d1097 100644 --- a/doc/src/sgml/ref/pg_dump.sgml +++ b/doc/src/sgml/ref/pg_dump.sgml @@ -801,6 +801,16 @@ PostgreSQL documentation </varlistentry> <varlistentry> + <term><option>--plugin-name=<replaceable class="parameter">plugin_name</replaceable></option></term> + <listitem> + <para> + Define a decoder plugin name (see <xref linkend="logicaldecoding-output-plugin">) + used for the creation of slot when <option>--slot</> is defined. + </para> + </listitem> + </varlistentry> + + <varlistentry> <term><option>--quote-all-identifiers</></term> <listitem> <para> @@ -866,6 +876,25 @@ PostgreSQL documentation </varlistentry> <varlistentry> + <term><option>--slot=<replaceable class="parameter">slot_name</replaceable></option></term> + <listitem> + <para> + Create a logical replication slot (see + <xref linkend="streaming-replication-slots"> for more details) from + which is taken a dump of data consistent with the point of the slot + creation using the synchronized snapshot it created. 
This is useful + to get a consistent image of a database before beginning to apply slot + changes to it as this ensures that the data integrity is maintained as + the same as when the replication slot was created. + </para> + <para> + This option needs to define a decoder plugin (see + <xref linkend="logicaldecoding-output-plugin">) that can be defined using <option>--plugin-name</>. + </para> + </listitem> + </varlistentry> + + <varlistentry> <term><option>--use-set-session-authorization</></term> <listitem> <para> diff --git a/src/bin/pg_dump/pg_backup.h b/src/bin/pg_dump/pg_backup.h index 921bc1b..b1628e1 100644 --- a/src/bin/pg_dump/pg_backup.h +++ b/src/bin/pg_dump/pg_backup.h @@ -85,6 +85,8 @@ struct Archive int numWorkers; /* number of parallel processes */ char *sync_snapshot_id; /* sync snapshot id for parallel * operation */ + char *slot_name; /* Slot used for dump */ + char *plugin_name; /* Plugin name for slot creation */ /* info needed for string escaping */ int encoding; /* libpq code for client_encoding */ @@ -164,9 +166,11 @@ extern void ConnectDatabase(Archive *AH, const char *pghost, const char *pgport, const char *username, - enum trivalue prompt_password); + enum trivalue prompt_password, + bool is_replication); extern void DisconnectDatabase(Archive *AHX); extern PGconn *GetConnection(Archive *AHX); +extern PGconn *GetReplicationConnection(Archive *AHX); /* Called to add a TOC entry */ extern void ArchiveEntry(Archive *AHX, diff --git a/src/bin/pg_dump/pg_backup_archiver.c b/src/bin/pg_dump/pg_backup_archiver.c index 5476a1e..f3e6abf 100644 --- a/src/bin/pg_dump/pg_backup_archiver.c +++ b/src/bin/pg_dump/pg_backup_archiver.c @@ -307,7 +307,7 @@ RestoreArchive(Archive *AHX) ConnectDatabase(AHX, ropt->dbname, ropt->pghost, ropt->pgport, ropt->username, - ropt->promptPassword); + ropt->promptPassword, false); /* * If we're talking to the DB directly, don't send comments since they @@ -3730,7 +3730,7 @@ restore_toc_entries_postfork(ArchiveHandle 
*AH, TocEntry *pending_list) */ ConnectDatabase((Archive *) AH, ropt->dbname, ropt->pghost, ropt->pgport, ropt->username, - ropt->promptPassword); + ropt->promptPassword, false); _doSetFixedOutputState(AH); @@ -4282,7 +4282,7 @@ CloneArchive(ArchiveHandle *AH) /* this also sets clone->connection */ ConnectDatabase((Archive *) clone, ropt->dbname, ropt->pghost, ropt->pgport, ropt->username, - ropt->promptPassword); + ropt->promptPassword, false); } else { @@ -4307,7 +4307,8 @@ CloneArchive(ArchiveHandle *AH) encname = pg_encoding_to_char(AH->public.encoding); /* this also sets clone->connection */ - ConnectDatabase((Archive *) clone, dbname, pghost, pgport, username, TRI_NO); + ConnectDatabase((Archive *) clone, dbname, pghost, pgport, username, + TRI_NO, false); /* * Set the same encoding, whatever we set here is what we got from diff --git a/src/bin/pg_dump/pg_backup_archiver.h b/src/bin/pg_dump/pg_backup_archiver.h index c163f29..832d956 100644 --- a/src/bin/pg_dump/pg_backup_archiver.h +++ b/src/bin/pg_dump/pg_backup_archiver.h @@ -293,6 +293,9 @@ typedef struct _archiveHandle ArchiverOutput outputKind; /* Flag for what we're currently writing */ bool pgCopyIn; /* Currently in libpq 'COPY IN' mode. 
*/ + /* Replication connection */ + PGconn *repConnection; /* Connection using replication protocol */ + int loFd; /* BLOB fd */ int writingBlob; /* Flag */ int blobCount; /* # of blobs restored */ diff --git a/src/bin/pg_dump/pg_backup_db.c b/src/bin/pg_dump/pg_backup_db.c index 4d1d14f..d7ce0c5 100644 --- a/src/bin/pg_dump/pg_backup_db.c +++ b/src/bin/pg_dump/pg_backup_db.c @@ -27,18 +27,19 @@ /* translator: this is a module name */ static const char *modulename = gettext_noop("archiver (db)"); -static void _check_database_version(ArchiveHandle *AH); +static void _check_database_version(ArchiveHandle *AH, bool is_replication); static PGconn *_connectDB(ArchiveHandle *AH, const char *newdbname, const char *newUser); static void notice_processor(void *arg, const char *message); static void -_check_database_version(ArchiveHandle *AH) +_check_database_version(ArchiveHandle *AH, bool is_replication) { const char *remoteversion_str; int remoteversion; + PGconn *conn = is_replication ? AH->repConnection : AH->connection; - remoteversion_str = PQparameterStatus(AH->connection, "server_version"); - remoteversion = PQserverVersion(AH->connection); + remoteversion_str = PQparameterStatus(conn, "server_version"); + remoteversion = PQserverVersion(conn); if (remoteversion == 0 || !remoteversion_str) exit_horribly(modulename, "could not get server_version from libpq\n"); @@ -194,7 +195,7 @@ _connectDB(ArchiveHandle *AH, const char *reqdb, const char *requser) AH->savedPassword = password; /* check for version mismatch */ - _check_database_version(AH); + _check_database_version(AH, false); PQsetNoticeProcessor(newConn, notice_processor, NULL); @@ -217,13 +218,21 @@ ConnectDatabase(Archive *AHX, const char *pghost, const char *pgport, const char *username, - enum trivalue prompt_password) + enum trivalue prompt_password, + bool is_replication) { ArchiveHandle *AH = (ArchiveHandle *) AHX; char *password = AH->savedPassword; bool new_pass; + PGconn *conn = is_replication ? 
AH->repConnection : AH->connection; - if (AH->connection) + /* + * Replication connection cannot be established before a normal connection + * as a check on the remote server version is necessary for compatibility. + */ + Assert((is_replication && AH->connection) || !is_replication); + + if (conn) exit_horribly(modulename, "already connected to a database\n"); if (prompt_password == TRI_YES && password == NULL) @@ -240,9 +249,9 @@ ConnectDatabase(Archive *AHX, */ do { -#define PARAMS_ARRAY_SIZE 7 - const char **keywords = pg_malloc(PARAMS_ARRAY_SIZE * sizeof(*keywords)); - const char **values = pg_malloc(PARAMS_ARRAY_SIZE * sizeof(*values)); +#define PARAMS_CONNECT_SIZE 8 + const char **keywords = pg_malloc(PARAMS_CONNECT_SIZE * sizeof(*keywords)); + const char **values = pg_malloc(PARAMS_CONNECT_SIZE * sizeof(*values)); keywords[0] = "host"; values[0] = pghost; @@ -259,21 +268,31 @@ ConnectDatabase(Archive *AHX, keywords[6] = NULL; values[6] = NULL; + /* Process replication connection for logical slot */ + if (is_replication) + { + keywords[6] = "replication"; + values[6] = "database"; + keywords[7] = NULL; + values[7] = NULL; + } + + /* Process regular connection */ new_pass = false; - AH->connection = PQconnectdbParams(keywords, values, true); + conn = PQconnectdbParams(keywords, values, true); free(keywords); free(values); - if (!AH->connection) + if (!conn) exit_horribly(modulename, "failed to connect to database\n"); - if (PQstatus(AH->connection) == CONNECTION_BAD && - PQconnectionNeedsPassword(AH->connection) && + if (PQstatus(conn) == CONNECTION_BAD && + PQconnectionNeedsPassword(conn) && password == NULL && prompt_password != TRI_NO) { - PQfinish(AH->connection); + PQfinish(conn); password = simple_prompt("Password: ", 100, false); if (password == NULL) exit_horribly(modulename, "out of memory\n"); @@ -281,18 +300,25 @@ ConnectDatabase(Archive *AHX, } } while (new_pass); - AH->savedPassword = password; - /* check to see that the backend connection was 
successfully made */ - if (PQstatus(AH->connection) == CONNECTION_BAD) + if (PQstatus(conn) == CONNECTION_BAD) exit_horribly(modulename, "connection to database \"%s\" failed: %s", - PQdb(AH->connection) ? PQdb(AH->connection) : "", - PQerrorMessage(AH->connection)); + PQdb(conn) ? PQdb(conn) : "", + PQerrorMessage(conn)); + + /* Save obtained connection to correct slot */ + if (is_replication) + AH->repConnection = conn; + else + AH->connection = conn; + + AH->savedPassword = password; /* check for version mismatch */ - _check_database_version(AH); + _check_database_version(AH, is_replication); - PQsetNoticeProcessor(AH->connection, notice_processor, NULL); + if (!is_replication) + PQsetNoticeProcessor(AH->connection, notice_processor, NULL); } /* @@ -306,6 +332,14 @@ DisconnectDatabase(Archive *AHX) PGcancel *cancel; char errbuf[1]; + /* Disconnect replication connection if there is one */ + if (AH->repConnection) + { + PQfinish(AH->repConnection); + AH->repConnection = NULL; + } + + /* Leave if no connection */ if (!AH->connection) return; @@ -330,6 +364,14 @@ GetConnection(Archive *AHX) return AH->connection; } +PGconn * +GetReplicationConnection(Archive *AHX) +{ + ArchiveHandle *AH = (ArchiveHandle *) AHX; + + return AH->repConnection; +} + static void notice_processor(void *arg, const char *message) { diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c index 2915329..fdbdcaa 100644 --- a/src/bin/pg_dump/pg_dump.c +++ b/src/bin/pg_dump/pg_dump.c @@ -142,7 +142,8 @@ static int enable_row_security = 0; static void help(const char *progname); static void setup_connection(Archive *AH, const char *dumpencoding, - char *use_role); + char *use_role); +static void setup_replication_connection(Archive *AH); static ArchiveFormat parseArchiveFormat(const char *format, ArchiveMode *mode); static void expand_schema_name_patterns(Archive *fout, SimpleStringList *patterns, @@ -280,6 +281,8 @@ main(int argc, char **argv) const char *pgport = NULL; const char 
*username = NULL; const char *dumpencoding = NULL; + char *slot_name = NULL; + char *plugin_name = NULL; bool oids = false; TableInfo *tblinfo; int numTables; @@ -361,6 +364,8 @@ main(int argc, char **argv) {"no-security-labels", no_argument, &no_security_labels, 1}, {"no-synchronized-snapshots", no_argument, &no_synchronized_snapshots, 1}, {"no-unlogged-table-data", no_argument, &no_unlogged_table_data, 1}, + {"slot", required_argument, NULL, 6}, + {"plugin-name", required_argument, NULL, 7}, {NULL, 0, NULL, 0} }; @@ -538,6 +543,14 @@ main(int argc, char **argv) set_dump_section(optarg, &dumpSections); break; + case 6: /* Name of slot to be created for the dump */ + slot_name = pg_strdup(optarg); + break; + + case 7: /* Plugin associated with slot created */ + plugin_name = pg_strdup(optarg); + break; + default: fprintf(stderr, _("Try \"%s --help\" for more information.\n"), progname); exit_nicely(1); @@ -645,10 +658,41 @@ main(int argc, char **argv) * Open the database using the Archiver, so it knows about it. Errors mean * death. 
*/ - ConnectDatabase(fout, dbname, pghost, pgport, username, prompt_password); + ConnectDatabase(fout, dbname, pghost, pgport, username, + prompt_password, false); + + /* Sanity check for replication connection */ + if (slot_name && !plugin_name) + exit_horribly(NULL, "Slot name is defined but plugin name is missing.\n"); + fout->slot_name = slot_name; + fout->plugin_name = plugin_name; + + /* Establish replication connection if necessary for logical slot creation */ + if (fout->remoteVersion < 90400 && slot_name) + { + exit_horribly(NULL, + "Logical slot creation is not supported by this server version.\n"); + } + else if (slot_name) + { + ConnectDatabase(fout, dbname, pghost, pgport, username, + prompt_password, true); + setup_replication_connection(fout); + } + + /* + * Any synchronized snapshot needed for a dump may have been taken using + * the replication connection so be sure to setup connection used for the + * dump with a consistent set of parameters. + */ setup_connection(fout, dumpencoding, use_role); /* + * Setup connection for dump. It may be possible that it uses a snapshot from + * a replication slot. 
+ */ + + /* * Disable security label support if server version < v9.1.x (prevents * access to nonexistent pg_seclabel catalog) */ @@ -916,6 +960,10 @@ help(const char *progname) " use SET SESSION AUTHORIZATION commands instead of\n" " ALTER OWNER commands to set ownership\n")); + printf(_("\nReplication slot options:\n")); + printf(_(" --plugin-name Output plugin used for slot creation defined by --slot\n")); + printf(_(" --slot Slot created and used for dump\n")); + printf(_("\nConnection options:\n")); printf(_(" -d, --dbname=DBNAME database to dump\n")); printf(_(" -h, --host=HOSTNAME database server host or socket directory\n")); @@ -1039,9 +1087,16 @@ setup_connection(Archive *AH, const char *dumpencoding, char *use_role) ExecuteSqlStatement(AH, "SET TRANSACTION ISOLATION LEVEL SERIALIZABLE"); - - - if (AH->numWorkers > 1 && AH->remoteVersion >= 90200 && !no_synchronized_snapshots) + /* + * In this code path, a transaction snashot can be exported from a + * non-replication connection, something that can be done only if + * slot connection is not set for this archive handler. 
+ */ + if ((AH->numWorkers > 1 && + AH->remoteVersion >= 90200 && + !no_synchronized_snapshots) || + (AH->remoteVersion >= 90400 && + AH->slot_name)) { if (AH->sync_snapshot_id) { @@ -1052,7 +1107,7 @@ setup_connection(Archive *AH, const char *dumpencoding, char *use_role) ExecuteSqlStatement(AH, query->data); destroyPQExpBuffer(query); } - else + else if (!AH->slot_name) AH->sync_snapshot_id = get_synchronized_snapshot(AH); } @@ -1066,6 +1121,27 @@ setup_connection(Archive *AH, const char *dumpencoding, char *use_role) } static void +setup_replication_connection(Archive *AH) +{ + char query[256]; + PGresult *res; + PGconn *conn = GetReplicationConnection(AH); + + /* Create a slot and obtain an exported snapshot from it for the dump */ + snprintf(query, sizeof(query), "CREATE_REPLICATION_SLOT \"%s\" LOGICAL \"%s\"", + AH->slot_name, AH->plugin_name); + + res = PQexec(conn, query); + if (PQresultStatus(res) != PGRES_TUPLES_OK) + exit_horribly(NULL, "%s: could not send replication command \"%s\": %s", + progname, query, PQerrorMessage(conn)); + + AH->sync_snapshot_id = pg_strdup(PQgetvalue(res, 0, 2)); + PQclear(res); +} + + +static void setupDumpWorker(Archive *AHX, RestoreOptions *ropt) { setup_connection(AHX, NULL, NULL); -- 2.1.0
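The sanity checks that main() performs in the patch above, before opening the replication connection, can be summarized by the following standalone sketch. The enum and the function name check_slot_options are hypothetical and exist only for illustration; the patch itself calls exit_horribly() directly. The 90400 threshold is the one used in the patch.

```c
#include <assert.h>
#include <stddef.h>

/* Server version threshold checked by the patch (9.4) */
#define MIN_LOGICAL_SLOT_VERSION 90400

/* Hypothetical result codes, not part of the patch */
typedef enum
{
    SLOT_OPTS_NO_SLOT,          /* --slot not given: regular dump */
    SLOT_OPTS_OK,               /* slot can be created */
    SLOT_OPTS_MISSING_PLUGIN,   /* --slot given without --plugin-name */
    SLOT_OPTS_SERVER_TOO_OLD    /* server older than 9.4 */
} SlotOptsStatus;

/*
 * Mirror the order of the checks in the patch: a missing plugin name is
 * reported first, then an unsupported server version.
 */
SlotOptsStatus
check_slot_options(const char *slot_name, const char *plugin_name,
                   int remote_version)
{
    /* a version of 0 means libpq could not report it */
    assert(remote_version > 0);

    if (slot_name == NULL)
        return SLOT_OPTS_NO_SLOT;
    if (plugin_name == NULL)
        return SLOT_OPTS_MISSING_PLUGIN;
    if (remote_version < MIN_LOGICAL_SLOT_VERSION)
        return SLOT_OPTS_SERVER_TOO_OLD;
    return SLOT_OPTS_OK;
}
```

Note that, as in the patch, a plugin name given without a slot name is not treated as an error here; it is simply ignored.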
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c index fdbdcaa..ab1e425 100644 --- a/src/bin/pg_dump/pg_dump.c +++ b/src/bin/pg_dump/pg_dump.c @@ -1101,6 +1101,8 @@ setup_connection(Archive *AH, const char *dumpencoding, char *use_role) if (AH->sync_snapshot_id) { PQExpBuffer query = createPQExpBuffer(); + fprintf(stderr, "Snapshot used for connection: %s\n", + AH->sync_snapshot_id); appendPQExpBufferStr(query, "SET TRANSACTION SNAPSHOT "); appendStringLiteralConn(query, AH->sync_snapshot_id, conn); @@ -1108,7 +1110,11 @@ setup_connection(Archive *AH, const char *dumpencoding, char *use_role) destroyPQExpBuffer(query); } else if (!AH->slot_name) + { AH->sync_snapshot_id = get_synchronized_snapshot(AH); + fprintf(stderr, "Snapshot created for non-replication connection: %s\n", + AH->sync_snapshot_id); + } } if (AH->remoteVersion >= 90500) @@ -1137,6 +1143,9 @@ setup_replication_connection(Archive *AH) progname, query, PQerrorMessage(conn)); AH->sync_snapshot_id = pg_strdup(PQgetvalue(res, 0, 2)); + fprintf(stderr, "Snapshot created for replication connection: %s\n", + AH->sync_snapshot_id); + PQclear(res); }
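The code paths that these log messages distinguish come from the condition added to setup_connection() by the first patch. A simplified model of that decision could look like the sketch below. The names SnapshotAction and choose_snapshot_action are hypothetical, and plain booleans stand in for the fields of the Archive struct and for the no_synchronized_snapshots flag.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical labels for what setup_connection() does with snapshots */
typedef enum
{
    SNAPSHOT_NONE,      /* neither import nor export a snapshot */
    SNAPSHOT_IMPORT,    /* SET TRANSACTION SNAPSHOT with the known id */
    SNAPSHOT_EXPORT     /* export a new snapshot from this connection */
} SnapshotAction;

SnapshotAction
choose_snapshot_action(int num_workers, int remote_version,
                       bool no_sync_snapshots, bool has_slot,
                       bool has_snapshot_id)
{
    bool parallel_sync;
    bool slot_sync;

    assert(num_workers >= 1);

    /* synchronized snapshots for parallel dumps need 9.2 or later */
    parallel_sync = num_workers > 1 && remote_version >= 90200 &&
        !no_sync_snapshots;
    /* a snapshot coming from a logical slot needs 9.4 or later */
    slot_sync = remote_version >= 90400 && has_slot;

    if (!parallel_sync && !slot_sync)
        return SNAPSHOT_NONE;

    /* an id is already known: from the slot, or from the leader process */
    if (has_snapshot_id)
        return SNAPSHOT_IMPORT;

    /* only a non-slot dump may export from the regular connection */
    if (!has_slot)
        return SNAPSHOT_EXPORT;

    return SNAPSHOT_NONE;
}
```

In this model a dump on a slot always imports the snapshot created with the slot, including in the workers of a parallel dump, while a parallel dump without a slot exports a snapshot once and imports it in each worker.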