From 7378b26f9ee6e5eebf2756b174f101059d2c670e Mon Sep 17 00:00:00 2001
From: Khanna <Shubham.Khanna@fujitsu.com>
Date: Thu, 23 Jan 2025 12:39:03 +0530
Subject: [PATCH v1] Support for dropping all publications in
 'pg_createsubscriber'

This patch introduces a new '--clean-publisher-objects' option in the
'pg_createsubscriber utility'.
This feature ensures a clean and streamlined setup of logical replication by
removing stale or unnecessary publications from the subscriber node.
These publications, replicated during streaming replication, become redundant
after converting to logical replication and serve no further purpose.
By ensuring that outdated publications are removed, it helps avoid potential
conflicts and simplifies replication management.

A new 'drop_all_publications()' function is added to fetch and drop all
publications on the subscriber node within a single transaction.
Since this cleanup is not required when upgrading streaming replication
clusters,this feature is supported only when the '--clean-publisher-objects'
option is specified, allowing users to choose accordingly.
---
 doc/src/sgml/ref/pg_createsubscriber.sgml     | 12 +++
 src/bin/pg_basebackup/pg_createsubscriber.c   | 73 ++++++++++++++++++-
 .../t/040_pg_createsubscriber.pl              | 65 +++++++++++++++++
 3 files changed, 149 insertions(+), 1 deletion(-)

diff --git a/doc/src/sgml/ref/pg_createsubscriber.sgml b/doc/src/sgml/ref/pg_createsubscriber.sgml
index 26b8e64a4e..c922cc1ed0 100644
--- a/doc/src/sgml/ref/pg_createsubscriber.sgml
+++ b/doc/src/sgml/ref/pg_createsubscriber.sgml
@@ -87,6 +87,18 @@ PostgreSQL documentation
    command-line arguments:
 
    <variablelist>
+    <varlistentry>
+     <term><option>-C</option></term>
+     <term><option>--clean-publisher-objects</option></term>
+     <listitem>
+      <para>
+       The <application>pg_createsubscriber</application> now supports the
+       <option>--clean-publisher-objects</option> to remove all publications on
+       the subscriber node before creating a new subscription.
+      </para>
+     </listitem>
+    </varlistentry>
+
     <varlistentry>
      <term><option>-d <replaceable class="parameter">dbname</replaceable></option></term>
      <term><option>--database=<replaceable class="parameter">dbname</replaceable></option></term>
diff --git a/src/bin/pg_basebackup/pg_createsubscriber.c b/src/bin/pg_basebackup/pg_createsubscriber.c
index faf18ccf13..900045c00a 100644
--- a/src/bin/pg_basebackup/pg_createsubscriber.c
+++ b/src/bin/pg_basebackup/pg_createsubscriber.c
@@ -43,6 +43,7 @@ struct CreateSubscriberOptions
 	SimpleStringList sub_names; /* list of subscription names */
 	SimpleStringList replslot_names;	/* list of replication slot names */
 	int			recovery_timeout;	/* stop recovery after this time */
+	bool		clean_publisher_objects;	/* Drop all publications */
 };
 
 struct LogicalRepInfo
@@ -98,6 +99,7 @@ static void wait_for_end_recovery(const char *conninfo,
 								  const struct CreateSubscriberOptions *opt);
 static void create_publication(PGconn *conn, struct LogicalRepInfo *dbinfo);
 static void drop_publication(PGconn *conn, struct LogicalRepInfo *dbinfo);
+static void drop_all_publications(const struct LogicalRepInfo *dbinfo);
 static void create_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo);
 static void set_replication_progress(PGconn *conn, const struct LogicalRepInfo *dbinfo,
 									 const char *lsn);
@@ -220,6 +222,7 @@ usage(void)
 	printf(_("Usage:\n"));
 	printf(_("  %s [OPTION]...\n"), progname);
 	printf(_("\nOptions:\n"));
+	printf(_("  -C  --clean-publisher-objects   drop all publications on the logical replica\n"));
 	printf(_("  -d, --database=DBNAME           database in which to create a subscription\n"));
 	printf(_("  -D, --pgdata=DATADIR            location for the subscriber data directory\n"));
 	printf(_("  -n, --dry-run                   dry run, just show what would be done\n"));
@@ -1860,11 +1863,72 @@ enable_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo)
 	destroyPQExpBuffer(str);
 }
 
+static void
+drop_all_publications(const struct LogicalRepInfo *dbinfo)
+{
+	char	   *search_query = "SELECT pubname FROM pg_catalog.pg_publication;";
+
+	for (int i = 0; i < num_dbs; i++)
+	{
+		PGconn	   *conn;
+		PGresult   *res;
+		int			num_rows;
+		PQExpBuffer query = createPQExpBuffer();
+
+		/* Connect to the subscriber */
+		conn = connect_database(dbinfo[i].subconninfo, true);
+
+		/* Fetch all publications */
+		res = PQexec(conn, search_query);
+
+		if (PQresultStatus(res) != PGRES_TUPLES_OK)
+		{
+			pg_log_warning("could not obtain publication information: %s",
+						   PQresultErrorMessage(res));
+
+			PQclear(res);
+			disconnect_database(conn, false);
+			continue;
+		}
+
+		num_rows = PQntuples(res);
+
+		for (int j = 0; j < num_rows; j++)
+		{
+			char	   *pubname = PQgetvalue(res, j, 0);
+			PGresult   *res_for_drop;
+
+			pg_log_debug("dropping publication \"%s\"", pubname);
+
+			appendPQExpBuffer(query, "DROP PUBLICATION %s;", pubname);
+
+			if (!dry_run)
+			{
+				res_for_drop = PQexec(conn, query->data);
+
+				if (PQresultStatus(res_for_drop) != PGRES_COMMAND_OK)
+				{
+					pg_log_warning("could not drop publication \"%s\": %s",
+								   pubname, PQresultErrorMessage(res));
+				}
+
+				PQclear(res_for_drop);
+			}
+
+			resetPQExpBuffer(query);
+		}
+
+		disconnect_database(conn, false);
+		destroyPQExpBuffer(query);
+	}
+}
+
 int
 main(int argc, char **argv)
 {
 	static struct option long_options[] =
 	{
+		{"clean-publisher-objects", no_argument, NULL, 'C'},
 		{"database", required_argument, NULL, 'd'},
 		{"pgdata", required_argument, NULL, 'D'},
 		{"dry-run", no_argument, NULL, 'n'},
@@ -1927,6 +1991,7 @@ main(int argc, char **argv)
 	opt.socket_dir = NULL;
 	opt.sub_port = DEFAULT_SUB_PORT;
 	opt.sub_username = NULL;
+	opt.clean_publisher_objects = false;
 	opt.database_names = (SimpleStringList)
 	{
 		0
@@ -1949,11 +2014,14 @@ main(int argc, char **argv)
 
 	get_restricted_token();
 
-	while ((c = getopt_long(argc, argv, "d:D:np:P:s:t:U:v",
+	while ((c = getopt_long(argc, argv, "Cd:D:np:P:s:t:U:v",
 							long_options, &option_index)) != -1)
 	{
 		switch (c)
 		{
+			case 'C':
+				opt.clean_publisher_objects = true;
+				break;
 			case 'd':
 				if (!simple_string_list_member(&opt.database_names, optarg))
 				{
@@ -2237,6 +2305,9 @@ main(int argc, char **argv)
 	/* Remove failover replication slots if they exist on subscriber */
 	drop_failover_replication_slots(dbinfo);
 
+	/* Drop publications from the subscriber if requested */
+	drop_all_publications(dbinfo);
+
 	/* Stop the subscriber */
 	pg_log_info("stopping the subscriber");
 	stop_standby_server(subscriber_dir);
diff --git a/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl b/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl
index c8dbdb7e9b..2a95962cbd 100644
--- a/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl
+++ b/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl
@@ -448,10 +448,75 @@ my $sysid_s = $node_s->safe_psql('postgres',
 	'SELECT system_identifier FROM pg_control_system()');
 ok($sysid_p != $sysid_s, 'system identifier was changed');
 
+# Set up node A as primary
+my $node_a = PostgreSQL::Test::Cluster->new('node_a');
+my $aconnstr = $node_a->connstr;
+$node_a->init(allows_streaming => 'logical');
+$node_a->append_conf('postgresql.conf', 'autovacuum = off');
+$node_a->start;
+
+# Set up node B as standby linking to node A
+$node_a->backup('backup_3');
+my $node_b = PostgreSQL::Test::Cluster->new('node_b');
+$node_b->init_from_backup($node_a, 'backup_3', has_streaming => 1);
+$node_b->append_conf(
+	'postgresql.conf', qq[
+       primary_conninfo = '$aconnstr'
+       hot_standby_feedback = on
+       max_logical_replication_workers = 5
+       ]);
+$node_b->set_standby_mode();
+$node_b->start;
+
+# Ensure there are some user databases on the publisher
+my $db3 = generate_db($node_a, 'regression', 91, 127, '');
+
+# Create publications to test it's removal
+$node_a->safe_psql($db3, "CREATE PUBLICATION test_pub FOR ALL TABLES;");
+$node_a->safe_psql($db3, "CREATE PUBLICATION test_pub2 FOR ALL TABLES;");
+
+# Verify the existing publications
+my $pub_count_before =
+  $node_a->safe_psql($db3, "SELECT COUNT(*) FROM pg_publication;");
+is($pub_count_before, '2',
+	'two publications created before --clean-publisher-objects is run');
+
+$node_b->stop;
+
+# Run pg_createsubscriber on node A using --clean-publisher-objects.
+# --verbose is used twice to show more information.
+command_ok(
+	[
+		'pg_createsubscriber',
+		'--verbose', '--verbose',
+		'--recovery-timeout' => $PostgreSQL::Test::Utils::timeout_default,
+		'--pgdata' => $node_b->data_dir,
+		'--publisher-server' => $node_a->connstr($db3),
+		'--socketdir' => $node_b->host,
+		'--subscriber-port' => $node_b->port,
+		'--database' => $db3,
+		'--clean-publisher-objects',
+	],
+	'run pg_createsubscriber with --clean-publisher-objects on node A');
+
+$node_b->start;
+
+# Confirm publications are removed
+my $pub_count_after =
+  $node_b->safe_psql($db3, "SELECT COUNT(*) FROM pg_publication;");
+is($pub_count_after, '0',
+	'all publications dropped after --clean-publisher-objects is run');
+
+# Drop the newly created publications
+$node_a->safe_psql($db3, "DROP PUBLICATION IF EXISTS test_pub;");
+$node_a->safe_psql($db3, "DROP PUBLICATION IF EXISTS test_pub2;");
+
 # clean up
 $node_p->teardown_node;
 $node_s->teardown_node;
 $node_t->teardown_node;
 $node_f->teardown_node;
+$node_a->teardown_node;
+$node_b->teardown_node;
 
 done_testing();
-- 
2.41.0.windows.3

