From 48e8be4a1f2c9fd1c103f37246eb771bb794354a Mon Sep 17 00:00:00 2001
From: Khanna <Shubham.Khanna@fujitsu.com>
Date: Fri, 28 Feb 2025 14:44:23 +0530
Subject: [PATCH v14] Support for dropping all publications in
 'pg_createsubscriber'

This patch introduces a new '--drop-all-publications' option in the
'pg_createsubscriber' utility.

This feature ensures a clean and streamlined setup of logical replication by
removing publications on the subscriber that were originally replicated from
the primary server during streaming replication.
These publications become redundant once the setup transitions to logical
replication and serve no further purpose.

This cleanup process removes all publications from the subscriber, regardless
of their origin. Users should back up any manually created publications before
running this command.

The '--drop-all-publications' option allows users to remove replicated
publications from the subscriber when setting up logical replication.
This is particularly useful when publications created in a previous setup are
no longer needed. By default, publications are preserved to avoid unintended
data loss.
---
 doc/src/sgml/ref/pg_createsubscriber.sgml     |  11 +
 src/bin/pg_basebackup/pg_createsubscriber.c   |  92 ++++++--
 .../t/040_pg_createsubscriber.pl              | 201 ++++++++++++------
 3 files changed, 218 insertions(+), 86 deletions(-)

diff --git a/doc/src/sgml/ref/pg_createsubscriber.sgml b/doc/src/sgml/ref/pg_createsubscriber.sgml
index b4b996236e4..80eb0e79629 100644
--- a/doc/src/sgml/ref/pg_createsubscriber.sgml
+++ b/doc/src/sgml/ref/pg_createsubscriber.sgml
@@ -87,6 +87,17 @@ PostgreSQL documentation
    command-line arguments:
 
    <variablelist>
+    <varlistentry>
+     <term><option>-c</option></term>
+     <term><option>--drop-all-publications</option></term>
+     <listitem>
+      <para>
+       Remove all existing publications from specified databases on the target
+       server.
+      </para>
+     </listitem>
+    </varlistentry>
+
     <varlistentry>
      <term><option>-d <replaceable class="parameter">dbname</replaceable></option></term>
      <term><option>--database=<replaceable class="parameter">dbname</replaceable></option></term>
diff --git a/src/bin/pg_basebackup/pg_createsubscriber.c b/src/bin/pg_basebackup/pg_createsubscriber.c
index a5a2d61165d..6098e70c8db 100644
--- a/src/bin/pg_basebackup/pg_createsubscriber.c
+++ b/src/bin/pg_basebackup/pg_createsubscriber.c
@@ -44,6 +44,7 @@ struct CreateSubscriberOptions
 	SimpleStringList sub_names; /* list of subscription names */
 	SimpleStringList replslot_names;	/* list of replication slot names */
 	int			recovery_timeout;	/* stop recovery after this time */
+	bool		drop_publications;	/* drop all publications */
 };
 
 /* per-database publication/subscription info */
@@ -91,7 +92,8 @@ static void check_publisher(const struct LogicalRepInfo *dbinfo);
 static char *setup_publisher(struct LogicalRepInfo *dbinfo);
 static void check_subscriber(const struct LogicalRepInfo *dbinfo);
 static void setup_subscriber(struct LogicalRepInfo *dbinfo,
-							 const char *consistent_lsn);
+							 const char *consistent_lsn,
+							 bool drop_all_publications);
 static void setup_recovery(const struct LogicalRepInfo *dbinfo, const char *datadir,
 						   const char *lsn);
 static void drop_primary_replication_slot(struct LogicalRepInfo *dbinfo,
@@ -109,7 +111,10 @@ static void stop_standby_server(const char *datadir);
 static void wait_for_end_recovery(const char *conninfo,
 								  const struct CreateSubscriberOptions *opt);
 static void create_publication(PGconn *conn, struct LogicalRepInfo *dbinfo);
-static void drop_publication(PGconn *conn, struct LogicalRepInfo *dbinfo);
+static void drop_publication_by_name(PGconn *conn, const char *dbname,
+									 const char *pubname);
+static void check_and_drop_existing_publications(PGconn *conn,
+												 const char *dbname);
 static void create_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo);
 static void set_replication_progress(PGconn *conn, const struct LogicalRepInfo *dbinfo,
 									 const char *lsn);
@@ -192,7 +197,8 @@ cleanup_objects_atexit(void)
 			if (conn != NULL)
 			{
 				if (dbinfos.dbinfo[i].made_publication)
-					drop_publication(conn, &dbinfos.dbinfo[i]);
+					drop_publication_by_name(conn, dbinfos.dbinfo[i].dbname,
+											 dbinfos.dbinfo[i].pubname);
 				if (dbinfos.dbinfo[i].made_replslot)
 					drop_replication_slot(conn, &dbinfos.dbinfo[i], dbinfos.dbinfo[i].replslotname);
 				disconnect_database(conn, false);
@@ -234,6 +240,9 @@ usage(void)
 	printf(_("Usage:\n"));
 	printf(_("  %s [OPTION]...\n"), progname);
 	printf(_("\nOptions:\n"));
+	printf(_("  -c  --drop-all-publications\n"
+			 "                                  drop all publications from specified databases\n"
+			 "                                  on the subscriber\n"));
 	printf(_("  -d, --database=DBNAME           database in which to create a subscription\n"));
 	printf(_("  -D, --pgdata=DATADIR            location for the subscriber data directory\n"));
 	printf(_("  -n, --dry-run                   dry run, just show what would be done\n"));
@@ -1171,10 +1180,13 @@ check_and_drop_existing_subscriptions(PGconn *conn,
 /*
  * Create the subscriptions, adjust the initial location for logical
  * replication and enable the subscriptions. That's the last step for logical
- * replication setup.
+ * replication setup. If 'drop_publications' parameter is true, existing
+ * publications on the subscriber will be dropped before creating new
+ * subscriptions.
  */
 static void
-setup_subscriber(struct LogicalRepInfo *dbinfo, const char *consistent_lsn)
+setup_subscriber(struct LogicalRepInfo *dbinfo, const char *consistent_lsn,
+				 bool drop_all_publications)
 {
 	for (int i = 0; i < num_dbs; i++)
 	{
@@ -1192,11 +1204,16 @@ setup_subscriber(struct LogicalRepInfo *dbinfo, const char *consistent_lsn)
 		check_and_drop_existing_subscriptions(conn, &dbinfo[i]);
 
 		/*
-		 * Since the publication was created before the consistent LSN, it is
-		 * available on the subscriber when the physical replica is promoted.
-		 * Remove publications from the subscriber because it has no use.
+		 * Since the publication was created before the consistent LSN, it
+		 * remains on the subscriber even after the physical replica is
+		 * promoted. Remove this publication from the subscriber because it
+		 * has no use. Additionally, if requested, drop all pre-existing
+		 * publications.
 		 */
-		drop_publication(conn, &dbinfo[i]);
+		if (drop_all_publications)
+			check_and_drop_existing_publications(conn, dbinfo[i].dbname);
+		else
+			drop_publication_by_name(conn, dbinfo[i].dbname, dbinfo[i].pubname);
 
 		create_subscription(conn, &dbinfo[i]);
 
@@ -1661,26 +1678,20 @@ create_publication(PGconn *conn, struct LogicalRepInfo *dbinfo)
 }
 
 /*
- * Remove publication if it couldn't finish all steps.
+ * Drop the specified publication of the given database.
  */
 static void
-drop_publication(PGconn *conn, struct LogicalRepInfo *dbinfo)
+drop_publication_by_name(PGconn *conn, const char *dbname, const char *pubname)
 {
 	PQExpBuffer str = createPQExpBuffer();
 	PGresult   *res;
-	char	   *pubname_esc;
-
-	Assert(conn != NULL);
 
-	pubname_esc = PQescapeIdentifier(conn, dbinfo->pubname, strlen(dbinfo->pubname));
+	char	   *pubname_esc = PQescapeIdentifier(conn, pubname, strlen(pubname));
 
-	pg_log_info("dropping publication \"%s\" in database \"%s\"",
-				dbinfo->pubname, dbinfo->dbname);
+	pg_log_info("dropping publication \"%s\" in database \"%s\"", pubname, dbname);
 
 	appendPQExpBuffer(str, "DROP PUBLICATION %s", pubname_esc);
 
-	PQfreemem(pubname_esc);
-
 	pg_log_debug("command is: %s", str->data);
 
 	if (!dry_run)
@@ -1689,8 +1700,8 @@ drop_publication(PGconn *conn, struct LogicalRepInfo *dbinfo)
 		if (PQresultStatus(res) != PGRES_COMMAND_OK)
 		{
 			pg_log_error("could not drop publication \"%s\" in database \"%s\": %s",
-						 dbinfo->pubname, dbinfo->dbname, PQresultErrorMessage(res));
-			dbinfo->made_publication = false;	/* don't try again. */
+						 pubname, dbname, PQresultErrorMessage(res));
+			dbinfos.dbinfo->made_publication = false;	/* don't try again. */
 
 			/*
 			 * Don't disconnect and exit here. This routine is used by primary
@@ -1703,9 +1714,39 @@ drop_publication(PGconn *conn, struct LogicalRepInfo *dbinfo)
 		PQclear(res);
 	}
 
+	PQfreemem(pubname_esc);
 	destroyPQExpBuffer(str);
 }
 
+/*
+ * Check and drop existing publications on the subscriber.
+ */
+static void
+check_and_drop_existing_publications(PGconn *conn, const char *dbname)
+{
+	PGresult   *res;
+
+	pg_log_info("dropping all existing publications in database \"%s\"",
+				dbname);
+
+	/* Fetch all publication names */
+	res = PQexec(conn, "SELECT pubname FROM pg_catalog.pg_publication;");
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+	{
+		pg_log_error("could not obtain publication information: %s",
+					 PQresultErrorMessage(res));
+		PQclear(res);
+		return;
+	}
+
+	/* Drop each publication */
+	for (int i = 0; i < PQntuples(res); i++)
+		drop_publication_by_name(conn, dbname, PQgetvalue(res, i, 0));
+
+	PQclear(res);
+	pg_log_info("dropped all publications in database \"%s\"", dbname);
+}
+
 /*
  * Create a subscription with some predefined options.
  *
@@ -1907,6 +1948,7 @@ main(int argc, char **argv)
 {
 	static struct option long_options[] =
 	{
+		{"drop-all-publications", no_argument, NULL, 'c'},
 		{"database", required_argument, NULL, 'd'},
 		{"pgdata", required_argument, NULL, 'D'},
 		{"dry-run", no_argument, NULL, 'n'},
@@ -1976,6 +2018,7 @@ main(int argc, char **argv)
 		0
 	};
 	opt.recovery_timeout = 0;
+	opt.drop_publications = false;
 
 	/*
 	 * Don't allow it to be run as root. It uses pg_ctl which does not allow
@@ -1993,11 +2036,14 @@ main(int argc, char **argv)
 
 	get_restricted_token();
 
-	while ((c = getopt_long(argc, argv, "d:D:np:P:s:t:TU:v",
+	while ((c = getopt_long(argc, argv, "cd:D:np:P:s:t:TU:v",
 							long_options, &option_index)) != -1)
 	{
 		switch (c)
 		{
+			case 'c':
+				opt.drop_publications = true;
+				break;
 			case 'd':
 				if (!simple_string_list_member(&opt.database_names, optarg))
 				{
@@ -2278,7 +2324,7 @@ main(int argc, char **argv)
 	 * point to the LSN reported by setup_publisher().  It also cleans up
 	 * publications created by this tool and replication to the standby.
 	 */
-	setup_subscriber(dbinfos.dbinfo, consistent_lsn);
+	setup_subscriber(dbinfos.dbinfo, consistent_lsn, opt.drop_publications);
 
 	/* Remove primary_slot_name if it exists on primary */
 	drop_primary_replication_slot(dbinfos.dbinfo, primary_slot_name);
diff --git a/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl b/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl
index c35fa108ce3..22db6a44971 100644
--- a/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl
+++ b/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl
@@ -150,18 +150,18 @@ my $slotname = 'physical_slot';
 $node_p->safe_psql($db2,
 	"SELECT pg_create_physical_replication_slot('$slotname')");
 
-# Set up node S as standby linking to node P
+# Set up node S1 as standby linking to node P
 $node_p->backup('backup_1');
-my $node_s = PostgreSQL::Test::Cluster->new('node_s');
-$node_s->init_from_backup($node_p, 'backup_1', has_streaming => 1);
-$node_s->append_conf(
+my $node_s1 = PostgreSQL::Test::Cluster->new('node_s1');
+$node_s1->init_from_backup($node_p, 'backup_1', has_streaming => 1);
+$node_s1->append_conf(
 	'postgresql.conf', qq[
 primary_slot_name = '$slotname'
 primary_conninfo = '$pconnstr dbname=postgres'
 hot_standby_feedback = on
 ]);
-$node_s->set_standby_mode();
-$node_s->start;
+$node_s1->set_standby_mode();
+$node_s1->start;
 
 # Set up node T as standby linking to node P then promote it
 my $node_t = PostgreSQL::Test::Cluster->new('node_t');
@@ -192,10 +192,10 @@ command_fails(
 		'pg_createsubscriber',
 		'--verbose',
 		'--dry-run',
-		'--pgdata' => $node_s->data_dir,
+		'--pgdata' => $node_s1->data_dir,
 		'--publisher-server' => $node_p->connstr($db1),
-		'--socketdir' => $node_s->host,
-		'--subscriber-port' => $node_s->port,
+		'--socketdir' => $node_s1->host,
+		'--subscriber-port' => $node_s1->port,
 		'--database' => $db1,
 		'--database' => $db2,
 	],
@@ -215,10 +215,10 @@ command_fails(
 	],
 	'subscriber data directory is not a copy of the source database cluster');
 
-# Set up node C as standby linking to node S
-$node_s->backup('backup_2');
+# Set up node C as standby linking to node S1
+$node_s1->backup('backup_2');
 my $node_c = PostgreSQL::Test::Cluster->new('node_c');
-$node_c->init_from_backup($node_s, 'backup_2', has_streaming => 1);
+$node_c->init_from_backup($node_s1, 'backup_2', has_streaming => 1);
 $node_c->adjust_conf('postgresql.conf', 'primary_slot_name', undef);
 $node_c->set_standby_mode();
 
@@ -229,7 +229,7 @@ command_fails(
 		'--verbose',
 		'--dry-run',
 		'--pgdata' => $node_c->data_dir,
-		'--publisher-server' => $node_s->connstr($db1),
+		'--publisher-server' => $node_s1->connstr($db1),
 		'--socketdir' => $node_c->host,
 		'--subscriber-port' => $node_c->port,
 		'--database' => $db1,
@@ -246,16 +246,16 @@ max_wal_senders = 1
 max_worker_processes = 2
 });
 $node_p->restart;
-$node_s->stop;
+$node_s1->stop;
 command_fails(
 	[
 		'pg_createsubscriber',
 		'--verbose',
 		'--dry-run',
-		'--pgdata' => $node_s->data_dir,
+		'--pgdata' => $node_s1->data_dir,
 		'--publisher-server' => $node_p->connstr($db1),
-		'--socketdir' => $node_s->host,
-		'--subscriber-port' => $node_s->port,
+		'--socketdir' => $node_s1->host,
+		'--subscriber-port' => $node_s1->port,
 		'--database' => $db1,
 		'--database' => $db2,
 
@@ -271,8 +271,8 @@ max_wal_senders = 10
 max_worker_processes = 8
 });
 
-# Check some unmet conditions on node S
-$node_s->append_conf(
+# Check some unmet conditions on node S1
+$node_s1->append_conf(
 	'postgresql.conf', q{
 max_replication_slots = 1
 max_logical_replication_workers = 1
@@ -283,15 +283,15 @@ command_fails(
 		'pg_createsubscriber',
 		'--verbose',
 		'--dry-run',
-		'--pgdata' => $node_s->data_dir,
+		'--pgdata' => $node_s1->data_dir,
 		'--publisher-server' => $node_p->connstr($db1),
-		'--socketdir' => $node_s->host,
-		'--subscriber-port' => $node_s->port,
+		'--socketdir' => $node_s1->host,
+		'--subscriber-port' => $node_s1->port,
 		'--database' => $db1,
 		'--database' => $db2,
 	],
-	'standby contains unmet conditions on node S');
-$node_s->append_conf(
+	'standby contains unmet conditions on node S1');
+$node_s1->append_conf(
 	'postgresql.conf', q{
 max_replication_slots = 10
 max_logical_replication_workers = 4
@@ -305,43 +305,58 @@ my $fslotname = 'failover_slot';
 $node_p->safe_psql($db1,
 	"SELECT pg_create_logical_replication_slot('$fslotname', 'pgoutput', false, false, true)"
 );
-$node_s->start;
+$node_s1->start;
 # Wait for the standby to catch up so that the standby is not lagging behind
 # the failover slot.
-$node_p->wait_for_replay_catchup($node_s);
-$node_s->safe_psql('postgres', "SELECT pg_sync_replication_slots()");
-my $result = $node_s->safe_psql('postgres',
+$node_p->wait_for_replay_catchup($node_s1);
+$node_s1->safe_psql('postgres', "SELECT pg_sync_replication_slots()");
+my $result = $node_s1->safe_psql('postgres',
 	"SELECT slot_name FROM pg_replication_slots WHERE slot_name = '$fslotname' AND synced AND NOT temporary"
 );
 is($result, 'failover_slot', 'failover slot is synced');
 
-# Insert another row on node P and wait node S to catch up. We
+# Insert another row on node P and wait node S1 to catch up. We
 # intentionally performed this insert after syncing logical slot
 # as otherwise the local slot's (created during synchronization of
 # slot) xmin on standby could be ahead of the remote slot leading
 # to failure in synchronization.
 $node_p->safe_psql($db1, "INSERT INTO tbl1 VALUES('second row')");
-$node_p->wait_for_replay_catchup($node_s);
+$node_p->wait_for_replay_catchup($node_s1);
 
 # Create subscription to test its removal
 my $dummy_sub = 'regress_sub_dummy';
 $node_p->safe_psql($db1,
 	"CREATE SUBSCRIPTION $dummy_sub CONNECTION 'dbname=dummy' PUBLICATION pub_dummy WITH (connect=false)"
 );
-$node_p->wait_for_replay_catchup($node_s);
-$node_s->stop;
+$node_p->wait_for_replay_catchup($node_s1);
+
+# Create user-defined publications, wait for streaming replication to sync them
+# to the standby, then verify that '--drop-all-publications'
+# removes them.
+$node_p->safe_psql(
+	$db1, qq(
+	CREATE PUBLICATION test_pub1 FOR ALL TABLES;
+	CREATE PUBLICATION test_pub2 FOR ALL TABLES;
+));
+
+$node_p->wait_for_replay_catchup($node_s1);
+
+ok( $node_s1->safe_psql($db1, "SELECT COUNT(*) = 2 FROM pg_publication"),
+	'two publications created before --drop-all-publications is run');
 
-# dry run mode on node S
+$node_s1->stop;
+
+# dry run mode on node S1
 command_ok(
 	[
 		'pg_createsubscriber',
 		'--verbose',
 		'--dry-run',
 		'--recovery-timeout' => $PostgreSQL::Test::Utils::timeout_default,
-		'--pgdata' => $node_s->data_dir,
+		'--pgdata' => $node_s1->data_dir,
 		'--publisher-server' => $node_p->connstr($db1),
-		'--socketdir' => $node_s->host,
-		'--subscriber-port' => $node_s->port,
+		'--socketdir' => $node_s1->host,
+		'--subscriber-port' => $node_s1->port,
 		'--publication' => 'pub1',
 		'--publication' => 'pub2',
 		'--subscription' => 'sub1',
@@ -349,13 +364,14 @@ command_ok(
 		'--database' => $db1,
 		'--database' => $db2,
 	],
-	'run pg_createsubscriber --dry-run on node S');
+	'run pg_createsubscriber --dry-run on node S1');
 
-# Check if node S is still a standby
-$node_s->start;
-is($node_s->safe_psql('postgres', 'SELECT pg_catalog.pg_is_in_recovery()'),
-	't', 'standby is in recovery');
-$node_s->stop;
+# Check if node S1 is still a standby
+$node_s1->start;
+is( $node_s1->safe_psql('postgres', 'SELECT pg_catalog.pg_is_in_recovery()'),
+	't',
+	'standby is in recovery');
+$node_s1->stop;
 
 # pg_createsubscriber can run without --databases option
 command_ok(
@@ -363,35 +379,37 @@ command_ok(
 		'pg_createsubscriber',
 		'--verbose',
 		'--dry-run',
-		'--pgdata' => $node_s->data_dir,
+		'--pgdata' => $node_s1->data_dir,
 		'--publisher-server' => $node_p->connstr($db1),
-		'--socketdir' => $node_s->host,
-		'--subscriber-port' => $node_s->port,
+		'--socketdir' => $node_s1->host,
+		'--subscriber-port' => $node_s1->port,
 		'--replication-slot' => 'replslot1',
 	],
 	'run pg_createsubscriber without --databases');
 
-# Run pg_createsubscriber on node S.  --verbose is used twice
+# Run pg_createsubscriber on node S1.  --verbose is used twice
 # to show more information.
-# In passing, also test the --enable-two-phase option
+# In passing, also test the --enable-two-phase and
+# --drop-all-publications option
 command_ok(
 	[
 		'pg_createsubscriber',
 		'--verbose', '--verbose',
 		'--recovery-timeout' => $PostgreSQL::Test::Utils::timeout_default,
-		'--pgdata' => $node_s->data_dir,
+		'--pgdata' => $node_s1->data_dir,
 		'--publisher-server' => $node_p->connstr($db1),
-		'--socketdir' => $node_s->host,
-		'--subscriber-port' => $node_s->port,
+		'--socketdir' => $node_s1->host,
+		'--subscriber-port' => $node_s1->port,
 		'--publication' => 'pub1',
 		'--publication' => 'pub2',
 		'--replication-slot' => 'replslot1',
 		'--replication-slot' => 'replslot2',
 		'--database' => $db1,
 		'--database' => $db2,
-		'--enable-two-phase'
+		'--enable-two-phase',
+		'--drop-all-publications',
 	],
-	'run pg_createsubscriber on node S');
+	'run pg_createsubscriber on node S1');
 
 # Confirm the physical replication slot has been removed
 $result = $node_p->safe_psql($db1,
@@ -406,11 +424,15 @@ $node_p->safe_psql($db1, "INSERT INTO tbl1 VALUES('third row')");
 $node_p->safe_psql($db2, "INSERT INTO tbl2 VALUES('row 1')");
 
 # Start subscriber
-$node_s->start;
+$node_s1->start;
+
+# Confirm publications are removed from the subscriber node
+is($node_s1->safe_psql($db1, "SELECT COUNT(*) FROM pg_publication;"),
+	'0', 'all publications dropped after --drop-all-publications is run');
 
 # Verify that all subtwophase states are pending or enabled,
 # e.g. there are no subscriptions where subtwophase is disabled ('d')
-is( $node_s->safe_psql(
+is( $node_s1->safe_psql(
 		'postgres',
 		"SELECT count(1) = 0 FROM pg_subscription WHERE subtwophasestate = 'd'"
 	),
@@ -418,50 +440,103 @@ is( $node_s->safe_psql(
 	'subscriptions are created with the two-phase option enabled');
 
 # Confirm the pre-existing subscription has been removed
-$result = $node_s->safe_psql(
+$result = $node_s1->safe_psql(
 	'postgres', qq(
 	SELECT count(*) FROM pg_subscription WHERE subname = '$dummy_sub'
 ));
 is($result, qq(0), 'pre-existing subscription was dropped');
 
 # Get subscription names
-$result = $node_s->safe_psql(
+$result = $node_s1->safe_psql(
 	'postgres', qq(
 	SELECT subname FROM pg_subscription WHERE subname ~ '^pg_createsubscriber_'
 ));
 my @subnames = split("\n", $result);
 
 # Wait subscriber to catch up
-$node_s->wait_for_subscription_sync($node_p, $subnames[0]);
-$node_s->wait_for_subscription_sync($node_p, $subnames[1]);
+$node_s1->wait_for_subscription_sync($node_p, $subnames[0]);
+$node_s1->wait_for_subscription_sync($node_p, $subnames[1]);
 
 # Confirm the failover slot has been removed
-$result = $node_s->safe_psql($db1,
+$result = $node_s1->safe_psql($db1,
 	"SELECT count(*) FROM pg_replication_slots WHERE slot_name = '$fslotname'"
 );
 is($result, qq(0), 'failover slot was removed');
 
 # Check result in database $db1
-$result = $node_s->safe_psql($db1, 'SELECT * FROM tbl1');
+$result = $node_s1->safe_psql($db1, 'SELECT * FROM tbl1');
 is( $result, qq(first row
 second row
 third row),
 	"logical replication works in database $db1");
 
 # Check result in database $db2
-$result = $node_s->safe_psql($db2, 'SELECT * FROM tbl2');
+$result = $node_s1->safe_psql($db2, 'SELECT * FROM tbl2');
 is($result, qq(row 1), "logical replication works in database $db2");
 
 # Different system identifier?
 my $sysid_p = $node_p->safe_psql('postgres',
 	'SELECT system_identifier FROM pg_control_system()');
-my $sysid_s = $node_s->safe_psql('postgres',
+my $sysid_s = $node_s1->safe_psql('postgres',
 	'SELECT system_identifier FROM pg_control_system()');
 ok($sysid_p != $sysid_s, 'system identifier was changed');
 
+# Reuse node P as primary
+# Set up node S2 as standby linking to node P
+$node_p->backup('backup_3');
+my $node_s2 = PostgreSQL::Test::Cluster->new('node_s2');
+$node_s2->init_from_backup($node_p, 'backup_3', has_streaming => 1);
+$node_s2->append_conf(
+	'postgresql.conf', qq[
+       primary_conninfo = '$pconnstr'
+       max_logical_replication_workers = 5
+       ]);
+$node_s2->set_standby_mode();
+$node_s2->start;
+
+# Ensure there is a user database on the publisher
+my $db3 = generate_db($node_p, 'regression', 91, 127, '');
+
+# Create user-defined publications
+$node_p->safe_psql(
+	$db3, qq(
+	CREATE PUBLICATION test_pub3 FOR ALL TABLES;
+	CREATE PUBLICATION test_pub4 FOR ALL TABLES;
+));
+
+$node_p->wait_for_replay_catchup($node_s2);
+
+# Verify the existing publications
+ok( $node_s2->safe_psql($db3, "SELECT COUNT(*) = 2 FROM pg_publication"),
+	'two publications are created before running pg_createsubscriber');
+
+$node_s2->stop;
+
+# Run pg_createsubscriber on node S2 without '--drop-all-publications'.
+# --verbose is used twice to show more information.
+command_ok(
+	[
+		'pg_createsubscriber',
+		'--verbose', '--verbose',
+		'--recovery-timeout' => $PostgreSQL::Test::Utils::timeout_default,
+		'--pgdata' => $node_s2->data_dir,
+		'--publisher-server' => $node_p->connstr($db3),
+		'--socketdir' => $node_s2->host,
+		'--subscriber-port' => $node_s2->port,
+		'--database' => $db3,
+	],
+	'run pg_createsubscriber without --drop-all-publications on node S2');
+
+$node_s2->start;
+
+# Confirm publications still remain after running 'pg_createsubscriber'
+is($node_s2->safe_psql($db3, "SELECT COUNT(*) FROM pg_publication;"),
+	'2', 'all publications remain after running pg_createsubscriber');
+
 # clean up
 $node_p->teardown_node;
-$node_s->teardown_node;
+$node_s1->teardown_node;
+$node_s2->teardown_node;
 $node_t->teardown_node;
 $node_f->teardown_node;
 
-- 
2.34.1

