From c23330158b9e7407922d766d4c7ea6113eda14b2 Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Date: Mon, 8 Apr 2024 12:39:12 +0000
Subject: [PATCH v7 3/4] Abort prepared transactions while altering two_phase
 to false

---
 doc/src/sgml/ref/alter_subscription.sgml      |  9 ++++-
 src/backend/access/transam/twophase.c         | 17 ++++----
 src/backend/commands/subscriptioncmds.c       | 39 ++++++++++++-------
 src/include/access/twophase.h                 |  3 +-
 src/test/subscription/t/099_twophase_added.pl | 35 +++++++++++++++++
 5 files changed, 75 insertions(+), 28 deletions(-)

diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml
index 0c2894a94e..848e4af649 100644
--- a/doc/src/sgml/ref/alter_subscription.sgml
+++ b/doc/src/sgml/ref/alter_subscription.sgml
@@ -233,8 +233,6 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO <
       <link linkend="sql-createsubscription-params-with-failover"><literal>failover</literal></link>, and
       <link linkend="sql-createsubscription-params-with-two-phase"><literal>two_phase</literal></link>.
       Only a superuser can set <literal>password_required = false</literal>.
-      The <literal>two_phase</literal> parameter can only be altered when the
-      subscription is disabled.
      </para>
 
      <para>
@@ -256,6 +254,13 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO <
       <link linkend="sql-createsubscription-params-with-failover"><literal>failover</literal></link>
       option is enabled.
      </para>
+
+     <para>
+      The <literal>two_phase</literal> parameter can only be altered when the
+      subscription is disabled. When altering the parameter from <literal>true</literal>
+      to <literal>false</literal>, the backend process checks prepared
+      transactions done by the logical replication worker and aborts them.
+     </para>
     </listitem>
    </varlistentry>
 
diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c
index a67a8c48aa..e0cc0e9b21 100644
--- a/src/backend/access/transam/twophase.c
+++ b/src/backend/access/transam/twophase.c
@@ -2719,13 +2719,13 @@ IsTwoPhaseTransactionGidForSubid(Oid subid, char *gid)
 }
 
 /*
- * LookupGXactBySubid
- *		Check if the prepared transaction done by apply worker exists.
+ * GetGidListBySubid
+ *      Get a list of GIDs which is PREPARE'd by the given subscription.
  */
-bool
-LookupGXactBySubid(Oid subid)
+List *
+GetGidListBySubid(Oid subid)
 {
-	bool		found = false;
+	List *list = NIL;
 
 	LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
 	for (int i = 0; i < TwoPhaseState->numPrepXacts; i++)
@@ -2735,11 +2735,8 @@ LookupGXactBySubid(Oid subid)
 		/* Ignore not-yet-valid GIDs. */
 		if (gxact->valid &&
 			IsTwoPhaseTransactionGidForSubid(subid, gxact->gid))
-		{
-			found = true;
-			break;
-		}
+			list = lappend(list, pstrdup(gxact->gid));
 	}
 	LWLockRelease(TwoPhaseStateLock);
-	return found;
+	return list;
 }
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 71b058b385..27be6299e0 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -1155,6 +1155,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 
 				if (IsSet(opts.specified_opts, SUBOPT_TWOPHASE_COMMIT))
 				{
+					List *prepared_xacts = NIL;
+
 					/*
 					 * Do not allow changing the two_phase option if the
 					 * subscription is enabled. This is because the two_phase
@@ -1174,26 +1176,33 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 					logicalrep_workers_stop(subid);
 
 					/*
-					 * two_phase cannot be disabled if there are any
-					 * uncommitted prepared transactions present.
-					 */
-					if (!opts.twophase &&
-						form->subtwophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED &&
-						LookupGXactBySubid(subid))
-						/* Add error message */
-						ereport(ERROR,
-								(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-								 errmsg("cannot disable two_phase when uncommitted prepared transactions present"),
-								 errhint("Resolve these transactions and try again")));
-
-					/*
-					 * The changed two_phase option of the slot can't be rolled
-					 * back.
+					 * If two_phase was enabled, there is a possibility the
+					 * transactions has already been PREPARE'd. They must be
+					 * checked and rolled back.
 					 */
 					if (!opts.twophase)
+					{
+						/*
+						 * The changed two_phase option (true->false) of the
+						 * slot can't be rolled back.
+						 */
 						PreventInTransactionBlock(isTopLevel,
 												  "ALTER SUBSCRIPTION ... SET (two_phase = off)");
 
+						if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED &&
+							(prepared_xacts = GetGidListBySubid(subid)) != NIL)
+						{
+							ListCell	*cell;
+
+							/* Abort all listed transactions */
+							foreach(cell, prepared_xacts)
+								FinishPreparedTransaction((char *) lfirst(cell),
+														  false);
+
+							list_free(prepared_xacts);
+						}
+					}
+
 					/* Change system catalog acoordingly */
 					values[Anum_pg_subscription_subtwophasestate - 1] =
 						CharGetDatum(opts.twophase ?
diff --git a/src/include/access/twophase.h b/src/include/access/twophase.h
index d37e06fdee..f7a5cf0c12 100644
--- a/src/include/access/twophase.h
+++ b/src/include/access/twophase.h
@@ -18,6 +18,7 @@
 #include "access/xlogdefs.h"
 #include "datatype/timestamp.h"
 #include "storage/lock.h"
+#include "nodes/pg_list.h"
 
 /*
  * GlobalTransactionData is defined in twophase.c; other places have no
@@ -65,6 +66,6 @@ extern bool LookupGXact(const char *gid, XLogRecPtr prepare_end_lsn,
 
 extern void TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid,
 								   int szgid);
-extern bool LookupGXactBySubid(Oid subid);
+extern List *GetGidListBySubid(Oid subid);
 
 #endif							/* TWOPHASE_H */
diff --git a/src/test/subscription/t/099_twophase_added.pl b/src/test/subscription/t/099_twophase_added.pl
index c13a37675a..a8135b671c 100644
--- a/src/test/subscription/t/099_twophase_added.pl
+++ b/src/test/subscription/t/099_twophase_added.pl
@@ -69,4 +69,39 @@ $result = $node_subscriber->safe_psql('postgres',
 is($result, q(5),
    "prepared transactions done before altering can be replicated");
 
+######
+# Check the case that prepared transactions exist on subscriber node
+######
+
+$node_publisher->safe_psql(
+	'postgres', "
+	BEGIN;
+	INSERT INTO tab_full VALUES (generate_series(6, 10));
+	PREPARE TRANSACTION 'test_prepared_tab_full';");
+
+$node_publisher->wait_for_catchup('sub');
+
+$result = $node_subscriber->safe_psql('postgres',
+    "SELECT count(*) FROM pg_prepared_xacts;");
+is($result, q(1), "transaction has been prepared on subscriber");
+
+$node_subscriber->safe_psql(
+    'postgres', "
+    ALTER SUBSCRIPTION sub DISABLE;
+    ALTER SUBSCRIPTION sub SET (two_phase = off);
+    ALTER SUBSCRIPTION sub ENABLE;");
+
+$result = $node_subscriber->safe_psql('postgres',
+    "SELECT count(*) FROM pg_prepared_xacts;");
+is($result, q(0), "prepared transaction done by worker is aborted");
+
+$node_publisher->safe_psql( 'postgres',
+    "COMMIT PREPARED 'test_prepared_tab_full';");
+$node_publisher->wait_for_catchup('sub');
+
+$result = $node_subscriber->safe_psql('postgres',
+    "SELECT count(10) FROM tab_full;");
+is($result, q(10),
+   "prepared transactions on publisher can be replicated");
+
 done_testing();
-- 
2.43.0

