From 16b0864f6daba9e23a8ed53d001099d0e51c0194 Mon Sep 17 00:00:00 2001
From: Khanna <Shubham.Khanna@fujitsu.com>
Date: Tue, 22 Apr 2025 09:36:29 +0530
Subject: [PATCH v1] pg_basebackup via pg_createsubscriber

This patch adds support for initializing a standby node using 'pg_basebackup'
through the 'pg_createsubscriber' tool. A new '--create-standby' option is
introduced, which automates the process of performing a base backup from the
publisher, and configuring the standby node for streaming replication.

To support this functionality, the patch introduces helper functions for
invoking 'pg_basebackup', and configuring the standby server.
It also adds support for managing external binary paths for 'pg_basebackup' and
'psql', making the tool more flexible in varied environments.

By integrating 'pg_basebackup', this patch enhances the usability of
'pg_createsubscriber' by enabling a streamlined, end-to-end setup of
subscriber nodes.
---
 doc/src/sgml/ref/pg_createsubscriber.sgml     |  13 ++
 src/bin/pg_basebackup/pg_createsubscriber.c   | 126 +++++++++++++++++-
 .../t/040_pg_createsubscriber.pl              |  54 ++++++++
 3 files changed, 188 insertions(+), 5 deletions(-)

diff --git a/doc/src/sgml/ref/pg_createsubscriber.sgml b/doc/src/sgml/ref/pg_createsubscriber.sgml
index 4b1d08d5f16..44142f23674 100644
--- a/doc/src/sgml/ref/pg_createsubscriber.sgml
+++ b/doc/src/sgml/ref/pg_createsubscriber.sgml
@@ -108,6 +108,19 @@ PostgreSQL documentation
      </listitem>
     </varlistentry>
 
+    <varlistentry>
+     <term><option>-c</option></term>
+     <term><option>--create-standby</option></term>
+     <listitem>
+      <para>
+       Initializes the subscriber node as a physical standby using
+       <xref linkend="app-pgbasebackup"/>. This automates the process of taking
+       a base backup from the publisher server, configuring the standby server,
+       and preparing it for logical replication.
+      </para>
+     </listitem>
+    </varlistentry>
+
     <varlistentry>
      <term><option>-d <replaceable class="parameter">dbname</replaceable></option></term>
      <term><option>--database=<replaceable class="parameter">dbname</replaceable></option></term>
diff --git a/src/bin/pg_basebackup/pg_createsubscriber.c b/src/bin/pg_basebackup/pg_createsubscriber.c
index f65acc7cb11..0c0a5f1df9a 100644
--- a/src/bin/pg_basebackup/pg_createsubscriber.c
+++ b/src/bin/pg_basebackup/pg_createsubscriber.c
@@ -27,6 +27,9 @@
 #include "fe_utils/simple_list.h"
 #include "fe_utils/string_utils.h"
 #include "getopt_long.h"
+#include <dirent.h>
+#include <unistd.h>
+#include "port.h"
 
 #define	DEFAULT_SUB_PORT	"50432"
 #define	OBJECTTYPE_PUBLICATIONS  0x0001
@@ -47,6 +50,7 @@ struct CreateSubscriberOptions
 	int			recovery_timeout;	/* stop recovery after this time */
 	bool		all_dbs;		/* all option */
 	SimpleStringList objecttypes_to_remove; /* list of object types to remove */
+	bool		create_standby; /* enable standby setup via pg_basebackup */
 };
 
 /* per-database publication/subscription info */
@@ -127,6 +131,8 @@ static void drop_existing_subscriptions(PGconn *conn, const char *subname,
 										const char *dbname);
 static void get_publisher_databases(struct CreateSubscriberOptions *opt,
 									bool dbnamespecified);
+static void create_standby(const struct CreateSubscriberOptions *opt, const char *standby_dir);
+static void configure_standby(const struct CreateSubscriberOptions *opt, const char *standby_dir);
 
 #define	USEC_PER_SEC	1000000
 #define	WAIT_INTERVAL	1		/* 1 second */
@@ -148,6 +154,8 @@ static pg_prng_state prng_state;
 
 static char *pg_ctl_path = NULL;
 static char *pg_resetwal_path = NULL;
+static char *pg_basebackup_path = NULL;
+static char *psql_path = NULL;
 
 /* standby / subscriber data directory */
 static char *subscriber_dir = NULL;
@@ -248,6 +256,7 @@ usage(void)
 	printf(_("\nOptions:\n"));
 	printf(_("  -a, --all                       create subscriptions for all databases except template\n"
 			 "                                  databases or databases that don't allow connections\n"));
+	printf(_("  -c  --create-standby            prepare the standby using pg_basebackup\n"));
 	printf(_("  -d, --database=DBNAME           database in which to create a subscription\n"));
 	printf(_("  -D, --pgdata=DATADIR            location for the subscriber data directory\n"));
 	printf(_("  -n, --dry-run                   dry run, just show what would be done\n"));
@@ -1405,6 +1414,96 @@ create_logical_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo)
 	return lsn;
 }
 
+/*
+ * Function to create a standby server using pg_basebackup.
+ */
+static void
+create_standby(const struct CreateSubscriberOptions *opt,
+			   const char *standby_dir)
+{
+	PQExpBuffer pg_basebackup_cmd = createPQExpBuffer();
+	char		auto_conf_path[MAXPGPATH];
+	char		port_setting[64];
+
+	appendPQExpBuffer(pg_basebackup_cmd, "\"%s\" -d", pg_basebackup_path);
+	appendShellString(pg_basebackup_cmd, opt->pub_conninfo_str);
+	appendPQExpBufferStr(pg_basebackup_cmd, " -D ");
+	appendShellString(pg_basebackup_cmd, standby_dir);
+	appendPQExpBufferStr(pg_basebackup_cmd, " -P -X stream -R");
+
+	pg_log_info("creating the standby server: %s", pg_basebackup_cmd->data);
+
+	if (!dry_run && system(pg_basebackup_cmd->data) != 0)
+		pg_fatal("pg_basebackup command failed");
+
+	snprintf(auto_conf_path, sizeof(auto_conf_path), "%s/postgresql.auto.conf", standby_dir);
+	snprintf(port_setting, sizeof(port_setting), "port = %s\n", opt->sub_port);
+
+	if (!dry_run)
+	{
+		FILE	   *f = fopen(auto_conf_path, "a");
+
+		if (!f)
+			pg_fatal("could not open %s for appending: %m", auto_conf_path);
+		if (fprintf(f, "%s", port_setting) < 0)
+		{
+			fclose(f);
+			pg_fatal("could not write port setting to %s: %m", auto_conf_path);
+		}
+		fclose(f);
+		pg_log_info("set standby port in %s: %s", auto_conf_path, port_setting);
+	}
+	else
+	{
+		pg_log_info("dry run: would append to %s: %s", auto_conf_path, port_setting);
+	}
+
+	destroyPQExpBuffer(pg_basebackup_cmd);
+}
+
+/*
+ * Function to configure the standby server for logical replication.
+ */
+static void
+configure_standby(const struct CreateSubscriberOptions *opt,
+				  const char *standby_dir)
+{
+	static const char *alter_cmds[] =
+	{
+		"ALTER SYSTEM SET wal_level = 'logical';",
+		"ALTER SYSTEM SET listen_addresses = '*';",
+	};
+
+	char		command[MAXPGPATH * 2];
+
+	/* Start the standby server */
+	snprintf(command, sizeof(command), "\"%s\" -D \"%s\" -l \"%s/standby.log\" -o \"-p %s\" start",
+			 pg_ctl_path, standby_dir, standby_dir, opt->sub_port);
+	pg_log_info("starting the standby server: %s", command);
+
+	if (!dry_run && system(command) != 0)
+		pg_fatal("failed to start standby server");
+
+	for (int i = 0; i < lengthof(alter_cmds); i++)
+	{
+		snprintf(command, sizeof(command),
+				 "\"%s\" -d postgres -p %s -c \"%s\"", psql_path,
+				 opt->sub_port, alter_cmds[i]);
+		pg_log_info("configuring the standby server: %s", command);
+
+		if (!dry_run && system(command) != 0)
+			pg_fatal("failed to run: %s", alter_cmds[i]);
+	}
+
+	/* Stop the standby server */
+	snprintf(command, sizeof(command), "\"%s\" -D \"%s\" -l \"%s/standby.log\" -o \"-p %s\" stop",
+			 pg_ctl_path, standby_dir, standby_dir, opt->sub_port);
+	pg_log_info("stopping the standby server: %s", command);
+
+	if (!dry_run && system(command) != 0)
+		pg_fatal("failed to stop standby server after ALTER SYSTEM");
+}
+
 static void
 drop_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo,
 					  const char *slot_name)
@@ -2021,6 +2120,7 @@ main(int argc, char **argv)
 	static struct option long_options[] =
 	{
 		{"all", no_argument, NULL, 'a'},
+		{"create-standby", no_argument, NULL, 'c'},
 		{"database", required_argument, NULL, 'd'},
 		{"pgdata", required_argument, NULL, 'D'},
 		{"dry-run", no_argument, NULL, 'n'},
@@ -2092,6 +2192,7 @@ main(int argc, char **argv)
 	};
 	opt.recovery_timeout = 0;
 	opt.all_dbs = false;
+	opt.create_standby = false;
 
 	/*
 	 * Don't allow it to be run as root. It uses pg_ctl which does not allow
@@ -2109,7 +2210,7 @@ main(int argc, char **argv)
 
 	get_restricted_token();
 
-	while ((c = getopt_long(argc, argv, "ad:D:np:P:R:s:t:TU:v",
+	while ((c = getopt_long(argc, argv, "acd:D:np:P:R:s:t:TU:v",
 							long_options, &option_index)) != -1)
 	{
 		switch (c)
@@ -2117,6 +2218,9 @@ main(int argc, char **argv)
 			case 'a':
 				opt.all_dbs = true;
 				break;
+			case 'c':
+				opt.create_standby = true;
+				break;
 			case 'd':
 				if (!simple_string_list_member(&opt.database_names, optarg))
 				{
@@ -2264,6 +2368,22 @@ main(int argc, char **argv)
 		pg_log_error_hint("Try \"%s --help\" for more information.", progname);
 		exit(1);
 	}
+
+	/*
+	 * Get the absolute path of pg_ctl, pg_resetwal, pg_basebackup and psql on
+	 * the subscriber
+	 */
+	pg_ctl_path = get_exec_path(argv[0], "pg_ctl");
+	pg_resetwal_path = get_exec_path(argv[0], "pg_resetwal");
+	pg_basebackup_path = get_exec_path(argv[0], "pg_basebackup");
+	psql_path = get_exec_path(argv[0], "psql");
+
+	if (opt.create_standby)
+	{
+		create_standby(&opt, subscriber_dir);
+		configure_standby(&opt, subscriber_dir);
+	}
+
 	pg_log_info("validating publisher connection string");
 	pub_base_conninfo = get_base_conninfo(opt.pub_conninfo_str,
 										  &dbname_conninfo);
@@ -2346,10 +2466,6 @@ main(int argc, char **argv)
 		}
 	}
 
-	/* Get the absolute path of pg_ctl and pg_resetwal on the subscriber */
-	pg_ctl_path = get_exec_path(argv[0], "pg_ctl");
-	pg_resetwal_path = get_exec_path(argv[0], "pg_resetwal");
-
 	/* Rudimentary check for a data directory */
 	check_data_directory(subscriber_dir);
 
diff --git a/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl b/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl
index 2d532fee567..eb924d4d705 100644
--- a/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl
+++ b/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl
@@ -8,6 +8,8 @@ use warnings FATAL => 'all';
 use PostgreSQL::Test::Cluster;
 use PostgreSQL::Test::Utils;
 use Test::More;
+use File::Path qw(rmtree);
+use File::Copy;
 
 program_help_ok('pg_createsubscriber');
 program_version_ok('pg_createsubscriber');
@@ -537,6 +539,58 @@ my $sysid_s = $node_s->safe_psql('postgres',
 	'SELECT system_identifier FROM pg_control_system()');
 ok($sysid_p != $sysid_s, 'system identifier was changed');
 
+$node_s->stop;
+
+my $tempdir = PostgreSQL::Test::Utils::tempdir;
+my $conf_file = $node_s->data_dir . '/' . 'postgresql.conf';
+
+copy("$conf_file", "$tempdir/postgresql.conf") or die "Copy failed: $!";
+
+# Remove an old directory.
+rmtree($node_s->data_dir);
+
+# Run pg_createsubscriber on node S with '--create-standby' option
+command_ok(
+	[
+		'pg_createsubscriber',
+		'--verbose', '--verbose',
+		'--recovery-timeout' => $PostgreSQL::Test::Utils::timeout_default,
+		'--pgdata' => $node_s->data_dir,
+		'--publisher-server' => $node_p->connstr($db1),
+		'--publication' => 'pub3',
+		'--publication' => 'pub4',
+		'--database' => $db1,
+		'--database' => $db2,
+		'--create-standby',
+	],
+	'run pg_createsubscriber on node S with --create-standby');
+
+# Insert a row on P
+$node_p->safe_psql($db1, "INSERT INTO tbl1 VALUES('fourth row')");
+
+my $port = $node_s->port;
+
+$node_s->append_conf(
+	'postgresql.conf', qq[
+hot_standby_feedback = on
+]);
+$node_s->append_conf(
+	'postgresql.auto.conf', qq[
+port = $port
+]);
+
+copy("$tempdir/postgresql.conf", "$conf_file") or die "Copy failed: $!";
+
+$node_s->start;
+
+# Check result in database $db1
+$result = $node_s->safe_psql($db1, 'SELECT * FROM tbl1');
+is( $result, qq(first row
+second row
+third row
+fourth row),
+	"logical replication works in database $db1");
+
 # clean up
 $node_p->teardown_node;
 $node_s->teardown_node;
-- 
2.41.0.windows.3

