From 54eb780374816cf97a82a7fbbb7e2a6c165c99e5 Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Date: Fri, 14 Apr 2023 08:59:03 +0000
Subject: [PATCH v19 3/3] pg_upgrade: Add check function for logical
 replication slots

pg_upgrade fails if the old node has slots whose status is 'lost' or which have
not consumed all WAL records. These checks are needed to prevent data loss.

To implement this, some functions are moved from pg_walinspect to core.

Author: Hayato Kuroda
Reviewed-by: Wang Wei, Vignesh C
---
 contrib/pg_walinspect/pg_walinspect.c         |  94 -----------
 src/backend/access/transam/xlogutils.c        |  92 +++++++++++
 src/backend/catalog/system_functions.sql      |   4 +
 src/backend/utils/adt/pg_upgrade_support.c    |  62 ++++++-
 src/bin/pg_upgrade/check.c                    |  83 ++++++++++
 .../t/003_logical_replication_slots.pl        | 155 ++++++++++++------
 src/include/access/xlogutils.h                |   3 +
 src/include/catalog/pg_proc.dat               |   8 +
 src/test/regress/sql/misc_functions.sql       |   2 +-
 9 files changed, 354 insertions(+), 149 deletions(-)

diff --git a/contrib/pg_walinspect/pg_walinspect.c b/contrib/pg_walinspect/pg_walinspect.c
index 796a74f322..49f4f92e98 100644
--- a/contrib/pg_walinspect/pg_walinspect.c
+++ b/contrib/pg_walinspect/pg_walinspect.c
@@ -40,8 +40,6 @@ PG_FUNCTION_INFO_V1(pg_get_wal_stats_till_end_of_wal);
 
 static void ValidateInputLSNs(XLogRecPtr start_lsn, XLogRecPtr *end_lsn);
 static XLogRecPtr GetCurrentLSN(void);
-static XLogReaderState *InitXLogReaderState(XLogRecPtr lsn);
-static XLogRecord *ReadNextXLogRecord(XLogReaderState *xlogreader);
 static void GetWALRecordInfo(XLogReaderState *record, Datum *values,
 							 bool *nulls, uint32 ncols);
 static void GetWALRecordsInfo(FunctionCallInfo fcinfo,
@@ -84,98 +82,6 @@ GetCurrentLSN(void)
 	return curr_lsn;
 }
 
-/*
- * Initialize WAL reader and identify first valid LSN.
- */
-static XLogReaderState *
-InitXLogReaderState(XLogRecPtr lsn)
-{
-	XLogReaderState *xlogreader;
-	ReadLocalXLogPageNoWaitPrivate *private_data;
-	XLogRecPtr	first_valid_record;
-
-	/*
-	 * Reading WAL below the first page of the first segments isn't allowed.
-	 * This is a bootstrap WAL page and the page_read callback fails to read
-	 * it.
-	 */
-	if (lsn < XLOG_BLCKSZ)
-		ereport(ERROR,
-				(errmsg("could not read WAL at LSN %X/%X",
-						LSN_FORMAT_ARGS(lsn))));
-
-	private_data = (ReadLocalXLogPageNoWaitPrivate *)
-		palloc0(sizeof(ReadLocalXLogPageNoWaitPrivate));
-
-	xlogreader = XLogReaderAllocate(wal_segment_size, NULL,
-									XL_ROUTINE(.page_read = &read_local_xlog_page_no_wait,
-											   .segment_open = &wal_segment_open,
-											   .segment_close = &wal_segment_close),
-									private_data);
-
-	if (xlogreader == NULL)
-		ereport(ERROR,
-				(errcode(ERRCODE_OUT_OF_MEMORY),
-				 errmsg("out of memory"),
-				 errdetail("Failed while allocating a WAL reading processor.")));
-
-	/* first find a valid recptr to start from */
-	first_valid_record = XLogFindNextRecord(xlogreader, lsn);
-
-	if (XLogRecPtrIsInvalid(first_valid_record))
-		ereport(ERROR,
-				(errmsg("could not find a valid record after %X/%X",
-						LSN_FORMAT_ARGS(lsn))));
-
-	return xlogreader;
-}
-
-/*
- * Read next WAL record.
- *
- * By design, to be less intrusive in a running system, no slot is allocated
- * to reserve the WAL we're about to read. Therefore this function can
- * encounter read errors for historical WAL.
- *
- * We guard against ordinary errors trying to read WAL that hasn't been
- * written yet by limiting end_lsn to the flushed WAL, but that can also
- * encounter errors if the flush pointer falls in the middle of a record. In
- * that case we'll return NULL.
- */
-static XLogRecord *
-ReadNextXLogRecord(XLogReaderState *xlogreader)
-{
-	XLogRecord *record;
-	char	   *errormsg;
-
-	record = XLogReadRecord(xlogreader, &errormsg);
-
-	if (record == NULL)
-	{
-		ReadLocalXLogPageNoWaitPrivate *private_data;
-
-		/* return NULL, if end of WAL is reached */
-		private_data = (ReadLocalXLogPageNoWaitPrivate *)
-			xlogreader->private_data;
-
-		if (private_data->end_of_wal)
-			return NULL;
-
-		if (errormsg)
-			ereport(ERROR,
-					(errcode_for_file_access(),
-					 errmsg("could not read WAL at %X/%X: %s",
-							LSN_FORMAT_ARGS(xlogreader->EndRecPtr), errormsg)));
-		else
-			ereport(ERROR,
-					(errcode_for_file_access(),
-					 errmsg("could not read WAL at %X/%X",
-							LSN_FORMAT_ARGS(xlogreader->EndRecPtr))));
-	}
-
-	return record;
-}
-
 /*
  * Output values that make up a row describing caller's WAL record.
  *
diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c
index e174a2a891..574797fbbd 100644
--- a/src/backend/access/transam/xlogutils.c
+++ b/src/backend/access/transam/xlogutils.c
@@ -1048,3 +1048,95 @@ WALReadRaiseError(WALReadError *errinfo)
 						errinfo->wre_req)));
 	}
 }
+
+/*
+ * Initialize WAL reader and identify first valid LSN.
+ */
+XLogReaderState *
+InitXLogReaderState(XLogRecPtr lsn)
+{
+	XLogReaderState *xlogreader;
+	ReadLocalXLogPageNoWaitPrivate *private_data;
+	XLogRecPtr	first_valid_record;
+
+	/*
+	 * Reading WAL below the first page of the first segments isn't allowed.
+	 * This is a bootstrap WAL page and the page_read callback fails to read
+	 * it.
+	 */
+	if (lsn < XLOG_BLCKSZ)
+		ereport(ERROR,
+				(errmsg("could not read WAL at LSN %X/%X",
+						LSN_FORMAT_ARGS(lsn))));
+
+	private_data = (ReadLocalXLogPageNoWaitPrivate *)
+		palloc0(sizeof(ReadLocalXLogPageNoWaitPrivate));
+
+	xlogreader = XLogReaderAllocate(wal_segment_size, NULL,
+									XL_ROUTINE(.page_read = &read_local_xlog_page_no_wait,
+											   .segment_open = &wal_segment_open,
+											   .segment_close = &wal_segment_close),
+									private_data);
+
+	if (xlogreader == NULL)
+		ereport(ERROR,
+				(errcode(ERRCODE_OUT_OF_MEMORY),
+				 errmsg("out of memory"),
+				 errdetail("Failed while allocating a WAL reading processor.")));
+
+	/* first find a valid recptr to start from */
+	first_valid_record = XLogFindNextRecord(xlogreader, lsn);
+
+	if (XLogRecPtrIsInvalid(first_valid_record))
+		ereport(ERROR,
+				(errmsg("could not find a valid record after %X/%X",
+						LSN_FORMAT_ARGS(lsn))));
+
+	return xlogreader;
+}
+
+/*
+ * Read next WAL record.
+ *
+ * By design, to be less intrusive in a running system, no slot is allocated
+ * to reserve the WAL we're about to read. Therefore this function can
+ * encounter read errors for historical WAL.
+ *
+ * We guard against ordinary errors trying to read WAL that hasn't been
+ * written yet by limiting end_lsn to the flushed WAL, but that can also
+ * encounter errors if the flush pointer falls in the middle of a record. In
+ * that case we'll return NULL.
+ */
+XLogRecord *
+ReadNextXLogRecord(XLogReaderState *xlogreader)
+{
+	XLogRecord *record;
+	char	   *errormsg;
+
+	record = XLogReadRecord(xlogreader, &errormsg);
+
+	if (record == NULL)
+	{
+		ReadLocalXLogPageNoWaitPrivate *private_data;
+
+		/* return NULL, if end of WAL is reached */
+		private_data = (ReadLocalXLogPageNoWaitPrivate *)
+			xlogreader->private_data;
+
+		if (private_data->end_of_wal)
+			return NULL;
+
+		if (errormsg)
+			ereport(ERROR,
+					(errcode_for_file_access(),
+					 errmsg("could not read WAL at %X/%X: %s",
+							LSN_FORMAT_ARGS(xlogreader->EndRecPtr), errormsg)));
+		else
+			ereport(ERROR,
+					(errcode_for_file_access(),
+					 errmsg("could not read WAL at %X/%X",
+							LSN_FORMAT_ARGS(xlogreader->EndRecPtr))));
+	}
+
+	return record;
+}
diff --git a/src/backend/catalog/system_functions.sql b/src/backend/catalog/system_functions.sql
index 07c0d89c4f..7d52750ae9 100644
--- a/src/backend/catalog/system_functions.sql
+++ b/src/backend/catalog/system_functions.sql
@@ -616,6 +616,8 @@ REVOKE EXECUTE ON FUNCTION pg_backup_stop(boolean) FROM public;
 
 REVOKE EXECUTE ON FUNCTION pg_create_restore_point(text) FROM public;
 
+REVOKE EXECUTE ON FUNCTION validate_wal_record_types_after(pg_lsn) FROM public;
+
 REVOKE EXECUTE ON FUNCTION pg_switch_wal() FROM public;
 
 REVOKE EXECUTE ON FUNCTION pg_log_standby_snapshot() FROM public;
@@ -726,6 +728,8 @@ REVOKE EXECUTE ON FUNCTION pg_ls_replslotdir(text) FROM PUBLIC;
 -- We also set up some things as accessible to standard roles.
 --
 
+GRANT EXECUTE ON FUNCTION validate_wal_record_types_after(pg_lsn) TO pg_read_server_files;
+
 GRANT EXECUTE ON FUNCTION pg_ls_logdir() TO pg_monitor;
 
 GRANT EXECUTE ON FUNCTION pg_ls_waldir() TO pg_monitor;
diff --git a/src/backend/utils/adt/pg_upgrade_support.c b/src/backend/utils/adt/pg_upgrade_support.c
index 0186636d9f..c3a9c4a83b 100644
--- a/src/backend/utils/adt/pg_upgrade_support.c
+++ b/src/backend/utils/adt/pg_upgrade_support.c
@@ -11,15 +11,20 @@
 
 #include "postgres.h"
 
+#include "access/xlog.h"
+#include "access/xlog_internal.h"
+#include "access/xlogutils.h"
 #include "catalog/binary_upgrade.h"
 #include "catalog/heap.h"
+#include "catalog/pg_control.h"
 #include "catalog/namespace.h"
 #include "catalog/pg_type.h"
 #include "commands/extension.h"
+#include "storage/standbydefs.h"
 #include "miscadmin.h"
 #include "utils/array.h"
 #include "utils/builtins.h"
-
+#include "utils/pg_lsn.h"
 
 #define CHECK_IS_BINARY_UPGRADE									\
 do {															\
@@ -261,3 +266,58 @@ binary_upgrade_set_missing_value(PG_FUNCTION_ARGS)
 
 	PG_RETURN_VOID();
 }
+
+Datum
+validate_wal_record_types_after(PG_FUNCTION_ARGS)
+{
+	XLogRecPtr	start_lsn = PG_GETARG_LSN(0);
+	XLogRecPtr	curr_lsn = GetFlushRecPtr(NULL);
+	XLogReaderState *xlogreader;
+	bool		initial_record = true;
+
+	/* Quick exit if the given lsn is larger than current one */
+	if (start_lsn > curr_lsn)
+		PG_RETURN_BOOL(true);
+
+	xlogreader = InitXLogReaderState(start_lsn);
+
+	/* Read records till end of WAL */
+	while (ReadNextXLogRecord(xlogreader))
+	{
+		uint8		info;
+
+		/*
+		 * XXX: check the type of WAL. Currently XLOG info is directly
+		 * extracted, but it may be better to use the descriptor instead.
+		 */
+		info = XLogRecGetInfo(xlogreader) & ~XLR_INFO_MASK;
+
+		if (initial_record)
+		{
+			/* Initial record must be XLOG_CHECKPOINT_SHUTDOWN */
+			if (info != XLOG_CHECKPOINT_SHUTDOWN)
+				PG_RETURN_BOOL(false);
+
+			initial_record = false;
+		}
+
+		else
+		{
+			/*
+			 * XXX: There is a possibility that following records may be
+			 * generated during the upgrade.
+			 */
+			if (info != XLOG_RUNNING_XACTS &&
+				info != XLOG_CHECKPOINT_ONLINE &&
+				info != XLOG_FPI_FOR_HINT)
+				PG_RETURN_BOOL(false);
+		}
+
+		CHECK_FOR_INTERRUPTS();
+	}
+
+	pfree(xlogreader->private_data);
+	XLogReaderFree(xlogreader);
+
+	PG_RETURN_BOOL(true);
+}
diff --git a/src/bin/pg_upgrade/check.c b/src/bin/pg_upgrade/check.c
index 93da9e15f3..e772629c42 100644
--- a/src/bin/pg_upgrade/check.c
+++ b/src/bin/pg_upgrade/check.c
@@ -31,6 +31,7 @@ static void check_for_pg_role_prefix(ClusterInfo *cluster);
 static void check_for_new_tablespace_dir(ClusterInfo *new_cluster);
 static void check_for_user_defined_encoding_conversions(ClusterInfo *cluster);
 static void check_for_logical_replication_slots(ClusterInfo *new_cluster);
+static void check_for_confirmed_flush_lsn(ClusterInfo *cluster);
 
 int	num_slots_on_old_cluster;
 
@@ -111,6 +112,8 @@ check_and_dump_old_cluster(bool live_check)
 	check_for_composite_data_type_usage(&old_cluster);
 	check_for_reg_data_type_usage(&old_cluster);
 	check_for_isn_and_int8_passing_mismatch(&old_cluster);
+	if (!user_opts.exclude_logical_slots)
+		check_for_confirmed_flush_lsn(&old_cluster);
 
 	/*
 	 * PG 16 increased the size of the 'aclitem' type, which breaks the
@@ -1479,3 +1482,83 @@ check_for_logical_replication_slots(ClusterInfo *new_cluster)
 
 	check_ok();
 }
+
+/*
+ * Verify that all logical replication slots consumed all WALs, except a
+ * CHECKPOINT_SHUTDOWN record.
+ */
+static void
+check_for_confirmed_flush_lsn(ClusterInfo *cluster)
+{
+	int			i,
+				ntups,
+				i_slotname;
+	bool		is_error = false;
+	PGresult   *res;
+	DbInfo	   *active_db = &cluster->dbarr.dbs[0];
+	PGconn	   *conn = connectToServer(cluster, active_db->db_name);
+
+	Assert(!user_opts.exclude_logical_slots);
+
+	/* logical slots can be dumped since PG17. */
+	if (GET_MAJOR_VERSION(cluster->major_version) <= 1600)
+		return;
+
+	prep_status("Checking confirmed_flush_lsn for logical replication slots");
+
+	/* Check that all logical slots are not in 'lost' state. */
+	res = executeQueryOrDie(conn,
+							"SELECT slot_name FROM pg_catalog.pg_replication_slots "
+							"WHERE temporary = false AND wal_status = 'lost';");
+
+	ntups = PQntuples(res);
+	i_slotname = PQfnumber(res, "slot_name");
+
+	for (i = 0; i < ntups; i++)
+	{
+		is_error = true;
+
+		pg_log(PG_WARNING,
+			   "\nWARNING: logical replication slot \"%s\" is obsolete.",
+			   PQgetvalue(res, i, i_slotname));
+	}
+
+	PQclear(res);
+
+	if (is_error)
+		pg_fatal("logical replication slots must not be in 'lost' state. "
+				 "Please use --exclude-logical-replication-slots if it is "
+				 "unexpected.");
+
+	/*
+	 * Check that all logical replication slots have reached the current WAL
+	 * position.
+	 */
+	res = executeQueryOrDie(conn,
+							"SELECT slot_name FROM pg_catalog.pg_replication_slots "
+							"WHERE (SELECT pg_catalog.validate_wal_record_types_after(confirmed_flush_lsn)) IS FALSE "
+							"AND temporary = false;");
+
+	ntups = PQntuples(res);
+	i_slotname = PQfnumber(res, "slot_name");
+
+	for (i = 0; i < ntups; i++)
+	{
+		is_error = true;
+
+		pg_log(PG_WARNING,
+			   "\nWARNING: logical replication slot \"%s\" has not consumed WALs yet",
+			   PQgetvalue(res, i, i_slotname));
+	}
+
+	PQclear(res);
+	PQfinish(conn);
+
+	if (is_error)
+		pg_fatal("Not all logical replication slots have consumed the WALs. "
+				 "Please use --exclude-logical-replication-slots if it is "
+				 "unexpected.");
+
+
+	check_ok();
+}
diff --git a/src/bin/pg_upgrade/t/003_logical_replication_slots.pl b/src/bin/pg_upgrade/t/003_logical_replication_slots.pl
index f015b5d363..59e2c2f209 100644
--- a/src/bin/pg_upgrade/t/003_logical_replication_slots.pl
+++ b/src/bin/pg_upgrade/t/003_logical_replication_slots.pl
@@ -15,138 +15,187 @@ use Test::More;
 my $mode = $ENV{PG_TEST_PG_UPGRADE_MODE} || '--copy';
 
 # Initialize old node
-my $old_node = PostgreSQL::Test::Cluster->new('old_node');
-$old_node->init(allows_streaming => 'logical');
-$old_node->start;
+my $old_publisher = PostgreSQL::Test::Cluster->new('old_publisher');
+$old_publisher->init(allows_streaming => 'logical');
 
 # Initialize new node
-my $new_node = PostgreSQL::Test::Cluster->new('new_node');
-$new_node->init(allows_streaming => 1);
+my $new_publisher = PostgreSQL::Test::Cluster->new('new_publisher');
+$new_publisher->init(allows_streaming => 1);
 
-my $bindir = $new_node->config_data('--bindir');
+# Initialize subscriber node
+my $subscriber = PostgreSQL::Test::Cluster->new('subscriber');
+$subscriber->init(allows_streaming => 'logical');
 
-$old_node->stop;
+my $bindir = $new_publisher->config_data('--bindir');
 
 # Create a slot on old node
-$old_node->start;
-$old_node->safe_psql(
+$old_publisher->start;
+$old_publisher->safe_psql(
 	'postgres', "SELECT pg_create_logical_replication_slot('test_slot1', 'test_decoding', false, true);"
 );
-$old_node->stop;
+$old_publisher->stop;
 
 # Cause a failure at the start of pg_upgrade because wal_level is replica
 command_fails(
 	[
 		'pg_upgrade', '--no-sync',
-		'-d',         $old_node->data_dir,
-		'-D',         $new_node->data_dir,
+		'-d',         $old_publisher->data_dir,
+		'-D',         $new_publisher->data_dir,
 		'-b',         $bindir,
 		'-B',         $bindir,
-		'-s',         $new_node->host,
-		'-p',         $old_node->port,
-		'-P',         $new_node->port,
+		'-s',         $new_publisher->host,
+		'-p',         $old_publisher->port,
+		'-P',         $new_publisher->port,
 		$mode,
 	],
 	'run of pg_upgrade of old node with wrong wal_level');
-ok( -d $new_node->data_dir . "/pg_upgrade_output.d",
+ok( -d $new_publisher->data_dir . "/pg_upgrade_output.d",
 	"pg_upgrade_output.d/ not removed after pg_upgrade failure");
 
 # Clean up
-rmtree($new_node->data_dir . "/pg_upgrade_output.d");
+rmtree($new_publisher->data_dir . "/pg_upgrade_output.d");
 
 # Preparations for the subsequent test. The case max_replication_slots is set
 # to 0 is prohibited.
-$new_node->append_conf('postgresql.conf', "wal_level = 'logical'");
-$new_node->append_conf('postgresql.conf', "max_replication_slots = 0");
+$new_publisher->append_conf('postgresql.conf', "wal_level = 'logical'");
+$new_publisher->append_conf('postgresql.conf', "max_replication_slots = 0");
 
 # Cause a failure at the start of pg_upgrade because max_replication_slots is 0
 command_fails(
 	[
 		'pg_upgrade', '--no-sync',
-		'-d',         $old_node->data_dir,
-		'-D',         $new_node->data_dir,
+		'-d',         $old_publisher->data_dir,
+		'-D',         $new_publisher->data_dir,
 		'-b',         $bindir,
 		'-B',         $bindir,
-		'-s',         $new_node->host,
-		'-p',         $old_node->port,
-		'-P',         $new_node->port,
+		'-s',         $new_publisher->host,
+		'-p',         $old_publisher->port,
+		'-P',         $new_publisher->port,
 		$mode,
 	],
 	'run of pg_upgrade of old node with wrong max_replication_slots');
-ok( -d $new_node->data_dir . "/pg_upgrade_output.d",
+ok( -d $new_publisher->data_dir . "/pg_upgrade_output.d",
 	"pg_upgrade_output.d/ not removed after pg_upgrade failure");
 
 # Clean up
-rmtree($new_node->data_dir . "/pg_upgrade_output.d");
+rmtree($new_publisher->data_dir . "/pg_upgrade_output.d");
 
 # Preparations for the subsequent test. max_replication_slots is set to
 # non-zero value
-$new_node->append_conf('postgresql.conf', "max_replication_slots = 1");
+$new_publisher->append_conf('postgresql.conf', "max_replication_slots = 1");
 
-# Create a slot on old node, and generate WALs
-$old_node->start;
-$old_node->safe_psql(
+# Create a slot on old node
+$old_publisher->start;
+$old_publisher->safe_psql(
 	'postgres', qq[
 	SELECT pg_create_logical_replication_slot('test_slot2', 'test_decoding', false, true);
 	CREATE TABLE tbl AS SELECT generate_series(1, 10) AS a;
 ]);
 
-$old_node->stop;
+$old_publisher->stop;
 
 # Cause a failure at the start of pg_upgrade because max_replication_slots is
 # smaller than existing slots on old node
 command_fails(
 	[
 		'pg_upgrade', '--no-sync',
-		'-d',         $old_node->data_dir,
-		'-D',         $new_node->data_dir,
+		'-d',         $old_publisher->data_dir,
+		'-D',         $new_publisher->data_dir,
 		'-b',         $bindir,
 		'-B',         $bindir,
-		'-s',         $new_node->host,
-		'-p',         $old_node->port,
-		'-P',         $new_node->port,
+		'-s',         $new_publisher->host,
+		'-p',         $old_publisher->port,
+		'-P',         $new_publisher->port,
 		$mode,
 	],
 	'run of pg_upgrade of old node with small max_replication_slots');
-ok( -d $new_node->data_dir . "/pg_upgrade_output.d",
+ok( -d $new_publisher->data_dir . "/pg_upgrade_output.d",
 	"pg_upgrade_output.d/ not removed after pg_upgrade failure");
 
 # Clean up
-rmtree($new_node->data_dir . "/pg_upgrade_output.d");
+rmtree($new_publisher->data_dir . "/pg_upgrade_output.d");
 
 # Preparations for the subsequent test. max_replication_slots is set to
 # appropriate value
-$new_node->append_conf('postgresql.conf', "max_replication_slots = 10");
+$new_publisher->append_conf('postgresql.conf', "max_replication_slots = 10");
 
-# Remove an unnecessary slot and consume WALs
-$old_node->start;
-$old_node->safe_psql(
+# Cause a failure at the start of pg_upgrade because the slot has not finished
+# consuming all the WALs
+command_fails(
+	[
+		'pg_upgrade', '--no-sync',
+		'-d',         $old_publisher->data_dir,
+		'-D',         $new_publisher->data_dir,
+		'-b',         $bindir,
+		'-B',         $bindir,
+		'-s',         $new_publisher->host,
+		'-p',         $old_publisher->port,
+		'-P',         $new_publisher->port,
+		$mode,
+	],
+	'run of pg_upgrade of old node with idle replication slots');
+ok( -d $new_publisher->data_dir . "/pg_upgrade_output.d",
+	"pg_upgrade_output.d/ not removed after pg_upgrade failure");
+
+# Clean up
+rmtree($new_publisher->data_dir . "/pg_upgrade_output.d");
+
+$old_publisher->start;
+$old_publisher->safe_psql(
 	'postgres', qq[
 	SELECT pg_drop_replication_slot('test_slot1');
-	SELECT count(*) FROM pg_logical_slot_get_changes('test_slot2', NULL, NULL)
+	SELECT pg_drop_replication_slot('test_slot2');
+]);
+
+# Setup logical replication
+my $old_connstr = $old_publisher->connstr . ' dbname=postgres';
+$old_publisher->safe_psql('postgres', "CREATE PUBLICATION pub FOR ALL TABLES");
+$subscriber->start;
+$subscriber->safe_psql(
+	'postgres', qq[
+	CREATE TABLE tbl (a int);
+	CREATE SUBSCRIPTION sub CONNECTION '$old_connstr' PUBLICATION pub WITH (two_phase = 'true')
 ]);
-$old_node->stop;
+
+# Wait for initial table sync to finish
+$subscriber->wait_for_subscription_sync($old_publisher, 'sub');
+
+$subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION sub DISABLE");
+$old_publisher->stop;
 
 # Actual run, pg_upgrade_output.d is removed at the end
 command_ok(
 	[
 		'pg_upgrade', '--no-sync',
-		'-d',         $old_node->data_dir,
-		'-D',         $new_node->data_dir,
+		'-d',         $old_publisher->data_dir,
+		'-D',         $new_publisher->data_dir,
 		'-b',         $bindir,
 		'-B',         $bindir,
-		'-s',         $new_node->host,
-		'-p',         $old_node->port,
-		'-P',         $new_node->port,
+		'-s',         $new_publisher->host,
+		'-p',         $old_publisher->port,
+		'-P',         $new_publisher->port,
 		$mode,
 	],
 	'run of pg_upgrade of old node');
-ok( !-d $new_node->data_dir . "/pg_upgrade_output.d",
+ok( !-d $new_publisher->data_dir . "/pg_upgrade_output.d",
 	"pg_upgrade_output.d/ removed after pg_upgrade success");
 
-$new_node->start;
-my $result = $new_node->safe_psql('postgres',
+$new_publisher->start;
+my $result = $new_publisher->safe_psql('postgres',
 	"SELECT slot_name, two_phase FROM pg_replication_slots");
-is($result, qq(test_slot2|t), 'check the slot exists on new node');
+is($result, qq(sub|t), 'check the slot exists on new node');
+
+# Change the connection
+my $new_connstr = $new_publisher->connstr . ' dbname=postgres';
+$subscriber->safe_psql('postgres',
+	"ALTER SUBSCRIPTION sub CONNECTION '$new_connstr'");
+$subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION sub ENABLE");
+
+# Check whether changes on the new publisher get replicated to the subscriber
+$new_publisher->safe_psql('postgres',
+	"INSERT INTO tbl VALUES (generate_series(11, 20))");
+$new_publisher->wait_for_catchup('sub');
+$result = $subscriber->safe_psql('postgres', "SELECT count(*) FROM tbl");
+is($result, qq(20), 'check changes are shipped to subscriber');
 
 done_testing();
diff --git a/src/include/access/xlogutils.h b/src/include/access/xlogutils.h
index 5b77b11f50..1cf31aa24f 100644
--- a/src/include/access/xlogutils.h
+++ b/src/include/access/xlogutils.h
@@ -115,4 +115,7 @@ extern void XLogReadDetermineTimeline(XLogReaderState *state,
 
 extern void WALReadRaiseError(WALReadError *errinfo);
 
+extern XLogReaderState *InitXLogReaderState(XLogRecPtr lsn);
+extern XLogRecord *ReadNextXLogRecord(XLogReaderState *xlogreader);
+
 #endif
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 6996073989..aaa474476c 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -6489,6 +6489,14 @@
   proargnames => '{rm_id, rm_name, rm_builtin}',
   prosrc => 'pg_get_wal_resource_managers' },
 
+{ oid => '8046', descr => 'check the content of WAL records after the given LSN',
+  proname => 'validate_wal_record_types_after', prorows => '10', proretset => 't',
+  provolatile => 's', prorettype => 'bool', proargtypes => 'pg_lsn',
+  proallargtypes => '{pg_lsn,bool}',
+  proargmodes => '{i,o}',
+  proargnames => '{start_lsn,is_ok}',
+  prosrc => 'validate_wal_record_types_after' },
+
 { oid => '2621', descr => 'reload configuration files',
   proname => 'pg_reload_conf', provolatile => 'v', prorettype => 'bool',
   proargtypes => '', prosrc => 'pg_reload_conf' },
diff --git a/src/test/regress/sql/misc_functions.sql b/src/test/regress/sql/misc_functions.sql
index b57f01f3e9..ffe7d5b4ce 100644
--- a/src/test/regress/sql/misc_functions.sql
+++ b/src/test/regress/sql/misc_functions.sql
@@ -236,4 +236,4 @@ SELECT * FROM pg_split_walfile_name('invalid');
 SELECT segment_number > 0 AS ok_segment_number, timeline_id
   FROM pg_split_walfile_name('000000010000000100000000');
 SELECT segment_number > 0 AS ok_segment_number, timeline_id
-  FROM pg_split_walfile_name('ffffffFF00000001000000af');
+  FROM pg_split_walfile_name('ffffffFF00000001000000af');
\ No newline at end of file
-- 
2.27.0

