From 5406a52c1f827f5cbf44399f52f9c0192ffc52d3 Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Date: Tue, 7 Feb 2023 05:38:20 +0000
Subject: [PATCH 2/2] Extend START_REPLICATION command to accept walsender
 options

This commit extends START_REPLICATION to accept options of walsender. Currently,
only one option exit_before_confirming is accepted.

For physical replication, the grammer of START_REPLICATION is extended to accept
options. Note that in the normal phyical replication the added option is never
used.

For logical replication, the option list for logical decoding plugin is reused for
storing walsender options. When the min_apply_delay parameter is set for a
subscription, the apply worker related with it will send START_REPLICATION query
with exit_before_confirming = true to publisher node.

This option allows primay servers to shut down even if there are pending WALs to
be sent or sent WALs are not flushed on the secondary. This may be useful to
shut down the primary even when the walreceiver/worker is stuck.

Author: Hayato Kuroda
Discussion: https://postgr.es/m/TYAPR01MB586668E50FC2447AD7F92491F5E89%40TYAPR01MB5866.jpnprd01.prod.outlook.com
---
 doc/src/sgml/protocol.sgml                    | 21 ++++-
 .../libpqwalreceiver/libpqwalreceiver.c       |  4 +
 src/backend/replication/logical/worker.c      | 13 ++-
 src/backend/replication/repl_gram.y           |  8 +-
 src/backend/replication/walsender.c           | 87 ++++++++++++++++++-
 src/include/replication/walreceiver.h         |  1 +
 src/test/subscription/t/001_rep_changes.pl    | 10 ++-
 src/tools/pgindent/typedefs.list              |  1 +
 8 files changed, 138 insertions(+), 7 deletions(-)

diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml
index 93fc7167d4..9c84d57cfb 100644
--- a/doc/src/sgml/protocol.sgml
+++ b/doc/src/sgml/protocol.sgml
@@ -2192,7 +2192,7 @@ psql "dbname=postgres replication=database" -c "IDENTIFY_SYSTEM;"
     </varlistentry>
 
     <varlistentry id="protocol-replication-start-replication">
-     <term><literal>START_REPLICATION</literal> [ <literal>SLOT</literal> <replaceable class="parameter">slot_name</replaceable> ] [ <literal>PHYSICAL</literal> ] <replaceable class="parameter">XXX/XXX</replaceable> [ <literal>TIMELINE</literal> <replaceable class="parameter">tli</replaceable> ]
+     <term><literal>START_REPLICATION</literal> [ <literal>SLOT</literal> <replaceable class="parameter">slot_name</replaceable> ] [ <literal>PHYSICAL</literal> ] <replaceable class="parameter">XXX/XXX</replaceable> [ <literal>TIMELINE</literal> <replaceable class="parameter">tli</replaceable> ] [ ( <replaceable>option_name</replaceable> [ <replaceable>option_value</replaceable> ] [, ...] ) ]
       <indexterm><primary>START_REPLICATION</primary></indexterm>
      </term>
      <listitem>
@@ -2496,6 +2496,25 @@ psql "dbname=postgres replication=database" -c "IDENTIFY_SYSTEM;"
         </listitem>
        </varlistentry>
       </variablelist>
+
+      <para>
+       If further options are given, we can control the behavior of the
+       walsender more detailed. Currently the following option is accepted:
+      </para>
+
+      <variablelist>
+       <varlistentry>
+        <term>exit_before_confirming</term>
+        <listitem>
+         <para>
+          If set to true, the walsender will exit before confirming the remote
+          flush of WALs at shutdown. This can be useful when the network lag
+          between nodes are large and it takes time to shut down the server.
+         </para>
+        </listitem>
+       </varlistentry>
+      </variablelist>
+
      </listitem>
     </varlistentry>
 
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index 560ec974fa..8bf8e03063 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -443,6 +443,10 @@ libpqrcv_startstreaming(WalReceiverConn *conn,
 			PQserverVersion(conn->streamConn) >= 140000)
 			appendStringInfoString(&cmd, ", binary 'true'");
 
+		if (options->proto.logical.exit_before_confirming &&
+			PQserverVersion(conn->streamConn) >= 160000)
+			appendStringInfoString(&cmd, ", exit_before_confirming 'true'");
+
 		appendStringInfoChar(&cmd, ')');
 	}
 	else
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index c574531040..d768bafd3e 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -4034,7 +4034,9 @@ maybe_reread_subscription(void)
 		newsub->stream != MySubscription->stream ||
 		strcmp(newsub->origin, MySubscription->origin) != 0 ||
 		newsub->owner != MySubscription->owner ||
-		!equal(newsub->publications, MySubscription->publications))
+		!equal(newsub->publications, MySubscription->publications) ||
+		(newsub->minapplydelay > 0 && MySubscription->minapplydelay == 0) ||
+		(newsub->minapplydelay == 0 && MySubscription->minapplydelay > 0))
 	{
 		if (am_parallel_apply_worker())
 			ereport(LOG,
@@ -4756,6 +4758,15 @@ ApplyWorkerMain(Datum main_arg)
 
 	if (!am_tablesync_worker())
 	{
+		/*
+		 * time-delayed logical replication does not support tablesync
+		 * workers, so only the leader apply worker can request walsenders to
+		 * exit before confirming remote flush.
+		 */
+		if (server_version >= 160000)
+			options.proto.logical.exit_before_confirming =
+				MySubscription->minapplydelay > 0;
+
 		/*
 		 * Even when the two_phase mode is requested by the user, it remains
 		 * as the tri-state PENDING until all tablesyncs have reached READY
diff --git a/src/backend/replication/repl_gram.y b/src/backend/replication/repl_gram.y
index 0c874e33cf..1705d52a58 100644
--- a/src/backend/replication/repl_gram.y
+++ b/src/backend/replication/repl_gram.y
@@ -91,6 +91,7 @@ Node *replication_parse_result;
 %type <boolval>	opt_temporary
 %type <list>	create_slot_options create_slot_legacy_opt_list
 %type <defelt>	create_slot_legacy_opt
+%type <list>	walsender_options
 
 %%
 
@@ -261,7 +262,7 @@ drop_replication_slot:
  * START_REPLICATION [SLOT slot] [PHYSICAL] %X/%X [TIMELINE %d]
  */
 start_replication:
-			K_START_REPLICATION opt_slot opt_physical RECPTR opt_timeline
+			K_START_REPLICATION opt_slot opt_physical RECPTR opt_timeline walsender_options
 				{
 					StartReplicationCmd *cmd;
 
@@ -270,6 +271,7 @@ start_replication:
 					cmd->slotname = $2;
 					cmd->startpoint = $4;
 					cmd->timeline = $5;
+					cmd->options = $6;
 					$$ = (Node *) cmd;
 				}
 			;
@@ -336,6 +338,10 @@ opt_timeline:
 				| /* EMPTY */			{ $$ = 0; }
 			;
 
+walsender_options:
+			'(' generic_option_list ')'			{ $$ = $2; }
+			| /* EMPTY */					{ $$ = NIL; }
+		;
 
 plugin_options:
 			'(' plugin_opt_list ')'			{ $$ = $2; }
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 4ed3747e3f..1bbcb8adf1 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -219,6 +219,22 @@ typedef struct
 
 static LagTracker *lag_tracker;
 
+/*
+ * If set to true, the walsender will exit before confirming flush of remote
+ * WALs and whether the send buffer is empty.
+ */
+static bool exit_before_confirming = false;
+
+/*
+ * Options for controlling the behavior of the walsender. Options can be
+ * specified in the START_STREAMING replication command. Currently only one
+ * option is allowed.
+ */
+typedef struct
+{
+	bool		exit_before_confirming;
+} WalSndData;
+
 /* Signal handlers */
 static void WalSndLastCycleHandler(SIGNAL_ARGS);
 
@@ -260,6 +276,7 @@ static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch);
 static void WalSndSegmentOpen(XLogReaderState *state, XLogSegNo nextSegNo,
 							  TimeLineID *tli_p);
 
+static void ConsumeWalsenderOptions(List *options, WalSndData *data);
 
 /* Initialize walsender process before entering the main command loop */
 void
@@ -672,6 +689,7 @@ StartReplication(StartReplicationCmd *cmd)
 	StringInfoData buf;
 	XLogRecPtr	FlushPtr;
 	TimeLineID	FlushTLI;
+	WalSndData	data;
 
 	/* create xlogreader for physical replication */
 	xlogreader =
@@ -710,6 +728,12 @@ StartReplication(StartReplicationCmd *cmd)
 		 */
 	}
 
+	/* Check given options and set flags accordingly */
+	ConsumeWalsenderOptions(cmd->options, &data);
+
+	if (data.exit_before_confirming)
+		exit_before_confirming = true;
+
 	/*
 	 * Select the timeline. If it was given explicitly by the client, use
 	 * that. Otherwise use the timeline of the last replayed record.
@@ -1245,6 +1269,7 @@ StartLogicalReplication(StartReplicationCmd *cmd)
 {
 	StringInfoData buf;
 	QueryCompletion qc;
+	WalSndData	data;
 
 	/* make sure that our requirements are still fulfilled */
 	CheckLogicalDecodingRequirements();
@@ -1272,6 +1297,12 @@ StartLogicalReplication(StartReplicationCmd *cmd)
 		got_STOPPING = true;
 	}
 
+	/* Check given options and set flags accordingly */
+	ConsumeWalsenderOptions(cmd->options, &data);
+
+	if (data.exit_before_confirming)
+		exit_before_confirming = true;
+
 	/*
 	 * Create our decoding context, making it start at the previously ack'ed
 	 * position.
@@ -1450,6 +1481,9 @@ ProcessPendingWrites(void)
 		/* Try to flush pending output to the client */
 		if (pq_flush_if_writable() != 0)
 			WalSndShutdown();
+
+		if (exit_before_confirming)
+			WalSndDone(XLogSendLogical);
 	}
 
 	/* reactivate latch so WalSndLoop knows to continue */
@@ -3118,15 +3152,16 @@ WalSndDone(WalSndSendDataCallback send_data)
 	replicatedPtr = XLogRecPtrIsInvalid(MyWalSnd->flush) ?
 		MyWalSnd->write : MyWalSnd->flush;
 
-	if (WalSndCaughtUp && sentPtr == replicatedPtr &&
-		!pq_is_send_pending())
+	if (WalSndCaughtUp &&
+		(exit_before_confirming ||
+		 (sentPtr == replicatedPtr && !pq_is_send_pending())))
 	{
 		QueryCompletion qc;
 
 		/* Inform the standby that XLOG streaming is done */
 		SetQueryCompletion(&qc, CMDTAG_COPY, 0);
 		EndCommand(&qc, DestRemote, false);
-		pq_flush();
+		pq_flush_if_writable();
 
 		proc_exit(0);
 	}
@@ -3849,3 +3884,49 @@ LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now)
 	Assert(time != 0);
 	return now - time;
 }
+
+/*
+ * Reads all entrly of the list and consume if needed.
+ *
+ * In logical replication mode, the given list may contain both walsender and
+ * output_plugin options, and it leads "unrecognized pgoutput option" ERROR.
+ * Therefore, the entry for walsender options will be eliminated from the list
+ * if we found.
+ */
+static void
+ConsumeWalsenderOptions(List *options, WalSndData *data)
+{
+	ListCell   *lc;
+	bool		exit_before_confirming_given = false;
+
+	foreach(lc, options)
+	{
+		DefElem    *defel = (DefElem *) lfirst(lc);
+
+		Assert(defel->arg == NULL || IsA(defel->arg, String));
+
+		/* Check each param, whether or not we recognize it */
+		if (strcmp(defel->defname, "exit_before_confirming") == 0)
+		{
+			if (exit_before_confirming_given)
+				ereport(ERROR,
+						errcode(ERRCODE_SYNTAX_ERROR),
+						errmsg("conflicting or redundant options"));
+			exit_before_confirming_given = true;
+
+			data->exit_before_confirming = defGetBoolean(defel);
+
+			/*
+			 * Elimitates current element, because the list may be bypassed to
+			 * the pgoutput module and it will raise an ERROR due to the
+			 * unrecognized option.
+			 */
+			options = foreach_delete_current(options, lc);
+		}
+
+		/*
+		 * ERROR is not raised here even if the given parameter is not known,
+		 * because it may be written for the output plugin.
+		 */
+	}
+}
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index decffe352d..f801fb3e0d 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -187,6 +187,7 @@ typedef struct
 									 * prepare time */
 			char	   *origin; /* Only publish data originating from the
 								 * specified origin */
+			bool		exit_before_confirming;
 		}			logical;
 	}			proto;
 } WalRcvStreamOptions;
diff --git a/src/test/subscription/t/001_rep_changes.pl b/src/test/subscription/t/001_rep_changes.pl
index f94819672b..d7a6fd0e38 100644
--- a/src/test/subscription/t/001_rep_changes.pl
+++ b/src/test/subscription/t/001_rep_changes.pl
@@ -523,9 +523,17 @@ $node_publisher->poll_query_until('postgres',
 # changes are replicated on subscriber.
 my $delay = 3;
 
-# Set min_apply_delay parameter to 3 seconds
+# check restart on changing min_apply_delay to 3 seconds
+$oldpid = $node_publisher->safe_psql('postgres',
+	"SELECT pid FROM pg_stat_replication WHERE application_name = 'tap_sub_renamed' AND state = 'streaming';"
+);
 $node_subscriber->safe_psql('postgres',
 	"ALTER SUBSCRIPTION tap_sub_renamed SET (min_apply_delay = '${delay}s')");
+$node_publisher->poll_query_until('postgres',
+	"SELECT pid != $oldpid FROM pg_stat_replication WHERE application_name = 'tap_sub_renamed' AND state = 'streaming';"
+  )
+  or die
+  "Timed out while waiting for apply to restart after changing min_apply_delay to non-zero value";
 
 # Make new content on publisher and check its presence in subscriber depending
 # on the delay applied above. Before doing the insertion, get the
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 07fbb7ccf6..3b7f8eb063 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -2968,6 +2968,7 @@ WalReceiverConn
 WalReceiverFunctionsType
 WalSnd
 WalSndCtlData
+WalSndData
 WalSndSendDataCallback
 WalSndState
 WalTimeSample
-- 
2.27.0

