From ad7de281d514343df7a04d01314266837d544b33 Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Date: Thu, 9 May 2024 12:53:36 +0000
Subject: [PATCH v2 4/5] Drop replication slots which had been synchronized

After the transformation, the slotsync worker won't be available on the standby.
If synchronized slots are created, they do not consume changes, it may lead
PANIC. To avoid the issue, drop such slots at the end of the command.
---
 src/bin/pg_basebackup/pg_createsubscriber.c | 64 +++++++++++++++++++++
 1 file changed, 64 insertions(+)

diff --git a/src/bin/pg_basebackup/pg_createsubscriber.c b/src/bin/pg_basebackup/pg_createsubscriber.c
index c8849fbc82..ee96a6fd0a 100644
--- a/src/bin/pg_basebackup/pg_createsubscriber.c
+++ b/src/bin/pg_basebackup/pg_createsubscriber.c
@@ -87,6 +87,7 @@ static void setup_recovery(const struct LogicalRepInfo *dbinfo, const char *data
 						   const char *lsn);
 static void drop_primary_replication_slot(struct LogicalRepInfo *dbinfo,
 										  const char *slotname);
+static void drop_replication_slot_on_subscriber(struct LogicalRepInfo *dbinfo);
 static char *create_logical_replication_slot(PGconn *conn,
 											 struct LogicalRepInfo *dbinfo);
 static void drop_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo,
@@ -1195,6 +1196,61 @@ drop_primary_replication_slot(struct LogicalRepInfo *dbinfo, const char *slotnam
 	}
 }
 
+/*
+ * Drop logical replication slot on subscriber if they had been synchronized.
+ * After the transformation, they won't be updated.
+ */
+static void
+drop_replication_slot_on_subscriber(struct LogicalRepInfo *dbinfo)
+{
+	for (int i = 0; i < num_dbs; i++)
+	{
+		PGconn	   *conn;
+		PGresult   *res;
+
+		/* Connect to the subscriber */
+		conn = connect_database(dbinfo[i].subconninfo, false);
+
+		if (conn == NULL)
+		{
+			pg_log_warning("could not connect to the database \"%s\" on standby",
+						   dbinfo[i].dbname);
+			pg_log_warning_hint("You may have to drop replication slots which had been synchronized.");
+			continue;
+		}
+
+		/* Extract slots which had been synchronized. */
+		res = PQexec(conn,
+					 "SELECT slot_name FROM pg_catalog.pg_replication_slots "
+					 "WHERE failover;");
+
+		if (PQresultStatus(res) != PGRES_TUPLES_OK)
+		{
+			pg_log_warning("could not obtain synchronized slots information: %s",
+						   PQresultErrorMessage(res));
+			pg_log_warning_hint("You may have to drop replication slots which had been synchronized.");
+			disconnect_database(conn, false);
+			continue;
+		}
+
+		/* Skip if not found */
+		if (PQntuples(res) == 0)
+		{
+			PQclear(res);
+			disconnect_database(conn, false);
+			continue;
+		}
+
+		/* Drop all replication slots extracted by the query */
+		for (int j = 0; j < PQntuples(res); j++)
+			drop_replication_slot(conn, &dbinfo[i], PQgetvalue(res, j, 0));
+
+		/* Cleanup */
+		PQclear(res);
+		disconnect_database(conn, false);
+	}
+}
+
 /*
  * Create a logical replication slot and returns a LSN.
  *
@@ -2170,6 +2226,14 @@ main(int argc, char **argv)
 	/* Remove primary_slot_name if it exists on primary */
 	drop_primary_replication_slot(dbinfo, primary_slot_name);
 
+	/*
+	 * After the transformation, the slotsync worker won't be available on the
+	 * standby. If synchronized slots are created, they do not consume changes,
+	 * it may lead PANIC. To avoid the issue, drop such slots at the end of the
+	 * command.
+	 */
+	drop_replication_slot_on_subscriber(dbinfo);
+
 	/* Stop the subscriber */
 	pg_log_info("stopping the subscriber");
 	stop_standby_server(subscriber_dir);
-- 
2.34.1

