On 29 April 2016 at 15:40, Craig Ringer <cr...@2ndquadrant.com> wrote:


> I don't think pg_recvlogical can do anything about the need for that dummy
> write, since the client has no way to determine the exact LSN of the commit
> record of the xact of interest. It can't rely
> on pg_current_xlog_insert_location() or pg_current_xlog_location() since
> autovacuum or a checkpoint might've written xlog since. Logical streaming
> replication doesn't have a non-blocking mode where it returns immediately
> if it'd have to wait for more xlog so we can't just send off the most
> recent server LSN as the endpoint.
>

(Patch attached. Blah blah explanation blah):

With this patch pg_recvlogical takes a new --endpos LSN argument, and will
exit if either:

* it receives an XLogData message with dataStart >= endpos; or
* it receives a keepalive with walEnd >= endpos

The latter allows it to work without needing a dummy transaction to make it
see a data message after endpos. If there's nothing to read on the socket
until a keepalive, we know that the server has nothing to send us, and if
walEnd has passed endpos we know nothing else can have committed before endpos.


The way I've written it, endpos is the point where we stop receiving and
exit, so if a record with start LSN >= endpos is received we'll exit
without writing it.

I thought about writing out the record before exiting if the record start
LSN is exactly endpos. That'd be handy in cases where the client knows a
commit's LSN and wants everything up to that commit. But it's easy enough
in this case for the client to set endpos to the commit start lsn + 1, so
it's not like the current behaviour stops you doing anything, and it means
the code can just test endpos and exit. pg_current_xlog_insert_location()
will return at least the lsn of the last commit + 1, so you'll get the
expected behaviour for free there. It does mean we might wait for the next
walsender keepalive or status update before we exit, though, so if someone
feels strongly that endpos should be an inclusive bound I can do that. It's
just a bit uglier in the code.

I can't add a "number of xacts" filter like the SQL interface has because
pg_recvlogical has no idea which records represent a commit, so it's not
possible without changing the protocol. I'm not convinced a "number of
messages" filter is particularly useful. I could add a timeout, but it's
easy enough to do that in a wrapper (like IPC::Run). So I'm sticking with
just the LSN filter for now.

Also, because pg_recvlogical isn't aware of transaction boundaries, setting
endpos might result in a partial transaction being output if endpos is
after the end of the last xact wanted and some other xact containing
changes made before endpos commits after endpos but before the next status
update/keepalive is sent. That xact won't be consumed from the server and
will just be sent when the slot is next read from. This won't result in
unpredictable output for testing since there we control what other xacts
run and will generally exit based on walsender status updates/keepalives.

Here's the patch. Docs included. Comments?

-- 
 Craig Ringer                   http://www.2ndQuadrant.com/
 PostgreSQL Development, 24x7 Support, Training & Services
From cc54f99d21de3c573873cce3467237caccfb1a33 Mon Sep 17 00:00:00 2001
From: Craig Ringer <cr...@2ndquadrant.com>
Date: Fri, 29 Apr 2016 18:18:19 +0800
Subject: [PATCH] Allow a stop LSN to be specified to pg_recvlogical

pg_recvlogical just runs until cancelled or until the upstream
server disconnects. For some purposes, especially testing, it's
useful to have the ability to stop receiving at a specified LSN
without having to parse the output and deal with buffering issues,
etc.

Add a --endpos parameter that takes the LSN at which no further
messages should be written and receive should stop.
---
 doc/src/sgml/ref/pg_recvlogical.sgml   |  36 ++++++++++++
 src/bin/pg_basebackup/pg_recvlogical.c | 102 +++++++++++++++++++++++++++++----
 2 files changed, 127 insertions(+), 11 deletions(-)

diff --git a/doc/src/sgml/ref/pg_recvlogical.sgml b/doc/src/sgml/ref/pg_recvlogical.sgml
index 9d0b58b..8f1e8f6 100644
--- a/doc/src/sgml/ref/pg_recvlogical.sgml
+++ b/doc/src/sgml/ref/pg_recvlogical.sgml
@@ -155,6 +155,42 @@ PostgreSQL documentation
      </varlistentry>
 
      <varlistentry>
+      <term><option>--endpos=<replaceable>lsn</replaceable></option></term>
+      <listitem>
+       <para>
+        In <option>--start</option> mode, automatically stop replication and
+        exit with normal exit status 0 when receiving passes the specified LSN.
+        Because logical decoding reorders operations, this actually
+        means that no change that's part of a transaction that committed before
+        endpos is still waiting to be received. There can still be
+        pending changes made before endpos that are part of transactions that
+        committed after endpos or are not yet committed.
+       </para>
+       <para>
+        The endpos option is not aware of transaction boundaries and may
+        truncate output partway through a transaction. The partially
+        output transaction will not be consumed and will be sent again
+        when the slot is next read from. Individual messages will never
+        be truncated.
+       </para>
+       <para>
+        If there's a record with start LSN exactly equal to
+        <option>--endpos</option> it will not be output.  If you want to
+        receive up to and including a given LSN, specify
+        <option>--endpos</option> as the desired LSN + 1.
+       </para>
+       <para>
+        If there is no more data ready to be sent when the server's xlog passes
+        endpos, <application>pg_recvlogical</application> may not exit until
+        the next keepalive is received from the server, usually after half of
+        <link
+        linkend="guc-wal-sender-timeout"><literal>wal_sender_timeout</literal></link>
+        has elapsed.
+       </para>
+      </listitem>
+     </varlistentry>
+
+     <varlistentry>
       <term><option>--if-not-exists</option></term>
       <listitem>
        <para>
diff --git a/src/bin/pg_basebackup/pg_recvlogical.c b/src/bin/pg_basebackup/pg_recvlogical.c
index 6d12705..d3c5aea 100644
--- a/src/bin/pg_basebackup/pg_recvlogical.c
+++ b/src/bin/pg_basebackup/pg_recvlogical.c
@@ -37,6 +37,7 @@ static int	noloop = 0;
 static int	standby_message_timeout = 10 * 1000;		/* 10 sec = default */
 static int	fsync_interval = 10 * 1000; /* 10 sec = default */
 static XLogRecPtr startpos = InvalidXLogRecPtr;
+static XLogRecPtr endpos = InvalidXLogRecPtr;
 static bool do_create_slot = false;
 static bool slot_exists_ok = false;
 static bool do_start_slot = false;
@@ -78,6 +79,7 @@ usage(void)
 			 "                         time between fsyncs to the output file (default: %d)\n"), (fsync_interval / 1000));
 	printf(_("      --if-not-exists    do not error if slot already exists when creating a slot\n"));
 	printf(_("  -I, --startpos=LSN     where in an existing slot should the streaming start\n"));
+	printf(_("      --endpos=LSN       exit when receiving reaches or passes the specified LSN\n"));
 	printf(_("  -n, --no-loop          do not loop on connection lost\n"));
 	printf(_("  -o, --option=NAME[=VALUE]\n"
 			 "                         pass option NAME with optional value VALUE to the\n"
@@ -278,6 +280,7 @@ StreamLogicalLog(void)
 		int			bytes_written;
 		int64		now;
 		int			hdr_len;
+		XLogRecPtr	cur_record_lsn = InvalidXLogRecPtr;
 
 		if (copybuf != NULL)
 		{
@@ -451,6 +454,7 @@ StreamLogicalLog(void)
 			int			pos;
 			bool		replyRequested;
 			XLogRecPtr	walEnd;
+			bool		endposReached = false;
 
 			/*
 			 * Parse the keepalive message, enclosed in the CopyData message.
@@ -473,8 +477,24 @@ StreamLogicalLog(void)
 			}
 			replyRequested = copybuf[pos];
 
-			/* If the server requested an immediate reply, send one. */
-			if (replyRequested)
+			if (verbose)
+			{
+				fprintf(stderr, "%s: keepalive with walend=%X/%X received (immediate_reply=%c)\n",
+						progname, (uint32)(walEnd>>32), (uint32)walEnd, replyRequested ? 'y' : 'n');
+			}
+
+			if (endpos != InvalidXLogRecPtr && walEnd >= endpos)
+			{
+				/*
+				 * If there's nothing to read on the socket until a keepalive
+				 * we know that the server has nothing to send us, and if
+				 * walend has passed endpos we know nothing else can have
+				 * committed before endpos. So we can bail out now.
+				 */
+				endposReached = true;
+			}
+
+			if (replyRequested || endposReached)
 			{
 				/* fsync data, so we send a recent flush pointer */
 				if (!OutputFsync(now))
@@ -485,6 +505,18 @@ StreamLogicalLog(void)
 					goto error;
 				last_status = now;
 			}
+
+			if (endposReached)
+			{
+				(void) PQputCopyEnd(conn, NULL);
+				(void) PQflush(conn);
+				if (verbose)
+					fprintf(stderr, "%s: endpos %X/%X reached/passed by keepalive\n",
+							progname, (uint32)(endpos>>32), (uint32)endpos);
+				time_to_abort = true;
+				break;
+			}
+
 			continue;
 		}
 		else if (copybuf[0] != 'w')
@@ -512,10 +544,38 @@ StreamLogicalLog(void)
 		}
 
 		/* Extract WAL location for this block */
+		cur_record_lsn = fe_recvint64(&copybuf[1]);
+
+		if (endpos != InvalidXLogRecPtr && cur_record_lsn >= endpos)
 		{
-			XLogRecPtr	temp = fe_recvint64(&copybuf[1]);
+			/* We've reached or passed the limit of what we want to receive */
+
+			if (!OutputFsync(now))
+				goto error;
+
+			if (!sendFeedback(conn, now, true, false))
+				goto error;
 
-			output_written_lsn = Max(temp, output_written_lsn);
+			/*
+			 * Try to tell the server we're exiting, but don't wait around or
+			 * retry on failure. Current servers don't understand
+			 * client-initiated CopyDone anyway.
+			 */
+			(void) PQputCopyEnd(conn, NULL);
+			(void) PQflush(conn);
+
+			if (verbose)
+				fprintf(stderr, "%s: endpos %X/%X reached/passed by record startpos %X/%X\n",
+						progname, (uint32)(endpos>>32), (uint32)(endpos),
+						(uint32)(cur_record_lsn>>32), (uint32)cur_record_lsn);
+
+			/* We'll never get a reply CopyDone from the server, so just bail out. */
+			time_to_abort = true;
+			break;
+		}
+		else
+		{
+			output_written_lsn = Max(cur_record_lsn, output_written_lsn);
 		}
 
 		bytes_left = r - hdr_len;
@@ -557,11 +617,20 @@ StreamLogicalLog(void)
 	}
 
 	res = PQgetResult(conn);
-	if (PQresultStatus(res) != PGRES_COMMAND_OK)
+	if (PQresultStatus(res) == PGRES_COPY_OUT)
+	{
+		/*
+		 * We're doing a client-initiated clean exit and have sent CopyDone to
+		 * the server. We've already sent replay confirmation and fsync'd so we
+		 * can just clean up the connection now.
+		 */
+		goto error;
+	}
+	else if (PQresultStatus(res) != PGRES_COMMAND_OK)
 	{
 		fprintf(stderr,
-				_("%s: unexpected termination of replication stream: %s"),
-				progname, PQresultErrorMessage(res));
+				_("%s: unexpected termination of replication stream: %s (status=%u)"),
+				progname, PQresultErrorMessage(res), PQresultStatus(res));
 		goto error;
 	}
 	PQclear(res);
@@ -644,6 +713,7 @@ main(int argc, char **argv)
 		{"start", no_argument, NULL, 2},
 		{"drop-slot", no_argument, NULL, 3},
 		{"if-not-exists", no_argument, NULL, 4},
+		{"endpos", required_argument, NULL, 5},
 		{NULL, 0, NULL, 0}
 	};
 	int			c;
@@ -778,6 +848,16 @@ main(int argc, char **argv)
 			case 4:
 				slot_exists_ok = true;
 				break;
+			case 5:
+				if (sscanf(optarg, "%X/%X", &hi, &lo) != 2)
+				{
+					fprintf(stderr,
+							_("%s: could not parse end position \"%s\"\n"),
+							progname, optarg);
+					exit(1);
+				}
+				endpos = ((uint64) hi) << 32 | lo;
+				break;
 
 			default:
 
@@ -846,9 +926,9 @@ main(int argc, char **argv)
 		exit(1);
 	}
 
-	if (startpos != InvalidXLogRecPtr && (do_create_slot || do_drop_slot))
+	if ((startpos != InvalidXLogRecPtr || endpos != InvalidXLogRecPtr) && (do_create_slot || do_drop_slot))
 	{
-		fprintf(stderr, _("%s: cannot use --create-slot or --drop-slot together with --startpos\n"), progname);
+		fprintf(stderr, _("%s: cannot use --create-slot or --drop-slot together with --startpos or --endpos\n"), progname);
 		fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
 				progname);
 		exit(1);
@@ -920,8 +1000,8 @@ main(int argc, char **argv)
 		if (time_to_abort)
 		{
 			/*
-			 * We've been Ctrl-C'ed. That's not an error, so exit without an
-			 * errorcode.
+			 * We've been Ctrl-C'ed or reached an exit limit condition. That's
+			 * not an error, so exit without an errorcode.
 			 */
 			disconnect_and_exit(0);
 		}
-- 
2.5.5

-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
