From bb19db9bd4e73d8892f202fb7b8771c25a033681 Mon Sep 17 00:00:00 2001
From: Ajin Cherian <ajinc@fast.au.fujitsu.com>
Date: Thu, 4 Apr 2024 01:17:29 -0400
Subject: [PATCH v1] Allow altering of two_phase option in subscribers

This patch allows user to alter two_phase option on a subscriber provided no uncommitted
prepared transactions are pending on that subscription.
---
 src/backend/access/transam/twophase.c      | 42 ++++++++++++++++++++++++++++++
 src/backend/commands/subscriptioncmds.c    | 35 ++++++++++++++++++++++---
 src/backend/replication/logical/launcher.c | 22 +++++++++++++---
 src/backend/replication/logical/worker.c   |  3 ---
 src/include/access/twophase.h              |  3 +++
 src/include/replication/logicallauncher.h  |  2 +-
 6 files changed, 96 insertions(+), 11 deletions(-)

diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c
index 8090ac9..b0aae25 100644
--- a/src/backend/access/transam/twophase.c
+++ b/src/backend/access/transam/twophase.c
@@ -2682,3 +2682,45 @@ LookupGXact(const char *gid, XLogRecPtr prepare_end_lsn,
 	LWLockRelease(TwoPhaseStateLock);
 	return found;
 }
+
+/*
+ * checkGid
+ */
+static bool
+checkGid(char *gid, Oid subid)
+{
+	int ret;
+	Oid subid_written, xid;
+
+	ret = sscanf(gid, "pg_gid_%u_%u", &subid_written, &xid);
+
+	if (ret != 2 || subid != subid_written)
+		return false;
+
+	return true;
+}
+
+/*
+ * LookupGXactBySubid
+ *		Check if the prepared transaction done by apply worker exists.
+ */
+bool
+LookupGXactBySubid(Oid subid)
+{
+	bool		found = false;
+
+	LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
+	for (int i = 0; i < TwoPhaseState->numPrepXacts; i++)
+	{
+		GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
+
+		/* Ignore not-yet-valid GIDs. */
+		if (gxact->valid &&	checkGid(gxact->gid, subid))
+		{
+			found = true;
+			break;
+		}
+	}
+	LWLockRelease(TwoPhaseStateLock);
+	return found;
+}
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 5a47fa9..8306929 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -16,6 +16,7 @@
 
 #include "access/htup_details.h"
 #include "access/table.h"
+#include "access/twophase.h"
 #include "access/xact.h"
 #include "catalog/catalog.h"
 #include "catalog/dependency.h"
@@ -868,7 +869,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 	pgstat_create_subscription(subid);
 
 	if (opts.enabled)
-		ApplyLauncherWakeupAtCommit();
+		ApplyLauncherWakeupAtEOXact(true);
 
 	ObjectAddressSet(myself, SubscriptionRelationId, subid);
 
@@ -1165,7 +1166,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 			{
 				supported_opts = (SUBOPT_SLOT_NAME |
 								  SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
-								  SUBOPT_STREAMING | SUBOPT_DISABLE_ON_ERR |
+								  SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT |
+								  SUBOPT_DISABLE_ON_ERR |
 								  SUBOPT_PASSWORD_REQUIRED |
 								  SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER |
 								  SUBOPT_ORIGIN);
@@ -1173,6 +1175,31 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 				parse_subscription_options(pstate, stmt->options,
 										   supported_opts, &opts);
 
+				/* XXX */
+				if (IsSet(opts.specified_opts, SUBOPT_TWOPHASE_COMMIT))
+				{
+					/* Stop corresponding worker */
+					logicalrep_worker_stop(subid, InvalidOid);
+
+					/* Request to start worker at the end of transaction */
+					ApplyLauncherWakeupAtEOXact(false);
+
+					/* Check whether the number of prepared transactions */
+					if (!opts.twophase &&
+						form->subtwophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED &&
+						LookupGXactBySubid(subid))
+						ereport(ERROR,
+								(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+								 errmsg("cannot disable two_phase when uncommitted prepared transactions present")));
+
+					/* Change system catalog acoordingly */
+					values[Anum_pg_subscription_subtwophasestate - 1] =
+						CharGetDatum(opts.twophase ?
+									 LOGICALREP_TWOPHASE_STATE_PENDING :
+									 LOGICALREP_TWOPHASE_STATE_DISABLED);
+					replaces[Anum_pg_subscription_subtwophasestate - 1] = true;
+				}
+
 				if (IsSet(opts.specified_opts, SUBOPT_SLOT_NAME))
 				{
 					/*
@@ -1299,7 +1326,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 				replaces[Anum_pg_subscription_subenabled - 1] = true;
 
 				if (opts.enabled)
-					ApplyLauncherWakeupAtCommit();
+					ApplyLauncherWakeupAtEOXact(true);
 
 				update_tuple = true;
 				break;
@@ -1962,7 +1989,7 @@ AlterSubscriptionOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
 							  form->oid, 0);
 
 	/* Wake up related background processes to handle this change quickly. */
-	ApplyLauncherWakeupAtCommit();
+	ApplyLauncherWakeupAtEOXact(true);
 	LogicalRepWorkersWakeupAtCommit(form->oid);
 }
 
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 66070e9..899ec22 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -89,6 +89,7 @@ static dsa_area *last_start_times_dsa = NULL;
 static dshash_table *last_start_times = NULL;
 
 static bool on_commit_launcher_wakeup = false;
+static bool launcher_wakeup = false;
 
 
 static void ApplyLauncherWakeup(void);
@@ -1085,13 +1086,22 @@ ApplyLauncherForgetWorkerStartTime(Oid subid)
 void
 AtEOXact_ApplyLauncher(bool isCommit)
 {
+	bool kicked = false;
+
 	if (isCommit)
 	{
 		if (on_commit_launcher_wakeup)
+		{
 			ApplyLauncherWakeup();
+			kicked = true;
+		}
 	}
 
+	if (!kicked && launcher_wakeup)
+		ApplyLauncherWakeup();
+
 	on_commit_launcher_wakeup = false;
+	launcher_wakeup = false;
 }
 
 /*
@@ -1102,10 +1112,16 @@ AtEOXact_ApplyLauncher(bool isCommit)
  * tuple was added to the pg_subscription catalog.
 */
 void
-ApplyLauncherWakeupAtCommit(void)
+ApplyLauncherWakeupAtEOXact(bool on_commit)
 {
-	if (!on_commit_launcher_wakeup)
-		on_commit_launcher_wakeup = true;
+	if (on_commit)
+	{
+		if (!on_commit_launcher_wakeup)
+			on_commit_launcher_wakeup = true;
+	}
+	else
+		if (!launcher_wakeup)
+			launcher_wakeup = true;
 }
 
 static void
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index b5a80fe..ca3d260 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -3911,9 +3911,6 @@ maybe_reread_subscription(void)
 	/* !slotname should never happen when enabled is true. */
 	Assert(newsub->slotname);
 
-	/* two-phase should not be altered */
-	Assert(newsub->twophasestate == MySubscription->twophasestate);
-
 	/*
 	 * Exit if any parameter that affects the remote connection was changed.
 	 * The launcher will start a new worker but note that the parallel apply
diff --git a/src/include/access/twophase.h b/src/include/access/twophase.h
index 56248c0..d493ed2 100644
--- a/src/include/access/twophase.h
+++ b/src/include/access/twophase.h
@@ -62,4 +62,7 @@ extern void PrepareRedoRemove(TransactionId xid, bool giveWarning);
 extern void restoreTwoPhaseData(void);
 extern bool LookupGXact(const char *gid, XLogRecPtr prepare_end_lsn,
 						TimestampTz origin_prepare_timestamp);
+
+extern bool LookupGXactBySubid(Oid subid);
+
 #endif							/* TWOPHASE_H */
diff --git a/src/include/replication/logicallauncher.h b/src/include/replication/logicallauncher.h
index ff0438b..075842c 100644
--- a/src/include/replication/logicallauncher.h
+++ b/src/include/replication/logicallauncher.h
@@ -24,7 +24,7 @@ extern void ApplyLauncherShmemInit(void);
 
 extern void ApplyLauncherForgetWorkerStartTime(Oid subid);
 
-extern void ApplyLauncherWakeupAtCommit(void);
+extern void ApplyLauncherWakeupAtEOXact(bool on_commit);
 extern void AtEOXact_ApplyLauncher(bool isCommit);
 
 extern bool IsLogicalLauncher(void);
-- 
1.8.3.1

