Hi Lakshmi and Hayato,

Thanks a lot for your feedback.

Attached for your consideration is v4, in which I address your remarks.

--
Thanks,
Mircea Cadariu
From 6099f30cf78b0ed8608670ff07b8a71b8cf0d47c Mon Sep 17 00:00:00 2001
From: Mircea Cadariu <[email protected]>
Date: Sun, 3 May 2026 16:42:20 +0100
Subject: [PATCH v4] pgbench: parallelize account loading for range-partitioned
 tables

In init mode with range partitioning, -j > 1 makes client-side data
generation load pgbench_accounts in parallel.  Each worker creates its
assigned partitions as standalone tables and populates them within the
same transaction (using COPY FREEZE on servers v14 and later); the main
connection attaches them afterwards.
---
 doc/src/sgml/ref/pgbench.sgml                |   9 +
 src/bin/pgbench/pgbench.c                    | 258 +++++++++++++++++--
 src/bin/pgbench/t/001_pgbench_with_server.pl |  29 +++
 3 files changed, 269 insertions(+), 27 deletions(-)

diff --git a/doc/src/sgml/ref/pgbench.sgml b/doc/src/sgml/ref/pgbench.sgml
index 2e401d1ceb..3594b731cc 100644
--- a/doc/src/sgml/ref/pgbench.sgml
+++ b/doc/src/sgml/ref/pgbench.sgml
@@ -382,6 +382,11 @@ pgbench <optional> <replaceable>options</replaceable> 
</optional> <replaceable>d
         the scaled number of accounts.
         Default is <literal>0</literal>, meaning no partitioning.
        </para>
+       <para>
+        With <option>-j</option> greater than 1 and
+        <option>--partition-method=range</option>, client-side data
+        generation loads the partitions in parallel.
+       </para>
       </listitem>
      </varlistentry>
 
@@ -502,6 +507,10 @@ pgbench <optional> <replaceable>options</replaceable> 
</optional> <replaceable>d
         Clients are distributed as evenly as possible among available threads.
         Default is 1.
        </para>
+       <para>
+        In initialization mode (<option>-i</option>), <option>-j</option>
+        sets the number of threads used to load range partitions in parallel.
+       </para>
       </listitem>
      </varlistentry>
 
diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c
index 1dae918cc0..aa21b653ce 100644
--- a/src/bin/pgbench/pgbench.c
+++ b/src/bin/pgbench/pgbench.c
@@ -4817,6 +4817,56 @@ initDropTables(PGconn *con)
                                         "pgbench_tellers");
 }
 
+/*
+ * Append the "for values from (...) to (...)" clause of range partition p
+ * (numbered from 1) of pgbench_accounts to query.
+ *
+ * The first partition starts at minvalue and the last one ends at maxvalue,
+ * so that any valid aid falls into some partition (see createPartitions()).
+ * In between, the naccounts * scale aids are split into chunks of part_size,
+ * which is rounded up.
+ */
+static void
+appendAccountsRangeForValues(PQExpBufferData *query, int p)
+{
+       int64           part_size = (naccounts * (int64) scale + partitions - 1) / partitions;
+
+       appendPQExpBufferStr(query, " for values from (");
+       if (p == 1)
+               appendPQExpBufferStr(query, "minvalue");
+       else
+               appendPQExpBuffer(query, INT64_FORMAT, (p - 1) * part_size + 1);
+       appendPQExpBufferStr(query, ") to (");
+       if (p < partitions)
+               appendPQExpBuffer(query, INT64_FORMAT, p * part_size + 1);
+       else
+               appendPQExpBufferStr(query, "maxvalue");
+       appendPQExpBufferChar(query, ')');
+}
+
+/*
+ * Compute the half-open range [*start_row, *end_row) of 0-based account rows
+ * that range partition p (numbered from 1) receives from the parallel loader.
+ * NOTE: this assumes initAccount() gives row r the aid r + 1, which lines the
+ * rows up with the bounds from appendAccountsRangeForValues().
+ *
+ * Because part_size is rounded up, trailing partitions can begin at or past
+ * the last row: e.g. 10 rows in 7 partitions gives part_size 2, so partition
+ * 6 would otherwise cover rows 10 and 11.  Both ends are therefore clamped
+ * to the total row count, leaving such partitions empty instead of
+ * generating accounts beyond naccounts * scale.
+ */
+static void
+getAccountsPartitionRows(int p, int64 *start_row, int64 *end_row)
+{
+       int64           total_rows = (int64) naccounts * scale;
+       int64           part_size = (total_rows + partitions - 1) / partitions;
+
+       *start_row = Min((int64) (p - 1) * part_size, total_rows);
+       *end_row = (p == partitions) ? total_rows :
+               Min((int64) p * part_size, total_rows);
+}
+
 /*
  * Create "pgbench_accounts" partitions if needed.
  *
@@ -4839,33 +4867,17 @@ createPartitions(PGconn *con)
        {
                if (partition_method == PART_RANGE)
                {
-                       int64           part_size = (naccounts * (int64) scale 
+ partitions - 1) / partitions;
-
-                       printfPQExpBuffer(&query,
-                                                         "create%s table 
pgbench_accounts_%d\n"
-                                                         "  partition of 
pgbench_accounts\n"
-                                                         "  for values from (",
-                                                         unlogged_tables ? " 
unlogged" : "", p);
-
                        /*
                         * For RANGE, we use open-ended partitions at the 
beginning and
                         * end to allow any valid value for the primary key.  
Although the
                         * actual minimum and maximum values can be derived 
from the
                         * scale, it is more generic and the performance is 
better.
                         */
-                       if (p == 1)
-                               appendPQExpBufferStr(&query, "minvalue");
-                       else
-                               appendPQExpBuffer(&query, INT64_FORMAT, (p - 1) 
* part_size + 1);
-
-                       appendPQExpBufferStr(&query, ") to (");
-
-                       if (p < partitions)
-                               appendPQExpBuffer(&query, INT64_FORMAT, p * 
part_size + 1);
-                       else
-                               appendPQExpBufferStr(&query, "maxvalue");
-
-                       appendPQExpBufferChar(&query, ')');
+                       printfPQExpBuffer(&query,
+                                                         "create%s table 
pgbench_accounts_%d\n"
+                                                         "  partition of 
pgbench_accounts",
+                                                         unlogged_tables ? " 
unlogged" : "", p);
+                       appendAccountsRangeForValues(&query, p);
                }
                else if (partition_method == PART_HASH)
                        printfPQExpBuffer(&query,
@@ -4889,6 +4901,91 @@ createPartitions(PGconn *con)
        termPQExpBuffer(&query);
 }
 
+/*
+ * Create range partitions part_start..part_end (inclusive) of
+ * pgbench_accounts as plain tables that are not yet attached to the
+ * partitioned parent.
+ *
+ * The parallel loader uses this instead of createPartitions() so that each
+ * worker creates its tables in the same transaction that fills them, which
+ * is what allows COPY FREEZE.
+ *
+ * NOTE(review): the column list appears to duplicate the pgbench_accounts
+ * DDL in initCreateTables(); ATTACH PARTITION needs the two to match.
+ *
+ * NOTE(review): initCreateTables() skips createPartitions() whenever range
+ * partitioning is combined with -j > 1, yet only client-side data
+ * generation runs the parallel loader.  An init-step sequence without
+ * client-side generation (e.g. server-side 'G') thus appears to leave
+ * pgbench_accounts with no partitions at all -- confirm and fix.
+ */
+static void
+createStandalonePartitions(PGconn *con, int part_start, int part_end)
+{
+       const char *unlogged = unlogged_tables ? " unlogged" : "";
+       const char *aid_type = (scale >= SCALE_32BIT_THRESHOLD) ? "bigint" : "int";
+       PQExpBufferData sql;
+       int                     partno;
+
+       Assert(partitions > 0);
+       Assert(partition_method == PART_RANGE);
+
+       initPQExpBuffer(&sql);
+
+       partno = part_start;
+       while (partno <= part_end)
+       {
+               printfPQExpBuffer(&sql,
+                                                 "create%s table pgbench_accounts_%d\n"
+                                                 "  (aid %s not null,\n"
+                                                 "   bid int,\n"
+                                                 "   abalance int,\n"
+                                                 "   filler character(84))\n"
+                                                 "  with (fillfactor=%d)",
+                                                 unlogged, partno, aid_type, fillfactor);
+               executeStatement(con, sql.data);
+               partno++;
+       }
+
+       termPQExpBuffer(&sql);
+}
+
+/*
+ * Attach pgbench_accounts_1 .. pgbench_accounts_<partitions> to the
+ * partitioned pgbench_accounts within a single transaction, using the same
+ * range bounds as createPartitions() (both go through
+ * appendAccountsRangeForValues()).
+ *
+ * NOTE(review): ATTACH PARTITION scans each table to check that its rows
+ * fit the new bound, and these scans run one after another on the main
+ * connection.  A matching CHECK constraint created beforehand would let the
+ * server skip them.
+ */
+static void
+attachStandalonePartitions(PGconn *con)
+{
+       PQExpBufferData sql;
+
+       Assert(partitions > 0);
+       Assert(partition_method == PART_RANGE);
+
+       initPQExpBuffer(&sql);
+       executeStatement(con, "begin");
+
+       for (int partno = 1; partno <= partitions; partno++)
+       {
+               printfPQExpBuffer(&sql,
+                                                 "alter table pgbench_accounts\n"
+                                                 "  attach partition pgbench_accounts_%d",
+                                                 partno);
+               appendAccountsRangeForValues(&sql, partno);
+               executeStatement(con, sql.data);
+       }
+
+       executeStatement(con, "commit");
+       termPQExpBuffer(&sql);
+}
+
 /*
  * Create pgbench's standard tables
  */
@@ -4981,7 +5049,17 @@ initCreateTables(PGconn *con)
        termPQExpBuffer(&query);
 
        if (partition_method != PART_NONE)
+       {
+               /*
+                * In the parallel range-partitioned case, partitions are 
created by
+                * the worker threads (so each one can use COPY FREEZE in its 
own
+                * transaction) and attached afterwards.
+                */
+               if (partition_method == PART_RANGE && nthreads > 1)
+                       return;
+
                createPartitions(con);
+       }
 }
 
 /*
@@ -5143,6 +5221,155 @@ initPopulateTable(PGconn *con, const char *table, int64 base,
        termPQExpBuffer(&sql);
 }
 
+/*
+ * Load the account rows of range partition partno into the standalone
+ * table pgbench_accounts_<partno>, streaming them through COPY on con.
+ *
+ * The table must have been created in con's still-open transaction: on
+ * v14+ servers the COPY uses FREEZE, which the server accepts only for a
+ * table created or truncated in the current transaction.
+ */
+static void
+initPopulatePartition(PGconn *con, int partno)
+{
+       int64           start_row;
+       int64           end_row;
+       char            copy_stmt[256];
+       PGresult   *res;
+       PQExpBufferData sql;
+
+       getAccountsPartitionRows(partno, &start_row, &end_row);
+
+       snprintf(copy_stmt, sizeof(copy_stmt),
+                        PQserverVersion(con) >= 140000 ?
+                        "copy pgbench_accounts_%d from stdin with (freeze on)" :
+                        "copy pgbench_accounts_%d from stdin",
+                        partno);
+
+       res = PQexec(con, copy_stmt);
+       if (PQresultStatus(res) != PGRES_COPY_IN)
+               pg_fatal("could not start COPY for partition %d: %s",
+                                partno, PQerrorMessage(con));
+       PQclear(res);
+
+       initPQExpBuffer(&sql);
+
+       for (int64 row = start_row; row < end_row; row++)
+       {
+               initAccount(&sql, row);
+               if (PQputCopyData(con, sql.data, sql.len) <= 0)
+                       pg_fatal("PQputCopyData failed for partition %d: %s",
+                                        partno, PQerrorMessage(con));
+       }
+
+       if (PQputCopyEnd(con, NULL) <= 0)
+               pg_fatal("PQputCopyEnd failed for partition %d: %s",
+                                partno, PQerrorMessage(con));
+
+       res = PQgetResult(con);
+       if (PQresultStatus(res) != PGRES_COMMAND_OK)
+               pg_fatal("COPY failed for partition %d: %s",
+                                partno, PQerrorMessage(con));
+       PQclear(res);
+
+       termPQExpBuffer(&sql);
+}
+
+/*
+ * Work assignment for one initPartitionWorkerThread(): the inclusive range
+ * of partition numbers it creates and loads, plus an id used in its
+ * progress messages.
+ */
+typedef struct PartitionWorkerData
+{
+       int                     thread_id;
+       int                     part_start;
+       int                     part_end;
+} PartitionWorkerData;
+
+/*
+ * Worker thread for parallel loading.  On a connection of its own, and in a
+ * single transaction so that the tables are still new when COPY FREEZE runs,
+ * it creates its assigned standalone partitions, fills them and reports the
+ * time each one took.  Connection and COPY failures are fatal (pg_fatal()).
+ */
+static THREAD_FUNC_RETURN_TYPE THREAD_FUNC_CC
+initPartitionWorkerThread(void *arg)
+{
+       PartitionWorkerData *data = (PartitionWorkerData *) arg;
+       PGconn     *con = doConnect();
+
+       if (con == NULL)
+               pg_fatal("could not create connection for partition worker (parts %d-%d)",
+                                data->part_start, data->part_end);
+
+       executeStatement(con, "begin");
+       createStandalonePartitions(con, data->part_start, data->part_end);
+       for (int p = data->part_start; p <= data->part_end; p++)
+       {
+               pg_time_usec_t start = pg_time_now();
+
+               initPopulatePartition(con, p);
+               fprintf(stderr, "partition %d loaded by thread %d (in %.2f s)\n",
+                               p, data->thread_id,
+                               PG_TIME_GET_DOUBLE(pg_time_now() - start));
+       }
+       executeStatement(con, "commit");
+
+       PQfinish(con);
+       THREAD_FUNC_RETURN;
+}
+
+/*
+ * Create and load every pgbench_accounts partition with up to nthreads
+ * worker threads, each using its own connection and transaction, and wait
+ * for all of them to finish.  Partitions are handed out as contiguous runs,
+ * the first (partitions % nworkers) workers taking one extra.  The tables
+ * are left standalone; the caller attaches them afterwards.
+ *
+ * No more workers than partitions are started: a surplus worker would have
+ * nothing to load, yet would still open a connection and a transaction.
+ */
+static void
+initLoadAccountsParallel(void)
+{
+       int                     nworkers = Min(nthreads, partitions);
+       int                     parts_per_worker;
+       int                     extra_parts;
+       int                     next_part = 1;
+       THREAD_T   *threads;
+       PartitionWorkerData *data;
+
+       Assert(partitions > 0);
+       parts_per_worker = partitions / nworkers;
+       extra_parts = partitions % nworkers;
+
+       fprintf(stderr, "creating %d partitions...\n", partitions);
+       fprintf(stderr, "loading pgbench_accounts with %d threads...\n", nworkers);
+
+       threads = pg_malloc_array(THREAD_T, nworkers);
+       data = pg_malloc_array(PartitionWorkerData, nworkers);
+
+       for (int i = 0; i < nworkers; i++)
+       {
+               data[i].thread_id = i;
+               data[i].part_start = next_part;
+               data[i].part_end = next_part + parts_per_worker - 1 +
+                       (i < extra_parts ? 1 : 0);
+               next_part = data[i].part_end + 1;
+
+               errno = THREAD_CREATE(&threads[i], initPartitionWorkerThread, &data[i]);
+               if (errno != 0)
+                       pg_fatal("could not create thread for worker %d: %m", i);
+       }
+
+       for (int i = 0; i < nworkers; i++)
+               THREAD_JOIN(threads[i]);
+
+       free(threads);
+       free(data);
+}
+
 /*
  * Fill the standard tables with some data generated and sent from the client.
  *
@@ -5155,8 +5348,11 @@ initGenerateDataClientSide(PGconn *con)
        fprintf(stderr, "generating data (client-side)...\n");
 
        /*
-        * we do all of this in one transaction to enable the backend's
-        * data-loading optimizations
+        * For serial loading, do everything in one transaction to enable the
+        * backend's data-loading optimizations.  For parallel loading
+        * (range-partitioned, -j > 1), load branches and tellers in one
+        * transaction, then load accounts in parallel with each worker in its 
own
+        * transaction.
         */
        executeStatement(con, "begin");
 
@@ -5169,9 +5365,18 @@ initGenerateDataClientSide(PGconn *con)
         */
        initPopulateTable(con, "pgbench_branches", nbranches, initBranch);
        initPopulateTable(con, "pgbench_tellers", ntellers, initTeller);
-       initPopulateTable(con, "pgbench_accounts", naccounts, initAccount);
 
-       executeStatement(con, "commit");
+       if (partition_method == PART_RANGE && nthreads > 1)
+       {
+               executeStatement(con, "commit");
+               initLoadAccountsParallel();
+               attachStandalonePartitions(con);
+       }
+       else
+       {
+               initPopulateTable(con, "pgbench_accounts", naccounts, 
initAccount);
+               executeStatement(con, "commit");
+       }
 }
 
 /*
@@ -6944,7 +7149,6 @@ main(int argc, char **argv)
                                initialization_option_set = true;
                                break;
                        case 'j':                       /* jobs */
-                               benchmarking_option_set = true;
                                if (!option_parse_int(optarg, "-j/--jobs", 1, 
INT_MAX,
                                                                          
&nthreads))
                                {
@@ -7176,7 +7380,7 @@ main(int argc, char **argv)
         * optimization; throttle_delay is calculated incorrectly below if some
         * threads have no clients assigned to them.)
         */
-       if (nthreads > nclients)
+       if (nthreads > nclients && !is_init_mode)
                nthreads = nclients;
 
        /*
diff --git a/src/bin/pgbench/t/001_pgbench_with_server.pl 
b/src/bin/pgbench/t/001_pgbench_with_server.pl
index b7685ea5d2..29ee28d616 100644
--- a/src/bin/pgbench/t/001_pgbench_with_server.pl
+++ b/src/bin/pgbench/t/001_pgbench_with_server.pl
@@ -164,6 +164,50 @@ $node->pgbench(
 # Check data state, after server-side data generation.
 check_data_state($node, 'server-side');
 
+# Test parallel initialization with range partitions (client-side generation).
+# -j sets the number of worker threads; the partitions are spread across them.
+$node->pgbench(
+       '-i -j 2 -s 1 --partitions=4 --partition-method=range',
+       0,
+       [qr{^$}],
+       [
+               qr{creating tables},
+               qr{creating 4 partitions},
+               qr{loading pgbench_accounts with 2 threads},
+               qr{partition \d loaded by thread \d \(in \d+\.\d\d s\)},
+               qr{vacuuming},
+               qr{creating primary keys},
+               qr{done in \d+\.\d\d s }
+       ],
+       'pgbench parallel initialization with range partitions');
+
+check_data_state($node, 'parallel-range-partitions');
+
+# Uneven distribution: 5 partitions across 2 threads (3 + 2).
+$node->pgbench(
+       '-i -j 2 -s 1 --partitions=5 --partition-method=range',
+       0,
+       [qr{^$}],
+       [ qr{loading pgbench_accounts with 2 threads}, qr{done in \d+\.\d\d s } ],
+       'pgbench parallel init with uneven partition distribution');
+
+check_data_state($node, 'parallel-range-uneven');
+
+# More threads than partitions: the surplus thread must not break loading.
+$node->pgbench(
+       '-i -j 3 -s 1 --partitions=2 --partition-method=range',
+       0,
+       [qr{^$}],
+       [
+               qr{creating 2 partitions},
+               qr{partition 1 loaded by thread \d},
+               qr{partition 2 loaded by thread \d},
+               qr{done in \d+\.\d\d s }
+       ],
+       'pgbench parallel init with more threads than partitions');
+
+check_data_state($node, 'parallel-range-more-threads');
+
 # Run all builtin scripts, for a few transactions each
 $node->pgbench(
        '--transactions=5 -Dfoo=bla --client=2 --protocol=simple --builtin=t'
-- 
2.39.5 (Apple Git-154)

Reply via email to