From 9d0820124bca27549a50165b39a3c3f8a9ce6815 Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com>
Date: Wed, 19 Jul 2023 06:34:40 +0000
Subject: [PATCH v6] Fix pg_recvlogical error message upon SIGINT/SIGTERM

When pg_recvlogical needs to abort on a signal like SIGINT/SIGTERM, it
is expected to exit cleanly.  However, the code forgot to clean up the
state of the connection befor leaving.  This would cause the tool to
emit messages like "unexpected termination of replication stream" error,
which is meant for really unexpected termination or a crash.

The code is refactored to apply the same termination abort operations for
signals, end LSN and keepalive cases.

Reported-by: Andres Freund
Author: Bharath Rupireddy
Reviewed-by: Kyotaro Horiguchi, Andres Freund, Cary Huang
Discussion: https://www.postgresql.org/message-id/20221019213953.htdtzikf4f45ywil%40awork3.anarazel.de
---
 src/bin/pg_basebackup/pg_recvlogical.c | 51 ++++++++++++++++++++------
 1 file changed, 39 insertions(+), 12 deletions(-)

diff --git a/src/bin/pg_basebackup/pg_recvlogical.c b/src/bin/pg_basebackup/pg_recvlogical.c
index f3c7937a1d..973c2e6a5b 100644
--- a/src/bin/pg_basebackup/pg_recvlogical.c
+++ b/src/bin/pg_basebackup/pg_recvlogical.c
@@ -32,6 +32,14 @@
 /* Time to sleep between reconnection attempts */
 #define RECONNECT_SLEEP_TIME 5
 
+typedef enum
+{
+	STREAM_STOP_NONE,
+	STREAM_STOP_END_OF_WAL,
+	STREAM_STOP_KEEPALIVE,
+	STREAM_STOP_SIGNAL
+}			StreamStopReason;
+
 /* Global Options */
 static char *outfile = NULL;
 static int	verbose = 0;
@@ -55,6 +63,7 @@ static const char *plugin = "test_decoding";
 /* Global State */
 static int	outfd = -1;
 static volatile sig_atomic_t time_to_abort = false;
+static volatile sig_atomic_t stop_reason = STREAM_STOP_NONE;
 static volatile sig_atomic_t output_reopen = false;
 static bool output_isfile;
 static TimestampTz output_last_fsync = -1;
@@ -66,7 +75,7 @@ static void usage(void);
 static void StreamLogicalLog(void);
 static bool flushAndSendFeedback(PGconn *conn, TimestampTz *now);
 static void prepareToTerminate(PGconn *conn, XLogRecPtr endpos,
-							   bool keepalive, XLogRecPtr lsn);
+							   XLogRecPtr lsn);
 
 static void
 usage(void)
@@ -207,6 +216,7 @@ StreamLogicalLog(void)
 	TimestampTz last_status = -1;
 	int			i;
 	PQExpBuffer query;
+	XLogRecPtr	cur_record_lsn;
 
 	output_written_lsn = InvalidXLogRecPtr;
 	output_fsync_lsn = InvalidXLogRecPtr;
@@ -275,7 +285,8 @@ StreamLogicalLog(void)
 		int			bytes_written;
 		TimestampTz now;
 		int			hdr_len;
-		XLogRecPtr	cur_record_lsn = InvalidXLogRecPtr;
+
+		cur_record_lsn = InvalidXLogRecPtr;
 
 		if (copybuf != NULL)
 		{
@@ -487,7 +498,7 @@ StreamLogicalLog(void)
 
 			if (endposReached)
 			{
-				prepareToTerminate(conn, endpos, true, InvalidXLogRecPtr);
+				stop_reason = STREAM_STOP_KEEPALIVE;
 				time_to_abort = true;
 				break;
 			}
@@ -527,7 +538,7 @@ StreamLogicalLog(void)
 			 */
 			if (!flushAndSendFeedback(conn, &now))
 				goto error;
-			prepareToTerminate(conn, endpos, false, cur_record_lsn);
+			stop_reason = STREAM_STOP_END_OF_WAL;
 			time_to_abort = true;
 			break;
 		}
@@ -572,12 +583,16 @@ StreamLogicalLog(void)
 			/* endpos was exactly the record we just processed, we're done */
 			if (!flushAndSendFeedback(conn, &now))
 				goto error;
-			prepareToTerminate(conn, endpos, false, cur_record_lsn);
+			stop_reason = STREAM_STOP_END_OF_WAL;
 			time_to_abort = true;
 			break;
 		}
 	}
 
+	/* Clean up connection state if stream has been aborted */
+	if (time_to_abort)
+		prepareToTerminate(conn, endpos, cur_record_lsn);
+
 	res = PQgetResult(conn);
 	if (PQresultStatus(res) == PGRES_COPY_OUT)
 	{
@@ -656,6 +671,7 @@ error:
 static void
 sigexit_handler(SIGNAL_ARGS)
 {
+	stop_reason = STREAM_STOP_SIGNAL;
 	time_to_abort = true;
 }
 
@@ -1021,18 +1037,29 @@ flushAndSendFeedback(PGconn *conn, TimestampTz *now)
  * retry on failure.
  */
 static void
-prepareToTerminate(PGconn *conn, XLogRecPtr endpos, bool keepalive, XLogRecPtr lsn)
+prepareToTerminate(PGconn *conn, XLogRecPtr endpos, XLogRecPtr lsn)
 {
 	(void) PQputCopyEnd(conn, NULL);
 	(void) PQflush(conn);
 
+	Assert(stop_reason != STREAM_STOP_NONE);
+
 	if (verbose)
 	{
-		if (keepalive)
-			pg_log_info("end position %X/%X reached by keepalive",
-						LSN_FORMAT_ARGS(endpos));
-		else
-			pg_log_info("end position %X/%X reached by WAL record at %X/%X",
-						LSN_FORMAT_ARGS(endpos), LSN_FORMAT_ARGS(lsn));
+		switch (stop_reason)
+		{
+			case STREAM_STOP_SIGNAL:
+				pg_log_info("received interrupt signal, exiting");
+				break;
+			case STREAM_STOP_KEEPALIVE:
+				pg_log_info("end position %X/%X reached by keepalive",
+							LSN_FORMAT_ARGS(endpos));
+				break;
+			case STREAM_STOP_END_OF_WAL:
+				Assert(!XLogRecPtrIsInvalid(lsn));
+				pg_log_info("end position %X/%X reached by WAL record at %X/%X",
+							LSN_FORMAT_ARGS(endpos), LSN_FORMAT_ARGS(lsn));
+				break;
+		}
 	}
 }
-- 
2.34.1

