From 867251ede310384ec3c9c72487411e634bd006fe Mon Sep 17 00:00:00 2001
From: Dave Cramer <davecramer@gmail.com>
Date: Mon, 3 Dec 2018 06:28:31 -0500
Subject: [PATCH 3/4] Add ability for pg_recvlogical to stop replication from
 client side

---
 src/bin/pg_basebackup/pg_recvlogical.c | 490 +++++++++++++++++++--------------
 1 file changed, 290 insertions(+), 200 deletions(-)

diff --git a/src/bin/pg_basebackup/pg_recvlogical.c b/src/bin/pg_basebackup/pg_recvlogical.c
index a242e0b..2d69aa7 100644
--- a/src/bin/pg_basebackup/pg_recvlogical.c
+++ b/src/bin/pg_basebackup/pg_recvlogical.c
@@ -56,8 +56,12 @@ static const char *plugin = "test_decoding";
 /* Global State */
 static int	outfd = -1;
 static volatile sig_atomic_t time_to_abort = false;
+static volatile sig_atomic_t force_time_to_abort = false;
 static volatile sig_atomic_t output_reopen = false;
+static bool copyDoneSent;
+static bool copyDoneReceived;
 static bool output_isfile;
+static int64 last_status_time;
 static TimestampTz output_last_fsync = -1;
 static bool output_needs_fsync = false;
 static XLogRecPtr output_written_lsn = InvalidXLogRecPtr;
@@ -205,6 +209,222 @@ OutputFsync(TimestampTz now)
 	return true;
 }
 
+static bool
+ProcessKeepalive(PGconn *conn, char *msgBuf, int msgLength)
+{
+	int			pos;
+	bool		replyRequested;
+	XLogRecPtr	walEnd;
+
+	/*
+	 * Parse the keepalive message, enclosed in the CopyData message.
+	 * We just check if the server requested a reply, and ignore the
+	 * rest.
+	 */
+	pos = 1;			/* skip msgtype 'k' */
+
+	/* read walEnd */
+	walEnd = fe_recvint64(&msgBuf[pos]);
+	output_written_lsn = Max(walEnd, output_written_lsn);
+
+	pos += 8;			/* skip sendTime */
+
+	if (msgLength < pos + 1)
+	{
+		fprintf(stderr, _("%s: streaming header too small: %d\n"),
+				progname, msgLength);
+		return -1;
+	}
+	replyRequested = msgBuf[pos];
+
+	/* If the server requested an immediate reply, send one. */
+	if (replyRequested)
+	{
+		int64 now = feGetCurrentTimestamp();
+
+		/* fsync data, so we send a recent flush pointer */
+		if (!OutputFsync(now))
+		{
+			return false;
+		}
+
+		if (!sendFeedback(conn, now, true, false))
+		{
+			return false;
+		}
+		last_status_time = now;
+	}
+
+	return true;
+}
+
+static bool
+ProcessXLogData(PGconn *conn, char *msgBuf, int msgLength)
+{
+	int bytes_left;
+	int bytes_written;
+
+	/*
+	 * Read the header of the XLogData message, enclosed in the CopyData
+	 * message. We only need the WAL location field (dataStart), the rest
+	 * of the header is ignored.
+	 */
+	int hdr_len = 1;			/* msgtype 'w' */
+	hdr_len += 8;			/* dataStart */
+	hdr_len += 8;			/* walEnd */
+	hdr_len += 8;			/* sendTime */
+	if (msgLength < hdr_len + 1)
+	{
+		fprintf(stderr, _("%s: streaming header too small: %d\n"),
+				progname, msgLength);
+		return false;
+	}
+
+	if (time_to_abort && copyDoneSent)
+	{
+		/*
+		 * We've sent feedback and sent CopyDone, so we are now discarding
+		 * xlog data input to find the server's reply CopyDone. That way when
+		 * another client connects to the slot later they start replay exactly
+		 * where we left off - or at least at the last commit we flushed to
+		 * disk. This is not an error condition.
+		 */
+		return true;
+	}
+
+	/* Extract WAL location for this block */
+	{
+		XLogRecPtr	temp = fe_recvint64(&msgBuf[1]);
+
+		output_written_lsn = Max(temp, output_written_lsn);
+	}
+
+	bytes_left = msgLength - hdr_len;
+	bytes_written = 0;
+
+	/* signal that a fsync is needed */
+	output_needs_fsync = true;
+
+	while (bytes_left)
+	{
+		int			ret;
+
+		ret = write(outfd,
+					msgBuf + hdr_len + bytes_written,
+					bytes_left);
+
+		if (ret < 0)
+		{
+			fprintf(stderr,
+			  _("%s: could not write %u bytes to log file \"%s\": %s\n"),
+					progname, bytes_left, outfile,
+					strerror(errno));
+			return false;
+		}
+
+		/* Write was successful, advance our position */
+		bytes_written += ret;
+		bytes_left -= ret;
+	}
+
+	if (write(outfd, "\n", 1) != 1)
+	{
+		fprintf(stderr,
+			  _("%s: could not write %u bytes to log file \"%s\": %s\n"),
+				progname, 1, outfile,
+				strerror(errno));
+		return false;
+	}
+
+	return true;
+}
+
+static bool
+ProcessReceiveMsg(PGconn *conn, unsigned char type, char *msgBuf, int msgLength)
+{
+	bool success = false;
+	switch (type)
+	{
+		case 'k':
+			success = ProcessKeepalive(conn, msgBuf, msgLength);
+			break;
+		case 'w':
+			success = ProcessXLogData(conn, msgBuf, msgLength);
+			break;
+		default:
+			fprintf(stderr, _("%s: unrecognized streaming header: \"%c\"\n"),
+					progname, type);
+	}
+
+	return success;
+}
+
+/*
+ * Sync wait activity on socket. Waiting can be interrupt by fsync or keepalive timeout.
+ * Returns the number of ready descriptors, or -1 for errors.
+ */
+static int
+WaitSocketActivity(PGconn *conn, int64 now)
+{
+	/*
+	 * In async mode, and no data available. We block on reading but
+	 * not more than the specified timeout, so that we can send a
+	 * response back to the client.
+	 */
+	fd_set		input_mask;
+	int64		message_target = 0;
+	int64		fsync_target = 0;
+	struct timeval timeout;
+	struct timeval *timeoutptr = NULL;
+
+	if (PQsocket(conn) < 0)
+	{
+		fprintf(stderr,
+				_("%s: invalid socket: %s"),
+				progname, PQerrorMessage(conn));
+		return -1;
+	}
+
+	FD_ZERO(&input_mask);
+	FD_SET(PQsocket(conn), &input_mask);
+
+	/* Compute when we need to wakeup to send a keepalive message. */
+	if (standby_message_timeout)
+		message_target = last_status_time + (standby_message_timeout - 1) *
+			((int64) 1000);
+
+	/* Compute when we need to wakeup to fsync the output file. */
+	if (fsync_interval > 0 && output_needs_fsync)
+		fsync_target = output_last_fsync + (fsync_interval - 1) *
+			((int64) 1000);
+
+	/* Now compute when to wakeup. */
+	if (message_target > 0 || fsync_target > 0)
+	{
+		int64		targettime;
+		long		secs;
+		int			usecs;
+
+		targettime = message_target;
+
+		if (fsync_target > 0 && fsync_target < targettime)
+			targettime = fsync_target;
+
+		feTimestampDifference(now,
+							  targettime,
+							  &secs,
+							  &usecs);
+		if (secs <= 0)
+			timeout.tv_sec = 1; /* Always sleep at least 1 sec */
+		else
+			timeout.tv_sec = secs;
+		timeout.tv_usec = usecs;
+		timeoutptr = &timeout;
+	}
+
+	return select(PQsocket(conn) + 1, &input_mask, NULL, NULL, timeoutptr);
+}
+
 /*
  * Start the log streaming
  */
@@ -213,13 +433,14 @@ StreamLogicalLog(void)
 {
 	PGresult   *res;
 	char	   *copybuf = NULL;
-	TimestampTz last_status = -1;
 	int			i;
 	PQExpBuffer query;
 
 	output_written_lsn = InvalidXLogRecPtr;
 	output_fsync_lsn = InvalidXLogRecPtr;
-
+	last_status_time = -1;
+	copyDoneReceived = false;
+	copyDoneSent = false;
 	query = createPQExpBuffer();
 
 	/*
@@ -281,13 +502,10 @@ StreamLogicalLog(void)
 				_("%s: streaming initiated\n"),
 				progname);
 
-	while (!time_to_abort)
+	while (!force_time_to_abort)
 	{
 		int			r;
-		int			bytes_left;
-		int			bytes_written;
 		TimestampTz now;
-		int			hdr_len;
 		XLogRecPtr	cur_record_lsn = InvalidXLogRecPtr;
 
 		if (copybuf != NULL)
@@ -309,15 +527,50 @@ StreamLogicalLog(void)
 				goto error;
 		}
 
-		if (standby_message_timeout > 0 &&
-			feTimestampDifferenceExceeds(last_status, now,
+		if (standby_message_timeout > 0 && !time_to_abort &&
+			feTimestampDifferenceExceeds(last_status_time, now,
 										 standby_message_timeout))
 		{
 			/* Time to send feedback! */
 			if (!sendFeedback(conn, now, true, false))
 				goto error;
 
-			last_status = now;
+			last_status_time = now;
+		}
+
+		if (time_to_abort && !copyDoneSent)
+		{
+			if (verbose)
+			{
+				fprintf(stderr,
+						_("%s: stopping write up to %X/%X, flush to %X/%X (slot %s)\n"),
+						progname,
+						(uint32) (output_written_lsn >> 32), (uint32) output_written_lsn,
+						(uint32) (output_fsync_lsn >> 32), (uint32) output_fsync_lsn,
+						replication_slot);
+			}
+
+			/*
+			 * Force fsync and send feedback just before we send CopyDone to
+			 * make sure the server knows exactly what we replayed up to. We'll
+			 * discard data received after we request the end of COPY BOTH mode
+			 * so we know we've written everything we're going to.
+			 */
+			if (!OutputFsync(now))
+				goto error;
+
+			if (!sendFeedback(conn, now, true, false))
+				goto error;
+
+			last_status_time = now;
+
+			if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
+			{
+				fprintf(stderr, _("%s: could not send copy-end packet: %s"),
+						progname, PQerrorMessage(conn));
+				goto error;
+			}
+			copyDoneSent = true;
 		}
 
 		/* got SIGHUP, close output file */
@@ -360,64 +613,9 @@ StreamLogicalLog(void)
 		r = PQgetCopyData(conn, &copybuf, 1);
 		if (r == 0)
 		{
-			/*
-			 * In async mode, and no data available. We block on reading but
-			 * not more than the specified timeout, so that we can send a
-			 * response back to the client.
-			 */
-			fd_set		input_mask;
-			TimestampTz message_target = 0;
-			TimestampTz fsync_target = 0;
-			struct timeval timeout;
-			struct timeval *timeoutptr = NULL;
-
-			if (PQsocket(conn) < 0)
-			{
-				fprintf(stderr,
-						_("%s: invalid socket: %s"),
-						progname, PQerrorMessage(conn));
-				goto error;
-			}
-
-			FD_ZERO(&input_mask);
-			FD_SET(PQsocket(conn), &input_mask);
-
-			/* Compute when we need to wakeup to send a keepalive message. */
-			if (standby_message_timeout)
-				message_target = last_status + (standby_message_timeout - 1) *
-					((int64) 1000);
-
-			/* Compute when we need to wakeup to fsync the output file. */
-			if (fsync_interval > 0 && output_needs_fsync)
-				fsync_target = output_last_fsync + (fsync_interval - 1) *
-					((int64) 1000);
-
-			/* Now compute when to wakeup. */
-			if (message_target > 0 || fsync_target > 0)
-			{
-				TimestampTz targettime;
-				long		secs;
-				int			usecs;
-
-				targettime = message_target;
-
-				if (fsync_target > 0 && fsync_target < targettime)
-					targettime = fsync_target;
-
-				feTimestampDifference(now,
-									  targettime,
-									  &secs,
-									  &usecs);
-				if (secs <= 0)
-					timeout.tv_sec = 1; /* Always sleep at least 1 sec */
-				else
-					timeout.tv_sec = secs;
-				timeout.tv_usec = usecs;
-				timeoutptr = &timeout;
-			}
-
-			r = select(PQsocket(conn) + 1, &input_mask, NULL, NULL, timeoutptr);
-			if (r == 0 || (r < 0 && errno == EINTR))
+ 			int readyMsg = WaitSocketActivity(conn, now);
+  
+ 			if (readyMsg == 0 || (readyMsg < 0 && errno == EINTR))
 			{
 				/*
 				 * Got a timeout or signal. Continue the loop and either
@@ -426,7 +624,7 @@ StreamLogicalLog(void)
 				 */
 				continue;
 			}
-			else if (r < 0)
+			else if (readyMsg < 0)
 			{
 				fprintf(stderr, _("%s: select() failed: %s\n"),
 						progname, strerror(errno));
@@ -441,12 +639,26 @@ StreamLogicalLog(void)
 						progname, PQerrorMessage(conn));
 				goto error;
 			}
+
 			continue;
 		}
 
-		/* End of copy stream */
+		/*
+		 * End of copy stream (server sent CopyDone)
+		 *
+		 * This is where we exit on normal time_to_abort because our own
+		 * CopyDone caused the server to shut down streaming on its end.
+		 */
 		if (r == -1)
+		{
+			copyDoneReceived = true;
+			if (verbose && time_to_abort && copyDoneSent)
+			{
+				fprintf(stderr,
+						_("%s: streaming ended by user request"), progname);
+			}
 			break;
+		}
 
 		/* Failure while reading the copy stream */
 		if (r == -2)
@@ -456,138 +668,8 @@ StreamLogicalLog(void)
 			goto error;
 		}
 
-		/* Check the message type. */
-		if (copybuf[0] == 'k')
-		{
-			int			pos;
-			bool		replyRequested;
-			XLogRecPtr	walEnd;
-			bool		endposReached = false;
-
-			/*
-			 * Parse the keepalive message, enclosed in the CopyData message.
-			 * We just check if the server requested a reply, and ignore the
-			 * rest.
-			 */
-			pos = 1;			/* skip msgtype 'k' */
-			walEnd = fe_recvint64(&copybuf[pos]);
-			output_written_lsn = Max(walEnd, output_written_lsn);
-
-			pos += 8;			/* read walEnd */
-
-			pos += 8;			/* skip sendTime */
-
-			if (r < pos + 1)
-			{
-				fprintf(stderr, _("%s: streaming header too small: %d\n"),
-						progname, r);
-				goto error;
-			}
-			replyRequested = copybuf[pos];
-
-			if (endpos != InvalidXLogRecPtr && walEnd >= endpos)
-			{
-				/*
-				 * If there's nothing to read on the socket until a keepalive
-				 * we know that the server has nothing to send us; and if
-				 * walEnd has passed endpos, we know nothing else can have
-				 * committed before endpos.  So we can bail out now.
-				 */
-				endposReached = true;
-			}
-
-			/* Send a reply, if necessary */
-			if (replyRequested || endposReached)
-			{
-				if (!flushAndSendFeedback(conn, &now))
-					goto error;
-				last_status = now;
-			}
-
-			if (endposReached)
-			{
-				prepareToTerminate(conn, endpos, true, InvalidXLogRecPtr);
-				time_to_abort = true;
-				break;
-			}
-
-			continue;
-		}
-		else if (copybuf[0] != 'w')
-		{
-			fprintf(stderr, _("%s: unrecognized streaming header: \"%c\"\n"),
-					progname, copybuf[0]);
-			goto error;
-		}
-
-		/*
-		 * Read the header of the XLogData message, enclosed in the CopyData
-		 * message. We only need the WAL location field (dataStart), the rest
-		 * of the header is ignored.
-		 */
-		hdr_len = 1;			/* msgtype 'w' */
-		hdr_len += 8;			/* dataStart */
-		hdr_len += 8;			/* walEnd */
-		hdr_len += 8;			/* sendTime */
-		if (r < hdr_len + 1)
+		if(!ProcessReceiveMsg(conn, copybuf[0], copybuf, r))
 		{
-			fprintf(stderr, _("%s: streaming header too small: %d\n"),
-					progname, r);
-			goto error;
-		}
-
-		/* Extract WAL location for this block */
-		cur_record_lsn = fe_recvint64(&copybuf[1]);
-
-		if (endpos != InvalidXLogRecPtr && cur_record_lsn > endpos)
-		{
-			/*
-			 * We've read past our endpoint, so prepare to go away being
-			 * cautious about what happens to our output data.
-			 */
-			if (!flushAndSendFeedback(conn, &now))
-				goto error;
-			prepareToTerminate(conn, endpos, false, cur_record_lsn);
-			time_to_abort = true;
-			break;
-		}
-
-		output_written_lsn = Max(cur_record_lsn, output_written_lsn);
-
-		bytes_left = r - hdr_len;
-		bytes_written = 0;
-
-		/* signal that a fsync is needed */
-		output_needs_fsync = true;
-
-		while (bytes_left)
-		{
-			int			ret;
-
-			ret = write(outfd,
-						copybuf + hdr_len + bytes_written,
-						bytes_left);
-
-			if (ret < 0)
-			{
-				fprintf(stderr,
-						_("%s: could not write %u bytes to log file \"%s\": %s\n"),
-						progname, bytes_left, outfile,
-						strerror(errno));
-				goto error;
-			}
-
-			/* Write was successful, advance our position */
-			bytes_written += ret;
-			bytes_left -= ret;
-		}
-
-		if (write(outfd, "\n", 1) != 1)
-		{
-			fprintf(stderr,
-					_("%s: could not write %u bytes to log file \"%s\": %s\n"),
-					progname, 1, outfile,
-					strerror(errno));
 			goto error;
 		}
 
@@ -656,6 +738,14 @@ error:
 static void
 sigint_handler(int signum)
 {
+	/*
+	 * Backward compatible, allow force interrupt logical replication
+	 * after second SIGINT without wait CopyDone from server
+	 */
+	if (time_to_abort)
+	{
+		force_time_to_abort = true;
+	}
 	time_to_abort = true;
 }
 
-- 
2.6.4

