Hi,
On 07/04/2026 10:00, Heikki Linnakangas wrote:
> This all makes more sense in the partitioned case. Perhaps we should
> parallelize only when partitions are used, and use only one thread
> per partition.
Thanks for having a look. I've attached v3, which parallelizes only the
range-partitioned case, using one thread per partition. Results:
patch:
pgbench -i -s 100 --partitions 10
done in 12.63 s (drop tables 0.05 s, create tables 0.01 s, client-side generate 5.98 s, vacuum 1.63 s, primary keys 4.96 s).
master:
pgbench -i -s 100 --partitions 10
done in 29.29 s (drop tables 0.00 s, create tables 0.02 s, client-side generate 16.31 s, vacuum 7.78 s, primary keys 5.18 s).
--
Thanks,
Mircea Cadariu
From dd4f3e2d7dbae6b008157f4928287056fd0a82b9 Mon Sep 17 00:00:00 2001
From: Mircea Cadariu <[email protected]>
Date: Wed, 8 Apr 2026 15:35:31 +0100
Subject: [PATCH] pgbench: parallelize account loading for range-partitioned
tables
When initializing with range partitioning, spawn one worker thread per
partition to load pgbench_accounts in parallel. Each worker opens its
own connection, truncates its partition within a transaction, and loads
its rows using COPY FREEZE (on servers v14 and later), which avoids a
separate freeze pass during the subsequent vacuum step.
Non-partitioned and hash-partitioned tables are unaffected and continue
to use serial loading.
---
src/bin/pgbench/pgbench.c | 120 ++++++++++++++++++-
src/bin/pgbench/t/001_pgbench_with_server.pl | 18 +++
2 files changed, 134 insertions(+), 4 deletions(-)
diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c
index 1dae918cc0..f537d46393 100644
--- a/src/bin/pgbench/pgbench.c
+++ b/src/bin/pgbench/pgbench.c
@@ -5143,6 +5143,106 @@ initPopulateTable(PGconn *con, const char *table, int64
base,
termPQExpBuffer(&sql);
}
+/*
+ * Load the accounts belonging to range partition "partno" (1-based) of
+ * pgbench_accounts over connection "con".
+ *
+ * The caller must already have opened a transaction on "con": the partition
+ * is truncated first, inside that transaction, so that on v14+ servers
+ * COPY ... WITH (FREEZE ON) can be used (COPY FREEZE requires the target to
+ * have been created or truncated in the current transaction).
+ *
+ * Rows are split into chunks of part_size = ceil(total_rows / partitions):
+ * partition p receives row numbers [(p - 1) * part_size, p * part_size),
+ * clamped to total_rows, and the last partition always ends at total_rows.
+ * NOTE(review): this must agree with the range bounds used when the
+ * partitions were created (presumably createPartitions()) -- keep the two
+ * formulas in sync.
+ */
+static void
+initPopulatePartition(PGconn *con, int partno)
+{
+	int64		total_rows = (int64) naccounts * scale;
+	int64		part_size = (total_rows + partitions - 1) / partitions;
+	int64		start_row;
+	int64		end_row;
+	char		table_name[NAMEDATALEN];
+	char		truncate_stmt[256];
+	char		copy_stmt[256];
+	int			n;
+	PGresult   *res;
+	PQExpBufferData sql;
+
+	/*
+	 * Clamp both ends to total_rows.  With many partitions that do not divide
+	 * total_rows evenly, (partno - 1) * part_size and partno * part_size can
+	 * run past total_rows even before the last partition, which would
+	 * otherwise generate accounts beyond the requested scale.
+	 */
+	start_row = (int64) (partno - 1) * part_size;
+	end_row = (partno == partitions) ? total_rows : (int64) partno * part_size;
+	if (start_row > total_rows)
+		start_row = total_rows;
+	if (end_row > total_rows)
+		end_row = total_rows;
+
+	snprintf(table_name, sizeof(table_name), "pgbench_accounts_%d", partno);
+	snprintf(truncate_stmt, sizeof(truncate_stmt), "truncate %s", table_name);
+
+	/* Only ask for FREEZE on servers v14 and later. */
+	if (PQserverVersion(con) >= 140000)
+		n = pg_snprintf(copy_stmt, sizeof(copy_stmt),
+						"copy %s from stdin with (freeze on)", table_name);
+	else
+		n = pg_snprintf(copy_stmt, sizeof(copy_stmt),
+						"copy %s from stdin", table_name);
+
+	if (n >= sizeof(copy_stmt))
+		pg_fatal("invalid buffer size: must be at least %d characters long", n);
+
+	/* Truncate in this transaction so that COPY FREEZE is permitted. */
+	executeStatement(con, truncate_stmt);
+
+	res = PQexec(con, copy_stmt);
+	if (PQresultStatus(res) != PGRES_COPY_IN)
+		pg_fatal("could not start COPY for partition %d: %s",
+				 partno, PQerrorMessage(con));
+	PQclear(res);
+
+	initPQExpBuffer(&sql);
+
+	for (int64 row = start_row; row < end_row; row++)
+	{
+		initAccount(&sql, row);
+		if (PQputCopyData(con, sql.data, sql.len) <= 0)
+			pg_fatal("PQputCopyData failed for partition %d: %s",
+					 partno, PQerrorMessage(con));
+	}
+
+	if (PQputCopyEnd(con, NULL) <= 0)
+		pg_fatal("PQputCopyEnd failed for partition %d: %s",
+				 partno, PQerrorMessage(con));
+
+	res = PQgetResult(con);
+	if (PQresultStatus(res) != PGRES_COMMAND_OK)
+		pg_fatal("COPY failed for partition %d: %s", partno,
+				 PQerrorMessage(con));
+	PQclear(res);
+
+	/*
+	 * Read results until PQgetResult() returns NULL, as libpq documents, so
+	 * the connection is idle before the caller issues its next command.
+	 */
+	while ((res = PQgetResult(con)) != NULL)
+		PQclear(res);
+
+	termPQExpBuffer(&sql);
+}
+
+static THREAD_FUNC_RETURN_TYPE THREAD_FUNC_CC
+initPartitionWorkerThread(void *arg)
+{
+ int partno = *(int *) arg;
+ PGconn *con = doConnect();
+
+ if (con == NULL)
+ pg_fatal("could not create connection for partition worker %d",
partno);
+
+ executeStatement(con, "begin");
+ initPopulatePartition(con, partno);
+ executeStatement(con, "commit");
+
+ PQfinish(con);
+ THREAD_FUNC_RETURN;
+}
+
+static void
+initLoadAccountsParallel(void)
+{
+ THREAD_T *threads;
+ int *partno;
+ int i;
+
+ fprintf(stderr, "loading pgbench_accounts with %d threads...\n",
partitions);
+
+ threads = pg_malloc_array(THREAD_T, partitions);
+ partno = pg_malloc_array(int, partitions);
+
+ for (i = 0; i < partitions; i++)
+ {
+ partno[i] = i + 1;
+ errno = THREAD_CREATE(&threads[i], initPartitionWorkerThread,
&partno[i]);
+ if (errno != 0)
+ pg_fatal("could not create thread for partition %d:
%m", i + 1);
+ }
+
+ for (i = 0; i < partitions; i++)
+ THREAD_JOIN(threads[i]);
+
+ free(threads);
+ free(partno);
+}
+
/*
* Fill the standard tables with some data generated and sent from the client.
*
@@ -5155,8 +5255,11 @@ initGenerateDataClientSide(PGconn *con)
fprintf(stderr, "generating data (client-side)...\n");
/*
- * we do all of this in one transaction to enable the backend's
- * data-loading optimizations
+ * For the non-partitioned and hash-partitioned cases, do everything in
+ * one transaction to enable the backend's data-loading optimizations.
For
+ * range-partitioned tables, branches and tellers are loaded in one
+ * transaction, then accounts are loaded in parallel with one thread per
+ * partition, each in its own transaction.
*/
executeStatement(con, "begin");
@@ -5169,9 +5272,18 @@ initGenerateDataClientSide(PGconn *con)
*/
initPopulateTable(con, "pgbench_branches", nbranches, initBranch);
initPopulateTable(con, "pgbench_tellers", ntellers, initTeller);
- initPopulateTable(con, "pgbench_accounts", naccounts, initAccount);
- executeStatement(con, "commit");
+ if (partition_method == PART_RANGE)
+ {
+ executeStatement(con, "commit");
+ initLoadAccountsParallel();
+ }
+ else
+ {
+ /* hash partitioning and non-partitioned tables use serial
loading */
+ initPopulateTable(con, "pgbench_accounts", naccounts,
initAccount);
+ executeStatement(con, "commit");
+ }
}
/*
diff --git a/src/bin/pgbench/t/001_pgbench_with_server.pl
b/src/bin/pgbench/t/001_pgbench_with_server.pl
index b7685ea5d2..b59c181c2a 100644
--- a/src/bin/pgbench/t/001_pgbench_with_server.pl
+++ b/src/bin/pgbench/t/001_pgbench_with_server.pl
@@ -164,6 +164,24 @@ $node->pgbench(
# Check data state, after server-side data generation.
check_data_state($node, 'server-side');
+# Test parallel initialization with range partitions (client-side generation).
+# With only 4 partitions, each partition gets its own loading thread.
+$node->pgbench(
+	'-i -s 1 --partitions=4 --partition-method=range',
+	0,
+	[qr{^$}],
+	[
+		qr{creating tables},
+		qr{creating 4 partitions},
+		qr{loading pgbench_accounts with 4 threads},
+		qr{vacuuming},
+		qr{creating primary keys},
+		qr{done in \d+\.\d\d s }
+	],
+	'pgbench parallel initialization with range partitions');
+
+check_data_state($node, 'parallel-range-partitions');
+
+# The parallel loaders split the aid range across partitions by hand, so
+# check that every account was generated exactly once and none past the end.
+is( $node->safe_psql(
+		'postgres',
+		'SELECT count(*), count(DISTINCT aid), min(aid), max(aid) FROM pgbench_accounts'
+	),
+	'100000|100000|1|100000',
+	'parallel-range-partitions: pgbench_accounts holds exactly aids 1..100000');
+
# Run all builtin scripts, for a few transactions each
$node->pgbench(
'--transactions=5 -Dfoo=bla --client=2 --protocol=simple --builtin=t'
--
2.39.5 (Apple Git-154)