From 285953f835928a8bb74e4260527541aa5dd8324a Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Date: Fri, 14 Apr 2023 08:59:03 +0000
Subject: [PATCH v20 3/3] pg_upgrade: Add check function for logical
 replication slots

pg_upgrade fails if the old node has slots whose status is 'lost' or which have
not consumed all WAL records. Both checks are needed to prevent data loss: a
'lost' slot can no longer stream changes, and WAL records that were never
consumed on the old node would not reach the subscriber after the upgrade.
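
If the upgrade is blocked because a slot lags behind, the remaining changes
can be consumed before shutting down the old node, e.g.:

    SELECT count(*) FROM pg_logical_slot_get_changes('test_slot2', NULL, NULL);

('test_slot2' here stands for whichever slot is lagging.)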

Author: Hayato Kuroda
Reviewed-by: Wang Wei, Vignesh C
---
 src/bin/pg_upgrade/check.c                    |  82 +++++++++
 src/bin/pg_upgrade/controldata.c              |  31 ++++
 src/bin/pg_upgrade/pg_upgrade.h               |   3 +
 .../t/003_logical_replication_slots.pl        | 155 ++++++++++++------
 4 files changed, 218 insertions(+), 53 deletions(-)

diff --git a/src/bin/pg_upgrade/check.c b/src/bin/pg_upgrade/check.c
index d5af0dcbc7..2ad1249cf8 100644
--- a/src/bin/pg_upgrade/check.c
+++ b/src/bin/pg_upgrade/check.c
@@ -31,6 +31,7 @@ static void check_for_pg_role_prefix(ClusterInfo *cluster);
 static void check_for_new_tablespace_dir(ClusterInfo *new_cluster);
 static void check_for_user_defined_encoding_conversions(ClusterInfo *cluster);
 static void check_for_logical_replication_slots(ClusterInfo *new_cluster);
+static void check_for_confirmed_flush_lsn(ClusterInfo *cluster, bool live_check);
 
 int	num_slots_on_old_cluster;
 
@@ -108,6 +109,8 @@ check_and_dump_old_cluster(bool live_check)
 	check_for_composite_data_type_usage(&old_cluster);
 	check_for_reg_data_type_usage(&old_cluster);
 	check_for_isn_and_int8_passing_mismatch(&old_cluster);
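+	/* Skip the slot check when the old cluster has no logical slots. */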
+	if (num_slots_on_old_cluster)
+		check_for_confirmed_flush_lsn(&old_cluster, live_check);
 
 	/*
 	 * PG 16 increased the size of the 'aclitem' type, which breaks the
@@ -1471,3 +1474,82 @@ check_for_logical_replication_slots(ClusterInfo *new_cluster)
 
 	check_ok();
 }
+
+/*
+ * Verify that all logical replication slots have consumed all WALs, except
+ * for the final SHUTDOWN_CHECKPOINT record, and that none of them are in
+ * 'lost' state.
+ */
+static void
+check_for_confirmed_flush_lsn(ClusterInfo *cluster, bool live_check)
+{
+	int			i,
+				ntups,
+				i_slotname;
+	bool		is_error = false;
+	PGresult   *res;
+	DbInfo	   *active_db;
+	PGconn	   *conn;
+
+	/* Logical slots can be migrated only since PG17. */
+	if (GET_MAJOR_VERSION(cluster->major_version) <= 1600)
+		return;
+
+	active_db = &cluster->dbarr.dbs[0];
+	conn = connectToServer(cluster, active_db->db_name);
+
+	prep_status("Checking confirmed_flush_lsn for logical replication slots");
+
+	/* Check that all logical slots are not in 'lost' state. */
+	res = executeQueryOrDie(conn,
+							"SELECT slot_name FROM pg_catalog.pg_replication_slots "
+							"WHERE temporary = false AND wal_status = 'lost';");
+
+	ntups = PQntuples(res);
+	i_slotname = PQfnumber(res, "slot_name");
+
+	for (i = 0; i < ntups; i++)
+	{
+		is_error = true;
+
+		pg_log(PG_WARNING,
+			   "\nWARNING: logical replication slot \"%s\" is obsolete.",
+			   PQgetvalue(res, i, i_slotname));
+	}
+
+	PQclear(res);
+
+	if (is_error)
+		pg_fatal("logical replication slots not to be in 'lost' state.");
+
+	/*
+	 * Check that all logical replication slots have reached the latest
+	 * checkpoint position (SHUTDOWN_CHECKPOINT record). This check cannot be
+	 * done in case of a live check because the server has not written the
+	 * SHUTDOWN_CHECKPOINT record yet.
+	 */
+	if (!live_check)
+	{
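+		/*
+		 * The latest checkpoint location was read from the pg_controldata
+		 * output as two 32-bit halves, matching the %X/%X text format of
+		 * pg_lsn.
+		 */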
+		res = executeQueryOrDie(conn,
+								"SELECT slot_name FROM pg_catalog.pg_replication_slots "
+								"WHERE confirmed_flush_lsn != '%X/%X' AND temporary = false;",
+								old_cluster.controldata.chkpnt_latest_upper,
+								old_cluster.controldata.chkpnt_latest_lower);
+
+		ntups = PQntuples(res);
+		i_slotname = PQfnumber(res, "slot_name");
+
+		for (i = 0; i < ntups; i++)
+		{
+			is_error = true;
+
+			pg_log(PG_WARNING,
+				   "\nWARNING: logical replication slot \"%s\" has not consumed all the WALs yet",
+				   PQgetvalue(res, i, i_slotname));
+		}
+
+		PQclear(res);
+
+		if (is_error)
+			pg_fatal("logical replication slots have not consumed all the WALs.");
+	}
+
+	PQfinish(conn);
+
+	check_ok();
+}
diff --git a/src/bin/pg_upgrade/controldata.c b/src/bin/pg_upgrade/controldata.c
index 4beb65ab22..e00543e74a 100644
--- a/src/bin/pg_upgrade/controldata.c
+++ b/src/bin/pg_upgrade/controldata.c
@@ -169,6 +169,37 @@ get_control_data(ClusterInfo *cluster, bool live_check)
 				}
 				got_cluster_state = true;
 			}
+
+			else if ((p = strstr(bufin, "Latest checkpoint location:")) != NULL)
+			{
+				/*
+				 * Gather the latest checkpoint location if the cluster is
+				 * version 17 or newer. It is used for upgrading logical
+				 * replication slots.
+				 */
+				if (GET_MAJOR_VERSION(cluster->major_version) >= 1700)
+				{
+					char *slash = NULL;
+
+					p = strchr(p, ':');
+
+					if (p == NULL || strlen(p) <= 1)
+						pg_fatal("%d: controldata retrieval problem", __LINE__);
+
+					p++;			/* remove ':' char */
+
+					p = strpbrk(p, "0123456789ABCDEF");
+
+					if (p == NULL)
+						pg_fatal("%d: controldata retrieval problem", __LINE__);
+
+					/*
+					 * The upper and lower parts of the LSN must be read and
+					 * stored separately because it is reported in %X/%X
+					 * format.
+					 */
+					cluster->controldata.chkpnt_latest_upper =
+						strtoul(p, &slash, 16);
+					cluster->controldata.chkpnt_latest_lower =
+						strtoul(++slash, NULL, 16);
+				}
+			}
 		}
 
 		rc = pclose(output);
diff --git a/src/bin/pg_upgrade/pg_upgrade.h b/src/bin/pg_upgrade/pg_upgrade.h
index 2a3a178cde..475bbeb30b 100644
--- a/src/bin/pg_upgrade/pg_upgrade.h
+++ b/src/bin/pg_upgrade/pg_upgrade.h
@@ -245,6 +245,9 @@ typedef struct
 	bool		date_is_int;
 	bool		float8_pass_by_value;
 	uint32		data_checksum_version;
+
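+	/* Latest checkpoint location, stored as the two halves of a %X/%X LSN */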
+	uint32		chkpnt_latest_upper;
+	uint32		chkpnt_latest_lower;
 } ControlData;
 
 /*
diff --git a/src/bin/pg_upgrade/t/003_logical_replication_slots.pl b/src/bin/pg_upgrade/t/003_logical_replication_slots.pl
index f015b5d363..59e2c2f209 100644
--- a/src/bin/pg_upgrade/t/003_logical_replication_slots.pl
+++ b/src/bin/pg_upgrade/t/003_logical_replication_slots.pl
@@ -15,138 +15,187 @@ use Test::More;
 my $mode = $ENV{PG_TEST_PG_UPGRADE_MODE} || '--copy';
 
 # Initialize old node
-my $old_node = PostgreSQL::Test::Cluster->new('old_node');
-$old_node->init(allows_streaming => 'logical');
-$old_node->start;
+my $old_publisher = PostgreSQL::Test::Cluster->new('old_publisher');
+$old_publisher->init(allows_streaming => 'logical');
 
 # Initialize new node
-my $new_node = PostgreSQL::Test::Cluster->new('new_node');
-$new_node->init(allows_streaming => 1);
+my $new_publisher = PostgreSQL::Test::Cluster->new('new_publisher');
+$new_publisher->init(allows_streaming => 1);
 
-my $bindir = $new_node->config_data('--bindir');
+# Initialize subscriber node
+my $subscriber = PostgreSQL::Test::Cluster->new('subscriber');
+$subscriber->init(allows_streaming => 'logical');
 
-$old_node->stop;
+my $bindir = $new_publisher->config_data('--bindir');
 
 # Create a slot on old node
-$old_node->start;
-$old_node->safe_psql(
+$old_publisher->start;
+$old_publisher->safe_psql(
 	'postgres', "SELECT pg_create_logical_replication_slot('test_slot1', 'test_decoding', false, true);"
 );
-$old_node->stop;
+$old_publisher->stop;
 
 # Cause a failure at the start of pg_upgrade because wal_level is replica
 command_fails(
 	[
 		'pg_upgrade', '--no-sync',
-		'-d',         $old_node->data_dir,
-		'-D',         $new_node->data_dir,
+		'-d',         $old_publisher->data_dir,
+		'-D',         $new_publisher->data_dir,
 		'-b',         $bindir,
 		'-B',         $bindir,
-		'-s',         $new_node->host,
-		'-p',         $old_node->port,
-		'-P',         $new_node->port,
+		'-s',         $new_publisher->host,
+		'-p',         $old_publisher->port,
+		'-P',         $new_publisher->port,
 		$mode,
 	],
 	'run of pg_upgrade of old node with wrong wal_level');
-ok( -d $new_node->data_dir . "/pg_upgrade_output.d",
+ok( -d $new_publisher->data_dir . "/pg_upgrade_output.d",
 	"pg_upgrade_output.d/ not removed after pg_upgrade failure");
 
 # Clean up
-rmtree($new_node->data_dir . "/pg_upgrade_output.d");
+rmtree($new_publisher->data_dir . "/pg_upgrade_output.d");
 
 # Preparations for the subsequent test. The case max_replication_slots is set
 # to 0 is prohibited.
-$new_node->append_conf('postgresql.conf', "wal_level = 'logical'");
-$new_node->append_conf('postgresql.conf', "max_replication_slots = 0");
+$new_publisher->append_conf('postgresql.conf', "wal_level = 'logical'");
+$new_publisher->append_conf('postgresql.conf', "max_replication_slots = 0");
 
 # Cause a failure at the start of pg_upgrade because max_replication_slots is 0
 command_fails(
 	[
 		'pg_upgrade', '--no-sync',
-		'-d',         $old_node->data_dir,
-		'-D',         $new_node->data_dir,
+		'-d',         $old_publisher->data_dir,
+		'-D',         $new_publisher->data_dir,
 		'-b',         $bindir,
 		'-B',         $bindir,
-		'-s',         $new_node->host,
-		'-p',         $old_node->port,
-		'-P',         $new_node->port,
+		'-s',         $new_publisher->host,
+		'-p',         $old_publisher->port,
+		'-P',         $new_publisher->port,
 		$mode,
 	],
 	'run of pg_upgrade of old node with wrong max_replication_slots');
-ok( -d $new_node->data_dir . "/pg_upgrade_output.d",
+ok( -d $new_publisher->data_dir . "/pg_upgrade_output.d",
 	"pg_upgrade_output.d/ not removed after pg_upgrade failure");
 
 # Clean up
-rmtree($new_node->data_dir . "/pg_upgrade_output.d");
+rmtree($new_publisher->data_dir . "/pg_upgrade_output.d");
 
 # Preparations for the subsequent test. max_replication_slots is set to
 # non-zero value
-$new_node->append_conf('postgresql.conf', "max_replication_slots = 1");
+$new_publisher->append_conf('postgresql.conf', "max_replication_slots = 1");
 
-# Create a slot on old node, and generate WALs
-$old_node->start;
-$old_node->safe_psql(
+# Create a slot on old node and generate WAL records that are not consumed
+$old_publisher->start;
+$old_publisher->safe_psql(
 	'postgres', qq[
 	SELECT pg_create_logical_replication_slot('test_slot2', 'test_decoding', false, true);
 	CREATE TABLE tbl AS SELECT generate_series(1, 10) AS a;
 ]);
 
-$old_node->stop;
+$old_publisher->stop;
 
 # Cause a failure at the start of pg_upgrade because max_replication_slots is
 # smaller than existing slots on old node
 command_fails(
 	[
 		'pg_upgrade', '--no-sync',
-		'-d',         $old_node->data_dir,
-		'-D',         $new_node->data_dir,
+		'-d',         $old_publisher->data_dir,
+		'-D',         $new_publisher->data_dir,
 		'-b',         $bindir,
 		'-B',         $bindir,
-		'-s',         $new_node->host,
-		'-p',         $old_node->port,
-		'-P',         $new_node->port,
+		'-s',         $new_publisher->host,
+		'-p',         $old_publisher->port,
+		'-P',         $new_publisher->port,
 		$mode,
 	],
 	'run of pg_upgrade of old node with small max_replication_slots');
-ok( -d $new_node->data_dir . "/pg_upgrade_output.d",
+ok( -d $new_publisher->data_dir . "/pg_upgrade_output.d",
 	"pg_upgrade_output.d/ not removed after pg_upgrade failure");
 
 # Clean up
-rmtree($new_node->data_dir . "/pg_upgrade_output.d");
+rmtree($new_publisher->data_dir . "/pg_upgrade_output.d");
 
 # Preparations for the subsequent test. max_replication_slots is set to
 # appropriate value
-$new_node->append_conf('postgresql.conf', "max_replication_slots = 10");
+$new_publisher->append_conf('postgresql.conf', "max_replication_slots = 10");
 
-# Remove an unnecessary slot and consume WALs
-$old_node->start;
-$old_node->safe_psql(
+# Cause a failure at the start of pg_upgrade because the slot has not
+# finished consuming all the WALs
+command_fails(
+	[
+		'pg_upgrade', '--no-sync',
+		'-d',         $old_publisher->data_dir,
+		'-D',         $new_publisher->data_dir,
+		'-b',         $bindir,
+		'-B',         $bindir,
+		'-s',         $new_publisher->host,
+		'-p',         $old_publisher->port,
+		'-P',         $new_publisher->port,
+		$mode,
+	],
+	'run of pg_upgrade of old node with slot having unconsumed WAL records');
+ok( -d $new_publisher->data_dir . "/pg_upgrade_output.d",
+	"pg_upgrade_output.d/ not removed after pg_upgrade failure");
+
+# Clean up
+rmtree($new_publisher->data_dir . "/pg_upgrade_output.d");
+
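+# Drop the slots created above; the rest of the test uses a slot created by
+# an actual subscription.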
+$old_publisher->start;
+$old_publisher->safe_psql(
 	'postgres', qq[
 	SELECT pg_drop_replication_slot('test_slot1');
-	SELECT count(*) FROM pg_logical_slot_get_changes('test_slot2', NULL, NULL)
+	SELECT pg_drop_replication_slot('test_slot2');
+]);
+
+# Set up logical replication
+my $old_connstr = $old_publisher->connstr . ' dbname=postgres';
+$old_publisher->safe_psql('postgres', "CREATE PUBLICATION pub FOR ALL TABLES");
+$subscriber->start;
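+# two_phase is enabled so that the slot's two_phase property can be verified
+# after the upgrade.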
+$subscriber->safe_psql(
+	'postgres', qq[
+	CREATE TABLE tbl (a int);
+	CREATE SUBSCRIPTION sub CONNECTION '$old_connstr' PUBLICATION pub WITH (two_phase = 'true')
 ]);
-$old_node->stop;
+
+# Wait for initial table sync to finish
+$subscriber->wait_for_subscription_sync($old_publisher, 'sub');
+
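+# Disable the subscription so that the subscriber does not connect while the
+# publisher is being upgraded.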
+$subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION sub DISABLE");
+$old_publisher->stop;
 
 # Actual run, pg_upgrade_output.d is removed at the end
 command_ok(
 	[
 		'pg_upgrade', '--no-sync',
-		'-d',         $old_node->data_dir,
-		'-D',         $new_node->data_dir,
+		'-d',         $old_publisher->data_dir,
+		'-D',         $new_publisher->data_dir,
 		'-b',         $bindir,
 		'-B',         $bindir,
-		'-s',         $new_node->host,
-		'-p',         $old_node->port,
-		'-P',         $new_node->port,
+		'-s',         $new_publisher->host,
+		'-p',         $old_publisher->port,
+		'-P',         $new_publisher->port,
 		$mode,
 	],
 	'run of pg_upgrade of old node');
-ok( !-d $new_node->data_dir . "/pg_upgrade_output.d",
+ok( !-d $new_publisher->data_dir . "/pg_upgrade_output.d",
 	"pg_upgrade_output.d/ removed after pg_upgrade success");
 
-$new_node->start;
-my $result = $new_node->safe_psql('postgres',
+$new_publisher->start;
+my $result = $new_publisher->safe_psql('postgres',
 	"SELECT slot_name, two_phase FROM pg_replication_slots");
-is($result, qq(test_slot2|t), 'check the slot exists on new node');
+is($result, qq(sub|t), 'check the slot exists on new node');
+
+# Change the subscription's connection to point at the new publisher
+my $new_connstr = $new_publisher->connstr . ' dbname=postgres';
+$subscriber->safe_psql('postgres',
+	"ALTER SUBSCRIPTION sub CONNECTION '$new_connstr'");
+$subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION sub ENABLE");
+
+# Check whether changes on the new publisher get replicated to the subscriber
+$new_publisher->safe_psql('postgres',
+	"INSERT INTO tbl VALUES (generate_series(11, 20))");
+$new_publisher->wait_for_catchup('sub');
+$result = $subscriber->safe_psql('postgres', "SELECT count(*) FROM tbl");
+is($result, qq(20), 'check changes are shipped to subscriber');
 
 done_testing();
-- 
2.30.0.windows.2

