From 18cc4b037143d576c025d3a37a8eccbcea66ce63 Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Date: Fri, 14 Apr 2023 08:59:03 +0000
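Subject: [PATCH v8 3/3] pg_upgrade: Add check function for
 --include-logical-replication-slots option

When --include-logical-replication-slots is specified, verify before the
upgrade that every logical replication slot on the old cluster has consumed
all WAL records, except for the CHECKPOINT_SHUTDOWN record, and fail
otherwise.
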
---
 src/bin/pg_upgrade/check.c                    | 81 ++++++++++++++++++-
 .../t/003_logical_replication_slots.pl        | 38 ++++++++-
 2 files changed, 115 insertions(+), 4 deletions(-)

diff --git a/src/bin/pg_upgrade/check.c b/src/bin/pg_upgrade/check.c
index 46a7d64448..b1e1f516c9 100644
--- a/src/bin/pg_upgrade/check.c
+++ b/src/bin/pg_upgrade/check.c
@@ -9,7 +9,10 @@
 
 #include "postgres_fe.h"
 
+#include "access/xlog_internal.h"
+#include "access/xlogrecord.h"
 #include "catalog/pg_authid_d.h"
 #include "catalog/pg_collation.h"
+#include "catalog/pg_control.h"
 #include "fe_utils/string_utils.h"
 #include "mb/pg_wchar.h"
@@ -31,7 +34,7 @@ static void check_for_pg_role_prefix(ClusterInfo *cluster);
 static void check_for_new_tablespace_dir(ClusterInfo *new_cluster);
 static void check_for_user_defined_encoding_conversions(ClusterInfo *cluster);
 static void check_for_parameter_settings(ClusterInfo *new_cluster);
-
+static void check_for_confirmed_flush_lsn(ClusterInfo *cluster);
 
 /*
  * fix_path_separator
@@ -104,6 +107,8 @@ check_and_dump_old_cluster(bool live_check)
 	check_for_composite_data_type_usage(&old_cluster);
 	check_for_reg_data_type_usage(&old_cluster);
 	check_for_isn_and_int8_passing_mismatch(&old_cluster);
+	if (user_opts.include_logical_slots)
+		check_for_confirmed_flush_lsn(&old_cluster);
 
 	/*
 	 * PG 16 increased the size of the 'aclitem' type, which breaks the on-disk
@@ -1418,6 +1423,10 @@ check_for_parameter_settings(ClusterInfo *new_cluster)
 	int			max_replication_slots;
 	char	   *wal_level;
 
+	/* --include-logical-replication-slots can be used since PG16. */
+	if (GET_MAJOR_VERSION(new_cluster->major_version) < 1600)
+		return;
+
 	prep_status("Checking for logical replication slots");
 
 	res = executeQueryOrDie(conn, "SHOW max_replication_slots;");
@@ -1441,3 +1450,73 @@ check_for_parameter_settings(ClusterInfo *new_cluster)
 
 	check_ok();
 }
+
+/*
+ * Verify that all logical replication slots consumed all WALs, except a
+ * CHECKPOINT_SHUTDOWN record.  Every lagging slot is reported as a warning
+ * and then pg_fatal() is raised.  Clusters older than PG16 are skipped, since
+ * --include-logical-replication-slots cannot be used with them.
+ */
+static void
+check_for_confirmed_flush_lsn(ClusterInfo *cluster)
+{
+	int			i,
+				ntups,
+				i_slotname;
+	bool		is_error = false;
+	PGresult   *res;
+	PGconn	   *conn;
+
+	Assert(user_opts.include_logical_slots);
+
+	/* --include-logical-replication-slots can be used since PG16. */
+	if (GET_MAJOR_VERSION(cluster->major_version) < 1600)
+		return;
+
+	/* Connect only now, so that the early return above leaks no connection. */
+	conn = connectToServer(cluster, cluster->dbarr.dbs[0].db_name);
+
+	prep_status("Checking for logical replication slots");
+
+	/*
+	 * Check that all logical replication slots have reached the current WAL
+	 * position, except for the CHECKPOINT_SHUTDOWN record. Even if all WALs
+	 * are consumed before shutting down the node, the checkpointer generates
+	 * a CHECKPOINT_SHUTDOWN record at shutdown, which cannot be consumed by
+	 * any slots. Therefore, we must allow for a difference between
+	 * pg_current_wal_insert_lsn() and confirmed_flush_lsn.
+	 */
+#define SHUTDOWN_RECORD_SIZE  (SizeOfXLogRecord + \
+							   SizeOfXLogRecordDataHeaderShort + \
+							   sizeof(CheckPoint))
+
+	res = executeQueryOrDie(conn,
+							"SELECT slot_name FROM pg_catalog.pg_replication_slots "
+							"WHERE (pg_catalog.pg_current_wal_insert_lsn() - confirmed_flush_lsn) > %d "
+							"AND temporary = false AND wal_status IN ('reserved', 'extended');",
+							(int) (SizeOfXLogShortPHD + SHUTDOWN_RECORD_SIZE));
+
+#undef SHUTDOWN_RECORD_SIZE
+
+	ntups = PQntuples(res);
+	i_slotname = PQfnumber(res, "slot_name");
+
+	for (i = 0; i < ntups; i++)
+	{
+		char	   *slotname = PQgetvalue(res, i, i_slotname);
+
+		is_error = true;
+		pg_log(PG_WARNING,
+			   "\nWARNING: logical replication slot \"%s\" has not consumed WALs yet.",
+			   slotname);
+	}
+
+	PQclear(res);
+	PQfinish(conn);
+
+	if (is_error)
+		pg_fatal("--include-logical-replication-slots requires that all "
+				 "logical replication slots consumed all the WALs");
+
+	check_ok();
+}
diff --git a/src/bin/pg_upgrade/t/003_logical_replication_slots.pl b/src/bin/pg_upgrade/t/003_logical_replication_slots.pl
index 1159487f94..cb7acb3302 100644
--- a/src/bin/pg_upgrade/t/003_logical_replication_slots.pl
+++ b/src/bin/pg_upgrade/t/003_logical_replication_slots.pl
@@ -85,6 +85,38 @@ $new_publisher->append_conf('postgresql.conf', "max_replication_slots = 10");
 
 $old_publisher->start;
 
+# Create a dummy slot on the old publisher, then generate WAL it does not consume
+$old_publisher->safe_psql('postgres',
+	"SELECT pg_create_logical_replication_slot('dropme_slot', 'pgoutput')");
+$old_publisher->safe_psql('postgres',
+	"INSERT INTO tbl VALUES (generate_series(11, 20))");
+$old_publisher->stop;
+
+# pg_upgrade --check must fail because dropme_slot has not been advanced past
+# the WAL generated by the INSERT statement above
+command_fails(
+	[
+		'pg_upgrade', '--no-sync',
+		'-d',         $old_publisher->data_dir,
+		'-D',         $new_publisher->data_dir,
+		'-b',         $bindir,
+		'-B',         $bindir,
+		'-s',         $new_publisher->host,
+		'-p',         $old_publisher->port,
+		'-P',         $new_publisher->port,
+		$mode,        '--include-logical-replication-slots',
+		'--check'
+	],
+	'run of pg_upgrade of old publisher with idle replication slots');
+ok( -d $new_publisher->data_dir . "/pg_upgrade_output.d",
+	"pg_upgrade_output.d/ not removed after pg_upgrade failure");
+
+# Clean up: remove the retained output directory and drop the lagging slot
+rmtree($new_publisher->data_dir . "/pg_upgrade_output.d");
+$old_publisher->start;
+$old_publisher->safe_psql('postgres',
+	"SELECT pg_drop_replication_slot('dropme_slot')");
+
 # Setup logical replication
 my $old_connstr = $old_publisher->connstr . ' dbname=postgres';
 $old_publisher->safe_psql('postgres',
@@ -96,7 +128,7 @@ $subscriber->safe_psql('postgres',
 $subscriber->wait_for_subscription_sync($old_publisher, 'sub');
 
 my $result = $subscriber->safe_psql('postgres', "SELECT count(*) FROM tbl");
-is($result, qq(10), 'check initial rows on subscriber');
+is($result, qq(20), 'check initial rows on subscriber');
 
 # Define another replication slot which allows to decode prepared transactions
 $old_publisher->safe_psql('postgres',
@@ -140,11 +172,11 @@ $subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION sub ENABLE");
 
 # Check whether changes on the new publisher get replicated to the subscriber
 $new_publisher->safe_psql('postgres',
-	"INSERT INTO tbl VALUES (generate_series(11, 20))");
+	"INSERT INTO tbl VALUES (generate_series(21, 30))");
 
 $new_publisher->wait_for_catchup('sub');
 
 $result = $subscriber->safe_psql('postgres', "SELECT count(*) FROM tbl");
-is($result, qq(20), 'check changes are shipped to subscriber');
+is($result, qq(30), 'check changes are shipped to subscriber');
 
 done_testing();
-- 
2.27.0

