From c42c1cfcd6773084043a6250ce06a806c0b7b841 Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Date: Tue, 4 Apr 2023 05:49:34 +0000
Subject: [PATCH v21 2/3] pg_upgrade: Allow to replicate logical replication
 slots to new node

This commit allows nodes with logical replication slots to be upgraded.

For pg_upgrade, it query the logical replication slots information from the old
cluter and restores the slots using the pg_create_logical_replication_slots()
statements. Note that we need to separate the timing of restoring replication
slots and other objects. Replication slots, in  particular, should not be
restored before executing the pg_resetwal command because it will remove WALs
that are required by the slots.

The significant advantage of this commit is that it makes it easy to continue
logical replication even after upgrading the publisher node. Previously,
pg_upgrade allowed copying publications to a new node. With this new commit,
adjusting the connection string to the new publisher will cause the apply
worker on the subscriber to connect to the new publisher automatically. This
enables seamless continuation of logical replication, even after an upgrade.

Author: Hayato Kuroda
Co-authored-by: Hou Zhijie
Reviewed-by: Peter Smith, Julien Rouhaud, Vignesh C, Wang Wei
---
 doc/src/sgml/ref/pgupgrade.sgml               |  30 ++++
 src/bin/pg_upgrade/Makefile                   |   3 +
 src/bin/pg_upgrade/check.c                    |  69 ++++++++
 src/bin/pg_upgrade/dump.c                     |   1 +
 src/bin/pg_upgrade/function.c                 |  14 +-
 src/bin/pg_upgrade/info.c                     | 117 +++++++++++++-
 src/bin/pg_upgrade/meson.build                |   1 +
 src/bin/pg_upgrade/pg_upgrade.c               | 104 ++++++++++++
 src/bin/pg_upgrade/pg_upgrade.h               |  21 +++
 .../t/003_logical_replication_slots.pl        | 152 ++++++++++++++++++
 src/tools/pgindent/typedefs.list              |   3 +
 11 files changed, 511 insertions(+), 4 deletions(-)
 create mode 100644 src/bin/pg_upgrade/t/003_logical_replication_slots.pl

diff --git a/doc/src/sgml/ref/pgupgrade.sgml b/doc/src/sgml/ref/pgupgrade.sgml
index 7816b4c685..af776c4ceb 100644
--- a/doc/src/sgml/ref/pgupgrade.sgml
+++ b/doc/src/sgml/ref/pgupgrade.sgml
@@ -402,6 +402,36 @@ NET STOP postgresql-&majorversion;
     </para>
    </step>
 
+   <step>
+    <title>Prepare for publisher upgrades</title>
+
+    <para>
+     <application>pg_upgrade</application> try to dump and restore logical
+     replication slots. This helps avoid the need for manually defining the
+     same replication slot on the new publisher.
+    </para>
+
+    <para>
+     Before you start upgrading the publisher node, ensure that the
+     subscription is temporarily disabled. After the upgrade is complete,
+     execute the
+     <link linkend="sql-altersubscription"><command>ALTER SUBSCRIPTION ... DISABLE</command></link>
+     command to update the connection string, and then re-enable the
+     subscription.
+    </para>
+
+    <para>
+     Upgrading slots has some settings. At first, all the slots must not be in
+     <literal>lost</literal>, and they must have consumed all the WALs on old
+     node. Furthermore, new node must have larger
+     <link linkend="guc-max-replication-slots"><varname>max_replication_slots</varname></link>
+     than existing slots on old node, and
+     <link linkend="guc-wal-level"><varname>wal_level</varname></link> must be
+     <literal>logical</literal>. <application>pg_upgrade</application> will
+     run error if something wrong.
+    </para>
+   </step>
+
    <step>
     <title>Run <application>pg_upgrade</application></title>
 
diff --git a/src/bin/pg_upgrade/Makefile b/src/bin/pg_upgrade/Makefile
index 5834513add..815d1a7ca1 100644
--- a/src/bin/pg_upgrade/Makefile
+++ b/src/bin/pg_upgrade/Makefile
@@ -3,6 +3,9 @@
 PGFILEDESC = "pg_upgrade - an in-place binary upgrade utility"
 PGAPPICON = win32
 
+# required for 003_logical_replication_slots.pl
+EXTRA_INSTALL=contrib/test_decoding
+
 subdir = src/bin/pg_upgrade
 top_builddir = ../../..
 include $(top_builddir)/src/Makefile.global
diff --git a/src/bin/pg_upgrade/check.c b/src/bin/pg_upgrade/check.c
index 64024e3b9e..d5af0dcbc7 100644
--- a/src/bin/pg_upgrade/check.c
+++ b/src/bin/pg_upgrade/check.c
@@ -30,7 +30,9 @@ static void check_for_jsonb_9_4_usage(ClusterInfo *cluster);
 static void check_for_pg_role_prefix(ClusterInfo *cluster);
 static void check_for_new_tablespace_dir(ClusterInfo *new_cluster);
 static void check_for_user_defined_encoding_conversions(ClusterInfo *cluster);
+static void check_for_logical_replication_slots(ClusterInfo *new_cluster);
 
+int	num_slots_on_old_cluster;
 
 /*
  * fix_path_separator
@@ -89,6 +91,9 @@ check_and_dump_old_cluster(bool live_check)
 	/* Extract a list of databases and tables from the old cluster */
 	get_db_and_rel_infos(&old_cluster);
 
+	/* Extract a list of logical replication slots */
+	num_slots_on_old_cluster = get_logical_slot_infos(&old_cluster);
+
 	init_tablespaces();
 
 	get_loadable_libraries();
@@ -189,6 +194,17 @@ check_new_cluster(void)
 {
 	get_db_and_rel_infos(&new_cluster);
 
+	/*
+	 * Checking for logical slots must be done before
+	 * check_new_cluster_is_empty() because the slot_arr attribute of the
+	 * new_cluster will be checked in that function.
+	 */
+	if (num_slots_on_old_cluster)
+	{
+		(void) get_logical_slot_infos(&new_cluster);
+		check_for_logical_replication_slots(&new_cluster);
+	}
+
 	check_new_cluster_is_empty();
 
 	check_loadable_libraries();
@@ -353,6 +369,8 @@ check_new_cluster_is_empty(void)
 	{
 		int			relnum;
 		RelInfoArr *rel_arr = &new_cluster.dbarr.dbs[dbnum].rel_arr;
+		DbInfo     *pDbInfo = &new_cluster.dbarr.dbs[dbnum];
+		LogicalSlotInfoArr *slot_arr = &pDbInfo->slot_arr;
 
 		for (relnum = 0; relnum < rel_arr->nrels;
 			 relnum++)
@@ -364,6 +382,14 @@ check_new_cluster_is_empty(void)
 						 rel_arr->rels[relnum].nspname,
 						 rel_arr->rels[relnum].relname);
 		}
+
+		/*
+		 * Check the existence of logical replication slots.
+		 */
+		if (slot_arr->nslots)
+			pg_fatal("New cluster database \"%s\" is not empty: found logical replication slot \"%s\"",
+					 pDbInfo->db_name,
+					 slot_arr->slots[0].slotname);
 	}
 }
 
@@ -1402,3 +1428,46 @@ check_for_user_defined_encoding_conversions(ClusterInfo *cluster)
 	else
 		check_ok();
 }
+
+/*
+ * Verify the parameter settings necessary for creating logical replication
+ * slots.
+ */
+static void
+check_for_logical_replication_slots(ClusterInfo *new_cluster)
+{
+	PGresult   *res;
+	PGconn	   *conn = connectToServer(new_cluster, "template1");
+	int			max_replication_slots;
+	char	   *wal_level;
+
+	/* logical replication slots can be dumped since PG17. */
+	if (GET_MAJOR_VERSION(new_cluster->major_version) <= 1600)
+		return;
+
+	prep_status("Checking parameter settings for logical replication slots");
+
+	res = executeQueryOrDie(conn, "SHOW max_replication_slots;");
+	max_replication_slots = atoi(PQgetvalue(res, 0, 0));
+
+	if (max_replication_slots == 0)
+		pg_fatal("max_replication_slots must be greater than 0");
+	else if (num_slots_on_old_cluster > max_replication_slots)
+		pg_fatal("max_replication_slots must be greater than existing logical "
+				 "replication slots on old node.");
+
+	PQclear(res);
+
+	res = executeQueryOrDie(conn, "SHOW wal_level;");
+	wal_level = PQgetvalue(res, 0, 0);
+
+	if (strcmp(wal_level, "logical") != 0)
+		pg_fatal("wal_level must be \"logical\", but is set to \"%s\"",
+				 wal_level);
+
+	PQclear(res);
+
+	PQfinish(conn);
+
+	check_ok();
+}
diff --git a/src/bin/pg_upgrade/dump.c b/src/bin/pg_upgrade/dump.c
index 6c8c82dca8..a46562639b 100644
--- a/src/bin/pg_upgrade/dump.c
+++ b/src/bin/pg_upgrade/dump.c
@@ -36,6 +36,7 @@ generate_old_dump(void)
 	{
 		char		sql_file_name[MAXPGPATH],
 					log_file_name[MAXPGPATH];
+
 		DbInfo	   *old_db = &old_cluster.dbarr.dbs[dbnum];
 		PQExpBufferData connstr,
 					escaped_connstr;
diff --git a/src/bin/pg_upgrade/function.c b/src/bin/pg_upgrade/function.c
index dc8800c7cd..c929b92ff6 100644
--- a/src/bin/pg_upgrade/function.c
+++ b/src/bin/pg_upgrade/function.c
@@ -46,7 +46,8 @@ library_name_compare(const void *p1, const void *p2)
 /*
  * get_loadable_libraries()
  *
- *	Fetch the names of all old libraries containing C-language functions.
+ *	Fetch the names of all old libraries containing C-language functions, and
+ *	output plugins used by existing logical replication slots.
  *	We will later check that they all exist in the new installation.
  */
 void
@@ -66,14 +67,21 @@ get_loadable_libraries(void)
 		PGconn	   *conn = connectToServer(&old_cluster, active_db->db_name);
 
 		/*
-		 * Fetch all libraries containing non-built-in C functions in this DB.
+		 * Fetch all libraries containing non-built-in C functions and
+		 * output plugins in this DB.
 		 */
 		ress[dbnum] = executeQueryOrDie(conn,
 										"SELECT DISTINCT probin "
 										"FROM pg_catalog.pg_proc "
 										"WHERE prolang = %u AND "
 										"probin IS NOT NULL AND "
-										"oid >= %u;",
+										"oid >= %u "
+										"UNION "
+										"SELECT DISTINCT plugin "
+										"FROM pg_catalog.pg_replication_slots "
+										"WHERE wal_status <> 'lost' AND "
+										"database = current_database() AND "
+										"temporary IS FALSE;",
 										ClanguageId,
 										FirstNormalObjectId);
 		totaltups += PQntuples(ress[dbnum]);
diff --git a/src/bin/pg_upgrade/info.c b/src/bin/pg_upgrade/info.c
index a9988abfe1..aea3e5fbf3 100644
--- a/src/bin/pg_upgrade/info.c
+++ b/src/bin/pg_upgrade/info.c
@@ -26,6 +26,7 @@ static void get_rel_infos(ClusterInfo *cluster, DbInfo *dbinfo);
 static void free_rel_infos(RelInfoArr *rel_arr);
 static void print_db_infos(DbInfoArr *db_arr);
 static void print_rel_infos(RelInfoArr *rel_arr);
+static void print_slot_infos(LogicalSlotInfoArr *slot_arr);
 
 
 /*
@@ -394,7 +395,7 @@ get_db_infos(ClusterInfo *cluster)
 	i_spclocation = PQfnumber(res, "spclocation");
 
 	ntups = PQntuples(res);
-	dbinfos = (DbInfo *) pg_malloc(sizeof(DbInfo) * ntups);
+	dbinfos = (DbInfo *) pg_malloc0(sizeof(DbInfo) * ntups);
 
 	for (tupnum = 0; tupnum < ntups; tupnum++)
 	{
@@ -600,6 +601,102 @@ get_rel_infos(ClusterInfo *cluster, DbInfo *dbinfo)
 	dbinfo->rel_arr.nrels = num_rels;
 }
 
+/*
+ * get_logical_slot_infos_per_db()
+ *
+ * gets the LogicalSlotInfos for all the logical replication slots of the database
+ * referred to by "dbinfo".
+ */
+static void
+get_logical_slot_infos_per_db(ClusterInfo *cluster, DbInfo *dbinfo)
+{
+	PGconn	   *conn = connectToServer(cluster,
+									   dbinfo->db_name);
+	PGresult   *res;
+	LogicalSlotInfo *slotinfos = NULL;
+
+	int			num_slots;
+
+	char		query[QUERY_ALLOC];
+	const char *std_strings;
+
+	dbinfo->slot_arr.encoding = PQclientEncoding(conn);
+
+	std_strings = PQparameterStatus(conn, "standard_conforming_strings");
+	dbinfo->slot_arr.std_strings = (std_strings && strcmp(std_strings, "on") == 0);
+
+	snprintf(query, sizeof(query),
+			 "SELECT slot_name, plugin, two_phase "
+			 "FROM pg_catalog.pg_replication_slots "
+			 "WHERE database = current_database() AND temporary = false "
+			 "AND wal_status <> 'lost';");
+
+	res = executeQueryOrDie(conn, "%s", query);
+
+	num_slots = PQntuples(res);
+
+	if (num_slots)
+	{
+		int			slotnum;
+		int			i_slotname;
+		int			i_plugin;
+		int			i_twophase;
+
+		slotinfos = (LogicalSlotInfo *) pg_malloc(sizeof(LogicalSlotInfo) * num_slots);
+
+		i_slotname = PQfnumber(res, "slot_name");
+		i_plugin = PQfnumber(res, "plugin");
+		i_twophase = PQfnumber(res, "two_phase");
+
+		for (slotnum = 0; slotnum < num_slots; slotnum++)
+		{
+			LogicalSlotInfo *curr = &slotinfos[slotnum];
+
+			curr->slotname = pg_strdup(PQgetvalue(res, slotnum, i_slotname));
+			curr->plugin = pg_strdup(PQgetvalue(res, slotnum, i_plugin));
+			curr->two_phase = (strcmp(PQgetvalue(res, slotnum, i_twophase), "t") == 0);
+		}
+	}
+
+	PQclear(res);
+	PQfinish(conn);
+
+	dbinfo->slot_arr.slots = slotinfos;
+	dbinfo->slot_arr.nslots = num_slots;
+}
+
+/*
+ * get_logical_slot_infos()
+ *
+ * Higher level routine to generate LogicalSlotInfoArr for all databases.
+ */
+int
+get_logical_slot_infos(ClusterInfo *cluster)
+{
+	int			dbnum;
+	int			slotnum = 0;
+
+	if (cluster == &old_cluster)
+		pg_log(PG_VERBOSE, "\nsource databases:");
+	else
+		pg_log(PG_VERBOSE, "\ntarget databases:");
+
+	for (dbnum = 0; dbnum < cluster->dbarr.ndbs; dbnum++)
+	{
+		DbInfo	   *pDbInfo = &cluster->dbarr.dbs[dbnum];
+
+		get_logical_slot_infos_per_db(cluster, pDbInfo);
+		slotnum += pDbInfo->slot_arr.nslots;
+
+		if (log_opts.verbose)
+		{
+			pg_log(PG_VERBOSE, "Database: \"%s\"", pDbInfo->db_name);
+			print_slot_infos(&pDbInfo->slot_arr);
+		}
+	}
+
+	return slotnum;
+}
 
 static void
 free_db_and_rel_infos(DbInfoArr *db_arr)
@@ -610,6 +707,12 @@ free_db_and_rel_infos(DbInfoArr *db_arr)
 	{
 		free_rel_infos(&db_arr->dbs[dbnum].rel_arr);
 		pg_free(db_arr->dbs[dbnum].db_name);
+
+		/*
+		 * Logical replication slots must not exist on the new cluster before
+		 * doing create_logical_replication_slots().
+		 */
+		Assert(db_arr->dbs[dbnum].slot_arr.slots == NULL);
 	}
 	pg_free(db_arr->dbs);
 	db_arr->dbs = NULL;
@@ -660,3 +763,15 @@ print_rel_infos(RelInfoArr *rel_arr)
 			   rel_arr->rels[relnum].reloid,
 			   rel_arr->rels[relnum].tablespace);
 }
+
+static void
+print_slot_infos(LogicalSlotInfoArr *slot_arr)
+{
+	int			slotnum;
+
+	for (slotnum = 0; slotnum < slot_arr->nslots; slotnum++)
+		pg_log(PG_VERBOSE, "slotname: \"%s\", plugin: \"%s\", two_phase: %d",
+			   slot_arr->slots[slotnum].slotname,
+			   slot_arr->slots[slotnum].plugin,
+			   slot_arr->slots[slotnum].two_phase);
+}
diff --git a/src/bin/pg_upgrade/meson.build b/src/bin/pg_upgrade/meson.build
index 12a97f84e2..228f29b688 100644
--- a/src/bin/pg_upgrade/meson.build
+++ b/src/bin/pg_upgrade/meson.build
@@ -42,6 +42,7 @@ tests += {
     'tests': [
       't/001_basic.pl',
       't/002_pg_upgrade.pl',
+      't/003_logical_replication_slots.pl',
     ],
     'test_kwargs': {'priority': 40}, # pg_upgrade tests are slow
   },
diff --git a/src/bin/pg_upgrade/pg_upgrade.c b/src/bin/pg_upgrade/pg_upgrade.c
index 4562dafcff..688b84d62e 100644
--- a/src/bin/pg_upgrade/pg_upgrade.c
+++ b/src/bin/pg_upgrade/pg_upgrade.c
@@ -59,6 +59,7 @@ static void copy_xact_xlog_xid(void);
 static void set_frozenxids(bool minmxid_only);
 static void make_outputdirs(char *pgdata);
 static void setup(char *argv0, bool *live_check);
+static void create_logical_replication_slots(void);
 
 ClusterInfo old_cluster,
 			new_cluster;
@@ -188,6 +189,19 @@ main(int argc, char **argv)
 			  new_cluster.pgdata);
 	check_ok();
 
+	/*
+	 * Create logical replication slots.
+	 *
+	 * Note: This must be done after doing pg_resetwal command because
+	 * pg_resetwal would remove required WALs.
+	 */
+	if (num_slots_on_old_cluster)
+	{
+		start_postmaster(&new_cluster, true);
+		create_logical_replication_slots();
+		stop_postmaster(false);
+	}
+
 	if (user_opts.do_sync)
 	{
 		prep_status("Sync data directory to disk");
@@ -860,3 +874,93 @@ set_frozenxids(bool minmxid_only)
 
 	check_ok();
 }
+
+/*
+ * create_logical_replication_slots()
+ *
+ * Similar to create_new_objects() but only restores logical replication slots.
+ */
+static void
+create_logical_replication_slots(void)
+{
+	int			dbnum;
+	int			slotnum;
+
+	prep_status_progress("Restoring logical replication slots in the new cluster");
+
+	for (dbnum = 0; dbnum < old_cluster.dbarr.ndbs; dbnum++)
+	{
+		DbInfo	   *old_db = &old_cluster.dbarr.dbs[dbnum];
+		LogicalSlotInfoArr *slot_arr = &old_db->slot_arr;
+		PQExpBuffer query,
+					escaped;
+		PGconn	   *conn;
+		char		log_file_name[MAXPGPATH];
+
+		/* Quick exit if there are no slots */
+		if (!slot_arr->nslots)
+			continue;
+
+		query = createPQExpBuffer();
+		escaped = createPQExpBuffer();
+		conn = connectToServer(&new_cluster, old_db->db_name);
+
+		snprintf(log_file_name, sizeof(log_file_name),
+				 DB_DUMP_LOG_FILE_MASK, old_db->db_oid);
+
+		pg_log(PG_STATUS, "%s", old_db->db_name);
+
+		for (slotnum = 0; slotnum < slot_arr->nslots; slotnum++)
+		{
+			/*
+			 * Constructs query for creating logical replication slots.
+			 *
+			 * XXX: For simplification, pg_create_logical_replication_slot() is
+			 * used. Is it sufficient?
+			 */
+			appendPQExpBuffer(query, "SELECT pg_catalog.pg_create_logical_replication_slot(");
+			appendStringLiteral(query, slot_arr->slots[slotnum].slotname,
+								slot_arr->encoding, slot_arr->std_strings);
+			appendPQExpBuffer(query, ", ");
+			appendStringLiteral(query, slot_arr->slots[slotnum].plugin,
+								slot_arr->encoding, slot_arr->std_strings);
+			appendPQExpBuffer(query, ", false, %s);",
+							  slot_arr->slots[slotnum].two_phase ? "true" : "false");
+
+			/*
+			 * The string must be escaped to shell-style, because there is a
+			 * possibility that output plugin name contains quotes. The output
+			 * string would be sandwiched by the single quotes, so it does not have
+			 * to be wrapped by any quotes when it is passed to
+			 * parallel_exec_prog().
+			 */
+			appendShellString(escaped, query->data);
+
+			parallel_exec_prog(log_file_name,
+							   NULL,
+							   "\"%s/psql\" %s --echo-queries --set ON_ERROR_STOP=on "
+							   "--no-psqlrc --dbname %s -c %s",
+							   new_cluster.bindir,
+							   cluster_conn_opts(&new_cluster),
+							   old_db->db_name,
+							   escaped->data);
+			resetPQExpBuffer(escaped);
+			resetPQExpBuffer(query);
+		}
+
+		PQfinish(conn);
+
+		destroyPQExpBuffer(escaped);
+		destroyPQExpBuffer(query);
+
+		/* reap all children */
+		while (reap_child(true) == true)
+			;
+	}
+
+	end_progress_output();
+	check_ok();
+
+	/* update new_cluster info again */
+	get_logical_slot_infos(&new_cluster);
+}
diff --git a/src/bin/pg_upgrade/pg_upgrade.h b/src/bin/pg_upgrade/pg_upgrade.h
index 3eea0139c7..2a3a178cde 100644
--- a/src/bin/pg_upgrade/pg_upgrade.h
+++ b/src/bin/pg_upgrade/pg_upgrade.h
@@ -46,6 +46,7 @@
 #define INTERNAL_LOG_FILE	"pg_upgrade_internal.log"
 
 extern char *output_files[];
+extern int	num_slots_on_old_cluster;
 
 /*
  * WIN32 files do not accept writes from multiple processes
@@ -150,6 +151,24 @@ typedef struct
 	int			nrels;
 } RelInfoArr;
 
+/*
+ * Structure to store logical replication slot information
+ */
+typedef struct
+{
+	char	   *slotname;		/* slot name */
+	char	   *plugin;			/* plugin */
+	bool		two_phase;		/* Can the slot decode 2PC? */
+} LogicalSlotInfo;
+
+typedef struct
+{
+	int			nslots;
+	LogicalSlotInfo *slots;
+	int			encoding;
+	bool		std_strings;
+} LogicalSlotInfoArr;
+
 /*
  * The following structure represents a relation mapping.
  */
@@ -176,6 +195,7 @@ typedef struct
 	char		db_tablespace[MAXPGPATH];	/* database default tablespace
 											 * path */
 	RelInfoArr	rel_arr;		/* array of all user relinfos */
+	LogicalSlotInfoArr slot_arr;	/* array of all LogicalSlotInfo */
 } DbInfo;
 
 /*
@@ -400,6 +420,7 @@ FileNameMap *gen_db_file_maps(DbInfo *old_db,
 							  DbInfo *new_db, int *nmaps, const char *old_pgdata,
 							  const char *new_pgdata);
 void		get_db_and_rel_infos(ClusterInfo *cluster);
+int			get_logical_slot_infos(ClusterInfo *cluster);
 
 /* option.c */
 
diff --git a/src/bin/pg_upgrade/t/003_logical_replication_slots.pl b/src/bin/pg_upgrade/t/003_logical_replication_slots.pl
new file mode 100644
index 0000000000..f015b5d363
--- /dev/null
+++ b/src/bin/pg_upgrade/t/003_logical_replication_slots.pl
@@ -0,0 +1,152 @@
+# Copyright (c) 2023, PostgreSQL Global Development Group
+
+# Tests for upgrading replication slots
+
+use strict;
+use warnings;
+
+use File::Path qw(rmtree);
+
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+# Can be changed to test the other modes
+my $mode = $ENV{PG_TEST_PG_UPGRADE_MODE} || '--copy';
+
+# Initialize old node
+my $old_node = PostgreSQL::Test::Cluster->new('old_node');
+$old_node->init(allows_streaming => 'logical');
+$old_node->start;
+
+# Initialize new node
+my $new_node = PostgreSQL::Test::Cluster->new('new_node');
+$new_node->init(allows_streaming => 1);
+
+my $bindir = $new_node->config_data('--bindir');
+
+$old_node->stop;
+
+# Create a slot on old node
+$old_node->start;
+$old_node->safe_psql(
+	'postgres', "SELECT pg_create_logical_replication_slot('test_slot1', 'test_decoding', false, true);"
+);
+$old_node->stop;
+
+# Cause a failure at the start of pg_upgrade because wal_level is replica
+command_fails(
+	[
+		'pg_upgrade', '--no-sync',
+		'-d',         $old_node->data_dir,
+		'-D',         $new_node->data_dir,
+		'-b',         $bindir,
+		'-B',         $bindir,
+		'-s',         $new_node->host,
+		'-p',         $old_node->port,
+		'-P',         $new_node->port,
+		$mode,
+	],
+	'run of pg_upgrade of old node with wrong wal_level');
+ok( -d $new_node->data_dir . "/pg_upgrade_output.d",
+	"pg_upgrade_output.d/ not removed after pg_upgrade failure");
+
+# Clean up
+rmtree($new_node->data_dir . "/pg_upgrade_output.d");
+
+# Preparations for the subsequent test. The case max_replication_slots is set
+# to 0 is prohibited.
+$new_node->append_conf('postgresql.conf', "wal_level = 'logical'");
+$new_node->append_conf('postgresql.conf', "max_replication_slots = 0");
+
+# Cause a failure at the start of pg_upgrade because max_replication_slots is 0
+command_fails(
+	[
+		'pg_upgrade', '--no-sync',
+		'-d',         $old_node->data_dir,
+		'-D',         $new_node->data_dir,
+		'-b',         $bindir,
+		'-B',         $bindir,
+		'-s',         $new_node->host,
+		'-p',         $old_node->port,
+		'-P',         $new_node->port,
+		$mode,
+	],
+	'run of pg_upgrade of old node with wrong max_replication_slots');
+ok( -d $new_node->data_dir . "/pg_upgrade_output.d",
+	"pg_upgrade_output.d/ not removed after pg_upgrade failure");
+
+# Clean up
+rmtree($new_node->data_dir . "/pg_upgrade_output.d");
+
+# Preparations for the subsequent test. max_replication_slots is set to
+# non-zero value
+$new_node->append_conf('postgresql.conf', "max_replication_slots = 1");
+
+# Create a slot on old node, and generate WALs
+$old_node->start;
+$old_node->safe_psql(
+	'postgres', qq[
+	SELECT pg_create_logical_replication_slot('test_slot2', 'test_decoding', false, true);
+	CREATE TABLE tbl AS SELECT generate_series(1, 10) AS a;
+]);
+
+$old_node->stop;
+
+# Cause a failure at the start of pg_upgrade because max_replication_slots is
+# smaller than existing slots on old node
+command_fails(
+	[
+		'pg_upgrade', '--no-sync',
+		'-d',         $old_node->data_dir,
+		'-D',         $new_node->data_dir,
+		'-b',         $bindir,
+		'-B',         $bindir,
+		'-s',         $new_node->host,
+		'-p',         $old_node->port,
+		'-P',         $new_node->port,
+		$mode,
+	],
+	'run of pg_upgrade of old node with small max_replication_slots');
+ok( -d $new_node->data_dir . "/pg_upgrade_output.d",
+	"pg_upgrade_output.d/ not removed after pg_upgrade failure");
+
+# Clean up
+rmtree($new_node->data_dir . "/pg_upgrade_output.d");
+
+# Preparations for the subsequent test. max_replication_slots is set to
+# appropriate value
+$new_node->append_conf('postgresql.conf', "max_replication_slots = 10");
+
+# Remove an unnecessary slot and consume WALs
+$old_node->start;
+$old_node->safe_psql(
+	'postgres', qq[
+	SELECT pg_drop_replication_slot('test_slot1');
+	SELECT count(*) FROM pg_logical_slot_get_changes('test_slot2', NULL, NULL)
+]);
+$old_node->stop;
+
+# Actual run, pg_upgrade_output.d is removed at the end
+command_ok(
+	[
+		'pg_upgrade', '--no-sync',
+		'-d',         $old_node->data_dir,
+		'-D',         $new_node->data_dir,
+		'-b',         $bindir,
+		'-B',         $bindir,
+		'-s',         $new_node->host,
+		'-p',         $old_node->port,
+		'-P',         $new_node->port,
+		$mode,
+	],
+	'run of pg_upgrade of old node');
+ok( !-d $new_node->data_dir . "/pg_upgrade_output.d",
+	"pg_upgrade_output.d/ removed after pg_upgrade success");
+
+$new_node->start;
+my $result = $new_node->safe_psql('postgres',
+	"SELECT slot_name, two_phase FROM pg_replication_slots");
+is($result, qq(test_slot2|t), 'check the slot exists on new node');
+
+done_testing();
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 51b7951ad8..0071efef1c 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -1501,7 +1501,10 @@ LogicalRepTupleData
 LogicalRepTyp
 LogicalRepWorker
 LogicalRepWorkerType
+LogicalReplicationSlotInfo
 LogicalRewriteMappingData
+LogicalSlotInfo
+LogicalSlotInfoArr
 LogicalTape
 LogicalTapeSet
 LsnReadQueue
-- 
2.27.0

