A number of pg_upgrade steps require connecting to each database and
running a query.  When there are many databases, these steps are
particularly time-consuming, especially since this is done sequentially in
a single process.  At a quick glance, I see the following such steps:

        * create_logical_replication_slots
        * check_for_data_types_usage
        * check_for_isn_and_int8_passing_mismatch
        * check_for_user_defined_postfix_ops
        * check_for_incompatible_polymorphics
        * check_for_tables_with_oids
        * check_for_user_defined_encoding_conversions
        * check_old_cluster_subscription_state
        * get_loadable_libraries
        * get_db_rel_and_slot_infos
        * old_9_6_invalidate_hash_indexes
        * report_extension_updates

I set out to parallelize these kinds of steps via multiple threads or
processes, but I ended up realizing that we could likely achieve much of
the same gain with libpq's asynchronous APIs.  Specifically, both
establishing the connections and running the queries can be done without
blocking, so we can just loop over a handful of slots and advance a simple
state machine for each.  Attached is a proof-of-concept-grade patch that
does this for get_db_rel_and_slot_infos().  It yielded the following
results on my laptop for "pg_upgrade --link --sync-method=syncfs --jobs 8"
on a cluster with 10K empty databases.

        total pg_upgrade time:
        * HEAD:  14m 8s
        * patch: 10m 58s

        get_db_rel_and_slot_infos() on old cluster:
        * HEAD:  2m 45s
        * patch: 36s

        get_db_rel_and_slot_infos() on new cluster:
        * HEAD:  1m 46s
        * patch: 29s

I am posting this early to get thoughts on the general approach.  If we
proceed with this strategy, I'd probably create some generic tooling to
which each relevant step would provide a set of callback functions.

-- 
Nathan Bossart
Amazon Web Services: https://aws.amazon.com
>From 05a9903295cb3b57ca9144217e89f0aac27277b5 Mon Sep 17 00:00:00 2001
From: Nathan Bossart <nat...@postgresql.org>
Date: Wed, 15 May 2024 12:07:10 -0500
Subject: [PATCH v1 1/1] parallel get relinfos

---
 src/bin/pg_upgrade/info.c        | 266 +++++++++++++++++++++++--------
 src/tools/pgindent/typedefs.list |   1 +
 2 files changed, 202 insertions(+), 65 deletions(-)

diff --git a/src/bin/pg_upgrade/info.c b/src/bin/pg_upgrade/info.c
index 95c22a7200..bb28e262c7 100644
--- a/src/bin/pg_upgrade/info.c
+++ b/src/bin/pg_upgrade/info.c
@@ -11,6 +11,7 @@
 
 #include "access/transam.h"
 #include "catalog/pg_class_d.h"
+#include "fe_utils/string_utils.h"
 #include "pg_upgrade.h"
 
 static void create_rel_filename_map(const char *old_data, const char *new_data,
@@ -22,13 +23,16 @@ static void report_unmatched_relation(const RelInfo *rel, const DbInfo *db,
 static void free_db_and_rel_infos(DbInfoArr *db_arr);
 static void get_template0_info(ClusterInfo *cluster);
 static void get_db_infos(ClusterInfo *cluster);
-static void get_rel_infos(ClusterInfo *cluster, DbInfo *dbinfo);
+static void start_rel_infos_query(PGconn *conn);
+static void get_rel_infos_result(PGconn *conn, DbInfo *dbinfo);
 static void free_rel_infos(RelInfoArr *rel_arr);
 static void print_db_infos(DbInfoArr *db_arr);
 static void print_rel_infos(RelInfoArr *rel_arr);
 static void print_slot_infos(LogicalSlotInfoArr *slot_arr);
-static void get_old_cluster_logical_slot_infos(DbInfo *dbinfo, bool live_check);
-static void get_db_subscription_count(DbInfo *dbinfo);
+static void start_old_cluster_logical_slot_infos_query(PGconn *conn, bool live_check);
+static void get_old_cluster_logical_slot_infos_result(PGconn *conn, DbInfo *dbinfo);
+static void start_db_sub_count_query(PGconn *conn, DbInfo *dbinfo);
+static void get_db_sub_count_result(PGconn *conn, DbInfo *dbinfo);
 
 
 /*
@@ -268,6 +272,16 @@ report_unmatched_relation(const RelInfo *rel, const DbInfo *db, bool is_new_db)
 			   reloid, db->db_name, reldesc);
 }
 
+typedef enum					/* per-slot state, see get_db_rel_and_slot_infos() */
+{
+	UNUSED,						/* idle; may be assigned the next database */
+	CONN_STARTED,				/* connected; NOTE(review): entered after CONNECTING */
+	CONNECTING,					/* PQconnectStart() issued; polling */
+	STARTED_RELINFO_QUERY,		/* relation-info query sent; awaiting result */
+	STARTED_LOGICAL_QUERY,		/* logical-slot query sent; awaiting result */
+	STARTED_SUBSCRIPTION_QUERY, /* subscription-count query sent */
+} InfoState;
+
 /*
  * get_db_rel_and_slot_infos()
  *
@@ -279,7 +293,12 @@ report_unmatched_relation(const RelInfo *rel, const DbInfo *db, bool is_new_db)
 void
 get_db_rel_and_slot_infos(ClusterInfo *cluster, bool live_check)
 {
-	int			dbnum;
+	int			dbnum = 0;
+	int			dbnum_proc = 0;
+	InfoState  *states;
+	int		   *dbs;
+	PGconn	  **conns;
+	int			jobs = (user_opts.jobs < 1) ? 1 : user_opts.jobs;
 
 	if (cluster->dbarr.dbs != NULL)
 		free_db_and_rel_infos(&cluster->dbarr);
@@ -287,20 +306,103 @@ get_db_rel_and_slot_infos(ClusterInfo *cluster, bool live_check)
 	get_template0_info(cluster);
 	get_db_infos(cluster);
 
-	for (dbnum = 0; dbnum < cluster->dbarr.ndbs; dbnum++)
-	{
-		DbInfo	   *pDbInfo = &cluster->dbarr.dbs[dbnum];
+	states = (InfoState *) pg_malloc(sizeof(InfoState *) * jobs);
+	dbs = (int *) pg_malloc(sizeof(int) * jobs);
+	conns = (PGconn **) pg_malloc(sizeof(PGconn *) * jobs);
 
-		get_rel_infos(cluster, pDbInfo);
+	for (int i = 0; i < jobs; i++)
+		states[i] = UNUSED;
 
-		/*
-		 * Retrieve the logical replication slots infos and the subscriptions
-		 * count for the old cluster.
-		 */
-		if (cluster == &old_cluster)
+	while (dbnum < cluster->dbarr.ndbs)
+	{
+		for (int i = 0; i < jobs; i++)
 		{
-			get_old_cluster_logical_slot_infos(pDbInfo, live_check);
-			get_db_subscription_count(pDbInfo);
+			switch (states[i])
+			{
+				case UNUSED:
+					if (dbnum_proc < cluster->dbarr.ndbs)
+					{
+						PQExpBufferData conn_opts;
+
+						dbs[i] = dbnum_proc++;
+
+						/* Build connection string with proper quoting */
+						initPQExpBuffer(&conn_opts);
+						appendPQExpBufferStr(&conn_opts, "dbname=");
+						appendConnStrVal(&conn_opts, cluster->dbarr.dbs[dbs[i]].db_name);
+						appendPQExpBufferStr(&conn_opts, " user=");
+						appendConnStrVal(&conn_opts, os_info.user);
+						appendPQExpBuffer(&conn_opts, " port=%d", cluster->port);
+						if (cluster->sockdir)
+						{
+							appendPQExpBufferStr(&conn_opts, " host=");
+							appendConnStrVal(&conn_opts, cluster->sockdir);
+						}
+
+						conns[i] = PQconnectStart(conn_opts.data);
+						termPQExpBuffer(&conn_opts);
+						states[i] = CONNECTING;
+					}
+					break;
+				case CONNECTING:
+					if (PQconnectPoll(conns[i]) == PGRES_POLLING_FAILED)
+					{
+						pg_log(PG_REPORT, "%s", PQerrorMessage(conns[i]));
+						exit(1);
+					}
+					if (PQconnectPoll(conns[i]) == PGRES_POLLING_OK)
+						states[i] = CONN_STARTED;
+					break;
+				case CONN_STARTED:
+					if (PQstatus(conns[i]) == CONNECTION_OK)
+					{
+						start_rel_infos_query(conns[i]);
+						states[i] = STARTED_RELINFO_QUERY;
+					}
+					break;
+				case STARTED_RELINFO_QUERY:
+					if (PQisBusy(conns[i]))
+						PQconsumeInput(conns[i]);
+					else
+					{
+						get_rel_infos_result(conns[i], &cluster->dbarr.dbs[dbs[i]]);
+
+						if (cluster == &old_cluster &&
+							GET_MAJOR_VERSION(old_cluster.major_version) >= 1700)
+						{
+							start_old_cluster_logical_slot_infos_query(conns[i], live_check);
+							states[i] = STARTED_LOGICAL_QUERY;
+						}
+						else
+						{
+							dbnum++;
+							PQfinish(conns[i]);
+							states[i] = UNUSED;
+						}
+					}
+					break;
+				case STARTED_LOGICAL_QUERY:
+					if (PQisBusy(conns[i]))
+						PQconsumeInput(conns[i]);
+					else
+					{
+						get_old_cluster_logical_slot_infos_result(conns[i], &cluster->dbarr.dbs[dbs[i]]);
+						start_db_sub_count_query(conns[i], &cluster->dbarr.dbs[dbs[i]]);
+						states[i] = STARTED_SUBSCRIPTION_QUERY;
+					}
+					break;
+				case STARTED_SUBSCRIPTION_QUERY:
+					if (PQisBusy(conns[i]))
+						PQconsumeInput(conns[i]);
+					else
+					{
+						get_db_sub_count_result(conns[i], &cluster->dbarr.dbs[dbs[i]]);
+						dbnum++;
+						PQfinish(conns[i]);
+						states[i] = UNUSED;
+					}
+					break;
+			}
 		}
 	}
 
@@ -450,29 +552,9 @@ get_db_infos(ClusterInfo *cluster)
  * This allows later processing to match up old and new databases efficiently.
  */
 static void
-get_rel_infos(ClusterInfo *cluster, DbInfo *dbinfo)
+start_rel_infos_query(PGconn *conn)
 {
-	PGconn	   *conn = connectToServer(cluster,
-									   dbinfo->db_name);
-	PGresult   *res;
-	RelInfo    *relinfos;
-	int			ntups;
-	int			relnum;
-	int			num_rels = 0;
-	char	   *nspname = NULL;
-	char	   *relname = NULL;
-	char	   *tablespace = NULL;
-	int			i_spclocation,
-				i_nspname,
-				i_relname,
-				i_reloid,
-				i_indtable,
-				i_toastheap,
-				i_relfilenumber,
-				i_reltablespace;
 	char		query[QUERY_ALLOC];
-	char	   *last_namespace = NULL,
-			   *last_tablespace = NULL;
 
 	query[0] = '\0';			/* initialize query string to empty */
 
@@ -552,7 +634,38 @@ get_rel_infos(ClusterInfo *cluster, DbInfo *dbinfo)
 			 "     ON c.reltablespace = t.oid "
 			 "ORDER BY 1;");
 
-	res = executeQueryOrDie(conn, "%s", query);
+	if (PQsendQuery(conn, query) == 0)
+	{
+		/* TODO: fail */
+	}
+}
+
+static void
+get_rel_infos_result(PGconn *conn, DbInfo *dbinfo)
+{
+	PGresult   *res = PQgetResult(conn);
+	RelInfo    *relinfos;
+	int			ntups;
+	int			relnum;
+	int			num_rels = 0;
+	char	   *nspname = NULL;
+	char	   *relname = NULL;
+	char	   *tablespace = NULL;
+	int			i_spclocation,
+				i_nspname,
+				i_relname,
+				i_reloid,
+				i_indtable,
+				i_toastheap,
+				i_relfilenumber,
+				i_reltablespace;
+	char	   *last_namespace = NULL,
+			   *last_tablespace = NULL;
+
+	if (PQgetResult(conn) != NULL)
+	{
+		/* TODO: fail */
+	}
 
 	ntups = PQntuples(res);
 
@@ -622,8 +735,6 @@ get_rel_infos(ClusterInfo *cluster, DbInfo *dbinfo)
 	}
 	PQclear(res);
 
-	PQfinish(conn);
-
 	dbinfo->rel_arr.rels = relinfos;
 	dbinfo->rel_arr.nrels = num_rels;
 }
@@ -645,19 +756,14 @@ get_rel_infos(ClusterInfo *cluster, DbInfo *dbinfo)
  * are included.
  */
 static void
-get_old_cluster_logical_slot_infos(DbInfo *dbinfo, bool live_check)
+start_old_cluster_logical_slot_infos_query(PGconn *conn, bool live_check)
 {
-	PGconn	   *conn;
-	PGresult   *res;
-	LogicalSlotInfo *slotinfos = NULL;
-	int			num_slots;
+	char		query[QUERY_ALLOC];
 
 	/* Logical slots can be migrated since PG17. */
 	if (GET_MAJOR_VERSION(old_cluster.major_version) <= 1600)
 		return;
 
-	conn = connectToServer(&old_cluster, dbinfo->db_name);
-
 	/*
 	 * Fetch the logical replication slot information. The check whether the
 	 * slot is considered caught up is done by an upgrade function. This
@@ -675,16 +781,34 @@ get_old_cluster_logical_slot_infos(DbInfo *dbinfo, bool live_check)
 	 * started and stopped several times causing any temporary slots to be
 	 * removed.
 	 */
-	res = executeQueryOrDie(conn, "SELECT slot_name, plugin, two_phase, failover, "
-							"%s as caught_up, invalidation_reason IS NOT NULL as invalid "
-							"FROM pg_catalog.pg_replication_slots "
-							"WHERE slot_type = 'logical' AND "
-							"database = current_database() AND "
-							"temporary IS FALSE;",
-							live_check ? "FALSE" :
-							"(CASE WHEN invalidation_reason IS NOT NULL THEN FALSE "
-							"ELSE (SELECT pg_catalog.binary_upgrade_logical_slot_has_caught_up(slot_name)) "
-							"END)");
+	snprintf(query, sizeof(query), "SELECT slot_name, plugin, two_phase, failover, "
+			 "%s as caught_up, invalidation_reason IS NOT NULL as invalid "
+			 "FROM pg_catalog.pg_replication_slots "
+			 "WHERE slot_type = 'logical' AND "
+			 "database = current_database() AND "
+			 "temporary IS FALSE;",
+			 live_check ? "FALSE" :
+			 "(CASE WHEN invalidation_reason IS NOT NULL THEN FALSE "
+			 "ELSE (SELECT pg_catalog.binary_upgrade_logical_slot_has_caught_up(slot_name)) "
+			 "END)");
+
+	if (PQsendQuery(conn, query) == 0)
+	{
+		/* TODO: fail */
+	}
+}
+
+static void
+get_old_cluster_logical_slot_infos_result(PGconn *conn, DbInfo *dbinfo)
+{
+	PGresult   *res = PQgetResult(conn);
+	LogicalSlotInfo *slotinfos = NULL;
+	int			num_slots;
+
+	if (PQgetResult(conn) != NULL)
+	{
+		/* TODO: fail */
+	}
 
 	num_slots = PQntuples(res);
 
@@ -720,7 +844,6 @@ get_old_cluster_logical_slot_infos(DbInfo *dbinfo, bool live_check)
 	}
 
 	PQclear(res);
-	PQfinish(conn);
 
 	dbinfo->slot_arr.slots = slotinfos;
 	dbinfo->slot_arr.nslots = num_slots;
@@ -757,23 +880,36 @@ count_old_cluster_logical_slots(void)
  * not be able to upgrade the logical replication clusters completely.
  */
 static void
-get_db_subscription_count(DbInfo *dbinfo)
+start_db_sub_count_query(PGconn *conn, DbInfo *dbinfo)
 {
-	PGconn	   *conn;
-	PGresult   *res;
+	char		query[QUERY_ALLOC];
 
 	/* Subscriptions can be migrated since PG17. */
 	if (GET_MAJOR_VERSION(old_cluster.major_version) < 1700)
 		return;
 
-	conn = connectToServer(&old_cluster, dbinfo->db_name);
-	res = executeQueryOrDie(conn, "SELECT count(*) "
-							"FROM pg_catalog.pg_subscription WHERE subdbid = %u",
-							dbinfo->db_oid);
+	snprintf(query, sizeof(query), "SELECT count(*) "
+			 "FROM pg_catalog.pg_subscription WHERE subdbid = %u", dbinfo->db_oid);
+	if (PQsendQuery(conn, query) == 0)	/* result read by get_db_sub_count_result() */
+	{
+		pg_log(PG_REPORT, "%s", PQerrorMessage(conn));
+		exit(1);
+	}
+}
+
+static void
+get_db_sub_count_result(PGconn *conn, DbInfo *dbinfo)
+{
+	PGresult   *res = PQgetResult(conn);
+	/* Expect one PGRES_TUPLES_OK result, then NULL; anything else is fatal. */
+	if (PQresultStatus(res) != PGRES_TUPLES_OK || PQgetResult(conn) != NULL)
+	{
+		pg_log(PG_REPORT, "%s", PQerrorMessage(conn));
+		exit(1);
+	}
 	dbinfo->nsubs = atoi(PQgetvalue(res, 0, 0));
 
 	PQclear(res);
-	PQfinish(conn);
 }
 
 /*
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 2b83c340fb..015019b18d 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -1225,6 +1225,7 @@ IndxInfo
 InferClause
 InferenceElem
 InfoItem
+InfoState
 InhInfo
 InheritableSocket
 InitSampleScan_function
-- 
2.25.1

Reply via email to