From 3a7125c89de3d6fd6c1081e1100476e99d74c9c1 Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Date: Mon, 15 Apr 2024 04:58:29 +0000
Subject: [PATCH v3 4/5] Prohibit altering from false to true if there are
 prepared transactions on publisher

---
 doc/src/sgml/ref/alter_subscription.sgml |  9 +--
 src/backend/commands/subscriptioncmds.c  | 73 ++++++++++++++++++++++++
 2 files changed, 78 insertions(+), 4 deletions(-)

diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml
index 4f33769858..12d6ca2f5e 100644
--- a/doc/src/sgml/ref/alter_subscription.sgml
+++ b/doc/src/sgml/ref/alter_subscription.sgml
@@ -254,10 +254,11 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO <
      </para>
 
      <para>
-      <literal>two_phase</literal> can be altered only for disabled
-      subscriptions.  Prepared transactions done by the logical replication
-      worker must not be existed. If found, the <command>ALTER
-      SUBSCRIPTION</command> command will fail.
+      On the publisher side, any prepared transactions must not exist.  On the
+      subscriber side, prepared transactions done by the logical replication
+      worker must not exist. Prepared transactions done by users are allowed.
+      The <command>ALTER SUBSCRIPTION</command> command will fail if they are
+      found.
      </para>
     </listitem>
    </varlistentry>
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 563c757be5..57d2e615c6 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -110,6 +110,7 @@ static void check_publications_origin(WalReceiverConn *wrconn,
 static void check_duplicates_in_publist(List *publist, Datum *datums);
 static List *merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname);
 static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err);
+static bool IsPreparedTransactionExistsOnPublisher(Subscription *sub);
 
 
 /*
@@ -1202,6 +1203,24 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
 								 errmsg("cannot alter %s to false if there are prepared transactions by the subscription",
 										"two_phase")));
 
+					/*
+					 * Suppose the two_phase is altering from false to true,
+					 * and there have been prepared transactions on the
+					 * publisher. In that case, only the COMMIT PREPARED
+					 * record may be decoded and sent to the subscriber. It
+					 * occurs because confirmed_flush_lsn can be ahead of the
+					 * PREPARE record, so decoding all the transactions might
+					 * be skipped after enabling the subscription.
+					 *
+					 * We prohibit the existing prepared transactions on the
+					 * publisher to avoid the issue.
+					 */
+					else if (opts.twophase &&
+							 IsPreparedTransactionExistsOnPublisher(sub))
+						ereport(ERROR,
+								(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+								 errmsg("cannot alter %s to true if there are prepared transactions on publisher",
+										"two_phase")));
 
 					/* Change system catalog acoordingly */
 					values[Anum_pg_subscription_subtwophasestate - 1] =
@@ -2489,3 +2508,57 @@ defGetStreamingMode(DefElem *def)
 					def->defname)));
 	return LOGICALREP_STREAM_OFF;	/* keep compiler quiet */
 }
+
+/*
+ * Check whether there are prepared transactions on the publisher node. Returns
+ * true if exists, otherwise false.
+ */
+static bool
+IsPreparedTransactionExistsOnPublisher(Subscription *sub)
+{
+	bool		must_use_password;
+	bool		found = false;
+	WalReceiverConn *wrconn;
+	WalRcvExecResult *res;
+	char	   *err;
+	StringInfo	cmd;
+	Oid			tableRow[1] = {INT4OID};
+
+	/* Load the library providing us libpq calls. */
+	load_file("libpqwalreceiver", false);
+	/* Try to connect to the publisher. */
+	must_use_password = (!superuser_arg(GetUserId()) &&
+						 sub->passwordrequired);
+	wrconn = walrcv_connect(sub->conninfo, true, true,
+							must_use_password,
+							sub->name, &err);
+	if (!wrconn)
+		ereport(ERROR,
+				(errcode(ERRCODE_CONNECTION_FAILURE),
+				 errmsg("could not connect to the publisher: %s", err)));
+
+	/* Construct a query and execute it */
+	cmd = makeStringInfo();
+	appendStringInfo(cmd,
+					 "SELECT 1 FROM pg_prepared_xacts WHERE database = '%s'",
+					 get_database_name(sub->dbid));
+
+	res = walrcv_exec(wrconn, cmd->data, 1, tableRow);
+	destroyStringInfo(cmd);
+
+	if (res->status != WALRCV_OK_TUPLES)
+		ereport(ERROR,
+				errmsg("could not fetch number of prepared transactions from the primary server: %s",
+					   res->err));
+
+	/*
+	 * We are only interested in the existence of prepared transactions.
+	 * Hence, it is sufficient to check the number of returned rows.
+	 */
+	if (tuplestore_tuple_count(res->tuplestore) > 1)
+		found = true;
+
+	walrcv_clear_result(res);
+
+	return found;
+}
-- 
2.43.0

