From 7a37e711a1dd3a782ba7b7486e7d5845e7ccc61b Mon Sep 17 00:00:00 2001
From: Khanna <Shubham.Khanna@fujitsu.com>
Date: Fri, 22 Nov 2024 12:03:13 +0530
Subject: [PATCH v7] Add support for two-phase commit in pg_createsubscriber

This patch introduces the '--enable-two-phase' option to the
'pg_createsubscriber' utility, allowing users to enable two-phase commit for
all subscriptions during their creation.

By default, two-phase commit is disabled if the option is not provided.

When two-phase commit is enabled, prepared transactions are sent to the
subscriber at the time of 'PREPARE TRANSACTION', and they are processed as
two-phase transactions on the subscriber as well. If disabled, prepared
transactions are sent only when committed and are processed immediately by the
subscriber.

Documentation has been updated to reflect the new option, and test cases have
been added to validate various scenarios, including proper validation of the
'--enable-two-phase' option and its combinations with other options.
---
 doc/src/sgml/ref/pg_createsubscriber.sgml     | 30 +++++++++++-----
 src/bin/pg_basebackup/pg_createsubscriber.c   | 35 +++++++++++++------
 .../t/040_pg_createsubscriber.pl              |  9 ++++-
 3 files changed, 54 insertions(+), 20 deletions(-)

diff --git a/doc/src/sgml/ref/pg_createsubscriber.sgml b/doc/src/sgml/ref/pg_createsubscriber.sgml
index df1a92b4da..7cae5cbe70 100644
--- a/doc/src/sgml/ref/pg_createsubscriber.sgml
+++ b/doc/src/sgml/ref/pg_createsubscriber.sgml
@@ -161,6 +161,19 @@ PostgreSQL documentation
      </listitem>
     </varlistentry>
 
+    <varlistentry>
+     <term><option>-T</option></term>
+     <term><option>--enable-two-phase</option></term>
+     <listitem>
+      <para>
+       Enables <link linkend="sql-createsubscription-params-with-two-phase"><literal>two_phase</literal></link>
+       commit for the subscription. If there are multiple subscriptions
+       specified, this option applies to all of them.
+       The default is <literal>false</literal>.
+      </para>
+     </listitem>
+    </varlistentry>
+
     <varlistentry>
      <term><option>-U <replaceable class="parameter">username</replaceable></option></term>
      <term><option>--subscriber-username=<replaceable class="parameter">username</replaceable></option></term>
@@ -296,7 +309,9 @@ PostgreSQL documentation
     greater than or equal to the number of specified databases.  The target
     server must have <xref linkend="guc-max-worker-processes"/> configured to a
     value greater than the number of specified databases.  The target server
-    must accept local connections.
+    must accept local connections. If you are planning to use the
+    <option>--enable-two-phase</option> then you will also need to set the
+    <xref linkend="guc-max-prepared-transactions"/> appropriately.
    </para>
 
    <para>
@@ -356,14 +371,13 @@ PostgreSQL documentation
    </para>
 
    <para>
-    <application>pg_createsubscriber</application> sets up logical
-    replication with two-phase commit disabled.  This means that any
-    prepared transactions will be replicated at the time
-    of <command>COMMIT PREPARED</command>, without advance preparation.
-    Once setup is complete, you can manually drop and re-create the
-    subscription(s) with
+    If <option>--enable-two-phase</option> is not specified, the
+    <application>pg_createsubscriber</application> sets up logical replication
+    with two-phase commit disabled.  This means that any prepared transactions
+    will be replicated at the time of <command>COMMIT PREPARED</command>,
+    without advance preparation. Once setup is complete, you can manually drop
+    and re-create the subscription(s) with
     the <link linkend="sql-createsubscription-params-with-two-phase"><literal>two_phase</literal></link>
-    option enabled.
    </para>
 
    <para>
diff --git a/src/bin/pg_basebackup/pg_createsubscriber.c b/src/bin/pg_basebackup/pg_createsubscriber.c
index e96370a9ec..f5d9fbc2e8 100644
--- a/src/bin/pg_basebackup/pg_createsubscriber.c
+++ b/src/bin/pg_basebackup/pg_createsubscriber.c
@@ -38,6 +38,7 @@ struct CreateSubscriberOptions
 	char	   *socket_dir;		/* directory for Unix-domain socket, if any */
 	char	   *sub_port;		/* subscriber port number */
 	const char *sub_username;	/* subscriber username */
+	bool		two_phase;		/* two-phase option */
 	SimpleStringList database_names;	/* list of database names */
 	SimpleStringList pub_names; /* list of publication names */
 	SimpleStringList sub_names; /* list of subscription names */
@@ -79,7 +80,7 @@ static void check_publisher(const struct LogicalRepInfo *dbinfo);
 static char *setup_publisher(struct LogicalRepInfo *dbinfo);
 static void check_subscriber(const struct LogicalRepInfo *dbinfo);
 static void setup_subscriber(struct LogicalRepInfo *dbinfo,
-							 const char *consistent_lsn);
+							 const char *consistent_lsn, bool two_phase);
 static void setup_recovery(const struct LogicalRepInfo *dbinfo, const char *datadir,
 						   const char *lsn);
 static void drop_primary_replication_slot(struct LogicalRepInfo *dbinfo,
@@ -98,7 +99,9 @@ static void wait_for_end_recovery(const char *conninfo,
 								  const struct CreateSubscriberOptions *opt);
 static void create_publication(PGconn *conn, struct LogicalRepInfo *dbinfo);
 static void drop_publication(PGconn *conn, struct LogicalRepInfo *dbinfo);
-static void create_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo);
+static void create_subscription(PGconn *conn,
+								const struct LogicalRepInfo *dbinfo,
+								bool two_phase);
 static void set_replication_progress(PGconn *conn, const struct LogicalRepInfo *dbinfo,
 									 const char *lsn);
 static void enable_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo);
@@ -227,6 +230,7 @@ usage(void)
 	printf(_("  -P, --publisher-server=CONNSTR  publisher connection string\n"));
 	printf(_("  -s, --socketdir=DIR             socket directory to use (default current dir.)\n"));
 	printf(_("  -t, --recovery-timeout=SECS     seconds to wait for recovery to end\n"));
+	printf(_("  -T, --enable-two-phase          enable two-phase commit for all subscriptions\n"));
 	printf(_("  -U, --subscriber-username=NAME  user name for subscriber connection\n"));
 	printf(_("  -v, --verbose                   output verbose messages\n"));
 	printf(_("      --config-file=FILENAME      use specified main server configuration\n"
@@ -479,9 +483,10 @@ store_pub_sub_info(const struct CreateSubscriberOptions *opt,
 					 dbinfo[i].pubname ? dbinfo[i].pubname : "(auto)",
 					 dbinfo[i].replslotname ? dbinfo[i].replslotname : "(auto)",
 					 dbinfo[i].pubconninfo);
-		pg_log_debug("subscriber(%d): subscription: %s ; connection string: %s", i,
+		pg_log_debug("subscriber(%d): subscription: %s ; connection string: %s, two_phase: %s", i,
 					 dbinfo[i].subname ? dbinfo[i].subname : "(auto)",
-					 dbinfo[i].subconninfo);
+					 dbinfo[i].subconninfo,
+					 opt->two_phase ? "true" : "false");
 
 		if (num_pubs > 0)
 			pubcell = pubcell->next;
@@ -1138,7 +1143,8 @@ check_and_drop_existing_subscriptions(PGconn *conn,
  * replication setup.
  */
 static void
-setup_subscriber(struct LogicalRepInfo *dbinfo, const char *consistent_lsn)
+setup_subscriber(struct LogicalRepInfo *dbinfo, const char *consistent_lsn,
+				 bool two_phase)
 {
 	for (int i = 0; i < num_dbs; i++)
 	{
@@ -1162,7 +1168,7 @@ setup_subscriber(struct LogicalRepInfo *dbinfo, const char *consistent_lsn)
 		 */
 		drop_publication(conn, &dbinfo[i]);
 
-		create_subscription(conn, &dbinfo[i]);
+		create_subscription(conn, &dbinfo[i], two_phase);
 
 		/* Set the replication progress to the correct LSN */
 		set_replication_progress(conn, &dbinfo[i], consistent_lsn);
@@ -1677,7 +1683,8 @@ drop_publication(PGconn *conn, struct LogicalRepInfo *dbinfo)
  * initial location.
  */
 static void
-create_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo)
+create_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo,
+					bool two_phase)
 {
 	PQExpBuffer str = createPQExpBuffer();
 	PGresult   *res;
@@ -1699,8 +1706,9 @@ create_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo)
 	appendPQExpBuffer(str,
 					  "CREATE SUBSCRIPTION %s CONNECTION %s PUBLICATION %s "
 					  "WITH (create_slot = false, enabled = false, "
-					  "slot_name = %s, copy_data = false)",
-					  subname_esc, pubconninfo_esc, pubname_esc, replslotname_esc);
+					  "slot_name = %s, copy_data = false, two_phase = %s)",
+					  subname_esc, pubconninfo_esc, pubname_esc, replslotname_esc,
+					  two_phase ? "true" : "false");
 
 	pg_free(pubname_esc);
 	pg_free(subname_esc);
@@ -1872,6 +1880,7 @@ main(int argc, char **argv)
 		{"publisher-server", required_argument, NULL, 'P'},
 		{"socketdir", required_argument, NULL, 's'},
 		{"recovery-timeout", required_argument, NULL, 't'},
+		{"enable-two-phase", no_argument, NULL, 'T'},
 		{"subscriber-username", required_argument, NULL, 'U'},
 		{"verbose", no_argument, NULL, 'v'},
 		{"version", no_argument, NULL, 'V'},
@@ -1927,6 +1936,7 @@ main(int argc, char **argv)
 	opt.socket_dir = NULL;
 	opt.sub_port = DEFAULT_SUB_PORT;
 	opt.sub_username = NULL;
+	opt.two_phase = false;
 	opt.database_names = (SimpleStringList)
 	{
 		0
@@ -1949,7 +1959,7 @@ main(int argc, char **argv)
 
 	get_restricted_token();
 
-	while ((c = getopt_long(argc, argv, "d:D:np:P:s:t:U:v",
+	while ((c = getopt_long(argc, argv, "d:D:np:P:s:t:TU:v",
 							long_options, &option_index)) != -1)
 	{
 		switch (c)
@@ -1986,6 +1996,9 @@ main(int argc, char **argv)
 			case 't':
 				opt.recovery_timeout = atoi(optarg);
 				break;
+			case 'T':
+				opt.two_phase = true;
+				break;
 			case 'U':
 				opt.sub_username = pg_strdup(optarg);
 				break;
@@ -2229,7 +2242,7 @@ main(int argc, char **argv)
 	 * point to the LSN reported by setup_publisher().  It also cleans up
 	 * publications created by this tool and replication to the standby.
 	 */
-	setup_subscriber(dbinfo, consistent_lsn);
+	setup_subscriber(dbinfo, consistent_lsn, opt.two_phase);
 
 	/* Remove primary_slot_name if it exists on primary */
 	drop_primary_replication_slot(dbinfo, primary_slot_name);
diff --git a/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl b/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl
index 0a900edb65..b4c2156c5b 100644
--- a/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl
+++ b/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl
@@ -357,6 +357,7 @@ command_ok(
 	'run pg_createsubscriber without --databases');
 
 # Run pg_createsubscriber on node S
+# In passing, also test the --enable-two-phase option
 command_ok(
 	[
 		'pg_createsubscriber', '--verbose',
@@ -371,7 +372,7 @@ command_ok(
 		'replslot1', '--replication-slot',
 		'replslot2', '--database',
 		$db1, '--database',
-		$db2
+		$db2, '--enable-two-phase'
 	],
 	'run pg_createsubscriber on node S');
 
@@ -390,6 +391,12 @@ $node_p->safe_psql($db2, "INSERT INTO tbl2 VALUES('row 1')");
 # Start subscriber
 $node_s->start;
 
+# Verify that all subtwophase states are pending or enabled,
+# e.g. there are no subscriptions where subtwophase is disabled ('d')
+$node_s->poll_query_until('postgres',
+	"SELECT count(1) = 0 FROM pg_subscription WHERE subtwophasestate IN ('d');"
+);
+
 # Confirm the pre-existing subscription has been removed
 $result = $node_s->safe_psql(
 	'postgres', qq(
-- 
2.34.1

