From d0c8138ccdf19dd9d4395855e5482cce496bda22 Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Date: Mon, 8 Apr 2024 12:39:12 +0000
Subject: [PATCH v6 3/4] Abort prepared transactions while altering two_phase
 to false

---
 doc/src/sgml/ref/alter_subscription.sgml      |  8 ++++-
 src/backend/access/transam/twophase.c         | 19 +++++-----
 src/backend/commands/subscriptioncmds.c       | 33 +++++++++++------
 src/include/access/twophase.h                 |  3 +-
 src/test/subscription/t/099_twophase_added.pl | 35 +++++++++++++++++++
 5 files changed, 76 insertions(+), 22 deletions(-)

diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml
index e54aa1b128..926f560566 100644
--- a/doc/src/sgml/ref/alter_subscription.sgml
+++ b/doc/src/sgml/ref/alter_subscription.sgml
@@ -233,7 +233,6 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO <
       <link linkend="sql-createsubscription-params-with-failover"><literal>failover</literal></link>, and
       <link linkend="sql-createsubscription-params-with-two-phase"><literal>two_phase</literal></link>.
       Only a superuser can set <literal>password_required = false</literal>.
-      <literal>two_phase</literal> can be altered only for disabled subscription.
      </para>
 
      <para>
@@ -255,6 +254,13 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO <
       <link linkend="sql-createsubscription-params-with-failover"><literal>failover</literal></link>
       option is enabled.
      </para>
+
+     <para>
+      <literal>two_phase</literal> can be altered only for disabled
+      subscriptions. When altering the parameter from <literal>true</literal>
+      to <literal>false</literal>, the backend process checks prepared
+      transactions done by the logical replication worker and aborts them.
+     </para>
     </listitem>
    </varlistentry>
 
diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c
index 495f99a357..9121195725 100644
--- a/src/backend/access/transam/twophase.c
+++ b/src/backend/access/transam/twophase.c
@@ -2702,13 +2702,13 @@ checkGid(char *gid, Oid subid)
 }
 
 /*
- * LookupGXactBySubid
- *		Check if the prepared transaction done by apply worker exists.
+ * GetGidListBySubid
+ *      Get a list of GIDs which is PREPARE'd by the given subscription.
  */
-bool
-LookupGXactBySubid(Oid subid)
+List *
+GetGidListBySubid(Oid subid)
 {
-	bool		found = false;
+	List *list = NIL;
 
 	LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
 	for (int i = 0; i < TwoPhaseState->numPrepXacts; i++)
@@ -2717,11 +2717,10 @@ LookupGXactBySubid(Oid subid)
 
 		/* Ignore not-yet-valid GIDs. */
 		if (gxact->valid && checkGid(gxact->gid, subid))
-		{
-			found = true;
-			break;
-		}
+			list = lappend(list, pstrdup(gxact->gid));
+
 	}
 	LWLockRelease(TwoPhaseStateLock);
-	return found;
+
+	return list;
 }
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index b02e21f535..8a36558b2c 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -1156,6 +1156,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 				/* XXX */
 				if (IsSet(opts.specified_opts, SUBOPT_TWOPHASE_COMMIT))
 				{
+					List *prepared_xacts = NIL;
+
 					/*
 					 * two_phase can be only changed for disabled
 					 * subscriptions
@@ -1172,22 +1174,33 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 					 */
 					logicalrep_workers_stop(subid);
 
-					/* Check whether the number of prepared transactions */
-					if (!opts.twophase &&
-						form->subtwophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED &&
-						LookupGXactBySubid(subid))
-						ereport(ERROR,
-								(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-								 errmsg("cannot disable two_phase when uncommitted prepared transactions present")));
-
 					/*
-					 * The changed failover option of the slot can't be rolled
-					 * back.
+					 * If two phase was enabled, there is a possibility the
+					 * transactions has already been PREPARE'd.
 					 */
 					if (!opts.twophase)
+					{
+						/*
+						 * The changed failover option of the slot can't be rolled
+						 * back.
+						 */
 						PreventInTransactionBlock(isTopLevel,
 												  "ALTER SUBSCRIPTION ... SET (two_phase = off)");
 
+						if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED &&
+							(prepared_xacts = GetGidListBySubid(subid)) != NIL)
+						{
+							ListCell	*cell;
+
+							/* Abort all listed transactions */
+							foreach(cell, prepared_xacts)
+								FinishPreparedTransaction((char *) lfirst(cell),
+														  false);
+
+							list_free(prepared_xacts);
+						}
+					}
+
 					/* Change system catalog acoordingly */
 					values[Anum_pg_subscription_subtwophasestate - 1] =
 						CharGetDatum(opts.twophase ?
diff --git a/src/include/access/twophase.h b/src/include/access/twophase.h
index d493ed24c5..95770bbd69 100644
--- a/src/include/access/twophase.h
+++ b/src/include/access/twophase.h
@@ -18,6 +18,7 @@
 #include "access/xlogdefs.h"
 #include "datatype/timestamp.h"
 #include "storage/lock.h"
+#include "nodes/pg_list.h"
 
 /*
  * GlobalTransactionData is defined in twophase.c; other places have no
@@ -63,6 +64,6 @@ extern void restoreTwoPhaseData(void);
 extern bool LookupGXact(const char *gid, XLogRecPtr prepare_end_lsn,
 						TimestampTz origin_prepare_timestamp);
 
-extern bool LookupGXactBySubid(Oid subid);
+extern List *GetGidListBySubid(Oid subid);
 
 #endif							/* TWOPHASE_H */
diff --git a/src/test/subscription/t/099_twophase_added.pl b/src/test/subscription/t/099_twophase_added.pl
index c13a37675a..a8135b671c 100644
--- a/src/test/subscription/t/099_twophase_added.pl
+++ b/src/test/subscription/t/099_twophase_added.pl
@@ -69,4 +69,39 @@ $result = $node_subscriber->safe_psql('postgres',
 is($result, q(5),
    "prepared transactions done before altering can be replicated");
 
+######
+# Check the case that prepared transactions exist on subscriber node
+######
+
+$node_publisher->safe_psql(
+	'postgres', "
+	BEGIN;
+	INSERT INTO tab_full VALUES (generate_series(6, 10));
+	PREPARE TRANSACTION 'test_prepared_tab_full';");
+
+$node_publisher->wait_for_catchup('sub');
+
+$result = $node_subscriber->safe_psql('postgres',
+    "SELECT count(*) FROM pg_prepared_xacts;");
+is($result, q(1), "transaction has been prepared on subscriber");
+
+$node_subscriber->safe_psql(
+    'postgres', "
+    ALTER SUBSCRIPTION sub DISABLE;
+    ALTER SUBSCRIPTION sub SET (two_phase = off);
+    ALTER SUBSCRIPTION sub ENABLE;");
+
+$result = $node_subscriber->safe_psql('postgres',
+    "SELECT count(*) FROM pg_prepared_xacts;");
+is($result, q(0), "prepared transaction done by worker is aborted");
+
+$node_publisher->safe_psql( 'postgres',
+    "COMMIT PREPARED 'test_prepared_tab_full';");
+$node_publisher->wait_for_catchup('sub');
+
+$result = $node_subscriber->safe_psql('postgres',
+    "SELECT count(10) FROM tab_full;");
+is($result, q(10),
+   "prepared transactions on publisher can be replicated");
+
 done_testing();
-- 
2.43.0

