From 3809abdaa4ecc351b7e9c52d6a8e751732a4f4f0 Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Date: Tue, 4 Apr 2023 05:49:34 +0000
Subject: [PATCH] pg_upgrade: Add --include-replication-slot option

This commit introduces a new option called "--include-replication-slot".
This allows nodes with logical replication slots to be upgraded. The commit can
be divided into two parts: one for pg_dump and another for pg_upgrade.

For pg_dump this commit includes a new option called "--slot-only". This option
can be used to dump replication slots. When this option is specified, the slot_name,
plugin, and two_phase parameters are extracted from pg_replication_slots. An SQL
file is then generated which executes pg_create_logical_replication_slot() with
the extracted parameters.

For pg_upgrade, when '--include-replication-slot' is specified, it executes pg_dump
with added option and restore from the dump. Apart from restoring schema, pg_resetwal
must not be called after restoring replicaiton slots. This is because the command
discards WAL files and starts from a new segment, even if they are required by
replication slots. This leads an ERROR: "requested WAL segment XXX has already
been removed". To avoid this, replication slots are restored at a different time
than other objects, after running pg_resetwal.

The significant advantage of this commit is that it makes it easy to continue
logical replication even after upgrading the publisher node. Previously, pg_upgrade
allowed copying publications to a new node. With this new commit, adjusting the
connection string to the new publisher will cause the apply worker on the subscriber
to connect to the new publisher automatically. This enables seamless continuation
of logical replication, even after an upgrade.
---
 doc/src/sgml/ref/pg_dump.sgml                 |  10 ++
 doc/src/sgml/ref/pgupgrade.sgml               |  11 ++
 src/bin/pg_dump/pg_backup.h                   |   1 +
 src/bin/pg_dump/pg_dump.c                     | 148 +++++++++++++++++-
 src/bin/pg_dump/pg_dump.h                     |  15 +-
 src/bin/pg_dump/pg_dump_sort.c                |   4 +
 src/bin/pg_upgrade/dump.c                     |  22 +++
 src/bin/pg_upgrade/meson.build                |   1 +
 src/bin/pg_upgrade/option.c                   |   5 +
 src/bin/pg_upgrade/pg_upgrade.c               |  64 ++++++++
 src/bin/pg_upgrade/pg_upgrade.h               |   3 +
 .../pg_upgrade/t/003_logical_replication.pl   |  88 +++++++++++
 12 files changed, 370 insertions(+), 2 deletions(-)
 create mode 100644 src/bin/pg_upgrade/t/003_logical_replication.pl

diff --git a/doc/src/sgml/ref/pg_dump.sgml b/doc/src/sgml/ref/pg_dump.sgml
index 77299878e0..7525cb521a 100644
--- a/doc/src/sgml/ref/pg_dump.sgml
+++ b/doc/src/sgml/ref/pg_dump.sgml
@@ -1201,6 +1201,16 @@ PostgreSQL documentation
       </listitem>
      </varlistentry>
 
+     <varlistentry>
+      <term><option>--slot-only</option></term>
+      <listitem>
+       <para>
+        Dump only replication slots, neither the schema (data definitions) nor
+        data. Mainly this is used for upgrading nodes.
+       </para>
+      </listitem>
+     </varlistentry>
+
      <varlistentry>
        <term><option>-?</option></term>
        <term><option>--help</option></term>
diff --git a/doc/src/sgml/ref/pgupgrade.sgml b/doc/src/sgml/ref/pgupgrade.sgml
index 7816b4c685..39c9e607d4 100644
--- a/doc/src/sgml/ref/pgupgrade.sgml
+++ b/doc/src/sgml/ref/pgupgrade.sgml
@@ -240,6 +240,17 @@ PostgreSQL documentation
       </listitem>
      </varlistentry>
 
+     <varlistentry>
+      <term><option>--include-replication-slot</option></term>
+      <listitem>
+       <para>
+        Transport replication slots. Currently this can work only for logical
+        slots, and temporary slots are ignored. Note that pg_upgrade does not
+        check the installation of plugins.
+       </para>
+      </listitem>
+     </varlistentry>
+
      <varlistentry>
       <term><option>-?</option></term>
       <term><option>--help</option></term>
diff --git a/src/bin/pg_dump/pg_backup.h b/src/bin/pg_dump/pg_backup.h
index aba780ef4b..8a6f25cf2c 100644
--- a/src/bin/pg_dump/pg_backup.h
+++ b/src/bin/pg_dump/pg_backup.h
@@ -187,6 +187,7 @@ typedef struct _dumpOptions
 	int			use_setsessauth;
 	int			enable_row_security;
 	int			load_via_partition_root;
+	int			slot_only;
 
 	/* default, if no "inclusion" switches appear, is to dump everything */
 	bool		include_everything;
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index 6abbcff683..484c7e961a 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -327,6 +327,9 @@ static void setupDumpWorker(Archive *AH);
 static TableInfo *getRootTableInfo(const TableInfo *tbinfo);
 static bool forcePartitionRootLoad(const TableInfo *tbinfo);
 
+static void getRepliactionSlots(Archive *fout);
+static void dumpReplicationSlot(Archive *fout,
+								const ReplicationSlotInfo *slotinfo);
 
 int
 main(int argc, char **argv)
@@ -430,7 +433,7 @@ main(int argc, char **argv)
 		{"table-and-children", required_argument, NULL, 12},
 		{"exclude-table-and-children", required_argument, NULL, 13},
 		{"exclude-table-data-and-children", required_argument, NULL, 14},
-
+		{"slot-only", no_argument, NULL, 15},
 		{NULL, 0, NULL, 0}
 	};
 
@@ -656,6 +659,11 @@ main(int argc, char **argv)
 										  optarg);
 				break;
 
+			case 15:			/* dump onlu replication slot(s) */
+				dopt.slot_only = true;
+				dopt.include_everything = false;
+				break;
+
 			default:
 				/* getopt_long already emitted a complaint */
 				pg_log_error_hint("Try \"%s --help\" for more information.", progname);
@@ -713,6 +721,11 @@ main(int argc, char **argv)
 	if (dopt.do_nothing && dopt.dump_inserts == 0)
 		pg_fatal("option --on-conflict-do-nothing requires option --inserts, --rows-per-insert, or --column-inserts");
 
+	if (dopt.slot_only && dopt.dataOnly)
+		pg_fatal("options --replicatin-slots and -a/--data-only cannot be used together");
+	if (dopt.slot_only && dopt.schemaOnly)
+		pg_fatal("options --replicatin-slots and -s/--schema-only cannot be used together");
+
 	/* Identify archive format to emit */
 	archiveFormat = parseArchiveFormat(format, &archiveMode);
 
@@ -892,6 +905,15 @@ main(int argc, char **argv)
 	 */
 	collectRoleNames(fout);
 
+	/*
+	 * If dumping replication slots are request, dumping them and skip others.
+	 */
+	if (dopt.slot_only)
+	{
+		getRepliactionSlots(fout);
+		goto dump;
+	}
+
 	/*
 	 * Now scan the database and create DumpableObject structs for all the
 	 * objects we intend to dump.
@@ -935,6 +957,8 @@ main(int argc, char **argv)
 	if (!dopt.no_security_labels)
 		collectSecLabels(fout);
 
+dump:
+
 	/* Lastly, create dummy objects to represent the section boundaries */
 	boundaryObjs = createBoundaryObjects();
 
@@ -1129,6 +1153,7 @@ help(const char *progname)
 	printf(_("  --use-set-session-authorization\n"
 			 "                               use SET SESSION AUTHORIZATION commands instead of\n"
 			 "                               ALTER OWNER commands to set ownership\n"));
+	printf(_("  --slot-only                  dump only replication slots, no schema and data\n"));
 
 	printf(_("\nConnection options:\n"));
 	printf(_("  -d, --dbname=DBNAME      database to dump\n"));
@@ -10251,6 +10276,9 @@ dumpDumpableObject(Archive *fout, DumpableObject *dobj)
 		case DO_SUBSCRIPTION:
 			dumpSubscription(fout, (const SubscriptionInfo *) dobj);
 			break;
+		case DO_REPICATION_SLOT:
+			dumpReplicationSlot(fout, (const ReplicationSlotInfo *) dobj);
+			break;
 		case DO_PRE_DATA_BOUNDARY:
 		case DO_POST_DATA_BOUNDARY:
 			/* never dumped, nothing to do */
@@ -18226,6 +18254,7 @@ addBoundaryDependencies(DumpableObject **dobjs, int numObjs,
 			case DO_PUBLICATION_REL:
 			case DO_PUBLICATION_TABLE_IN_SCHEMA:
 			case DO_SUBSCRIPTION:
+			case DO_REPICATION_SLOT:
 				/* Post-data objects: must come after the post-data boundary */
 				addObjectDependency(dobj, postDataBound->dumpId);
 				break;
@@ -18487,3 +18516,120 @@ appendReloptionsArrayAH(PQExpBuffer buffer, const char *reloptions,
 	if (!res)
 		pg_log_warning("could not parse %s array", "reloptions");
 }
+
+/*
+ * getRepliactionSlots
+ *	  get information about replication slots
+ */
+static void
+getRepliactionSlots(Archive *fout)
+{
+	PGresult   *res;
+	ReplicationSlotInfo *slotinfo;
+	PQExpBuffer query;
+	DumpOptions *dopt = fout->dopt;
+
+	int			i_slotname;
+	int			i_plugin;
+	int			i_twophase;
+	int			i,
+				ntups;
+
+	/* Check whether we should dump or not */
+	if (fout->remoteVersion < 160000 && !dopt->slot_only)
+		return;
+
+	query = createPQExpBuffer();
+
+	resetPQExpBuffer(query);
+
+	/*
+	 * Get replication slots.
+	 *
+	 * XXX: Which information must be extracted from old node? Currently three
+	 * attributes are extracted because they are used by
+	 * pg_create_logical_replication_slot().
+	 * XXX: Do we have to support physical slots?
+	 */
+	appendPQExpBufferStr(query,
+						 "SELECT r.slot_name, r.plugin, r.two_phase "
+						 "FROM pg_replication_slots r "
+						 "WHERE r.database = current_database() AND temporary = false "
+						 "AND wal_status IN ('reserved', 'extended');");
+
+	res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK);
+
+	ntups = PQntuples(res);
+
+	i_slotname = PQfnumber(res, "slot_name");
+	i_plugin = PQfnumber(res, "plugin");
+	i_twophase = PQfnumber(res, "two_phase");
+
+	slotinfo = pg_malloc(ntups * sizeof(ReplicationSlotInfo));
+
+	for (i = 0; i < ntups; i++)
+	{
+		slotinfo[i].dobj.objType = DO_REPICATION_SLOT;
+
+		slotinfo[i].dobj.catId.tableoid = InvalidOid;
+		slotinfo[i].dobj.catId.oid = InvalidOid;
+		AssignDumpId(&slotinfo[i].dobj);
+
+		slotinfo[i].dobj.name = pg_strdup(PQgetvalue(res, i, i_slotname));
+
+		slotinfo[i].plugin = pg_strdup(PQgetvalue(res, i, i_plugin));
+		slotinfo[i].twophase = pg_strdup(PQgetvalue(res, i, i_twophase));
+
+		/* FIXME: force dumping */
+		slotinfo[i].dobj.dump = DUMP_COMPONENT_ALL;
+	}
+	PQclear(res);
+
+	destroyPQExpBuffer(query);
+}
+
+/*
+ * dumpReplicationSlot
+ *	  write down a script for pg_restore command
+ */
+static void
+dumpReplicationSlot(Archive *fout, const ReplicationSlotInfo *slotinfo)
+{
+	DumpOptions *dopt = fout->dopt;
+	PQExpBuffer query;
+	char *slotname;
+
+	if (!dopt->slot_only)
+		return;
+
+	slotname = pg_strdup(slotinfo->dobj.name);
+	query = createPQExpBuffer();
+
+	/*
+	 * XXX: For simplification, pg_create_logical_replication_slot() is used.
+	 * Is it sufficient?
+	 */
+	appendPQExpBuffer(query, "SELECT pg_create_logical_replication_slot('%s', ",
+					  slotname);
+	appendStringLiteralAH(query, slotinfo->plugin, fout);
+	appendPQExpBuffer(query, ", ");
+	appendStringLiteralAH(query, slotinfo->twophase, fout);
+	appendPQExpBuffer(query, ");");
+
+	if (slotinfo->dobj.dump & DUMP_COMPONENT_DEFINITION)
+		ArchiveEntry(fout, slotinfo->dobj.catId, slotinfo->dobj.dumpId,
+					 ARCHIVE_OPTS(.tag = slotname,
+								  .description = "REPICATION SLOT",
+								  .section = SECTION_POST_DATA,
+								  .createStmt = query->data));
+
+	/* XXX: do we have to dump security label? */
+
+	if (slotinfo->dobj.dump & DUMP_COMPONENT_COMMENT)
+		dumpComment(fout, "REPICATION SLOT", slotname,
+					NULL, NULL,
+					slotinfo->dobj.catId, 0, slotinfo->dobj.dumpId);
+
+	pfree(slotname);
+	destroyPQExpBuffer(query);
+}
diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h
index ed6ce41ad7..a27bff661b 100644
--- a/src/bin/pg_dump/pg_dump.h
+++ b/src/bin/pg_dump/pg_dump.h
@@ -82,7 +82,8 @@ typedef enum
 	DO_PUBLICATION,
 	DO_PUBLICATION_REL,
 	DO_PUBLICATION_TABLE_IN_SCHEMA,
-	DO_SUBSCRIPTION
+	DO_SUBSCRIPTION,
+	DO_REPICATION_SLOT
 } DumpableObjectType;
 
 /*
@@ -666,6 +667,18 @@ typedef struct _SubscriptionInfo
 	char	   *subpasswordrequired;
 } SubscriptionInfo;
 
+/*
+ * The ReplicationSlotInfo struct is used to represent replication slots.
+ * XXX: add more attrbutes if needed
+ */
+typedef struct _ReplicationSlotInfo
+{
+	DumpableObject dobj;
+	char *plugin;
+	char *slottype;
+	char *twophase;
+} ReplicationSlotInfo;
+
 /*
  *	common utility functions
  */
diff --git a/src/bin/pg_dump/pg_dump_sort.c b/src/bin/pg_dump/pg_dump_sort.c
index 8266c117a3..8e1fc1fda5 100644
--- a/src/bin/pg_dump/pg_dump_sort.c
+++ b/src/bin/pg_dump/pg_dump_sort.c
@@ -1497,6 +1497,10 @@ describeDumpableObject(DumpableObject *obj, char *buf, int bufsize)
 			snprintf(buf, bufsize,
 					 "SUBSCRIPTION (ID %d OID %u)",
 					 obj->dumpId, obj->catId.oid);
+		case DO_REPICATION_SLOT:
+			snprintf(buf, bufsize,
+					 "REPLICATION SLOT (ID %d NAME %s)",
+					 obj->dumpId, obj->name);
 			return;
 		case DO_PRE_DATA_BOUNDARY:
 			snprintf(buf, bufsize,
diff --git a/src/bin/pg_upgrade/dump.c b/src/bin/pg_upgrade/dump.c
index 6c8c82dca8..aecd284b48 100644
--- a/src/bin/pg_upgrade/dump.c
+++ b/src/bin/pg_upgrade/dump.c
@@ -59,6 +59,28 @@ generate_old_dump(void)
 						   log_opts.dumpdir,
 						   sql_file_name, escaped_connstr.data);
 
+		/*
+		 * Dump replicaiton slots if needed.
+		 *
+		 * XXX We cannot dump replication slots at the same time as the schema
+		 * dump because we need to separate the timing of restoring replication
+		 * slots and other objects. Replication slots, in particular, should
+		 * not be restored before executing the pg_resetwal command because it
+		 * will remove WALs that are required by the slots.
+		 */
+		if (user_opts.include_slots)
+		{
+			char		slots_file_name[MAXPGPATH];
+
+			snprintf(slots_file_name, sizeof(slots_file_name), DB_DUMP_FILE_MASK_FOR_SLOTS, old_db->db_oid);
+			parallel_exec_prog(log_file_name, NULL,
+							   "\"%s/pg_dump\" %s --slot-only --quote-all-identifiers "
+							   "--binary-upgrade %s --file=\"%s/%s\" %s",
+							   new_cluster.bindir, cluster_conn_opts(&old_cluster),
+							   log_opts.verbose ? "--verbose" : "",
+							   log_opts.dumpdir,
+							   slots_file_name, escaped_connstr.data);
+		}
 		termPQExpBuffer(&escaped_connstr);
 	}
 
diff --git a/src/bin/pg_upgrade/meson.build b/src/bin/pg_upgrade/meson.build
index 12a97f84e2..7f5d48b7e1 100644
--- a/src/bin/pg_upgrade/meson.build
+++ b/src/bin/pg_upgrade/meson.build
@@ -42,6 +42,7 @@ tests += {
     'tests': [
       't/001_basic.pl',
       't/002_pg_upgrade.pl',
+      't/003_logical_replication.pl',
     ],
     'test_kwargs': {'priority': 40}, # pg_upgrade tests are slow
   },
diff --git a/src/bin/pg_upgrade/option.c b/src/bin/pg_upgrade/option.c
index 8869b6b60d..9897e706d7 100644
--- a/src/bin/pg_upgrade/option.c
+++ b/src/bin/pg_upgrade/option.c
@@ -57,6 +57,7 @@ parseCommandLine(int argc, char *argv[])
 		{"verbose", no_argument, NULL, 'v'},
 		{"clone", no_argument, NULL, 1},
 		{"copy", no_argument, NULL, 2},
+		{"include-replication-slot", no_argument, NULL, 3},
 
 		{NULL, 0, NULL, 0}
 	};
@@ -199,6 +200,10 @@ parseCommandLine(int argc, char *argv[])
 				user_opts.transfer_mode = TRANSFER_MODE_COPY;
 				break;
 
+			case 3:
+				user_opts.include_slots = true;
+				break;
+
 			default:
 				fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
 						os_info.progname);
diff --git a/src/bin/pg_upgrade/pg_upgrade.c b/src/bin/pg_upgrade/pg_upgrade.c
index 75bab0a04c..0236cd18c0 100644
--- a/src/bin/pg_upgrade/pg_upgrade.c
+++ b/src/bin/pg_upgrade/pg_upgrade.c
@@ -59,6 +59,7 @@ static void copy_xact_xlog_xid(void);
 static void set_frozenxids(bool minmxid_only);
 static void make_outputdirs(char *pgdata);
 static void setup(char *argv0, bool *live_check);
+static void create_replicaiton_slots(void);
 
 ClusterInfo old_cluster,
 			new_cluster;
@@ -188,6 +189,19 @@ main(int argc, char **argv)
 			  new_cluster.pgdata);
 	check_ok();
 
+	/*
+	 * Create replication slots if requested.
+	 *
+	 * XXX This must be done after doing pg_resetwal command because the
+	 * command will remove required WALs.
+	 */
+	if (user_opts.include_slots)
+	{
+		start_postmaster(&new_cluster, true);
+		create_replicaiton_slots();
+		stop_postmaster(false);
+	}
+
 	if (user_opts.do_sync)
 	{
 		prep_status("Sync data directory to disk");
@@ -860,3 +874,53 @@ set_frozenxids(bool minmxid_only)
 
 	check_ok();
 }
+
+/*
+ * create_replicaiton_slots()
+ *
+ * Similar to create_new_objects() but only restores replication slots.
+ */
+static void
+create_replicaiton_slots(void)
+{
+	int			dbnum;
+
+	prep_status_progress("Restoring replication slots in the new cluster");
+
+	for (dbnum = 0; dbnum < old_cluster.dbarr.ndbs; dbnum++)
+	{
+		char		slots_file_name[MAXPGPATH],
+					log_file_name[MAXPGPATH];
+		DbInfo	   *old_db = &old_cluster.dbarr.dbs[dbnum];
+		char	   *opts;
+
+		pg_log(PG_STATUS, "%s", old_db->db_name);
+
+		snprintf(slots_file_name, sizeof(slots_file_name),
+				 DB_DUMP_FILE_MASK_FOR_SLOTS, old_db->db_oid);
+		snprintf(log_file_name, sizeof(log_file_name),
+				 DB_DUMP_LOG_FILE_MASK, old_db->db_oid);
+
+		opts = "--echo-queries --set ON_ERROR_STOP=on --no-psqlrc";
+
+		parallel_exec_prog(log_file_name,
+						   NULL,
+						   "\"%s/psql\" %s %s --dbname %s -f \"%s/%s\"",
+						   new_cluster.bindir,
+						   cluster_conn_opts(&new_cluster),
+						   opts,
+						   old_db->db_name,
+						   log_opts.dumpdir,
+						   slots_file_name);
+	}
+
+	/* reap all children */
+	while (reap_child(true) == true)
+		;
+
+	end_progress_output();
+	check_ok();
+
+	/* update new_cluster info now that we have objects in the databases */
+	get_db_and_rel_infos(&new_cluster);
+}
diff --git a/src/bin/pg_upgrade/pg_upgrade.h b/src/bin/pg_upgrade/pg_upgrade.h
index 3eea0139c7..82d7a89e24 100644
--- a/src/bin/pg_upgrade/pg_upgrade.h
+++ b/src/bin/pg_upgrade/pg_upgrade.h
@@ -29,6 +29,7 @@
 /* contains both global db information and CREATE DATABASE commands */
 #define GLOBALS_DUMP_FILE	"pg_upgrade_dump_globals.sql"
 #define DB_DUMP_FILE_MASK	"pg_upgrade_dump_%u.custom"
+#define DB_DUMP_FILE_MASK_FOR_SLOTS	"pg_upgrade_dump_%u_slots.custom"
 
 /*
  * Base directories that include all the files generated internally, from the
@@ -304,6 +305,8 @@ typedef struct
 	transferMode transfer_mode; /* copy files or link them? */
 	int			jobs;			/* number of processes/threads to use */
 	char	   *socketdir;		/* directory to use for Unix sockets */
+	bool		include_slots;	/* true -> dump and restore replication
+								 * slots */
 } UserOpts;
 
 typedef struct
diff --git a/src/bin/pg_upgrade/t/003_logical_replication.pl b/src/bin/pg_upgrade/t/003_logical_replication.pl
new file mode 100644
index 0000000000..27b36bea5b
--- /dev/null
+++ b/src/bin/pg_upgrade/t/003_logical_replication.pl
@@ -0,0 +1,88 @@
+# Copyright (c) 2021-2023, PostgreSQL Global Development Group
+
+# Tests for logical replication, especially for upgrading publisher
+use strict;
+use warnings;
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+# Can be changed to test the other modes.
+my $mode = $ENV{PG_TEST_PG_UPGRADE_MODE} || '--copy';
+
+# Initialize publisher node
+my $old_publisher = PostgreSQL::Test::Cluster->new('old_publisher');
+$old_publisher->init(allows_streaming => 'logical');
+$old_publisher->start;
+
+# Create subscriber node
+my $subscriber = PostgreSQL::Test::Cluster->new('subscriber');
+$subscriber->init(allows_streaming => 'logical');
+$subscriber->start;
+
+$old_publisher->safe_psql('postgres',
+	"CREATE TABLE tbl AS SELECT generate_series(1,10) AS a");
+$subscriber->safe_psql('postgres',
+	"CREATE TABLE tbl (a int)");
+
+# Setup logical replication
+my $old_connstr = $old_publisher->connstr . ' dbname=postgres';
+$old_publisher->safe_psql('postgres', "CREATE PUBLICATION pub FOR ALL TABLES");
+$subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION sub CONNECTION '$old_connstr' PUBLICATION pub"
+);
+
+# Wait for initial table sync to finish
+$subscriber->wait_for_subscription_sync($old_publisher, 'sub');
+
+# Preparations for upgrading publisher
+$old_publisher->stop;
+$subscriber->safe_psql('postgres',
+	"ALTER SUBSCRIPTION sub DISABLE");
+
+my $new_publisher = PostgreSQL::Test::Cluster->new('new_publisher');
+$new_publisher->init(allows_streaming => 'logical');
+
+my $bindir = $new_publisher->config_data('--bindir');
+
+# Run pg_upgrade. pg_upgrade_output.d is removed at the end
+command_ok(
+	[
+		'pg_upgrade', '--no-sync',        '-d', $old_publisher->data_dir,
+		'-D',         $new_publisher->data_dir, '-b', $bindir,
+		'-B',         $bindir,         '-s', $new_publisher->host,
+		'-p',         $old_publisher->port,     '-P', $new_publisher->port,
+		$mode, '--include-replication-slot'
+	],
+	'run of pg_upgrade for new publisher');
+ok( !-d $new_publisher->data_dir . "/pg_upgrade_output.d",
+	"pg_upgrade_output.d/ removed after pg_upgrade success");
+
+# Check whether the replication slot is copied
+$new_publisher->start;
+my $result =
+  $new_publisher->safe_psql('postgres', "SELECT count(*) FROM pg_replication_slots");
+is($result, qq(1),
+	'check the replication slot is copied to new publisher');
+
+# Change connection string and enable logical replication
+my $new_connstr = $new_publisher->connstr . ' dbname=postgres';
+
+$subscriber->safe_psql('postgres',
+	"ALTER SUBSCRIPTION sub CONNECTION '$new_connstr'");
+$subscriber->safe_psql('postgres',
+	"ALTER SUBSCRIPTION sub ENABLE");
+
+# Check whether changes on new publisher are shipped to subscriber
+$new_publisher->safe_psql('postgres',
+	"INSERT INTO tbl VALUES (generate_series(11, 20))"
+);
+
+$new_publisher->wait_for_catchup('sub');
+
+$result =
+  $subscriber->safe_psql('postgres', "SELECT count(*) FROM tbl");
+is($result, qq(20),
+	'check changes are shipped to subscriber');
+
+done_testing();
-- 
2.27.0

