From 1398560d8c9cb316576375b7ceeec7a66b706f28 Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Date: Fri, 14 Apr 2023 08:59:03 +0000
Subject: [PATCH v12 3/4] pg_upgrade: Add check function for
 --include-logical-replication-slots option

---
 src/bin/pg_upgrade/check.c                    | 80 ++++++++++++++++++-
 .../t/003_logical_replication_slots.pl        | 30 ++++++-
 2 files changed, 108 insertions(+), 2 deletions(-)

diff --git a/src/bin/pg_upgrade/check.c b/src/bin/pg_upgrade/check.c
index c7bed668ac..2e62eabe0a 100644
--- a/src/bin/pg_upgrade/check.c
+++ b/src/bin/pg_upgrade/check.c
@@ -9,7 +9,10 @@
 
 #include "postgres_fe.h"
 
+#include "access/xlogrecord.h"
+#include "access/xlog_internal.h"
 #include "catalog/pg_authid_d.h"
+#include "catalog/pg_control.h"
 #include "catalog/pg_collation.h"
 #include "fe_utils/string_utils.h"
 #include "mb/pg_wchar.h"
@@ -31,7 +34,7 @@ static void check_for_pg_role_prefix(ClusterInfo *cluster);
 static void check_for_new_tablespace_dir(ClusterInfo *new_cluster);
 static void check_for_user_defined_encoding_conversions(ClusterInfo *cluster);
 static void check_for_parameter_settings(ClusterInfo *new_cluster);
-
+static void check_for_confirmed_flush_lsn(ClusterInfo *cluster);
 
 /*
  * fix_path_separator
@@ -108,6 +111,8 @@ check_and_dump_old_cluster(bool live_check)
 	check_for_composite_data_type_usage(&old_cluster);
 	check_for_reg_data_type_usage(&old_cluster);
 	check_for_isn_and_int8_passing_mismatch(&old_cluster);
+	if (user_opts.include_logical_slots)
+		check_for_confirmed_flush_lsn(&old_cluster);
 
 	/*
 	 * PG 16 increased the size of the 'aclitem' type, which breaks the on-disk
@@ -1441,6 +1446,10 @@ check_for_parameter_settings(ClusterInfo *new_cluster)
 	int			max_replication_slots;
 	char	   *wal_level;
 
+	/* --include-logical-replication-slots can be used since PG16. */
+	if (GET_MAJOR_VERSION(new_cluster->major_version < 1600))
+		return;
+
 	prep_status("Checking for logical replication slots");
 
 	res = executeQueryOrDie(conn, "SHOW max_replication_slots;");
@@ -1464,3 +1473,72 @@ check_for_parameter_settings(ClusterInfo *new_cluster)
 
 	check_ok();
 }
+
+/*
+ * Verify that all logical replication slots consumed all WALs, except a
+ * CHECKPOINT_SHUTDOWN record.
+ */
+static void
+check_for_confirmed_flush_lsn(ClusterInfo *cluster)
+{
+	int			i,
+				ntups,
+				i_slotname;
+	bool		is_error = false;
+	PGresult   *res;
+	DbInfo	   *active_db = &cluster->dbarr.dbs[0];
+	PGconn	   *conn = connectToServer(cluster, active_db->db_name);
+
+	Assert(user_opts.include_logical_slots);
+
+	/* --include-logical-replication-slots can be used since PG16. */
+	if (GET_MAJOR_VERSION(cluster->major_version < 1600))
+		return;
+
+	prep_status("Checking for logical replication slots");
+
+	/*
+	 * Check that all logical replication slots have reached the current WAL
+	 * position, except for the CHECKPOINT_SHUTDOWN record. Even if all WALs
+	 * are consumed before shutting down the node, the checkpointer generates
+	 * a CHECKPOINT_SHUTDOWN record at shutdown, which cannot be consumed by
+	 * any slots. Therefore, we must allow for a difference between
+	 * pg_current_wal_insert_lsn() and confirmed_flush_lsn.
+	 */
+#define SHUTDOWN_RECORD_SIZE  (SizeOfXLogRecord + \
+							   SizeOfXLogRecordDataHeaderShort + \
+							   sizeof(CheckPoint))
+
+	res = executeQueryOrDie(conn,
+							"SELECT slot_name FROM pg_catalog.pg_replication_slots "
+							"WHERE (pg_catalog.pg_current_wal_insert_lsn() - confirmed_flush_lsn) > %d "
+							"AND temporary = false AND wal_status IN ('reserved', 'extended');",
+							(int) (SizeOfXLogLongPHD + SHUTDOWN_RECORD_SIZE));
+
+#undef SHUTDOWN_RECORD_SIZE
+
+	ntups = PQntuples(res);
+	i_slotname = PQfnumber(res, "slot_name");
+
+	for (i = 0; i < ntups; i++)
+	{
+		char	   *slotname;
+
+		is_error = true;
+
+		slotname = PQgetvalue(res, i, i_slotname);
+
+		pg_log(PG_WARNING,
+			   "\nWARNING: logical replication slot \"%s\" has not consumed WALs yet",
+			   slotname);
+	}
+
+	PQclear(res);
+	PQfinish(conn);
+
+	if (is_error)
+		pg_fatal("--include-logical-replication-slots requires that all "
+				 "logical replication slots consumed all the WALs");
+
+	check_ok();
+}
diff --git a/src/bin/pg_upgrade/t/003_logical_replication_slots.pl b/src/bin/pg_upgrade/t/003_logical_replication_slots.pl
index 525a7704cf..21fefca084 100644
--- a/src/bin/pg_upgrade/t/003_logical_replication_slots.pl
+++ b/src/bin/pg_upgrade/t/003_logical_replication_slots.pl
@@ -85,11 +85,39 @@ $old_node->safe_psql(
 ]);
 
 my $result = $old_node->safe_psql('postgres',
-	"SELECT count (*) FROM pg_logical_slot_get_changes('test_slot', NULL, NULL)"
+	"SELECT count(*) FROM pg_logical_slot_peek_changes('test_slot', NULL, NULL)"
 );
+
 is($result, qq(12), 'ensure WALs are not consumed yet');
 $old_node->stop;
 
+# Cause a failure at the start of pg_upgrade because test_slot does not
+# finish consuming all the WALs
+command_fails(
+	[
+		'pg_upgrade', '--no-sync',
+		'-d',         $old_node->data_dir,
+		'-D',         $new_node->data_dir,
+		'-b',         $bindir,
+		'-B',         $bindir,
+		'-s',         $new_node->host,
+		'-p',         $old_node->port,
+		'-P',         $new_node->port,
+		$mode,        '--include-logical-replication-slots',
+	],
+	'run of pg_upgrade of old node with idle replication slots');
+ok( -d $new_node->data_dir . "/pg_upgrade_output.d",
+	"pg_upgrade_output.d/ not removed after pg_upgrade failure");
+
+# Clean up
+rmtree($new_node->data_dir . "/pg_upgrade_output.d");
+
+$old_node->start;
+$old_node->safe_psql('postgres',
+	"SELECT count (*) FROM pg_logical_slot_get_changes('test_slot', NULL, NULL)"
+);
+$old_node->stop;
+
 # Actual run, pg_upgrade_output.d is removed at the end
 command_ok(
 	[
-- 
2.27.0

