From 3dec722f7a6ae063af55e71609f6429b3f19d675 Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Date: Wed, 8 Feb 2023 09:09:31 +0000
Subject: [PATCH v6 2/2] Extend START_REPLICATION command to accept walsender
 options

This commit extends START_REPLICATION to accept a SHUTDOWN_MODE clause.
Currently, it is supported only for logical replication.

When 'wait_flush', which is the default, is specified, the walsender will wait
for all the sent WAL to be flushed on the subscriber side before exiting the
process. 'immediate' will exit without confirming the remote flush. This may
break the consistency between publisher and subscriber, but it may be useful
for a system that has a high-latency network to reduce the amount of time for
shutdown. It also allows the publisher to be shut down even when the apply
worker is stuck.

Author: Hayato Kuroda
Discussion: https://postgr.es/m/TYAPR01MB586668E50FC2447AD7F92491F5E89%40TYAPR01MB5866.jpnprd01.prod.outlook.com
---
 doc/src/sgml/protocol.sgml                    | 15 ++++-
 .../libpqwalreceiver/libpqwalreceiver.c       |  7 ++
 src/backend/replication/logical/worker.c      | 17 ++++-
 src/backend/replication/repl_gram.y           | 12 +++-
 src/backend/replication/repl_scanner.l        |  1 +
 src/backend/replication/walreceiver.c         |  1 +
 src/backend/replication/walsender.c           | 67 ++++++++++++++++++-
 src/include/nodes/replnodes.h                 |  1 +
 src/include/replication/walreceiver.h         |  1 +
 src/test/subscription/t/001_rep_changes.pl    |  7 +-
 src/tools/pgindent/typedefs.list              |  1 +
 11 files changed, 122 insertions(+), 8 deletions(-)

diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml
index 93fc7167d4..43e71421de 100644
--- a/doc/src/sgml/protocol.sgml
+++ b/doc/src/sgml/protocol.sgml
@@ -2500,7 +2500,7 @@ psql "dbname=postgres replication=database" -c "IDENTIFY_SYSTEM;"
     </varlistentry>
 
     <varlistentry id="protocol-replication-start-replication-slot-logical">
-     <term><literal>START_REPLICATION</literal> <literal>SLOT</literal> <replaceable class="parameter">slot_name</replaceable> <literal>LOGICAL</literal> <replaceable class="parameter">XXX/XXX</replaceable> [ ( <replaceable>option_name</replaceable> [ <replaceable>option_value</replaceable> ] [, ...] ) ]</term>
+     <term><literal>START_REPLICATION</literal> <literal>SLOT</literal> <replaceable class="parameter">slot_name</replaceable> <literal>LOGICAL</literal> <replaceable class="parameter">XXX/XXX</replaceable> [ <literal>SHUTDOWN_MODE</literal> <replaceable class="parameter">shutdown_mode</replaceable> ] [ ( <replaceable>option_name</replaceable> [ <replaceable>option_value</replaceable> ] [, ...] ) ]</term>
      <listitem>
       <para>
        Instructs server to start streaming WAL for logical replication,
@@ -2555,6 +2555,19 @@ psql "dbname=postgres replication=database" -c "IDENTIFY_SYSTEM;"
         </listitem>
        </varlistentry>
 
+       <varlistentry>
+        <term><literal>SHUTDOWN_MODE { 'wait_flush' | 'immediate' }</literal></term>
+        <listitem>
+         <para>
+          Decides the behavior of the walsender process at shutdown. If the
+          shutdown mode is <literal>'wait_flush'</literal>, which is the
+          default, the walsender waits for all the sent WALs to be flushed
+          on the subscriber side. If it is <literal>'immediate'</literal>,
+          the walsender exits without confirming the remote flush.
+         </para>
+        </listitem>
+       </varlistentry>
+
        <varlistentry>
         <term><replaceable class="parameter">option_name</replaceable></term>
         <listitem>
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index 560ec974fa..18f6e09cfd 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -403,6 +403,12 @@ libpqrcv_startstreaming(WalReceiverConn *conn,
 		List	   *pubnames;
 		char	   *pubnames_literal;
 
+		/* Add SHUTDOWN_MODE option if needed */
+		if (options->shutdown_mode &&
+			PQserverVersion(conn->streamConn) >= 160000)
+			appendStringInfo(&cmd, " SHUTDOWN_MODE '%s'",
+							 options->shutdown_mode);
+
 		appendStringInfoString(&cmd, " (");
 
 		appendStringInfo(&cmd, "proto_version '%u'",
@@ -449,6 +455,7 @@ libpqrcv_startstreaming(WalReceiverConn *conn,
 		appendStringInfo(&cmd, " TIMELINE %u",
 						 options->proto.physical.startpointTLI);
 
+
 	/* Start streaming. */
 	res = libpqrcv_PQexec(conn->streamConn, cmd.data);
 	pfree(cmd.data);
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 19b0574ad0..967252fdbb 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -4023,10 +4023,15 @@ maybe_reread_subscription(void)
 
 	/*
 	 * Exit if any parameter that affects the remote connection was changed.
+	 *
 	 * The launcher will start a new worker but note that the parallel apply
 	 * worker won't restart if the streaming option's value is changed from
 	 * 'parallel' to any other value or the server decides not to stream the
 	 * in-progress transaction.
+	 *
+	 * minapplydelay affects SHUTDOWN_MODE option. 'immediate' shutdown mode
+	 * will be specified if it is set to non-zero, otherwise default mode will
+	 * be set.
 	 */
 	if (strcmp(newsub->conninfo, MySubscription->conninfo) != 0 ||
 		strcmp(newsub->name, MySubscription->name) != 0 ||
@@ -4035,7 +4040,8 @@ maybe_reread_subscription(void)
 		newsub->stream != MySubscription->stream ||
 		strcmp(newsub->origin, MySubscription->origin) != 0 ||
 		newsub->owner != MySubscription->owner ||
-		!equal(newsub->publications, MySubscription->publications))
+		!equal(newsub->publications, MySubscription->publications) ||
+		(newsub->minapplydelay == 0) != (MySubscription->minapplydelay == 0))
 	{
 		if (am_parallel_apply_worker())
 			ereport(LOG,
@@ -4719,6 +4725,7 @@ ApplyWorkerMain(Datum main_arg)
 	options.logical = true;
 	options.startpoint = origin_startpos;
 	options.slotname = myslotname;
+	options.shutdown_mode = NULL;
 
 	server_version = walrcv_server_version(LogRepWorkerWalRcvConn);
 	options.proto.logical.proto_version =
@@ -4757,6 +4764,14 @@ ApplyWorkerMain(Datum main_arg)
 
 	if (!am_tablesync_worker())
 	{
+		/*
+		 * time-delayed logical replication does not support tablesync
+		 * workers, so only the leader apply worker can request walsenders to
+		 * exit before confirming remote flush.
+		 */
+		if (server_version >= 160000 && MySubscription->minapplydelay > 0)
+			options.shutdown_mode = pstrdup("immediate");
+
 		/*
 		 * Even when the two_phase mode is requested by the user, it remains
 		 * as the tri-state PENDING until all tablesyncs have reached READY
diff --git a/src/backend/replication/repl_gram.y b/src/backend/replication/repl_gram.y
index 0c874e33cf..54450a041a 100644
--- a/src/backend/replication/repl_gram.y
+++ b/src/backend/replication/repl_gram.y
@@ -76,6 +76,7 @@ Node *replication_parse_result;
 %token K_EXPORT_SNAPSHOT
 %token K_NOEXPORT_SNAPSHOT
 %token K_USE_SNAPSHOT
+%token K_SHUTDOWN_MODE
 
 %type <node>	command
 %type <node>	base_backup start_replication start_logical_replication
@@ -91,6 +92,7 @@ Node *replication_parse_result;
 %type <boolval>	opt_temporary
 %type <list>	create_slot_options create_slot_legacy_opt_list
 %type <defelt>	create_slot_legacy_opt
+%type <str>	opt_shutdown_mode
 
 %%
 
@@ -270,20 +272,22 @@ start_replication:
 					cmd->slotname = $2;
 					cmd->startpoint = $4;
 					cmd->timeline = $5;
+					cmd->shutdownmode = NULL;
 					$$ = (Node *) cmd;
 				}
 			;
 
 /* START_REPLICATION SLOT slot LOGICAL %X/%X options */
 start_logical_replication:
-			K_START_REPLICATION K_SLOT IDENT K_LOGICAL RECPTR plugin_options
+			K_START_REPLICATION K_SLOT IDENT K_LOGICAL RECPTR opt_shutdown_mode plugin_options
 				{
 					StartReplicationCmd *cmd;
 					cmd = makeNode(StartReplicationCmd);
 					cmd->kind = REPLICATION_KIND_LOGICAL;
 					cmd->slotname = $3;
 					cmd->startpoint = $5;
-					cmd->options = $6;
+					cmd->shutdownmode = $6;
+					cmd->options = $7;
 					$$ = (Node *) cmd;
 				}
 			;
@@ -336,6 +340,10 @@ opt_timeline:
 				| /* EMPTY */			{ $$ = 0; }
 			;
 
+opt_shutdown_mode:
+			K_SHUTDOWN_MODE SCONST			{ $$ = $2; }
+			| /* EMPTY */					{ $$ = NULL; }
+		;
 
 plugin_options:
 			'(' plugin_opt_list ')'			{ $$ = $2; }
diff --git a/src/backend/replication/repl_scanner.l b/src/backend/replication/repl_scanner.l
index cb467ca46f..fcc6f6feda 100644
--- a/src/backend/replication/repl_scanner.l
+++ b/src/backend/replication/repl_scanner.l
@@ -136,6 +136,7 @@ EXPORT_SNAPSHOT		{ return K_EXPORT_SNAPSHOT; }
 NOEXPORT_SNAPSHOT	{ return K_NOEXPORT_SNAPSHOT; }
 USE_SNAPSHOT		{ return K_USE_SNAPSHOT; }
 WAIT				{ return K_WAIT; }
+SHUTDOWN_MODE		{ return K_SHUTDOWN_MODE; }
 
 {space}+		{ /* do nothing */ }
 
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index f6446da2d6..cfce9d93ef 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -409,6 +409,7 @@ WalReceiverMain(void)
 		options.logical = false;
 		options.startpoint = startpoint;
 		options.slotname = slotname[0] != '\0' ? slotname : NULL;
+		options.shutdown_mode = NULL;
 		options.proto.physical.startpointTLI = startpointTLI;
 		if (walrcv_startstreaming(wrconn, &options))
 		{
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 75e8363e24..d169092bf7 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -219,6 +219,15 @@ typedef struct
 
 static LagTracker *lag_tracker;
 
+/* Walsender shutdown behavior. NOTE(review): "IMMIDEATE" is misspelled. */
+typedef enum
+{
+	WALSND_SHUTDOWN_MODE_WAIT_FLUSH = 0,	/* wait for remote flush (default) */
+	WALSND_SHUTDOWN_MODE_IMMIDEATE	/* exit without confirming remote flush */
+} WalSndShutdownMode;
+
+static WalSndShutdownMode shutdown_mode = WALSND_SHUTDOWN_MODE_WAIT_FLUSH;
+
 /* Signal handlers */
 static void WalSndLastCycleHandler(SIGNAL_ARGS);
 
@@ -260,6 +269,8 @@ static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch);
 static void WalSndSegmentOpen(XLogReaderState *state, XLogSegNo nextSegNo,
 							  TimeLineID *tli_p);
 
+static void CheckWalSndOptions(const StartReplicationCmd *cmd);
+static void ParseShutdownMode(char *shutdownmode);
 
 /* Initialize walsender process before entering the main command loop */
 void
@@ -1272,6 +1283,9 @@ StartLogicalReplication(StartReplicationCmd *cmd)
 		got_STOPPING = true;
 	}
 
+	/* Check given options and set flags accordingly */
+	CheckWalSndOptions(cmd);
+
 	/*
 	 * Create our decoding context, making it start at the previously ack'ed
 	 * position.
@@ -1450,6 +1464,16 @@ ProcessPendingWrites(void)
 		/* Try to flush pending output to the client */
 		if (pq_flush_if_writable() != 0)
 			WalSndShutdown();
+
+		/*
+		 * In this function, there is a possibility that the walsender is
+		 * stuck. It is caused when the opposite worker is stuck and then the
+		 * send-buffer of the walsender becomes full. Therefore, we must add
+		 * an additional path for shutdown for immediate shutdown mode.
+		 */
+		if (shutdown_mode == WALSND_SHUTDOWN_MODE_IMMIDEATE &&
+			got_STOPPING)
+			WalSndDone(XLogSendLogical);
 	}
 
 	/* reactivate latch so WalSndLoop knows to continue */
@@ -3114,19 +3138,25 @@ WalSndDone(WalSndSendDataCallback send_data)
 	 * To figure out whether all WAL has successfully been replicated, check
 	 * flush location if valid, write otherwise. Tools like pg_receivewal will
 	 * usually (unless in synchronous mode) return an invalid flush location.
+	 *
+	 * If we are in the immediate shutdown mode, flush location and output
+	 * buffer are not checked. This may break the consistency between nodes,
+	 * but it may be useful for a system that has a high-latency network to
+	 * reduce the amount of time for shutdown.
 	 */
 	replicatedPtr = XLogRecPtrIsInvalid(MyWalSnd->flush) ?
 		MyWalSnd->write : MyWalSnd->flush;
 
-	if (WalSndCaughtUp && sentPtr == replicatedPtr &&
-		!pq_is_send_pending())
+	if (WalSndCaughtUp &&
+		(shutdown_mode == WALSND_SHUTDOWN_MODE_IMMIDEATE ||
+		 (sentPtr == replicatedPtr && !pq_is_send_pending())))
 	{
 		QueryCompletion qc;
 
 		/* Inform the standby that XLOG streaming is done */
 		SetQueryCompletion(&qc, CMDTAG_COPY, 0);
 		EndCommand(&qc, DestRemote, false);
-		pq_flush();
+		pq_flush_if_writable();
 
 		proc_exit(0);
 	}
@@ -3849,3 +3879,34 @@ LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now)
 	Assert(time != 0);
 	return now - time;
 }
+
+/*
+ * Check walsender options and set flags accordingly.  Options that were not
+ * given fall back to defaults, so nothing leaks from an earlier command.
+ */
+static void
+CheckWalSndOptions(const StartReplicationCmd *cmd)
+{
+	shutdown_mode = WALSND_SHUTDOWN_MODE_WAIT_FLUSH;
+	if (cmd->shutdownmode)
+		ParseShutdownMode(cmd->shutdownmode);
+}
+
+/*
+ * Parse the given shutdown mode string and store it in shutdown_mode.
+ *
+ * Accepts "wait_flush" or "immediate", case-insensitively; otherwise ERROR.
+ */
+static void
+ParseShutdownMode(char *shutdownmode)
+{
+	if (pg_strcasecmp(shutdownmode, "wait_flush") == 0)
+		shutdown_mode = WALSND_SHUTDOWN_MODE_WAIT_FLUSH;
+	else if (pg_strcasecmp(shutdownmode, "immediate") == 0)
+		shutdown_mode = WALSND_SHUTDOWN_MODE_IMMIDEATE;
+	else
+		ereport(ERROR,
+				errcode(ERRCODE_SYNTAX_ERROR),
+				errmsg("invalid value for shutdown mode: \"%s\"", shutdownmode),
+				errhint("Available values: wait_flush, immediate."));
+}
diff --git a/src/include/nodes/replnodes.h b/src/include/nodes/replnodes.h
index 4321ba8f86..c96e85e859 100644
--- a/src/include/nodes/replnodes.h
+++ b/src/include/nodes/replnodes.h
@@ -83,6 +83,7 @@ typedef struct StartReplicationCmd
 	char	   *slotname;
 	TimeLineID	timeline;
 	XLogRecPtr	startpoint;
+	char	   *shutdownmode;
 	List	   *options;
 } StartReplicationCmd;
 
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index decffe352d..ef6297da52 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -170,6 +170,7 @@ typedef struct
 								 * false if physical stream.  */
 	char	   *slotname;		/* Name of the replication slot or NULL. */
 	XLogRecPtr	startpoint;		/* LSN of starting point. */
+	char	   *shutdown_mode;	/* Name of the shutdown mode, or NULL. */
 
 	union
 	{
diff --git a/src/test/subscription/t/001_rep_changes.pl b/src/test/subscription/t/001_rep_changes.pl
index 75fd77b891..0aae5f5dd2 100644
--- a/src/test/subscription/t/001_rep_changes.pl
+++ b/src/test/subscription/t/001_rep_changes.pl
@@ -523,10 +523,15 @@ $node_publisher->poll_query_until('postgres',
 # inserted on the publisher, and b) when those changes are replicated on the
 # subscriber. Even on slow machines, this strategy will give predictable behavior.
 
-# Set min_apply_delay parameter to 3 seconds
+# Check restart on changing min_apply_delay to 3 seconds
 my $delay = 3;
 $node_subscriber->safe_psql('postgres',
 	"ALTER SUBSCRIPTION tap_sub_renamed SET (min_apply_delay = '${delay}s')");
+$node_publisher->poll_query_until('postgres',
+	"SELECT pid != $oldpid FROM pg_stat_replication WHERE application_name = 'tap_sub_renamed' AND state = 'streaming';"
+  )
+  or die
+  "Timed out while waiting for the walsender to restart after changing min_apply_delay to non-zero value";
 
 # Before doing the insertion, get the current timestamp that will be
 # used as a comparison base.
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 36d1dc0117..d06a7868ca 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -2976,6 +2976,7 @@ WalReceiverFunctionsType
 WalSnd
 WalSndCtlData
 WalSndSendDataCallback
+WalSndShutdownMode
 WalSndState
 WalTimeSample
 WalUsage
-- 
2.27.0

