From e7a2051d90b236989707d33236f0ab840c982283 Mon Sep 17 00:00:00 2001
From: Ajin Cherian <ajinc@fast.au.fujitsu.com>
Date: Fri, 5 Apr 2024 06:47:18 -0400
Subject: [PATCH v18 1/3] Allow altering of two_phase option of a SUBSCRIPTION

This patch allows the user to alter the 'two_phase' option of a subscriber provided no
uncommitted prepared transactions are pending on that subscription.

Author: Cherian Ajin, Hayato Kuroda
---
 doc/src/sgml/protocol.sgml                    |  16 +++
 doc/src/sgml/ref/alter_subscription.sgml      |  12 +-
 src/backend/access/transam/twophase.c         |  62 +++++++++
 src/backend/commands/subscriptioncmds.c       | 131 +++++++++++++-----
 .../libpqwalreceiver/libpqwalreceiver.c       |   9 +-
 src/backend/replication/logical/launcher.c    |  12 +-
 src/backend/replication/logical/worker.c      |  25 +---
 src/backend/replication/slot.c                |  28 +++-
 src/backend/replication/walsender.c           |  33 ++++-
 src/bin/psql/tab-complete.c                   |   2 +-
 src/include/access/twophase.h                 |   5 +
 src/include/replication/slot.h                |   3 +-
 src/include/replication/walreceiver.h         |  11 +-
 src/include/replication/worker_internal.h     |   3 +-
 src/test/regress/expected/subscription.out    |   5 +-
 src/test/regress/sql/subscription.sql         |   5 +-
 src/test/subscription/t/021_twophase.pl       |  77 +++++++++-
 17 files changed, 337 insertions(+), 102 deletions(-)

diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml
index 1b27d0a547..3ac4a4be28 100644
--- a/doc/src/sgml/protocol.sgml
+++ b/doc/src/sgml/protocol.sgml
@@ -2206,6 +2206,22 @@ psql "dbname=postgres replication=database" -c "IDENTIFY_SYSTEM;"
        </varlistentry>
       </variablelist>
 
+      <variablelist>
+       <varlistentry>
+        <term><literal>TWO_PHASE [ <replaceable class="parameter">boolean</replaceable> ]</literal></term>
+        <listitem>
+         <para>
+          If true, this logical replication slot supports decoding of two-phase
+          commit. With this option, commands related to two-phase commit such as
+          <literal>PREPARE TRANSACTION</literal>, <literal>COMMIT PREPARED</literal>
+          and <literal>ROLLBACK PREPARED</literal> are decoded and transmitted.
+          The transaction will be decoded and transmitted at
+          <literal>PREPARE TRANSACTION</literal> time.
+         </para>
+        </listitem>
+       </varlistentry>
+      </variablelist>
+
      </listitem>
     </varlistentry>
 
diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml
index 476f195622..0b23df1b77 100644
--- a/doc/src/sgml/ref/alter_subscription.sgml
+++ b/doc/src/sgml/ref/alter_subscription.sgml
@@ -68,8 +68,9 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO <
   <para>
    Commands <command>ALTER SUBSCRIPTION ... REFRESH PUBLICATION</command>,
    <command>ALTER SUBSCRIPTION ... {SET|ADD|DROP} PUBLICATION ...</command>
-   with <literal>refresh</literal> option as <literal>true</literal> and
-   <command>ALTER SUBSCRIPTION ... SET (failover = true|false)</command>
+   with <literal>refresh</literal> option as <literal>true</literal>,
+   <command>ALTER SUBSCRIPTION ... SET (failover = true|false)</command> and
+   <command>ALTER SUBSCRIPTION ... SET (two_phase = true|false)</command>
    cannot be executed inside a transaction block.
 
    These commands also cannot be executed when the subscription has
@@ -228,9 +229,12 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO <
       <link linkend="sql-createsubscription-params-with-disable-on-error"><literal>disable_on_error</literal></link>,
       <link linkend="sql-createsubscription-params-with-password-required"><literal>password_required</literal></link>,
       <link linkend="sql-createsubscription-params-with-run-as-owner"><literal>run_as_owner</literal></link>,
-      <link linkend="sql-createsubscription-params-with-origin"><literal>origin</literal></link>, and
-      <link linkend="sql-createsubscription-params-with-failover"><literal>failover</literal></link>.
+      <link linkend="sql-createsubscription-params-with-origin"><literal>origin</literal></link>,
+      <link linkend="sql-createsubscription-params-with-failover"><literal>failover</literal></link>, and
+      <link linkend="sql-createsubscription-params-with-two-phase"><literal>two_phase</literal></link>.
       Only a superuser can set <literal>password_required = false</literal>.
+      The <literal>two_phase</literal> parameter can only be altered when the
+      subscription is disabled.
      </para>
 
      <para>
diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c
index 9a8257fcaf..35bce6809d 100644
--- a/src/backend/access/transam/twophase.c
+++ b/src/backend/access/transam/twophase.c
@@ -2681,3 +2681,65 @@ LookupGXact(const char *gid, XLogRecPtr prepare_end_lsn,
 	LWLockRelease(TwoPhaseStateLock);
 	return found;
 }
+
+/*
+ * TwoPhaseTransactionGid
+ *		Form the prepared transaction GID for two_phase transactions.
+ *
+ * Return the GID in the supplied buffer.
+ */
+void
+TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid, int szgid)
+{
+	Assert(subid != InvalidRepOriginId);
+
+	if (!TransactionIdIsValid(xid))
+		ereport(ERROR,
+				(errcode(ERRCODE_PROTOCOL_VIOLATION),
+				 errmsg_internal("invalid two-phase transaction ID")));
+
+	snprintf(gid, szgid, "pg_gid_%u_%u", subid, xid);
+}
+
+/*
+ * IsTwoPhaseTransactionGidForSubid
+ *		Check whether the given GID (as formed by TwoPhaseTransactionGid) is
+ *		for the specified 'subid'.
+ */
+static bool
+IsTwoPhaseTransactionGidForSubid(Oid subid, char *gid)
+{
+	int			ret;
+	Oid			subid_written;
+	TransactionId xid;
+
+	ret = sscanf(gid, "pg_gid_%u_%u", &subid_written, &xid);
+
+	return (ret == 2 && subid == subid_written);
+}
+
+/*
+ * LookupGXactBySubid
+ *		Check if the prepared transaction done by apply worker exists.
+ */
+bool
+LookupGXactBySubid(Oid subid)
+{
+	bool		found = false;
+
+	LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
+	for (int i = 0; i < TwoPhaseState->numPrepXacts; i++)
+	{
+		GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
+
+		/* Ignore not-yet-valid GIDs. */
+		if (gxact->valid &&
+			IsTwoPhaseTransactionGidForSubid(subid, gxact->gid))
+		{
+			found = true;
+			break;
+		}
+	}
+	LWLockRelease(TwoPhaseStateLock);
+	return found;
+}
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 16d83b3253..d7e2b141b3 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -16,6 +16,7 @@
 
 #include "access/htup_details.h"
 #include "access/table.h"
+#include "access/twophase.h"
 #include "access/xact.h"
 #include "catalog/catalog.h"
 #include "catalog/dependency.h"
@@ -109,6 +110,8 @@ static void check_publications_origin(WalReceiverConn *wrconn,
 static void check_duplicates_in_publist(List *publist, Datum *datums);
 static List *merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname);
 static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err);
+static void CheckAlterSubOption(Subscription *sub, const char *option,
+								bool isTopLevel);
 
 
 /*
@@ -259,21 +262,9 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
 			opts->specified_opts |= SUBOPT_STREAMING;
 			opts->streaming = defGetStreamingMode(defel);
 		}
-		else if (strcmp(defel->defname, "two_phase") == 0)
+		else if (IsSet(supported_opts, SUBOPT_TWOPHASE_COMMIT) &&
+				 strcmp(defel->defname, "two_phase") == 0)
 		{
-			/*
-			 * Do not allow toggling of two_phase option. Doing so could cause
-			 * missing of transactions and lead to an inconsistent replica.
-			 * See comments atop worker.c
-			 *
-			 * Note: Unsupported twophase indicates that this call originated
-			 * from AlterSubscription.
-			 */
-			if (!IsSet(supported_opts, SUBOPT_TWOPHASE_COMMIT))
-				ereport(ERROR,
-						(errcode(ERRCODE_SYNTAX_ERROR),
-						 errmsg("unrecognized subscription parameter: \"%s\"", defel->defname)));
-
 			if (IsSet(opts->specified_opts, SUBOPT_TWOPHASE_COMMIT))
 				errorConflictingDefElem(defel, pstate);
 
@@ -1079,6 +1070,44 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
 		table_close(rel, NoLock);
 }
 
+/*
+ * Common checks for altering failover and two_phase option
+ */
+static void
+CheckAlterSubOption(Subscription *sub, const char *option, bool isTopLevel)
+{
+	StringInfoData cmd;
+
+	if (!sub->slotname)
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("cannot set %s for a subscription that does not have a slot name",
+						option)));
+
+	/*
+	 * Do not allow changing the option if the subscription is enabled. This
+	 * is because both failover and two_phase options of the slot on the
+	 * publisher cannot be modified if the slot is currently acquired by the
+	 * apply worker.
+	 */
+	if (sub->enabled)
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("cannot set %s for enabled subscription",
+						option)));
+
+	initStringInfo(&cmd);
+	appendStringInfo(&cmd, "ALTER SUBSCRIPTION ... SET (%s)", option);
+
+	/*
+	 * The changed option of the slot can't be rolled back: prevent we are in
+	 * the transaction state.
+	 */
+	PreventInTransactionBlock(isTopLevel, cmd.data);
+
+	pfree(cmd.data);
+}
+
 /*
  * Alter the existing subscription.
  */
@@ -1145,7 +1174,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 			{
 				supported_opts = (SUBOPT_SLOT_NAME |
 								  SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
-								  SUBOPT_STREAMING | SUBOPT_DISABLE_ON_ERR |
+								  SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT |
+								  SUBOPT_DISABLE_ON_ERR |
 								  SUBOPT_PASSWORD_REQUIRED |
 								  SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER |
 								  SUBOPT_ORIGIN);
@@ -1229,33 +1259,59 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 
 				if (IsSet(opts.specified_opts, SUBOPT_FAILOVER))
 				{
-					if (!sub->slotname)
+					CheckAlterSubOption(sub, "failover", isTopLevel);
+
+					values[Anum_pg_subscription_subfailover - 1] =
+						BoolGetDatum(opts.failover);
+					replaces[Anum_pg_subscription_subfailover - 1] = true;
+				}
+
+				if (IsSet(opts.specified_opts, SUBOPT_TWOPHASE_COMMIT))
+				{
+					CheckAlterSubOption(sub, "two_phase", isTopLevel);
+
+					/*
+					 * slot_name and two_phase cannot be altered
+					 * simultaneously. The latter part refers to the pre-set
+					 * slot name and tries to modify the slot option, so
+					 * changing both does not make sense.
+					 */
+					if (IsSet(opts.specified_opts, SUBOPT_SLOT_NAME))
 						ereport(ERROR,
-								(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-								 errmsg("cannot set %s for a subscription that does not have a slot name",
-										"failover")));
+								(errcode(ERRCODE_SYNTAX_ERROR),
+								 errmsg("slot_name and two_phase cannot be altered at the same time")));
 
 					/*
-					 * Do not allow changing the failover state if the
-					 * subscription is enabled. This is because the failover
-					 * state of the slot on the publisher cannot be modified
-					 * if the slot is currently acquired by the apply worker.
+					 * Workers may still survive even if the subscription has
+					 * been disabled. They may read the pg_subscription
+					 * catalog and detect that the twophase parameter is
+					 * updated, which causes the assertion failure. Ensure
+					 * workers have already been exited to avoid it.
 					 */
-					if (sub->enabled)
+					if (logicalrep_workers_find(subid, true, true))
 						ereport(ERROR,
 								(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-								 errmsg("cannot set %s for enabled subscription",
-										"failover")));
+								 errmsg("cannot alter two_phase when logical replication worker is still running"),
+								 errhint("Wait certain time and try again.")));
 
 					/*
-					 * The changed failover option of the slot can't be rolled
-					 * back.
+					 * two_phase cannot be disabled if there are any
+					 * uncommitted prepared transactions present.
 					 */
-					PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION ... SET (failover)");
-
-					values[Anum_pg_subscription_subfailover - 1] =
-						BoolGetDatum(opts.failover);
-					replaces[Anum_pg_subscription_subfailover - 1] = true;
+					if (!opts.twophase &&
+						sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED &&
+						LookupGXactBySubid(subid))
+						ereport(ERROR,
+								(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+								 errmsg("cannot disable two_phase when prepared transactions are present"),
+								 errhint("Resolve these transactions and try again.")));
+
+					/* Change system catalog acoordingly */
+					values[Anum_pg_subscription_subtwophasestate - 1] =
+						CharGetDatum(opts.twophase ?
+									 LOGICALREP_TWOPHASE_STATE_PENDING :
+									 LOGICALREP_TWOPHASE_STATE_DISABLED);
+					replaces[Anum_pg_subscription_subtwophasestate - 1] = true;
 				}
 
 				if (IsSet(opts.specified_opts, SUBOPT_ORIGIN))
@@ -1507,7 +1563,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 	 * doing the database operations we won't be able to rollback altered
 	 * slot.
 	 */
-	if (replaces[Anum_pg_subscription_subfailover - 1])
+	if (replaces[Anum_pg_subscription_subfailover - 1] ||
+		replaces[Anum_pg_subscription_subtwophasestate - 1])
 	{
 		bool		must_use_password;
 		char	   *err;
@@ -1528,7 +1585,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 
 		PG_TRY();
 		{
-			walrcv_alter_slot(wrconn, sub->slotname, opts.failover);
+			walrcv_alter_slot(wrconn, sub->slotname, opts.failover, opts.twophase);
 		}
 		PG_FINALLY();
 		{
@@ -1675,9 +1732,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 	 * New workers won't be started because we hold an exclusive lock on the
 	 * subscription till the end of the transaction.
 	 */
-	LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
-	subworkers = logicalrep_workers_find(subid, false);
-	LWLockRelease(LogicalRepWorkerLock);
+	subworkers = logicalrep_workers_find(subid, false, true);
 	foreach(lc, subworkers)
 	{
 		LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index 6c42c209d2..1cb601a148 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -80,7 +80,7 @@ static char *libpqrcv_create_slot(WalReceiverConn *conn,
 								  CRSSnapshotAction snapshot_action,
 								  XLogRecPtr *lsn);
 static void libpqrcv_alter_slot(WalReceiverConn *conn, const char *slotname,
-								bool failover);
+								bool failover, bool two_phase);
 static pid_t libpqrcv_get_backend_pid(WalReceiverConn *conn);
 static WalRcvExecResult *libpqrcv_exec(WalReceiverConn *conn,
 									   const char *query,
@@ -1121,15 +1121,16 @@ libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname,
  */
 static void
 libpqrcv_alter_slot(WalReceiverConn *conn, const char *slotname,
-					bool failover)
+					bool failover, bool two_phase)
 {
 	StringInfoData cmd;
 	PGresult   *res;
 
 	initStringInfo(&cmd);
-	appendStringInfo(&cmd, "ALTER_REPLICATION_SLOT %s ( FAILOVER %s )",
+	appendStringInfo(&cmd, "ALTER_REPLICATION_SLOT %s ( FAILOVER %s, TWO_PHASE %s )",
 					 quote_identifier(slotname),
-					 failover ? "true" : "false");
+					 failover ? "true" : "false",
+					 two_phase ? "true" : "false");
 
 	res = libpqrcv_PQexec(conn->streamConn, cmd.data);
 	pfree(cmd.data);
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 27c3a91fb7..45744b771f 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -272,12 +272,15 @@ logicalrep_worker_find(Oid subid, Oid relid, bool only_running)
  * the subscription, instead of just one.
  */
 List *
-logicalrep_workers_find(Oid subid, bool only_running)
+logicalrep_workers_find(Oid subid, bool only_running, bool require_lock)
 {
 	int			i;
 	List	   *res = NIL;
 
-	Assert(LWLockHeldByMe(LogicalRepWorkerLock));
+	if (require_lock)
+		LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
+	else
+		Assert(LWLockHeldByMe(LogicalRepWorkerLock));
 
 	/* Search for attached worker for a given subscription id. */
 	for (i = 0; i < max_logical_replication_workers; i++)
@@ -288,6 +291,9 @@ logicalrep_workers_find(Oid subid, bool only_running)
 			res = lappend(res, w);
 	}
 
+	if (require_lock)
+		LWLockRelease(LogicalRepWorkerLock);
+
 	return res;
 }
 
@@ -759,7 +765,7 @@ logicalrep_worker_detach(void)
 
 		LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
 
-		workers = logicalrep_workers_find(MyLogicalRepWorker->subid, true);
+		workers = logicalrep_workers_find(MyLogicalRepWorker->subid, true, false);
 		foreach(lc, workers)
 		{
 			LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index c0bda6269b..6c798cd5b4 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -401,9 +401,6 @@ static void apply_handle_tuple_routing(ApplyExecutionData *edata,
 									   LogicalRepTupleData *newtup,
 									   CmdType operation);
 
-/* Compute GID for two_phase transactions */
-static void TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid, int szgid);
-
 /* Functions for skipping changes */
 static void maybe_start_skipping_changes(XLogRecPtr finish_lsn);
 static void stop_skipping_changes(void);
@@ -3911,7 +3908,7 @@ maybe_reread_subscription(void)
 	/* !slotname should never happen when enabled is true. */
 	Assert(newsub->slotname);
 
-	/* two-phase should not be altered */
+	/* two-phase cannot be altered while the worker exists */
 	Assert(newsub->twophasestate == MySubscription->twophasestate);
 
 	/*
@@ -4396,24 +4393,6 @@ cleanup_subxact_info()
 	subxact_data.nsubxacts_max = 0;
 }
 
-/*
- * Form the prepared transaction GID for two_phase transactions.
- *
- * Return the GID in the supplied buffer.
- */
-static void
-TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid, int szgid)
-{
-	Assert(subid != InvalidRepOriginId);
-
-	if (!TransactionIdIsValid(xid))
-		ereport(ERROR,
-				(errcode(ERRCODE_PROTOCOL_VIOLATION),
-				 errmsg_internal("invalid two-phase transaction ID")));
-
-	snprintf(gid, szgid, "pg_gid_%u_%u", subid, xid);
-}
-
 /*
  * Common function to run the apply loop with error handling. Disable the
  * subscription, if necessary.
@@ -5014,7 +4993,7 @@ AtEOXact_LogicalRepWorkers(bool isCommit)
 			List	   *workers;
 			ListCell   *lc2;
 
-			workers = logicalrep_workers_find(subid, true);
+			workers = logicalrep_workers_find(subid, true, false);
 			foreach(lc2, workers)
 			{
 				LogicalRepWorker *worker = (LogicalRepWorker *) lfirst(lc2);
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index baf9b89dc4..2f167a2adc 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -804,9 +804,12 @@ ReplicationSlotDrop(const char *name, bool nowait)
  * Change the definition of the slot identified by the specified name.
  */
 void
-ReplicationSlotAlter(const char *name, bool failover)
+ReplicationSlotAlter(const char *name, bool *failover, bool *two_phase)
 {
+	bool		update_slot = false;
+
 	Assert(MyReplicationSlot == NULL);
+	Assert(failover || two_phase);
 
 	ReplicationSlotAcquire(name, false);
 
@@ -832,7 +835,7 @@ ReplicationSlotAlter(const char *name, bool failover)
 		 * Do not allow users to enable failover on the standby as we do not
 		 * support sync to the cascading standby.
 		 */
-		if (failover)
+		if (failover && *failover)
 			ereport(ERROR,
 					errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
 					errmsg("cannot enable failover for a replication slot"
@@ -843,17 +846,32 @@ ReplicationSlotAlter(const char *name, bool failover)
 	 * Do not allow users to enable failover for temporary slots as we do not
 	 * support syncing temporary slots to the standby.
 	 */
-	if (failover && MyReplicationSlot->data.persistency == RS_TEMPORARY)
+	if (failover && *failover &&
+		MyReplicationSlot->data.persistency == RS_TEMPORARY)
 		ereport(ERROR,
 				errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
 				errmsg("cannot enable failover for a temporary replication slot"));
 
-	if (MyReplicationSlot->data.failover != failover)
+	if (failover && MyReplicationSlot->data.failover != *failover)
+	{
+		SpinLockAcquire(&MyReplicationSlot->mutex);
+		MyReplicationSlot->data.failover = *failover;
+		SpinLockRelease(&MyReplicationSlot->mutex);
+
+		update_slot = true;
+	}
+
+	if (two_phase && MyReplicationSlot->data.two_phase != *two_phase)
 	{
 		SpinLockAcquire(&MyReplicationSlot->mutex);
-		MyReplicationSlot->data.failover = failover;
+		MyReplicationSlot->data.two_phase = *two_phase;
 		SpinLockRelease(&MyReplicationSlot->mutex);
 
+		update_slot = true;
+	}
+
+	if (update_slot)
+	{
 		ReplicationSlotMarkDirty();
 		ReplicationSlotSave();
 	}
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 2d1a9ec900..f3b5068d95 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1410,22 +1410,34 @@ DropReplicationSlot(DropReplicationSlotCmd *cmd)
  * Process extra options given to ALTER_REPLICATION_SLOT.
  */
 static void
-ParseAlterReplSlotOptions(AlterReplicationSlotCmd *cmd, bool *failover)
+ParseAlterReplSlotOptions(AlterReplicationSlotCmd *cmd,
+						  bool *failover_given, bool *failover,
+						  bool *two_phase_given, bool *two_phase)
 {
-	bool		failover_given = false;
+	*failover_given = false;
+	*two_phase_given = false;
 
 	/* Parse options */
 	foreach_ptr(DefElem, defel, cmd->options)
 	{
 		if (strcmp(defel->defname, "failover") == 0)
 		{
-			if (failover_given)
+			if (*failover_given)
 				ereport(ERROR,
 						(errcode(ERRCODE_SYNTAX_ERROR),
 						 errmsg("conflicting or redundant options")));
-			failover_given = true;
+			*failover_given = true;
 			*failover = defGetBoolean(defel);
 		}
+		else if (strcmp(defel->defname, "two_phase") == 0)
+		{
+			if (*two_phase_given)
+				ereport(ERROR,
+						(errcode(ERRCODE_SYNTAX_ERROR),
+						 errmsg("conflicting or redundant options")));
+			*two_phase_given = true;
+			*two_phase = defGetBoolean(defel);
+		}
 		else
 			elog(ERROR, "unrecognized option: %s", defel->defname);
 	}
@@ -1437,10 +1449,17 @@ ParseAlterReplSlotOptions(AlterReplicationSlotCmd *cmd, bool *failover)
 static void
 AlterReplicationSlot(AlterReplicationSlotCmd *cmd)
 {
-	bool		failover = false;
+	bool		failover_given;
+	bool		two_phase_given;
+	bool		failover;
+	bool		two_phase;
+
+	ParseAlterReplSlotOptions(cmd, &failover_given, &failover,
+							  &two_phase_given, &two_phase);
 
-	ParseAlterReplSlotOptions(cmd, &failover);
-	ReplicationSlotAlter(cmd->slotname, failover);
+	ReplicationSlotAlter(cmd->slotname,
+						 failover_given ? &failover : NULL,
+						 two_phase_given ? &two_phase : NULL);
 }
 
 /*
diff --git a/src/bin/psql/tab-complete.c b/src/bin/psql/tab-complete.c
index d453e224d9..891face1b6 100644
--- a/src/bin/psql/tab-complete.c
+++ b/src/bin/psql/tab-complete.c
@@ -1948,7 +1948,7 @@ psql_completion(const char *text, int start, int end)
 	else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) && TailMatches("SET", "("))
 		COMPLETE_WITH("binary", "disable_on_error", "failover", "origin",
 					  "password_required", "run_as_owner", "slot_name",
-					  "streaming", "synchronous_commit");
+					  "streaming", "synchronous_commit", "two_phase");
 	/* ALTER SUBSCRIPTION <name> SKIP ( */
 	else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) && TailMatches("SKIP", "("))
 		COMPLETE_WITH("lsn");
diff --git a/src/include/access/twophase.h b/src/include/access/twophase.h
index 56248c0006..d37e06fdee 100644
--- a/src/include/access/twophase.h
+++ b/src/include/access/twophase.h
@@ -62,4 +62,9 @@ extern void PrepareRedoRemove(TransactionId xid, bool giveWarning);
 extern void restoreTwoPhaseData(void);
 extern bool LookupGXact(const char *gid, XLogRecPtr prepare_end_lsn,
 						TimestampTz origin_prepare_timestamp);
+
+extern void TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid,
+								   int szgid);
+extern bool LookupGXactBySubid(Oid subid);
+
 #endif							/* TWOPHASE_H */
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index c9675ee87c..cde164472a 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -243,7 +243,8 @@ extern void ReplicationSlotCreate(const char *name, bool db_specific,
 extern void ReplicationSlotPersist(void);
 extern void ReplicationSlotDrop(const char *name, bool nowait);
 extern void ReplicationSlotDropAcquired(void);
-extern void ReplicationSlotAlter(const char *name, bool failover);
+extern void ReplicationSlotAlter(const char *name, bool *failover,
+								 bool *two_phase);
 
 extern void ReplicationSlotAcquire(const char *name, bool nowait);
 extern void ReplicationSlotRelease(void);
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index 12f71fa99b..31fa1257ec 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -372,12 +372,13 @@ typedef char *(*walrcv_create_slot_fn) (WalReceiverConn *conn,
 /*
  * walrcv_alter_slot_fn
  *
- * Change the definition of a replication slot. Currently, it only supports
- * changing the failover property of the slot.
+ * Change the definition of a replication slot. Currently, it supports
+ * changing the failover and the two_phase property of the slot.
  */
 typedef void (*walrcv_alter_slot_fn) (WalReceiverConn *conn,
 									  const char *slotname,
-									  bool failover);
+									  bool failover,
+									  bool two_phase);
 
 /*
  * walrcv_get_backend_pid_fn
@@ -455,8 +456,8 @@ extern PGDLLIMPORT WalReceiverFunctionsType *WalReceiverFunctions;
 	WalReceiverFunctions->walrcv_send(conn, buffer, nbytes)
 #define walrcv_create_slot(conn, slotname, temporary, two_phase, failover, snapshot_action, lsn) \
 	WalReceiverFunctions->walrcv_create_slot(conn, slotname, temporary, two_phase, failover, snapshot_action, lsn)
-#define walrcv_alter_slot(conn, slotname, failover) \
-	WalReceiverFunctions->walrcv_alter_slot(conn, slotname, failover)
+#define walrcv_alter_slot(conn, slotname, failover, two_phase) \
+	WalReceiverFunctions->walrcv_alter_slot(conn, slotname, failover, two_phase)
 #define walrcv_get_backend_pid(conn) \
 	WalReceiverFunctions->walrcv_get_backend_pid(conn)
 #define walrcv_exec(conn, exec, nRetTypes, retTypes) \
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index 515aefd519..990f5242f9 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -240,7 +240,8 @@ extern PGDLLIMPORT bool InitializingApplyWorker;
 extern void logicalrep_worker_attach(int slot);
 extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid,
 												bool only_running);
-extern List *logicalrep_workers_find(Oid subid, bool only_running);
+extern List *logicalrep_workers_find(Oid subid, bool only_running,
+									 bool require_lock);
 extern bool logicalrep_worker_launch(LogicalRepWorkerType wtype,
 									 Oid dbid, Oid subid, const char *subname,
 									 Oid userid, Oid relid,
diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out
index 5c2f1ee517..52ccb160fa 100644
--- a/src/test/regress/expected/subscription.out
+++ b/src/test/regress/expected/subscription.out
@@ -377,10 +377,7 @@ HINT:  To initiate replication, you must manually create the replication slot, e
  regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | p                | f                | any    | t                 | f             | f        | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
---fail - alter of two_phase option not supported.
-ALTER SUBSCRIPTION regress_testsub SET (two_phase = false);
-ERROR:  unrecognized subscription parameter: "two_phase"
--- but can alter streaming when two_phase enabled
+-- We can alter streaming when two_phase enabled
 ALTER SUBSCRIPTION regress_testsub SET (streaming = true);
 \dRs+
                                                                                                                 List of subscriptions
diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql
index 3e5ba4cb8c..a3886d79ca 100644
--- a/src/test/regress/sql/subscription.sql
+++ b/src/test/regress/sql/subscription.sql
@@ -256,10 +256,7 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
 CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, two_phase = true);
 
 \dRs+
---fail - alter of two_phase option not supported.
-ALTER SUBSCRIPTION regress_testsub SET (two_phase = false);
-
--- but can alter streaming when two_phase enabled
+-- We can alter streaming when two_phase enabled
 ALTER SUBSCRIPTION regress_testsub SET (streaming = true);
 
 \dRs+
diff --git a/src/test/subscription/t/021_twophase.pl b/src/test/subscription/t/021_twophase.pl
index 9437cd4c3b..4e8f627f7b 100644
--- a/src/test/subscription/t/021_twophase.pl
+++ b/src/test/subscription/t/021_twophase.pl
@@ -367,6 +367,81 @@ $result =
   $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_copy;");
 is($result, qq(2), 'replicated data in subscriber table');
 
+# Clean up
+$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub");
+
+###############################
+# Disable the subscription and alter it to two_phase = false,
+# then verify that the altered subscription reflects the two_phase option.
+###############################
+
+# Alter subscription two_phase to false
+$node_subscriber->safe_psql('postgres',
+    "ALTER SUBSCRIPTION tap_sub_copy DISABLE;");
+$node_subscriber->poll_query_until('postgres',
+    "SELECT count(*) = 0 FROM pg_stat_activity WHERE backend_type = 'logical replication worker'"
+);
+$node_subscriber->safe_psql(
+	'postgres', "
+    ALTER SUBSCRIPTION tap_sub_copy SET (two_phase = false);
+    ALTER SUBSCRIPTION tap_sub_copy ENABLE;");
+
+# Wait for subscription startup
+$node_subscriber->wait_for_subscription_sync($node_publisher, $appname_copy);
+
+# Make sure that the two-phase is disabled on the subscriber
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT subtwophasestate FROM pg_subscription WHERE subname = 'tap_sub_copy';"
+);
+is($result, qq(d), 'two-phase should be disabled');
+
+# Now do a prepare on the publisher and make sure that it is not replicated.
+$node_publisher->safe_psql(
+	'postgres', qq{
+    BEGIN;
+    INSERT INTO tab_copy VALUES (100);
+    PREPARE TRANSACTION 'newgid';
+	});
+
+# Wait for the subscriber to catchup
+$node_publisher->wait_for_catchup($appname_copy);
+
+# Make sure there are no prepared transactions on the subscriber
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*) FROM pg_prepared_xacts;");
+is($result, qq(0), 'should be no prepared transactions on subscriber');
+
+# Now commit the insert and verify that it is replicated
+$node_publisher->safe_psql('postgres', "COMMIT PREPARED 'newgid';");
+
+# Wait for the subscriber to catchup
+$node_publisher->wait_for_catchup($appname_copy);
+
+# Make sure that the committed transaction is replicated.
+$result =
+  $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_copy;");
+is($result, qq(3), 'replicated data in subscriber table');
+
+# Alter subscription two_phase to true
+$node_subscriber->safe_psql('postgres',
+    "ALTER SUBSCRIPTION tap_sub_copy DISABLE;");
+$node_subscriber->poll_query_until('postgres',
+    "SELECT count(*) = 0 FROM pg_stat_activity WHERE backend_type = 'logical replication worker'"
+);
+$node_subscriber->safe_psql(
+	'postgres', "
+    ALTER SUBSCRIPTION tap_sub_copy SET (two_phase = true);
+    ALTER SUBSCRIPTION tap_sub_copy ENABLE;");
+
+# Wait for subscription startup
+$node_subscriber->wait_for_subscription_sync($node_publisher, $appname_copy);
+
+# Make sure that the two-phase is enabled on the subscriber
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT subtwophasestate FROM pg_subscription WHERE subname = 'tap_sub_copy';"
+);
+is($result, qq(e), 'two-phase should be enabled');
+
 $node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_copy;");
 $node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub_copy;");
 
@@ -374,8 +449,6 @@ $node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub_copy;");
 # check all the cleanup
 ###############################
 
-$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub");
-
 $result = $node_subscriber->safe_psql('postgres',
 	"SELECT count(*) FROM pg_subscription");
 is($result, qq(0), 'check subscription was dropped on subscriber');
-- 
2.43.0

