From fc3b0148848b1a6891b079fb449b3fb9223f5a07 Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Date: Fri, 21 Jun 2024 10:55:46 +0000
Subject: [PATCH v4 2/2] pg_createsubscriber: Drop pre-existing subscriptions
 from the converted instance

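Previously, pg_createsubscriber did nothing about subscriptions that already
existed on the streaming replication cluster. Because the standby is a
physical copy of the primary, it carries the same subscriptions; after the
conversion, the new subscriber would try to connect to the publisher nodes
specified by those pre-existing subscriptions, which will cause an ERROR. To
avoid such failures, drop the pre-existing subscriptions from the converted
node. While doing so, the subscriber is started with
max_logical_replication_workers=0 so that logical replication does not start
during setup.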
---
 src/bin/pg_basebackup/pg_createsubscriber.c   | 111 +++++++++++++++++-
 .../t/040_pg_createsubscriber.pl              |  14 +++
 2 files changed, 120 insertions(+), 5 deletions(-)

diff --git a/src/bin/pg_basebackup/pg_createsubscriber.c b/src/bin/pg_basebackup/pg_createsubscriber.c
index fb57737f7c..21dd50f808 100644
--- a/src/bin/pg_basebackup/pg_createsubscriber.c
+++ b/src/bin/pg_basebackup/pg_createsubscriber.c
@@ -92,7 +92,8 @@ static void drop_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo,
 								  const char *slot_name);
 static void pg_ctl_status(const char *pg_ctl_cmd, int rc);
 static void start_standby_server(const struct CreateSubscriberOptions *opt,
-								 bool restricted_access);
+								 bool restricted_access,
+								 bool restrict_logical_worker);
 static void stop_standby_server(const char *datadir);
 static void wait_for_end_recovery(const char *conninfo,
 								  const struct CreateSubscriberOptions *opt);
@@ -102,6 +103,10 @@ static void create_subscription(PGconn *conn, const struct LogicalRepInfo *dbinf
 static void set_replication_progress(PGconn *conn, const struct LogicalRepInfo *dbinfo,
 									 const char *lsn);
 static void enable_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo);
+static void check_and_drop_existing_subscriptions(PGconn *conn,
+												  const struct LogicalRepInfo *dbinfo);
+static void drop_existing_subscriptions(PGconn *conn, const char *subname,
+										const char *dbname);
 
 #define	USEC_PER_SEC	1000000
 #define	WAIT_INTERVAL	1		/* 1 second */
@@ -1025,6 +1030,87 @@ check_subscriber(const struct LogicalRepInfo *dbinfo)
 		exit(1);
 }
 
+/*
+ * Drop pre-existing subscription "subname" of database "dbname". It is first
+ * detached from its slot (slot_name = NONE), because that slot would still be
+ * used by the primary (publisher node), which has the same subscription.
+ */
+static void
+drop_existing_subscriptions(PGconn *conn, const char *subname, const char *dbname)
+{
+	PQExpBuffer query = createPQExpBuffer();
+	char	   *subname_esc;
+	PGresult   *res;
+
+	Assert(conn != NULL);
+
+	/* Quote the name: it may contain upper-case or special characters */
+	subname_esc = PQescapeIdentifier(conn, subname, strlen(subname));
+
+	/* These commands are allowed to be executed within a transaction */
+	appendPQExpBuffer(query, "ALTER SUBSCRIPTION %s DISABLE;", subname_esc);
+	appendPQExpBuffer(query, " ALTER SUBSCRIPTION %s SET (slot_name = NONE);",
+					  subname_esc);
+	appendPQExpBuffer(query, " DROP SUBSCRIPTION %s;", subname_esc);
+
+	pg_log_info("dropping subscription \"%s\" on database \"%s\"",
+				subname, dbname);
+
+	if (!dry_run)
+	{
+		res = PQexec(conn, query->data);
+
+		if (PQresultStatus(res) != PGRES_COMMAND_OK)
+		{
+			pg_log_error("could not drop subscription \"%s\": %s",
+						 subname, PQresultErrorMessage(res));
+			disconnect_database(conn, true);
+		}
+
+		PQclear(res);
+	}
+
+	PQfreemem(subname_esc);
+	destroyPQExpBuffer(query);
+}
+
+/*
+ * Retrieve the pre-existing subscriptions that belong to the database
+ * described by "dbinfo" and drop each of them.
+ */
+static void
+check_and_drop_existing_subscriptions(PGconn *conn,
+									  const struct LogicalRepInfo *dbinfo)
+{
+	PQExpBuffer query = createPQExpBuffer();
+	char	   *dbname;
+	PGresult   *res;
+
+	Assert(conn != NULL);
+
+	dbname = PQescapeLiteral(conn, dbinfo->dbname, strlen(dbinfo->dbname));
+	appendPQExpBuffer(query,
+					  "SELECT s.subname FROM pg_catalog.pg_subscription s "
+					  "INNER JOIN pg_catalog.pg_database d ON (s.subdbid = d.oid) "
+					  "WHERE d.datname = %s", dbname);
+	PQfreemem(dbname);			/* the literal is now copied into the query */
+
+	res = PQexec(conn, query->data);
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+	{
+		pg_log_error("could not obtain pre-existing subscriptions: %s",
+					 PQresultErrorMessage(res));
+		disconnect_database(conn, true);
+	}
+
+	for (int i = 0; i < PQntuples(res); i++)
+		drop_existing_subscriptions(conn, PQgetvalue(res, i, 0),
+									dbinfo->dbname);
+
+	PQclear(res);
+	destroyPQExpBuffer(query);
+}
+
 /*
  * Create the subscriptions, adjust the initial location for logical
  * replication and enable the subscriptions. That's the last step for logical
@@ -1040,6 +1126,14 @@ setup_subscriber(struct LogicalRepInfo *dbinfo, const char *consistent_lsn)
 		/* Connect to subscriber. */
 		conn = connect_database(dbinfo[i].subconninfo, true);
 
+		/*
+		 * We don't need the pre-existing subscriptions on the newly formed
+		 * subscriber. They could connect to other publisher nodes and either
+		 * fetch unwanted data or raise ERRORs when connecting to such
+		 * nodes.
+		 */
+		check_and_drop_existing_subscriptions(conn, &dbinfo[i]);
+
 		/*
 		 * Since the publication was created before the consistent LSN, it is
 		 * available on the subscriber when the physical replica is promoted.
@@ -1314,7 +1408,8 @@ pg_ctl_status(const char *pg_ctl_cmd, int rc)
 }
 
 static void
-start_standby_server(const struct CreateSubscriberOptions *opt, bool restricted_access)
+start_standby_server(const struct CreateSubscriberOptions *opt, bool restricted_access,
+					 bool restrict_logical_worker)
 {
 	PQExpBuffer pg_ctl_cmd = createPQExpBuffer();
 	int			rc;
@@ -1343,6 +1438,11 @@ start_standby_server(const struct CreateSubscriberOptions *opt, bool restricted_
 	if (opt->config_file != NULL)
 		appendPQExpBuffer(pg_ctl_cmd, " -o \"-c config_file=%s\"",
 						  opt->config_file);
+
+	/* Prevent logical replication workers from starting, if requested */
+	if (restrict_logical_worker)
+		appendPQExpBuffer(pg_ctl_cmd, " -o \"-c max_logical_replication_workers=0\"");
+
 	pg_log_debug("pg_ctl command is: %s", pg_ctl_cmd->data);
 	rc = system(pg_ctl_cmd->data);
 	pg_ctl_status(pg_ctl_cmd->data, rc);
@@ -2067,7 +2167,7 @@ main(int argc, char **argv)
 	 * transformation steps.
 	 */
 	pg_log_info("starting the standby with command-line options");
-	start_standby_server(&opt, true);
+	start_standby_server(&opt, true, false);
 
 	/* Check if the standby server is ready for logical replication */
 	check_subscriber(dbinfo);
@@ -2098,10 +2198,11 @@ main(int argc, char **argv)
 
 	/*
 	 * Start subscriber so the recovery parameters will take effect. Wait
-	 * until accepting connections.
+	 * until accepting connections. We don't want to start logical replication
+	 * during setup.
 	 */
 	pg_log_info("starting the subscriber");
-	start_standby_server(&opt, true);
+	start_standby_server(&opt, true, true);
 
 	/* Waiting the subscriber to be promoted */
 	wait_for_end_recovery(dbinfo[0].subconninfo, &opt);
diff --git a/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl b/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl
index 1241bf6c6a..80002c5a17 100644
--- a/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl
+++ b/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl
@@ -298,6 +298,13 @@ my $result = $node_s->safe_psql('postgres',
 	"SELECT slot_name FROM pg_replication_slots WHERE slot_name = '$fslotname' AND synced AND NOT temporary"
 );
 is($result, 'failover_slot', 'failover slot is synced');
+
+# Create subscription to test its removal
+my $dummy_sub = 'regress_sub_dummy';
+$node_p->safe_psql($db1,
+	"CREATE SUBSCRIPTION $dummy_sub CONNECTION 'dbname=dummy' PUBLICATION pub_dummy WITH (connect=false)"
+);
+$node_p->wait_for_replay_catchup($node_s);
 $node_s->stop;
 
 # dry run mode on node S
@@ -372,6 +379,13 @@ $node_p->safe_psql($db2, "INSERT INTO tbl2 VALUES('row 1')");
 # Start subscriber
 $node_s->start;
 
+# Confirm the pre-existing subscription has been removed
+$result = $node_s->safe_psql(
+	'postgres', qq(
+	SELECT count(*) FROM pg_subscription WHERE subname = '$dummy_sub'
+));
+is($result, qq(0), 'pre-existing subscription was dropped');
+
 # Get subscription names
 $result = $node_s->safe_psql(
 	'postgres', qq(
-- 
2.43.0

