Craig Ringer wrote: > With this patch pg_recvlogical takes a new --endpos LSN argument, and will > exit if either: > > * it receives an XLogData message with dataStart >= endpos; or > * it receives a keepalive with walEnd >= endpos > > The latter allows it to work without needing a dummy transaction to make it > see a data message after endpos. If there's nothing to read on the socket > until a keepalive we know that the server has nothing to send us, and if > walend has passed endpos we know nothing can have committed before endpos.
Here's a slightly revised version of this patch, for later consideration. Changes: - added -E as short form of --endpos (consistent with -I for --startpos) - refactored some repetitive code into two auxiliary functions - allowed --endpos to work with --create-slot - reverted some unrelated changes, such as message additions in verbose mode and changes to existing messages - reworked the documentation. I didn't spot any bugs in Craig's patch, but it needs more testing. -- Álvaro Herrera http://www.2ndQuadrant.com/ PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
diff --git a/doc/src/sgml/ref/pg_recvlogical.sgml b/doc/src/sgml/ref/pg_recvlogical.sgml index 9d0b58b..6f23229 100644 --- a/doc/src/sgml/ref/pg_recvlogical.sgml +++ b/doc/src/sgml/ref/pg_recvlogical.sgml @@ -155,6 +155,41 @@ PostgreSQL documentation </varlistentry> <varlistentry> + <term><option>-E <replaceable>lsn</replaceable></option></term> + <term><option>--endpos=<replaceable>lsn</replaceable></option></term> + <listitem> + <para> + In <option>--start</option> mode, automatically stop replication + and exit with normal exit status 0 when receiving reaches the + specified LSN. If specified when not in <option>--start</option> + mode, an error is raised. + </para> + + <para> + Note the following points: + <itemizedlist> + <listitem> + <para> + If there's a record with LSN exactly equal to <replaceable>lsn</>, + the record will be output. Records with an LSN greater than + <replaceable>lsn</> are never output. + </para> + </listitem> + <listitem> + <para> + The <option>--endpos</option> option is not aware of transaction + boundaries and may truncate output partway through a transaction. + Any partially output transaction will not be consumed and will be + replayed again when the slot is next read from. Individual messages + are never truncated. 
+ </para> + </listitem> + </itemizedlist> + </para> + </listitem> + </varlistentry> + + <varlistentry> <term><option>--if-not-exists</option></term> <listitem> <para> diff --git a/src/bin/pg_basebackup/pg_recvlogical.c b/src/bin/pg_basebackup/pg_recvlogical.c index 6d12705..5108222 100644 --- a/src/bin/pg_basebackup/pg_recvlogical.c +++ b/src/bin/pg_basebackup/pg_recvlogical.c @@ -37,6 +37,7 @@ static int noloop = 0; static int standby_message_timeout = 10 * 1000; /* 10 sec = default */ static int fsync_interval = 10 * 1000; /* 10 sec = default */ static XLogRecPtr startpos = InvalidXLogRecPtr; +static XLogRecPtr endpos = InvalidXLogRecPtr; static bool do_create_slot = false; static bool slot_exists_ok = false; static bool do_start_slot = false; @@ -60,6 +61,9 @@ static XLogRecPtr output_fsync_lsn = InvalidXLogRecPtr; static void usage(void); static void StreamLogicalLog(void); static void disconnect_and_exit(int code); +static bool flushAndSendFeedback(PGconn *conn, TimestampTz *now); +static void prepareToTerminate(PGconn *conn, XLogRecPtr endpos, + bool keepalive, XLogRecPtr lsn); static void usage(void) @@ -78,6 +82,7 @@ usage(void) " time between fsyncs to the output file (default: %d)\n"), (fsync_interval / 1000)); printf(_(" --if-not-exists do not error if slot already exists when creating a slot\n")); printf(_(" -I, --startpos=LSN where in an existing slot should the streaming start\n")); + printf(_(" -E, --endpos=LSN exit upon receiving the specified LSN\n")); printf(_(" -n, --no-loop do not loop on connection lost\n")); printf(_(" -o, --option=NAME[=VALUE]\n" " pass option NAME with optional value VALUE to the\n" @@ -278,6 +283,7 @@ StreamLogicalLog(void) int bytes_written; int64 now; int hdr_len; + XLogRecPtr cur_record_lsn = InvalidXLogRecPtr; if (copybuf != NULL) { @@ -451,6 +457,7 @@ StreamLogicalLog(void) int pos; bool replyRequested; XLogRecPtr walEnd; + bool endposReached = false; /* * Parse the keepalive message, enclosed in the CopyData message. 
@@ -473,18 +480,32 @@ StreamLogicalLog(void) } replyRequested = copybuf[pos]; - /* If the server requested an immediate reply, send one. */ - if (replyRequested) + if (endpos != InvalidXLogRecPtr && walEnd >= endpos) { - /* fsync data, so we send a recent flush pointer */ - if (!OutputFsync(now)) - goto error; + /* + * If there's nothing to read on the socket until a keepalive + * we know that the server has nothing to send us; and if + * walEnd has passed endpos, we know nothing else can have + * committed before endpos. So we can bail out now. + */ + endposReached = true; + } - now = feGetCurrentTimestamp(); - if (!sendFeedback(conn, now, true, false)) + /* Send a reply, if necessary */ + if (replyRequested || endposReached) + { + if (!flushAndSendFeedback(conn, &now)) goto error; last_status = now; } + + if (endposReached) + { + prepareToTerminate(conn, endpos, true, InvalidXLogRecPtr); + time_to_abort = true; + break; + } + continue; } else if (copybuf[0] != 'w') @@ -494,7 +515,6 @@ StreamLogicalLog(void) goto error; } - /* * Read the header of the XLogData message, enclosed in the CopyData * message. We only need the WAL location field (dataStart), the rest @@ -512,10 +532,23 @@ StreamLogicalLog(void) } /* Extract WAL location for this block */ - { - XLogRecPtr temp = fe_recvint64(©buf[1]); + cur_record_lsn = fe_recvint64(©buf[1]); - output_written_lsn = Max(temp, output_written_lsn); + if (endpos != InvalidXLogRecPtr && cur_record_lsn > endpos) + { + /* + * We've read past our endpoint, so prepare to go away being + * cautious about what happens to our output data. 
+ */ + if (!flushAndSendFeedback(conn, &now)) + goto error; + prepareToTerminate(conn, endpos, false, cur_record_lsn); + time_to_abort = true; + break; + } + else + { + output_written_lsn = Max(cur_record_lsn, output_written_lsn); } bytes_left = r - hdr_len; @@ -557,7 +590,16 @@ StreamLogicalLog(void) } res = PQgetResult(conn); - if (PQresultStatus(res) != PGRES_COMMAND_OK) + if (PQresultStatus(res) == PGRES_COPY_OUT) + { + /* + * We're doing a client-initiated clean exit and have sent CopyDone to + * the server. We've already sent replay confirmation and fsync'd so + * we can just clean up the connection now. + */ + goto error; + } + else if (PQresultStatus(res) != PGRES_COMMAND_OK) { fprintf(stderr, _("%s: unexpected termination of replication stream: %s"), @@ -635,6 +677,7 @@ main(int argc, char **argv) {"password", no_argument, NULL, 'W'}, /* replication options */ {"startpos", required_argument, NULL, 'I'}, + {"endpos", required_argument, NULL, 'E'}, {"option", required_argument, NULL, 'o'}, {"plugin", required_argument, NULL, 'P'}, {"status-interval", required_argument, NULL, 's'}, @@ -670,7 +713,7 @@ main(int argc, char **argv) } } - while ((c = getopt_long(argc, argv, "f:F:nvd:h:p:U:wWI:o:P:s:S:", + while ((c = getopt_long(argc, argv, "f:F:nvd:h:p:U:wWI:E:o:P:s:S:", long_options, &option_index)) != -1) { switch (c) @@ -730,6 +773,16 @@ main(int argc, char **argv) } startpos = ((uint64) hi) << 32 | lo; break; + case 'E': + if (sscanf(optarg, "%X/%X", &hi, &lo) != 2) + { + fprintf(stderr, + _("%s: could not parse end position \"%s\"\n"), + progname, optarg); + exit(1); + } + endpos = ((uint64) hi) << 32 | lo; + break; case 'o': { char *data = pg_strdup(optarg); @@ -854,6 +907,16 @@ main(int argc, char **argv) exit(1); } + if (endpos != InvalidXLogRecPtr && !do_start_slot) + { + fprintf(stderr, + _("%s: --endpos may only be specified with --start\n"), + progname); + fprintf(stderr, _("Try \"%s --help\" for more information.\n"), + progname); 
+ exit(1); + } + #ifndef WIN32 pqsignal(SIGINT, sigint_handler); pqsignal(SIGHUP, sighup_handler); @@ -861,8 +924,8 @@ main(int argc, char **argv) /* * Obtain a connection to server. This is not really necessary but it - * helps to get more precise error messages about authentication, - * required GUC parameters and such. + * helps to get more precise error messages about authentication, required + * GUC parameters and such. */ conn = GetConnection(); if (!conn) @@ -920,8 +983,8 @@ main(int argc, char **argv) if (time_to_abort) { /* - * We've been Ctrl-C'ed. That's not an error, so exit without an - * errorcode. + * We've been Ctrl-C'ed or reached an exit limit condition. That's + * not an error, so exit without an errorcode. */ disconnect_and_exit(0); } @@ -940,3 +1003,47 @@ main(int argc, char **argv) } } } + +/* + * Fsync our output data, and send a feedback message to the server. Returns + * true if successful, false otherwise. + * + * If successful, *now is updated to the current timestamp just before sending + * feedback. + */ +static bool +flushAndSendFeedback(PGconn *conn, TimestampTz *now) +{ + /* flush data, so that we send a recent flush pointer */ + if (!OutputFsync(*now)) + return false; + *now = feGetCurrentTimestamp(); + if (!sendFeedback(conn, *now, true, false)) + return false; + + return true; +} + +/* + * Try to inform the server about our upcoming demise, but don't wait around or + * retry on failure. + */ +static void +prepareToTerminate(PGconn *conn, XLogRecPtr endpos, bool keepalive, XLogRecPtr lsn) +{ + (void) PQputCopyEnd(conn, NULL); + (void) PQflush(conn); + + if (verbose) + { + if (keepalive) + fprintf(stderr, _("%s: endpos %X/%X reached by keepalive\n"), + progname, + (uint32) (endpos >> 32), (uint32) endpos); + else + fprintf(stderr, _("%s: endpos %X/%X reached by record at %X/%X\n"), + progname, (uint32) (endpos >> 32), (uint32) (endpos), + (uint32) (lsn >> 32), (uint32) lsn); + + } +}
-- Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org) To make changes to your subscription: http://www.postgresql.org/mailpref/pgsql-hackers