From e842cc85f752539e6e238e05f252b7ab00c5abf8 Mon Sep 17 00:00:00 2001
From: Khanna <Shubham.Khanna@fujitsu.com>
Date: Thu, 28 Aug 2025 22:26:04 +0530
Subject: [PATCH v7] Support existing publications in pg_createsubscriber

Allow pg_createsubscriber to reuse existing publications instead of failing
when they already exist on the publisher.

Previously, pg_createsubscriber would fail if any specified publication
already existed. Now, existing publications are reused as-is, and
non-existing publications are created automatically with FOR ALL TABLES.

This change eliminates the need to know in advance which publications exist
on the publisher, making the tool more user-friendly. Users can specify
publication names and the tool will handle both existing and new publications
appropriately.

When publications are reused, they are never dropped during cleanup operations,
ensuring pre-existing publications remain available for other uses. Only
publications created by pg_createsubscriber are cleaned up.
---
 doc/src/sgml/ref/pg_createsubscriber.sgml     | 22 ++++++
 src/bin/pg_basebackup/pg_createsubscriber.c   | 56 ++++++++++++--
 .../t/040_pg_createsubscriber.pl              | 77 +++++++++++++++++++
 3 files changed, 148 insertions(+), 7 deletions(-)

diff --git a/doc/src/sgml/ref/pg_createsubscriber.sgml b/doc/src/sgml/ref/pg_createsubscriber.sgml
index bb9cc72576c..db28956a441 100644
--- a/doc/src/sgml/ref/pg_createsubscriber.sgml
+++ b/doc/src/sgml/ref/pg_createsubscriber.sgml
@@ -285,6 +285,28 @@ PostgreSQL documentation
        a generated name is assigned to the publication name. This option cannot
        be used together with <option>--all</option>.
       </para>
+      <para>
+       If a publication with the specified name already exists on the publisher,
+       it will be reused as-is with its current configuration, including its
+       table list, row filters, column filters, and all other settings.
+       If a publication does not exist, it will be created automatically with
+       <literal>FOR ALL TABLES</literal>.
+      </para>
+      <para>
+       When reusing existing publications, you should understand their current
+       configuration. Existing publications are used exactly as configured,
+       which may replicate different tables than expected.
+       New publications created with <literal>FOR ALL TABLES</literal> will
+       replicate all tables in the database, which may be more than intended.
+      </para>
+      <para>
+       Use <option>--dry-run</option> to see which publications will be reused
+       and which will be created before running the actual command.
+       When publications are reused, they will not be dropped during cleanup
+       operations, ensuring they remain available for other uses.
+       Only publications created by
+       <application>pg_createsubscriber</application> will be cleaned up.
+      </para>
      </listitem>
     </varlistentry>
 
diff --git a/src/bin/pg_basebackup/pg_createsubscriber.c b/src/bin/pg_basebackup/pg_createsubscriber.c
index 3986882f042..afdd7ba258e 100644
--- a/src/bin/pg_basebackup/pg_createsubscriber.c
+++ b/src/bin/pg_basebackup/pg_createsubscriber.c
@@ -114,6 +114,7 @@ static void stop_standby_server(const char *datadir);
 static void wait_for_end_recovery(const char *conninfo,
 								  const struct CreateSubscriberOptions *opt);
 static void create_publication(PGconn *conn, struct LogicalRepInfo *dbinfo);
+static bool check_publication_exists(PGconn *conn, const char *pubname, const char *dbname);
 static void drop_publication(PGconn *conn, const char *pubname,
 							 const char *dbname, bool *made_publication);
 static void check_and_drop_publications(PGconn *conn, struct LogicalRepInfo *dbinfo);
@@ -753,6 +754,32 @@ generate_object_name(PGconn *conn)
 	return objname;
 }
 
+/*
+ * Check if a publication with the given name exists in the specified database.
+ * Returns true if it exists, false otherwise.
+ */
+static bool
+check_publication_exists(PGconn *conn, const char *pubname, const char *dbname)
+{
+	PGresult   *res;
+	bool		exists;
+	char	   *query;
+
+	query = psprintf("SELECT 1 FROM pg_publication WHERE pubname = %s",
+					 PQescapeLiteral(conn, pubname, strlen(pubname)));
+	res = PQexec(conn, query);
+
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+		pg_fatal("could not check for publication \"%s\" in database \"%s\": %s",
+				 pubname, dbname, PQerrorMessage(conn));
+
+	exists = (PQntuples(res) == 1);
+
+	PQclear(res);
+	pg_free(query);
+	return exists;
+}
+
 /*
  * Create the publications and replication slots in preparation for logical
  * replication. Returns the LSN from latest replication slot. It will be the
@@ -780,13 +807,13 @@ setup_publisher(struct LogicalRepInfo *dbinfo)
 		 * no replication slot is specified. It follows the same rule as
 		 * CREATE SUBSCRIPTION.
 		 */
-		if (num_pubs == 0 || num_subs == 0 || num_replslots == 0)
+		if (dbinfo[i].pubname == NULL || dbinfo[i].subname == NULL || dbinfo[i].replslotname == NULL)
 			genname = generate_object_name(conn);
-		if (num_pubs == 0)
+		if (dbinfo[i].pubname == NULL)
 			dbinfo[i].pubname = pg_strdup(genname);
-		if (num_subs == 0)
+		if (dbinfo[i].subname == NULL)
 			dbinfo[i].subname = pg_strdup(genname);
-		if (num_replslots == 0)
+		if (dbinfo[i].replslotname == NULL)
 			dbinfo[i].replslotname = pg_strdup(dbinfo[i].subname);
 
 		/*
@@ -795,7 +822,19 @@ setup_publisher(struct LogicalRepInfo *dbinfo)
 		 * consistent LSN and the new publication rows (such transactions
 		 * wouldn't see the new publication rows resulting in an error).
 		 */
-		create_publication(conn, &dbinfo[i]);
+		if (check_publication_exists(conn, dbinfo[i].pubname, dbinfo[i].dbname))
+		{
+			pg_log_info("using existing publication \"%s\" in database \"%s\"",
+						dbinfo[i].pubname, dbinfo[i].dbname);
+			dbinfo[i].made_publication = false;
+		}
+		else
+		{
+			create_publication(conn, &dbinfo[i]);
+			pg_log_info("created publication \"%s\" in database \"%s\"",
+						dbinfo[i].pubname, dbinfo[i].dbname);
+			dbinfo[i].made_publication = true;
+		}
 
 		/* Create replication slot on publisher */
 		if (lsn)
@@ -1771,8 +1810,11 @@ check_and_drop_publications(PGconn *conn, struct LogicalRepInfo *dbinfo)
 	 * those to provide necessary information to the user.
 	 */
 	if (!drop_all_pubs || dry_run)
-		drop_publication(conn, dbinfo->pubname, dbinfo->dbname,
-						 &dbinfo->made_publication);
+	{
+		if (dbinfo->made_publication)
+			drop_publication(conn, dbinfo->pubname, dbinfo->dbname,
+							 &dbinfo->made_publication);
+	}
 }
 
 /*
diff --git a/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl b/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl
index 229fef5b3b5..1fb46042090 100644
--- a/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl
+++ b/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl
@@ -537,9 +537,86 @@ my $sysid_s = $node_s->safe_psql('postgres',
 	'SELECT system_identifier FROM pg_control_system()');
 ok($sysid_p != $sysid_s, 'system identifier was changed');
 
+# Create user-defined publications.
+$node_p->safe_psql($db1,
+	"CREATE PUBLICATION test_pub_existing FOR TABLE tbl1");
+
+# Initialize node_s2 as a fresh standby of node_p for existing/new
+# publication test.
+$node_p->backup('backup_tablepub');
+my $node_s2 = PostgreSQL::Test::Cluster->new('node_s2');
+$node_s2->init_from_backup($node_p, 'backup_tablepub', has_streaming => 1);
+$node_s2->start;
+$node_s2->stop;
+
+# Run pg_createsubscriber on node S2
+command_ok(
+	[
+		'pg_createsubscriber',
+		'--verbose', '--verbose',
+		'--recovery-timeout' => $PostgreSQL::Test::Utils::timeout_default,
+		'--pgdata' => $node_s2->data_dir,
+		'--publisher-server' => $node_p->connstr($db1),
+		'--socketdir' => $node_s2->host,
+		'--subscriber-port' => $node_s2->port,
+		'--database' => $db1,
+		'--database' => $db2,
+		'--publication' => 'test_pub_existing',
+		'--publication' => 'test_pub_new',
+	],
+	'run pg_createsubscriber on node S2');
+
+# Start subscriber
+$node_s2->start;
+
+# Verify that test_pub_new was created in db2
+$result = $node_p->safe_psql($db2,
+	"SELECT COUNT(*) FROM pg_publication WHERE pubname = 'test_pub_new'");
+is($result, '1', 'test_pub_new publication was created in db2');
+
+# Insert rows on P
+$node_p->safe_psql($db1, "INSERT INTO tbl1 VALUES('fourth row')");
+$node_p->safe_psql($db2, "INSERT INTO tbl2 VALUES('row 2')");
+
+# Get subscription names and publications
+$result = $node_s2->safe_psql(
+	'postgres', qq(
+    SELECT subname, subpublications FROM pg_subscription WHERE subname ~ '^pg_createsubscriber_'
+));
+@subnames = split("\n", $result);
+
+# Check result in database $db1
+$result = $node_s2->safe_psql($db1, 'SELECT * FROM tbl1');
+is( $result, qq(first row
+second row
+third row
+fourth row),
+	"logical replication works in database $db1");
+
+# Check result in database $db2
+$result = $node_s2->safe_psql($db2, 'SELECT * FROM tbl2');
+is( $result, qq(row 1
+row 2),
+	"logical replication works in database $db2");
+
+# Verify that the correct publications are being used
+$result = $node_s2->safe_psql(
+	'postgres', qq(
+        SELECT s.subpublications
+        FROM pg_subscription s
+        WHERE s.subname ~ '^pg_createsubscriber_'
+        ORDER BY s.subdbid
+    )
+);
+
+is( $result, qq({test_pub_existing}
+{test_pub_new}),
+	"subscriptions use the correct publications");
+
 # clean up
 $node_p->teardown_node;
 $node_s->teardown_node;
+$node_s2->teardown_node;
 $node_t->teardown_node;
 $node_f->teardown_node;
 
-- 
2.34.1

