From 9d35e48c64911880ec7c1fb3c7e3fdc47c923854 Mon Sep 17 00:00:00 2001
From: Khanna <Shubham.Khanna@fujitsu.com>
Date: Fri, 22 Nov 2024 12:03:13 +0530
Subject: [PATCH v5] Add support for two-phase commit in pg_createsubscriber

This patch introduces the '--enable-two-phase' option to the
'pg_createsubscriber' utility, allowing users to enable two-phase commit for
all subscriptions during their creation.

By default, two-phase commit is disabled if the option is not provided.

When two-phase commit is enabled, prepared transactions are sent to the
subscriber at the time of 'PREPARE TRANSACTION', and they are processed as
two-phase transactions on the subscriber as well. If disabled, prepared
transactions are sent only when committed and are processed immediately by the
subscriber.

Documentation has been updated to reflect the new option, and test cases have
been added to validate various scenarios, including proper validation of the
'--enable-two-phase' option and its combinations with other options.
---
 doc/src/sgml/ref/pg_createsubscriber.sgml     | 30 +++++++++++-----
 src/bin/pg_basebackup/pg_createsubscriber.c   | 35 +++++++++++++------
 .../t/040_pg_createsubscriber.pl              | 24 +++++++++++--
 3 files changed, 67 insertions(+), 22 deletions(-)

diff --git a/doc/src/sgml/ref/pg_createsubscriber.sgml b/doc/src/sgml/ref/pg_createsubscriber.sgml
index df1a92b4da..d92b43e7ae 100644
--- a/doc/src/sgml/ref/pg_createsubscriber.sgml
+++ b/doc/src/sgml/ref/pg_createsubscriber.sgml
@@ -161,6 +161,19 @@ PostgreSQL documentation
      </listitem>
     </varlistentry>
 
+    <varlistentry>
+     <term><option>-T</option></term>
+     <term><option>--enable-two-phase</option></term>
+     <listitem>
+      <para>
+       Enables <link linkend="sql-createsubscription-params-with-two-phase"><literal>two_phase</literal></link>
+       commit for the subscription. If there are multiple subscriptions
+       specified, this option applies to all of them.
+       The default is <literal>false</literal>.
+      </para>
+     </listitem>
+    </varlistentry>
+
     <varlistentry>
      <term><option>-U <replaceable class="parameter">username</replaceable></option></term>
      <term><option>--subscriber-username=<replaceable class="parameter">username</replaceable></option></term>
@@ -296,7 +309,9 @@ PostgreSQL documentation
     greater than or equal to the number of specified databases.  The target
     server must have <xref linkend="guc-max-worker-processes"/> configured to a
     value greater than the number of specified databases.  The target server
-    must accept local connections.
+    must accept local connections. If you are planning to use the
+    --enable-two-phase switch then you will also need to set the
+    <xref linkend="guc-max-prepared-transactions"/> appropriately.
    </para>
 
    <para>
@@ -356,14 +371,13 @@ PostgreSQL documentation
    </para>
 
    <para>
-    <application>pg_createsubscriber</application> sets up logical
-    replication with two-phase commit disabled.  This means that any
-    prepared transactions will be replicated at the time
-    of <command>COMMIT PREPARED</command>, without advance preparation.
-    Once setup is complete, you can manually drop and re-create the
-    subscription(s) with
+    If --enable-two-phase switch is not specified, the
+    <application>pg_createsubscriber</application> sets up logical replication
+    with two-phase commit disabled.  This means that any prepared transactions
+    will be replicated at the time of <command>COMMIT PREPARED</command>,
+    without advance preparation. Once setup is complete, you can manually drop
+    and re-create the subscription(s) with
     the <link linkend="sql-createsubscription-params-with-two-phase"><literal>two_phase</literal></link>
-    option enabled.
    </para>
 
    <para>
diff --git a/src/bin/pg_basebackup/pg_createsubscriber.c b/src/bin/pg_basebackup/pg_createsubscriber.c
index e96370a9ec..f5d9fbc2e8 100644
--- a/src/bin/pg_basebackup/pg_createsubscriber.c
+++ b/src/bin/pg_basebackup/pg_createsubscriber.c
@@ -38,6 +38,7 @@ struct CreateSubscriberOptions
 	char	   *socket_dir;		/* directory for Unix-domain socket, if any */
 	char	   *sub_port;		/* subscriber port number */
 	const char *sub_username;	/* subscriber username */
+	bool		two_phase;		/* two-phase option */
 	SimpleStringList database_names;	/* list of database names */
 	SimpleStringList pub_names; /* list of publication names */
 	SimpleStringList sub_names; /* list of subscription names */
@@ -79,7 +80,7 @@ static void check_publisher(const struct LogicalRepInfo *dbinfo);
 static char *setup_publisher(struct LogicalRepInfo *dbinfo);
 static void check_subscriber(const struct LogicalRepInfo *dbinfo);
 static void setup_subscriber(struct LogicalRepInfo *dbinfo,
-							 const char *consistent_lsn);
+							 const char *consistent_lsn, bool two_phase);
 static void setup_recovery(const struct LogicalRepInfo *dbinfo, const char *datadir,
 						   const char *lsn);
 static void drop_primary_replication_slot(struct LogicalRepInfo *dbinfo,
@@ -98,7 +99,9 @@ static void wait_for_end_recovery(const char *conninfo,
 								  const struct CreateSubscriberOptions *opt);
 static void create_publication(PGconn *conn, struct LogicalRepInfo *dbinfo);
 static void drop_publication(PGconn *conn, struct LogicalRepInfo *dbinfo);
-static void create_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo);
+static void create_subscription(PGconn *conn,
+								const struct LogicalRepInfo *dbinfo,
+								bool two_phase);
 static void set_replication_progress(PGconn *conn, const struct LogicalRepInfo *dbinfo,
 									 const char *lsn);
 static void enable_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo);
@@ -227,6 +230,7 @@ usage(void)
 	printf(_("  -P, --publisher-server=CONNSTR  publisher connection string\n"));
 	printf(_("  -s, --socketdir=DIR             socket directory to use (default current dir.)\n"));
 	printf(_("  -t, --recovery-timeout=SECS     seconds to wait for recovery to end\n"));
+	printf(_("  -T, --enable-two-phase          enable two-phase commit for all subscriptions\n"));
 	printf(_("  -U, --subscriber-username=NAME  user name for subscriber connection\n"));
 	printf(_("  -v, --verbose                   output verbose messages\n"));
 	printf(_("      --config-file=FILENAME      use specified main server configuration\n"
@@ -479,9 +483,10 @@ store_pub_sub_info(const struct CreateSubscriberOptions *opt,
 					 dbinfo[i].pubname ? dbinfo[i].pubname : "(auto)",
 					 dbinfo[i].replslotname ? dbinfo[i].replslotname : "(auto)",
 					 dbinfo[i].pubconninfo);
-		pg_log_debug("subscriber(%d): subscription: %s ; connection string: %s", i,
+		pg_log_debug("subscriber(%d): subscription: %s ; connection string: %s, two_phase: %s", i,
 					 dbinfo[i].subname ? dbinfo[i].subname : "(auto)",
-					 dbinfo[i].subconninfo);
+					 dbinfo[i].subconninfo,
+					 opt->two_phase ? "true" : "false");
 
 		if (num_pubs > 0)
 			pubcell = pubcell->next;
@@ -1138,7 +1143,8 @@ check_and_drop_existing_subscriptions(PGconn *conn,
  * replication setup.
  */
 static void
-setup_subscriber(struct LogicalRepInfo *dbinfo, const char *consistent_lsn)
+setup_subscriber(struct LogicalRepInfo *dbinfo, const char *consistent_lsn,
+				 bool two_phase)
 {
 	for (int i = 0; i < num_dbs; i++)
 	{
@@ -1162,7 +1168,7 @@ setup_subscriber(struct LogicalRepInfo *dbinfo, const char *consistent_lsn)
 		 */
 		drop_publication(conn, &dbinfo[i]);
 
-		create_subscription(conn, &dbinfo[i]);
+		create_subscription(conn, &dbinfo[i], two_phase);
 
 		/* Set the replication progress to the correct LSN */
 		set_replication_progress(conn, &dbinfo[i], consistent_lsn);
@@ -1677,7 +1683,8 @@ drop_publication(PGconn *conn, struct LogicalRepInfo *dbinfo)
  * initial location.
  */
 static void
-create_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo)
+create_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo,
+					bool two_phase)
 {
 	PQExpBuffer str = createPQExpBuffer();
 	PGresult   *res;
@@ -1699,8 +1706,9 @@ create_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo)
 	appendPQExpBuffer(str,
 					  "CREATE SUBSCRIPTION %s CONNECTION %s PUBLICATION %s "
 					  "WITH (create_slot = false, enabled = false, "
-					  "slot_name = %s, copy_data = false)",
-					  subname_esc, pubconninfo_esc, pubname_esc, replslotname_esc);
+					  "slot_name = %s, copy_data = false, two_phase = %s)",
+					  subname_esc, pubconninfo_esc, pubname_esc, replslotname_esc,
+					  two_phase ? "true" : "false");
 
 	pg_free(pubname_esc);
 	pg_free(subname_esc);
@@ -1872,6 +1880,7 @@ main(int argc, char **argv)
 		{"publisher-server", required_argument, NULL, 'P'},
 		{"socketdir", required_argument, NULL, 's'},
 		{"recovery-timeout", required_argument, NULL, 't'},
+		{"enable-two-phase", no_argument, NULL, 'T'},
 		{"subscriber-username", required_argument, NULL, 'U'},
 		{"verbose", no_argument, NULL, 'v'},
 		{"version", no_argument, NULL, 'V'},
@@ -1927,6 +1936,7 @@ main(int argc, char **argv)
 	opt.socket_dir = NULL;
 	opt.sub_port = DEFAULT_SUB_PORT;
 	opt.sub_username = NULL;
+	opt.two_phase = false;
 	opt.database_names = (SimpleStringList)
 	{
 		0
@@ -1949,7 +1959,7 @@ main(int argc, char **argv)
 
 	get_restricted_token();
 
-	while ((c = getopt_long(argc, argv, "d:D:np:P:s:t:U:v",
+	while ((c = getopt_long(argc, argv, "d:D:np:P:s:t:TU:v",
 							long_options, &option_index)) != -1)
 	{
 		switch (c)
@@ -1986,6 +1996,9 @@ main(int argc, char **argv)
 			case 't':
 				opt.recovery_timeout = atoi(optarg);
 				break;
+			case 'T':
+				opt.two_phase = true;
+				break;
 			case 'U':
 				opt.sub_username = pg_strdup(optarg);
 				break;
@@ -2229,7 +2242,7 @@ main(int argc, char **argv)
 	 * point to the LSN reported by setup_publisher().  It also cleans up
 	 * publications created by this tool and replication to the standby.
 	 */
-	setup_subscriber(dbinfo, consistent_lsn);
+	setup_subscriber(dbinfo, consistent_lsn, opt.two_phase);
 
 	/* Remove primary_slot_name if it exists on primary */
 	drop_primary_replication_slot(dbinfo, primary_slot_name);
diff --git a/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl b/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl
index 0a900edb65..488841e09f 100644
--- a/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl
+++ b/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl
@@ -249,14 +249,16 @@ command_fails(
 		$db2
 	],
 	'primary contains unmet conditions on node P');
-# Restore default settings here but only apply it after testing standby. Some
-# standby settings should not be a lower setting than on the primary.
+# Restore default settings here (except for max_prepared_transactions as this is
+# required for --enable-two-phase) but only apply it after testing standby.
+# Some standby settings should not be a lower setting than on the primary.
 $node_p->append_conf(
 	'postgresql.conf', q{
 wal_level = logical
 max_replication_slots = 10
 max_wal_senders = 10
 max_worker_processes = 8
+max_prepared_transactions = 10
 });
 
 # Check some unmet conditions on node S
@@ -283,6 +285,7 @@ $node_s->append_conf(
 max_replication_slots = 10
 max_logical_replication_workers = 4
 max_worker_processes = 8
+max_prepared_transactions = 10
 });
 # Restore default settings on both servers
 $node_p->restart;
@@ -357,6 +360,7 @@ command_ok(
 	'run pg_createsubscriber without --databases');
 
 # Run pg_createsubscriber on node S
+# In passing, also test the --enable-two-phase option
 command_ok(
 	[
 		'pg_createsubscriber', '--verbose',
@@ -371,10 +375,24 @@ command_ok(
 		'replslot1', '--replication-slot',
 		'replslot2', '--database',
 		$db1, '--database',
-		$db2
+		$db2, '--enable-two-phase'
 	],
 	'run pg_createsubscriber on node S');
 
+# Start subscriber
+$node_s->start;
+
+# Verify that the subtwophase is 'p' in the pg_subscription catalog
+my $poll_query_until = $node_s->safe_psql('postgres',
+	"SELECT count(1) = 0 FROM pg_subscription WHERE subtwophasestate NOT IN ('e');"
+);
+
+is($poll_query_until, qq(t),
+	'Timed out while waiting for subscriber to enable twophase');
+
+# Stop subscriber
+$node_s->stop;
+
 # Confirm the physical replication slot has been removed
 $result = $node_p->safe_psql($db1,
 	"SELECT count(*) FROM pg_replication_slots WHERE slot_name = '$slotname'"
-- 
2.34.1

