From 4e9fbe648773463688d2b8d1202939a5f8349420 Mon Sep 17 00:00:00 2001
From: Khanna <Shubham.Khanna@fujitsu.com>
Date: Fri, 22 Nov 2024 12:03:13 +0530
Subject: [PATCH v2] Add support for enabling/disabling two-phase commit in
 pg_createsubscriber

This patch introduces the '--enable-two-phase' option to the
'pg_createsubscriber' utility, allowing users to enable or disable two-phase
commit for subscriptions during their creation.

By default, two-phase commit is disabled if the option is provided without
arguments.

When two-phase commit is enabled, prepared transactions are sent to the
subscriber at the time of 'PREPARE TRANSACTION', and they are processed as
two-phase transactions on the subscriber as well. If disabled, prepared
transactions are sent only when committed and are processed immediately by the
subscriber.

Notably, the replication for prepared transactions functions regardless of the
initial two-phase setting on the replication slot. However, the user cannot
change the setting after the subscription is created unless a future command,
such as 'ALTER SUBSCRIPTION ... SET (two_phase = on)', is supported.
This provides flexibility for future enhancements.

Documentation has been updated to reflect the new option, and test cases have
been added to validate various scenarios, including proper validation of the
'--enable-two-phase' option and its combinations with other options.
---
 doc/src/sgml/ref/pg_createsubscriber.sgml     | 16 ++++++++++++++
 src/bin/pg_basebackup/pg_createsubscriber.c   | 22 ++++++++++++++-----
 .../t/040_pg_createsubscriber.pl              | 19 +++++++++++++++-
 3 files changed, 51 insertions(+), 6 deletions(-)

diff --git a/doc/src/sgml/ref/pg_createsubscriber.sgml b/doc/src/sgml/ref/pg_createsubscriber.sgml
index df1a92b4da..0fcd30db7f 100644
--- a/doc/src/sgml/ref/pg_createsubscriber.sgml
+++ b/doc/src/sgml/ref/pg_createsubscriber.sgml
@@ -161,6 +161,22 @@ PostgreSQL documentation
      </listitem>
     </varlistentry>
 
+    <varlistentry>
+     <term><option>-T</option></term>
+     <term><option>--enable-two-phase</option></term>
+     <listitem>
+      <para>
+       Enables two-phase commit for the subscription. When the option is
+       provided, it is explicitly enabled. By default, two-phase commit is
+       <literal>off</literal>.
+       Two-phase commit ensures atomicity in logical replication for prepared
+       transactions. See the
+       <link linkend="sql-createsubscription-params-with-two-phase"><literal>two_phase</literal></link>
+       documentation for more details.
+      </para>
+     </listitem>
+    </varlistentry>
+
     <varlistentry>
      <term><option>-U <replaceable class="parameter">username</replaceable></option></term>
      <term><option>--subscriber-username=<replaceable class="parameter">username</replaceable></option></term>
diff --git a/src/bin/pg_basebackup/pg_createsubscriber.c b/src/bin/pg_basebackup/pg_createsubscriber.c
index e96370a9ec..ce11ab7542 100644
--- a/src/bin/pg_basebackup/pg_createsubscriber.c
+++ b/src/bin/pg_basebackup/pg_createsubscriber.c
@@ -38,6 +38,7 @@ struct CreateSubscriberOptions
 	char	   *socket_dir;		/* directory for Unix-domain socket, if any */
 	char	   *sub_port;		/* subscriber port number */
 	const char *sub_username;	/* subscriber username */
+	bool		two_phase;		/* two-phase option */
 	SimpleStringList database_names;	/* list of database names */
 	SimpleStringList pub_names; /* list of publication names */
 	SimpleStringList sub_names; /* list of subscription names */
@@ -53,6 +54,7 @@ struct LogicalRepInfo
 	char	   *pubname;		/* publication name */
 	char	   *subname;		/* subscription name */
 	char	   *replslotname;	/* replication slot name */
+	bool		two_phase;		/* two-phase enabled for the subscription */
 
 	bool		made_replslot;	/* replication slot was created */
 	bool		made_publication;	/* publication was created */
@@ -227,6 +229,7 @@ usage(void)
 	printf(_("  -P, --publisher-server=CONNSTR  publisher connection string\n"));
 	printf(_("  -s, --socketdir=DIR             socket directory to use (default current dir.)\n"));
 	printf(_("  -t, --recovery-timeout=SECS     seconds to wait for recovery to end\n"));
+	printf(_("  -T, --enable-two-phase			enable two-phase commit for the subscription\n"));
 	printf(_("  -U, --subscriber-username=NAME  user name for subscriber connection\n"));
 	printf(_("  -v, --verbose                   output verbose messages\n"));
 	printf(_("      --config-file=FILENAME      use specified main server configuration\n"
@@ -456,6 +459,7 @@ store_pub_sub_info(const struct CreateSubscriberOptions *opt,
 		conninfo = concat_conninfo_dbname(pub_base_conninfo, cell->val);
 		dbinfo[i].pubconninfo = conninfo;
 		dbinfo[i].dbname = cell->val;
+		dbinfo[i].two_phase = opt->two_phase;	/* Set two-phase option */
 		if (num_pubs > 0)
 			dbinfo[i].pubname = pubcell->val;
 		else
@@ -466,6 +470,7 @@ store_pub_sub_info(const struct CreateSubscriberOptions *opt,
 			dbinfo[i].replslotname = NULL;
 		dbinfo[i].made_replslot = false;
 		dbinfo[i].made_publication = false;
+
 		/* Fill subscriber attributes */
 		conninfo = concat_conninfo_dbname(sub_base_conninfo, cell->val);
 		dbinfo[i].subconninfo = conninfo;
@@ -479,9 +484,10 @@ store_pub_sub_info(const struct CreateSubscriberOptions *opt,
 					 dbinfo[i].pubname ? dbinfo[i].pubname : "(auto)",
 					 dbinfo[i].replslotname ? dbinfo[i].replslotname : "(auto)",
 					 dbinfo[i].pubconninfo);
-		pg_log_debug("subscriber(%d): subscription: %s ; connection string: %s", i,
+		pg_log_debug("subscriber(%d): subscription: %s ; connection string: %s, --enable-two-phase: %s", i,
 					 dbinfo[i].subname ? dbinfo[i].subname : "(auto)",
-					 dbinfo[i].subconninfo);
+					 dbinfo[i].subconninfo,
+					 dbinfo[i].two_phase ? "true" : "false");
 
 		if (num_pubs > 0)
 			pubcell = pubcell->next;
@@ -1699,8 +1705,9 @@ create_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo)
 	appendPQExpBuffer(str,
 					  "CREATE SUBSCRIPTION %s CONNECTION %s PUBLICATION %s "
 					  "WITH (create_slot = false, enabled = false, "
-					  "slot_name = %s, copy_data = false)",
-					  subname_esc, pubconninfo_esc, pubname_esc, replslotname_esc);
+					  "slot_name = %s, copy_data = false, two_phase = %s)",
+					  subname_esc, pubconninfo_esc, pubname_esc, replslotname_esc,
+					  dbinfo->two_phase ? "true" : "false");
 
 	pg_free(pubname_esc);
 	pg_free(subname_esc);
@@ -1872,6 +1879,7 @@ main(int argc, char **argv)
 		{"publisher-server", required_argument, NULL, 'P'},
 		{"socketdir", required_argument, NULL, 's'},
 		{"recovery-timeout", required_argument, NULL, 't'},
+		{"enable-two-phase", no_argument, NULL, 'T'},
 		{"subscriber-username", required_argument, NULL, 'U'},
 		{"verbose", no_argument, NULL, 'v'},
 		{"version", no_argument, NULL, 'V'},
@@ -1927,6 +1935,7 @@ main(int argc, char **argv)
 	opt.socket_dir = NULL;
 	opt.sub_port = DEFAULT_SUB_PORT;
 	opt.sub_username = NULL;
+	opt.two_phase = false;
 	opt.database_names = (SimpleStringList)
 	{
 		0
@@ -1949,7 +1958,7 @@ main(int argc, char **argv)
 
 	get_restricted_token();
 
-	while ((c = getopt_long(argc, argv, "d:D:np:P:s:t:U:v",
+	while ((c = getopt_long(argc, argv, "d:D:np:P:s:t:T:U:v",
 							long_options, &option_index)) != -1)
 	{
 		switch (c)
@@ -1986,6 +1995,9 @@ main(int argc, char **argv)
 			case 't':
 				opt.recovery_timeout = atoi(optarg);
 				break;
+			case 'T':
+				opt.two_phase = true;
+				break;
 			case 'U':
 				opt.sub_username = pg_strdup(optarg);
 				break;
diff --git a/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl b/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl
index 0a900edb65..e843879500 100644
--- a/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl
+++ b/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl
@@ -257,6 +257,7 @@ wal_level = logical
 max_replication_slots = 10
 max_wal_senders = 10
 max_worker_processes = 8
+max_prepared_transactions = 10
 });
 
 # Check some unmet conditions on node S
@@ -283,6 +284,7 @@ $node_s->append_conf(
 max_replication_slots = 10
 max_logical_replication_workers = 4
 max_worker_processes = 8
+max_prepared_transactions = 10
 });
 # Restore default settings on both servers
 $node_p->restart;
@@ -341,6 +343,7 @@ command_ok(
 $node_s->start;
 is($node_s->safe_psql('postgres', 'SELECT pg_catalog.pg_is_in_recovery()'),
 	't', 'standby is in recovery');
+
 $node_s->stop;
 
 # pg_createsubscriber can run without --databases option
@@ -352,7 +355,7 @@ command_ok(
 		$node_p->connstr($db1), '--socketdir',
 		$node_s->host, '--subscriber-port',
 		$node_s->port, '--replication-slot',
-		'replslot1'
+		'replslot1', '--enable-two-phase'
 	],
 	'run pg_createsubscriber without --databases');
 
@@ -387,9 +390,23 @@ is($result, qq(0),
 $node_p->safe_psql($db1, "INSERT INTO tbl1 VALUES('third row')");
 $node_p->safe_psql($db2, "INSERT INTO tbl2 VALUES('row 1')");
 
+# Prepare a transaction on the publisher
+$node_p->safe_psql(
+	$db1, qq[
+        BEGIN;
+        INSERT INTO tbl1 SELECT generate_series(1, 10);
+        PREPARE TRANSACTION 'test_prepare';
+]);
+
 # Start subscriber
 $node_s->start;
 
+# Verify that the prepared transaction is replicated to the subscriber
+my $count_prepared_s =
+  $node_s->safe_psql($db1, "SELECT count(*) FROM pg_prepared_xacts;");
+
+is($count_prepared_s, qq(0), 'Prepared transaction replicated to subscriber');
+
 # Confirm the pre-existing subscription has been removed
 $result = $node_s->safe_psql(
 	'postgres', qq(
-- 
2.41.0.windows.3

