From 81ee58d06cbc6d688281acac15766619e406e82f Mon Sep 17 00:00:00 2001
From: Khanna <Shubham.Khanna@fujitsu.com>
Date: Fri, 22 Nov 2024 12:03:13 +0530
Subject: [PATCH v1] Add support for enabling/disabling two-phase commit in
 pg_createsubscriber

This patch introduces the '--two-phase' option to the 'pg_createsubscriber'
utility, allowing users to enable or disable two-phase commit for subscriptions
during their creation.

By default, two-phase commit is enabled if the option is provided without
arguments. Users can explicitly set it to 'on' or 'off' using '--two-phase=on'
or '--two-phase=off'.

When two-phase commit is enabled, prepared transactions are sent to the
subscriber at the time of 'PREPARE TRANSACTION', and they are processed as
two-phase transactions on the subscriber as well. If disabled, prepared
transactions are sent only when committed and are processed immediately by the
subscriber.

Notably, the replication for prepared transactions functions regardless of the
initial two-phase setting on the replication slot. However, the user cannot
change the setting after the subscription is created unless a future command,
such as 'ALTER SUBSCRIPTION ... SET (two-phase = on)', is supported.
This provides flexibility for future enhancements.

Documentation has been updated to reflect the new option, and test cases have
been added to validate various scenarios, including proper validation of the
'--two-phase' flag and its combinations with other options.
---
 doc/src/sgml/ref/pg_createsubscriber.sgml     | 16 +++++
 src/bin/pg_basebackup/pg_createsubscriber.c   | 31 +++++++-
 .../t/040_pg_createsubscriber.pl              | 71 +++++++++++++++++++
 3 files changed, 115 insertions(+), 3 deletions(-)

diff --git a/doc/src/sgml/ref/pg_createsubscriber.sgml b/doc/src/sgml/ref/pg_createsubscriber.sgml
index df1a92b4da..b9814e6b15 100644
--- a/doc/src/sgml/ref/pg_createsubscriber.sgml
+++ b/doc/src/sgml/ref/pg_createsubscriber.sgml
@@ -186,6 +186,22 @@ PostgreSQL documentation
      </listitem>
     </varlistentry>
 
+    <varlistentry>
+     <term><option>-T</option></term>
+     <term><option>--two_phase</option></term>
+     <listitem>
+      <para>
+       Enables or disables two-phase commit for the subscription.
+       When the option is provided without a value, it defaults to
+       <literal>on</literal>. Specify <literal>on</literal> to enable or
+       <literal>off</literal> to disable.
+       Two-phase commit ensures atomicity in logical replication for prepared
+       transactions. By default, this option is enabled unless explicitly set
+       to <literal>off</literal>.
+      </para>
+     </listitem>
+    </varlistentry>
+
     <varlistentry>
      <term><option>--config-file=<replaceable class="parameter">filename</replaceable></option></term>
      <listitem>
diff --git a/src/bin/pg_basebackup/pg_createsubscriber.c b/src/bin/pg_basebackup/pg_createsubscriber.c
index e96370a9ec..0c2f3f2a84 100644
--- a/src/bin/pg_basebackup/pg_createsubscriber.c
+++ b/src/bin/pg_basebackup/pg_createsubscriber.c
@@ -38,6 +38,7 @@ struct CreateSubscriberOptions
 	char	   *socket_dir;		/* directory for Unix-domain socket, if any */
 	char	   *sub_port;		/* subscriber port number */
 	const char *sub_username;	/* subscriber username */
+	bool		two_phase;		/* two-phase option */
 	SimpleStringList database_names;	/* list of database names */
 	SimpleStringList pub_names; /* list of publication names */
 	SimpleStringList sub_names; /* list of subscription names */
@@ -56,6 +57,7 @@ struct LogicalRepInfo
 
 	bool		made_replslot;	/* replication slot was created */
 	bool		made_publication;	/* publication was created */
+	bool		two_phase;		/* two-phase option was created */
 };
 
 static void cleanup_objects_atexit(void);
@@ -227,6 +229,7 @@ usage(void)
 	printf(_("  -P, --publisher-server=CONNSTR  publisher connection string\n"));
 	printf(_("  -s, --socketdir=DIR             socket directory to use (default current dir.)\n"));
 	printf(_("  -t, --recovery-timeout=SECS     seconds to wait for recovery to end\n"));
+	printf(_("  -T, --two-phase					Enable two-phase commit for the subscription\n"));
 	printf(_("  -U, --subscriber-username=NAME  user name for subscriber connection\n"));
 	printf(_("  -v, --verbose                   output verbose messages\n"));
 	printf(_("      --config-file=FILENAME      use specified main server configuration\n"
@@ -466,6 +469,9 @@ store_pub_sub_info(const struct CreateSubscriberOptions *opt,
 			dbinfo[i].replslotname = NULL;
 		dbinfo[i].made_replslot = false;
 		dbinfo[i].made_publication = false;
+		/* Store two-phase option */
+		dbinfo[i].two_phase = opt->two_phase;
+
 		/* Fill subscriber attributes */
 		conninfo = concat_conninfo_dbname(sub_base_conninfo, cell->val);
 		dbinfo[i].subconninfo = conninfo;
@@ -482,6 +488,8 @@ store_pub_sub_info(const struct CreateSubscriberOptions *opt,
 		pg_log_debug("subscriber(%d): subscription: %s ; connection string: %s", i,
 					 dbinfo[i].subname ? dbinfo[i].subname : "(auto)",
 					 dbinfo[i].subconninfo);
+		pg_log_debug("publisher(%d): two-phase: %s", i,
+					 dbinfo[i].two_phase ? "true" : "false");
 
 		if (num_pubs > 0)
 			pubcell = pubcell->next;
@@ -1699,8 +1707,9 @@ create_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo)
 	appendPQExpBuffer(str,
 					  "CREATE SUBSCRIPTION %s CONNECTION %s PUBLICATION %s "
 					  "WITH (create_slot = false, enabled = false, "
-					  "slot_name = %s, copy_data = false)",
-					  subname_esc, pubconninfo_esc, pubname_esc, replslotname_esc);
+					  "slot_name = %s, copy_data = false, two_phase = %s)",
+					  subname_esc, pubconninfo_esc, pubname_esc, replslotname_esc,
+					  dbinfo->two_phase ? "true" : "false");
 
 	pg_free(pubname_esc);
 	pg_free(subname_esc);
@@ -1872,6 +1881,7 @@ main(int argc, char **argv)
 		{"publisher-server", required_argument, NULL, 'P'},
 		{"socketdir", required_argument, NULL, 's'},
 		{"recovery-timeout", required_argument, NULL, 't'},
+		{"two_phase", optional_argument, NULL, 'T'},
 		{"subscriber-username", required_argument, NULL, 'U'},
 		{"verbose", no_argument, NULL, 'v'},
 		{"version", no_argument, NULL, 'V'},
@@ -1927,6 +1937,7 @@ main(int argc, char **argv)
 	opt.socket_dir = NULL;
 	opt.sub_port = DEFAULT_SUB_PORT;
 	opt.sub_username = NULL;
+	opt.two_phase = true;
 	opt.database_names = (SimpleStringList)
 	{
 		0
@@ -1949,7 +1960,7 @@ main(int argc, char **argv)
 
 	get_restricted_token();
 
-	while ((c = getopt_long(argc, argv, "d:D:np:P:s:t:U:v",
+	while ((c = getopt_long(argc, argv, "d:D:np:P:s:t:T:U:v",
 							long_options, &option_index)) != -1)
 	{
 		switch (c)
@@ -1986,6 +1997,20 @@ main(int argc, char **argv)
 			case 't':
 				opt.recovery_timeout = atoi(optarg);
 				break;
+			case 'T':
+				if (optarg != NULL)
+				{
+					if (strcmp(optarg, "on") == 0)
+						opt.two_phase = true;
+					else if (strcmp(optarg, "off") == 0)
+						opt.two_phase = false;
+					else
+						pg_fatal("invalid value for --two-phase: must be 'on' or 'off'");
+				}
+				else
+					opt.two_phase = true;	/* Default to true if no argument
+											 * is provided */
+				break;
 			case 'U':
 				opt.sub_username = pg_strdup(optarg);
 				break;
diff --git a/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl b/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl
index 0a900edb65..faad1507f4 100644
--- a/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl
+++ b/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl
@@ -432,10 +432,81 @@ my $sysid_s = $node_s->safe_psql('postgres',
 	'SELECT system_identifier FROM pg_control_system()');
 ok($sysid_p != $sysid_s, 'system identifier was changed');
 
+# Set up node A as primary
+my $node_a = PostgreSQL::Test::Cluster->new('node_a');
+my $aconnstr = $node_a->connstr;
+$node_a->init(allows_streaming => 'logical');
+$node_a->append_conf(
+	'postgresql.conf', qq[
+	autovacuum = off
+	wal_level = logical
+	max_wal_senders = 10
+	max_worker_processes = 8
+	max_prepared_transactions = 10
+]);
+
+$node_a->start;
+
+# Set up node B as standby linking to node A
+$node_a->backup('backup_1');
+my $node_b = PostgreSQL::Test::Cluster->new('node_b');
+$node_b->init_from_backup($node_a, 'backup_1', has_streaming => 1);
+$node_b->append_conf(
+	'postgresql.conf', qq[
+	primary_conninfo = '$aconnstr dbname=postgres'
+	hot_standby_feedback = on
+	max_logical_replication_workers = 1
+	max_worker_processes = 8
+	max_prepared_transactions = 10
+]);
+$node_b->set_standby_mode();
+$node_b->start;
+
+my $db10 = generate_db($node_a, 'regression\\"\\', 1, 45, '\\\\"\\\\\\');
+
+# Create a table on Publisher
+$node_a->safe_psql($db10, "CREATE TABLE tab_full (a int PRIMARY KEY);");
+
+# Wait for replication to catch up
+$node_a->wait_for_catchup($node_b);
+
+$node_b->stop;
+
+# Run pg_createsubscriber on a promoted server with two_phase=on
+command_ok(
+	[
+		'pg_createsubscriber', '--verbose',
+		'--recovery-timeout', "$PostgreSQL::Test::Utils::timeout_default",
+		'--pgdata', $node_b->data_dir,
+		'--publisher-server', $node_a->connstr($db10),
+		'--subscriber-port', $node_b->port,
+		'--database', $db10,
+		'--two_phase=on'
+	],
+	'created subscription with two-phase commit enabled');
+
+# Prepare a transaction on the publisher
+$node_a->safe_psql(
+	$db10, qq[
+	BEGIN;
+	INSERT INTO tab_full SELECT generate_series(1, 10);
+	PREPARE TRANSACTION 'test_prepare';
+]);
+
+$node_b->start;
+
+# Verify that the prepared transaction is replicated to the subscriber
+my $count_prepared_b =
+  $node_b->safe_psql($db10, "SELECT count(*) FROM pg_prepared_xacts;");
+
+is($count_prepared_b, qq(1), 'Prepared transaction replicated to subscriber');
+
 # clean up
 $node_p->teardown_node;
 $node_s->teardown_node;
 $node_t->teardown_node;
 $node_f->teardown_node;
+$node_a->teardown_node;
+$node_b->teardown_node;
 
 done_testing();
-- 
2.41.0.windows.3

