From d4694a87b66e5b36d73cb18ba8eb7ad8cd223626 Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Date: Fri, 14 Apr 2023 08:59:03 +0000
Subject: [PATCH 3/3] pg_upgrade: Add check function for
 --include-logical-replication-slots option

XXX: In effect, this commit disallows upgrading slots that were created by
user backends. The check function ensures that every active slot has a
confirmed_flush_lsn equal to the current WAL insert position, which would not
hold for such slots. For slots used by logical replication, logical
walsenders guarantee this at shutdown; slots advanced by individual backends,
however, are not handled by walsenders, so their confirmed_flush_lsn stays
behind the shutdown checkpoint record.
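
A slot that has fallen behind can, in principle, be prepared for the upgrade
by consuming its remaining changes before the server is shut down, e.g. (a
sketch; the slot name is illustrative):

    SELECT count(*) FROM pg_logical_slot_get_changes('test_slot', NULL, NULL);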

Author: Hayato Kuroda
Reviewed-by: Wang Wei, Vignesh C
---
 src/bin/pg_upgrade/check.c                    |  57 ++++++++
 .../t/003_logical_replication_slots.pl        | 134 +++++++++++-------
 2 files changed, 138 insertions(+), 53 deletions(-)

diff --git a/src/bin/pg_upgrade/check.c b/src/bin/pg_upgrade/check.c
index 4b54f5567c..e9911cb302 100644
--- a/src/bin/pg_upgrade/check.c
+++ b/src/bin/pg_upgrade/check.c
@@ -31,6 +31,7 @@ static void check_for_pg_role_prefix(ClusterInfo *cluster);
 static void check_for_new_tablespace_dir(ClusterInfo *new_cluster);
 static void check_for_user_defined_encoding_conversions(ClusterInfo *cluster);
 static void check_for_logical_replication_slots(ClusterInfo *new_cluster);
+static void check_for_confirmed_flush_lsn(ClusterInfo *cluster);
 
 static int num_slots_on_old_cluster;
 
@@ -109,6 +110,8 @@ check_and_dump_old_cluster(bool live_check)
 	check_for_composite_data_type_usage(&old_cluster);
 	check_for_reg_data_type_usage(&old_cluster);
 	check_for_isn_and_int8_passing_mismatch(&old_cluster);
+	if (user_opts.include_logical_slots)
+		check_for_confirmed_flush_lsn(&old_cluster);
 
 	/*
 	 * PG 16 increased the size of the 'aclitem' type, which breaks the
@@ -1478,3 +1481,57 @@ check_for_logical_replication_slots(ClusterInfo *new_cluster)
 
 	check_ok();
 }
+
+/*
+ * Verify that all logical replication slots have consumed all WAL, except
+ * for the CHECKPOINT_SHUTDOWN record.
+ */
+static void
+check_for_confirmed_flush_lsn(ClusterInfo *cluster)
+{
+	int			i,
+				ntups,
+				i_slotname;
+	bool		is_error = false;
+	PGresult   *res;
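+	/* Replication slots are cluster-wide, so one database connection suffices */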
+	DbInfo	   *active_db = &cluster->dbarr.dbs[0];
+	PGconn	   *conn = connectToServer(cluster, active_db->db_name);
+
+	Assert(user_opts.include_logical_slots);
+
+	/*
+	 * --include-logical-replication-slots can be used only when the old
+	 * cluster is PG16 or later.
+	 */
+	if (GET_MAJOR_VERSION(cluster->major_version) <= 1500)
+		return;
+
+	prep_status("Checking confirmed_flush_lsn for logical replication slots");
+
+	/*
+	 * Check that all logical replication slots have reached the current WAL
+	 * position.  Temporary slots are excluded because they do not survive a
+	 * server restart; slots that no longer retain their required WAL
+	 * ('unreserved' or 'lost') are excluded as well.
+	 */
+	res = executeQueryOrDie(conn,
+							"SELECT slot_name FROM pg_catalog.pg_replication_slots "
+							"WHERE (pg_catalog.pg_current_wal_insert_lsn() - confirmed_flush_lsn) <> 0 "
+							"AND temporary = false AND wal_status IN ('reserved', 'extended');");
+
+	ntups = PQntuples(res);
+	i_slotname = PQfnumber(res, "slot_name");
+
+	for (i = 0; i < ntups; i++)
+	{
+		is_error = true;
+
+		pg_log(PG_WARNING,
+			   "\nWARNING: logical replication slot \"%s\" has not consumed WALs yet",
+			   PQgetvalue(res, i, i_slotname));
+	}
+
+	PQclear(res);
+	PQfinish(conn);
+
+	if (is_error)
+		pg_fatal("--include-logical-replication-slots requires that all "
+				 "logical replication slots have consumed all the WAL");
+
+	check_ok();
+}
diff --git a/src/bin/pg_upgrade/t/003_logical_replication_slots.pl b/src/bin/pg_upgrade/t/003_logical_replication_slots.pl
index 9ca266f6b2..87049d7116 100644
--- a/src/bin/pg_upgrade/t/003_logical_replication_slots.pl
+++ b/src/bin/pg_upgrade/t/003_logical_replication_slots.pl
@@ -15,132 +15,160 @@ use Test::More;
 my $mode = $ENV{PG_TEST_PG_UPGRADE_MODE} || '--copy';
 
 # Initialize old node
-my $old_node = PostgreSQL::Test::Cluster->new('old_node');
-$old_node->init(allows_streaming => 'logical');
-$old_node->start;
+my $old_publisher = PostgreSQL::Test::Cluster->new('old_publisher');
+$old_publisher->init(allows_streaming => 'logical');
 
 # Initialize new node
-my $new_node = PostgreSQL::Test::Cluster->new('new_node');
-$new_node->init(allows_streaming => 1);
+my $new_publisher = PostgreSQL::Test::Cluster->new('new_publisher');
+$new_publisher->init(allows_streaming => 1);
 
-my $bindir = $new_node->config_data('--bindir');
+# Initialize subscriber node
+my $subscriber = PostgreSQL::Test::Cluster->new('subscriber');
+$subscriber->init(allows_streaming => 'logical');
 
-$old_node->stop;
+my $bindir = $new_publisher->config_data('--bindir');
 
 # Cause a failure at the start of pg_upgrade because wal_level is replica
 command_fails(
 	[
 		'pg_upgrade', '--no-sync',
-		'-d',         $old_node->data_dir,
-		'-D',         $new_node->data_dir,
+		'-d',         $old_publisher->data_dir,
+		'-D',         $new_publisher->data_dir,
 		'-b',         $bindir,
 		'-B',         $bindir,
-		'-s',         $new_node->host,
-		'-p',         $old_node->port,
-		'-P',         $new_node->port,
+		'-s',         $new_publisher->host,
+		'-p',         $old_publisher->port,
+		'-P',         $new_publisher->port,
 		$mode,        '--include-logical-replication-slots',
 	],
 	'run of pg_upgrade of old node with wrong wal_level');
-ok( -d $new_node->data_dir . "/pg_upgrade_output.d",
+ok( -d $new_publisher->data_dir . "/pg_upgrade_output.d",
 	"pg_upgrade_output.d/ not removed after pg_upgrade failure");
 
 # Clean up
-rmtree($new_node->data_dir . "/pg_upgrade_output.d");
+rmtree($new_publisher->data_dir . "/pg_upgrade_output.d");
 
 # Preparations for the subsequent test. Setting max_replication_slots to 0
 # is prohibited.
-$new_node->append_conf('postgresql.conf', "wal_level = 'logical'");
-$new_node->append_conf('postgresql.conf', "max_replication_slots = 0");
+$new_publisher->append_conf('postgresql.conf', "wal_level = 'logical'");
+$new_publisher->append_conf('postgresql.conf', "max_replication_slots = 0");
 
 # Cause a failure at the start of pg_upgrade because max_replication_slots is 0
 command_fails(
 	[
 		'pg_upgrade', '--no-sync',
-		'-d',         $old_node->data_dir,
-		'-D',         $new_node->data_dir,
+		'-d',         $old_publisher->data_dir,
+		'-D',         $new_publisher->data_dir,
 		'-b',         $bindir,
 		'-B',         $bindir,
-		'-s',         $new_node->host,
-		'-p',         $old_node->port,
-		'-P',         $new_node->port,
+		'-s',         $new_publisher->host,
+		'-p',         $old_publisher->port,
+		'-P',         $new_publisher->port,
 		$mode,        '--include-logical-replication-slots',
 	],
 	'run of pg_upgrade of old node with wrong max_replication_slots');
-ok( -d $new_node->data_dir . "/pg_upgrade_output.d",
+ok( -d $new_publisher->data_dir . "/pg_upgrade_output.d",
 	"pg_upgrade_output.d/ not removed after pg_upgrade failure");
 
 # Clean up
-rmtree($new_node->data_dir . "/pg_upgrade_output.d");
+rmtree($new_publisher->data_dir . "/pg_upgrade_output.d");
 
 # Preparations for the subsequent test. max_replication_slots is set to a
 # non-zero value, but one smaller than the number of slots on the old node
-$new_node->append_conf('postgresql.conf', "max_replication_slots = 1");
+$new_publisher->append_conf('postgresql.conf', "max_replication_slots = 1");
 
-# Create a slot on old node, and generate WALs
-$old_node->start;
-$old_node->safe_psql(
+# Create slots on the old node
+$old_publisher->start;
+$old_publisher->safe_psql(
 	'postgres', qq[
-	SELECT pg_create_logical_replication_slot('test_slot', 'test_decoding', false, true);
-	SELECT pg_create_logical_replication_slot('to_be_dropped', 'test_decoding', false, true);
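+	-- pg_create_logical_replication_slot(name, plugin, temporary, two_phase)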
+	SELECT pg_create_logical_replication_slot('test_slot1', 'test_decoding', false, true);
+	SELECT pg_create_logical_replication_slot('test_slot2', 'test_decoding', false, true);
 	CREATE TABLE tbl AS SELECT generate_series(1, 10) AS a;
 ]);
 
-$old_node->stop;
+$old_publisher->stop;
 
 # Cause a failure at the start of pg_upgrade because max_replication_slots is
 # smaller than the number of existing slots on the old node
 command_fails(
 	[
 		'pg_upgrade', '--no-sync',
-		'-d',         $old_node->data_dir,
-		'-D',         $new_node->data_dir,
+		'-d',         $old_publisher->data_dir,
+		'-D',         $new_publisher->data_dir,
 		'-b',         $bindir,
 		'-B',         $bindir,
-		'-s',         $new_node->host,
-		'-p',         $old_node->port,
-		'-P',         $new_node->port,
+		'-s',         $new_publisher->host,
+		'-p',         $old_publisher->port,
+		'-P',         $new_publisher->port,
 		$mode,        '--include-logical-replication-slots',
 	],
 	'run of pg_upgrade of old node with small max_replication_slots');
-ok( -d $new_node->data_dir . "/pg_upgrade_output.d",
+ok( -d $new_publisher->data_dir . "/pg_upgrade_output.d",
 	"pg_upgrade_output.d/ not removed after pg_upgrade failure");
 
 # Clean up
-rmtree($new_node->data_dir . "/pg_upgrade_output.d");
+rmtree($new_publisher->data_dir . "/pg_upgrade_output.d");
 
 # Preparations for the subsequent test. max_replication_slots is set to an
 # appropriate value
-$new_node->append_conf('postgresql.conf', "max_replication_slots = 10");
+$new_publisher->append_conf('postgresql.conf', "max_replication_slots = 10");
 
-# Remove an unnecessary slot and consume WALs
-$old_node->start;
-$old_node->safe_psql(
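+# Remove the manually created slots; the logical replication setup below
+# creates the slot that is actually upgraded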
+$old_publisher->start;
+$old_publisher->safe_psql(
 	'postgres', qq[
-	SELECT pg_drop_replication_slot('to_be_dropped');
-	SELECT count(*) FROM pg_logical_slot_get_changes('test_slot', NULL, NULL)
+	SELECT pg_drop_replication_slot('test_slot1');
+	SELECT pg_drop_replication_slot('test_slot2');
 ]);
-$old_node->stop;
+
+# Set up logical replication. two_phase is enabled so that the flag can be
+# verified on the upgraded slot.
+my $old_connstr = $old_publisher->connstr . ' dbname=postgres';
+$old_publisher->safe_psql('postgres', "CREATE PUBLICATION pub FOR ALL TABLES");
+$subscriber->start;
+$subscriber->safe_psql(
+	'postgres', qq[
+	CREATE TABLE tbl (a int);
+	CREATE SUBSCRIPTION sub CONNECTION '$old_connstr' PUBLICATION pub WITH (two_phase = 'true')
+]);
+
+# Wait for initial table sync to finish
+$subscriber->wait_for_subscription_sync($old_publisher, 'sub');
+
+$old_publisher->stop;
 
 # Actual run, pg_upgrade_output.d is removed at the end
 command_ok(
 	[
 		'pg_upgrade', '--no-sync',
-		'-d',         $old_node->data_dir,
-		'-D',         $new_node->data_dir,
+		'-d',         $old_publisher->data_dir,
+		'-D',         $new_publisher->data_dir,
 		'-b',         $bindir,
 		'-B',         $bindir,
-		'-s',         $new_node->host,
-		'-p',         $old_node->port,
-		'-P',         $new_node->port,
+		'-s',         $new_publisher->host,
+		'-p',         $old_publisher->port,
+		'-P',         $new_publisher->port,
 		$mode,        '--include-logical-replication-slots'
 	],
 	'run of pg_upgrade of old node');
-ok( !-d $new_node->data_dir . "/pg_upgrade_output.d",
+ok( !-d $new_publisher->data_dir . "/pg_upgrade_output.d",
 	"pg_upgrade_output.d/ removed after pg_upgrade success");
 
-$new_node->start;
-my $result = $new_node->safe_psql('postgres',
+$new_publisher->start;
+my $result = $new_publisher->safe_psql('postgres',
 	"SELECT slot_name, two_phase FROM pg_replication_slots");
-is($result, qq(test_slot|t), 'check the slot exists on new node');
+is($result, qq(sub|t), 'check the slot exists on new node');
+
+# Switch the subscription to the new publisher
+$subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION sub DISABLE");
+my $new_connstr = $new_publisher->connstr . ' dbname=postgres';
+$subscriber->safe_psql('postgres',
+	"ALTER SUBSCRIPTION sub CONNECTION '$new_connstr'");
+$subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION sub ENABLE");
+
+# Check whether changes on the new publisher get replicated to the subscriber
+$new_publisher->safe_psql('postgres',
+	"INSERT INTO tbl VALUES (generate_series(11, 20))");
+$new_publisher->wait_for_catchup('sub');
+$result = $subscriber->safe_psql('postgres', "SELECT count(*) FROM tbl");
+is($result, qq(20), 'check changes are shipped to subscriber');
 
 done_testing();
-- 
2.27.0

