Hi all

Here's a rebased version of my pg_recvlogical --endpos patch from the
9.5 series, updated to incoroprate Álvaro's changes.

This will be mainly useful for recovery tests as we start adding more
logical decoding testing.

See original post for more detail:

https://www.postgresql.org/message-id/CAMsr+YHBm3mUtXb2_RD=qsnupdt0dr8k-+gtbbgprdyuzfm...@mail.gmail.com

-- 
 Craig Ringer                   http://www.2ndQuadrant.com/
 PostgreSQL Development, 24x7 Support, Training & Services
From 464e487e6235e1f86d06aa82a4a57716ff188579 Mon Sep 17 00:00:00 2001
From: Craig Ringer <cr...@2ndquadrant.com>
Date: Thu, 1 Sep 2016 12:37:40 +0800
Subject: [PATCH] Add an optional --endpos LSN argument to pg_reclogical
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

pg_recvlogical usually just runs until cancelled or until the upstream
server disconnects. For some purposes, especially testing, it's useful
to have the ability to stop receive at a specified LSN without having
to parse the output and deal with buffering issues, etc.

Add a --endpos parameter that takes the LSN at which no further
messages should be written and receive should stop.

Craig Ringer, Álvaro Herrera
---
 doc/src/sgml/ref/pg_recvlogical.sgml   |  35 +++++++++
 src/bin/pg_basebackup/pg_recvlogical.c | 137 +++++++++++++++++++++++++++++----
 2 files changed, 157 insertions(+), 15 deletions(-)

diff --git a/doc/src/sgml/ref/pg_recvlogical.sgml b/doc/src/sgml/ref/pg_recvlogical.sgml
index b35881f..00829a7 100644
--- a/doc/src/sgml/ref/pg_recvlogical.sgml
+++ b/doc/src/sgml/ref/pg_recvlogical.sgml
@@ -155,6 +155,41 @@ PostgreSQL documentation
      </varlistentry>
 
      <varlistentry>
+      <term><option>-E <replaceable>lsn</replaceable></option></term>
+      <term><option>--endpos=<replaceable>lsn</replaceable></option></term>
+      <listitem>
+       <para>
+        In <option>--start</option> mode, automatically stop replication
+        and exit with normal exit status 0 when receiving reaches the
+        specified LSN.  If specified when not in <option>--start</option>
+        mode, an error is raised.
+       </para>
+
+       <para>
+        Note the following points:
+        <itemizedlist>
+         <listitem> 
+          <para>
+           If there's a record with LSN exactly equal to <replaceable>lsn</>,
+           the record will not be output.  If you want to receive up to and
+           including a given LSN, specify LSN + 1 as the desired stop point.
+          </para>
+         </listitem>
+         <listitem>
+          <para>
+           The <option>--endpos</option> option is not aware of transaction
+           boundaries and may truncate output partway through a transaction.
+           Any partially output transaction will not be consumed and will be
+           replayed again when the slot is next read from. Individual messages
+           are never truncated.
+          </para>
+         </listitem>
+        </itemizedlist>
+       </para>
+      </listitem>
+     </varlistentry>
+
+     <varlistentry>
       <term><option>--if-not-exists</option></term>
       <listitem>
        <para>
diff --git a/src/bin/pg_basebackup/pg_recvlogical.c b/src/bin/pg_basebackup/pg_recvlogical.c
index 4c6cf70..5108222 100644
--- a/src/bin/pg_basebackup/pg_recvlogical.c
+++ b/src/bin/pg_basebackup/pg_recvlogical.c
@@ -37,6 +37,7 @@ static int	noloop = 0;
 static int	standby_message_timeout = 10 * 1000;		/* 10 sec = default */
 static int	fsync_interval = 10 * 1000; /* 10 sec = default */
 static XLogRecPtr startpos = InvalidXLogRecPtr;
+static XLogRecPtr endpos = InvalidXLogRecPtr;
 static bool do_create_slot = false;
 static bool slot_exists_ok = false;
 static bool do_start_slot = false;
@@ -60,6 +61,9 @@ static XLogRecPtr output_fsync_lsn = InvalidXLogRecPtr;
 static void usage(void);
 static void StreamLogicalLog(void);
 static void disconnect_and_exit(int code);
+static bool flushAndSendFeedback(PGconn *conn, TimestampTz *now);
+static void prepareToTerminate(PGconn *conn, XLogRecPtr endpos,
+				   bool keepalive, XLogRecPtr lsn);
 
 static void
 usage(void)
@@ -78,6 +82,7 @@ usage(void)
 			 "                         time between fsyncs to the output file (default: %d)\n"), (fsync_interval / 1000));
 	printf(_("      --if-not-exists    do not error if slot already exists when creating a slot\n"));
 	printf(_("  -I, --startpos=LSN     where in an existing slot should the streaming start\n"));
+	printf(_("  -E, --endpos=LSN       exit upon receiving the specified LSN\n"));
 	printf(_("  -n, --no-loop          do not loop on connection lost\n"));
 	printf(_("  -o, --option=NAME[=VALUE]\n"
 			 "                         pass option NAME with optional value VALUE to the\n"
@@ -278,6 +283,7 @@ StreamLogicalLog(void)
 		int			bytes_written;
 		int64		now;
 		int			hdr_len;
+		XLogRecPtr	cur_record_lsn = InvalidXLogRecPtr;
 
 		if (copybuf != NULL)
 		{
@@ -451,6 +457,7 @@ StreamLogicalLog(void)
 			int			pos;
 			bool		replyRequested;
 			XLogRecPtr	walEnd;
+			bool		endposReached = false;
 
 			/*
 			 * Parse the keepalive message, enclosed in the CopyData message.
@@ -473,18 +480,32 @@ StreamLogicalLog(void)
 			}
 			replyRequested = copybuf[pos];
 
-			/* If the server requested an immediate reply, send one. */
-			if (replyRequested)
+			if (endpos != InvalidXLogRecPtr && walEnd >= endpos)
 			{
-				/* fsync data, so we send a recent flush pointer */
-				if (!OutputFsync(now))
-					goto error;
+				/*
+				 * If there's nothing to read on the socket until a keepalive
+				 * we know that the server has nothing to send us; and if
+				 * walEnd has passed endpos, we know nothing else can have
+				 * committed before endpos.  So we can bail out now.
+				 */
+				endposReached = true;
+			}
 
-				now = feGetCurrentTimestamp();
-				if (!sendFeedback(conn, now, true, false))
+			/* Send a reply, if necessary */
+			if (replyRequested || endposReached)
+			{
+				if (!flushAndSendFeedback(conn, &now))
 					goto error;
 				last_status = now;
 			}
+
+			if (endposReached)
+			{
+				prepareToTerminate(conn, endpos, true, InvalidXLogRecPtr);
+				time_to_abort = true;
+				break;
+			}
+
 			continue;
 		}
 		else if (copybuf[0] != 'w')
@@ -494,7 +515,6 @@ StreamLogicalLog(void)
 			goto error;
 		}
 
-
 		/*
 		 * Read the header of the XLogData message, enclosed in the CopyData
 		 * message. We only need the WAL location field (dataStart), the rest
@@ -512,10 +532,23 @@ StreamLogicalLog(void)
 		}
 
 		/* Extract WAL location for this block */
-		{
-			XLogRecPtr	temp = fe_recvint64(&copybuf[1]);
+		cur_record_lsn = fe_recvint64(&copybuf[1]);
 
-			output_written_lsn = Max(temp, output_written_lsn);
+		if (endpos != InvalidXLogRecPtr && cur_record_lsn > endpos)
+		{
+			/*
+			 * We've read past our endpoint, so prepare to go away being
+			 * cautious about what happens to our output data.
+			 */
+			if (!flushAndSendFeedback(conn, &now))
+				goto error;
+			prepareToTerminate(conn, endpos, false, cur_record_lsn);
+			time_to_abort = true;
+			break;
+		}
+		else
+		{
+			output_written_lsn = Max(cur_record_lsn, output_written_lsn);
 		}
 
 		bytes_left = r - hdr_len;
@@ -557,7 +590,16 @@ StreamLogicalLog(void)
 	}
 
 	res = PQgetResult(conn);
-	if (PQresultStatus(res) != PGRES_COMMAND_OK)
+	if (PQresultStatus(res) == PGRES_COPY_OUT)
+	{
+		/*
+		 * We're doing a client-initiated clean exit and have sent CopyDone to
+		 * the server. We've already sent replay confirmation and fsync'd so
+		 * we can just clean up the connection now.
+		 */
+		goto error;
+	}
+	else if (PQresultStatus(res) != PGRES_COMMAND_OK)
 	{
 		fprintf(stderr,
 				_("%s: unexpected termination of replication stream: %s"),
@@ -635,6 +677,7 @@ main(int argc, char **argv)
 		{"password", no_argument, NULL, 'W'},
 /* replication options */
 		{"startpos", required_argument, NULL, 'I'},
+		{"endpos", required_argument, NULL, 'E'},
 		{"option", required_argument, NULL, 'o'},
 		{"plugin", required_argument, NULL, 'P'},
 		{"status-interval", required_argument, NULL, 's'},
@@ -670,7 +713,7 @@ main(int argc, char **argv)
 		}
 	}
 
-	while ((c = getopt_long(argc, argv, "f:F:nvd:h:p:U:wWI:o:P:s:S:",
+	while ((c = getopt_long(argc, argv, "f:F:nvd:h:p:U:wWI:E:o:P:s:S:",
 							long_options, &option_index)) != -1)
 	{
 		switch (c)
@@ -730,6 +773,16 @@ main(int argc, char **argv)
 				}
 				startpos = ((uint64) hi) << 32 | lo;
 				break;
+			case 'E':
+				if (sscanf(optarg, "%X/%X", &hi, &lo) != 2)
+				{
+					fprintf(stderr,
+							_("%s: could not parse end position \"%s\"\n"),
+							progname, optarg);
+					exit(1);
+				}
+				endpos = ((uint64) hi) << 32 | lo;
+				break;
 			case 'o':
 				{
 					char	   *data = pg_strdup(optarg);
@@ -854,6 +907,16 @@ main(int argc, char **argv)
 		exit(1);
 	}
 
+	if (endpos != InvalidXLogRecPtr && !do_start_slot)
+	{
+		fprintf(stderr,
+				_("%s: cannot use --create-slot or --drop-slot together with --endpos\n"),
+				progname);
+		fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
+				progname);
+		exit(1);
+	}
+
 #ifndef WIN32
 	pqsignal(SIGINT, sigint_handler);
 	pqsignal(SIGHUP, sighup_handler);
@@ -920,8 +983,8 @@ main(int argc, char **argv)
 		if (time_to_abort)
 		{
 			/*
-			 * We've been Ctrl-C'ed. That's not an error, so exit without an
-			 * errorcode.
+			 * We've been Ctrl-C'ed or reached an exit limit condition. That's
+			 * not an error, so exit without an errorcode.
 			 */
 			disconnect_and_exit(0);
 		}
@@ -940,3 +1003,47 @@ main(int argc, char **argv)
 		}
 	}
 }
+
+/*
+ * Fsync our output data, and send a feedback message to the server.  Returns
+ * true if successful, false otherwise.
+ *
+ * If successful, *now is updated to the current timestamp just before sending
+ * feedback.
+ */
+static bool
+flushAndSendFeedback(PGconn *conn, TimestampTz *now)
+{
+	/* flush data, so that we send a recent flush pointer */
+	if (!OutputFsync(*now))
+		return false;
+	*now = feGetCurrentTimestamp();
+	if (!sendFeedback(conn, *now, true, false))
+		return false;
+
+	return true;
+}
+
+/*
+ * Try to inform the server about of upcoming demise, but don't wait around or
+ * retry on failure.
+ */
+static void
+prepareToTerminate(PGconn *conn, XLogRecPtr endpos, bool keepalive, XLogRecPtr lsn)
+{
+	(void) PQputCopyEnd(conn, NULL);
+	(void) PQflush(conn);
+
+	if (verbose)
+	{
+		if (keepalive)
+			fprintf(stderr, "%s: endpos %X/%X reached by keepalive\n",
+					progname,
+					(uint32) (endpos >> 32), (uint32) endpos);
+		else
+			fprintf(stderr, "%s: endpos %X/%X reached by record at %X/%X\n",
+					progname, (uint32) (endpos >> 32), (uint32) (endpos),
+					(uint32) (lsn >> 32), (uint32) lsn);
+
+	}
+}
-- 
2.5.5

-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to