Hi Lakshmi,

On 19/01/2026 09:25, lakshmi wrote:

> Hi Mircea,
>
> I tested the patch on 19devel and it worked well for me.
>
> Before applying it, -j is rejected in pgbench initialization mode, as
> expected. After applying the patch, pgbench -i -s 100 -j 10 runs
> successfully and shows a clear speedup. On my system the total runtime
> dropped to about 9.6s, with client-side data generation taking around
> 3.3s. I also checked correctness after the run: row counts for
> pgbench_accounts, pgbench_branches, and pgbench_tellers all match the
> expected values.
>
> Thanks for working on this; the improvement is very noticeable.
>
> Best regards,
> lakshmi

Thanks for having a look and trying it out!

FYI, this is one of Tomas Vondra's patch ideas from his blog [1].

I have attached a new version which now includes docs, tests, a proposed commit message, and an attempt to fix the current CI failures (Windows).
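
In case it helps with review, here are example invocations (the scale
and thread counts are only illustrative; the second form exercises the
partitioned code path):

    pgbench -i -s 100 -j 10
    pgbench -i -s 100 -j 10 --partitions=20 --partition-method=range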

[1] - https://vondra.me/posts/patch-idea-parallel-pgbench-i

--
Thanks,
Mircea Cadariu
From 01caabbe159af6b6efd607e08ad5d7fbc77e513e Mon Sep 17 00:00:00 2001
From: Mircea Cadariu <[email protected]>
Date: Sun, 9 Nov 2025 10:41:51 +0000
Subject: [PATCH v1] Add parallel data loading support to pgbench

pgbench initialization (pgbench -i) now honors the -j option and loads
the pgbench_accounts table in parallel using multiple threads, each
with its own connection.

For range-partitioned tables, each worker creates and loads its assigned
partitions as standalone tables, then the main thread attaches them to
the parent after all data is loaded. This avoids AccessExclusiveLock
contention on the parent table during parallel loading, and allows each
worker to use COPY FREEZE.
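
In SQL terms, each worker roughly does (first partition shown; range
bounds elided; fillfactor at its default of 100):

    CREATE TABLE pgbench_accounts_1 (aid int NOT NULL, bid int,
      abalance int, filler character(84)) WITH (fillfactor = 100);
    COPY pgbench_accounts_1 FROM STDIN (FREEZE ON);

and the main thread then runs, once per partition:

    ALTER TABLE pgbench_accounts ATTACH PARTITION pgbench_accounts_1
      FOR VALUES FROM (...) TO (...);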

For non-partitioned tables, worker 0 reuses the main connection (which
did the TRUNCATE) and can use COPY FREEZE, while additional workers use
separate connections with regular COPY.

Implementation details:
- COPY data is batched into 1MB buffers before sending to reduce overhead
- Hash partitioning does not support parallel loading
- With range partitioning, the number of threads must not exceed the number of partitions
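- Partitions are spread across workers as evenly as possible: e.g. --partitions=10 with -j 4 gives the workers 3, 3, 2, and 2 partitions respectively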

Author: Mircea Cadariu <[email protected]>
Reviewed-by: Lakshmi G <[email protected]>
---
 doc/src/sgml/ref/pgbench.sgml                |  11 +
 src/bin/pgbench/pgbench.c                    | 475 +++++++++++++++++--
 src/bin/pgbench/t/001_pgbench_with_server.pl |  45 ++
 src/bin/pgbench/t/002_pgbench_no_server.pl   |   5 +
 4 files changed, 501 insertions(+), 35 deletions(-)

diff --git a/doc/src/sgml/ref/pgbench.sgml b/doc/src/sgml/ref/pgbench.sgml
index 2e401d1ceb..58b3ccd379 100644
--- a/doc/src/sgml/ref/pgbench.sgml
+++ b/doc/src/sgml/ref/pgbench.sgml
@@ -502,6 +502,17 @@ pgbench <optional> <replaceable>options</replaceable> </optional> <replaceable>d
         Clients are distributed as evenly as possible among available threads.
         Default is 1.
        </para>
+       <para>
+        In initialization mode (<option>-i</option>), this option controls
+        the number of threads used to load data into
+        <literal>pgbench_accounts</literal> in parallel.
+        With <option>--partitions</option> using
+        <literal>range</literal> partitioning, each thread loads one or more
+        complete partitions independently.
+        The number of threads must not exceed the number of partitions.
+        Parallel data loading is not currently supported with
+        <literal>hash</literal> partitioning.
+       </para>
       </listitem>
      </varlistentry>
 
diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c
index 58735871c1..b94da2567d 100644
--- a/src/bin/pgbench/pgbench.c
+++ b/src/bin/pgbench/pgbench.c
@@ -78,6 +78,8 @@
 #define ERRCODE_T_R_DEADLOCK_DETECTED  "40P01"
 #define ERRCODE_UNDEFINED_TABLE  "42P01"
 
+#define COPY_BATCH_SIZE        (1024 * 1024)
+
 /*
  * Hashing constants
  */
@@ -825,7 +827,6 @@ static const BuiltinScript builtin_script[] =
        }
 };
 
-
 /* Function prototypes */
 static void setNullValue(PgBenchValue *pv);
 static void setBoolValue(PgBenchValue *pv, bool bval);
@@ -848,6 +849,8 @@ static void clear_socket_set(socket_set *sa);
 static void add_socket_to_set(socket_set *sa, int fd, int idx);
 static int     wait_on_socket_set(socket_set *sa, int64 usecs);
 static bool socket_has_input(socket_set *sa, int fd, int idx);
+static void createPartitions(PGconn *con, int part_start, int part_end);
+static void attachPartitions(PGconn *con);
 
 /* callback used to build rows for COPY during data loading */
 typedef void (*initRowMethod) (PQExpBufferData *sql, int64 curr);
@@ -857,6 +860,20 @@ static const PsqlScanCallbacks pgbench_callbacks = {
        NULL,                                           /* don't need get_variable functionality */
 };
 
+/* Worker thread data for parallel table loading */
+typedef struct WorkerTask
+{
+       PGconn     *con;
+       const char *table;
+       int64           start_row;
+       int64           end_row;
+       initRowMethod append_row;
+       int                     worker_id;
+       int                     part_start;
+       int                     part_end;
+       int64           part_size;
+}                      WorkerTask;
+
 static char
 get_table_relkind(PGconn *con, const char *table)
 {
@@ -1586,6 +1603,301 @@ doConnect(void)
        return conn;
 }
 
+/*
+ * Truncate specified table(s)
+ * tableName can be a single table or comma-separated list of tables
+ */
+static void
+truncateTable(PGconn *con, const char *tableName)
+{
+       PQExpBufferData query;
+
+       initPQExpBuffer(&query);
+       printfPQExpBuffer(&query, "TRUNCATE TABLE %s", tableName);
+       executeStatement(con, query.data);
+       termPQExpBuffer(&query);
+}
+
+/*
+ * Parameters needed for COPY operations.
+ */
+typedef struct CopyTarget
+{
+       const char *table_name;
+       int64           start_row;
+       int64           end_row;
+       bool            use_freeze;
+}                      CopyTarget;
+
+/*
+ * Perform COPY operation for a single table or partition.
+ * Batches rows into larger buffers before sending to reduce overhead.
+ */
+static void
+performCopy(PGconn *conn, WorkerTask * wtd, CopyTarget * target)
+{
+       PGresult   *res;
+       char            copy_statement[NAMEDATALEN + 32];
+       int64           row;
+       PQExpBufferData batch_buffer;
+
+       /* Build the COPY command */
+       if (target->use_freeze)
+               snprintf(copy_statement, sizeof(copy_statement),
+                                "COPY %s FROM STDIN (FREEZE ON)", target->table_name);
+       else
+               snprintf(copy_statement, sizeof(copy_statement),
+                                "COPY %s FROM STDIN", target->table_name);
+
+       /* Initiate COPY mode */
+       res = PQexec(conn, copy_statement);
+       if (PQresultStatus(res) != PGRES_COPY_IN)
+               pg_fatal("COPY command failed for table \"%s\": %s",
+                                target->table_name, PQerrorMessage(conn));
+       PQclear(res);
+
+       /* Pre-allocate buffer to avoid repeated reallocs */
+       initPQExpBuffer(&batch_buffer);
+       enlargePQExpBuffer(&batch_buffer, COPY_BATCH_SIZE);
+
+       /* Generate and send rows in batches using append_row */
+       for (row = target->start_row; row < target->end_row; row++)
+       {
+               /* Use append_row to accumulate multiple rows in the buffer */
+               wtd->append_row(&batch_buffer, row);
+
+               /* Send batch when buffer reaches size threshold */
+               if (batch_buffer.len >= COPY_BATCH_SIZE)
+               {
+                       if (PQputCopyData(conn, batch_buffer.data, batch_buffer.len) <= 0)
+                               pg_fatal("error in PQputCopyData: %s", PQerrorMessage(conn));
+
+                       resetPQExpBuffer(&batch_buffer);
+               }
+       }
+
+       /* Send any remaining buffered data */
+       if (batch_buffer.len > 0)
+       {
+               if (PQputCopyData(conn, batch_buffer.data, batch_buffer.len) <= 0)
+                       pg_fatal("error in PQputCopyData: %s", PQerrorMessage(conn));
+       }
+
+       /* Finalize the COPY operation */
+       if (PQputCopyEnd(conn, NULL) <= 0)
+               pg_fatal("error in PQputCopyEnd: %s", PQerrorMessage(conn));
+
+       res = PQgetResult(conn);
+       if (PQresultStatus(res) != PGRES_COMMAND_OK)
+               pg_fatal("COPY failed for table \"%s\": %s",
+                                target->table_name, PQerrorMessage(conn));
+       PQclear(res);
+
+       termPQExpBuffer(&batch_buffer);
+}
+
+
+static void
+assignWorkerRows(WorkerTask * wtd, int num_workers, int64 total_rows)
+{
+       int64           rows_per_worker = total_rows / num_workers;
+
+       wtd->start_row = wtd->worker_id * rows_per_worker;
+       wtd->end_row = (wtd->worker_id == num_workers - 1) ?
+               total_rows :
+               (wtd->worker_id + 1) * rows_per_worker;
+}
+
+/*
+ * Covers only multiple partitions per worker (workers <= partitions) for now.
+ * Each worker loads complete partitions independently and can use COPY FREEZE.
+ */
+static void
+assignWorkerPartitions(WorkerTask * wtd, int num_workers, int64 total_rows,
+                                          int num_parts)
+{
+       int                     parts_per_worker = num_parts / num_workers;
+       int                     extra_parts = num_parts % num_workers;
+
+       wtd->part_start = wtd->worker_id * parts_per_worker + 1 +
+               (wtd->worker_id < extra_parts ? wtd->worker_id : extra_parts);
+       wtd->part_end = wtd->part_start + parts_per_worker - 1 +
+               (wtd->worker_id < extra_parts ? 1 : 0);
+
+       wtd->start_row = (wtd->part_start - 1) * wtd->part_size;
+       wtd->end_row = (wtd->part_end == num_parts) ?
+               total_rows :
+               wtd->part_end * wtd->part_size;
+}
+
+
+/* Load data into partitioned table */
+static void
+loadPartitionedTable(PGconn *conn, WorkerTask * wtd)
+{
+       int                     p;
+
+       for (p = wtd->part_start; p <= wtd->part_end; p++)
+       {
+               CopyTarget      target;
+               int64           part_start_row = (p - 1) * wtd->part_size;
+               int64           part_end_row = (p == partitions) ? (naccounts * (int64) scale) : (p * wtd->part_size);
+               char            partition_table[NAMEDATALEN];
+
+               snprintf(partition_table, sizeof(partition_table), "pgbench_accounts_%d", p);
+
+               target.table_name = partition_table;
+               target.start_row = part_start_row;
+               target.end_row = part_end_row;
+               target.use_freeze = true;
+
+               performCopy(conn, wtd, &target);
+       }
+}
+
+/*
+ * Load data into non-partitioned table.
+ *
+ * Only worker 0 can use COPY FREEZE, because it inherits the transaction
+ * that truncated the table.  Other workers use plain COPY in their own
+ * transactions.
+ */
+static void
+loadRegularTable(PGconn *conn, WorkerTask * wtd)
+{
+       CopyTarget      target;
+
+       target.table_name = wtd->table;
+       target.start_row = wtd->start_row;
+       target.end_row = wtd->end_row;
+       target.use_freeze = (wtd->worker_id == 0);
+
+       performCopy(conn, wtd, &target);
+}
+
+static THREAD_FUNC_RETURN_TYPE THREAD_FUNC_CC
+initWorkerThread(void *arg)
+{
+       WorkerTask *wtd = (WorkerTask *) arg;
+       PGconn     *conn;
+
+       /* Connection is pre-created, just use it */
+       conn = wtd->con;
+
+       /*
+        * Start a new transaction for this worker, except for worker 0 on
+        * non-partitioned tables. Worker 0 continues the transaction from the
+        * main thread that already did the truncate (to enable COPY FREEZE).
+        */
+       if (wtd->part_start > 0 || wtd->worker_id > 0)
+               executeStatement(conn, "begin");
+
+       if (wtd->part_start > 0)
+       {
+               createPartitions(conn, wtd->part_start, wtd->part_end);
+               loadPartitionedTable(conn, wtd);
+       }
+       else
+               loadRegularTable(conn, wtd);
+
+       executeStatement(conn, "commit");
+
+       THREAD_FUNC_RETURN;
+}
+
+static void
+initPopulateTableParallel(PGconn *connection, int num_workers,
+                                                 const char *table, int64 total_rows,
+                                                 initRowMethod append_row)
+{
+       THREAD_T   *worker_threads;
+       WorkerTask *worker_data;
+       PGconn    **connections;
+       bool            is_partitioned;
+       int                     i;
+
+       /* Allocate worker data and threads */
+       worker_threads = pg_malloc(num_workers * sizeof(THREAD_T));
+       worker_data = pg_malloc0(num_workers * sizeof(WorkerTask));
+       connections = pg_malloc(num_workers * sizeof(PGconn *));
+
+       /* Reuse main connection for worker 0, create new ones for others */
+       connections[0] = connection;
+       for (i = 1; i < num_workers; i++)
+       {
+               connections[i] = doConnect();
+               if (connections[i] == NULL)
+                       pg_fatal("could not create connection for worker %d", i);
+       }
+
+       /* Works only for pgbench_accounts and the range partitioning option */
+       is_partitioned = strcmp(table, "pgbench_accounts") == 0 && partition_method == PART_RANGE;
+
+       /*
+        * For partitioned tables, we handle only num_workers <= partitions for
+        * now
+        */
+       if (is_partitioned && num_workers > partitions)
+               pg_fatal("number of threads (%d) must not exceed the number of partitions (%d)",
+                                num_workers, partitions);
+
+       executeStatement(connections[0], "begin");
+       truncateTable(connections[0], table);
+
+       if (is_partitioned)
+       {
+               executeStatement(connections[0], "commit");
+       }
+
+       fprintf(stderr, "loading %s with %d threads...\n", table, num_workers);
+
+       /* Create and start worker threads */
+       for (i = 0; i < num_workers; i++)
+       {
+               worker_data[i].con = connections[i];
+               worker_data[i].table = table;
+               worker_data[i].append_row = append_row;
+               worker_data[i].worker_id = i;
+
+               if (!is_partitioned)
+                       assignWorkerRows(&worker_data[i], num_workers, total_rows);
+               else
+               {
+                       worker_data[i].part_size = (naccounts * (int64) scale + partitions - 1) / partitions;
+                       assignWorkerPartitions(&worker_data[i], num_workers, total_rows,
+                                                                  partitions);
+               }
+
+               errno = THREAD_CREATE(&worker_threads[i], initWorkerThread, &worker_data[i]);
+               if (errno != 0)
+                       pg_fatal("could not create thread for worker %d: %m", i);
+       }
+
+       /*
+        * Wait for all workers to finish.  Any worker failure calls pg_fatal(),
+        * which terminates the process, so if we get here all workers succeeded.
+        */
+       for (i = 0; i < num_workers; i++)
+               THREAD_JOIN(worker_threads[i]);
+
+       /*
+        * For partitioned tables, attach all partitions now that data is loaded.
+        */
+       if (is_partitioned)
+               attachPartitions(connection);
+
+       /*
+        * Clean up worker connections (skip index 0, which is the main
+        * connection)
+        */
+       for (i = 1; i < num_workers; i++)
+               PQfinish(connections[i]);
+
+       free(connections);
+       free(worker_threads);
+       free(worker_data);
+}
+
 /* qsort comparator for Variable array */
 static int
 compareVariableNames(const void *v1, const void *v2)
@@ -4824,14 +5136,50 @@ initDropTables(PGconn *con)
  * with a known size, so we choose to partition it.
  */
 static void
-createPartitions(PGconn *con)
+createPartitions(PGconn *con, int part_start, int part_end)
 {
        PQExpBufferData query;
 
        /* we must have to create some partitions */
        Assert(partitions > 0);
 
-       fprintf(stderr, "creating %d partitions...\n", partitions);
+       initPQExpBuffer(&query);
+
+       for (int p = part_start; p <= part_end; p++)
+       {
+               /*
+                * Create standalone tables (not attached to parent yet). This avoids
+                * AccessExclusiveLock on the parent table, allowing parallel
+                * creation. Tables will be attached after data loading.
+                */
+               printfPQExpBuffer(&query,
+                                                 "create%s table pgbench_accounts_%d\n"
+                                                 "  (aid int not null,\n"
+                                                 "   bid int,\n"
+                                                 "   abalance int,\n"
+                                                 "   filler character(84))\n"
+                                                 "  with (fillfactor=%d)",
+                                                 unlogged_tables ? " unlogged" : "", p,
+                                                 fillfactor);
+
+               executeStatement(con, query.data);
+       }
+
+       termPQExpBuffer(&query);
+}
+
+/*
+ * Attach standalone partition tables to the parent table.
+ * Called after all data has been loaded in parallel.
+ */
+static void
+attachPartitions(PGconn *con)
+{
+       PQExpBufferData query;
+
+       Assert(partitions > 0);
+
+       fprintf(stderr, "attaching %d partitions...\n", partitions);
 
        initPQExpBuffer(&query);
 
@@ -4842,10 +5190,9 @@ createPartitions(PGconn *con)
                        int64           part_size = (naccounts * (int64) scale + partitions - 1) / partitions;
 
                        printfPQExpBuffer(&query,
-                                                         "create%s table pgbench_accounts_%d\n"
-                                                         "  partition of pgbench_accounts\n"
-                                                         "  for values from (",
-                                                         unlogged_tables ? " unlogged" : "", p);
+                                                         "alter table pgbench_accounts\n"
+                                                         "  attach partition pgbench_accounts_%d\n"
+                                                         "  for values from (", p);
 
                        /*
                         * For RANGE, we use open-ended partitions at the beginning and
@@ -4868,21 +5215,16 @@ createPartitions(PGconn *con)
                        appendPQExpBufferChar(&query, ')');
                }
                else if (partition_method == PART_HASH)
+               {
                        printfPQExpBuffer(&query,
-                                                         "create%s table pgbench_accounts_%d\n"
-                                                         "  partition of pgbench_accounts\n"
+                                                         "alter table pgbench_accounts\n"
+                                                         "  attach partition pgbench_accounts_%d\n"
                                                          "  for values with (modulus %d, remainder %d)",
-                                                         unlogged_tables ? " unlogged" : "", p,
-                                                         partitions, p - 1);
+                                                         p, partitions, p - 1);
+               }
                else                                    /* cannot get there */
                        Assert(0);
 
-               /*
-                * Per ddlinfo in initCreateTables, fillfactor is needed on table
-                * pgbench_accounts.
-                */
-               appendPQExpBuffer(&query, " with (fillfactor=%d)", fillfactor);
-
                executeStatement(con, query.data);
        }
 
@@ -4980,8 +5322,11 @@ initCreateTables(PGconn *con)
 
        termPQExpBuffer(&query);
 
-       if (partition_method != PART_NONE)
-               createPartitions(con);
+       if (partition_method != PART_NONE && (nthreads == 1 || partition_method == PART_HASH))
+       {
+               fprintf(stderr, "creating %d partitions...\n", partitions);
+               createPartitions(con, 1, partitions);
+       }
 }
 
 /*
@@ -4990,11 +5335,7 @@ initCreateTables(PGconn *con)
 static void
 initTruncateTables(PGconn *con)
 {
-       executeStatement(con, "truncate table "
-                                        "pgbench_accounts, "
-                                        "pgbench_branches, "
-                                        "pgbench_history, "
-                                        "pgbench_tellers");
+       truncateTable(con, "pgbench_accounts, pgbench_branches, pgbench_history, pgbench_tellers");
 }
 
 static void
@@ -5024,9 +5365,41 @@ initAccount(PQExpBufferData *sql, int64 curr)
                                          curr + 1, curr / naccounts + 1);
 }
 
+/*
+ * Append-based versions to enable batching.
+ * These use appendPQExpBuffer instead of printfPQExpBuffer to allow
+ * multiple rows to be accumulated in a single buffer.
+ */
+static void
+appendBranch(PQExpBufferData *sql, int64 curr)
+{
+       /* "filler" column uses NULL */
+       appendPQExpBuffer(sql,
+                                         INT64_FORMAT "\t0\t\\N\n",
+                                         curr + 1);
+}
+
 static void
-initPopulateTable(PGconn *con, const char *table, int64 base,
-                                 initRowMethod init_row)
+appendTeller(PQExpBufferData *sql, int64 curr)
+{
+       /* "filler" column uses NULL */
+       appendPQExpBuffer(sql,
+                                         INT64_FORMAT "\t" INT64_FORMAT "\t0\t\\N\n",
+                                         curr + 1, curr / ntellers + 1);
+}
+
+static void
+appendAccount(PQExpBufferData *sql, int64 curr)
+{
+       /* "filler" column defaults to blank padded empty string */
+       appendPQExpBuffer(sql,
+                                         INT64_FORMAT "\t" INT64_FORMAT "\t0\t\n",
+                                         curr + 1, curr / naccounts + 1);
+}
+
+static void
+initPopulateTableSerial(PGconn *con, const char *table, int64 base,
+                                               initRowMethod init_row)
 {
        int                     n;
        int64           k;
@@ -5034,7 +5407,7 @@ initPopulateTable(PGconn *con, const char *table, int64 base,
        int                     prev_chars = 0;
        PGresult   *res;
        PQExpBufferData sql;
-       char            copy_statement[256];
+       char            copy_statement[NAMEDATALEN + 32];
        const char *copy_statement_fmt = "copy %s from stdin";
        int64           total = base * scale;
 
@@ -5143,6 +5516,27 @@ initPopulateTable(PGconn *con, const char *table, int64 base,
        termPQExpBuffer(&sql);
 }
 
+static void
+initPopulateTable(PGconn *con, const char *table, int64 total_rows,
+                                 initRowMethod init_row, initRowMethod append_row, bool use_parallel)
+{
+       bool            is_accounts = (strcmp(table, "pgbench_accounts") == 0);
+
+       if (use_parallel && nthreads > 1)
+               initPopulateTableParallel(con, nthreads, table, total_rows * scale, append_row);
+       else
+       {
+               /*
+                * For single-threaded mode with partitioned tables, attach partitions
+                * before loading data so COPY to the parent table can route rows.
+                */
+               if (is_accounts && partitions > 0 && partition_method != PART_NONE)
+                       attachPartitions(con);
+
+               initPopulateTableSerial(con, table, total_rows, init_row);
+       }
+}
+
 /*
  * Fill the standard tables with some data generated and sent from the client.
  *
@@ -5155,8 +5549,9 @@ initGenerateDataClientSide(PGconn *con)
        fprintf(stderr, "generating data (client-side)...\n");
 
        /*
-        * we do all of this in one transaction to enable the backend's
-        * data-loading optimizations
+        * For single-threaded mode, do everything in one transaction. For
+        * multi-threaded mode, do branches/tellers/history in one transaction,
+        * then accounts in parallel (each thread handles its own transaction).
         */
        executeStatement(con, "begin");
 
@@ -5167,11 +5562,16 @@ initGenerateDataClientSide(PGconn *con)
         * fill branches, tellers, accounts in that order in case foreign keys
         * already exist
         */
-       initPopulateTable(con, "pgbench_branches", nbranches, initBranch);
-       initPopulateTable(con, "pgbench_tellers", ntellers, initTeller);
-       initPopulateTable(con, "pgbench_accounts", naccounts, initAccount);
+       initPopulateTable(con, "pgbench_branches", nbranches, initBranch, appendBranch, false);
+       initPopulateTable(con, "pgbench_tellers", ntellers, initTeller, appendTeller, false);
 
-       executeStatement(con, "commit");
+       if (nthreads > 1)
+               executeStatement(con, "commit");
+
+       initPopulateTable(con, "pgbench_accounts", naccounts, initAccount, appendAccount, nthreads > 1);
+
+       if (nthreads == 1)
+               executeStatement(con, "commit");
 }
 
 /*
@@ -5197,6 +5597,9 @@ initGenerateDataServerSide(PGconn *con)
        /* truncate away any old data */
        initTruncateTables(con);
 
+       if (partitions > 0 && partition_method != PART_NONE)
+               attachPartitions(con);
+
        initPQExpBuffer(&sql);
 
        printfPQExpBuffer(&sql,
@@ -6944,7 +7347,6 @@ main(int argc, char **argv)
                                initialization_option_set = true;
                                break;
                        case 'j':                       /* jobs */
-                               benchmarking_option_set = true;
                                if (!option_parse_int(optarg, "-j/--jobs", 1, INT_MAX,
                                                                          &nthreads))
                                {
@@ -7176,7 +7578,7 @@ main(int argc, char **argv)
         * optimization; throttle_delay is calculated incorrectly below if some
         * threads have no clients assigned to them.)
         */
-       if (nthreads > nclients)
+       if (nthreads > nclients && !is_init_mode)
                nthreads = nclients;
 
        /*
@@ -7221,6 +7623,9 @@ main(int argc, char **argv)
                if (partitions > 0 && partition_method == PART_NONE)
                        partition_method = PART_RANGE;
 
+               if (partition_method == PART_HASH && nthreads > 1)
+                       pg_fatal("parallel data loading (-j) is not supported with hash partitioning");
+
                if (initialize_steps == NULL)
                        initialize_steps = pg_strdup(DEFAULT_INIT_STEPS);
 
diff --git a/src/bin/pgbench/t/001_pgbench_with_server.pl b/src/bin/pgbench/t/001_pgbench_with_server.pl
index b7685ea5d2..7b214c9030 100644
--- a/src/bin/pgbench/t/001_pgbench_with_server.pl
+++ b/src/bin/pgbench/t/001_pgbench_with_server.pl
@@ -217,6 +217,51 @@ my $nthreads = 2;
        $nthreads = 1 if $stderr =~ m/threads are not supported on this platform/;
 }
 
+# Test parallel initialization (requires thread support)
+if ($nthreads > 1)
+{
+       # Parallel init without partitions
+       $node->pgbench(
+               '-i -j 2 --scale=1',
+               0,
+               [qr{^$}],
+               [
+                       qr{creating tables},
+                       qr{loading pgbench_accounts with 2 threads},
+                       qr{vacuuming},
+                       qr{creating primary keys},
+                       qr{done in \d+\.\d\d s }
+               ],
+               'pgbench parallel initialization without partitions');
+
+       check_data_state($node, 'parallel-no-partitions');
+
+       # Parallel init with range partitions
+       $node->pgbench(
+               '-i -j 2 --scale=1 --partitions=4 --partition-method=range',
+               0,
+               [qr{^$}],
+               [
+                       qr{creating tables},
+                       qr{loading pgbench_accounts with 2 threads},
+                       qr{attaching 4 partitions},
+                       qr{vacuuming},
+                       qr{creating primary keys},
+                       qr{done in \d+\.\d\d s }
+               ],
+               'pgbench parallel initialization with range partitions');
+
+       check_data_state($node, 'parallel-range-partitions');
+
+       # Error: more threads than partitions
+       $node->pgbench(
+               '-i -j 3 --scale=1 --partitions=2 --partition-method=range',
+               1,
+               [qr{^$}],
+               [qr{number of threads \(3\) must not exceed the number of partitions \(2\)}],
+               'pgbench parallel init fails when threads exceed partitions');
+}
+
 # run custom scripts
 $node->pgbench(
        "-t 100 -c 1 -j $nthreads -M prepared -n",
diff --git a/src/bin/pgbench/t/002_pgbench_no_server.pl b/src/bin/pgbench/t/002_pgbench_no_server.pl
index e694e9ef0f..d67f26e422 100644
--- a/src/bin/pgbench/t/002_pgbench_no_server.pl
+++ b/src/bin/pgbench/t/002_pgbench_no_server.pl
@@ -187,6 +187,11 @@ my @options = (
                '-i --partition-method=hash',
                [qr{partition-method requires greater than zero --partitions}]
        ],
+       [
+               'parallel data loading with hash partitioning',
+               '-i -j 2 --partitions=4 --partition-method=hash',
+               [qr{parallel data loading \(-j\) is not supported with hash partitioning}]
+       ],
        [
                'bad maximum number of tries',
                '--max-tries -10',
-- 
2.39.5 (Apple Git-154)
