Hi,

On 07/04/2026 10:00, Heikki Linnakangas wrote:

This all makes more sense in the partitioned case. Perhaps we should parallelize only when partitioned are used, and use only one thread per partition.

Thanks for having a look. I attached v3 that parallelizes only the partitioned case, one thread per partition. Results:


patch:

pgbench -i -s 100 --partitions 10

done in 12.63 s (drop tables 0.05 s, create tables 0.01 s, client-side generate 5.98 s, vacuum 1.63 s, primary keys 4.96 s).


master:

pgbench -i -s 100 --partitions 10

done in 29.29 s (drop tables 0.00 s, create tables 0.02 s, client-side generate 16.31 s, vacuum 7.78 s, primary keys 5.18 s).

--
Thanks,
Mircea Cadariu
From dd4f3e2d7dbae6b008157f4928287056fd0a82b9 Mon Sep 17 00:00:00 2001
From: Mircea Cadariu <[email protected]>
Date: Wed, 8 Apr 2026 15:35:31 +0100
Subject: [PATCH] pgbench: parallelize account loading for range-partitioned
 tables

When initializing with range partitioning, spawn one worker thread per
partition to load pgbench_accounts in parallel.  Each worker opens its
own connection, truncates its partition within a transaction, and loads
its rows using COPY FREEZE, which avoids a separate freeze pass during
the subsequent vacuum step.

Non-partitioned and hash-partitioned tables are unaffected and continue
to use serial loading.
---
 src/bin/pgbench/pgbench.c                    | 120 ++++++++++++++++++-
 src/bin/pgbench/t/001_pgbench_with_server.pl |  18 +++
 2 files changed, 134 insertions(+), 4 deletions(-)

diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c
index 1dae918cc0..f537d46393 100644
--- a/src/bin/pgbench/pgbench.c
+++ b/src/bin/pgbench/pgbench.c
@@ -5143,6 +5143,106 @@ initPopulateTable(PGconn *con, const char *table, int64 
base,
        termPQExpBuffer(&sql);
 }
 
+static void
+initPopulatePartition(PGconn *con, int partno)
+{
+       int64           total_rows = (int64) naccounts * scale;
+       int64           part_size = (total_rows + partitions - 1) / partitions;
+       int64           start_row = (int64) (partno - 1) * part_size;
+       int64           end_row = (partno == partitions) ? total_rows : (int64) 
partno * part_size;
+       char            table_name[NAMEDATALEN];
+       char            truncate_stmt[256];
+       char            copy_stmt[256];
+       int                     n;
+       PGresult   *res;
+       PQExpBufferData sql;
+       int64           row;
+
+       snprintf(table_name, sizeof(table_name), "pgbench_accounts_%d", partno);
+       snprintf(truncate_stmt, sizeof(truncate_stmt), "truncate %s", 
table_name);
+
+       if (PQserverVersion(con) >= 140000)
+               n = pg_snprintf(copy_stmt, sizeof(copy_stmt),
+                                               "copy %s from stdin with 
(freeze on)", table_name);
+       else
+               n = pg_snprintf(copy_stmt, sizeof(copy_stmt),
+                                               "copy %s from stdin", 
table_name);
+
+       if (n >= sizeof(copy_stmt))
+               pg_fatal("invalid buffer size: must be at least %d characters 
long", n);
+
+       executeStatement(con, truncate_stmt);
+
+       res = PQexec(con, copy_stmt);
+       if (PQresultStatus(res) != PGRES_COPY_IN)
+               pg_fatal("could not start COPY for partition %d: %s",
+                                partno, PQerrorMessage(con));
+       PQclear(res);
+
+       initPQExpBuffer(&sql);
+
+       for (row = start_row; row < end_row; row++)
+       {
+               initAccount(&sql, row);
+               if (PQputCopyData(con, sql.data, sql.len) <= 0)
+                       pg_fatal("PQputCopyData failed for partition %d", 
partno);
+       }
+
+       if (PQputCopyEnd(con, NULL) <= 0)
+               pg_fatal("PQputCopyEnd failed for partition %d", partno);
+
+       res = PQgetResult(con);
+       if (PQresultStatus(res) != PGRES_COMMAND_OK)
+               pg_fatal("COPY failed for partition %d: %s", partno, 
PQerrorMessage(con));
+       PQclear(res);
+
+       termPQExpBuffer(&sql);
+}
+
+static THREAD_FUNC_RETURN_TYPE THREAD_FUNC_CC
+initPartitionWorkerThread(void *arg)
+{
+       int                     partno = *(int *) arg;
+       PGconn     *con = doConnect();
+
+       if (con == NULL)
+               pg_fatal("could not create connection for partition worker %d", 
partno);
+
+       executeStatement(con, "begin");
+       initPopulatePartition(con, partno);
+       executeStatement(con, "commit");
+
+       PQfinish(con);
+       THREAD_FUNC_RETURN;
+}
+
+static void
+initLoadAccountsParallel(void)
+{
+       THREAD_T   *threads;
+       int                *partno;
+       int                     i;
+
+       fprintf(stderr, "loading pgbench_accounts with %d threads...\n", 
partitions);
+
+       threads = pg_malloc_array(THREAD_T, partitions);
+       partno = pg_malloc_array(int, partitions);
+
+       for (i = 0; i < partitions; i++)
+       {
+               partno[i] = i + 1;
+               errno = THREAD_CREATE(&threads[i], initPartitionWorkerThread, 
&partno[i]);
+               if (errno != 0)
+                       pg_fatal("could not create thread for partition %d: 
%m", i + 1);
+       }
+
+       for (i = 0; i < partitions; i++)
+               THREAD_JOIN(threads[i]);
+
+       free(threads);
+       free(partno);
+}
+
 /*
  * Fill the standard tables with some data generated and sent from the client.
  *
@@ -5155,8 +5255,11 @@ initGenerateDataClientSide(PGconn *con)
        fprintf(stderr, "generating data (client-side)...\n");
 
        /*
-        * we do all of this in one transaction to enable the backend's
-        * data-loading optimizations
+        * For the non-partitioned and hash-partitioned cases, do everything in
+        * one transaction to enable the backend's data-loading optimizations. 
For
+        * range-partitioned tables, branches and tellers are loaded in one
+        * transaction, then accounts are loaded in parallel with one thread per
+        * partition, each in its own transaction.
         */
        executeStatement(con, "begin");
 
@@ -5169,9 +5272,18 @@ initGenerateDataClientSide(PGconn *con)
         */
        initPopulateTable(con, "pgbench_branches", nbranches, initBranch);
        initPopulateTable(con, "pgbench_tellers", ntellers, initTeller);
-       initPopulateTable(con, "pgbench_accounts", naccounts, initAccount);
 
-       executeStatement(con, "commit");
+       if (partition_method == PART_RANGE)
+       {
+               executeStatement(con, "commit");
+               initLoadAccountsParallel();
+       }
+       else
+       {
+               /* hash partitioning and non-partitioned tables use serial 
loading */
+               initPopulateTable(con, "pgbench_accounts", naccounts, 
initAccount);
+               executeStatement(con, "commit");
+       }
 }
 
 /*
diff --git a/src/bin/pgbench/t/001_pgbench_with_server.pl 
b/src/bin/pgbench/t/001_pgbench_with_server.pl
index b7685ea5d2..b59c181c2a 100644
--- a/src/bin/pgbench/t/001_pgbench_with_server.pl
+++ b/src/bin/pgbench/t/001_pgbench_with_server.pl
@@ -164,6 +164,24 @@ $node->pgbench(
 # Check data state, after server-side data generation.
 check_data_state($node, 'server-side');
 
+# Test parallel initialization with range partitions (client-side generation).
+# One thread per partition is spawned automatically.
+$node->pgbench(
+       '-i -s 1 --partitions=4 --partition-method=range',
+       0,
+       [qr{^$}],
+       [
+               qr{creating tables},
+               qr{creating 4 partitions},
+               qr{loading pgbench_accounts with 4 threads},
+               qr{vacuuming},
+               qr{creating primary keys},
+               qr{done in \d+\.\d\d s }
+       ],
+       'pgbench parallel initialization with range partitions');
+
+check_data_state($node, 'parallel-range-partitions');
+
 # Run all builtin scripts, for a few transactions each
 $node->pgbench(
        '--transactions=5 -Dfoo=bla --client=2 --protocol=simple --builtin=t'
-- 
2.39.5 (Apple Git-154)

Reply via email to