Craig Ringer wrote:
 
> With this patch pg_recvlogical takes a new --endpos LSN argument, and will
> exit if either:
> 
> * it receives an XLogData message with dataStart >= endpos; or
> * it receives a keepalive with walEnd >= endpos
> 
> The latter allows it to work without needing a dummy transaction to make it
> see a data message after endpos. If there's nothing to read on the socket
> until a keepalive we know that the server has nothing to send us, and if
> walend has passed endpos we know nothing can have committed before endpos.

Here's a slightly revised version of this patch, for later
consideration.

Changes:
- added -E as short form of --endpos (consistent with -I as --startpos)
- refactored some repetitive code into two auxiliary functions
- allow --endpos to work with --create-slot.
- revert some unrelated changes, such as message additions in verbose
  mode and changes to existing messages
- documentation reworked.

I didn't spot any bugs in Craig's patch, but it needs more testing.

-- 
Álvaro Herrera                http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
diff --git a/doc/src/sgml/ref/pg_recvlogical.sgml b/doc/src/sgml/ref/pg_recvlogical.sgml
index 9d0b58b..6f23229 100644
--- a/doc/src/sgml/ref/pg_recvlogical.sgml
+++ b/doc/src/sgml/ref/pg_recvlogical.sgml
@@ -155,6 +155,41 @@ PostgreSQL documentation
      </varlistentry>
 
      <varlistentry>
+      <term><option>-E <replaceable>lsn</replaceable></option></term>
+      <term><option>--endpos=<replaceable>lsn</replaceable></option></term>
+      <listitem>
+       <para>
+        In <option>--start</option> mode, automatically stop replication
+        and exit with normal exit status 0 when receiving reaches the
+        specified LSN.  If specified when not in <option>--start</option>
+        mode, an error is raised.
+       </para>
+
+       <para>
+        Note the following points:
+        <itemizedlist>
+         <listitem> 
+          <para>
+           If there's a record with LSN exactly equal to <replaceable>lsn</>,
+           the record will be output; streaming stops at the first record
+           whose LSN is greater than <replaceable>lsn</>.
+          </para>
+         </listitem>
+         <listitem>
+          <para>
+           The <option>--endpos</option> option is not aware of transaction
+           boundaries and may truncate output partway through a transaction.
+           Any partially output transaction will not be consumed and will be
+           replayed again when the slot is next read from. Individual messages
+           are never truncated.
+          </para>
+         </listitem>
+        </itemizedlist>
+       </para>
+      </listitem>
+     </varlistentry>
+
+     <varlistentry>
       <term><option>--if-not-exists</option></term>
       <listitem>
        <para>
diff --git a/src/bin/pg_basebackup/pg_recvlogical.c b/src/bin/pg_basebackup/pg_recvlogical.c
index 6d12705..5108222 100644
--- a/src/bin/pg_basebackup/pg_recvlogical.c
+++ b/src/bin/pg_basebackup/pg_recvlogical.c
@@ -37,6 +37,7 @@ static int	noloop = 0;
 static int	standby_message_timeout = 10 * 1000;		/* 10 sec = default */
 static int	fsync_interval = 10 * 1000; /* 10 sec = default */
 static XLogRecPtr startpos = InvalidXLogRecPtr;
+static XLogRecPtr endpos = InvalidXLogRecPtr;
 static bool do_create_slot = false;
 static bool slot_exists_ok = false;
 static bool do_start_slot = false;
@@ -60,6 +61,9 @@ static XLogRecPtr output_fsync_lsn = InvalidXLogRecPtr;
 static void usage(void);
 static void StreamLogicalLog(void);
 static void disconnect_and_exit(int code);
+static bool flushAndSendFeedback(PGconn *conn, TimestampTz *now);
+static void prepareToTerminate(PGconn *conn, XLogRecPtr endpos,
+				   bool keepalive, XLogRecPtr lsn);
 
 static void
 usage(void)
@@ -78,6 +82,7 @@ usage(void)
 			 "                         time between fsyncs to the output file (default: %d)\n"), (fsync_interval / 1000));
 	printf(_("      --if-not-exists    do not error if slot already exists when creating a slot\n"));
 	printf(_("  -I, --startpos=LSN     where in an existing slot should the streaming start\n"));
+	printf(_("  -E, --endpos=LSN       exit upon receiving the specified LSN\n"));
 	printf(_("  -n, --no-loop          do not loop on connection lost\n"));
 	printf(_("  -o, --option=NAME[=VALUE]\n"
 			 "                         pass option NAME with optional value VALUE to the\n"
@@ -278,6 +283,7 @@ StreamLogicalLog(void)
 		int			bytes_written;
 		int64		now;
 		int			hdr_len;
+		XLogRecPtr	cur_record_lsn = InvalidXLogRecPtr;
 
 		if (copybuf != NULL)
 		{
@@ -451,6 +457,7 @@ StreamLogicalLog(void)
 			int			pos;
 			bool		replyRequested;
 			XLogRecPtr	walEnd;
+			bool		endposReached = false;
 
 			/*
 			 * Parse the keepalive message, enclosed in the CopyData message.
@@ -473,18 +480,32 @@ StreamLogicalLog(void)
 			}
 			replyRequested = copybuf[pos];
 
-			/* If the server requested an immediate reply, send one. */
-			if (replyRequested)
+			if (endpos != InvalidXLogRecPtr && walEnd >= endpos)
 			{
-				/* fsync data, so we send a recent flush pointer */
-				if (!OutputFsync(now))
-					goto error;
+				/*
+				 * If there's nothing to read on the socket until a keepalive
+				 * we know that the server has nothing to send us; and if
+				 * walEnd has passed endpos, we know nothing else can have
+				 * committed before endpos.  So we can bail out now.
+				 */
+				endposReached = true;
+			}
 
-				now = feGetCurrentTimestamp();
-				if (!sendFeedback(conn, now, true, false))
+			/* Send a reply, if necessary */
+			if (replyRequested || endposReached)
+			{
+				if (!flushAndSendFeedback(conn, &now))
 					goto error;
 				last_status = now;
 			}
+
+			if (endposReached)
+			{
+				prepareToTerminate(conn, endpos, true, InvalidXLogRecPtr);
+				time_to_abort = true;
+				break;
+			}
+
 			continue;
 		}
 		else if (copybuf[0] != 'w')
@@ -494,7 +515,6 @@ StreamLogicalLog(void)
 			goto error;
 		}
 
-
 		/*
 		 * Read the header of the XLogData message, enclosed in the CopyData
 		 * message. We only need the WAL location field (dataStart), the rest
@@ -512,10 +532,23 @@ StreamLogicalLog(void)
 		}
 
 		/* Extract WAL location for this block */
-		{
-			XLogRecPtr	temp = fe_recvint64(&copybuf[1]);
+		cur_record_lsn = fe_recvint64(&copybuf[1]);
 
-			output_written_lsn = Max(temp, output_written_lsn);
+		if (endpos != InvalidXLogRecPtr && cur_record_lsn > endpos)
+		{
+			/*
+			 * We've read past our endpoint, so prepare to go away being
+			 * cautious about what happens to our output data.
+			 */
+			if (!flushAndSendFeedback(conn, &now))
+				goto error;
+			prepareToTerminate(conn, endpos, false, cur_record_lsn);
+			time_to_abort = true;
+			break;
+		}
+		else
+		{
+			output_written_lsn = Max(cur_record_lsn, output_written_lsn);
 		}
 
 		bytes_left = r - hdr_len;
@@ -557,7 +590,16 @@ StreamLogicalLog(void)
 	}
 
 	res = PQgetResult(conn);
-	if (PQresultStatus(res) != PGRES_COMMAND_OK)
+	if (PQresultStatus(res) == PGRES_COPY_OUT)
+	{
+		/*
+		 * We're doing a client-initiated clean exit and have sent CopyDone to
+		 * the server. We've already sent replay confirmation and fsync'd so
+		 * we can just clean up the connection now.
+		 */
+		goto error;
+	}
+	else if (PQresultStatus(res) != PGRES_COMMAND_OK)
 	{
 		fprintf(stderr,
 				_("%s: unexpected termination of replication stream: %s"),
@@ -635,6 +677,7 @@ main(int argc, char **argv)
 		{"password", no_argument, NULL, 'W'},
 /* replication options */
 		{"startpos", required_argument, NULL, 'I'},
+		{"endpos", required_argument, NULL, 'E'},
 		{"option", required_argument, NULL, 'o'},
 		{"plugin", required_argument, NULL, 'P'},
 		{"status-interval", required_argument, NULL, 's'},
@@ -670,7 +713,7 @@ main(int argc, char **argv)
 		}
 	}
 
-	while ((c = getopt_long(argc, argv, "f:F:nvd:h:p:U:wWI:o:P:s:S:",
+	while ((c = getopt_long(argc, argv, "f:F:nvd:h:p:U:wWI:E:o:P:s:S:",
 							long_options, &option_index)) != -1)
 	{
 		switch (c)
@@ -730,6 +773,16 @@ main(int argc, char **argv)
 				}
 				startpos = ((uint64) hi) << 32 | lo;
 				break;
+			case 'E':
+				if (sscanf(optarg, "%X/%X", &hi, &lo) != 2)
+				{
+					fprintf(stderr,
+							_("%s: could not parse end position \"%s\"\n"),
+							progname, optarg);
+					exit(1);
+				}
+				endpos = ((uint64) hi) << 32 | lo;
+				break;
 			case 'o':
 				{
 					char	   *data = pg_strdup(optarg);
@@ -854,6 +907,16 @@ main(int argc, char **argv)
 		exit(1);
 	}
 
+	if (endpos != InvalidXLogRecPtr && !do_start_slot)
+	{
+		fprintf(stderr,
+				_("%s: cannot use --create-slot or --drop-slot together with --endpos\n"),
+				progname);
+		fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
+				progname);
+		exit(1);
+	}
+
 #ifndef WIN32
 	pqsignal(SIGINT, sigint_handler);
 	pqsignal(SIGHUP, sighup_handler);
@@ -861,8 +924,8 @@ main(int argc, char **argv)
 
 	/*
 	 * Obtain a connection to server. This is not really necessary but it
-	 * helps to get more precise error messages about authentication,
-	 * required GUC parameters and such.
+	 * helps to get more precise error messages about authentication, required
+	 * GUC parameters and such.
 	 */
 	conn = GetConnection();
 	if (!conn)
@@ -920,8 +983,8 @@ main(int argc, char **argv)
 		if (time_to_abort)
 		{
 			/*
-			 * We've been Ctrl-C'ed. That's not an error, so exit without an
-			 * errorcode.
+			 * We've been Ctrl-C'ed or reached an exit limit condition. That's
+			 * not an error, so exit without an errorcode.
 			 */
 			disconnect_and_exit(0);
 		}
@@ -940,3 +1003,47 @@ main(int argc, char **argv)
 		}
 	}
 }
+
+/*
+ * Fsync our output data, and send a feedback message to the server.  Returns
+ * true if successful, false otherwise.
+ *
+ * On entry *now is the timestamp handed to OutputFsync().  Once the fsync has
+ * succeeded, *now is refreshed to the current time before feedback is sent.
+ */
+static bool
+flushAndSendFeedback(PGconn *conn, TimestampTz *now)
+{
+	/* flush data, so that we send a recent flush pointer */
+	if (!OutputFsync(*now))
+		return false;
+	*now = feGetCurrentTimestamp();
+	if (!sendFeedback(conn, *now, true, false))
+		return false;
+
+	return true;
+}
+
+/*
+ * Try to inform the server about our upcoming demise, but don't wait around or
+ * retry on failure.  In verbose mode, report on stderr how endpos was hit.
+ */
+static void
+prepareToTerminate(PGconn *conn, XLogRecPtr endpos, bool keepalive, XLogRecPtr lsn)
+{
+	(void) PQputCopyEnd(conn, NULL);
+	(void) PQflush(conn);
+
+	if (verbose)
+	{
+		if (keepalive)
+			fprintf(stderr, "%s: endpos %X/%X reached by keepalive\n",
+					progname,
+					(uint32) (endpos >> 32), (uint32) endpos);
+		else
+			fprintf(stderr, "%s: endpos %X/%X reached by record at %X/%X\n",
+					progname, (uint32) (endpos >> 32), (uint32) (endpos),
+					(uint32) (lsn >> 32), (uint32) lsn);
+
+	}
+}
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to