From 7588a8ddfdce6193c6bfaf43ab5a09c18ed4bc8a Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Date: Fri, 19 Apr 2024 11:03:19 +0000
Subject: [PATCH 4/4] Add force_alter option

---
 src/backend/commands/subscriptioncmds.c       | 37 ++++++++++++++++++-
 src/test/regress/expected/subscription.out    |  3 ++
 src/test/regress/sql/subscription.sql         |  3 ++
 src/test/subscription/t/099_twophase_added.pl | 23 +++++++++---
 4 files changed, 59 insertions(+), 7 deletions(-)

diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index b1c00e36db..e38d808d32 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -72,6 +72,7 @@
 #define SUBOPT_RUN_AS_OWNER			0x00001000
 #define SUBOPT_LSN					0x00002000
 #define SUBOPT_ORIGIN				0x00004000
+#define SUBOPT_FORCE_ALTER			0x00008000
 
 /* check if the 'val' has 'bits' set */
 #define IsSet(val, bits)  (((val) & (bits)) == (bits))
@@ -98,6 +99,7 @@ typedef struct SubOpts
 	bool		runasowner;
 	char	   *origin;
 	XLogRecPtr	lsn;
+	bool		twophase_force;
 } SubOpts;
 
 static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
@@ -158,6 +160,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
 		opts->runasowner = false;
 	if (IsSet(supported_opts, SUBOPT_ORIGIN))
 		opts->origin = pstrdup(LOGICALREP_ORIGIN_ANY);
+	if (IsSet(supported_opts, SUBOPT_FORCE_ALTER))
+		opts->twophase_force = false;
 
 	/* Parse options */
 	foreach(lc, stmt_options)
@@ -354,6 +358,15 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
 			opts->specified_opts |= SUBOPT_LSN;
 			opts->lsn = lsn;
 		}
+		else if (IsSet(supported_opts, SUBOPT_FORCE_ALTER) &&
+				 strcmp(defel->defname, "force_alter") == 0)
+		{
+			if (IsSet(opts->specified_opts, SUBOPT_FORCE_ALTER))
+				errorConflictingDefElem(defel, pstate);
+
+			opts->specified_opts |= SUBOPT_FORCE_ALTER;
+			opts->twophase_force = defGetBoolean(defel);
+		}
 		else
 			ereport(ERROR,
 					(errcode(ERRCODE_SYNTAX_ERROR),
@@ -1134,7 +1147,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 								  SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT |
 								  SUBOPT_DISABLE_ON_ERR |
 								  SUBOPT_PASSWORD_REQUIRED |
-								  SUBOPT_RUN_AS_OWNER | SUBOPT_ORIGIN);
+								  SUBOPT_RUN_AS_OWNER | SUBOPT_ORIGIN |
+								  SUBOPT_FORCE_ALTER);
 
 				parse_subscription_options(pstate, stmt->options,
 										   supported_opts, &opts);
@@ -1170,6 +1184,16 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 					{
 						ListCell	*cell;
 
+						/*
+						 * Abort prepared transactions if force option is also
+						 * specified. Otherwise raise an ERROR.
+						 */
+						if (!opts.twophase_force)
+							ereport(ERROR,
+									(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+									 errmsg("cannot alter %s when there are prepared transactions",
+											"two_phase = false")));
+
 						/* Must not be in the transaction */
 						PreventInTransactionBlock(isTopLevel,
 												  "ALTER SUBSCRIPTION ... SET (two_phase = ...)");
@@ -1179,8 +1203,9 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 						{
 							FinishPreparedTransaction((char *) lfirst(cell),
 													  false);
-							prepared_xacts = list_delete_cell(prepared_xacts, cell);
 						}
+
+						list_free(prepared_xacts);
 					}
 
 					/* Change system catalog acoordingly */
@@ -1272,6 +1297,14 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 					replaces[Anum_pg_subscription_suborigin - 1] = true;
 				}
 
+				/* force_alter cannot be used standalone */
+				if (IsSet(opts.specified_opts, SUBOPT_FORCE_ALTER) &&
+					!IsSet(opts.specified_opts, SUBOPT_TWOPHASE_COMMIT))
+					ereport(ERROR,
+							(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+							 errmsg("%s must be specified with %s",
+									"force_alter", "two_phase")));
+
 				update_tuple = true;
 				break;
 			}
diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out
index 51c466f42e..946f3f6721 100644
--- a/src/test/regress/expected/subscription.out
+++ b/src/test/regress/expected/subscription.out
@@ -370,6 +370,9 @@ ERROR:  two_phase requires a Boolean value
 CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, two_phase = true);
 WARNING:  subscription was created, but is not connected
 HINT:  To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.
+-- fail - force_alter cannot be set alone
+ALTER SUBSCRIPTION regress_testsub SET (force_alter = true);
+ERROR:  force_alter must be specified with two_phase
 \dRs+
                                                                                                            List of subscriptions
       Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Synchronous commit |          Conninfo           | Skip LSN 
diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql
index b7764c1074..2f04675980 100644
--- a/src/test/regress/sql/subscription.sql
+++ b/src/test/regress/sql/subscription.sql
@@ -255,6 +255,9 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
 -- now it works
 CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, two_phase = true);
 
+-- fail - force_alter cannot be set alone
+ALTER SUBSCRIPTION regress_testsub SET (force_alter = true);
+
 \dRs+
 -- We can alter streaming when two_phase enabled
 ALTER SUBSCRIPTION regress_testsub SET (streaming = true);
diff --git a/src/test/subscription/t/099_twophase_added.pl b/src/test/subscription/t/099_twophase_added.pl
index a8135b671c..7c73a58f2a 100644
--- a/src/test/subscription/t/099_twophase_added.pl
+++ b/src/test/subscription/t/099_twophase_added.pl
@@ -85,16 +85,29 @@ $result = $node_subscriber->safe_psql('postgres',
     "SELECT count(*) FROM pg_prepared_xacts;");
 is($result, q(1), "transaction has been prepared on subscriber");
 
-$node_subscriber->safe_psql(
-    'postgres', "
-    ALTER SUBSCRIPTION sub DISABLE;
-    ALTER SUBSCRIPTION sub SET (two_phase = off);
-    ALTER SUBSCRIPTION sub ENABLE;");
+$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION sub DISABLE;");
+
+my $stdout;
+my $stderr;
+
+($result, $stdout, $stderr) = $node_subscriber->psql(
+	'postgres', "ALTER SUBSCRIPTION sub SET (two_phase = off);");
+ok($stderr =~ /cannot alter two_phase = false when there are prepared transactions/,
+	'ALTER SUBSCRIPTION failed');
+
+$result = $node_subscriber->safe_psql('postgres',
+    "SELECT count(*) FROM pg_prepared_xacts;");
+is($result, q(1), "prepared transaction still exits");
+
+$node_subscriber->safe_psql('postgres',
+    "ALTER SUBSCRIPTION sub SET (two_phase = off, force_alter = on);");
 
 $result = $node_subscriber->safe_psql('postgres',
     "SELECT count(*) FROM pg_prepared_xacts;");
 is($result, q(0), "prepared transaction done by worker is aborted");
 
+$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION sub ENABLE;");
+
 $node_publisher->safe_psql( 'postgres',
     "COMMIT PREPARED 'test_prepared_tab_full';");
 $node_publisher->wait_for_catchup('sub');
-- 
2.43.0

