From c27ef8c9dc6288cb5833e4d4d4f36d0f9af9464d Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Date: Tue, 4 Apr 2023 05:49:34 +0000
Subject: [PATCH v11 1/4] pg_upgrade: Add --include-logical-replication-slots
 option

This commit introduces a new pg_upgrade option called "--include-logical-replication-slots".
This allows nodes with logical replication slots to be upgraded. The commit can
be divided into two parts: one for pg_dump and another for pg_upgrade.

For pg_dump this commit includes a new option called "--logical-replication-slots-only".
This option can be used to dump logical replication slots. When this option is
specified, the slot_name, plugin, and two_phase parameters are extracted from
pg_replication_slots. An SQL file is then generated which executes
pg_create_logical_replication_slot() with the extracted parameters.

For pg_upgrade, when '--include-logical-replication-slots' is specified, it executes
pg_dump with the new "--logical-replication-slots-only" option and restores from the
dump. Note that we cannot dump replication slots at the same time as the schema
dump because we need to separate the timing of restoring replication slots and
other objects. Replication slots, in  particular, should not be restored before
executing the pg_resetwal command because it will remove WALs that are required
by the slots.

The significant advantage of this commit is that it makes it easy to continue
logical replication even after upgrading the publisher node. Previously, pg_upgrade
allowed copying publications to a new node. With this new commit, adjusting the
connection string to the new publisher will cause the apply worker on the subscriber
to connect to the new publisher automatically. This enables seamless continuation
of logical replication, even after an upgrade.

Author: Hayato Kuroda
Reviewed-by: Peter Smith, Julien Rouhaud, Vignesh C
---
 doc/src/sgml/ref/pgupgrade.sgml               |  11 ++
 src/bin/pg_dump/pg_backup.h                   |   1 +
 src/bin/pg_dump/pg_dump.c                     | 150 ++++++++++++++++++
 src/bin/pg_dump/pg_dump.h                     |  17 +-
 src/bin/pg_dump/pg_dump_sort.c                |  11 +-
 src/bin/pg_upgrade/Makefile                   |   3 +
 src/bin/pg_upgrade/check.c                    |  60 +++++++
 src/bin/pg_upgrade/dump.c                     |  24 +++
 src/bin/pg_upgrade/info.c                     | 119 +++++++++++++-
 src/bin/pg_upgrade/meson.build                |   1 +
 src/bin/pg_upgrade/option.c                   |   7 +
 src/bin/pg_upgrade/pg_upgrade.c               |  61 +++++++
 src/bin/pg_upgrade/pg_upgrade.h               |  21 +++
 .../t/003_logical_replication_slots.pl        | 115 ++++++++++++++
 src/tools/pgindent/typedefs.list              |   3 +
 15 files changed, 599 insertions(+), 5 deletions(-)
 create mode 100644 src/bin/pg_upgrade/t/003_logical_replication_slots.pl

diff --git a/doc/src/sgml/ref/pgupgrade.sgml b/doc/src/sgml/ref/pgupgrade.sgml
index 7816b4c685..94e90ff506 100644
--- a/doc/src/sgml/ref/pgupgrade.sgml
+++ b/doc/src/sgml/ref/pgupgrade.sgml
@@ -240,6 +240,17 @@ PostgreSQL documentation
       </listitem>
      </varlistentry>
 
+     <varlistentry>
+      <term><option>--include-logical-replication-slots</option></term>
+      <listitem>
+       <para>
+        Upgrade logical replication slots. Only permanent replication slots
+        are included. Note that pg_upgrade does not check the installation of
+        plugins.
+       </para>
+      </listitem>
+     </varlistentry>
+
      <varlistentry>
       <term><option>-?</option></term>
       <term><option>--help</option></term>
diff --git a/src/bin/pg_dump/pg_backup.h b/src/bin/pg_dump/pg_backup.h
index aba780ef4b..0a4e931f9b 100644
--- a/src/bin/pg_dump/pg_backup.h
+++ b/src/bin/pg_dump/pg_backup.h
@@ -187,6 +187,7 @@ typedef struct _dumpOptions
 	int			use_setsessauth;
 	int			enable_row_security;
 	int			load_via_partition_root;
+	int			logical_slots_only;
 
 	/* default, if no "inclusion" switches appear, is to dump everything */
 	bool		include_everything;
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index 41a51ec5cd..906f9a9541 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -328,6 +328,9 @@ static void setupDumpWorker(Archive *AH);
 static TableInfo *getRootTableInfo(const TableInfo *tbinfo);
 static bool forcePartitionRootLoad(const TableInfo *tbinfo);
 
+static void getLogicalReplicationSlots(Archive *fout);
+static void dumpLogicalReplicationSlot(Archive *fout,
+									   const LogicalReplicationSlotInfo *slotinfo);
 
 int
 main(int argc, char **argv)
@@ -431,6 +434,7 @@ main(int argc, char **argv)
 		{"table-and-children", required_argument, NULL, 12},
 		{"exclude-table-and-children", required_argument, NULL, 13},
 		{"exclude-table-data-and-children", required_argument, NULL, 14},
+		{"logical-replication-slots-only", no_argument, NULL, 15},
 
 		{NULL, 0, NULL, 0}
 	};
@@ -657,6 +661,10 @@ main(int argc, char **argv)
 										  optarg);
 				break;
 
+			case 15:			/* dump only replication slot(s) */
+				dopt.logical_slots_only = true;
+				break;
+
 			default:
 				/* getopt_long already emitted a complaint */
 				pg_log_error_hint("Try \"%s --help\" for more information.", progname);
@@ -714,6 +722,18 @@ main(int argc, char **argv)
 	if (dopt.do_nothing && dopt.dump_inserts == 0)
 		pg_fatal("option --on-conflict-do-nothing requires option --inserts, --rows-per-insert, or --column-inserts");
 
+	if (dopt.logical_slots_only)
+	{
+		if (!dopt.binary_upgrade)
+			pg_fatal("options --logical-replication-slots-only requires option --binary-upgrade");
+
+		if (dopt.dataOnly)
+			pg_fatal("options --logical-replication-slots-only and -a/--data-only cannot be used together");
+
+		if (dopt.schemaOnly)
+			pg_fatal("options --logical-replication-slots-only and -s/--schema-only cannot be used together");
+	}
+
 	/* Identify archive format to emit */
 	archiveFormat = parseArchiveFormat(format, &archiveMode);
 
@@ -876,6 +896,16 @@ main(int argc, char **argv)
 			pg_fatal("no matching extensions were found");
 	}
 
+	/*
+	 * If dump logical-replication-slots-only was requested, dump only them
+	 * and skip everything else.
+	 */
+	if (dopt.logical_slots_only)
+	{
+		getLogicalReplicationSlots(fout);
+		goto dump;
+	}
+
 	/*
 	 * Dumping LOs is the default for dumps where an inclusion switch is not
 	 * used (an "include everything" dump).  -B can be used to exclude LOs
@@ -936,6 +966,8 @@ main(int argc, char **argv)
 	if (!dopt.no_security_labels)
 		collectSecLabels(fout);
 
+dump:
+
 	/* Lastly, create dummy objects to represent the section boundaries */
 	boundaryObjs = createBoundaryObjects();
 
@@ -1109,6 +1141,8 @@ help(const char *progname)
 			 "                               servers matching PATTERN\n"));
 	printf(_("  --inserts                    dump data as INSERT commands, rather than COPY\n"));
 	printf(_("  --load-via-partition-root    load partitions via the root table\n"));
+	printf(_("  --logical-replication-slots-only\n"
+			 "                               dump only logical replication slots, no schema or data\n"));
 	printf(_("  --no-comments                do not dump comments\n"));
 	printf(_("  --no-publications            do not dump publications\n"));
 	printf(_("  --no-security-labels         do not dump security label assignments\n"));
@@ -10252,6 +10286,10 @@ dumpDumpableObject(Archive *fout, DumpableObject *dobj)
 		case DO_SUBSCRIPTION:
 			dumpSubscription(fout, (const SubscriptionInfo *) dobj);
 			break;
+		case DO_LOGICAL_REPLICATION_SLOT:
+			dumpLogicalReplicationSlot(fout,
+									   (const LogicalReplicationSlotInfo *) dobj);
+			break;
 		case DO_PRE_DATA_BOUNDARY:
 		case DO_POST_DATA_BOUNDARY:
 			/* never dumped, nothing to do */
@@ -18227,6 +18265,7 @@ addBoundaryDependencies(DumpableObject **dobjs, int numObjs,
 			case DO_PUBLICATION_REL:
 			case DO_PUBLICATION_TABLE_IN_SCHEMA:
 			case DO_SUBSCRIPTION:
+			case DO_LOGICAL_REPLICATION_SLOT:
 				/* Post-data objects: must come after the post-data boundary */
 				addObjectDependency(dobj, postDataBound->dumpId);
 				break;
@@ -18488,3 +18527,114 @@ appendReloptionsArrayAH(PQExpBuffer buffer, const char *reloptions,
 	if (!res)
 		pg_log_warning("could not parse %s array", "reloptions");
 }
+
+/*
+ * getLogicalReplicationSlots
+ *	  get information about replication slots
+ */
+static void
+getLogicalReplicationSlots(Archive *fout)
+{
+	PGresult   *res;
+	LogicalReplicationSlotInfo *slotinfo;
+	PQExpBuffer query;
+
+	int			i_slotname;
+	int			i_plugin;
+	int			i_twophase;
+	int			i,
+				ntups;
+
+	/* Check whether we should dump or not */
+	if (fout->remoteVersion < 160000)
+		return;
+
+	Assert(fout->dopt->logical_slots_only);
+
+	query = createPQExpBuffer();
+
+	resetPQExpBuffer(query);
+
+	/*
+	 * Get replication slots.
+	 *
+	 * XXX: Which information must be extracted from old node? Currently three
+	 * attributes are extracted because they are used by
+	 * pg_create_logical_replication_slot().
+	 *
+	 * XXX: Do we have to support physical slots?
+	 */
+	appendPQExpBufferStr(query,
+						 "SELECT slot_name, plugin, two_phase "
+						 "FROM pg_catalog.pg_replication_slots "
+						 "WHERE database = current_database() AND temporary = false "
+						 "AND wal_status IN ('reserved', 'extended');");
+
+	res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
+
+	ntups = PQntuples(res);
+
+	i_slotname = PQfnumber(res, "slot_name");
+	i_plugin = PQfnumber(res, "plugin");
+	i_twophase = PQfnumber(res, "two_phase");
+
+	slotinfo = pg_malloc(ntups * sizeof(LogicalReplicationSlotInfo));
+
+	for (i = 0; i < ntups; i++)
+	{
+		slotinfo[i].dobj.objType = DO_LOGICAL_REPLICATION_SLOT;
+
+		slotinfo[i].dobj.catId.tableoid = InvalidOid;
+		slotinfo[i].dobj.catId.oid = InvalidOid;
+		AssignDumpId(&slotinfo[i].dobj);
+
+		slotinfo[i].dobj.name = pg_strdup(PQgetvalue(res, i, i_slotname));
+
+		slotinfo[i].plugin = pg_strdup(PQgetvalue(res, i, i_plugin));
+		slotinfo[i].twophase = (strcmp(PQgetvalue(res, i, i_twophase), "t") == 0);
+
+		/*
+		 * Note: Currently we do not have any options to include/exclude slots
+		 * in dumping, so all the slots must be selected.
+		 */
+		slotinfo[i].dobj.dump = DUMP_COMPONENT_ALL;
+	}
+	PQclear(res);
+
+	destroyPQExpBuffer(query);
+}
+
+/*
+ * dumpLogicalReplicationSlot
+ *	  dump creation functions for the given logical replication slots
+ */
+static void
+dumpLogicalReplicationSlot(Archive *fout,
+						   const LogicalReplicationSlotInfo *slotinfo)
+{
+	Assert(fout->dopt->logical_slots_only);
+
+	if (slotinfo->dobj.dump & DUMP_COMPONENT_DEFINITION)
+	{
+		PQExpBuffer query = createPQExpBuffer();
+
+		/*
+		 * XXX: For simplification, pg_create_logical_replication_slot() is
+		 * used. Is it sufficient?
+		 */
+		appendPQExpBuffer(query, "SELECT pg_catalog.pg_create_logical_replication_slot(");
+		appendStringLiteralAH(query, slotinfo->dobj.name, fout);
+		appendPQExpBuffer(query, ", ");
+		appendStringLiteralAH(query, slotinfo->plugin, fout);
+		appendPQExpBuffer(query, ", false, %s);",
+						  slotinfo->twophase ? "true" : "false");
+
+		ArchiveEntry(fout, slotinfo->dobj.catId, slotinfo->dobj.dumpId,
+					 ARCHIVE_OPTS(.tag = slotinfo->dobj.name,
+								  .description = "REPLICATION SLOT",
+								  .section = SECTION_POST_DATA,
+								  .createStmt = query->data));
+
+		destroyPQExpBuffer(query);
+	}
+}
diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h
index ed6ce41ad7..de081c35ae 100644
--- a/src/bin/pg_dump/pg_dump.h
+++ b/src/bin/pg_dump/pg_dump.h
@@ -82,7 +82,8 @@ typedef enum
 	DO_PUBLICATION,
 	DO_PUBLICATION_REL,
 	DO_PUBLICATION_TABLE_IN_SCHEMA,
-	DO_SUBSCRIPTION
+	DO_SUBSCRIPTION,
+	DO_LOGICAL_REPLICATION_SLOT
 } DumpableObjectType;
 
 /*
@@ -666,6 +667,20 @@ typedef struct _SubscriptionInfo
 	char	   *subpasswordrequired;
 } SubscriptionInfo;
 
+/*
+ * The LogicalReplicationSlotInfo struct is used to represent replication
+ * slots.
+ *
+ * XXX: add more attributes if needed
+ */
+typedef struct _LogicalReplicationSlotInfo
+{
+	DumpableObject dobj;
+	char	   *plugin;
+	char	   *slottype;
+	bool		twophase;
+} LogicalReplicationSlotInfo;
+
 /*
  *	common utility functions
  */
diff --git a/src/bin/pg_dump/pg_dump_sort.c b/src/bin/pg_dump/pg_dump_sort.c
index 745578d855..4e12e46dc5 100644
--- a/src/bin/pg_dump/pg_dump_sort.c
+++ b/src/bin/pg_dump/pg_dump_sort.c
@@ -93,6 +93,7 @@ enum dbObjectTypePriorities
 	PRIO_PUBLICATION_REL,
 	PRIO_PUBLICATION_TABLE_IN_SCHEMA,
 	PRIO_SUBSCRIPTION,
+	PRIO_LOGICAL_REPLICATION_SLOT,
 	PRIO_DEFAULT_ACL,			/* done in ACL pass */
 	PRIO_EVENT_TRIGGER,			/* must be next to last! */
 	PRIO_REFRESH_MATVIEW		/* must be last! */
@@ -146,10 +147,11 @@ static const int dbObjectTypePriority[] =
 	PRIO_PUBLICATION,			/* DO_PUBLICATION */
 	PRIO_PUBLICATION_REL,		/* DO_PUBLICATION_REL */
 	PRIO_PUBLICATION_TABLE_IN_SCHEMA,	/* DO_PUBLICATION_TABLE_IN_SCHEMA */
-	PRIO_SUBSCRIPTION			/* DO_SUBSCRIPTION */
+	PRIO_SUBSCRIPTION,			/* DO_SUBSCRIPTION */
+	PRIO_LOGICAL_REPLICATION_SLOT	/* DO_LOGICAL_REPLICATION_SLOT */
 };
 
-StaticAssertDecl(lengthof(dbObjectTypePriority) == (DO_SUBSCRIPTION + 1),
+StaticAssertDecl(lengthof(dbObjectTypePriority) == (DO_LOGICAL_REPLICATION_SLOT + 1),
 				 "array length mismatch");
 
 static DumpId preDataBoundId;
@@ -1498,6 +1500,11 @@ describeDumpableObject(DumpableObject *obj, char *buf, int bufsize)
 					 "SUBSCRIPTION (ID %d OID %u)",
 					 obj->dumpId, obj->catId.oid);
 			return;
+		case DO_LOGICAL_REPLICATION_SLOT:
+			snprintf(buf, bufsize,
+					 "LOGICAL REPLICATION SLOT (ID %d NAME %s)",
+					 obj->dumpId, obj->name);
+			return;
 		case DO_PRE_DATA_BOUNDARY:
 			snprintf(buf, bufsize,
 					 "PRE-DATA BOUNDARY  (ID %d)",
diff --git a/src/bin/pg_upgrade/Makefile b/src/bin/pg_upgrade/Makefile
index 5834513add..815d1a7ca1 100644
--- a/src/bin/pg_upgrade/Makefile
+++ b/src/bin/pg_upgrade/Makefile
@@ -3,6 +3,9 @@
 PGFILEDESC = "pg_upgrade - an in-place binary upgrade utility"
 PGAPPICON = win32
 
+# required for 003_logical_replication_slots.pl
+EXTRA_INSTALL=contrib/test_decoding
+
 subdir = src/bin/pg_upgrade
 top_builddir = ../../..
 include $(top_builddir)/src/Makefile.global
diff --git a/src/bin/pg_upgrade/check.c b/src/bin/pg_upgrade/check.c
index fea159689e..1802a30fe6 100644
--- a/src/bin/pg_upgrade/check.c
+++ b/src/bin/pg_upgrade/check.c
@@ -30,6 +30,7 @@ static void check_for_jsonb_9_4_usage(ClusterInfo *cluster);
 static void check_for_pg_role_prefix(ClusterInfo *cluster);
 static void check_for_new_tablespace_dir(ClusterInfo *new_cluster);
 static void check_for_user_defined_encoding_conversions(ClusterInfo *cluster);
+static void check_for_parameter_settings(ClusterInfo *new_cluster);
 
 
 /*
@@ -89,6 +90,10 @@ check_and_dump_old_cluster(bool live_check)
 	/* Extract a list of databases and tables from the old cluster */
 	get_db_and_rel_infos(&old_cluster);
 
+	/* Additionally, extract a list of logical replication slots if required */
+	if (user_opts.include_logical_slots)
+		get_logical_slot_infos(&old_cluster);
+
 	init_tablespaces();
 
 	get_loadable_libraries();
@@ -188,6 +193,7 @@ void
 check_new_cluster(void)
 {
 	get_db_and_rel_infos(&new_cluster);
+	get_logical_slot_infos(&new_cluster);
 
 	check_new_cluster_is_empty();
 
@@ -210,6 +216,9 @@ check_new_cluster(void)
 	check_for_prepared_transactions(&new_cluster);
 
 	check_for_new_tablespace_dir(&new_cluster);
+
+	if (user_opts.include_logical_slots)
+		check_for_parameter_settings(&new_cluster);
 }
 
 
@@ -364,6 +373,22 @@ check_new_cluster_is_empty(void)
 						 rel_arr->rels[relnum].nspname,
 						 rel_arr->rels[relnum].relname);
 		}
+
+		/*
+		 * If --include-logical-replication-slots is required, check the
+		 * existence of slots
+		 */
+		if (user_opts.include_logical_slots)
+		{
+			LogicalSlotInfoArr *slot_arr = &new_cluster.dbarr.dbs[dbnum].slot_arr;
+
+			/* if nslots > 0, report just first entry and exit */
+			if (slot_arr->nslots)
+				pg_fatal("New cluster database \"%s\" is not empty: found logical replication slot \"%s\"",
+						 new_cluster.dbarr.dbs[dbnum].db_name,
+						 slot_arr->slots[0].slotname);
+		}
+
 	}
 }
 
@@ -1402,3 +1427,38 @@ check_for_user_defined_encoding_conversions(ClusterInfo *cluster)
 	else
 		check_ok();
 }
+
+/*
+ * Verify parameter settings for creating logical replication slots
+ */
+static void
+check_for_parameter_settings(ClusterInfo *new_cluster)
+{
+	PGresult   *res;
+	PGconn	   *conn = connectToServer(new_cluster, "template1");
+	int			max_replication_slots;
+	char	   *wal_level;
+
+	prep_status("Checking for logical replication slots");
+
+	res = executeQueryOrDie(conn, "SHOW max_replication_slots;");
+	max_replication_slots = atoi(PQgetvalue(res, 0, 0));
+
+	if (max_replication_slots == 0)
+		pg_fatal("max_replication_slots must be greater than 0");
+
+	PQclear(res);
+
+	res = executeQueryOrDie(conn, "SHOW wal_level;");
+	wal_level = PQgetvalue(res, 0, 0);
+
+	if (strcmp(wal_level, "logical") != 0)
+		pg_fatal("wal_level must be \"logical\", but is set to \"%s\"",
+				 wal_level);
+
+	PQclear(res);
+
+	PQfinish(conn);
+
+	check_ok();
+}
diff --git a/src/bin/pg_upgrade/dump.c b/src/bin/pg_upgrade/dump.c
index 6c8c82dca8..e6b90864f5 100644
--- a/src/bin/pg_upgrade/dump.c
+++ b/src/bin/pg_upgrade/dump.c
@@ -59,6 +59,30 @@ generate_old_dump(void)
 						   log_opts.dumpdir,
 						   sql_file_name, escaped_connstr.data);
 
+		/*
+		 * Dump logical replication slots if needed.
+		 *
+		 * XXX We cannot dump replication slots at the same time as the schema
+		 * dump because we need to separate the timing of restoring
+		 * replication slots and other objects. Replication slots, in
+		 * particular, should not be restored before executing the pg_resetwal
+		 * command because it will remove WALs that are required by the slots.
+		 */
+		if (user_opts.include_logical_slots)
+		{
+			char		slots_file_name[MAXPGPATH];
+
+			snprintf(slots_file_name, sizeof(slots_file_name),
+					 DB_DUMP_LOGICAL_SLOTS_FILE_MASK, old_db->db_oid);
+			parallel_exec_prog(log_file_name, NULL,
+							   "\"%s/pg_dump\" %s --logical-replication-slots-only "
+							   "--quote-all-identifiers --binary-upgrade %s "
+							   "--file=\"%s/%s\" %s",
+							   new_cluster.bindir, cluster_conn_opts(&old_cluster),
+							   log_opts.verbose ? "--verbose" : "",
+							   log_opts.dumpdir,
+							   slots_file_name, escaped_connstr.data);
+		}
 		termPQExpBuffer(&escaped_connstr);
 	}
 
diff --git a/src/bin/pg_upgrade/info.c b/src/bin/pg_upgrade/info.c
index 85ed15ae4a..9679941217 100644
--- a/src/bin/pg_upgrade/info.c
+++ b/src/bin/pg_upgrade/info.c
@@ -23,10 +23,12 @@ static void free_db_and_rel_infos(DbInfoArr *db_arr);
 static void get_template0_info(ClusterInfo *cluster);
 static void get_db_infos(ClusterInfo *cluster);
 static void get_rel_infos(ClusterInfo *cluster, DbInfo *dbinfo);
+static void get_logical_slot_infos_per_db(ClusterInfo *cluster, DbInfo *dbinfo);
 static void free_rel_infos(RelInfoArr *rel_arr);
+static void free_logical_slot_infos(LogicalSlotInfoArr *slot_arr);
 static void print_db_infos(DbInfoArr *db_arr);
 static void print_rel_infos(RelInfoArr *rel_arr);
-
+static void print_slot_infos(LogicalSlotInfoArr *slot_arr);
 
 /*
  * gen_db_file_maps()
@@ -394,7 +396,7 @@ get_db_infos(ClusterInfo *cluster)
 	i_spclocation = PQfnumber(res, "spclocation");
 
 	ntups = PQntuples(res);
-	dbinfos = (DbInfo *) pg_malloc(sizeof(DbInfo) * ntups);
+	dbinfos = (DbInfo *) pg_malloc0(sizeof(DbInfo) * ntups);
 
 	for (tupnum = 0; tupnum < ntups; tupnum++)
 	{
@@ -600,6 +602,94 @@ get_rel_infos(ClusterInfo *cluster, DbInfo *dbinfo)
 	dbinfo->rel_arr.nrels = num_rels;
 }
 
+/*
+ * get_logical_slot_infos()
+ *
+ * Higher level routine to generate LogicalSlotInfoArr for all databases.
+ */
+void
+get_logical_slot_infos(ClusterInfo *cluster)
+{
+	int			dbnum;
+
+	if (cluster == &old_cluster)
+		pg_log(PG_VERBOSE, "\nsource databases:");
+	else
+		pg_log(PG_VERBOSE, "\ntarget databases:");
+
+	for (dbnum = 0; dbnum < cluster->dbarr.ndbs; dbnum++)
+	{
+		DbInfo *pDbInfo = &cluster->dbarr.dbs[dbnum];
+
+		if (pDbInfo->slot_arr.slots)
+			free_logical_slot_infos(&pDbInfo->slot_arr);
+
+		get_logical_slot_infos_per_db(cluster, pDbInfo);
+
+		if (log_opts.verbose)
+		{
+			pg_log(PG_VERBOSE, "Database: %s", pDbInfo->db_name);
+			print_slot_infos(&pDbInfo->slot_arr);
+		}
+	}
+}
+
+/*
+ * get_logical_slot_infos_per_db()
+ *
+ * gets the LogicalSlotInfos for all the logical replication slots of the database
+ * referred to by "dbinfo".
+ */
+static void
+get_logical_slot_infos_per_db(ClusterInfo *cluster, DbInfo *dbinfo)
+{
+	PGconn	   *conn = connectToServer(cluster,
+									   dbinfo->db_name);
+	PGresult   *res;
+	LogicalSlotInfo *slotinfos;
+
+	int			ntups;
+	int			slotnum;
+	int			num_slots = 0;
+
+	int			i_slotname;
+	int			i_plugin;
+	int			i_twophase;
+
+	char		query[QUERY_ALLOC];
+
+	query[0] = '\0';			/* initialize query string to empty */
+
+	snprintf(query, sizeof(query),
+			 "SELECT slot_name, plugin, two_phase "
+			 "FROM pg_catalog.pg_replication_slots "
+			 "WHERE database = current_database() AND temporary = false "
+			 "AND wal_status IN ('reserved', 'extended');");
+
+	res = executeQueryOrDie(conn, "%s", query);
+
+	ntups = PQntuples(res);
+
+	slotinfos = (LogicalSlotInfo *) pg_malloc(sizeof(LogicalSlotInfo) * ntups);
+
+	i_slotname = PQfnumber(res, "slot_name");
+	i_plugin = PQfnumber(res, "plugin");
+	i_twophase = PQfnumber(res, "two_phase");
+
+	for (slotnum = 0; slotnum < ntups; slotnum++)
+	{
+		LogicalSlotInfo *curr = &slotinfos[num_slots++];
+
+		curr->slotname = pg_strdup(PQgetvalue(res, slotnum, i_slotname));
+		curr->plugin = pg_strdup(PQgetvalue(res, slotnum, i_plugin));
+		curr->two_phase = (strcmp(PQgetvalue(res, slotnum, i_twophase), "t") == 0);
+	}
+
+	PQfinish(conn);
+
+	dbinfo->slot_arr.slots = slotinfos;
+	dbinfo->slot_arr.nslots = num_slots;
+}
 
 static void
 free_db_and_rel_infos(DbInfoArr *db_arr)
@@ -634,6 +724,19 @@ free_rel_infos(RelInfoArr *rel_arr)
 	rel_arr->nrels = 0;
 }
 
+static void
+free_logical_slot_infos(LogicalSlotInfoArr *slot_arr)
+{
+	int			slotnum;
+
+	for (slotnum = 0; slotnum < slot_arr->nslots; slotnum++)
+	{
+		pg_free(slot_arr->slots[slotnum].slotname);
+		pg_free(slot_arr->slots[slotnum].plugin);
+	}
+	pg_free(slot_arr->slots);
+	slot_arr->nslots = 0;
+}
 
 static void
 print_db_infos(DbInfoArr *db_arr)
@@ -660,3 +763,15 @@ print_rel_infos(RelInfoArr *rel_arr)
 			   rel_arr->rels[relnum].reloid,
 			   rel_arr->rels[relnum].tablespace);
 }
+
+static void
+print_slot_infos(LogicalSlotInfoArr *slot_arr)
+{
+	int			slotnum;
+
+	for (slotnum = 0; slotnum < slot_arr->nslots; slotnum++)
+		pg_log(PG_VERBOSE, "slotname: %s, plugin: %s, two_phase: %d",
+			   slot_arr->slots[slotnum].slotname,
+			   slot_arr->slots[slotnum].plugin,
+			   slot_arr->slots[slotnum].two_phase);
+}
diff --git a/src/bin/pg_upgrade/meson.build b/src/bin/pg_upgrade/meson.build
index 12a97f84e2..228f29b688 100644
--- a/src/bin/pg_upgrade/meson.build
+++ b/src/bin/pg_upgrade/meson.build
@@ -42,6 +42,7 @@ tests += {
     'tests': [
       't/001_basic.pl',
       't/002_pg_upgrade.pl',
+      't/003_logical_replication_slots.pl',
     ],
     'test_kwargs': {'priority': 40}, # pg_upgrade tests are slow
   },
diff --git a/src/bin/pg_upgrade/option.c b/src/bin/pg_upgrade/option.c
index 640361009e..df66a5ffe6 100644
--- a/src/bin/pg_upgrade/option.c
+++ b/src/bin/pg_upgrade/option.c
@@ -57,6 +57,7 @@ parseCommandLine(int argc, char *argv[])
 		{"verbose", no_argument, NULL, 'v'},
 		{"clone", no_argument, NULL, 1},
 		{"copy", no_argument, NULL, 2},
+		{"include-logical-replication-slots", no_argument, NULL, 3},
 
 		{NULL, 0, NULL, 0}
 	};
@@ -199,6 +200,10 @@ parseCommandLine(int argc, char *argv[])
 				user_opts.transfer_mode = TRANSFER_MODE_COPY;
 				break;
 
+			case 3:
+				user_opts.include_logical_slots = true;
+				break;
+
 			default:
 				fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
 						os_info.progname);
@@ -289,6 +294,8 @@ usage(void)
 	printf(_("  -V, --version                 display version information, then exit\n"));
 	printf(_("  --clone                       clone instead of copying files to new cluster\n"));
 	printf(_("  --copy                        copy files to new cluster (default)\n"));
+	printf(_("  --include-logical-replication-slots\n"
+			 "                                upgrade logical replication slots\n"));
 	printf(_("  -?, --help                    show this help, then exit\n"));
 	printf(_("\n"
 			 "Before running pg_upgrade you must:\n"
diff --git a/src/bin/pg_upgrade/pg_upgrade.c b/src/bin/pg_upgrade/pg_upgrade.c
index 75bab0a04c..373a9ef490 100644
--- a/src/bin/pg_upgrade/pg_upgrade.c
+++ b/src/bin/pg_upgrade/pg_upgrade.c
@@ -59,6 +59,7 @@ static void copy_xact_xlog_xid(void);
 static void set_frozenxids(bool minmxid_only);
 static void make_outputdirs(char *pgdata);
 static void setup(char *argv0, bool *live_check);
+static void create_logical_replication_slots(void);
 
 ClusterInfo old_cluster,
 			new_cluster;
@@ -188,6 +189,19 @@ main(int argc, char **argv)
 			  new_cluster.pgdata);
 	check_ok();
 
+	/*
+	 * Create logical replication slots if requested.
+	 *
+	 * Note: This must be done after doing pg_resetwal command because the
+	 * command will remove required WALs.
+	 */
+	if (user_opts.include_logical_slots)
+	{
+		start_postmaster(&new_cluster, true);
+		create_logical_replication_slots();
+		stop_postmaster(false);
+	}
+
 	if (user_opts.do_sync)
 	{
 		prep_status("Sync data directory to disk");
@@ -860,3 +874,50 @@ set_frozenxids(bool minmxid_only)
 
 	check_ok();
 }
+
+/*
+ * create_logical_replication_slots()
+ *
+ * Similar to create_new_objects() but only restores logical replication slots.
+ */
+static void
+create_logical_replication_slots(void)
+{
+	int			dbnum;
+
+	prep_status_progress("Restoring logical replication slots in the new cluster");
+
+	for (dbnum = 0; dbnum < old_cluster.dbarr.ndbs; dbnum++)
+	{
+		char		slots_file_name[MAXPGPATH],
+					log_file_name[MAXPGPATH];
+		DbInfo	   *old_db = &old_cluster.dbarr.dbs[dbnum];
+
+		pg_log(PG_STATUS, "%s", old_db->db_name);
+
+		snprintf(slots_file_name, sizeof(slots_file_name),
+				 DB_DUMP_LOGICAL_SLOTS_FILE_MASK, old_db->db_oid);
+		snprintf(log_file_name, sizeof(log_file_name),
+				 DB_DUMP_LOG_FILE_MASK, old_db->db_oid);
+
+		parallel_exec_prog(log_file_name,
+						   NULL,
+						   "\"%s/psql\" %s --echo-queries --set ON_ERROR_STOP=on "
+						   "--no-psqlrc --dbname %s -f \"%s/%s\"",
+						   new_cluster.bindir,
+						   cluster_conn_opts(&new_cluster),
+						   old_db->db_name,
+						   log_opts.dumpdir,
+						   slots_file_name);
+	}
+
+	/* reap all children */
+	while (reap_child(true) == true)
+		;
+
+	end_progress_output();
+	check_ok();
+
+	/* update new_cluster info again */
+	get_logical_slot_infos(&new_cluster);
+}
diff --git a/src/bin/pg_upgrade/pg_upgrade.h b/src/bin/pg_upgrade/pg_upgrade.h
index 3eea0139c7..7adbb50807 100644
--- a/src/bin/pg_upgrade/pg_upgrade.h
+++ b/src/bin/pg_upgrade/pg_upgrade.h
@@ -29,6 +29,7 @@
 /* contains both global db information and CREATE DATABASE commands */
 #define GLOBALS_DUMP_FILE	"pg_upgrade_dump_globals.sql"
 #define DB_DUMP_FILE_MASK	"pg_upgrade_dump_%u.custom"
+#define DB_DUMP_LOGICAL_SLOTS_FILE_MASK	"pg_upgrade_dump_%u_logical_slots.sql"
 
 /*
  * Base directories that include all the files generated internally, from the
@@ -150,6 +151,22 @@ typedef struct
 	int			nrels;
 } RelInfoArr;
 
+/*
+ * Structure to store logical replication slot information
+ */
+typedef struct
+{
+	char	   *slotname;		/* slot name */
+	char	   *plugin;			/* plugin */
+	bool		two_phase;		/* Can the slot decode 2PC? */
+} LogicalSlotInfo;
+
+typedef struct
+{
+	LogicalSlotInfo *slots;
+	int			nslots;
+} LogicalSlotInfoArr;
+
 /*
  * The following structure represents a relation mapping.
  */
@@ -176,6 +193,7 @@ typedef struct
 	char		db_tablespace[MAXPGPATH];	/* database default tablespace
 											 * path */
 	RelInfoArr	rel_arr;		/* array of all user relinfos */
+	LogicalSlotInfoArr slot_arr;	/* array of all LogicalSlotInfo */
 } DbInfo;
 
 /*
@@ -304,6 +322,8 @@ typedef struct
 	transferMode transfer_mode; /* copy files or link them? */
 	int			jobs;			/* number of processes/threads to use */
 	char	   *socketdir;		/* directory to use for Unix sockets */
+	bool		include_logical_slots;	/* true -> dump and restore logical
+										 * replication slots */
 } UserOpts;
 
 typedef struct
@@ -400,6 +420,7 @@ FileNameMap *gen_db_file_maps(DbInfo *old_db,
 							  DbInfo *new_db, int *nmaps, const char *old_pgdata,
 							  const char *new_pgdata);
 void		get_db_and_rel_infos(ClusterInfo *cluster);
+void		get_logical_slot_infos(ClusterInfo *cluster);
 
 /* option.c */
 
diff --git a/src/bin/pg_upgrade/t/003_logical_replication_slots.pl b/src/bin/pg_upgrade/t/003_logical_replication_slots.pl
new file mode 100644
index 0000000000..3430c641aa
--- /dev/null
+++ b/src/bin/pg_upgrade/t/003_logical_replication_slots.pl
@@ -0,0 +1,115 @@
+# Copyright (c) 2023, PostgreSQL Global Development Group
+
+# Tests for upgrading replication slots
+
+use strict;
+use warnings;
+
+use File::Path qw(rmtree);
+
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+# Can be changed to test the other modes
+my $mode = $ENV{PG_TEST_PG_UPGRADE_MODE} || '--copy';
+
+# Initialize old node
+my $old_node = PostgreSQL::Test::Cluster->new('old_node');
+$old_node->init(allows_streaming => 'logical');
+$old_node->start;
+
+# Initialize new node
+my $new_node = PostgreSQL::Test::Cluster->new('new_node');
+$new_node->init(allows_streaming => 1);
+
+my $bindir = $new_node->config_data('--bindir');
+
+$old_node->stop;
+
+# Cause a failure at the start of pg_upgrade because wal_level is replica
+command_fails(
+	[
+		'pg_upgrade', '--no-sync',
+		'-d',         $old_node->data_dir,
+		'-D',         $new_node->data_dir,
+		'-b',         $bindir,
+		'-B',         $bindir,
+		'-s',         $new_node->host,
+		'-p',         $old_node->port,
+		'-P',         $new_node->port,
+		$mode,        '--include-logical-replication-slots',
+	],
+	'run of pg_upgrade of old node with wrong wal_level');
+ok( -d $new_node->data_dir . "/pg_upgrade_output.d",
+	"pg_upgrade_output.d/ not removed after pg_upgrade failure");
+
+# Clean up
+rmtree($new_node->data_dir . "/pg_upgrade_output.d");
+
+# Preparations for the subsequent test. The case max_replication_slots is set
+# to 0 is prohibit.
+$new_node->append_conf('postgresql.conf', "wal_level = 'logical'");
+$new_node->append_conf('postgresql.conf', "max_replication_slots = 0");
+
+# Cause a failure at the start of pg_upgrade because max_replication_slots is 0
+command_fails(
+	[
+		'pg_upgrade', '--no-sync',
+		'-d',         $old_node->data_dir,
+		'-D',         $new_node->data_dir,
+		'-b',         $bindir,
+		'-B',         $bindir,
+		'-s',         $new_node->host,
+		'-p',         $old_node->port,
+		'-P',         $new_node->port,
+		$mode,        '--include-logical-replication-slots',
+	],
+	'run of pg_upgrade of old node with wrong max_replication_slots');
+ok( -d $new_node->data_dir . "/pg_upgrade_output.d",
+	"pg_upgrade_output.d/ not removed after pg_upgrade failure");
+
+# Clean up
+rmtree($new_node->data_dir . "/pg_upgrade_output.d");
+
+# Preparations for the subsequent test. max_replication_slots is set to
+# non-zero value to succeed the pg_upgrade
+$new_node->append_conf('postgresql.conf', "max_replication_slots = 10");
+
+# Create a slot on old node, and generate WALs
+$old_node->start;
+$old_node->safe_psql(
+	'postgres', qq[
+	SELECT pg_create_logical_replication_slot('test_slot', 'test_decoding', false, true);
+	CREATE TABLE tbl AS SELECT generate_series(1, 10) AS a;
+]);
+
+my $result = $old_node->safe_psql('postgres',
+	"SELECT count (*) FROM pg_logical_slot_get_changes('test_slot', NULL, NULL)"
+);
+is($result, qq(12), 'ensure WALs are not consumed yet');
+$old_node->stop;
+
+# Actual run, pg_upgrade_output.d is removed at the end
+command_ok(
+	[
+		'pg_upgrade', '--no-sync',
+		'-d',         $old_node->data_dir,
+		'-D',         $new_node->data_dir,
+		'-b',         $bindir,
+		'-B',         $bindir,
+		'-s',         $new_node->host,
+		'-p',         $old_node->port,
+		'-P',         $new_node->port,
+		$mode,        '--include-logical-replication-slots'
+	],
+	'run of pg_upgrade of old node');
+ok( !-d $new_node->data_dir . "/pg_upgrade_output.d",
+	"pg_upgrade_output.d/ removed after pg_upgrade success");
+
+$new_node->start;
+$result = $new_node->safe_psql('postgres',
+	"SELECT slot_name, two_phase FROM pg_replication_slots");
+is($result, qq(test_slot|t), 'check the slot exists on new node');
+
+done_testing();
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index b4058b88c3..5944cb34ea 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -1479,6 +1479,7 @@ LogicalRepBeginData
 LogicalRepCommitData
 LogicalRepCommitPreparedTxnData
 LogicalRepCtxStruct
+LogicalReplicationSlotInfo
 LogicalRepMode
 LogicalRepMsgType
 LogicalRepPartMapEntry
@@ -1492,6 +1493,8 @@ LogicalRepTupleData
 LogicalRepTyp
 LogicalRepWorker
 LogicalRewriteMappingData
+LogicalSlotInfo
+LogicalSlotInfoArr
 LogicalTape
 LogicalTapeSet
 LsnReadQueue
-- 
2.27.0

