On Mon, Sep 22, 2014 at 2:25 PM, Amit Kapila <amit.kapil...@gmail.com> wrote:
> On Sat, Sep 20, 2014 at 10:06 PM, Michael Paquier
> <michael.paqu...@gmail.com> wrote:
>> On Sat, Sep 20, 2014 at 7:09 AM, Amit Kapila <amit.kapil...@gmail.com>
>> wrote:
>> > 3.
>> > I find existing comments okay, is there a need to change
>> > in this case?  Part of the new comment mentions
>> > "obtaining start LSN position", actually the start position is
>> > identified as part of next function call FindStreamingStart(),
>> > only incase it return InvalidXLogRecPtr, the value returned
>> > by IDENTIFY_SYSTEM will be used.
>> Hopefully I am following you correctly here: comment has been updated
>> to mention that the start LSN and TLI fetched from IDENTIFY_SYSTEM are
>> always fetched but used only if no valid position is found in output
>> folder of pg_receivexlog.
>
> Updated comment is consistent with code, however my main point
> was that in this case, I don't see much need to change existing
> comment.  I think this point is more related to each individual's
> perspective, so if you think there is a need to update the existing
> comment, then retain as it is in your patch and let Committer
> take a call about it.

Well, let's use your suggestion then and switch back to the former comment.
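To make the fallback discussed in point 3 concrete, here is a minimal C sketch under a simplified model: a position found locally (by something like FindStreamingStart()) wins, and the LSN and timeline returned by IDENTIFY_SYSTEM are used only when no valid local position exists. `StreamStart` and `choose_stream_start()` are hypothetical names, and `XLogRecPtr` is redefined here as a plain 64-bit integer rather than taken from the PostgreSQL headers.

```c
#include <assert.h>
#include <stdint.h>

/* Assumption: local stand-ins for PostgreSQL's LSN type and invalid value. */
typedef uint64_t XLogRecPtr;
#define InvalidXLogRecPtr ((XLogRecPtr) 0)

/* Hypothetical pair describing where streaming should begin. */
typedef struct
{
	XLogRecPtr	lsn;
	uint32_t	tli;
} StreamStart;

/*
 * Prefer the position found in the local output directory; fall back to
 * the server's current position and timeline from IDENTIFY_SYSTEM.
 */
static StreamStart
choose_stream_start(XLogRecPtr local_lsn, uint32_t local_tli,
					XLogRecPtr server_lsn, uint32_t server_tli)
{
	StreamStart start;

	/* Assumption: IDENTIFY_SYSTEM always reports a valid position. */
	assert(server_lsn != InvalidXLogRecPtr);

	if (local_lsn != InvalidXLogRecPtr)
	{
		start.lsn = local_lsn;
		start.tli = local_tli;
	}
	else
	{
		start.lsn = server_lsn;
		start.tli = server_tli;
	}
	return start;
}
```

In this shape the server values are always fetched but only consulted in the `else` branch, which is the behavior the comment under discussion tries to describe.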
>> > 4.
>> > +       /* Obtain a connection before doing anything */
>> > +       conn = GetConnection();
>> > +       if (!conn)
>> > +               /* Error message already written in GetConnection() */
>> > Is there a reason for moving this function out of StreamLog(),
>> > there is no harm in moving it, but there doesn't seem to be any
>> > problem even if it is called inside StreamLog().
>> For pg_recvlogical, this move is done to reduce code redundancy, and
>> actually it makes sense to just grab one connection in the main() code
>> path before performing any replication commands. For pg_receivexlog,
>> the move is done because it makes the code more consistent with
>> pg_recvlogical, and also it is a preparatory work for the second patch
>> that introduces the create/drop slot. Even if 2nd patch is not
>> accepted I figured out that it would not hurt to simply grab one
>> connection in the main code path before doing any action.
>
> In pg_receivexlog, StreamLog() calls PQfinish() to close a connection
> to the backend and StreamLog() is getting called in while(true) loop,
> so if you just grab the connection once in main loop, the current
> logic won't work.  For same reasons, the current coding related to
> GetConnection() in pg_receivelogical seems to be right, so it is better
> not to change that as well.  For the second patch (that introduces the
> create/drop slot), I think it is better to do in a way similar to what
> current pg_receivelogical does.

OK, let's do so then. I changed the GetConnection() calls back to what is done on master. In the second patch, I added an extra call to GetConnection() before the create/drop commands.

>> > 6.
>> > +       /* Identify system, obtaining start LSN position at the same time */
>> > +       if (!RunIdentifySystem(conn,
>> > NULL, NULL, &startpos, &plugin_name))
>> > +               disconnect_and_exit(1);
>> > a.
>> > Generally IdentifySystem is called as first API, but I think you
>> > have changed its location after CreateReplicationSlot as you want
>> > to double check the value of plugin, not sure if that is necessary or
>> > important enough that we start calling it later.
>> Funny part here is that even the current code on master and
>> REL9_4_STABLE of pg_recvlogical.c is fetching a start position when
>> creating a replication slot that is not used as two different actions
>> cannot be used at the same time. That's a matter of removing this line
>> in code though:
>> startpos = ((uint64) hi) << 32 | lo;
>
> User is not allowed to give startpos with drop or create of replication
> slot.  It is prevented by check:
> if (startpos != InvalidXLogRecPtr && (do_create_slot || do_drop_slot))
> So it seems you cannot remove the startpos assignment in code.

Ah yes, true: it is actually possible to start the replication process after creating the slot, so it is better not to remove it. I have updated CreateReplicationSlot to add startpos to the fields that can be requested from the results.

>> As that's only cosmetic for 9.4, and this patch makes things more
>> correct I guess that's fine to do nothing and just try to get this
>> patch in.
>
> In what sense do you think that this part of patch is better than
> current code?

I was trying to make the code as consistent as possible between the two utilities, but your approach makes more sense.

> I think calling Identify_System as a first command makes sense
> (as it is in current code) because if that fails we should not
> proceed with execution of other command's.

Yes, looking at that again, you are right.

> Another point about refactoring patch is that fourth column in
> Identify_System is dbname and in patch, you are referring it as
> plugin which seems to be inconsistent.

Oops. Thanks for checking. I changed the check to refer to the database name.
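The two details discussed under point 6 can be illustrated with a small C sketch: turning a textual LSN such as the xlogpos column of IDENTIFY_SYSTEM (printed as `%X/%X`) into a 64-bit position with the `((uint64) hi) << 32 | lo` expression quoted above, and the option check that rejects a user-supplied start position combined with slot creation or removal. `parse_lsn()` and `startpos_options_valid()` are hypothetical helpers, not the pg_recvlogical code itself, and `XLogRecPtr` is redefined locally as an assumption.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

/* Assumption: local stand-ins for PostgreSQL's LSN type and invalid value. */
typedef uint64_t XLogRecPtr;
#define InvalidXLogRecPtr ((XLogRecPtr) 0)

/*
 * Parse an LSN printed as "%X/%X" (e.g. the xlogpos column returned by
 * IDENTIFY_SYSTEM) into a 64-bit position.  Returns false on bad input.
 */
static bool
parse_lsn(const char *str, XLogRecPtr *result)
{
	unsigned int hi;
	unsigned int lo;

	assert(str != NULL && result != NULL);
	if (sscanf(str, "%X/%X", &hi, &lo) != 2)
		return false;
	/* High half goes into the upper 32 bits, low half below it. */
	*result = ((uint64_t) hi) << 32 | lo;
	return true;
}

/*
 * Reject an explicit start position combined with slot creation or
 * removal, mirroring the check quoted in the thread.
 */
static bool
startpos_options_valid(XLogRecPtr startpos, bool do_create_slot,
					   bool do_drop_slot)
{
	return !(startpos != InvalidXLogRecPtr && (do_create_slot || do_drop_slot));
}
```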
At the same time, I noticed an unnecessary limitation in the second patch: it should be possible to create a slot and stream from it directly, but the last version exited immediately after the creation, whether it succeeded or not.

Updated patches attached.

Regards,
-- 
Michael
From aeb75d82acc875252dab800009bba540ab89fec6 Mon Sep 17 00:00:00 2001 From: Michael Paquier <mich...@otacoo.com> Date: Sun, 21 Sep 2014 18:16:22 +0900 Subject: [PATCH] Support dump from replication slot creation state pg_dump logic now incorporates a couple of things to be able to manage a replication connection for the creation of a logical slot, only way to easily get a snapshot to get a consistent database state based on the creation of this slot. This is a piece of the puzzle for online upgrades, and is still useful by itself. --- doc/src/sgml/ref/pg_dump.sgml | 29 ++++++++++++ src/bin/pg_dump/pg_backup.h | 6 ++- src/bin/pg_dump/pg_backup_archiver.c | 9 ++-- src/bin/pg_dump/pg_backup_archiver.h | 3 ++ src/bin/pg_dump/pg_backup_db.c | 86 ++++++++++++++++++++++++++--------- src/bin/pg_dump/pg_dump.c | 88 +++++++++++++++++++++++++++++++++--- 6 files changed, 188 insertions(+), 33 deletions(-) diff --git a/doc/src/sgml/ref/pg_dump.sgml b/doc/src/sgml/ref/pg_dump.sgml index eabdc62..c2d1097 100644 --- a/doc/src/sgml/ref/pg_dump.sgml +++ b/doc/src/sgml/ref/pg_dump.sgml @@ -801,6 +801,16 @@ PostgreSQL documentation </varlistentry> <varlistentry> + <term><option>--plugin-name=<replaceable class="parameter">plugin_name</replaceable></option></term> + <listitem> + <para> + Define a decoder plugin name (see <xref linkend="logicaldecoding-output-plugin">) + used for the creation of slot when <option>--slot</> is defined. + </para> + </listitem> + </varlistentry> + + <varlistentry> <term><option>--quote-all-identifiers</></term> <listitem> <para> @@ -866,6 +876,25 @@ PostgreSQL documentation </varlistentry> <varlistentry> + <term><option>--slot=<replaceable class="parameter">slot_name</replaceable></option></term> + <listitem> + <para> + Create a logical replication slot (see + <xref linkend="streaming-replication-slots"> for more details) from + which is taken a dump of data consistent with the point of the slot + creation using the synchronized snapshot it created. 
This is useful + to get a consistent image of a database before beginning to apply slot + changes to it as this ensures that the data integrity is maintained as + the same as when the replication slot was created. + </para> + <para> + This option needs to define a decoder plugin (see + <xref linkend="logicaldecoding-output-plugin">) that can be defined using <option>--plugin-name</>. + </para> + </listitem> + </varlistentry> + + <varlistentry> <term><option>--use-set-session-authorization</></term> <listitem> <para> diff --git a/src/bin/pg_dump/pg_backup.h b/src/bin/pg_dump/pg_backup.h index 921bc1b..b1628e1 100644 --- a/src/bin/pg_dump/pg_backup.h +++ b/src/bin/pg_dump/pg_backup.h @@ -85,6 +85,8 @@ struct Archive int numWorkers; /* number of parallel processes */ char *sync_snapshot_id; /* sync snapshot id for parallel * operation */ + char *slot_name; /* Slot used for dump */ + char *plugin_name; /* Plugin name for slot creation */ /* info needed for string escaping */ int encoding; /* libpq code for client_encoding */ @@ -164,9 +166,11 @@ extern void ConnectDatabase(Archive *AH, const char *pghost, const char *pgport, const char *username, - enum trivalue prompt_password); + enum trivalue prompt_password, + bool is_replication); extern void DisconnectDatabase(Archive *AHX); extern PGconn *GetConnection(Archive *AHX); +extern PGconn *GetReplicationConnection(Archive *AHX); /* Called to add a TOC entry */ extern void ArchiveEntry(Archive *AHX, diff --git a/src/bin/pg_dump/pg_backup_archiver.c b/src/bin/pg_dump/pg_backup_archiver.c index 5476a1e..f3e6abf 100644 --- a/src/bin/pg_dump/pg_backup_archiver.c +++ b/src/bin/pg_dump/pg_backup_archiver.c @@ -307,7 +307,7 @@ RestoreArchive(Archive *AHX) ConnectDatabase(AHX, ropt->dbname, ropt->pghost, ropt->pgport, ropt->username, - ropt->promptPassword); + ropt->promptPassword, false); /* * If we're talking to the DB directly, don't send comments since they @@ -3730,7 +3730,7 @@ restore_toc_entries_postfork(ArchiveHandle 
*AH, TocEntry *pending_list) */ ConnectDatabase((Archive *) AH, ropt->dbname, ropt->pghost, ropt->pgport, ropt->username, - ropt->promptPassword); + ropt->promptPassword, false); _doSetFixedOutputState(AH); @@ -4282,7 +4282,7 @@ CloneArchive(ArchiveHandle *AH) /* this also sets clone->connection */ ConnectDatabase((Archive *) clone, ropt->dbname, ropt->pghost, ropt->pgport, ropt->username, - ropt->promptPassword); + ropt->promptPassword, false); } else { @@ -4307,7 +4307,8 @@ CloneArchive(ArchiveHandle *AH) encname = pg_encoding_to_char(AH->public.encoding); /* this also sets clone->connection */ - ConnectDatabase((Archive *) clone, dbname, pghost, pgport, username, TRI_NO); + ConnectDatabase((Archive *) clone, dbname, pghost, pgport, username, + TRI_NO, false); /* * Set the same encoding, whatever we set here is what we got from diff --git a/src/bin/pg_dump/pg_backup_archiver.h b/src/bin/pg_dump/pg_backup_archiver.h index c163f29..832d956 100644 --- a/src/bin/pg_dump/pg_backup_archiver.h +++ b/src/bin/pg_dump/pg_backup_archiver.h @@ -293,6 +293,9 @@ typedef struct _archiveHandle ArchiverOutput outputKind; /* Flag for what we're currently writing */ bool pgCopyIn; /* Currently in libpq 'COPY IN' mode. 
*/ + /* Replication connection */ + PGconn *repConnection; /* Connection using replication protocol */ + int loFd; /* BLOB fd */ int writingBlob; /* Flag */ int blobCount; /* # of blobs restored */ diff --git a/src/bin/pg_dump/pg_backup_db.c b/src/bin/pg_dump/pg_backup_db.c index 4d1d14f..d7ce0c5 100644 --- a/src/bin/pg_dump/pg_backup_db.c +++ b/src/bin/pg_dump/pg_backup_db.c @@ -27,18 +27,19 @@ /* translator: this is a module name */ static const char *modulename = gettext_noop("archiver (db)"); -static void _check_database_version(ArchiveHandle *AH); +static void _check_database_version(ArchiveHandle *AH, bool is_replication); static PGconn *_connectDB(ArchiveHandle *AH, const char *newdbname, const char *newUser); static void notice_processor(void *arg, const char *message); static void -_check_database_version(ArchiveHandle *AH) +_check_database_version(ArchiveHandle *AH, bool is_replication) { const char *remoteversion_str; int remoteversion; + PGconn *conn = is_replication ? AH->repConnection : AH->connection; - remoteversion_str = PQparameterStatus(AH->connection, "server_version"); - remoteversion = PQserverVersion(AH->connection); + remoteversion_str = PQparameterStatus(conn, "server_version"); + remoteversion = PQserverVersion(conn); if (remoteversion == 0 || !remoteversion_str) exit_horribly(modulename, "could not get server_version from libpq\n"); @@ -194,7 +195,7 @@ _connectDB(ArchiveHandle *AH, const char *reqdb, const char *requser) AH->savedPassword = password; /* check for version mismatch */ - _check_database_version(AH); + _check_database_version(AH, false); PQsetNoticeProcessor(newConn, notice_processor, NULL); @@ -217,13 +218,21 @@ ConnectDatabase(Archive *AHX, const char *pghost, const char *pgport, const char *username, - enum trivalue prompt_password) + enum trivalue prompt_password, + bool is_replication) { ArchiveHandle *AH = (ArchiveHandle *) AHX; char *password = AH->savedPassword; bool new_pass; + PGconn *conn = is_replication ? 
AH->repConnection : AH->connection; - if (AH->connection) + /* + * Replication connection cannot be established before a normal connection + * as a check on the remote server version is necessary for compatibility. + */ + Assert((is_replication && AH->connection) || !is_replication); + + if (conn) exit_horribly(modulename, "already connected to a database\n"); if (prompt_password == TRI_YES && password == NULL) @@ -240,9 +249,9 @@ ConnectDatabase(Archive *AHX, */ do { -#define PARAMS_ARRAY_SIZE 7 - const char **keywords = pg_malloc(PARAMS_ARRAY_SIZE * sizeof(*keywords)); - const char **values = pg_malloc(PARAMS_ARRAY_SIZE * sizeof(*values)); +#define PARAMS_CONNECT_SIZE 8 + const char **keywords = pg_malloc(PARAMS_CONNECT_SIZE * sizeof(*keywords)); + const char **values = pg_malloc(PARAMS_CONNECT_SIZE * sizeof(*values)); keywords[0] = "host"; values[0] = pghost; @@ -259,21 +268,31 @@ ConnectDatabase(Archive *AHX, keywords[6] = NULL; values[6] = NULL; + /* Process replication connection for logical slot */ + if (is_replication) + { + keywords[6] = "replication"; + values[6] = "database"; + keywords[7] = NULL; + values[7] = NULL; + } + + /* Process regular connection */ new_pass = false; - AH->connection = PQconnectdbParams(keywords, values, true); + conn = PQconnectdbParams(keywords, values, true); free(keywords); free(values); - if (!AH->connection) + if (!conn) exit_horribly(modulename, "failed to connect to database\n"); - if (PQstatus(AH->connection) == CONNECTION_BAD && - PQconnectionNeedsPassword(AH->connection) && + if (PQstatus(conn) == CONNECTION_BAD && + PQconnectionNeedsPassword(conn) && password == NULL && prompt_password != TRI_NO) { - PQfinish(AH->connection); + PQfinish(conn); password = simple_prompt("Password: ", 100, false); if (password == NULL) exit_horribly(modulename, "out of memory\n"); @@ -281,18 +300,25 @@ ConnectDatabase(Archive *AHX, } } while (new_pass); - AH->savedPassword = password; - /* check to see that the backend connection was 
successfully made */ - if (PQstatus(AH->connection) == CONNECTION_BAD) + if (PQstatus(conn) == CONNECTION_BAD) exit_horribly(modulename, "connection to database \"%s\" failed: %s", - PQdb(AH->connection) ? PQdb(AH->connection) : "", - PQerrorMessage(AH->connection)); + PQdb(conn) ? PQdb(conn) : "", + PQerrorMessage(conn)); + + /* Save obtained connection to correct slot */ + if (is_replication) + AH->repConnection = conn; + else + AH->connection = conn; + + AH->savedPassword = password; /* check for version mismatch */ - _check_database_version(AH); + _check_database_version(AH, is_replication); - PQsetNoticeProcessor(AH->connection, notice_processor, NULL); + if (!is_replication) + PQsetNoticeProcessor(AH->connection, notice_processor, NULL); } /* @@ -306,6 +332,14 @@ DisconnectDatabase(Archive *AHX) PGcancel *cancel; char errbuf[1]; + /* Disconnect replication connection if there is one */ + if (AH->repConnection) + { + PQfinish(AH->repConnection); + AH->repConnection = NULL; + } + + /* Leave if no connection */ if (!AH->connection) return; @@ -330,6 +364,14 @@ GetConnection(Archive *AHX) return AH->connection; } +PGconn * +GetReplicationConnection(Archive *AHX) +{ + ArchiveHandle *AH = (ArchiveHandle *) AHX; + + return AH->repConnection; +} + static void notice_processor(void *arg, const char *message) { diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c index 2915329..fdbdcaa 100644 --- a/src/bin/pg_dump/pg_dump.c +++ b/src/bin/pg_dump/pg_dump.c @@ -142,7 +142,8 @@ static int enable_row_security = 0; static void help(const char *progname); static void setup_connection(Archive *AH, const char *dumpencoding, - char *use_role); + char *use_role); +static void setup_replication_connection(Archive *AH); static ArchiveFormat parseArchiveFormat(const char *format, ArchiveMode *mode); static void expand_schema_name_patterns(Archive *fout, SimpleStringList *patterns, @@ -280,6 +281,8 @@ main(int argc, char **argv) const char *pgport = NULL; const char 
*username = NULL; const char *dumpencoding = NULL; + char *slot_name = NULL; + char *plugin_name = NULL; bool oids = false; TableInfo *tblinfo; int numTables; @@ -361,6 +364,8 @@ main(int argc, char **argv) {"no-security-labels", no_argument, &no_security_labels, 1}, {"no-synchronized-snapshots", no_argument, &no_synchronized_snapshots, 1}, {"no-unlogged-table-data", no_argument, &no_unlogged_table_data, 1}, + {"slot", required_argument, NULL, 6}, + {"plugin-name", required_argument, NULL, 7}, {NULL, 0, NULL, 0} }; @@ -538,6 +543,14 @@ main(int argc, char **argv) set_dump_section(optarg, &dumpSections); break; + case 6: /* Name of slot to be created for the dump */ + slot_name = pg_strdup(optarg); + break; + + case 7: /* Plugin associated with slot created */ + plugin_name = pg_strdup(optarg); + break; + default: fprintf(stderr, _("Try \"%s --help\" for more information.\n"), progname); exit_nicely(1); @@ -645,10 +658,41 @@ main(int argc, char **argv) * Open the database using the Archiver, so it knows about it. Errors mean * death. 
*/ - ConnectDatabase(fout, dbname, pghost, pgport, username, prompt_password); + ConnectDatabase(fout, dbname, pghost, pgport, username, + prompt_password, false); + + /* Sanity check for replication connection */ + if (slot_name && !plugin_name) + exit_horribly(NULL, "Slot name is defined but plugin name is missing.\n"); + fout->slot_name = slot_name; + fout->plugin_name = plugin_name; + + /* Establish replication connection if necessary for logical slot creation */ + if (fout->remoteVersion < 90400 && slot_name) + { + exit_horribly(NULL, + "Logical slot creation is not supported by this server version.\n"); + } + else if (slot_name) + { + ConnectDatabase(fout, dbname, pghost, pgport, username, + prompt_password, true); + setup_replication_connection(fout); + } + + /* + * Any synchronized snapshot needed for a dump may have been taken using + * the replication connection so be sure to setup connection used for the + * dump with a consistent set of parameters. + */ setup_connection(fout, dumpencoding, use_role); /* + * Setup connection for dump. It may be possible that it uses a snapshot from + * a replication slot. 
*/ + /* + * Disable security label support if server version < v9.1.x (prevents * access to nonexistent pg_seclabel catalog) */ @@ -916,6 +960,10 @@ help(const char *progname) " use SET SESSION AUTHORIZATION commands instead of\n" " ALTER OWNER commands to set ownership\n")); + printf(_("\nReplication slot options:\n")); + printf(_(" --plugin-name Output plugin used for slot creation defined by --slot\n")); + printf(_(" --slot Slot created and used for dump\n")); + printf(_("\nConnection options:\n")); printf(_(" -d, --dbname=DBNAME database to dump\n")); printf(_(" -h, --host=HOSTNAME database server host or socket directory\n")); @@ -1039,9 +1087,16 @@ setup_connection(Archive *AH, const char *dumpencoding, char *use_role) ExecuteSqlStatement(AH, "SET TRANSACTION ISOLATION LEVEL SERIALIZABLE"); - - - if (AH->numWorkers > 1 && AH->remoteVersion >= 90200 && !no_synchronized_snapshots) + /* + * In this code path, a transaction snapshot can be exported from a + * non-replication connection, something that can be done only if + * slot connection is not set for this archive handler.
+ */ + if ((AH->numWorkers > 1 && + AH->remoteVersion >= 90200 && + !no_synchronized_snapshots) || + (AH->remoteVersion >= 90400 && + AH->slot_name)) { if (AH->sync_snapshot_id) { @@ -1052,7 +1107,7 @@ setup_connection(Archive *AH, const char *dumpencoding, char *use_role) ExecuteSqlStatement(AH, query->data); destroyPQExpBuffer(query); } - else + else if (!AH->slot_name) AH->sync_snapshot_id = get_synchronized_snapshot(AH); } @@ -1066,6 +1121,27 @@ setup_connection(Archive *AH, const char *dumpencoding, char *use_role) } static void +setup_replication_connection(Archive *AH) +{ + char query[256]; + PGresult *res; + PGconn *conn = GetReplicationConnection(AH); + + /* Create a slot and obtain an exported snapshot from it for the dump */ + snprintf(query, sizeof(query), "CREATE_REPLICATION_SLOT \"%s\" LOGICAL \"%s\"", + AH->slot_name, AH->plugin_name); + + res = PQexec(conn, query); + if (PQresultStatus(res) != PGRES_TUPLES_OK) + exit_horribly(NULL, "%s: could not send replication command \"%s\": %s", + progname, query, PQerrorMessage(conn)); + + AH->sync_snapshot_id = pg_strdup(PQgetvalue(res, 0, 2)); + PQclear(res); +} + + +static void setupDumpWorker(Archive *AHX, RestoreOptions *ropt) { setup_connection(AHX, NULL, NULL); -- 2.1.0
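The first patch combines two mechanisms: a second libpq connection opened with the extra `replication=database` parameter, and a `CREATE_REPLICATION_SLOT ... LOGICAL ...` command whose result row carries the exported snapshot name in its third column (index 2, which is what `PQgetvalue(res, 0, 2)` reads). The C sketch below shows only the string-building side so that it runs without a server. `build_conn_params()` and `format_create_slot()` are hypothetical helpers, the parameter list is trimmed to `dbname` for brevity, and in real code the arrays would be handed to `PQconnectdbParams()`. Unlike the patch, it also checks the return value of `snprintf()` so that a long slot or plugin name cannot produce a silently truncated command.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdio.h>
#include <string.h>

#define MAX_CONN_PARAMS 8		/* same size as PARAMS_CONNECT_SIZE in the patch */

/*
 * Fill NULL-terminated keyword/value arrays in the layout expected by
 * PQconnectdbParams().  Hypothetical and trimmed: only "dbname" is set
 * here.  A replication connection adds replication=database, which asks
 * for a walsender attached to that database, as logical slots require.
 * Returns the number of parameters stored before the terminator.
 */
static int
build_conn_params(const char **keywords, const char **values,
				  const char *dbname, bool is_replication)
{
	int			n = 0;

	assert(keywords != NULL && values != NULL);

	keywords[n] = "dbname";
	values[n++] = dbname;
	if (is_replication)
	{
		keywords[n] = "replication";
		values[n++] = "database";
	}
	keywords[n] = NULL;			/* libpq stops at the first NULL keyword */
	values[n] = NULL;
	return n;
}

/*
 * Format the walsender command that creates a logical slot.  Returns false
 * on empty names or if the full command would not fit in buf.
 */
static bool
format_create_slot(char *buf, size_t len, const char *slot, const char *plugin)
{
	int			written;

	if (strlen(slot) == 0 || strlen(plugin) == 0)
		return false;
	written = snprintf(buf, len, "CREATE_REPLICATION_SLOT \"%s\" LOGICAL \"%s\"",
					   slot, plugin);
	return written >= 0 && (size_t) written < len;
}
```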
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c index fdbdcaa..ab1e425 100644 --- a/src/bin/pg_dump/pg_dump.c +++ b/src/bin/pg_dump/pg_dump.c @@ -1101,6 +1101,8 @@ setup_connection(Archive *AH, const char *dumpencoding, char *use_role) if (AH->sync_snapshot_id) { PQExpBuffer query = createPQExpBuffer(); + fprintf(stderr, "Snapshot used for connection: %s\n", + AH->sync_snapshot_id); appendPQExpBufferStr(query, "SET TRANSACTION SNAPSHOT "); appendStringLiteralConn(query, AH->sync_snapshot_id, conn); @@ -1108,7 +1110,11 @@ setup_connection(Archive *AH, const char *dumpencoding, char *use_role) destroyPQExpBuffer(query); } else if (!AH->slot_name) + { AH->sync_snapshot_id = get_synchronized_snapshot(AH); + fprintf(stderr, "Snapshot created for non-replication connection: %s\n", + AH->sync_snapshot_id); + } } if (AH->remoteVersion >= 90500) @@ -1137,6 +1143,9 @@ setup_replication_connection(Archive *AH) progname, query, PQerrorMessage(conn)); AH->sync_snapshot_id = pg_strdup(PQgetvalue(res, 0, 2)); + fprintf(stderr, "Snapshot created for replication connection: %s\n", + AH->sync_snapshot_id); + PQclear(res); }
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers