From 393f49f28ad8e1a6740e745339c790b2f2b14723 Mon Sep 17 00:00:00 2001
From: Daniel Gustafsson <dgustafsson@postgresql.org>
Date: Mon, 19 Dec 2022 23:22:07 +0100
Subject: [PATCH] pg_upgrade: run all data type checks per connection

The checks for data type usage were each connecting to all databases
in the cluster and running their query. On clusters which have a lot
of databases this can become unnecessarily expensive. This moves the
checks to run in a single connection instead to minimize connection
setup/teardown overhead.
---
 src/bin/pg_upgrade/check.c      | 371 +++++++++++++++---------------
 src/bin/pg_upgrade/pg_upgrade.h |  28 ++-
 src/bin/pg_upgrade/version.c    | 394 ++++++++++++++------------------
 3 files changed, 373 insertions(+), 420 deletions(-)

diff --git a/src/bin/pg_upgrade/check.c b/src/bin/pg_upgrade/check.c
index 7cf68dc9af..7f52d4727c 100644
--- a/src/bin/pg_upgrade/check.c
+++ b/src/bin/pg_upgrade/check.c
@@ -26,15 +26,177 @@ static void check_for_isn_and_int8_passing_mismatch(ClusterInfo *cluster);
 static void check_for_user_defined_postfix_ops(ClusterInfo *cluster);
 static void check_for_incompatible_polymorphics(ClusterInfo *cluster);
 static void check_for_tables_with_oids(ClusterInfo *cluster);
-static void check_for_composite_data_type_usage(ClusterInfo *cluster);
-static void check_for_reg_data_type_usage(ClusterInfo *cluster);
-static void check_for_aclitem_data_type_usage(ClusterInfo *cluster);
-static void check_for_jsonb_9_4_usage(ClusterInfo *cluster);
+static bool check_for_aclitem_data_type_usage(ClusterInfo *cluster);
+static bool check_for_jsonb_9_4_usage(ClusterInfo *cluster);
 static void check_for_pg_role_prefix(ClusterInfo *cluster);
 static void check_for_new_tablespace_dir(ClusterInfo *new_cluster);
 static void check_for_user_defined_encoding_conversions(ClusterInfo *cluster);
 static char *get_canonical_locale_name(int category, const char *locale);
 
+static DataTypesUsageChecks data_types_usage_checks[] = {
+	/*
+	 * Look for composite types that were made during initdb *or* belong to
+	 * information_schema; that's important in case information_schema was
+	 * dropped and reloaded.
+	 *
+	 * The cutoff OID here should match the source cluster's value of
+	 * FirstNormalObjectId.  We hardcode it rather than using that C #define
+	 * because, if that #define is ever changed, our own version's value is
+	 * NOT what to use.  Eventually we may need a test on the source cluster's
+	 * version to select the correct value.
+	 */
+	{"Checking for system-defined composite types in user tables",
+	 "tables_using_composite.txt",
+
+	 "SELECT t.oid FROM pg_catalog.pg_type t "
+	 "LEFT JOIN pg_catalog.pg_namespace n ON t.typnamespace = n.oid "
+	 " WHERE typtype = 'c' AND (t.oid < 16384 OR nspname = 'information_schema')",
+
+	 "Your installation contains system-defined composite type(s) in user tables.\n"
+	 "These type OIDs are not stable across PostgreSQL versions,\n"
+	 "so this cluster cannot currently be upgraded.  You can\n"
+	 "drop the problem columns and restart the upgrade.\n"
+	 "A list of the problem columns is in the file:\n",
+	 NULL},
+
+	/*
+	 * 9.3 -> 9.4
+	 *	Fully implement the 'line' data type in 9.4, which previously returned
+	 *	"not enabled" by default and was only functionally enabled with a
+	 *	compile-time switch; as of 9.4 "line" has a different on-disk
+	 *	representation format.
+	 */
+	{"Checking for incompatible \"line\" data type",
+	 "tables_using_line.txt",
+
+	 "SELECT 'pg_catalog.line'::pg_catalog.regtype AS oid",
+
+	 "Your installation contains the \"line\" data type in user tables.\n"
+	 "This data type changed its internal and input/output format\n"
+	 "between your old and new versions so this\n"
+	 "cluster cannot currently be upgraded.  You can\n"
+	 "drop the problem columns and restart the upgrade.\n"
+	 "A list of the problem columns is in the file:\n",
+	 old_9_3_check_for_line_data_type_usage},
+
+	/*
+	 *	pg_upgrade only preserves these system values:
+	 *		pg_class.oid
+	 *		pg_type.oid
+	 *		pg_enum.oid
+	 *
+	 *	Many of the reg* data types reference system catalog info that is
+	 *	not preserved, and hence these data types cannot be used in user
+	 *	tables upgraded by pg_upgrade.
+	 */
+	{"Checking for reg* data types in user tables",
+	 "tables_using_reg.txt",
+	 /*
+	  * Note: older servers will not have all of these reg* types, so we have
+	  * to write the query like this rather than depending on casts to regtype.
+	  */
+	 "SELECT oid FROM pg_catalog.pg_type t "
+	 "WHERE t.typnamespace = "
+	 "        (SELECT oid FROM pg_catalog.pg_namespace "
+	 "         WHERE nspname = 'pg_catalog') "
+	 "  AND t.typname IN ( "
+	 /* pg_class.oid is preserved, so 'regclass' is OK */
+	 "           'regcollation', "
+	 "           'regconfig', "
+	 "           'regdictionary', "
+	 "           'regnamespace', "
+	 "           'regoper', "
+	 "           'regoperator', "
+	 "           'regproc', "
+	 "           'regprocedure' "
+	 /* pg_authid.oid is preserved, so 'regrole' is OK */
+	 /* pg_type.oid is (mostly) preserved, so 'regtype' is OK */
+	 "         )",
+
+	 "Your installation contains one of the reg* data types in user tables.\n"
+	 "These data types reference system OIDs that are not preserved by\n"
+	 "pg_upgrade, so this cluster cannot currently be upgraded.  You can\n"
+	 "drop the problem columns and restart the upgrade.\n"
+	 "A list of the problem columns is in the file:\n",
+	 NULL},
+
+	/*
+	 * PG 16 increased the size of the 'aclitem' type, which breaks the on-disk
+	 * format for existing data.
+	 */
+	{"Checking for incompatible aclitem data type in user tables",
+	 "tables_using_aclitem.txt",
+
+	 "SELECT 'pg_catalog.aclitem'::pg_catalog.regtype AS oid",
+
+	 "Your installation contains the \"aclitem\" data type in user tables.\n"
+	 "The internal format of \"aclitem\" changed in PostgreSQL version 16\n"
+	 "so this cluster cannot currently be upgraded.  You can drop the\n"
+	 "problem columns and restart the upgrade.  A list of the problem\n"
+	 "columns is in the file:\n",
+	 check_for_aclitem_data_type_usage},
+
+	/*
+	 * It's no longer allowed to create tables or views with "unknown"-type
+	 * columns.  We do not complain about views with such columns, because
+	 * they should get silently converted to "text" columns during the DDL
+	 * dump and reload; it seems unlikely to be worth making users do that
+	 * by hand.  However, if there's a table with such a column, the DDL
+	 * reload will fail, so we should pre-detect that rather than failing
+	 * mid-upgrade.  Worse, if there's a matview with such a column, the
+	 * DDL reload will silently change it to "text" which won't match the
+	 * on-disk storage (which is like "cstring").  So we *must* reject that.
+	 */
+	{"Checking for invalid \"unknown\" user columns",
+	 "tables_using_unknown.txt",
+
+	 "SELECT 'pg_catalog.unknown'::pg_catalog.regtype AS oid",
+
+	 "Your installation contains the \"unknown\" data type in user tables.\n"
+	 "This data type is no longer allowed in tables, so this\n"
+	 "cluster cannot currently be upgraded.  You can\n"
+	 "drop the problem columns and restart the upgrade.\n"
+	 "A list of the problem columns is in the file:\n",
+	 old_9_6_check_for_unknown_data_type_usage},
+
+	/*
+	 * PG 12 changed the 'sql_identifier' type storage to be based on name,
+	 * not varchar, which breaks on-disk format for existing data. So we need
+	 * to prevent upgrade when used in user objects (tables, indexes, ...).
+	 * In 12, the sql_identifier data type was switched from name to varchar,
+	 * which does affect the storage (name is by-ref, but not varlena). This
+	 * means user tables using sql_identifier for columns are broken because
+	 * the on-disk format is different.
+	 */
+	{"Checking for invalid \"sql_identifier\" user columns",
+	 "tables_using_sql_identifier.txt",
+
+	 "SELECT 'information_schema.sql_identifier'::pg_catalog.regtype AS oid",
+
+	 "Your installation contains the \"sql_identifier\" data type in user tables.\n"
+	 "The on-disk format for this data type has changed, so this\n"
+	 "cluster cannot currently be upgraded.  You can\n"
+	 "drop the problem columns and restart the upgrade.\n"
+	 "A list of the problem columns is in the file:\n",
+	 old_11_check_for_sql_identifier_data_type_usage},
+
+	/*
+	 * JSONB changed its storage format during 9.4 beta, so check for it.
+	 */
+	{"Checking for incompatible \"jsonb\" data type",
+	 "tables_using_jsonb.txt",
+
+	 "SELECT 'pg_catalog.jsonb'::pg_catalog.regtype AS oid",
+
+	 "Your installation contains the \"jsonb\" data type in user tables.\n"
+	 "The internal format of \"jsonb\" changed during 9.4 beta so this\n"
+	 "cluster cannot currently be upgraded.  You can\n"
+	 "drop the problem columns and restart the upgrade.\n"
+	 "A list of the problem columns is in the file:\n",
+	 check_for_jsonb_9_4_usage},
+
+	 {NULL, NULL, NULL, NULL, NULL}
+};
 
 /*
  * fix_path_separator
@@ -104,16 +266,9 @@ check_and_dump_old_cluster(bool live_check)
 	check_is_install_user(&old_cluster);
 	check_proper_datallowconn(&old_cluster);
 	check_for_prepared_transactions(&old_cluster);
-	check_for_composite_data_type_usage(&old_cluster);
-	check_for_reg_data_type_usage(&old_cluster);
 	check_for_isn_and_int8_passing_mismatch(&old_cluster);
 
-	/*
-	 * PG 16 increased the size of the 'aclitem' type, which breaks the on-disk
-	 * format for existing data.
-	 */
-	if (GET_MAJOR_VERSION(old_cluster.major_version) <= 1500)
-		check_for_aclitem_data_type_usage(&old_cluster);
+	check_for_data_types_usage(&old_cluster, data_types_usage_checks);
 
 	/*
 	 * PG 14 changed the function signature of encoding conversion functions.
@@ -145,21 +300,12 @@ check_and_dump_old_cluster(bool live_check)
 	if (GET_MAJOR_VERSION(old_cluster.major_version) <= 1100)
 		check_for_tables_with_oids(&old_cluster);
 
-	/*
-	 * PG 12 changed the 'sql_identifier' type storage to be based on name,
-	 * not varchar, which breaks on-disk format for existing data. So we need
-	 * to prevent upgrade when used in user objects (tables, indexes, ...).
-	 */
-	if (GET_MAJOR_VERSION(old_cluster.major_version) <= 1100)
-		old_11_check_for_sql_identifier_data_type_usage(&old_cluster);
-
 	/*
 	 * Pre-PG 10 allowed tables with 'unknown' type columns and non WAL logged
 	 * hash indexes
 	 */
 	if (GET_MAJOR_VERSION(old_cluster.major_version) <= 906)
 	{
-		old_9_6_check_for_unknown_data_type_usage(&old_cluster);
 		if (user_opts.check)
 			old_9_6_invalidate_hash_indexes(&old_cluster, true);
 	}
@@ -168,14 +314,6 @@ check_and_dump_old_cluster(bool live_check)
 	if (GET_MAJOR_VERSION(old_cluster.major_version) <= 905)
 		check_for_pg_role_prefix(&old_cluster);
 
-	if (GET_MAJOR_VERSION(old_cluster.major_version) == 904 &&
-		old_cluster.controldata.cat_ver < JSONB_FORMAT_CHANGE_CAT_VER)
-		check_for_jsonb_9_4_usage(&old_cluster);
-
-	/* Pre-PG 9.4 had a different 'line' data type internal format */
-	if (GET_MAJOR_VERSION(old_cluster.major_version) <= 903)
-		old_9_3_check_for_line_data_type_usage(&old_cluster);
-
 	/*
 	 * While not a check option, we do this now because this is the only time
 	 * the old server is running.
@@ -1207,182 +1345,37 @@ check_for_tables_with_oids(ClusterInfo *cluster)
 }
 
 
-/*
- * check_for_composite_data_type_usage()
- *	Check for system-defined composite types used in user tables.
- *
- *	The OIDs of rowtypes of system catalogs and information_schema views
- *	can change across major versions; unlike user-defined types, we have
- *	no mechanism for forcing them to be the same in the new cluster.
- *	Hence, if any user table uses one, that's problematic for pg_upgrade.
- */
-static void
-check_for_composite_data_type_usage(ClusterInfo *cluster)
-{
-	bool		found;
-	Oid			firstUserOid;
-	char		output_path[MAXPGPATH];
-	char	   *base_query;
-
-	prep_status("Checking for system-defined composite types in user tables");
-
-	snprintf(output_path, sizeof(output_path), "%s/%s",
-			 log_opts.basedir,
-			 "tables_using_composite.txt");
-
-	/*
-	 * Look for composite types that were made during initdb *or* belong to
-	 * information_schema; that's important in case information_schema was
-	 * dropped and reloaded.
-	 *
-	 * The cutoff OID here should match the source cluster's value of
-	 * FirstNormalObjectId.  We hardcode it rather than using that C #define
-	 * because, if that #define is ever changed, our own version's value is
-	 * NOT what to use.  Eventually we may need a test on the source cluster's
-	 * version to select the correct value.
-	 */
-	firstUserOid = 16384;
-
-	base_query = psprintf("SELECT t.oid FROM pg_catalog.pg_type t "
-						  "LEFT JOIN pg_catalog.pg_namespace n ON t.typnamespace = n.oid "
-						  " WHERE typtype = 'c' AND (t.oid < %u OR nspname = 'information_schema')",
-						  firstUserOid);
-
-	found = check_for_data_types_usage(cluster, base_query, output_path);
-
-	free(base_query);
-
-	if (found)
-	{
-		pg_log(PG_REPORT, "fatal");
-		pg_fatal("Your installation contains system-defined composite type(s) in user tables.\n"
-				 "These type OIDs are not stable across PostgreSQL versions,\n"
-				 "so this cluster cannot currently be upgraded.  You can\n"
-				 "drop the problem columns and restart the upgrade.\n"
-				 "A list of the problem columns is in the file:\n"
-				 "    %s", output_path);
-	}
-	else
-		check_ok();
-}
-
-/*
- * check_for_reg_data_type_usage()
- *	pg_upgrade only preserves these system values:
- *		pg_class.oid
- *		pg_type.oid
- *		pg_enum.oid
- *
- *	Many of the reg* data types reference system catalog info that is
- *	not preserved, and hence these data types cannot be used in user
- *	tables upgraded by pg_upgrade.
- */
-static void
-check_for_reg_data_type_usage(ClusterInfo *cluster)
-{
-	bool		found;
-	char		output_path[MAXPGPATH];
-
-	prep_status("Checking for reg* data types in user tables");
-
-	snprintf(output_path, sizeof(output_path), "%s/%s",
-			 log_opts.basedir,
-			 "tables_using_reg.txt");
-
-	/*
-	 * Note: older servers will not have all of these reg* types, so we have
-	 * to write the query like this rather than depending on casts to regtype.
-	 */
-	found = check_for_data_types_usage(cluster,
-									   "SELECT oid FROM pg_catalog.pg_type t "
-									   "WHERE t.typnamespace = "
-									   "        (SELECT oid FROM pg_catalog.pg_namespace "
-									   "         WHERE nspname = 'pg_catalog') "
-									   "  AND t.typname IN ( "
-	/* pg_class.oid is preserved, so 'regclass' is OK */
-									   "           'regcollation', "
-									   "           'regconfig', "
-									   "           'regdictionary', "
-									   "           'regnamespace', "
-									   "           'regoper', "
-									   "           'regoperator', "
-									   "           'regproc', "
-									   "           'regprocedure' "
-	/* pg_authid.oid is preserved, so 'regrole' is OK */
-	/* pg_type.oid is (mostly) preserved, so 'regtype' is OK */
-									   "         )",
-									   output_path);
-
-	if (found)
-	{
-		pg_log(PG_REPORT, "fatal");
-		pg_fatal("Your installation contains one of the reg* data types in user tables.\n"
-				 "These data types reference system OIDs that are not preserved by\n"
-				 "pg_upgrade, so this cluster cannot currently be upgraded.  You can\n"
-				 "drop the problem columns and restart the upgrade.\n"
-				 "A list of the problem columns is in the file:\n"
-				 "    %s", output_path);
-	}
-	else
-		check_ok();
-}
-
 /*
  * check_for_aclitem_data_type_usage
  *
- *	aclitem changed its storage format in 16, so check for it.
+ *     aclitem changed its storage format in 16, so check for it.
  */
-static void
+static bool
 check_for_aclitem_data_type_usage(ClusterInfo *cluster)
 {
-	char		output_path[MAXPGPATH];
-
-	prep_status("Checking for incompatible aclitem data type in user tables");
-
-	snprintf(output_path, sizeof(output_path), "tables_using_aclitem.txt");
+	/*
+	 * PG 16 increased the size of the 'aclitem' type, which breaks the on-disk
+	 * format for existing data.
+	 */
+	if (GET_MAJOR_VERSION(cluster->major_version) <= 1500)
+		return true;
 
-	if (check_for_data_type_usage(cluster, "pg_catalog.aclitem", output_path))
-	{
-		pg_log(PG_REPORT, "fatal");
-		pg_fatal("Your installation contains the \"aclitem\" data type in user tables.\n"
-				 "The internal format of \"aclitem\" changed in PostgreSQL version 16\n"
-				 "so this cluster cannot currently be upgraded.  You can drop the\n"
-				 "problem columns and restart the upgrade.  A list of the problem\n"
-				 "columns is in the file:\n"
-				 "    %s", output_path);
-	}
-	else
-		check_ok();
+	return false;
 }
 
 /*
  * check_for_jsonb_9_4_usage()
  *
- *	JSONB changed its storage format during 9.4 beta, so check for it.
+ *     JSONB changed its storage format during 9.4 beta, so check for it.
  */
-static void
+static bool
 check_for_jsonb_9_4_usage(ClusterInfo *cluster)
 {
-	char		output_path[MAXPGPATH];
-
-	prep_status("Checking for incompatible \"jsonb\" data type");
-
-	snprintf(output_path, sizeof(output_path), "%s/%s",
-			 log_opts.basedir,
-			 "tables_using_jsonb.txt");
+	if (GET_MAJOR_VERSION(cluster->major_version) == 904 &&
+		cluster->controldata.cat_ver < JSONB_FORMAT_CHANGE_CAT_VER)
+		return true;
 
-	if (check_for_data_type_usage(cluster, "pg_catalog.jsonb", output_path))
-	{
-		pg_log(PG_REPORT, "fatal");
-		pg_fatal("Your installation contains the \"jsonb\" data type in user tables.\n"
-				 "The internal format of \"jsonb\" changed during 9.4 beta so this\n"
-				 "cluster cannot currently be upgraded.  You can\n"
-				 "drop the problem columns and restart the upgrade.\n"
-				 "A list of the problem columns is in the file:\n"
-				 "    %s", output_path);
-	}
-	else
-		check_ok();
+	return false;
 }
 
 /*
diff --git a/src/bin/pg_upgrade/pg_upgrade.h b/src/bin/pg_upgrade/pg_upgrade.h
index 5f2a116f23..18c3217d61 100644
--- a/src/bin/pg_upgrade/pg_upgrade.h
+++ b/src/bin/pg_upgrade/pg_upgrade.h
@@ -320,6 +320,21 @@ typedef struct
 } OSInfo;
 
 
+/* Function signature for data type check version hook */
+typedef bool (*DataTypesUsageVersionCheck)(ClusterInfo *cluster);
+
+/*
+ * DataTypesUsageChecks
+ */
+typedef struct
+{
+	const char *status;			/* status line to print to the user */
+	const char *script_filename;	/* filename to store report to */
+	const char *base_query;		/* Query to extract the oid of the datatype */
+	const char *fatal_check;	/* Text to store to report in case of error */
+	DataTypesUsageVersionCheck version_hook;
+} DataTypesUsageChecks;
+
 /*
  * Global variables
  */
@@ -442,18 +457,13 @@ unsigned int str2uint(const char *str);
 
 /* version.c */
 
-bool		check_for_data_types_usage(ClusterInfo *cluster,
-									   const char *base_query,
-									   const char *output_path);
-bool		check_for_data_type_usage(ClusterInfo *cluster,
-									  const char *type_name,
-									  const char *output_path);
-void		old_9_3_check_for_line_data_type_usage(ClusterInfo *cluster);
-void		old_9_6_check_for_unknown_data_type_usage(ClusterInfo *cluster);
+void		check_for_data_types_usage(ClusterInfo *cluster, DataTypesUsageChecks *checks);
+bool		old_9_3_check_for_line_data_type_usage(ClusterInfo *cluster);
+bool		old_9_6_check_for_unknown_data_type_usage(ClusterInfo *cluster);
+bool		old_11_check_for_sql_identifier_data_type_usage(ClusterInfo *cluster);
 void		old_9_6_invalidate_hash_indexes(ClusterInfo *cluster,
 											bool check_mode);
 
-void		old_11_check_for_sql_identifier_data_type_usage(ClusterInfo *cluster);
 void		report_extension_updates(ClusterInfo *cluster);
 
 /* parallel.c */
diff --git a/src/bin/pg_upgrade/version.c b/src/bin/pg_upgrade/version.c
index 403a6d7cfa..64dec909fa 100644
--- a/src/bin/pg_upgrade/version.c
+++ b/src/bin/pg_upgrade/version.c
@@ -13,12 +13,17 @@
 #include "fe_utils/string_utils.h"
 #include "pg_upgrade.h"
 
-
 /*
  * check_for_data_types_usage()
  *	Detect whether there are any stored columns depending on given type(s)
  *
- * If so, write a report to the given file name, and return true.
+ * If so, write a report to the given file name and signal a failure to the
+ * user.
+ *
+ * The checks to run are defined in a DataTypesUsageChecks structure where
+ * each check has metadata for explaining errors to the user, a base_query,
+ * a report filename and a function pointer hook for validating if the check
+ * should be executed given the cluster at hand.
  *
  * base_query should be a SELECT yielding a single column named "oid",
  * containing the pg_type OIDs of one or more types that are known to have
@@ -27,218 +32,184 @@
  * We check for the type(s) in tables, matviews, and indexes, but not views;
  * there's no storage involved in a view.
  */
-bool
-check_for_data_types_usage(ClusterInfo *cluster,
-						   const char *base_query,
-						   const char *output_path)
+void
+check_for_data_types_usage(ClusterInfo *cluster, DataTypesUsageChecks *checks)
 {
-	bool		found = false;
-	FILE	   *script = NULL;
-	int			dbnum;
 
-	for (dbnum = 0; dbnum < cluster->dbarr.ndbs; dbnum++)
+	prep_status("Checking for data type usage");
+
+	/*
+	 * Connect to each database in the cluster and run all defined checks
+	 * against that database before trying the next one.
+	 */
+	for (int dbnum = 0; dbnum < cluster->dbarr.ndbs; dbnum++)
 	{
-		DbInfo	   *active_db = &cluster->dbarr.dbs[dbnum];
-		PGconn	   *conn = connectToServer(cluster, active_db->db_name);
-		PQExpBufferData querybuf;
-		PGresult   *res;
-		bool		db_used = false;
-		int			ntups;
-		int			rowno;
-		int			i_nspname,
-					i_relname,
-					i_attname;
-
-		/*
-		 * The type(s) of interest might be wrapped in a domain, array,
-		 * composite, or range, and these container types can be nested (to
-		 * varying extents depending on server version, but that's not of
-		 * concern here).  To handle all these cases we need a recursive CTE.
-		 */
-		initPQExpBuffer(&querybuf);
-		appendPQExpBuffer(&querybuf,
-						  "WITH RECURSIVE oids AS ( "
-		/* start with the type(s) returned by base_query */
-						  "	%s "
-						  "	UNION ALL "
-						  "	SELECT * FROM ( "
-		/* inner WITH because we can only reference the CTE once */
-						  "		WITH x AS (SELECT oid FROM oids) "
-		/* domains on any type selected so far */
-						  "			SELECT t.oid FROM pg_catalog.pg_type t, x WHERE typbasetype = x.oid AND typtype = 'd' "
-						  "			UNION ALL "
-		/* arrays over any type selected so far */
-						  "			SELECT t.oid FROM pg_catalog.pg_type t, x WHERE typelem = x.oid AND typtype = 'b' "
-						  "			UNION ALL "
-		/* composite types containing any type selected so far */
-						  "			SELECT t.oid FROM pg_catalog.pg_type t, pg_catalog.pg_class c, pg_catalog.pg_attribute a, x "
-						  "			WHERE t.typtype = 'c' AND "
-						  "				  t.oid = c.reltype AND "
-						  "				  c.oid = a.attrelid AND "
-						  "				  NOT a.attisdropped AND "
-						  "				  a.atttypid = x.oid "
-						  "			UNION ALL "
-		/* ranges containing any type selected so far */
-						  "			SELECT t.oid FROM pg_catalog.pg_type t, pg_catalog.pg_range r, x "
-						  "			WHERE t.typtype = 'r' AND r.rngtypid = t.oid AND r.rngsubtype = x.oid"
-						  "	) foo "
-						  ") "
-		/* now look for stored columns of any such type */
-						  "SELECT n.nspname, c.relname, a.attname "
-						  "FROM	pg_catalog.pg_class c, "
-						  "		pg_catalog.pg_namespace n, "
-						  "		pg_catalog.pg_attribute a "
-						  "WHERE	c.oid = a.attrelid AND "
-						  "		NOT a.attisdropped AND "
-						  "		a.atttypid IN (SELECT oid FROM oids) AND "
-						  "		c.relkind IN ("
-						  CppAsString2(RELKIND_RELATION) ", "
-						  CppAsString2(RELKIND_MATVIEW) ", "
-						  CppAsString2(RELKIND_INDEX) ") AND "
-						  "		c.relnamespace = n.oid AND "
-		/* exclude possible orphaned temp tables */
-						  "		n.nspname !~ '^pg_temp_' AND "
-						  "		n.nspname !~ '^pg_toast_temp_' AND "
-		/* exclude system catalogs, too */
-						  "		n.nspname NOT IN ('pg_catalog', 'information_schema')",
-						  base_query);
-
-		res = executeQueryOrDie(conn, "%s", querybuf.data);
+		DbInfo *active_db = &cluster->dbarr.dbs[dbnum];
+		PGconn *conn = connectToServer(cluster, active_db->db_name);
+		DataTypesUsageChecks *cur_check = checks;
 
-		ntups = PQntuples(res);
-		i_nspname = PQfnumber(res, "nspname");
-		i_relname = PQfnumber(res, "relname");
-		i_attname = PQfnumber(res, "attname");
-		for (rowno = 0; rowno < ntups; rowno++)
+		while (cur_check->status)
 		{
-			found = true;
-			if (script == NULL && (script = fopen_priv(output_path, "w")) == NULL)
-				pg_fatal("could not open file \"%s\": %s", output_path,
-						 strerror(errno));
-			if (!db_used)
+			PGresult *res;
+			int 	ntups;
+			int 	i_nspname;
+			int 	i_relname;
+			int 	i_attname;
+			FILE   *script = NULL;
+			bool 	db_used = false;
+			char	output_path[MAXPGPATH];
+			bool	found = false;
+
+			/*
+			 * Make sure that the check applies to the current cluster version
+			 * and skip if not. If no check hook has been defined we run the
+			 * check for all versions.
+			 */
+			if (cur_check->version_hook && !cur_check->version_hook(cluster))
 			{
-				fprintf(script, "In database: %s\n", active_db->db_name);
-				db_used = true;
-			}
-			fprintf(script, "  %s.%s.%s\n",
-					PQgetvalue(res, rowno, i_nspname),
-					PQgetvalue(res, rowno, i_relname),
-					PQgetvalue(res, rowno, i_attname));
-		}
-
-		PQclear(res);
-
-		termPQExpBuffer(&querybuf);
+				cur_check++;
+				continue;
+	 		}
+
+			snprintf(output_path, sizeof(output_path), "%s/%s",
+					 log_opts.basedir,
+					 cur_check->script_filename);
+
+			/*
+			 * The type(s) of interest might be wrapped in a domain, array,
+			 * composite, or range, and these container types can be nested (to
+			 * varying extents depending on server version, but that's not of
+			 * concern here).  To handle all these cases we need a recursive CTE.
+			 */
+			res = executeQueryOrDie(conn,
+							  "WITH RECURSIVE oids AS ( "
+			/* start with the type(s) returned by base_query */
+							  "	%s "
+					  "	UNION ALL "
+					  "	SELECT * FROM ( "
+	/* inner WITH because we can only reference the CTE once */
+					  "		WITH x AS (SELECT oid FROM oids) "
+	/* domains on any type selected so far */
+					  "			SELECT t.oid FROM pg_catalog.pg_type t, x WHERE typbasetype = x.oid AND typtype = 'd' "
+					  "			UNION ALL "
+	/* arrays over any type selected so far */
+					  "			SELECT t.oid FROM pg_catalog.pg_type t, x WHERE typelem = x.oid AND typtype = 'b' "
+					  "			UNION ALL "
+	/* composite types containing any type selected so far */
+					  "			SELECT t.oid FROM pg_catalog.pg_type t, pg_catalog.pg_class c, pg_catalog.pg_attribute a, x "
+					  "			WHERE t.typtype = 'c' AND "
+					  "				  t.oid = c.reltype AND "
+					  "				  c.oid = a.attrelid AND "
+					  "				  NOT a.attisdropped AND "
+					  "				  a.atttypid = x.oid "
+					  "			UNION ALL "
+	/* ranges containing any type selected so far */
+					  "			SELECT t.oid FROM pg_catalog.pg_type t, pg_catalog.pg_range r, x "
+					  "			WHERE t.typtype = 'r' AND r.rngtypid = t.oid AND r.rngsubtype = x.oid"
+					  "	) foo "
+					  ") "
+	/* now look for stored columns of any such type */
+					  "SELECT n.nspname, c.relname, a.attname "
+					  "FROM	pg_catalog.pg_class c, "
+					  "		pg_catalog.pg_namespace n, "
+					  "		pg_catalog.pg_attribute a "
+					  "WHERE	c.oid = a.attrelid AND "
+					  "		NOT a.attisdropped AND "
+					  "		a.atttypid IN (SELECT oid FROM oids) AND "
+					  "		c.relkind IN ("
+					  CppAsString2(RELKIND_RELATION) ", "
+					  CppAsString2(RELKIND_MATVIEW) ", "
+					  CppAsString2(RELKIND_INDEX) ") AND "
+					  "		c.relnamespace = n.oid AND "
+	/* exclude possible orphaned temp tables */
+					  "		n.nspname !~ '^pg_temp_' AND "
+					  "		n.nspname !~ '^pg_toast_temp_' AND "
+	/* exclude system catalogs, too */
+					  "		n.nspname NOT IN ('pg_catalog', 'information_schema')",
+					  cur_check->base_query);
+
+			ntups = PQntuples(res);
+
+			/*
+			 * The datatype was found, so extract the data and log to the
+			 * requested filename. We need to open the file for appending
+			 * since the check might have already found the type in another
+			 * database earlier in the loop.
+			 */
+			if (ntups)
+			{
+				i_nspname = PQfnumber(res, "nspname");
+				i_relname = PQfnumber(res, "relname");
+				i_attname = PQfnumber(res, "attname");
 
-		PQfinish(conn);
-	}
+				for (int rowno = 0; rowno < ntups; rowno++)
+				{
+					found = true;
+					if (script == NULL && (script = fopen_priv(output_path, "a")) == NULL)
+						pg_fatal("could not open file \"%s\": %s",
+								 output_path,
+								 strerror(errno));
+					if (!db_used)
+					{
+						fprintf(script, "In database: %s\n", active_db->db_name);
+						db_used = true;
+					}
+					fprintf(script, "  %s.%s.%s\n",
+							PQgetvalue(res, rowno, i_nspname),
+							PQgetvalue(res, rowno, i_relname),
+							PQgetvalue(res, rowno, i_attname));
+				}
 
-	if (script)
-		fclose(script);
+				if (script)
+				{
+					fclose(script);
+					script = NULL;
+				}
+			}
 
-	return found;
-}
+			PQclear(res);
 
-/*
- * check_for_data_type_usage()
- *	Detect whether there are any stored columns depending on the given type
- *
- * If so, write a report to the given file name, and return true.
- *
- * type_name should be a fully qualified type name.  This is just a
- * trivial wrapper around check_for_data_types_usage() to convert a
- * type name into a base query.
- */
-bool
-check_for_data_type_usage(ClusterInfo *cluster,
-						  const char *type_name,
-						  const char *output_path)
-{
-	bool		found;
-	char	   *base_query;
+			/*
+			 * If the check failed, terminate the umbrella status and print
+			 * the specific status line of the check to indicate which it was
+			 * before terminating with the detailed error message.
+			 */
+			if (found)
+			{
+				PQfinish(conn);
 
-	base_query = psprintf("SELECT '%s'::pg_catalog.regtype AS oid",
-						  type_name);
+				report_status(PG_REPORT, "failed");
+				prep_status("%s", cur_check->status);
+				pg_log(PG_REPORT, "fatal");
+				pg_fatal("%s    %s", cur_check->fatal_check, output_path);
+			}
 
-	found = check_for_data_types_usage(cluster, base_query, output_path);
+			cur_check++;
+		}
 
-	free(base_query);
+		PQfinish(conn);
+	}
 
-	return found;
+	check_ok();
 }
 
-
-/*
- * old_9_3_check_for_line_data_type_usage()
- *	9.3 -> 9.4
- *	Fully implement the 'line' data type in 9.4, which previously returned
- *	"not enabled" by default and was only functionally enabled with a
- *	compile-time switch; as of 9.4 "line" has a different on-disk
- *	representation format.
- */
-void
+bool
 old_9_3_check_for_line_data_type_usage(ClusterInfo *cluster)
 {
-	char		output_path[MAXPGPATH];
-
-	prep_status("Checking for incompatible \"line\" data type");
+	/* Pre-PG 9.4 had a different 'line' data type internal format */
+	if (GET_MAJOR_VERSION(cluster->major_version) <= 903)
+		return true;
 
-	snprintf(output_path, sizeof(output_path), "%s/%s",
-			 log_opts.basedir,
-			 "tables_using_line.txt");
-
-	if (check_for_data_type_usage(cluster, "pg_catalog.line", output_path))
-	{
-		pg_log(PG_REPORT, "fatal");
-		pg_fatal("Your installation contains the \"line\" data type in user tables.\n"
-				 "This data type changed its internal and input/output format\n"
-				 "between your old and new versions so this\n"
-				 "cluster cannot currently be upgraded.  You can\n"
-				 "drop the problem columns and restart the upgrade.\n"
-				 "A list of the problem columns is in the file:\n"
-				 "    %s", output_path);
-	}
-	else
-		check_ok();
+	return false;
 }
 
-
-/*
- * old_9_6_check_for_unknown_data_type_usage()
- *	9.6 -> 10
- *	It's no longer allowed to create tables or views with "unknown"-type
- *	columns.  We do not complain about views with such columns, because
- *	they should get silently converted to "text" columns during the DDL
- *	dump and reload; it seems unlikely to be worth making users do that
- *	by hand.  However, if there's a table with such a column, the DDL
- *	reload will fail, so we should pre-detect that rather than failing
- *	mid-upgrade.  Worse, if there's a matview with such a column, the
- *	DDL reload will silently change it to "text" which won't match the
- *	on-disk storage (which is like "cstring").  So we *must* reject that.
- */
-void
+bool
 old_9_6_check_for_unknown_data_type_usage(ClusterInfo *cluster)
 {
-	char		output_path[MAXPGPATH];
-
-	prep_status("Checking for invalid \"unknown\" user columns");
-
-	snprintf(output_path, sizeof(output_path), "%s/%s",
-			 log_opts.basedir,
-			 "tables_using_unknown.txt");
-
-	if (check_for_data_type_usage(cluster, "pg_catalog.unknown", output_path))
-	{
-		pg_log(PG_REPORT, "fatal");
-		pg_fatal("Your installation contains the \"unknown\" data type in user tables.\n"
-				 "This data type is no longer allowed in tables, so this\n"
-				 "cluster cannot currently be upgraded.  You can\n"
-				 "drop the problem columns and restart the upgrade.\n"
-				 "A list of the problem columns is in the file:\n"
-				 "    %s", output_path);
-	}
-	else
-		check_ok();
+	/* Pre-PG 10 allowed tables with 'unknown' type columns */
+	if (GET_MAJOR_VERSION(cluster->major_version) <= 906)
+		return true;
+	return false;
 }
 
 /*
@@ -353,41 +324,20 @@ old_9_6_invalidate_hash_indexes(ClusterInfo *cluster, bool check_mode)
 		check_ok();
 }
 
-/*
- * old_11_check_for_sql_identifier_data_type_usage()
- *	11 -> 12
- *	In 12, the sql_identifier data type was switched from name to varchar,
- *	which does affect the storage (name is by-ref, but not varlena). This
- *	means user tables using sql_identifier for columns are broken because
- *	the on-disk format is different.
- */
-void
+bool
 old_11_check_for_sql_identifier_data_type_usage(ClusterInfo *cluster)
 {
-	char		output_path[MAXPGPATH];
-
-	prep_status("Checking for invalid \"sql_identifier\" user columns");
-
-	snprintf(output_path, sizeof(output_path), "%s/%s",
-			 log_opts.basedir,
-			 "tables_using_sql_identifier.txt");
-
-	if (check_for_data_type_usage(cluster, "information_schema.sql_identifier",
-								  output_path))
-	{
-		pg_log(PG_REPORT, "fatal");
-		pg_fatal("Your installation contains the \"sql_identifier\" data type in user tables.\n"
-				 "The on-disk format for this data type has changed, so this\n"
-				 "cluster cannot currently be upgraded.  You can\n"
-				 "drop the problem columns and restart the upgrade.\n"
-				 "A list of the problem columns is in the file:\n"
-				 "    %s", output_path);
-	}
-	else
-		check_ok();
+	/*
+	 * PG 12 changed the 'sql_identifier' type storage to be based on name,
+	 * not varchar, which breaks on-disk format for existing data. So we need
+	 * to prevent upgrade when used in user objects (tables, indexes, ...).
+	 */
+	if (GET_MAJOR_VERSION(cluster->major_version) <= 1100)
+		return true;
+
+	return false;
 }
 
-
 /*
  * report_extension_updates()
  *	Report extensions that should be updated.
-- 
2.32.1 (Apple Git-133)

