From e5da6b142c6bcfb2943cdc24c6d63bf110dcf75e Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Date: Mon, 8 Apr 2024 02:32:38 +0000
Subject: [PATCH v3 2/5] Mandate the subscription has been disabled

---
 doc/src/sgml/ref/alter_subscription.sgml   |  6 ++++--
 src/backend/commands/subscriptioncmds.c    | 20 ++++++++++++--------
 src/backend/replication/logical/launcher.c | 21 +++------------------
 src/backend/replication/logical/worker.c   |  3 +++
 src/include/replication/logicallauncher.h  |  2 +-
 5 files changed, 23 insertions(+), 29 deletions(-)

diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml
index 413ce68ce2..20b45e36e0 100644
--- a/doc/src/sgml/ref/alter_subscription.sgml
+++ b/doc/src/sgml/ref/alter_subscription.sgml
@@ -227,9 +227,11 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO <
       <link linkend="sql-createsubscription-params-with-disable-on-error"><literal>disable_on_error</literal></link>,
       <link linkend="sql-createsubscription-params-with-password-required"><literal>password_required</literal></link>,
       <link linkend="sql-createsubscription-params-with-run-as-owner"><literal>run_as_owner</literal></link>,
-      <link linkend="sql-createsubscription-params-with-origin"><literal>origin</literal></link>, and
-      <link linkend="sql-createsubscription-params-with-failover"><literal>failover</literal></link>.
+      <link linkend="sql-createsubscription-params-with-origin"><literal>origin</literal></link>,
+      <link linkend="sql-createsubscription-params-with-failover"><literal>failover</literal></link>, and
+      <link linkend="sql-createsubscription-params-with-two-phase"><literal>two_phase</literal></link>.
       Only a superuser can set <literal>password_required = false</literal>.
+      <literal>two_phase</literal> can be altered only for disabled subscription.
      </para>
 
      <para>
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 6643fc08a6..bfbb2873b1 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -869,7 +869,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 	pgstat_create_subscription(subid);
 
 	if (opts.enabled)
-		ApplyLauncherWakeupAtEOXact(true);
+		ApplyLauncherWakeupAtCommit();
 
 	ObjectAddressSet(myself, SubscriptionRelationId, subid);
 
@@ -1178,11 +1178,15 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 				/* XXX */
 				if (IsSet(opts.specified_opts, SUBOPT_TWOPHASE_COMMIT))
 				{
-					/* Stop corresponding worker */
-					logicalrep_worker_stop(subid, InvalidOid);
-
-					/* Request to start worker at the end of transaction */
-					ApplyLauncherWakeupAtEOXact(false);
+					/*
+					 * two_phase can be only changed for disabled
+					 * subscriptions
+					 */
+					if (form->subenabled)
+						ereport(ERROR,
+								(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+								 errmsg("cannot set %s for enabled subscription",
+										"two_phase")));
 
 					/* Check whether the number of prepared transactions */
 					if (!opts.twophase &&
@@ -1326,7 +1330,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 				replaces[Anum_pg_subscription_subenabled - 1] = true;
 
 				if (opts.enabled)
-					ApplyLauncherWakeupAtEOXact(true);
+					ApplyLauncherWakeupAtCommit();
 
 				update_tuple = true;
 				break;
@@ -1990,7 +1994,7 @@ AlterSubscriptionOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
 							  form->oid, 0);
 
 	/* Wake up related background processes to handle this change quickly. */
-	ApplyLauncherWakeupAtEOXact(true);
+	ApplyLauncherWakeupAtCommit();
 	LogicalRepWorkersWakeupAtCommit(form->oid);
 }
 
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 3e0e5a77e0..66070e9131 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -89,7 +89,6 @@ static dsa_area *last_start_times_dsa = NULL;
 static dshash_table *last_start_times = NULL;
 
 static bool on_commit_launcher_wakeup = false;
-static bool launcher_wakeup = false;
 
 
 static void ApplyLauncherWakeup(void);
@@ -1086,22 +1085,13 @@ ApplyLauncherForgetWorkerStartTime(Oid subid)
 void
 AtEOXact_ApplyLauncher(bool isCommit)
 {
-	bool		kicked = false;
-
 	if (isCommit)
 	{
 		if (on_commit_launcher_wakeup)
-		{
 			ApplyLauncherWakeup();
-			kicked = true;
-		}
 	}
 
-	if (!kicked && launcher_wakeup)
-		ApplyLauncherWakeup();
-
 	on_commit_launcher_wakeup = false;
-	launcher_wakeup = false;
 }
 
 /*
@@ -1112,15 +1102,10 @@ AtEOXact_ApplyLauncher(bool isCommit)
  * tuple was added to the pg_subscription catalog.
 */
 void
-ApplyLauncherWakeupAtEOXact(bool on_commit)
+ApplyLauncherWakeupAtCommit(void)
 {
-	if (on_commit)
-	{
-		if (!on_commit_launcher_wakeup)
-			on_commit_launcher_wakeup = true;
-	}
-	else if (!launcher_wakeup)
-		launcher_wakeup = true;
+	if (!on_commit_launcher_wakeup)
+		on_commit_launcher_wakeup = true;
 }
 
 static void
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index ca3d260fc3..374aa22091 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -3911,6 +3911,9 @@ maybe_reread_subscription(void)
 	/* !slotname should never happen when enabled is true. */
 	Assert(newsub->slotname);
 
+	/* two-phase should not be altered while the worker exists */
+	Assert(newsub->twophasestate == MySubscription->twophasestate);
+
 	/*
 	 * Exit if any parameter that affects the remote connection was changed.
 	 * The launcher will start a new worker but note that the parallel apply
diff --git a/src/include/replication/logicallauncher.h b/src/include/replication/logicallauncher.h
index 075842c67e..ff0438b5bb 100644
--- a/src/include/replication/logicallauncher.h
+++ b/src/include/replication/logicallauncher.h
@@ -24,7 +24,7 @@ extern void ApplyLauncherShmemInit(void);
 
 extern void ApplyLauncherForgetWorkerStartTime(Oid subid);
 
-extern void ApplyLauncherWakeupAtEOXact(bool on_commit);
+extern void ApplyLauncherWakeupAtCommit(void);
 extern void AtEOXact_ApplyLauncher(bool isCommit);
 
 extern bool IsLogicalLauncher(void);
-- 
2.43.0

