From e5c865867983282814c60723499546913fd2d8c1 Mon Sep 17 00:00:00 2001
From: Ajin Cherian <ajinc@fast.au.fujitsu.com>
Date: Fri, 5 Apr 2024 06:47:18 -0400
Subject: [PATCH 1/4] Allow altering of two_phase option of a SUBSCRIPTION

This patch allows user to alter two_phase option of a subscriber provided no uncommitted
prepared transactions are pending on that subscription.

Author: Cherian Ajin, Hayato Kuroda
---
 src/backend/access/transam/twophase.c         | 43 ++++++++++
 src/backend/commands/subscriptioncmds.c       | 83 ++++++++++++++++---
 src/backend/nodes/gen_node_support.pl         |  2 +-
 .../libpqwalreceiver/libpqwalreceiver.c       | 30 +++++++
 src/backend/replication/logical/launcher.c    | 21 +++++
 src/backend/replication/logical/worker.c      |  2 +-
 src/backend/replication/repl_gram.y           | 20 ++++-
 src/backend/replication/repl_scanner.l        |  2 +
 src/backend/replication/slot.c                | 25 ++++++
 src/backend/replication/walsender.c           | 47 +++++++++++
 src/bin/psql/tab-complete.c                   |  2 +-
 src/include/access/twophase.h                 |  3 +
 src/include/nodes/replnodes.h                 | 12 +++
 src/include/replication/slot.h                |  2 +-
 src/include/replication/walreceiver.h         | 10 +++
 src/include/replication/worker_internal.h     |  1 +
 src/test/regress/expected/subscription.out    |  5 +-
 src/test/regress/sql/subscription.sql         |  5 +-
 src/test/subscription/t/021_twophase.pl       | 67 ++++++++++++++-
 19 files changed, 354 insertions(+), 28 deletions(-)

diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c
index c6af8cfd7e..2148daba3c 100644
--- a/src/backend/access/transam/twophase.c
+++ b/src/backend/access/transam/twophase.c
@@ -2658,3 +2658,46 @@ LookupGXact(const char *gid, XLogRecPtr prepare_end_lsn,
 	LWLockRelease(TwoPhaseStateLock);
 	return found;
 }
+
+/*
+ * checkGid
+ */
+static bool
+checkGid(char *gid, Oid subid)
+{
+	int			ret;
+	Oid			subid_written,
+				xid;
+
+	ret = sscanf(gid, "pg_gid_%u_%u", &subid_written, &xid);
+
+	if (ret != 2 || subid != subid_written)
+		return false;
+
+	return true;
+}
+
+/*
+ * LookupGXactBySubid
+ *		Check if the prepared transaction done by apply worker exists.
+ */
+bool
+LookupGXactBySubid(Oid subid)
+{
+	bool		found = false;
+
+	LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
+	for (int i = 0; i < TwoPhaseState->numPrepXacts; i++)
+	{
+		GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
+
+		/* Ignore not-yet-valid GIDs. */
+		if (gxact->valid && checkGid(gxact->gid, subid))
+		{
+			found = true;
+			break;
+		}
+	}
+	LWLockRelease(TwoPhaseStateLock);
+	return found;
+}
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 6fe111e98d..59852fd9af 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -16,6 +16,7 @@
 
 #include "access/htup_details.h"
 #include "access/table.h"
+#include "access/twophase.h"
 #include "access/xact.h"
 #include "catalog/catalog.h"
 #include "catalog/dependency.h"
@@ -1130,13 +1131,49 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 			{
 				supported_opts = (SUBOPT_SLOT_NAME |
 								  SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
-								  SUBOPT_STREAMING | SUBOPT_DISABLE_ON_ERR |
+								  SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT |
+								  SUBOPT_DISABLE_ON_ERR |
 								  SUBOPT_PASSWORD_REQUIRED |
 								  SUBOPT_RUN_AS_OWNER | SUBOPT_ORIGIN);
 
 				parse_subscription_options(pstate, stmt->options,
 										   supported_opts, &opts);
 
+				/* XXX */
+				if (IsSet(opts.specified_opts, SUBOPT_TWOPHASE_COMMIT))
+				{
+					/*
+					 * two_phase can be only changed for disabled
+					 * subscriptions
+					 */
+					if (form->subenabled)
+						ereport(ERROR,
+								(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+								 errmsg("cannot set %s for enabled subscription",
+										"two_phase")));
+
+					/*
+					 * Stop all the subscription workers, just in case. Workers
+					 * may still survive even if the subscription is disabled.
+					 */
+					logicalrep_workers_stop(subid);
+
+					/* Check whether the number of prepared transactions */
+					if (!opts.twophase &&
+						form->subtwophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED &&
+						LookupGXactBySubid(subid))
+						ereport(ERROR,
+								(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+								 errmsg("cannot disable two_phase when uncommitted prepared transactions present")));
+
+					/* Change system catalog acoordingly */
+					values[Anum_pg_subscription_subtwophasestate - 1] =
+						CharGetDatum(opts.twophase ?
+									 LOGICALREP_TWOPHASE_STATE_PENDING :
+									 LOGICALREP_TWOPHASE_STATE_DISABLED);
+					replaces[Anum_pg_subscription_subtwophasestate - 1] = true;
+				}
+
 				if (IsSet(opts.specified_opts, SUBOPT_SLOT_NAME))
 				{
 					/*
@@ -1453,6 +1490,38 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 		heap_freetuple(tup);
 	}
 
+	/*
+	 * Try to acquire the connection necessary for altering slot.
+	 */
+	if (replaces[Anum_pg_subscription_subtwophasestate - 1])
+	{
+		bool		must_use_password;
+		char	   *err;
+		WalReceiverConn *wrconn;
+
+		/* Load the library providing us libpq calls. */
+		load_file("libpqwalreceiver", false);
+
+		/* Try to connect to the publisher */
+		must_use_password = (!superuser() && opts.passwordrequired);
+		wrconn = walrcv_connect(sub->conninfo, true, must_use_password,
+								sub->name, &err);
+		if (!wrconn)
+			ereport(ERROR,
+					(errcode(ERRCODE_CONNECTION_FAILURE),
+					 errmsg("could not connect to the publisher: %s", err)));
+
+		PG_TRY();
+		{
+			walrcv_alter_slot(wrconn, sub->slotname, opts.twophase);
+		}
+		PG_FINALLY();
+		{
+			walrcv_disconnect(wrconn);
+		}
+		PG_END_TRY();
+	}
+
 	table_close(rel, RowExclusiveLock);
 
 	ObjectAddressSet(myself, SubscriptionRelationId, subid);
@@ -1481,7 +1550,6 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 	char	   *subname;
 	char	   *conninfo;
 	char	   *slotname;
-	List	   *subworkers;
 	ListCell   *lc;
 	char		originname[NAMEDATALEN];
 	char	   *err = NULL;
@@ -1591,16 +1659,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
 	 * New workers won't be started because we hold an exclusive lock on the
 	 * subscription till the end of the transaction.
 	 */
-	LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
-	subworkers = logicalrep_workers_find(subid, false);
-	LWLockRelease(LogicalRepWorkerLock);
-	foreach(lc, subworkers)
-	{
-		LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);
-
-		logicalrep_worker_stop(w->subid, w->relid);
-	}
-	list_free(subworkers);
+	logicalrep_workers_stop(subid);
 
 	/*
 	 * Remove the no-longer-useful entry in the launcher's table of apply
diff --git a/src/backend/nodes/gen_node_support.pl b/src/backend/nodes/gen_node_support.pl
index d67565b925..fcba6f986f 100644
--- a/src/backend/nodes/gen_node_support.pl
+++ b/src/backend/nodes/gen_node_support.pl
@@ -107,7 +107,7 @@ my @nodetag_only_files = qw(
 # ABI stability during development.
 
 my $last_nodetag = 'WindowObjectData';
-my $last_nodetag_no = 454;
+my $last_nodetag_no = 455;
 
 # output file names
 my @output_files;
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index b4038e114d..815ab8f9df 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -75,6 +75,8 @@ static char *libpqrcv_create_slot(WalReceiverConn *conn,
 								  bool two_phase,
 								  CRSSnapshotAction snapshot_action,
 								  XLogRecPtr *lsn);
+static void libpqrcv_alter_slot(WalReceiverConn *conn, const char *slotname,
+								bool two_phase);
 static pid_t libpqrcv_get_backend_pid(WalReceiverConn *conn);
 static WalRcvExecResult *libpqrcv_exec(WalReceiverConn *conn,
 									   const char *query,
@@ -95,6 +97,7 @@ static WalReceiverFunctionsType PQWalReceiverFunctions = {
 	.walrcv_receive = libpqrcv_receive,
 	.walrcv_send = libpqrcv_send,
 	.walrcv_create_slot = libpqrcv_create_slot,
+	.walrcv_alter_slot = libpqrcv_alter_slot,
 	.walrcv_get_backend_pid = libpqrcv_get_backend_pid,
 	.walrcv_exec = libpqrcv_exec,
 	.walrcv_disconnect = libpqrcv_disconnect
@@ -1039,6 +1042,33 @@ libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname,
 	return snapshot;
 }
 
+/*
+ * Change the definition of the replication slot.
+ */
+static void
+libpqrcv_alter_slot(WalReceiverConn *conn, const char *slotname,
+					bool two_phase)
+{
+	StringInfoData cmd;
+	PGresult   *res;
+
+	initStringInfo(&cmd);
+	appendStringInfo(&cmd, "ALTER_REPLICATION_SLOT %s ( TWO_PHASE %s )",
+					 quote_identifier(slotname),
+					 two_phase ? "true" : "false");
+
+	res = libpqrcv_PQexec(conn->streamConn, cmd.data);
+	pfree(cmd.data);
+
+	if (PQresultStatus(res) != PGRES_COMMAND_OK)
+		ereport(ERROR,
+				(errcode(ERRCODE_PROTOCOL_VIOLATION),
+				 errmsg("could not alter replication slot \"%s\": %s",
+						slotname, pchomp(PQerrorMessage(conn->streamConn)))));
+
+	PQclear(res);
+}
+
 /*
  * Return PID of remote backend process.
  */
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 8395ae7b23..295fe0f2a0 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -608,6 +608,27 @@ logicalrep_worker_stop(Oid subid, Oid relid)
 	LWLockRelease(LogicalRepWorkerLock);
 }
 
+/*
+ * Stop all the subscription workers.
+ */
+void
+logicalrep_workers_stop(Oid subid)
+{
+	List	   *subworkers;
+	ListCell   *lc;
+
+	LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
+	subworkers = logicalrep_workers_find(subid, false);
+	LWLockRelease(LogicalRepWorkerLock);
+	foreach(lc, subworkers)
+	{
+		LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);
+
+		logicalrep_worker_stop(w->subid, w->relid);
+	}
+	list_free(subworkers);
+}
+
 /*
  * Stop the given logical replication parallel apply worker.
  *
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 832b1cf764..7da6d546dd 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -3926,7 +3926,7 @@ maybe_reread_subscription(void)
 	/* !slotname should never happen when enabled is true. */
 	Assert(newsub->slotname);
 
-	/* two-phase should not be altered */
+	/* two-phase should not be altered while the worker exists */
 	Assert(newsub->twophasestate == MySubscription->twophasestate);
 
 	/*
diff --git a/src/backend/replication/repl_gram.y b/src/backend/replication/repl_gram.y
index 0c874e33cf..bff501263e 100644
--- a/src/backend/replication/repl_gram.y
+++ b/src/backend/replication/repl_gram.y
@@ -64,6 +64,7 @@ Node *replication_parse_result;
 %token K_START_REPLICATION
 %token K_CREATE_REPLICATION_SLOT
 %token K_DROP_REPLICATION_SLOT
+%token K_ALTER_REPLICATION_SLOT
 %token K_TIMELINE_HISTORY
 %token K_WAIT
 %token K_TIMELINE
@@ -79,8 +80,9 @@ Node *replication_parse_result;
 
 %type <node>	command
 %type <node>	base_backup start_replication start_logical_replication
-				create_replication_slot drop_replication_slot identify_system
-				read_replication_slot timeline_history show
+				create_replication_slot drop_replication_slot
+				alter_replication_slot identify_system read_replication_slot
+				timeline_history show
 %type <list>	generic_option_list
 %type <defelt>	generic_option
 %type <uintval>	opt_timeline
@@ -111,6 +113,7 @@ command:
 			| start_logical_replication
 			| create_replication_slot
 			| drop_replication_slot
+			| alter_replication_slot
 			| read_replication_slot
 			| timeline_history
 			| show
@@ -257,6 +260,18 @@ drop_replication_slot:
 				}
 			;
 
+/* ALTER_REPLICATION_SLOT slot (options) */
+alter_replication_slot:
+			K_ALTER_REPLICATION_SLOT IDENT '(' generic_option_list ')'
+				{
+					AlterReplicationSlotCmd *cmd;
+					cmd = makeNode(AlterReplicationSlotCmd);
+					cmd->slotname = $2;
+					cmd->options = $4;
+					$$ = (Node *) cmd;
+				}
+			;
+
 /*
  * START_REPLICATION [SLOT slot] [PHYSICAL] %X/%X [TIMELINE %d]
  */
@@ -399,6 +414,7 @@ ident_or_keyword:
 			| K_START_REPLICATION			{ $$ = "start_replication"; }
 			| K_CREATE_REPLICATION_SLOT	{ $$ = "create_replication_slot"; }
 			| K_DROP_REPLICATION_SLOT		{ $$ = "drop_replication_slot"; }
+			| K_ALTER_REPLICATION_SLOT		{ $$ = "alter_replication_slot"; }
 			| K_TIMELINE_HISTORY			{ $$ = "timeline_history"; }
 			| K_WAIT						{ $$ = "wait"; }
 			| K_TIMELINE					{ $$ = "timeline"; }
diff --git a/src/backend/replication/repl_scanner.l b/src/backend/replication/repl_scanner.l
index cb467ca46f..6396463cf9 100644
--- a/src/backend/replication/repl_scanner.l
+++ b/src/backend/replication/repl_scanner.l
@@ -125,6 +125,7 @@ TIMELINE			{ return K_TIMELINE; }
 START_REPLICATION	{ return K_START_REPLICATION; }
 CREATE_REPLICATION_SLOT		{ return K_CREATE_REPLICATION_SLOT; }
 DROP_REPLICATION_SLOT		{ return K_DROP_REPLICATION_SLOT; }
+ALTER_REPLICATION_SLOT		{ return K_ALTER_REPLICATION_SLOT; }
 TIMELINE_HISTORY	{ return K_TIMELINE_HISTORY; }
 PHYSICAL			{ return K_PHYSICAL; }
 RESERVE_WAL			{ return K_RESERVE_WAL; }
@@ -301,6 +302,7 @@ replication_scanner_is_replication_command(void)
 		case K_START_REPLICATION:
 		case K_CREATE_REPLICATION_SLOT:
 		case K_DROP_REPLICATION_SLOT:
+		case K_ALTER_REPLICATION_SLOT:
 		case K_READ_REPLICATION_SLOT:
 		case K_TIMELINE_HISTORY:
 		case K_SHOW:
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 1f9d0ed9b3..04915590d2 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -648,6 +648,31 @@ ReplicationSlotDrop(const char *name, bool nowait)
 	ReplicationSlotDropAcquired();
 }
 
+/*
+ * Change the definition of the slot identified by the specified name.
+ */
+void
+ReplicationSlotAlter(const char *name, bool two_phase)
+{
+	Assert(MyReplicationSlot == NULL);
+
+	ReplicationSlotAcquire(name, false);
+
+	if (SlotIsPhysical(MyReplicationSlot))
+		ereport(ERROR,
+				errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				errmsg("cannot use %s with a physical replication slot",
+					   "ALTER_REPLICATION_SLOT"));
+
+	SpinLockAcquire(&MyReplicationSlot->mutex);
+	MyReplicationSlot->data.two_phase = two_phase;
+	SpinLockRelease(&MyReplicationSlot->mutex);
+
+	ReplicationSlotMarkDirty();
+	ReplicationSlotSave();
+	ReplicationSlotRelease();
+}
+
 /*
  * Permanently drop the currently acquired replication slot.
  */
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 4c53de08b9..4c18dadaad 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1247,6 +1247,46 @@ DropReplicationSlot(DropReplicationSlotCmd *cmd)
 	ReplicationSlotDrop(cmd->slotname, !cmd->wait);
 }
 
+/*
+ * Process extra options given to ALTER_REPLICATION_SLOT.
+ */
+static void
+ParseAlterReplSlotOptions(AlterReplicationSlotCmd *cmd, bool *two_phase)
+{
+	bool		two_phase_given = false;
+	ListCell   *lc;
+
+	/* Parse options */
+	foreach(lc, cmd->options)
+	{
+		DefElem    *defel = (DefElem *) lfirst(lc);
+
+		if (strcmp(defel->defname, "two_phase") == 0)
+		{
+			if (two_phase_given)
+				ereport(ERROR,
+						(errcode(ERRCODE_SYNTAX_ERROR),
+						 errmsg("conflicting or redundant options")));
+			two_phase_given = true;
+			*two_phase = defGetBoolean(defel);
+		}
+		else
+			elog(ERROR, "unrecognized option: %s", defel->defname);
+	}
+}
+
+/*
+ * Change the definition of a replication slot.
+ */
+static void
+AlterReplicationSlot(AlterReplicationSlotCmd *cmd)
+{
+	bool		two_phase = false;
+
+	ParseAlterReplSlotOptions(cmd, &two_phase);
+	ReplicationSlotAlter(cmd->slotname, two_phase);
+}
+
 /*
  * Load previously initiated logical slot and prepare for sending data (via
  * WalSndLoop).
@@ -1820,6 +1860,13 @@ exec_replication_command(const char *cmd_string)
 			EndReplicationCommand(cmdtag);
 			break;
 
+		case T_AlterReplicationSlotCmd:
+			cmdtag = "ALTER_REPLICATION_SLOT";
+			set_ps_display(cmdtag);
+			AlterReplicationSlot((AlterReplicationSlotCmd *) cmd_node);
+			EndReplicationCommand(cmdtag);
+			break;
+
 		case T_StartReplicationCmd:
 			{
 				StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node;
diff --git a/src/bin/psql/tab-complete.c b/src/bin/psql/tab-complete.c
index a1aa946b30..b689b6906a 100644
--- a/src/bin/psql/tab-complete.c
+++ b/src/bin/psql/tab-complete.c
@@ -1927,7 +1927,7 @@ psql_completion(const char *text, int start, int end)
 	else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) && TailMatches("SET", "("))
 		COMPLETE_WITH("binary", "disable_on_error", "origin",
 					  "password_required", "run_as_owner", "slot_name",
-					  "streaming", "synchronous_commit");
+					  "streaming", "synchronous_commit", "two_phase");
 	/* ALTER SUBSCRIPTION <name> SKIP ( */
 	else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) && TailMatches("SKIP", "("))
 		COMPLETE_WITH("lsn");
diff --git a/src/include/access/twophase.h b/src/include/access/twophase.h
index 21e2af7387..dac3f27bc3 100644
--- a/src/include/access/twophase.h
+++ b/src/include/access/twophase.h
@@ -62,4 +62,7 @@ extern void PrepareRedoRemove(TransactionId xid, bool giveWarning);
 extern void restoreTwoPhaseData(void);
 extern bool LookupGXact(const char *gid, XLogRecPtr prepare_end_lsn,
 						TimestampTz origin_prepare_timestamp);
+
+extern bool LookupGXactBySubid(Oid subid);
+
 #endif							/* TWOPHASE_H */
diff --git a/src/include/nodes/replnodes.h b/src/include/nodes/replnodes.h
index 4321ba8f86..5819276d19 100644
--- a/src/include/nodes/replnodes.h
+++ b/src/include/nodes/replnodes.h
@@ -72,6 +72,18 @@ typedef struct DropReplicationSlotCmd
 } DropReplicationSlotCmd;
 
 
+/* ----------------------
+ *		ALTER_REPLICATION_SLOT command
+ * ----------------------
+ */
+typedef struct AlterReplicationSlotCmd
+{
+	NodeTag		type;
+	char	   *slotname;
+	List	   *options;
+} AlterReplicationSlotCmd;
+
+
 /* ----------------------
  *		START_REPLICATION command
  * ----------------------
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index a8a89dc784..24ab5117bd 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -214,7 +214,7 @@ extern void ReplicationSlotCreate(const char *name, bool db_specific,
 								  bool two_phase);
 extern void ReplicationSlotPersist(void);
 extern void ReplicationSlotDrop(const char *name, bool nowait);
-
+extern void ReplicationSlotAlter(const char *name, bool two_phase);
 extern void ReplicationSlotAcquire(const char *name, bool nowait);
 extern void ReplicationSlotRelease(void);
 extern void ReplicationSlotCleanup(void);
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index 281626fa6f..8ba1599fad 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -359,6 +359,13 @@ typedef char *(*walrcv_create_slot_fn) (WalReceiverConn *conn,
 										CRSSnapshotAction snapshot_action,
 										XLogRecPtr *lsn);
 
+/*
+ * walrcv_alter_slot_fn
+ */
+typedef void (*walrcv_alter_slot_fn) (WalReceiverConn *conn,
+									  const char *slotname,
+									  bool two_phase);
+
 /*
  * walrcv_get_backend_pid_fn
  *
@@ -400,6 +407,7 @@ typedef struct WalReceiverFunctionsType
 	walrcv_receive_fn walrcv_receive;
 	walrcv_send_fn walrcv_send;
 	walrcv_create_slot_fn walrcv_create_slot;
+	walrcv_alter_slot_fn walrcv_alter_slot;
 	walrcv_get_backend_pid_fn walrcv_get_backend_pid;
 	walrcv_exec_fn walrcv_exec;
 	walrcv_disconnect_fn walrcv_disconnect;
@@ -431,6 +439,8 @@ extern PGDLLIMPORT WalReceiverFunctionsType *WalReceiverFunctions;
 	WalReceiverFunctions->walrcv_send(conn, buffer, nbytes)
 #define walrcv_create_slot(conn, slotname, temporary, two_phase, snapshot_action, lsn) \
 	WalReceiverFunctions->walrcv_create_slot(conn, slotname, temporary, two_phase, snapshot_action, lsn)
+#define walrcv_alter_slot(conn, slotname, two_phase) \
+	WalReceiverFunctions->walrcv_alter_slot(conn, slotname, two_phase)
 #define walrcv_get_backend_pid(conn) \
 	WalReceiverFunctions->walrcv_get_backend_pid(conn)
 #define walrcv_exec(conn, exec, nRetTypes, retTypes) \
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index 343e781896..3f19ae4f5f 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -235,6 +235,7 @@ extern bool logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname,
 									 Oid userid, Oid relid,
 									 dsm_handle subworker_dsm);
 extern void logicalrep_worker_stop(Oid subid, Oid relid);
+extern void logicalrep_workers_stop(Oid subid);
 extern void logicalrep_pa_worker_stop(ParallelApplyWorkerInfo *winfo);
 extern void logicalrep_worker_wakeup(Oid subid, Oid relid);
 extern void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker);
diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out
index b15eddbff3..51c466f42e 100644
--- a/src/test/regress/expected/subscription.out
+++ b/src/test/regress/expected/subscription.out
@@ -377,10 +377,7 @@ HINT:  To initiate replication, you must manually create the replication slot, e
  regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | p                | f                | any    | t                 | f             | off                | dbname=regress_doesnotexist | 0/0
 (1 row)
 
---fail - alter of two_phase option not supported.
-ALTER SUBSCRIPTION regress_testsub SET (two_phase = false);
-ERROR:  unrecognized subscription parameter: "two_phase"
--- but can alter streaming when two_phase enabled
+-- We can alter streaming when two_phase enabled
 ALTER SUBSCRIPTION regress_testsub SET (streaming = true);
 \dRs+
                                                                                                            List of subscriptions
diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql
index 444e563ff3..b7764c1074 100644
--- a/src/test/regress/sql/subscription.sql
+++ b/src/test/regress/sql/subscription.sql
@@ -256,10 +256,7 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB
 CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, two_phase = true);
 
 \dRs+
---fail - alter of two_phase option not supported.
-ALTER SUBSCRIPTION regress_testsub SET (two_phase = false);
-
--- but can alter streaming when two_phase enabled
+-- We can alter streaming when two_phase enabled
 ALTER SUBSCRIPTION regress_testsub SET (streaming = true);
 
 \dRs+
diff --git a/src/test/subscription/t/021_twophase.pl b/src/test/subscription/t/021_twophase.pl
index 8ce4cfc983..b6ce59763a 100644
--- a/src/test/subscription/t/021_twophase.pl
+++ b/src/test/subscription/t/021_twophase.pl
@@ -367,6 +367,71 @@ $result =
   $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_copy;");
 is($result, qq(2), 'replicated data in subscriber table');
 
+# Disable the subscription and alter it to two_phase = false,
+# verify that the altered subscription reflects the two_phase option.
+
+# Alter subscription two_phase to false
+$node_subscriber->safe_psql('postgres',
+	"ALTER SUBSCRIPTION tap_sub_copy DISABLE");
+$node_subscriber->safe_psql('postgres',
+	"ALTER SUBSCRIPTION tap_sub_copy SET (two_phase = false)");
+$node_subscriber->safe_psql('postgres',
+	"ALTER SUBSCRIPTION tap_sub_copy ENABLE");
+
+# Wait for subscription startup
+$node_subscriber->wait_for_subscription_sync($node_publisher, $appname_copy);
+
+# Make sure that the two-phase is disabled on the subscriber
+$result = $node_subscriber->safe_psql('postgres',
+       "SELECT subtwophasestate FROM pg_subscription WHERE subname = 'tap_sub_copy';"
+);
+is($result, qq(d), 'two-phase is disabled');
+
+# Now do a prepare on publisher and make sure that it is not replicated.
+$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub");
+$node_publisher->safe_psql(
+       'postgres', qq{
+    BEGIN;
+    INSERT INTO tab_copy VALUES (100);
+    PREPARE TRANSACTION 'newgid';
+	});
+
+# Wait for the subscriber to catchup
+$node_publisher->wait_for_catchup($appname_copy);
+
+# Make sure that there is 0 prepared transaction on the subscriber
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*) FROM pg_prepared_xacts;");
+is($result, qq(0), 'transaction is prepared on subscriber');
+
+# Now commit the insert and verify that it IS replicated
+$node_publisher->safe_psql('postgres', "COMMIT PREPARED 'newgid';");
+
+# Wait for the subscriber to catchup
+$node_publisher->wait_for_catchup($appname_copy);
+
+# Made sure that the commited transaction is replicated.
+$result =
+	$node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_copy;");
+is($result, qq(3), 'replicated data in subscriber table');
+
+# Alter subscription two_phase to true
+$node_subscriber->safe_psql('postgres',
+	"ALTER SUBSCRIPTION tap_sub_copy DISABLE");
+$node_subscriber->safe_psql('postgres',
+	"ALTER SUBSCRIPTION tap_sub_copy SET (two_phase = true)");
+$node_subscriber->safe_psql('postgres',
+	"ALTER SUBSCRIPTION tap_sub_copy ENABLE");
+
+# Wait for subscription startup
+$node_subscriber->wait_for_subscription_sync($node_publisher, $appname_copy);
+
+# Make sure that the two-phase is enabled on the subscriber
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT subtwophasestate FROM pg_subscription WHERE subname = 'tap_sub_copy';"
+);
+is($result, qq(e), 'two-phase is disabled');
+
 $node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_copy;");
 $node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub_copy;");
 
@@ -374,8 +439,6 @@ $node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub_copy;");
 # check all the cleanup
 ###############################
 
-$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub");
-
 $result = $node_subscriber->safe_psql('postgres',
 	"SELECT count(*) FROM pg_subscription");
 is($result, qq(0), 'check subscription was dropped on subscriber');
-- 
2.43.0

