I figured I'd post what I have so far since this thread hasn't been updated
in a while. The attached patches are still "proof-of-concept grade," but
they are at least moving in the right direction (IMHO). The variable
naming is still not great, and they are woefully undercommented, among
other things.
0001 introduces a new API for registering callbacks and running them in
parallel on all databases in the cluster. This new system manages a set of
"slots" that follow a simple state machine to asynchronously establish a
connection and run the queries. It uses select() to wait for these
asynchronous tasks to complete. Users of this API only need to provide two
callbacks: one to return the query that should be run on each database and
another to process the results of that query. If multiple queries are
required for each database, users can provide multiple sets of callbacks.
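
To make the slot life cycle concrete, here is a toy simulation of the state machine. It is only a sketch under loud assumptions: there is no libpq involved, each call to step_slot() pretends that whatever the slot was waiting on has already completed, and the names Sim, Slot, step_slot, and run_sim are made up for illustration (the real code is async.c in 0001).

```c
#include <assert.h>
#include <string.h>

/* Hypothetical stand-ins for the slot states used in 0001. */
typedef enum
{
    SLOT_FREE,                  /* must be 0 so a zeroed slot is free */
    SLOT_CONNECTING,
    SLOT_SETTING_SEARCH_PATH,
    SLOT_RUNNING_QUERY
} SlotState;

typedef struct
{
    SlotState state;
    int db;                     /* index of the database being processed */
    int query;                  /* index of the current callback set */
} Slot;

typedef struct
{
    int ndbs;                   /* databases in the simulated cluster */
    int nqueries;               /* callback sets per database */
    int next_db;                /* next database to hand to a free slot */
    int done;                   /* databases fully processed */
    int processed[64];          /* queries processed per database */
} Sim;

/* Advance one slot by a single step, assuming its pending event is ready. */
void
step_slot(Sim *sim, Slot *slot)
{
    switch (slot->state)
    {
        case SLOT_FREE:
            if (sim->next_db >= sim->ndbs)
                return;         /* nothing left to claim */
            slot->db = sim->next_db++;
            slot->query = 0;
            slot->state = SLOT_CONNECTING;
            return;
        case SLOT_CONNECTING:
            slot->state = SLOT_SETTING_SEARCH_PATH;
            return;
        case SLOT_SETTING_SEARCH_PATH:
            slot->state = SLOT_RUNNING_QUERY;
            return;
        case SLOT_RUNNING_QUERY:
            sim->processed[slot->db]++;
            if (++slot->query >= sim->nqueries)
            {
                sim->done++;
                memset(slot, 0, sizeof(Slot));  /* back to SLOT_FREE */
            }
            return;
    }
}

/* Push every database through "jobs" slots; returns the number of passes. */
int
run_sim(Sim *sim, int ndbs, int nqueries, int jobs)
{
    Slot slots[16];
    int passes = 0;

    assert(ndbs >= 0 && ndbs <= 64);
    assert(jobs >= 1 && jobs <= 16 && nqueries >= 1);

    memset(sim, 0, sizeof(Sim));
    memset(slots, 0, sizeof(slots));
    sim->ndbs = ndbs;
    sim->nqueries = nqueries;

    while (sim->done < sim->ndbs)
    {
        for (int i = 0; i < jobs; i++)
            step_slot(sim, &slots[i]);
        passes++;
    }
    return passes;
}
```

In this model each database costs five steps (claim, connect, set search_path, then one step per query with two queries), so 8 databases take 40 passes with one slot and 10 passes with four. The real loop blocks in between passes instead of assuming readiness, so these counts say nothing about wall-clock time.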
The other patches change several of the existing tasks to use this new API.
With these patches applied, I see the following differences in the output
of 'pg_upgrade | ts -i' for a cluster with 1k empty databases:
WITHOUT PATCH
00:00:19 Checking database user is the install user ok
00:00:02 Checking for subscription state ok
00:00:06 Adding ".old" suffix to old global/pg_control ok
00:00:04 Checking for extension updates ok

WITH PATCHES (--jobs 1)
00:00:10 Checking database user is the install user ok
00:00:02 Checking for subscription state ok
00:00:07 Adding ".old" suffix to old global/pg_control ok
00:00:05 Checking for extension updates ok

WITH PATCHES (--jobs 4)
00:00:06 Checking database user is the install user ok
00:00:00 Checking for subscription state ok
00:00:02 Adding ".old" suffix to old global/pg_control ok
00:00:01 Checking for extension updates ok
Note that the "Checking database user is the install user" time also
includes the call to get_db_rel_and_slot_infos() on the old cluster as well
as the call to get_loadable_libraries() on the old cluster. I believe the
improvement with the patches with just one job is due to the consolidation
of the queries into one database connection (presently,
get_db_rel_and_slot_infos() creates 3 connections per database for some
upgrades). Similarly, the "Adding \".old\" suffix to old
global/pg_control" time includes the call to get_db_rel_and_slot_infos() on
the new cluster.
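
As a back-of-the-envelope illustration of why consolidation alone helps, the connection counts can be worked out directly. This is just arithmetic: total_connections() is a made-up helper, and the inputs (1k databases, up to 3 connections per database today, 1 with the patches) are the figures mentioned above.

```c
#include <assert.h>

/* Hypothetical helper: connections opened across the whole cluster. */
long
total_connections(long ndbs, long conns_per_db)
{
    assert(ndbs >= 0 && conns_per_db >= 0);
    return ndbs * conns_per_db;
}
```

For 1k databases that is total_connections(1000, 3) = 3000 versus total_connections(1000, 1) = 1000, i.e. 2000 fewer connection setups regardless of --jobs.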
There are several remaining places where we could use this new API to speed
up upgrades. For example, I haven't attempted to use it for the data type
checks yet, and that tends to eat up a sizable chunk of time when there are
many databases.
On Thu, May 16, 2024 at 08:24:08PM -0500, Nathan Bossart wrote:
> On Thu, May 16, 2024 at 05:09:55PM -0700, Jeff Davis wrote:
>> Also, did you consider connecting once to each database and running
>> many queries? Most of those seem like just checks.
>
> This was the idea behind 347758b. It may be possible to do more along
> these lines. IMO parallelizing will still be useful even if we do combine
> more of the steps.
My current thinking is that any possible further consolidation should
happen as part of a follow-up effort to parallelization. I'm cautiously
optimistic that the parallelization work will make the consolidation easier
since it moves things to rigidly-defined callback functions.
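
To sketch why I think callbacks should make later consolidation easier: once each check is a query/process pair, folding another check into the same per-database connection is just one more registered step. The following is a simplified stand-in, not the API from 0001. Task, Step, task_add_step, task_run, and the two example callbacks are hypothetical, the "result" handed to the process callback is merely the query text, and everything runs serially.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Simplified, hypothetical callback types (0001 passes DbInfo and PGresult). */
typedef char *(*GetQueryCB) (const char *dbname, void *arg);
typedef void (*ProcessCB) (const char *dbname, const char *result, void *arg);

typedef struct
{
    GetQueryCB query_cb;
    ProcessCB process_cb;
    void *arg;
} Step;

typedef struct
{
    Step *steps;
    int nsteps;
} Task;

/* Append one query/process callback pair to the task. */
void
task_add_step(Task *task, GetQueryCB query_cb, ProcessCB process_cb, void *arg)
{
    Step *grown;

    assert(task != NULL && query_cb != NULL && process_cb != NULL);
    grown = realloc(task->steps, (task->nsteps + 1) * sizeof(Step));
    if (grown == NULL)
        abort();
    task->steps = grown;
    task->steps[task->nsteps].query_cb = query_cb;
    task->steps[task->nsteps].process_cb = process_cb;
    task->steps[task->nsteps].arg = arg;
    task->nsteps++;
}

/* Run every step once per database; returns the simulated connection count. */
int
task_run(const Task *task, const char **dbs, int ndbs)
{
    int connections = 0;

    for (int d = 0; d < ndbs; d++)
    {
        connections++;          /* one connection shared by all steps */
        for (int s = 0; s < task->nsteps; s++)
        {
            const Step *step = &task->steps[s];
            char *query = step->query_cb(dbs[d], step->arg);

            /* Stand-in for running the query: pass the text as the result. */
            step->process_cb(dbs[d], query, step->arg);
            free(query);
        }
    }
    return connections;
}

/* Example query callback: returns a malloc'd query string. */
char *
make_count_query(const char *dbname, void *arg)
{
    const char *text = "SELECT count(*) FROM pg_catalog.pg_class";
    char *query = malloc(strlen(text) + 1);

    (void) dbname;
    (void) arg;
    if (query == NULL)
        abort();
    strcpy(query, text);
    return query;
}

/* Example process callback: counts how many results it has seen. */
void
count_result(const char *dbname, const char *result, void *arg)
{
    (void) dbname;
    (void) result;
    (*(int *) arg)++;
}
```

Registering a second step on the same task doubles the work done per database but leaves the connection count at one per database, which is the property that consolidation would build on.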
A separate piece of off-list feedback from Michael Paquier is that this new
parallel system might be something we can teach the ParallelSlot code used
by bin/scripts/ to do. I've yet to look too deeply into this, but I
suspect that it will be difficult to combine the two. For example, the
ParallelSlot system doesn't seem well-suited for the kind of
run-once-in-each-database tasks required by pg_upgrade, and the error
handling is probably a little different, too. However, it's still worth a
closer look, and I'm interested in folks' opinions on the subject.
--
nathan
>From d7683a095d4d2c1574005eb41504a5be256d6480 Mon Sep 17 00:00:00 2001
From: Nathan Bossart <[email protected]>
Date: Fri, 28 Jun 2024 11:02:44 -0500
Subject: [PATCH v2 1/6] introduce framework for parallelizing pg_upgrade tasks
---
src/bin/pg_upgrade/Makefile | 1 +
src/bin/pg_upgrade/async.c | 323 +++++++++++++++++++++++++++++++
src/bin/pg_upgrade/meson.build | 1 +
src/bin/pg_upgrade/pg_upgrade.h | 16 ++
src/tools/pgindent/typedefs.list | 4 +
5 files changed, 345 insertions(+)
create mode 100644 src/bin/pg_upgrade/async.c
diff --git a/src/bin/pg_upgrade/Makefile b/src/bin/pg_upgrade/Makefile
index bde91e2beb..3bc4f5d740 100644
--- a/src/bin/pg_upgrade/Makefile
+++ b/src/bin/pg_upgrade/Makefile
@@ -12,6 +12,7 @@ include $(top_builddir)/src/Makefile.global
OBJS = \
$(WIN32RES) \
+ async.o \
check.o \
controldata.o \
dump.o \
diff --git a/src/bin/pg_upgrade/async.c b/src/bin/pg_upgrade/async.c
new file mode 100644
index 0000000000..7df1e7712d
--- /dev/null
+++ b/src/bin/pg_upgrade/async.c
@@ -0,0 +1,323 @@
+/*
+ * async.c
+ *
+ * parallelization via libpq's async APIs
+ *
+ * Copyright (c) 2024, PostgreSQL Global Development Group
+ * src/bin/pg_upgrade/async.c
+ */
+
+#include "postgres_fe.h"
+
+#include "common/connect.h"
+#include "fe_utils/string_utils.h"
+#include "pg_upgrade.h"
+
+static int dbs_complete;
+static int dbs_processing;
+
+typedef struct AsyncTaskCallbacks
+{
+ AsyncTaskGetQueryCB query_cb;
+ AsyncTaskProcessCB process_cb;
+ bool free_result;
+ void *arg;
+} AsyncTaskCallbacks;
+
+typedef struct AsyncTask
+{
+ AsyncTaskCallbacks *cbs;
+ int num_cb_sets;
+} AsyncTask;
+
+typedef enum
+{
+ FREE,
+ CONNECTING,
+ SETTING_SEARCH_PATH,
+ RUNNING_QUERY,
+} AsyncSlotState;
+
+typedef struct
+{
+ AsyncSlotState state;
+ int db;
+ int query;
+ PGconn *conn;
+} AsyncSlot;
+
+AsyncTask *
+async_task_create(void)
+{
+ return pg_malloc0(sizeof(AsyncTask));
+}
+
+void
+async_task_free(AsyncTask *task)
+{
+ if (task->cbs)
+ pg_free(task->cbs);
+
+ pg_free(task);
+}
+
+void
+async_task_add_step(AsyncTask *task,
+ AsyncTaskGetQueryCB query_cb,
+ AsyncTaskProcessCB process_cb, bool free_result,
+ void *arg)
+{
+ AsyncTaskCallbacks *new_cbs;
+
+ task->cbs = pg_realloc(task->cbs,
+ ++task->num_cb_sets * sizeof(AsyncTaskCallbacks));
+
+ new_cbs = &task->cbs[task->num_cb_sets - 1];
+ new_cbs->query_cb = query_cb;
+ new_cbs->process_cb = process_cb;
+ new_cbs->free_result = free_result;
+ new_cbs->arg = arg;
+}
+
+static void
+conn_failure(PGconn *conn)
+{
+ pg_log(PG_REPORT, "%s", PQerrorMessage(conn));
+ printf(_("Failure, exiting\n"));
+ exit(1);
+}
+
+static void
+start_conn(const ClusterInfo *cluster, AsyncSlot *slot)
+{
+ PQExpBufferData conn_opts;
+
+ /* Build connection string with proper quoting */
+ initPQExpBuffer(&conn_opts);
+ appendPQExpBufferStr(&conn_opts, "dbname=");
+ appendConnStrVal(&conn_opts, cluster->dbarr.dbs[slot->db].db_name);
+ appendPQExpBufferStr(&conn_opts, " user=");
+ appendConnStrVal(&conn_opts, os_info.user);
+ appendPQExpBuffer(&conn_opts, " port=%d", cluster->port);
+ if (cluster->sockdir)
+ {
+ appendPQExpBufferStr(&conn_opts, " host=");
+ appendConnStrVal(&conn_opts, cluster->sockdir);
+ }
+
+ slot->conn = PQconnectStart(conn_opts.data);
+ termPQExpBuffer(&conn_opts);
+
+ if (!slot->conn)
+ conn_failure(slot->conn);
+}
+
+static void
+dispatch_query(const ClusterInfo *cluster, AsyncSlot *slot,
+ const AsyncTask *task)
+{
+ AsyncTaskCallbacks *cbs = &task->cbs[slot->query];
+ AsyncTaskGetQueryCB get_query = cbs->query_cb;
+ DbInfo *dbinfo = &cluster->dbarr.dbs[slot->db];
+ char *query = (*get_query) (dbinfo, cbs->arg);
+
+ if (!PQsendQuery(slot->conn, query))
+ conn_failure(slot->conn);
+
+ pg_free(query);
+}
+
+static PGresult *
+get_last_result(PGconn *conn)
+{
+ PGresult *tmp;
+ PGresult *res = NULL;
+
+ while ((tmp = PQgetResult(conn)) != NULL)
+ {
+ PQclear(res);
+ res = tmp;
+ if (PQstatus(conn) == CONNECTION_BAD)
+ conn_failure(conn);
+ }
+
+ if (PQresultStatus(res) != PGRES_COMMAND_OK &&
+ PQresultStatus(res) != PGRES_TUPLES_OK)
+ conn_failure(conn);
+
+ return res;
+}
+
+static void
+process_query_result(const ClusterInfo *cluster, AsyncSlot *slot,
+ const AsyncTask *task)
+{
+ AsyncTaskCallbacks *cbs = &task->cbs[slot->query];
+ AsyncTaskProcessCB process_cb = cbs->process_cb;
+ DbInfo *dbinfo = &cluster->dbarr.dbs[slot->db];
+ PGresult *res = get_last_result(slot->conn);
+
+ (*process_cb) (dbinfo, res, cbs->arg);
+
+ if (cbs->free_result)
+ PQclear(res);
+}
+
+static void
+process_slot(const ClusterInfo *cluster, AsyncSlot *slot, const AsyncTask *task)
+{
+ switch (slot->state)
+ {
+ case FREE:
+ if (dbs_processing >= cluster->dbarr.ndbs)
+ return;
+ slot->db = dbs_processing++;
+ slot->state = CONNECTING;
+ start_conn(cluster, slot);
+ return;
+
+ case CONNECTING:
+ if (PQconnectPoll(slot->conn) == PGRES_POLLING_FAILED)
+ conn_failure(slot->conn);
+ if (PQconnectPoll(slot->conn) != PGRES_POLLING_OK)
+ return;
+ slot->state = SETTING_SEARCH_PATH;
+ if (!PQsendQuery(slot->conn, ALWAYS_SECURE_SEARCH_PATH_SQL))
+ conn_failure(slot->conn);
+ return;
+
+ case SETTING_SEARCH_PATH:
+ if (!PQconsumeInput(slot->conn))
+ conn_failure(slot->conn);
+ if (PQisBusy(slot->conn))
+ return;
+ PQclear(get_last_result(slot->conn));
+ slot->state = RUNNING_QUERY;
+ dispatch_query(cluster, slot, task);
+ return;
+
+ case RUNNING_QUERY:
+ if (!PQconsumeInput(slot->conn))
+ conn_failure(slot->conn);
+ if (PQisBusy(slot->conn))
+ return;
+ process_query_result(cluster, slot, task);
+ if (++slot->query >= task->num_cb_sets)
+ {
+ dbs_complete++;
+ PQfinish(slot->conn);
+ memset(slot, 0, sizeof(AsyncSlot));
+ return;
+ }
+ dispatch_query(cluster, slot, task);
+ return;
+ }
+}
+
+/*
+ * Wait on the slots to either finish connecting or to receive query results if
+ * possible. This avoids a tight loop in async_task_run().
+ */
+static void
+wait_on_slots(AsyncSlot *slots, int numslots)
+{
+ fd_set input_mask;
+ fd_set output_mask;
+ fd_set except_mask;
+ int maxFd = 0;
+
+ FD_ZERO(&input_mask);
+ FD_ZERO(&output_mask);
+ FD_ZERO(&except_mask);
+
+ for (int i = 0; i < numslots; i++)
+ {
+ int sock;
+ bool read = false;
+
+ switch (slots[i].state)
+ {
+ case FREE:
+
+ /*
+ * If we see a free slot, return right away so that it can be
+ * reused immediately for the next database. This might cause
+ * us to spin more than necessary as we finish processing the
+ * last few databases, but that shouldn't cause too much harm.
+ */
+ return;
+
+ case CONNECTING:
+
+ /*
+ * If we are waiting for the connection to establish, choose
+ * whether to wait for reading or for writing on the socket as
+ * appropriate. If neither apply, just return immediately so
+ * that we can handle the slot.
+ */
+ {
+ PostgresPollingStatusType status;
+
+ status = PQconnectPoll(slots[i].conn);
+ if (status == PGRES_POLLING_READING)
+ read = true;
+ else if (status != PGRES_POLLING_WRITING)
+ return;
+ }
+ break;
+
+ case SETTING_SEARCH_PATH:
+ case RUNNING_QUERY:
+
+ /*
+ * If we've sent a query, we must wait for the socket to be
+ * read-ready. Note that process_slot() handles calling
+ * PQconsumeInput() as required.
+ */
+ read = true;
+ break;
+ }
+
+ /*
+ * If there's some problem retrieving the socket, just pretend this
+ * slot doesn't exist. We don't expect this to happen regularly in
+ * practice, so it seems unlikely to cause too much harm.
+ */
+ sock = PQsocket(slots[i].conn);
+ if (sock < 0)
+ continue;
+
+ /*
+ * Add the socket to the set.
+ */
+ FD_SET(sock, read ? &input_mask : &output_mask);
+ FD_SET(sock, &except_mask);
+ maxFd = Max(maxFd, sock);
+ }
+
+ /*
+ * If we found socket(s) to wait on, wait.
+ */
+ if (maxFd != 0)
+ (void) select(maxFd + 1, &input_mask, &output_mask, &except_mask, NULL);
+}
+
+void
+async_task_run(const AsyncTask *task, const ClusterInfo *cluster)
+{
+ int jobs = Max(1, user_opts.jobs);
+ AsyncSlot *slots = pg_malloc0(sizeof(AsyncSlot) * jobs);
+
+ dbs_complete = 0;
+ dbs_processing = 0;
+
+ while (dbs_complete < cluster->dbarr.ndbs)
+ {
+ for (int i = 0; i < jobs; i++)
+ process_slot(cluster, &slots[i], task);
+
+ wait_on_slots(slots, jobs);
+ }
+
+ pg_free(slots);
+}
diff --git a/src/bin/pg_upgrade/meson.build b/src/bin/pg_upgrade/meson.build
index 9825fa3305..9eb48e176c 100644
--- a/src/bin/pg_upgrade/meson.build
+++ b/src/bin/pg_upgrade/meson.build
@@ -1,6 +1,7 @@
# Copyright (c) 2022-2024, PostgreSQL Global Development Group
pg_upgrade_sources = files(
+ 'async.c',
'check.c',
'controldata.c',
'dump.c',
diff --git a/src/bin/pg_upgrade/pg_upgrade.h b/src/bin/pg_upgrade/pg_upgrade.h
index 8afe240bdf..1ebad3bd74 100644
--- a/src/bin/pg_upgrade/pg_upgrade.h
+++ b/src/bin/pg_upgrade/pg_upgrade.h
@@ -494,3 +494,19 @@ void parallel_transfer_all_new_dbs(DbInfoArr *old_db_arr, DbInfoArr *new_db_arr
char *old_pgdata, char *new_pgdata,
char *old_tablespace);
bool reap_child(bool wait_for_child);
+
+/* async.c */
+
+typedef char *(*AsyncTaskGetQueryCB) (DbInfo *dbinfo, void *arg);
+typedef void (*AsyncTaskProcessCB) (DbInfo *dbinfo, PGresult *res, void *arg);
+
+/* struct definition is private to async.c */
+typedef struct AsyncTask AsyncTask;
+
+AsyncTask *async_task_create(void);
+void async_task_add_step(AsyncTask *task,
+ AsyncTaskGetQueryCB query_cb,
+ AsyncTaskProcessCB process_cb, bool free_result,
+ void *arg);
+void async_task_run(const AsyncTask *task, const ClusterInfo *cluster);
+void async_task_free(AsyncTask *task);
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index e6c1caf649..3d219cbfe2 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -153,6 +153,10 @@ ArrayMetaState
ArraySubWorkspace
ArrayToken
ArrayType
+AsyncSlot
+AsyncSlotState
+AsyncTask
+AsyncTaskCallbacks
AsyncQueueControl
AsyncQueueEntry
AsyncRequest
--
2.39.3 (Apple Git-146)
>From c84b3c97cb0befff8027702f1674e809f174b3aa Mon Sep 17 00:00:00 2001
From: Nathan Bossart <[email protected]>
Date: Fri, 28 Jun 2024 17:21:19 -0500
Subject: [PATCH v2 2/6] use new pg_upgrade async API for subscription state
checks
---
src/bin/pg_upgrade/check.c | 200 ++++++++++++++++++++-----------------
1 file changed, 106 insertions(+), 94 deletions(-)
diff --git a/src/bin/pg_upgrade/check.c b/src/bin/pg_upgrade/check.c
index 27924159d6..f653fa25a5 100644
--- a/src/bin/pg_upgrade/check.c
+++ b/src/bin/pg_upgrade/check.c
@@ -1906,6 +1906,75 @@ check_old_cluster_for_valid_slots(bool live_check)
check_ok();
}
+/* private state for subscription state checks */
+struct substate_info
+{
+ FILE *script;
+ char output_path[MAXPGPATH];
+};
+
+/*
+ * We don't allow upgrade if there is a risk of dangling slot or origin
+ * corresponding to initial sync after upgrade.
+ *
+ * A slot/origin not created yet refers to the 'i' (initialize) state, while
+ * 'r' (ready) state refers to a slot/origin created previously but already
+ * dropped. These states are supported for pg_upgrade. The other states listed
+ * below are not supported:
+ *
+ * a) SUBREL_STATE_DATASYNC: A relation upgraded while in this state would
+ * retain a replication slot, which could not be dropped by the sync worker
+ * spawned after the upgrade because the subscription ID used for the slot name
+ * won't match anymore.
+ *
+ * b) SUBREL_STATE_SYNCDONE: A relation upgraded while in this state would
+ * retain the replication origin when there is a failure in tablesync worker
+ * immediately after dropping the replication slot in the publisher.
+ *
+ * c) SUBREL_STATE_FINISHEDCOPY: A tablesync worker spawned to work on a
+ * relation upgraded while in this state would expect an origin ID with the OID
+ * of the subscription used before the upgrade, causing it to fail.
+ *
+ * d) SUBREL_STATE_SYNCWAIT, SUBREL_STATE_CATCHUP and SUBREL_STATE_UNKNOWN:
+ * These states are not stored in the catalog, so we need not allow these
+ * states.
+ */
+static char *
+sub_query(DbInfo *dbinfo, void *arg)
+{
+ return pg_strdup("SELECT r.srsubstate, s.subname, n.nspname, c.relname "
+ "FROM pg_catalog.pg_subscription_rel r "
+ "LEFT JOIN pg_catalog.pg_subscription s"
+ " ON r.srsubid = s.oid "
+ "LEFT JOIN pg_catalog.pg_class c"
+ " ON r.srrelid = c.oid "
+ "LEFT JOIN pg_catalog.pg_namespace n"
+ " ON c.relnamespace = n.oid "
+ "WHERE r.srsubstate NOT IN ('i', 'r') "
+ "ORDER BY s.subname");
+}
+
+static void
+sub_process(DbInfo *dbinfo, PGresult *res, void *arg)
+{
+ struct substate_info *state = (struct substate_info *) arg;
+ int ntup = PQntuples(res);
+
+ for (int i = 0; i < ntup; i++)
+ {
+ if (state->script == NULL &&
+ (state->script = fopen_priv(state->output_path, "w")) == NULL)
+ pg_fatal("could not open file \"%s\": %m", state->output_path);
+
+ fprintf(state->script, "The table sync state \"%s\" is not allowed for database:\"%s\" subscription:\"%s\" schema:\"%s\" relation:\"%s\"\n",
+ PQgetvalue(res, i, 0),
+ dbinfo->db_name,
+ PQgetvalue(res, i, 1),
+ PQgetvalue(res, i, 2),
+ PQgetvalue(res, i, 3));
+ }
+}
+
/*
* check_old_cluster_subscription_state()
*
@@ -1916,115 +1985,58 @@ check_old_cluster_for_valid_slots(bool live_check)
static void
check_old_cluster_subscription_state(void)
{
- FILE *script = NULL;
- char output_path[MAXPGPATH];
+ AsyncTask *task = async_task_create();
+ struct substate_info state;
+ PGresult *res;
+ PGconn *conn;
int ntup;
prep_status("Checking for subscription state");
- snprintf(output_path, sizeof(output_path), "%s/%s",
+ state.script = NULL;
+ snprintf(state.output_path, sizeof(state.output_path), "%s/%s",
log_opts.basedir,
"subs_invalid.txt");
- for (int dbnum = 0; dbnum < old_cluster.dbarr.ndbs; dbnum++)
- {
- PGresult *res;
- DbInfo *active_db = &old_cluster.dbarr.dbs[dbnum];
- PGconn *conn = connectToServer(&old_cluster, active_db->db_name);
- /* We need to check for pg_replication_origin only once. */
- if (dbnum == 0)
- {
- /*
- * Check that all the subscriptions have their respective
- * replication origin.
- */
- res = executeQueryOrDie(conn,
- "SELECT d.datname, s.subname "
- "FROM pg_catalog.pg_subscription s "
- "LEFT OUTER JOIN pg_catalog.pg_replication_origin o "
- " ON o.roname = 'pg_' || s.oid "
- "INNER JOIN pg_catalog.pg_database d "
- " ON d.oid = s.subdbid "
- "WHERE o.roname IS NULL;");
-
- ntup = PQntuples(res);
- for (int i = 0; i < ntup; i++)
- {
- if (script == NULL && (script = fopen_priv(output_path, "w")) == NULL)
- pg_fatal("could not open file \"%s\": %m", output_path);
- fprintf(script, "The replication origin is missing for database:\"%s\" subscription:\"%s\"\n",
- PQgetvalue(res, i, 0),
- PQgetvalue(res, i, 1));
- }
- PQclear(res);
- }
-
- /*
- * We don't allow upgrade if there is a risk of dangling slot or
- * origin corresponding to initial sync after upgrade.
- *
- * A slot/origin not created yet refers to the 'i' (initialize) state,
- * while 'r' (ready) state refers to a slot/origin created previously
- * but already dropped. These states are supported for pg_upgrade. The
- * other states listed below are not supported:
- *
- * a) SUBREL_STATE_DATASYNC: A relation upgraded while in this state
- * would retain a replication slot, which could not be dropped by the
- * sync worker spawned after the upgrade because the subscription ID
- * used for the slot name won't match anymore.
- *
- * b) SUBREL_STATE_SYNCDONE: A relation upgraded while in this state
- * would retain the replication origin when there is a failure in
- * tablesync worker immediately after dropping the replication slot in
- * the publisher.
- *
- * c) SUBREL_STATE_FINISHEDCOPY: A tablesync worker spawned to work on
- * a relation upgraded while in this state would expect an origin ID
- * with the OID of the subscription used before the upgrade, causing
- * it to fail.
- *
- * d) SUBREL_STATE_SYNCWAIT, SUBREL_STATE_CATCHUP and
- * SUBREL_STATE_UNKNOWN: These states are not stored in the catalog,
- * so we need not allow these states.
- */
- res = executeQueryOrDie(conn,
- "SELECT r.srsubstate, s.subname, n.nspname, c.relname "
- "FROM pg_catalog.pg_subscription_rel r "
- "LEFT JOIN pg_catalog.pg_subscription s"
- " ON r.srsubid = s.oid "
- "LEFT JOIN pg_catalog.pg_class c"
- " ON r.srrelid = c.oid "
- "LEFT JOIN pg_catalog.pg_namespace n"
- " ON c.relnamespace = n.oid "
- "WHERE r.srsubstate NOT IN ('i', 'r') "
- "ORDER BY s.subname");
-
- ntup = PQntuples(res);
- for (int i = 0; i < ntup; i++)
- {
- if (script == NULL && (script = fopen_priv(output_path, "w")) == NULL)
- pg_fatal("could not open file \"%s\": %m", output_path);
+ /*
+ * Check that all the subscriptions have their respective replication
+ * origin. This check only needs to run once.
+ */
+ conn = connectToServer(&old_cluster, old_cluster.dbarr.dbs[0].db_name);
+ res = executeQueryOrDie(conn,
+ "SELECT d.datname, s.subname "
+ "FROM pg_catalog.pg_subscription s "
+ "LEFT OUTER JOIN pg_catalog.pg_replication_origin o "
+ " ON o.roname = 'pg_' || s.oid "
+ "INNER JOIN pg_catalog.pg_database d "
+ " ON d.oid = s.subdbid "
+ "WHERE o.roname IS NULL;");
+ ntup = PQntuples(res);
+ for (int i = 0; i < ntup; i++)
+ {
+ if (state.script == NULL &&
+ (state.script = fopen_priv(state.output_path, "w")) == NULL)
+ pg_fatal("could not open file \"%s\": %m", state.output_path);
+ fprintf(state.script, "The replication origin is missing for database:\"%s\" subscription:\"%s\"\n",
+ PQgetvalue(res, i, 0),
+ PQgetvalue(res, i, 1));
+ }
+ PQclear(res);
+ PQfinish(conn);
- fprintf(script, "The table sync state \"%s\" is not allowed for database:\"%s\" subscription:\"%s\" schema:\"%s\" relation:\"%s\"\n",
- PQgetvalue(res, i, 0),
- active_db->db_name,
- PQgetvalue(res, i, 1),
- PQgetvalue(res, i, 2),
- PQgetvalue(res, i, 3));
- }
+ async_task_add_step(task, sub_query, sub_process, true, &state);
- PQclear(res);
- PQfinish(conn);
- }
+ async_task_run(task, &old_cluster);
+ async_task_free(task);
- if (script)
+ if (state.script)
{
- fclose(script);
+ fclose(state.script);
pg_log(PG_REPORT, "fatal");
pg_fatal("Your installation contains subscriptions without origin or having relations not in i (initialize) or r (ready) state.\n"
"You can allow the initial sync to finish for all relations and then restart the upgrade.\n"
"A list of the problematic subscriptions is in the file:\n"
- " %s", output_path);
+ " %s", state.output_path);
}
else
check_ok();
--
2.39.3 (Apple Git-146)
>From c9d2c483ac1fabc0897c19a60a1cf6054e1293da Mon Sep 17 00:00:00 2001
From: Nathan Bossart <[email protected]>
Date: Tue, 21 May 2024 16:35:19 -0500
Subject: [PATCH v2 3/6] move live_check variable to user_opts
---
src/bin/pg_upgrade/check.c | 32 ++++++++++++++++----------------
src/bin/pg_upgrade/controldata.c | 5 +++--
src/bin/pg_upgrade/info.c | 12 +++++-------
src/bin/pg_upgrade/option.c | 4 ++--
src/bin/pg_upgrade/pg_upgrade.c | 21 ++++++++++-----------
src/bin/pg_upgrade/pg_upgrade.h | 13 +++++++------
6 files changed, 43 insertions(+), 44 deletions(-)
diff --git a/src/bin/pg_upgrade/check.c b/src/bin/pg_upgrade/check.c
index f653fa25a5..251f3d9017 100644
--- a/src/bin/pg_upgrade/check.c
+++ b/src/bin/pg_upgrade/check.c
@@ -29,7 +29,7 @@ static void check_for_new_tablespace_dir(void);
static void check_for_user_defined_encoding_conversions(ClusterInfo *cluster);
static void check_new_cluster_logical_replication_slots(void);
static void check_new_cluster_subscription_configuration(void);
-static void check_old_cluster_for_valid_slots(bool live_check);
+static void check_old_cluster_for_valid_slots(void);
static void check_old_cluster_subscription_state(void);
/*
@@ -555,9 +555,9 @@ fix_path_separator(char *path)
}
void
-output_check_banner(bool live_check)
+output_check_banner(void)
{
- if (user_opts.check && live_check)
+ if (user_opts.live_check)
{
pg_log(PG_REPORT,
"Performing Consistency Checks on Old Live Server\n"
@@ -573,18 +573,18 @@ output_check_banner(bool live_check)
void
-check_and_dump_old_cluster(bool live_check)
+check_and_dump_old_cluster(void)
{
/* -- OLD -- */
- if (!live_check)
+ if (!user_opts.live_check)
start_postmaster(&old_cluster, true);
/*
* Extract a list of databases, tables, and logical replication slots from
* the old cluster.
*/
- get_db_rel_and_slot_infos(&old_cluster, live_check);
+ get_db_rel_and_slot_infos(&old_cluster);
init_tablespaces();
@@ -605,7 +605,7 @@ check_and_dump_old_cluster(bool live_check)
* Logical replication slots can be migrated since PG17. See comments
* atop get_old_cluster_logical_slot_infos().
*/
- check_old_cluster_for_valid_slots(live_check);
+ check_old_cluster_for_valid_slots();
/*
* Subscriptions and their dependencies can be migrated since PG17.
@@ -652,7 +652,7 @@ check_and_dump_old_cluster(bool live_check)
*/
if (GET_MAJOR_VERSION(old_cluster.major_version) <= 906)
{
- if (user_opts.check)
+ if (user_opts.live_check)
old_9_6_invalidate_hash_indexes(&old_cluster, true);
}
@@ -667,7 +667,7 @@ check_and_dump_old_cluster(bool live_check)
if (!user_opts.check)
generate_old_dump();
- if (!live_check)
+ if (!user_opts.live_check)
stop_postmaster(false);
}
@@ -675,7 +675,7 @@ check_and_dump_old_cluster(bool live_check)
void
check_new_cluster(void)
{
- get_db_rel_and_slot_infos(&new_cluster, false);
+ get_db_rel_and_slot_infos(&new_cluster);
check_new_cluster_is_empty();
@@ -826,14 +826,14 @@ check_cluster_versions(void)
void
-check_cluster_compatibility(bool live_check)
+check_cluster_compatibility(void)
{
/* get/check pg_control data of servers */
- get_control_data(&old_cluster, live_check);
- get_control_data(&new_cluster, false);
+ get_control_data(&old_cluster);
+ get_control_data(&new_cluster);
check_control_data(&old_cluster.controldata, &new_cluster.controldata);
- if (live_check && old_cluster.port == new_cluster.port)
+ if (user_opts.live_check && old_cluster.port == new_cluster.port)
pg_fatal("When checking a live server, "
"the old and new port numbers must be different.");
}
@@ -1839,7 +1839,7 @@ check_new_cluster_subscription_configuration(void)
* before shutdown.
*/
static void
-check_old_cluster_for_valid_slots(bool live_check)
+check_old_cluster_for_valid_slots(void)
{
char output_path[MAXPGPATH];
FILE *script = NULL;
@@ -1878,7 +1878,7 @@ check_old_cluster_for_valid_slots(bool live_check)
* Note: This can be satisfied only when the old cluster has been
* shut down, so we skip this for live checks.
*/
- if (!live_check && !slot->caught_up)
+ if (!user_opts.live_check && !slot->caught_up)
{
if (script == NULL &&
(script = fopen_priv(output_path, "w")) == NULL)
diff --git a/src/bin/pg_upgrade/controldata.c b/src/bin/pg_upgrade/controldata.c
index 1f0ccea3ed..cf665b9dee 100644
--- a/src/bin/pg_upgrade/controldata.c
+++ b/src/bin/pg_upgrade/controldata.c
@@ -33,7 +33,7 @@
* return valid xid data for a running server.
*/
void
-get_control_data(ClusterInfo *cluster, bool live_check)
+get_control_data(ClusterInfo *cluster)
{
char cmd[MAXPGPATH];
char bufin[MAX_STRING];
@@ -76,6 +76,7 @@ get_control_data(ClusterInfo *cluster, bool live_check)
uint32 segno = 0;
char *resetwal_bin;
int rc;
+ bool live_check = (cluster == &old_cluster && user_opts.live_check);
/*
* Because we test the pg_resetwal output as strings, it has to be in
@@ -118,7 +119,7 @@ get_control_data(ClusterInfo *cluster, bool live_check)
/*
* Check for clean shutdown
*/
- if (!live_check || cluster == &new_cluster)
+ if (!live_check)
{
/* only pg_controldata outputs the cluster state */
snprintf(cmd, sizeof(cmd), "\"%s/pg_controldata\" \"%s\"",
diff --git a/src/bin/pg_upgrade/info.c b/src/bin/pg_upgrade/info.c
index 95c22a7200..8f1777de59 100644
--- a/src/bin/pg_upgrade/info.c
+++ b/src/bin/pg_upgrade/info.c
@@ -27,7 +27,7 @@ static void free_rel_infos(RelInfoArr *rel_arr);
static void print_db_infos(DbInfoArr *db_arr);
static void print_rel_infos(RelInfoArr *rel_arr);
static void print_slot_infos(LogicalSlotInfoArr *slot_arr);
-static void get_old_cluster_logical_slot_infos(DbInfo *dbinfo, bool live_check);
+static void get_old_cluster_logical_slot_infos(DbInfo *dbinfo);
static void get_db_subscription_count(DbInfo *dbinfo);
@@ -273,11 +273,9 @@ report_unmatched_relation(const RelInfo *rel, const DbInfo *db, bool is_new_db)
*
* higher level routine to generate dbinfos for the database running
* on the given "port". Assumes that server is already running.
- *
- * live_check would be used only when the target is the old cluster.
*/
void
-get_db_rel_and_slot_infos(ClusterInfo *cluster, bool live_check)
+get_db_rel_and_slot_infos(ClusterInfo *cluster)
{
int dbnum;
@@ -299,7 +297,7 @@ get_db_rel_and_slot_infos(ClusterInfo *cluster, bool live_check)
*/
if (cluster == &old_cluster)
{
- get_old_cluster_logical_slot_infos(pDbInfo, live_check);
+ get_old_cluster_logical_slot_infos(pDbInfo);
get_db_subscription_count(pDbInfo);
}
}
@@ -645,7 +643,7 @@ get_rel_infos(ClusterInfo *cluster, DbInfo *dbinfo)
* are included.
*/
static void
-get_old_cluster_logical_slot_infos(DbInfo *dbinfo, bool live_check)
+get_old_cluster_logical_slot_infos(DbInfo *dbinfo)
{
PGconn *conn;
PGresult *res;
@@ -681,7 +679,7 @@ get_old_cluster_logical_slot_infos(DbInfo *dbinfo, bool live_check)
"WHERE slot_type = 'logical' AND "
"database = current_database() AND "
"temporary IS FALSE;",
- live_check ? "FALSE" :
+ user_opts.live_check ? "FALSE" :
"(CASE WHEN invalidation_reason IS NOT NULL THEN FALSE "
"ELSE (SELECT pg_catalog.binary_upgrade_logical_slot_has_caught_up(slot_name)) "
"END)");
diff --git a/src/bin/pg_upgrade/option.c b/src/bin/pg_upgrade/option.c
index 548ea4e623..6f41d63eed 100644
--- a/src/bin/pg_upgrade/option.c
+++ b/src/bin/pg_upgrade/option.c
@@ -470,10 +470,10 @@ adjust_data_dir(ClusterInfo *cluster)
* directory.
*/
void
-get_sock_dir(ClusterInfo *cluster, bool live_check)
+get_sock_dir(ClusterInfo *cluster)
{
#if !defined(WIN32)
- if (!live_check)
+ if (!user_opts.live_check || cluster == &new_cluster)
cluster->sockdir = user_opts.socketdir;
else
{
diff --git a/src/bin/pg_upgrade/pg_upgrade.c b/src/bin/pg_upgrade/pg_upgrade.c
index af370768b6..3f4ad7d5cc 100644
--- a/src/bin/pg_upgrade/pg_upgrade.c
+++ b/src/bin/pg_upgrade/pg_upgrade.c
@@ -65,7 +65,7 @@ static void create_new_objects(void);
static void copy_xact_xlog_xid(void);
static void set_frozenxids(bool minmxid_only);
static void make_outputdirs(char *pgdata);
-static void setup(char *argv0, bool *live_check);
+static void setup(char *argv0);
static void create_logical_replication_slots(void);
ClusterInfo old_cluster,
@@ -88,7 +88,6 @@ int
main(int argc, char **argv)
{
char *deletion_script_file_name = NULL;
- bool live_check = false;
/*
* pg_upgrade doesn't currently use common/logging.c, but initialize it
@@ -123,18 +122,18 @@ main(int argc, char **argv)
*/
make_outputdirs(new_cluster.pgdata);
- setup(argv[0], &live_check);
+ setup(argv[0]);
- output_check_banner(live_check);
+ output_check_banner();
check_cluster_versions();
- get_sock_dir(&old_cluster, live_check);
- get_sock_dir(&new_cluster, false);
+ get_sock_dir(&old_cluster);
+ get_sock_dir(&new_cluster);
- check_cluster_compatibility(live_check);
+ check_cluster_compatibility();
- check_and_dump_old_cluster(live_check);
+ check_and_dump_old_cluster();
/* -- NEW -- */
@@ -331,7 +330,7 @@ make_outputdirs(char *pgdata)
static void
-setup(char *argv0, bool *live_check)
+setup(char *argv0)
{
/*
* make sure the user has a clean environment, otherwise, we may confuse
@@ -378,7 +377,7 @@ setup(char *argv0, bool *live_check)
pg_fatal("There seems to be a postmaster servicing the old cluster.\n"
"Please shutdown that postmaster and try again.");
else
- *live_check = true;
+ user_opts.live_check = true;
}
}
@@ -648,7 +647,7 @@ create_new_objects(void)
set_frozenxids(true);
/* update new_cluster info now that we have objects in the databases */
- get_db_rel_and_slot_infos(&new_cluster, false);
+ get_db_rel_and_slot_infos(&new_cluster);
}
/*
diff --git a/src/bin/pg_upgrade/pg_upgrade.h b/src/bin/pg_upgrade/pg_upgrade.h
index 1ebad3bd74..56d05d7eb9 100644
--- a/src/bin/pg_upgrade/pg_upgrade.h
+++ b/src/bin/pg_upgrade/pg_upgrade.h
@@ -322,6 +322,7 @@ typedef struct
typedef struct
{
bool check; /* check clusters only, don't
change any data */
+ bool live_check; /* check clusters only, old
server is running */
bool do_sync; /* flush changes to disk */
transferMode transfer_mode; /* copy files or link them? */
int jobs; /* number of processes/threads to use */
@@ -366,20 +367,20 @@ extern OSInfo os_info;
/* check.c */
-void output_check_banner(bool live_check);
-void check_and_dump_old_cluster(bool live_check);
+void output_check_banner(void);
+void check_and_dump_old_cluster(void);
void check_new_cluster(void);
void report_clusters_compatible(void);
void issue_warnings_and_set_wal_level(void);
void output_completion_banner(char *deletion_script_file_name);
void check_cluster_versions(void);
-void check_cluster_compatibility(bool live_check);
+void check_cluster_compatibility(void);
void create_script_for_old_cluster_deletion(char **deletion_script_file_name);
/* controldata.c */
-void get_control_data(ClusterInfo *cluster, bool live_check);
+void get_control_data(ClusterInfo *cluster);
void check_control_data(ControlData *oldctrl, ControlData *newctrl);
void disable_old_cluster(void);
@@ -428,7 +429,7 @@ void check_loadable_libraries(void);
FileNameMap *gen_db_file_maps(DbInfo *old_db,
DbInfo *new_db, int *nmaps, const char *old_pgdata,
const char *new_pgdata);
-void get_db_rel_and_slot_infos(ClusterInfo *cluster, bool live_check);
+void get_db_rel_and_slot_infos(ClusterInfo *cluster);
int count_old_cluster_logical_slots(void);
int count_old_cluster_subscriptions(void);
@@ -436,7 +437,7 @@ int count_old_cluster_subscriptions(void);
void parseCommandLine(int argc, char *argv[]);
void adjust_data_dir(ClusterInfo *cluster);
-void get_sock_dir(ClusterInfo *cluster, bool live_check);
+void get_sock_dir(ClusterInfo *cluster);
/* relfilenumber.c */
--
2.39.3 (Apple Git-146)
>From 48943ad85f83ba44ea01e4b1fdd5c4afc53552e3 Mon Sep 17 00:00:00 2001
From: Nathan Bossart <[email protected]>
Date: Fri, 28 Jun 2024 21:09:33 -0500
Subject: [PATCH v2 4/6] use new pg_upgrade async API for retrieving relinfos
---
src/bin/pg_upgrade/info.c | 187 +++++++++++++++++---------------------
1 file changed, 81 insertions(+), 106 deletions(-)
diff --git a/src/bin/pg_upgrade/info.c b/src/bin/pg_upgrade/info.c
index 8f1777de59..d07255bd0a 100644
--- a/src/bin/pg_upgrade/info.c
+++ b/src/bin/pg_upgrade/info.c
@@ -22,13 +22,16 @@ static void report_unmatched_relation(const RelInfo *rel, const DbInfo *db,
static void free_db_and_rel_infos(DbInfoArr *db_arr);
static void get_template0_info(ClusterInfo *cluster);
static void get_db_infos(ClusterInfo *cluster);
-static void get_rel_infos(ClusterInfo *cluster, DbInfo *dbinfo);
+static char *get_rel_infos_query(DbInfo *dbinfo, void *arg);
+static void get_rel_infos_result(DbInfo *dbinfo, PGresult *res, void *arg);
static void free_rel_infos(RelInfoArr *rel_arr);
static void print_db_infos(DbInfoArr *db_arr);
static void print_rel_infos(RelInfoArr *rel_arr);
static void print_slot_infos(LogicalSlotInfoArr *slot_arr);
-static void get_old_cluster_logical_slot_infos(DbInfo *dbinfo);
-static void get_db_subscription_count(DbInfo *dbinfo);
+static char *get_old_cluster_logical_slot_infos_query(DbInfo *dbinfo, void *arg);
+static void get_old_cluster_logical_slot_infos_result(DbInfo *dbinfo, PGresult *res, void *arg);
+static char *get_db_subscription_count_query(DbInfo *dbinfo, void *arg);
+static void get_db_subscription_count_result(DbInfo *dbinfo, PGresult *res, void *arg);
/*
@@ -277,7 +280,7 @@ report_unmatched_relation(const RelInfo *rel, const DbInfo *db, bool is_new_db)
void
get_db_rel_and_slot_infos(ClusterInfo *cluster)
{
- int dbnum;
+ AsyncTask *task = async_task_create();
if (cluster->dbarr.dbs != NULL)
free_db_and_rel_infos(&cluster->dbarr);
@@ -285,23 +288,26 @@ get_db_rel_and_slot_infos(ClusterInfo *cluster)
get_template0_info(cluster);
get_db_infos(cluster);
- for (dbnum = 0; dbnum < cluster->dbarr.ndbs; dbnum++)
+ async_task_add_step(task,
+ get_rel_infos_query,
+ get_rel_infos_result,
+ true, NULL);
+ if (cluster == &old_cluster &&
+ GET_MAJOR_VERSION(cluster->major_version) > 1600)
{
- DbInfo *pDbInfo = &cluster->dbarr.dbs[dbnum];
-
- get_rel_infos(cluster, pDbInfo);
-
- /*
- * Retrieve the logical replication slots infos and the subscriptions
- * count for the old cluster.
- */
- if (cluster == &old_cluster)
- {
- get_old_cluster_logical_slot_infos(pDbInfo);
- get_db_subscription_count(pDbInfo);
- }
+ async_task_add_step(task,
+ get_old_cluster_logical_slot_infos_query,
+ get_old_cluster_logical_slot_infos_result,
+ true, cluster);
+ async_task_add_step(task,
+ get_db_subscription_count_query,
+ get_db_subscription_count_result,
+ true, cluster);
}
+ async_task_run(task, cluster);
+ async_task_free(task);
+
if (cluster == &old_cluster)
pg_log(PG_VERBOSE, "\nsource databases:");
else
@@ -447,30 +453,10 @@ get_db_infos(ClusterInfo *cluster)
* Note: the resulting RelInfo array is assumed to be sorted by OID.
* This allows later processing to match up old and new databases efficiently.
*/
-static void
-get_rel_infos(ClusterInfo *cluster, DbInfo *dbinfo)
+static char *
+get_rel_infos_query(DbInfo *dbinfo, void *arg)
{
- PGconn *conn = connectToServer(cluster,
- dbinfo->db_name);
- PGresult *res;
- RelInfo *relinfos;
- int ntups;
- int relnum;
- int num_rels = 0;
- char *nspname = NULL;
- char *relname = NULL;
- char *tablespace = NULL;
- int i_spclocation,
- i_nspname,
- i_relname,
- i_reloid,
- i_indtable,
- i_toastheap,
- i_relfilenumber,
- i_reltablespace;
- char query[QUERY_ALLOC];
- char *last_namespace = NULL,
- *last_tablespace = NULL;
+ char *query = pg_malloc(QUERY_ALLOC);
query[0] = '\0'; /* initialize query string to empty */
@@ -484,7 +470,7 @@ get_rel_infos(ClusterInfo *cluster, DbInfo *dbinfo)
* output, so we have to copy that system table. It's easiest to do that
* by treating it as a user table.
*/
- snprintf(query + strlen(query), sizeof(query) - strlen(query),
+ snprintf(query + strlen(query), QUERY_ALLOC - strlen(query),
"WITH regular_heap (reloid, indtable, toastheap) AS ( "
" SELECT c.oid, 0::oid, 0::oid "
" FROM pg_catalog.pg_class c JOIN pg_catalog.pg_namespace n "
@@ -506,7 +492,7 @@ get_rel_infos(ClusterInfo *cluster, DbInfo *dbinfo)
* selected by the regular_heap CTE. (We have to do this separately
* because the namespace-name rules above don't work for toast tables.)
*/
- snprintf(query + strlen(query), sizeof(query) - strlen(query),
+ snprintf(query + strlen(query), QUERY_ALLOC - strlen(query),
" toast_heap (reloid, indtable, toastheap) AS ( "
" SELECT c.reltoastrelid, 0::oid, c.oid "
" FROM regular_heap JOIN pg_catalog.pg_class c "
@@ -519,7 +505,7 @@ get_rel_infos(ClusterInfo *cluster, DbInfo *dbinfo)
* Testing indisready is necessary in 9.2, and harmless in earlier/later
* versions.
*/
- snprintf(query + strlen(query), sizeof(query) - strlen(query),
+ snprintf(query + strlen(query), QUERY_ALLOC - strlen(query),
" all_index (reloid, indtable, toastheap) AS ( "
" SELECT indexrelid, indrelid, 0::oid "
" FROM pg_catalog.pg_index "
@@ -533,7 +519,7 @@ get_rel_infos(ClusterInfo *cluster, DbInfo *dbinfo)
* And now we can write the query that retrieves the data we want for each
* heap and index relation. Make sure result is sorted by OID.
*/
- snprintf(query + strlen(query), sizeof(query) - strlen(query),
+ snprintf(query + strlen(query), QUERY_ALLOC - strlen(query),
"SELECT all_rels.*, n.nspname, c.relname, "
" c.relfilenode, c.reltablespace, "
" pg_catalog.pg_tablespace_location(t.oid) AS spclocation "
@@ -550,22 +536,30 @@ get_rel_infos(ClusterInfo *cluster, DbInfo *dbinfo)
" ON c.reltablespace = t.oid "
"ORDER BY 1;");
- res = executeQueryOrDie(conn, "%s", query);
-
- ntups = PQntuples(res);
-
- relinfos = (RelInfo *) pg_malloc(sizeof(RelInfo) * ntups);
+ return query;
+}
- i_reloid = PQfnumber(res, "reloid");
- i_indtable = PQfnumber(res, "indtable");
- i_toastheap = PQfnumber(res, "toastheap");
- i_nspname = PQfnumber(res, "nspname");
- i_relname = PQfnumber(res, "relname");
- i_relfilenumber = PQfnumber(res, "relfilenode");
- i_reltablespace = PQfnumber(res, "reltablespace");
- i_spclocation = PQfnumber(res, "spclocation");
+static void
+get_rel_infos_result(DbInfo *dbinfo, PGresult *res, void *arg)
+{
+ int ntups = PQntuples(res);
+ RelInfo *relinfos = (RelInfo *) pg_malloc(sizeof(RelInfo) * ntups);
+ int i_reloid = PQfnumber(res, "reloid");
+ int i_indtable = PQfnumber(res, "indtable");
+ int i_toastheap = PQfnumber(res, "toastheap");
+ int i_nspname = PQfnumber(res, "nspname");
+ int i_relname = PQfnumber(res, "relname");
+ int i_relfilenumber = PQfnumber(res, "relfilenode");
+ int i_reltablespace = PQfnumber(res, "reltablespace");
+ int i_spclocation = PQfnumber(res, "spclocation");
+ int num_rels = 0;
+ char *nspname = NULL;
+ char *relname = NULL;
+ char *tablespace = NULL;
+ char *last_namespace = NULL;
+ char *last_tablespace = NULL;
- for (relnum = 0; relnum < ntups; relnum++)
+ for (int relnum = 0; relnum < ntups; relnum++)
{
RelInfo *curr = &relinfos[num_rels++];
@@ -618,9 +612,6 @@ get_rel_infos(ClusterInfo *cluster, DbInfo *dbinfo)
/* A zero reltablespace oid indicates the database tablespace. */
curr->tablespace = dbinfo->db_tablespace;
}
- PQclear(res);
-
- PQfinish(conn);
dbinfo->rel_arr.rels = relinfos;
dbinfo->rel_arr.nrels = num_rels;
@@ -642,20 +633,9 @@ get_rel_infos(ClusterInfo *cluster, DbInfo *dbinfo)
* check_old_cluster_for_valid_slots() would raise a FATAL error if such slots
* are included.
*/
-static void
-get_old_cluster_logical_slot_infos(DbInfo *dbinfo)
+static char *
+get_old_cluster_logical_slot_infos_query(DbInfo *dbinfo, void *arg)
{
- PGconn *conn;
- PGresult *res;
- LogicalSlotInfo *slotinfos = NULL;
- int num_slots;
-
- /* Logical slots can be migrated since PG17. */
- if (GET_MAJOR_VERSION(old_cluster.major_version) <= 1600)
- return;
-
- conn = connectToServer(&old_cluster, dbinfo->db_name);
-
/*
* Fetch the logical replication slot information. The check whether the
* slot is considered caught up is done by an upgrade function. This
@@ -673,18 +653,23 @@ get_old_cluster_logical_slot_infos(DbInfo *dbinfo)
* started and stopped several times causing any temporary slots to be
* removed.
*/
- res = executeQueryOrDie(conn, "SELECT slot_name, plugin, two_phase, failover, "
- "%s as caught_up, invalidation_reason IS NOT NULL as invalid "
- "FROM pg_catalog.pg_replication_slots "
- "WHERE slot_type = 'logical' AND "
- "database = current_database() AND "
- "temporary IS FALSE;",
- user_opts.live_check ? "FALSE" :
- "(CASE WHEN invalidation_reason IS NOT NULL THEN FALSE "
- "ELSE (SELECT pg_catalog.binary_upgrade_logical_slot_has_caught_up(slot_name)) "
- "END)");
-
- num_slots = PQntuples(res);
+ return psprintf("SELECT slot_name, plugin, two_phase, failover, "
+ "%s as caught_up, invalidation_reason IS NOT NULL as invalid "
+ "FROM pg_catalog.pg_replication_slots "
+ "WHERE slot_type = 'logical' AND "
+ "database = current_database() AND "
+ "temporary IS FALSE;",
+ user_opts.live_check ? "FALSE" :
+ "(CASE WHEN invalidation_reason IS NOT NULL THEN FALSE "
+ "ELSE (SELECT pg_catalog.binary_upgrade_logical_slot_has_caught_up(slot_name)) "
+ "END)");
+}
+
+static void
+get_old_cluster_logical_slot_infos_result(DbInfo *dbinfo, PGresult *res, void *arg)
+{
+ LogicalSlotInfo *slotinfos = NULL;
+ int num_slots = PQntuples(res);
if (num_slots)
{
@@ -717,14 +702,10 @@ get_old_cluster_logical_slot_infos(DbInfo *dbinfo)
}
}
- PQclear(res);
- PQfinish(conn);
-
dbinfo->slot_arr.slots = slotinfos;
dbinfo->slot_arr.nslots = num_slots;
}
-
/*
* count_old_cluster_logical_slots()
*
@@ -754,24 +735,18 @@ count_old_cluster_logical_slots(void)
* This is because before that the logical slots are not upgraded, so we will
* not be able to upgrade the logical replication clusters completely.
*/
-static void
-get_db_subscription_count(DbInfo *dbinfo)
+static char *
+get_db_subscription_count_query(DbInfo *dbinfo, void *arg)
{
- PGconn *conn;
- PGresult *res;
-
- /* Subscriptions can be migrated since PG17. */
- if (GET_MAJOR_VERSION(old_cluster.major_version) < 1700)
- return;
+ return psprintf("SELECT count(*) "
+ "FROM pg_catalog.pg_subscription WHERE subdbid = %u",
+ dbinfo->db_oid);
+}
- conn = connectToServer(&old_cluster, dbinfo->db_name);
- res = executeQueryOrDie(conn, "SELECT count(*) "
- "FROM pg_catalog.pg_subscription WHERE subdbid = %u",
- dbinfo->db_oid);
+static void
+get_db_subscription_count_result(DbInfo *dbinfo, PGresult *res, void *arg)
+{
dbinfo->nsubs = atoi(PQgetvalue(res, 0, 0));
-
- PQclear(res);
- PQfinish(conn);
}
/*
--
2.39.3 (Apple Git-146)
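A note on the sizeof(query) -> QUERY_ALLOC change in get_rel_infos_query() above: once the buffer is a heap pointer rather than a local array, sizeof(query) would give the size of the pointer, not of the allocation. Below is a minimal standalone sketch of the same append-with-snprintf idiom. SKETCH_QUERY_ALLOC and sketch_build_query() are made-up names for illustration, plain malloc() stands in for pg_malloc(), and the buffer size is an arbitrary assumption rather than pg_upgrade's value.

```c
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#define SKETCH_QUERY_ALLOC 256	/* hypothetical size, not pg_upgrade's */

/*
 * Build a query piece by piece in a heap buffer.  Each snprintf() is bounded
 * by the space left in the allocation.  The caller frees the result.
 */
static char *
sketch_build_query(const char *table)
{
	char	   *query;

	assert(table != NULL);
	query = malloc(SKETCH_QUERY_ALLOC);
	if (query == NULL)
		return NULL;
	query[0] = '\0';

	/* sizeof(query) would be the pointer size here, so use the constant. */
	snprintf(query + strlen(query), SKETCH_QUERY_ALLOC - strlen(query),
			 "SELECT c.oid ");
	snprintf(query + strlen(query), SKETCH_QUERY_ALLOC - strlen(query),
			 "FROM %s c ", table);
	snprintf(query + strlen(query), SKETCH_QUERY_ALLOC - strlen(query),
			 "ORDER BY 1;");

	return query;
}
```

If a piece does not fit, snprintf() truncates it rather than overrunning the buffer, which is the same behavior the original array-based code had.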
>From 09e7e7baa8c277a3afbed1e2f8d05bfa7fcc586c Mon Sep 17 00:00:00 2001
From: Nathan Bossart <[email protected]>
Date: Fri, 28 Jun 2024 21:24:35 -0500
Subject: [PATCH v2 5/6] use new pg_upgrade async API to parallelize getting
loadable libraries
---
src/bin/pg_upgrade/function.c | 63 ++++++++++++++++++++---------------
1 file changed, 37 insertions(+), 26 deletions(-)
diff --git a/src/bin/pg_upgrade/function.c b/src/bin/pg_upgrade/function.c
index 7e3abed098..c11fce0696 100644
--- a/src/bin/pg_upgrade/function.c
+++ b/src/bin/pg_upgrade/function.c
@@ -42,6 +42,32 @@ library_name_compare(const void *p1, const void *p2)
((const LibraryInfo *) p2)->dbnum);
}
+struct loadable_libraries_state
+{
+ PGresult **ress;
+ int totaltups;
+};
+
+static char *
+get_loadable_libraries_query(DbInfo *dbinfo, void *arg)
+{
+ return psprintf("SELECT DISTINCT probin "
+ "FROM pg_catalog.pg_proc "
+ "WHERE prolang = %u AND "
+ "probin IS NOT NULL AND "
+ "oid >= %u;",
+ ClanguageId,
+ FirstNormalObjectId);
+}
+
+static void
+get_loadable_libraries_result(DbInfo *dbinfo, PGresult *res, void *arg)
+{
+ struct loadable_libraries_state *state = (struct loadable_libraries_state *) arg;
+
+ state->ress[dbinfo - old_cluster.dbarr.dbs] = res;
+ state->totaltups += PQntuples(res);
+}
/*
* get_loadable_libraries()
@@ -54,47 +80,32 @@ library_name_compare(const void *p1, const void *p2)
void
get_loadable_libraries(void)
{
- PGresult **ress;
int totaltups;
int dbnum;
int n_libinfos;
+ AsyncTask *task = async_task_create();
+ struct loadable_libraries_state state;
- ress = (PGresult **) pg_malloc(old_cluster.dbarr.ndbs * sizeof(PGresult *));
- totaltups = 0;
+ state.ress = (PGresult **) pg_malloc(old_cluster.dbarr.ndbs * sizeof(PGresult *));
+ state.totaltups = 0;
- /* Fetch all library names, removing duplicates within each DB */
- for (dbnum = 0; dbnum < old_cluster.dbarr.ndbs; dbnum++)
- {
- DbInfo *active_db = &old_cluster.dbarr.dbs[dbnum];
- PGconn *conn = connectToServer(&old_cluster, active_db->db_name);
+ async_task_add_step(task, get_loadable_libraries_query,
+ get_loadable_libraries_result, false, &state);
- /*
- * Fetch all libraries containing non-built-in C functions in this DB.
- */
- ress[dbnum] = executeQueryOrDie(conn,
- "SELECT DISTINCT probin "
- "FROM pg_catalog.pg_proc "
- "WHERE prolang = %u AND "
- "probin IS NOT NULL AND "
- "oid >= %u;",
- ClanguageId,
- FirstNormalObjectId);
- totaltups += PQntuples(ress[dbnum]);
-
- PQfinish(conn);
- }
+ async_task_run(task, &old_cluster);
+ async_task_free(task);
/*
* Allocate memory for required libraries and logical replication output
* plugins.
*/
- n_libinfos = totaltups + count_old_cluster_logical_slots();
+ n_libinfos = state.totaltups + count_old_cluster_logical_slots();
os_info.libraries = (LibraryInfo *) pg_malloc(sizeof(LibraryInfo) * n_libinfos);
totaltups = 0;
for (dbnum = 0; dbnum < old_cluster.dbarr.ndbs; dbnum++)
{
- PGresult *res = ress[dbnum];
+ PGresult *res = state.ress[dbnum];
int ntups;
int rowno;
LogicalSlotInfoArr *slot_arr = &old_cluster.dbarr.dbs[dbnum].slot_arr;
@@ -129,7 +140,7 @@ get_loadable_libraries(void)
}
}
- pg_free(ress);
+ pg_free(state.ress);
os_info.num_libraries = totaltups;
}
--
2.39.3 (Apple Git-146)
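A note on "state->ress[dbinfo - old_cluster.dbarr.dbs] = res" in get_loadable_libraries_result() above: the callback is handed only a pointer to the database entry, so it recovers the array index by pointer subtraction. My reading is that this keeps the results slotted by database number even if the callbacks do not fire in database order, but that depends on 0001, which is not shown here. A minimal standalone sketch of the idiom follows; SketchDb, SketchState and sketch_store_result() are made-up names, and an int stands in for the PGresult pointer.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-in for DbInfo; not the pg_upgrade struct. */
typedef struct
{
	const char *name;
} SketchDb;

/* Shared state passed to every callback through the void *arg. */
typedef struct
{
	SketchDb   *base;		/* first element of the database array */
	int		   *ntups;		/* one slot per database */
	int			total;		/* running sum across all databases */
} SketchState;

/*
 * Store a per-database value in the slot matching the database's position
 * in the array.  "db" must point into the array starting at state->base.
 */
static void
sketch_store_result(SketchDb *db, int ntups, SketchState *state)
{
	ptrdiff_t	idx = db - state->base;

	assert(idx >= 0);
	state->ntups[idx] = ntups;
	state->total += ntups;
}
```

The subtraction is only valid when the element pointer really comes from the same array as the base pointer, which is why the sketch keeps both in one state struct.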
>From 7a420ff039d48c54cbb4d06647f039257a807bb9 Mon Sep 17 00:00:00 2001
From: Nathan Bossart <[email protected]>
Date: Fri, 28 Jun 2024 21:31:57 -0500
Subject: [PATCH v2 6/6] use new pg_upgrade async API to parallelize reporting
extension updates
---
src/bin/pg_upgrade/version.c | 82 ++++++++++++++++++------------------
1 file changed, 41 insertions(+), 41 deletions(-)
diff --git a/src/bin/pg_upgrade/version.c b/src/bin/pg_upgrade/version.c
index 2de6dffccd..12783bb2ba 100644
--- a/src/bin/pg_upgrade/version.c
+++ b/src/bin/pg_upgrade/version.c
@@ -139,6 +139,42 @@ old_9_6_invalidate_hash_indexes(ClusterInfo *cluster, bool check_mode)
check_ok();
}
+static char *
+report_extension_updates_query(DbInfo *dbinfo, void *arg)
+{
+ return pg_strdup("SELECT name "
+ "FROM pg_available_extensions "
+ "WHERE installed_version != default_version");
+}
+
+static void
+report_extension_updates_result(DbInfo *dbinfo, PGresult *res, void *arg)
+{
+ bool db_used = false;
+ int ntups = PQntuples(res);
+ int i_name = PQfnumber(res, "name");
+ char *output_path = "update_extensions.sql";
+ FILE **script = (FILE **) arg;
+
+ for (int rowno = 0; rowno < ntups; rowno++)
+ {
+ if (*script == NULL && (*script = fopen_priv(output_path, "w")) == NULL)
+ pg_fatal("could not open file \"%s\": %m", output_path);
+ if (!db_used)
+ {
+ PQExpBufferData connectbuf;
+
+ initPQExpBuffer(&connectbuf);
+ appendPsqlMetaConnect(&connectbuf, dbinfo->db_name);
+ fputs(connectbuf.data, *script);
+ termPQExpBuffer(&connectbuf);
+ db_used = true;
+ }
+ fprintf(*script, "ALTER EXTENSION %s UPDATE;\n",
+ quote_identifier(PQgetvalue(res, rowno, i_name)));
+ }
+}
+
/*
* report_extension_updates()
* Report extensions that should be updated.
@@ -146,53 +182,17 @@ old_9_6_invalidate_hash_indexes(ClusterInfo *cluster, bool check_mode)
void
report_extension_updates(ClusterInfo *cluster)
{
- int dbnum;
FILE *script = NULL;
char *output_path = "update_extensions.sql";
+ AsyncTask *task = async_task_create();
prep_status("Checking for extension updates");
- for (dbnum = 0; dbnum < cluster->dbarr.ndbs; dbnum++)
- {
- PGresult *res;
- bool db_used = false;
- int ntups;
- int rowno;
- int i_name;
- DbInfo *active_db = &cluster->dbarr.dbs[dbnum];
- PGconn *conn = connectToServer(cluster, active_db->db_name);
-
- /* find extensions needing updates */
- res = executeQueryOrDie(conn,
- "SELECT name "
- "FROM pg_available_extensions "
- "WHERE installed_version != default_version"
- );
-
- ntups = PQntuples(res);
- i_name = PQfnumber(res, "name");
- for (rowno = 0; rowno < ntups; rowno++)
- {
- if (script == NULL && (script = fopen_priv(output_path, "w")) == NULL)
- pg_fatal("could not open file \"%s\": %m", output_path);
- if (!db_used)
- {
- PQExpBufferData connectbuf;
-
- initPQExpBuffer(&connectbuf);
- appendPsqlMetaConnect(&connectbuf, active_db->db_name);
- fputs(connectbuf.data, script);
- termPQExpBuffer(&connectbuf);
- db_used = true;
- }
- fprintf(script, "ALTER EXTENSION %s UPDATE;\n",
- quote_identifier(PQgetvalue(res, rowno, i_name)));
- }
+ async_task_add_step(task, report_extension_updates_query,
+ report_extension_updates_result, true, &script);
- PQclear(res);
-
- PQfinish(conn);
- }
+ async_task_run(task, cluster);
+ async_task_free(task);
if (script)
{
--
2.39.3 (Apple Git-146)
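The conversions in 0004 through 0006 all take the same shape: a "query" callback that returns an allocated string for a given database, and a "result" callback that consumes what came back, with a shared void *arg for state. For readers who want to see that shape in isolation, here is a toy, fully synchronous sketch. It is not the API from 0001. Every name in it (SketchTask, sketch_task_add_step(), sketch_task_run() and so on) is made up, there are no connections or PGresults, and the "result" handed to the callback is just the query text echoed back.

```c
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#define SKETCH_MAX_STEPS 4

/* Hypothetical stand-in for DbInfo; not the pg_upgrade struct. */
typedef struct
{
	const char *name;
	size_t		last_query_len; /* filled in by the result callback */
} SketchDb;

typedef char *(*SketchQueryCb) (SketchDb *db, void *arg);
typedef void (*SketchResultCb) (SketchDb *db, const char *result, void *arg);

typedef struct
{
	SketchQueryCb query;
	SketchResultCb result;
	void	   *arg;
} SketchStep;

typedef struct
{
	SketchStep	steps[SKETCH_MAX_STEPS];
	int			nsteps;
} SketchTask;

static void
sketch_task_add_step(SketchTask *task, SketchQueryCb query,
					 SketchResultCb result, void *arg)
{
	assert(task->nsteps < SKETCH_MAX_STEPS);
	task->steps[task->nsteps].query = query;
	task->steps[task->nsteps].result = result;
	task->steps[task->nsteps].arg = arg;
	task->nsteps++;
}

/* Run every step on every database, serially; no connections involved. */
static void
sketch_task_run(SketchTask *task, SketchDb *dbs, int ndbs)
{
	for (int d = 0; d < ndbs; d++)
	{
		for (int s = 0; s < task->nsteps; s++)
		{
			SketchStep *step = &task->steps[s];
			char	   *query = step->query(&dbs[d], step->arg);

			/* A real runner would execute the query; this one echoes it. */
			step->result(&dbs[d], query, step->arg);
			free(query);	/* in this sketch the runner frees the string */
		}
	}
}

/* Example query callback: returns a freshly allocated string. */
static char *
sketch_query(SketchDb *db, void *arg)
{
	size_t		len = strlen(db->name) + 32;
	char	   *query = malloc(len);

	(void) arg;
	assert(query != NULL);
	snprintf(query, len, "SELECT 1 /* %s */", db->name);
	return query;
}

/* Example result callback: records something per database and in arg. */
static void
sketch_result(SketchDb *db, const char *result, void *arg)
{
	int		   *ncalls = (int *) arg;

	db->last_query_len = strlen(result);
	(*ncalls)++;
}
```

The point of splitting the work this way is that the callbacks never touch a connection, so the runner is free to decide how many databases are in flight at once.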