From 5e4e79a5056bae374911d96eef6c4d945e23903e Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Date: Wed, 17 Apr 2024 06:18:23 +0000
Subject: [PATCH v4 2/3] Alter slot option two_phase only when altering true to
 false

---
 src/backend/commands/subscriptioncmds.c       | 21 +++++-
 .../libpqwalreceiver/libpqwalreceiver.c       | 21 ++++--
 src/include/replication/walreceiver.h         |  8 +--
 src/test/subscription/meson.build             |  1 +
 src/test/subscription/t/099_twophase_added.pl | 72 +++++++++++++++++++
 5 files changed, 111 insertions(+), 12 deletions(-)
 create mode 100644 src/test/subscription/t/099_twophase_added.pl

diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 3299a60fff..0d80d6e110 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -850,7 +850,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 			else if (opts.slot_name &&
 					 (opts.failover || walrcv_server_version(wrconn) >= 170000))
 			{
-				walrcv_alter_slot(wrconn, opts.slot_name, opts.twophase, opts.failover);
+				walrcv_alter_slot(wrconn, opts.slot_name, NULL, &opts.failover);
 			}
 		}
 		PG_FINALLY();
@@ -1564,6 +1564,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 		bool		must_use_password;
 		char	   *err;
 		WalReceiverConn *wrconn;
+		bool		two_phase_needs_to_be_updated;
+		bool		failover_needs_to_be_updated;
 
 		/* Load the library providing us libpq calls. */
 		load_file("libpqwalreceiver", false);
@@ -1577,9 +1579,24 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 					(errcode(ERRCODE_CONNECTION_FAILURE),
 					 errmsg("could not connect to the publisher: %s", err)));
 
+		/*
+		 * Consider which slot option must be altered.
+		 *
+		 * We must alter the failover option whenever subfailover is updated.
+		 * Two_phase, however, is altered only when changing true to false.
+		 */
+		two_phase_needs_to_be_updated =
+						(replaces[Anum_pg_subscription_subtwophasestate - 1] &&
+						 !opts.twophase);
+		failover_needs_to_be_updated =
+								replaces[Anum_pg_subscription_subfailover - 1];
+
 		PG_TRY();
 		{
-			walrcv_alter_slot(wrconn, sub->slotname, opts.twophase, opts.failover);
+			if (two_phase_needs_to_be_updated || failover_needs_to_be_updated)
+				walrcv_alter_slot(wrconn, sub->slotname,
+								  two_phase_needs_to_be_updated ? &opts.twophase : NULL,
+								  failover_needs_to_be_updated ? &opts.failover : NULL);
 		}
 		PG_FINALLY();
 		{
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index baef3bdec0..546b599848 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -80,7 +80,7 @@ static char *libpqrcv_create_slot(WalReceiverConn *conn,
 								  CRSSnapshotAction snapshot_action,
 								  XLogRecPtr *lsn);
 static void libpqrcv_alter_slot(WalReceiverConn *conn, const char *slotname,
-								bool two_phase, bool failover);
+								const bool *two_phase, const bool *failover);
 static pid_t libpqrcv_get_backend_pid(WalReceiverConn *conn);
 static WalRcvExecResult *libpqrcv_exec(WalReceiverConn *conn,
 									   const char *query,
@@ -1121,16 +1121,25 @@ libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname,
  */
 static void
 libpqrcv_alter_slot(WalReceiverConn *conn, const char *slotname,
-					bool two_phase, bool failover)
+					const bool *two_phase, const bool *failover)
 {
 	StringInfoData cmd;
 	PGresult   *res;
 
 	initStringInfo(&cmd);
-	appendStringInfo(&cmd, "ALTER_REPLICATION_SLOT %s ( TWO_PHASE %s, FAILOVER %s )",
-					 quote_identifier(slotname),
-					 two_phase ? "true" : "false",
-					 failover ? "true" : "false");
+	appendStringInfo(&cmd, "ALTER_REPLICATION_SLOT %s ( ",
+					 quote_identifier(slotname));
+
+	if (two_phase)
+		appendStringInfo(&cmd, "TWO_PHASE %s%s ",
+						 (*two_phase) ? "true" : "false",
+						 failover ? ", " : "");
+
+	if (failover)
+		appendStringInfo(&cmd, "FAILOVER %s ",
+						 (*failover) ? "true" : "false");
+
+	appendStringInfoString(&cmd, ");");
 
 	res = libpqrcv_PQexec(conn->streamConn, cmd.data);
 	pfree(cmd.data);
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index a443f402f5..f30637aa4a 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -372,13 +372,13 @@ typedef char *(*walrcv_create_slot_fn) (WalReceiverConn *conn,
 /*
  * walrcv_alter_slot_fn
  *
- * Change the definition of a replication slot. Currently, it only supports
- * changing the failover property of the slot.
+ * Change the definition of a replication slot. Currently, it supports
+ * changing the two_phase and the failover property of the slot.
  */
 typedef void (*walrcv_alter_slot_fn) (WalReceiverConn *conn,
 									  const char *slotname,
-									  bool two_phase,
-									  bool failover);
+									  const bool *two_phase,
+									  const bool *failover);
 
 /*
  * walrcv_get_backend_pid_fn
diff --git a/src/test/subscription/meson.build b/src/test/subscription/meson.build
index c591cd7d61..b4bd522c3d 100644
--- a/src/test/subscription/meson.build
+++ b/src/test/subscription/meson.build
@@ -40,6 +40,7 @@ tests += {
       't/031_column_list.pl',
       't/032_subscribe_use_index.pl',
       't/033_run_as_table_owner.pl',
+      't/099_twophase_added.pl',
       't/100_bugs.pl',
     ],
   },
diff --git a/src/test/subscription/t/099_twophase_added.pl b/src/test/subscription/t/099_twophase_added.pl
new file mode 100644
index 0000000000..c13a37675a
--- /dev/null
+++ b/src/test/subscription/t/099_twophase_added.pl
@@ -0,0 +1,72 @@
+# Copyright (c) 2021-2024, PostgreSQL Global Development Group
+
+# Additional tests for altering two_phase option
+use strict;
+use warnings FATAL => 'all';
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+# Initialize publisher node
+my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->append_conf('postgresql.conf',
+	qq(max_prepared_transactions = 10));
+$node_publisher->start;
+
+# Create subscriber node
+my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
+$node_subscriber->init;
+$node_subscriber->append_conf('postgresql.conf',
+	qq(max_prepared_transactions = 10));
+$node_subscriber->start;
+
+# Define pre-existing tables on both nodes
+$node_publisher->safe_psql('postgres',
+    "CREATE TABLE tab_full (a int PRIMARY KEY);");
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE tab_full (a int PRIMARY KEY)");
+
+# Setup logical replication, with two_phase = off
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION pub FOR ALL TABLES");
+
+$node_subscriber->safe_psql(
+	'postgres', "
+	CREATE SUBSCRIPTION sub
+	CONNECTION '$publisher_connstr' PUBLICATION pub
+	WITH (two_phase = off, copy_data = off)");
+
+######
+# Check the case that prepared transactions exist on publisher node
+######
+
+$node_publisher->safe_psql(
+	'postgres', "
+	BEGIN;
+	INSERT INTO tab_full VALUES (generate_series(1, 5));
+	PREPARE TRANSACTION 'test_prepared_tab_full';");
+
+$node_publisher->wait_for_catchup('sub');
+
+my $result = $node_subscriber->safe_psql('postgres',
+    "SELECT count(*) FROM pg_prepared_xacts;");
+is($result, q(0), "transaction is not prepared on subscriber");
+
+$node_subscriber->safe_psql(
+    'postgres', "
+    ALTER SUBSCRIPTION sub DISABLE;
+    ALTER SUBSCRIPTION sub SET (two_phase = on);
+    ALTER SUBSCRIPTION sub ENABLE;");
+
+$node_publisher->safe_psql( 'postgres',
+    "COMMIT PREPARED 'test_prepared_tab_full';");
+$node_publisher->wait_for_catchup('sub');
+
+$result = $node_subscriber->safe_psql('postgres',
+    "SELECT count(*) FROM tab_full;");
+is($result, q(5),
+   "prepared transactions done before altering can be replicated");
+
+done_testing();
-- 
2.43.0

