From 8a1e7c8a8eff38068eeda86ce4ccce9f5e209ba8 Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Date: Fri, 21 Jun 2024 10:55:46 +0000
Subject: [PATCH v2 2/2] pg_createsubscriber: Drop pre-existing subscriptions
 from the converted instance

Previously, we did nothing for pre-existing subscriptions on the streaming
replication cluster. However, after the conversion, the downstream node will try
to connect to another publisher node specified by the pre-existing subscriptions,
which will cause an ERROR. To avoid failure, drop such subscriptions from the
converted node.
---
 src/bin/pg_basebackup/pg_createsubscriber.c   |  96 +++++++++++++-
 .../t/041_pg_createsubscriber_added.pl        | 118 ++++++++++++++++++
 2 files changed, 210 insertions(+), 4 deletions(-)
 create mode 100644 src/bin/pg_basebackup/t/041_pg_createsubscriber_added.pl

diff --git a/src/bin/pg_basebackup/pg_createsubscriber.c b/src/bin/pg_basebackup/pg_createsubscriber.c
index 63317791f8..5f34f2d501 100644
--- a/src/bin/pg_basebackup/pg_createsubscriber.c
+++ b/src/bin/pg_basebackup/pg_createsubscriber.c
@@ -91,7 +91,8 @@ static void drop_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo,
 								  const char *slot_name);
 static void pg_ctl_status(const char *pg_ctl_cmd, int rc);
 static void start_standby_server(const struct CreateSubscriberOptions *opt,
-								 bool restricted_access);
+								 bool restricted_access,
+								 bool restricted_worker);
 static void stop_standby_server(const char *datadir);
 static void wait_for_end_recovery(const char *conninfo,
 								  const struct CreateSubscriberOptions *opt);
@@ -101,6 +102,9 @@ static void create_subscription(PGconn *conn, const struct LogicalRepInfo *dbinf
 static void set_replication_progress(PGconn *conn, const struct LogicalRepInfo *dbinfo,
 									 const char *lsn);
 static void enable_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo);
+static void check_and_drop_existing_subscriptions(PGconn *conn,
+												  const struct LogicalRepInfo *dbinfo);
+static void drop_existing_subscriptions(PGconn *conn, const char *subname);
 
 #define	USEC_PER_SEC	1000000
 #define	WAIT_INTERVAL	1		/* 1 second */
@@ -1017,6 +1021,75 @@ check_subscriber(const struct LogicalRepInfo *dbinfo)
 		exit(1);
 }
 
+/*
+ * Drop a specified subscription. This is used for resolving the duplication of
+ * subscriptions on the primary and standby, so no need tp drop the replication
+ * slot. It will be used by the publisher node.
+ */
+static void
+drop_existing_subscriptions(PGconn *conn, const char *subname)
+{
+	PQExpBuffer query = createPQExpBuffer();
+	PGresult   *res;
+
+	Assert(conn != NULL);
+
+	/*
+	 * Construct a query string. These commands are allowed to be executed
+	 * within a transaction.
+	 */
+	appendPQExpBuffer(query, "ALTER SUBSCRIPTION %s DISABLE;",
+					  subname);
+	appendPQExpBuffer(query, " ALTER SUBSCRIPTION %s SET (slot_name = NONE);",
+					  subname);
+	appendPQExpBuffer(query, " DROP SUBSCRIPTION %s;", subname);
+
+	res = PQexec(conn, query->data);
+
+	if (PQresultStatus(res) != PGRES_COMMAND_OK)
+	{
+		pg_log_error("could not drop a subscription \"%s\" settings: %s",
+					 subname, PQresultErrorMessage(res));
+		disconnect_database(conn, true);
+	}
+
+	PQclear(res);
+	destroyPQExpBuffer(query);
+}
+
+/*
+ * Find out subscriptions not created by pg_createsubscriber, and drop them
+ */
+static void
+check_and_drop_existing_subscriptions(PGconn *conn,
+									  const struct LogicalRepInfo *dbinfo)
+{
+	PQExpBuffer query = createPQExpBuffer();
+	PGresult   *res;
+
+	Assert(conn != NULL);
+
+	appendPQExpBuffer(query,
+					  "SELECT s.subname FROM pg_catalog.pg_subscription s "
+					  "INNER JOIN pg_catalog.pg_database d ON (s.subdbid = d.oid) "
+					  "WHERE d.datname = '%s'",
+					  dbinfo->dbname);
+	res = PQexec(conn, query->data);
+
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+	{
+		pg_log_error("could not obtain pre-existing subscriptions: %s",
+					 PQresultErrorMessage(res));
+		disconnect_database(conn, true);
+	}
+
+	for (int i = 0; i < PQntuples(res); i++)
+		drop_existing_subscriptions(conn, PQgetvalue(res, i, 0));
+
+	PQclear(res);
+	destroyPQExpBuffer(query);
+}
+
 /*
  * Create the subscriptions, adjust the initial location for logical
  * replication and enable the subscriptions. That's the last step for logical
@@ -1032,6 +1105,15 @@ setup_subscriber(struct LogicalRepInfo *dbinfo, const char *consistent_lsn)
 		/* Connect to subscriber. */
 		conn = connect_database(dbinfo[i].subconninfo, true);
 
+		/*
+		 * If the streaming replication cluster has subscriptions, a converted
+		 * node may connect to another publisher node. This could cause an
+		 * ERROR due to a slot acquirement miss or data inconsistency because
+		 * only the converted node receives changes. To avoid it, drop
+		 * pre-existing subscriptions from the converted node.
+		 */
+		check_and_drop_existing_subscriptions(conn, &dbinfo[i]);
+
 		/*
 		 * Since the publication was created before the consistent LSN, it is
 		 * available on the subscriber when the physical replica is promoted.
@@ -1306,7 +1388,8 @@ pg_ctl_status(const char *pg_ctl_cmd, int rc)
 }
 
 static void
-start_standby_server(const struct CreateSubscriberOptions *opt, bool restricted_access)
+start_standby_server(const struct CreateSubscriberOptions *opt, bool restricted_access,
+					 bool restricted_worker)
 {
 	PQExpBuffer pg_ctl_cmd = createPQExpBuffer();
 	int			rc;
@@ -1334,6 +1417,11 @@ start_standby_server(const struct CreateSubscriberOptions *opt, bool restricted_
 	if (opt->config_file != NULL)
 		appendPQExpBuffer(pg_ctl_cmd, " -o \"-c config_file=%s\"",
 						  opt->config_file);
+
+	/* Suppress to start logical replication if requested */
+	if (restricted_worker)
+		appendPQExpBuffer(pg_ctl_cmd, " -o \"-c max_logical_replication_workers=0\"");
+
 	pg_log_debug("pg_ctl command is: %s", pg_ctl_cmd->data);
 	rc = system(pg_ctl_cmd->data);
 	pg_ctl_status(pg_ctl_cmd->data, rc);
@@ -2058,7 +2146,7 @@ main(int argc, char **argv)
 	 * transformation steps.
 	 */
 	pg_log_info("starting the standby with command-line options");
-	start_standby_server(&opt, true);
+	start_standby_server(&opt, true, false);
 
 	/* Check if the standby server is ready for logical replication */
 	check_subscriber(dbinfo);
@@ -2092,7 +2180,7 @@ main(int argc, char **argv)
 	 * until accepting connections.
 	 */
 	pg_log_info("starting the subscriber");
-	start_standby_server(&opt, true);
+	start_standby_server(&opt, true, true);
 
 	/* Waiting the subscriber to be promoted */
 	wait_for_end_recovery(dbinfo[0].subconninfo, &opt);
diff --git a/src/bin/pg_basebackup/t/041_pg_createsubscriber_added.pl b/src/bin/pg_basebackup/t/041_pg_createsubscriber_added.pl
new file mode 100644
index 0000000000..59cddb2fc1
--- /dev/null
+++ b/src/bin/pg_basebackup/t/041_pg_createsubscriber_added.pl
@@ -0,0 +1,118 @@
+# Copyright (c) 2024, PostgreSQL Global Development Group
+
+use strict;
+use warnings FATAL => 'all';
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+# Construct a cascading replication system like:
+#
+# node_a --(logical replication)--> node_b --(streaming replication)--> node_c
+#
+
+# Set up node A as publisher
+my $node_a = PostgreSQL::Test::Cluster->new('node_a');
+my $aconnstr = $node_a->connstr;
+$node_a->init(allows_streaming => 'logical');
+$node_a->start;
+
+# On node A
+# - create databases
+# - create test tables
+# - insert a row
+# - create publications
+$node_a->safe_psql(
+	'postgres', q(
+	CREATE DATABASE pg1;
+	CREATE DATABASE pg2;
+));
+$node_a->safe_psql('pg1', 'CREATE TABLE tbl1 (a text)');
+$node_a->safe_psql('pg1', "INSERT INTO tbl1 VALUES('first row')");
+$node_a->safe_psql('pg2', 'CREATE TABLE tbl2 (a text)');
+$node_a->safe_psql('pg1', 'CREATE PUBLICATION pub1_pg1 FOR ALL TABLES');
+$node_a->safe_psql('pg1', 'CREATE PUBLICATION pub2_pg1');
+$node_a->safe_psql('pg2', 'CREATE PUBLICATION pub1_pg2 FOR ALL TABLES');
+$node_a->safe_psql('pg2', 'CREATE PUBLICATION pub2_pg2');
+
+# Set up node B as subscriber/primary
+my $node_b = PostgreSQL::Test::Cluster->new('node_b');
+my $bconnstr = $node_b->connstr;
+$node_b->init(allows_streaming => 'logical');
+$node_b->start;
+
+# On node B
+# - create databases
+# - create subscriptions
+$node_b->safe_psql(
+	'postgres', q(
+	CREATE DATABASE pg1;
+	CREATE DATABASE pg2;
+));
+$node_b->safe_psql('pg1', 'CREATE TABLE tbl1 (a text)');
+$node_b->safe_psql('pg2', 'CREATE TABLE tbl2 (a text)');
+$node_b->safe_psql('pg1',
+    "CREATE SUBSCRIPTION sub1_pg1 CONNECTION '$aconnstr dbname=pg1' PUBLICATION pub1_pg1");
+$node_b->safe_psql('pg1',
+    "CREATE SUBSCRIPTION sub2_pg1 CONNECTION '$aconnstr dbname=pg1' PUBLICATION pub2_pg1");
+$node_b->safe_psql('pg2',
+    "CREATE SUBSCRIPTION sub1_pg2 CONNECTION '$aconnstr dbname=pg2' PUBLICATION pub1_pg2");
+$node_b->safe_psql('pg2',
+    "CREATE SUBSCRIPTION sub2_pg2 CONNECTION '$aconnstr dbname=pg2' PUBLICATION pub2_pg2");
+
+$node_b->wait_for_subscription_sync($node_a, 'sub1_pg1');
+$node_b->wait_for_subscription_sync($node_a, 'sub1_pg2');
+
+# Confirms logical replication works well
+my $result = $node_b->safe_psql('pg1', 'SELECT * FROM tbl1;');
+is($result, 'first row', 'check logical replication works well');
+
+# Set up node C as standby
+$node_b->backup('backup_1');
+my $node_c = PostgreSQL::Test::Cluster->new('node_c');
+$node_c->init_from_backup($node_b, 'backup_1', has_streaming => 1);
+$node_c->append_conf(
+	'postgresql.conf', qq[
+primary_conninfo = '$bconnstr'
+]);
+$node_c->set_standby_mode();
+$node_c->start;
+
+$node_b->wait_for_replay_catchup($node_c);
+
+# Confirms streaming replication works well
+$result = $node_c->safe_psql('pg1', 'SELECT * FROM tbl1;');
+is($result, 'first row', 'check streaming replication works well');
+
+$node_c->stop;
+
+# Run pg_createsubscriber
+command_ok(
+	[
+		'pg_createsubscriber', '--verbose',
+		'--recovery-timeout', "$PostgreSQL::Test::Utils::timeout_default",
+		'--verbose',
+        '--pgdata', $node_c->data_dir,
+        '--publisher-server', $bconnstr,
+        '--socket-directory', $node_c->host,
+        '--subscriber-port', $node_c->port,
+        '--database', 'pg1',
+        '--database', 'pg2'
+	],
+	'run pg_createsubscriber on node S');
+
+# Confirms pre-existing subscriptions are removed from the converted node
+$node_c->start;
+$result = $node_c->safe_psql('pg1',
+    "SELECT subname FROM pg_subscription WHERE subname NOT LIKE 'pg_createsubscriber%';");
+is($result, '', 'check subscriptions are removed');
+
+# Confirms pre-existing subscriptions still exist on the primary node
+$result = $node_b->safe_psql('pg1',
+    "SELECT subname, subenabled, subslotname FROM pg_subscription WHERE subname NOT LIKE 'pg_createsubscriber%' ORDER BY subname;");
+is($result, 'sub1_pg1|t|sub1_pg1
+sub1_pg2|t|sub1_pg2
+sub2_pg1|t|sub2_pg1
+sub2_pg2|t|sub2_pg2', 'check subscriptions still exist');
+
+done_testing();
-- 
2.43.0

