From 458bdbfeabae2e22dae84386a8b8e54126d101a3 Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Date: Wed, 17 Apr 2024 06:18:23 +0000
Subject: [PATCH v6 2/4] Alter slot option two_phase only when altering true to
 false

---
 doc/src/sgml/ref/alter_subscription.sgml      |  2 +-
 src/backend/commands/subscriptioncmds.c       | 23 +++++-
 .../libpqwalreceiver/libpqwalreceiver.c       | 21 ++++--
 src/include/replication/walreceiver.h         |  8 +--
 src/test/subscription/meson.build             |  1 +
 src/test/subscription/t/099_twophase_added.pl | 72 +++++++++++++++++++
 6 files changed, 114 insertions(+), 13 deletions(-)
 create mode 100644 src/test/subscription/t/099_twophase_added.pl

diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml
index e69132c39d..e54aa1b128 100644
--- a/doc/src/sgml/ref/alter_subscription.sgml
+++ b/doc/src/sgml/ref/alter_subscription.sgml
@@ -70,7 +70,7 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO <
    <command>ALTER SUBSCRIPTION ... {SET|ADD|DROP} PUBLICATION ...</command>
    with <literal>refresh</literal> option as <literal>true</literal>,
    <command>ALTER SUBSCRIPTION ... SET (failover = on|off)</command> and
-   <command>ALTER SUBSCRIPTION ... SET (two_phase = on|off)</command>
+   <command>ALTER SUBSCRIPTION ... SET (two_phase = off)</command>
    cannot be executed inside a transaction block.
 
    These commands also cannot be executed when the subscription has
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index aa8a8e1f84..b02e21f535 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -1184,7 +1184,9 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 					 * The changed failover option of the slot can't be rolled
 					 * back.
 					 */
-					PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION ... SET (two_phase)");
+					if (!opts.twophase)
+						PreventInTransactionBlock(isTopLevel,
+												  "ALTER SUBSCRIPTION ... SET (two_phase = off)");
 
 					/* Change system catalog acoordingly */
 					values[Anum_pg_subscription_subtwophasestate - 1] =
@@ -1554,6 +1556,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 		bool		must_use_password;
 		char	   *err;
 		WalReceiverConn *wrconn;
+		bool		two_phase_needs_to_be_updated;
+		bool		failover_needs_to_be_updated;
 
 		/* Load the library providing us libpq calls. */
 		load_file("libpqwalreceiver", false);
@@ -1567,9 +1571,24 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 					(errcode(ERRCODE_CONNECTION_FAILURE),
 					 errmsg("could not connect to the publisher: %s", err)));
 
+		/*
+		 * Consider which slot option must be altered.
+		 *
+		 * We must alter the failover option whenever subfailover is updated.
+		 * Two_phase, however, is altered only when changing true to false.
+		 */
+		two_phase_needs_to_be_updated =
+						(replaces[Anum_pg_subscription_subtwophasestate - 1] &&
+						 !opts.twophase);
+		failover_needs_to_be_updated =
+								replaces[Anum_pg_subscription_subfailover - 1];
+
 		PG_TRY();
 		{
-			walrcv_alter_slot(wrconn, sub->slotname, opts.twophase, opts.failover);
+			if (two_phase_needs_to_be_updated || failover_needs_to_be_updated)
+				walrcv_alter_slot(wrconn, sub->slotname,
+								  two_phase_needs_to_be_updated ? &opts.twophase : NULL,
+								  failover_needs_to_be_updated ? &opts.failover : NULL);
 		}
 		PG_FINALLY();
 		{
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index baef3bdec0..546b599848 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -80,7 +80,7 @@ static char *libpqrcv_create_slot(WalReceiverConn *conn,
 								  CRSSnapshotAction snapshot_action,
 								  XLogRecPtr *lsn);
 static void libpqrcv_alter_slot(WalReceiverConn *conn, const char *slotname,
-								bool two_phase, bool failover);
+								const bool *two_phase, const bool *failover);
 static pid_t libpqrcv_get_backend_pid(WalReceiverConn *conn);
 static WalRcvExecResult *libpqrcv_exec(WalReceiverConn *conn,
 									   const char *query,
@@ -1121,16 +1121,25 @@ libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname,
  */
 static void
 libpqrcv_alter_slot(WalReceiverConn *conn, const char *slotname,
-					bool two_phase, bool failover)
+					const bool *two_phase, const bool *failover)
 {
 	StringInfoData cmd;
 	PGresult   *res;
 
 	initStringInfo(&cmd);
-	appendStringInfo(&cmd, "ALTER_REPLICATION_SLOT %s ( TWO_PHASE %s, FAILOVER %s )",
-					 quote_identifier(slotname),
-					 two_phase ? "true" : "false",
-					 failover ? "true" : "false");
+	appendStringInfo(&cmd, "ALTER_REPLICATION_SLOT %s ( ",
+					 quote_identifier(slotname));
+
+	if (two_phase)
+		appendStringInfo(&cmd, "TWO_PHASE %s%s ",
+						 (*two_phase) ? "true" : "false",
+						 failover ? ", " : "");
+
+	if (failover)
+		appendStringInfo(&cmd, "FAILOVER %s ",
+						 (*failover) ? "true" : "false");
+
+	appendStringInfoString(&cmd, ");");
 
 	res = libpqrcv_PQexec(conn->streamConn, cmd.data);
 	pfree(cmd.data);
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index a443f402f5..f30637aa4a 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -372,13 +372,13 @@ typedef char *(*walrcv_create_slot_fn) (WalReceiverConn *conn,
 /*
  * walrcv_alter_slot_fn
  *
- * Change the definition of a replication slot. Currently, it only supports
- * changing the failover property of the slot.
+ * Change the definition of a replication slot. Currently, it supports
+ * changing the two_phase and the failover property of the slot.
  */
 typedef void (*walrcv_alter_slot_fn) (WalReceiverConn *conn,
 									  const char *slotname,
-									  bool two_phase,
-									  bool failover);
+									  const bool *two_phase,
+									  const bool *failover);
 
 /*
  * walrcv_get_backend_pid_fn
diff --git a/src/test/subscription/meson.build b/src/test/subscription/meson.build
index c591cd7d61..b4bd522c3d 100644
--- a/src/test/subscription/meson.build
+++ b/src/test/subscription/meson.build
@@ -40,6 +40,7 @@ tests += {
       't/031_column_list.pl',
       't/032_subscribe_use_index.pl',
       't/033_run_as_table_owner.pl',
+      't/099_twophase_added.pl',
       't/100_bugs.pl',
     ],
   },
diff --git a/src/test/subscription/t/099_twophase_added.pl b/src/test/subscription/t/099_twophase_added.pl
new file mode 100644
index 0000000000..c13a37675a
--- /dev/null
+++ b/src/test/subscription/t/099_twophase_added.pl
@@ -0,0 +1,72 @@
+# Copyright (c) 2021-2024, PostgreSQL Global Development Group
+
+# Additional tests for altering two_phase option
+use strict;
+use warnings FATAL => 'all';
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+# Initialize publisher node
+my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->append_conf('postgresql.conf',
+	qq(max_prepared_transactions = 10));
+$node_publisher->start;
+
+# Create subscriber node
+my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
+$node_subscriber->init;
+$node_subscriber->append_conf('postgresql.conf',
+	qq(max_prepared_transactions = 10));
+$node_subscriber->start;
+
+# Define pre-existing tables on both nodes
+$node_publisher->safe_psql('postgres',
+    "CREATE TABLE tab_full (a int PRIMARY KEY);");
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE tab_full (a int PRIMARY KEY)");
+
+# Setup logical replication, with two_phase = off
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION pub FOR ALL TABLES");
+
+$node_subscriber->safe_psql(
+	'postgres', "
+	CREATE SUBSCRIPTION sub
+	CONNECTION '$publisher_connstr' PUBLICATION pub
+	WITH (two_phase = off, copy_data = off)");
+
+######
+# Check the case that prepared transactions exist on publisher node
+######
+
+$node_publisher->safe_psql(
+	'postgres', "
+	BEGIN;
+	INSERT INTO tab_full VALUES (generate_series(1, 5));
+	PREPARE TRANSACTION 'test_prepared_tab_full';");
+
+$node_publisher->wait_for_catchup('sub');
+
+my $result = $node_subscriber->safe_psql('postgres',
+    "SELECT count(*) FROM pg_prepared_xacts;");
+is($result, q(0), "transaction is not prepared on subscriber");
+
+$node_subscriber->safe_psql(
+    'postgres', "
+    ALTER SUBSCRIPTION sub DISABLE;
+    ALTER SUBSCRIPTION sub SET (two_phase = on);
+    ALTER SUBSCRIPTION sub ENABLE;");
+
+$node_publisher->safe_psql( 'postgres',
+    "COMMIT PREPARED 'test_prepared_tab_full';");
+$node_publisher->wait_for_catchup('sub');
+
+$result = $node_subscriber->safe_psql('postgres',
+    "SELECT count(*) FROM tab_full;");
+is($result, q(5),
+   "prepared transactions done before altering can be replicated");
+
+done_testing();
-- 
2.43.0

