Hi,

I propose a patch that speeds up pgbench -i by parallelizing the client-side data generation across multiple threads.

To enable it, pass -j followed by the number of worker threads to use.
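
For example, to initialize at scale factor 100 with 10 workers:

    pgbench -i -s 100 -j 10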

Here are some results I got on my laptop:


master

---

-i -s 100
done in 20.95 s (drop tables 0.00 s, create tables 0.01 s, client-side generate 14.51 s, vacuum 0.27 s, primary keys 6.16 s).

-i -s 100 --partitions=10
done in 29.73 s (drop tables 0.00 s, create tables 0.02 s, client-side generate 16.33 s, vacuum 8.72 s, primary keys 4.67 s).


patch (-j 10)

---

-i -s 100 -j 10
done in 18.64 s (drop tables 0.00 s, create tables 0.01 s, client-side generate 5.82 s, vacuum 6.89 s, primary keys 5.93 s).

-i -s 100 -j 10 --partitions=10
done in 14.66 s (drop tables 0.00 s, create tables 0.01 s, client-side generate 8.42 s, vacuum 1.55 s, primary keys 4.68 s).

The speedup is more significant for the partitioned use case: each worker creates its own standalone partitions inside its own transaction, so every worker can use COPY FREEZE, which greatly reduces the vacuum penalty. The partitions are attached to the parent table once loading completes.
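
Roughly, each worker runs the equivalent of the following (a sketch of the approach in the patch; the exact DDL is built in createPartitions() and attachPartitions(), and the range bounds here assume -s 100 --partitions=10):

    begin;
    -- standalone table, not yet attached to the parent, so creating it
    -- takes no AccessExclusiveLock on pgbench_accounts
    create table pgbench_accounts_1
      (aid int not null, bid int, abalance int, filler character(84))
      with (fillfactor = 100);
    -- the table was created in this transaction, so FREEZE is permitted
    copy pgbench_accounts_1 from stdin (freeze on);
    commit;

    -- after all workers finish, the main connection attaches the partitions
    alter table pgbench_accounts
      attach partition pgbench_accounts_1
      for values from (minvalue) to (1000001);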

For the non-partitioned case the speedup is smaller, since only one worker can use COPY FREEZE there (it shares the transaction that truncated the table), but I observe that it improves somewhat at larger scale factors. When parallel vacuum support is merged, that should reduce the total time further.

I still need to update the docs and tests, integrate the code better with its surroundings, and sort out a few other details, but I'd appreciate any feedback on what I have so far. Thanks!

Kind regards,

Mircea Cadariu

From 18d91ec9c22d43522dc1cd83c16359c36b3dc58d Mon Sep 17 00:00:00 2001
From: Mircea Cadariu <[email protected]>
Date: Sun, 9 Nov 2025 10:41:51 +0000
Subject: [PATCH v1] wip

---
 src/bin/pgbench/pgbench.c | 455 +++++++++++++++++++++++++++++++++++---
 1 file changed, 420 insertions(+), 35 deletions(-)

diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c
index a425176ecd..ef4e05678a 100644
--- a/src/bin/pgbench/pgbench.c
+++ b/src/bin/pgbench/pgbench.c
@@ -78,6 +78,8 @@
 #define ERRCODE_T_R_DEADLOCK_DETECTED  "40P01"
 #define ERRCODE_UNDEFINED_TABLE  "42P01"
 
+#define COPY_BATCH_SIZE        (1024 * 1024)
+
 /*
  * Hashing constants
  */
@@ -825,7 +827,6 @@ static const BuiltinScript builtin_script[] =
        }
 };
 
-
 /* Function prototypes */
 static void setNullValue(PgBenchValue *pv);
 static void setBoolValue(PgBenchValue *pv, bool bval);
@@ -848,6 +849,8 @@ static void clear_socket_set(socket_set *sa);
 static void add_socket_to_set(socket_set *sa, int fd, int idx);
 static int     wait_on_socket_set(socket_set *sa, int64 usecs);
 static bool socket_has_input(socket_set *sa, int fd, int idx);
+static void createPartitions(PGconn *con, int part_start, int part_end);
+static void attachPartitions(PGconn *con);
 
 /* callback used to build rows for COPY during data loading */
 typedef void (*initRowMethod) (PQExpBufferData *sql, int64 curr);
@@ -856,6 +859,19 @@ typedef void (*initRowMethod) (PQExpBufferData *sql, int64 curr);
 static const PsqlScanCallbacks pgbench_callbacks = {
        NULL,                                           /* don't need get_variable functionality */
 };
+/* Worker thread data for parallel table loading */
+typedef struct WorkerTaskDescription
+{
+       PGconn     *con;
+       const char *table;
+       int64           start_row;
+       int64           end_row;
+       initRowMethod append_row;
+       int                     worker_id;
+       int                     part_start;
+       int                     part_end;
+       int64           part_size;
+} WorkerTaskDescription;
 
 static char
 get_table_relkind(PGconn *con, const char *table)
@@ -1631,6 +1647,277 @@ doConnect(void)
        return conn;
 }
 
+/*
+ * Truncate specified table(s)
+ * tableName can be a single table or comma-separated list of tables
+ */
+static void
+truncateTable(PGconn *con, const char *tableName)
+{
+       PQExpBufferData query;
+
+       initPQExpBuffer(&query);
+       printfPQExpBuffer(&query, "TRUNCATE TABLE %s", tableName);
+       executeStatement(con, query.data);
+       termPQExpBuffer(&query);
+}
+
+/*
+ * Parameters needed for COPY operations.
+ */
+typedef struct CopyTarget
+{
+       const char *table_name;
+       int64           start_row;
+       int64           end_row;
+       bool            use_freeze;
+} CopyTarget;
+
+/*
+ * Perform COPY operation for a single table or partition.
+ * Batches rows into larger buffers before sending to reduce overhead.
+ */
+static void
+performCopy(PGconn *conn, WorkerTaskDescription *wtd, CopyTarget *target)
+{
+       PGresult   *res;
+       char            copy_statement[NAMEDATALEN + 32];
+       int64           row;
+       PQExpBufferData batch_buffer;
+
+       /* Build the COPY command */
+       if (target->use_freeze)
+               snprintf(copy_statement, sizeof(copy_statement),
+                                "COPY %s FROM STDIN (FREEZE ON)", target->table_name);
+       else
+               snprintf(copy_statement, sizeof(copy_statement),
+                                "COPY %s FROM STDIN", target->table_name);
+
+       /* Initiate COPY mode */
+       res = PQexec(conn, copy_statement);
+       if (PQresultStatus(res) != PGRES_COPY_IN)
+               pg_fatal("COPY command failed for table \"%s\": %s",
+                                target->table_name, PQerrorMessage(conn));
+       PQclear(res);
+
+       /* Pre-allocate buffer to avoid repeated reallocs */
+       initPQExpBuffer(&batch_buffer);
+       enlargePQExpBuffer(&batch_buffer, COPY_BATCH_SIZE);
+
+       /* Generate and send rows in batches using append_row */
+       for (row = target->start_row; row < target->end_row; row++)
+       {
+               /* Use append_row to accumulate multiple rows in the buffer */
+               wtd->append_row(&batch_buffer, row);
+
+               /* Send batch when buffer reaches size threshold */
+               if (batch_buffer.len >= COPY_BATCH_SIZE)
+               {
+                       if (PQputCopyData(conn, batch_buffer.data, batch_buffer.len) <= 0)
+                               pg_fatal("error in PQputCopyData: %s", PQerrorMessage(conn));
+
+                       resetPQExpBuffer(&batch_buffer);
+               }
+       }
+
+       /* Send any remaining buffered data */
+       if (batch_buffer.len > 0)
+       {
+               if (PQputCopyData(conn, batch_buffer.data, batch_buffer.len) <= 0)
+                       pg_fatal("error in PQputCopyData: %s", PQerrorMessage(conn));
+       }
+
+       /* Finalize the COPY operation */
+       if (PQputCopyEnd(conn, NULL) <= 0)
+               pg_fatal("error in PQputCopyEnd: %s", PQerrorMessage(conn));
+
+       res = PQgetResult(conn);
+       if (PQresultStatus(res) != PGRES_COMMAND_OK)
+               pg_fatal("COPY failed for table \"%s\": %s",
+                                target->table_name, PQerrorMessage(conn));
+       PQclear(res);
+
+       termPQExpBuffer(&batch_buffer);
+}
+
+
+static void
+assignWorkerRows(WorkerTaskDescription *wtd, int num_workers, int64 total_rows)
+{
+       int64           rows_per_worker = total_rows / num_workers;
+
+       wtd->start_row = wtd->worker_id * rows_per_worker;
+       wtd->end_row = (wtd->worker_id == num_workers - 1) ?
+               total_rows :
+               (wtd->worker_id + 1) * rows_per_worker;
+}
+
+/*
+ * Covers only multiple partitions per worker (workers <= partitions) for now.
+ * Each worker loads complete partitions independently and can use COPY FREEZE.
+ */
+static void
+assignWorkerPartitions(WorkerTaskDescription *wtd, int num_workers, int64 total_rows,
+                                          int num_parts)
+{
+       int                     parts_per_worker = num_parts / num_workers;
+       int                     extra_parts = num_parts % num_workers;
+
+       wtd->part_start = wtd->worker_id * parts_per_worker + 1 +
+               (wtd->worker_id < extra_parts ? wtd->worker_id : extra_parts);
+       wtd->part_end = wtd->part_start + parts_per_worker - 1 +
+               (wtd->worker_id < extra_parts ? 1 : 0);
+
+       wtd->start_row = (wtd->part_start - 1) * wtd->part_size;
+       wtd->end_row = (wtd->part_end == num_parts) ?
+               total_rows :
+               wtd->part_end * wtd->part_size;
+}
+
+
+/* Load data into partitioned table */
+static void
+loadPartitionedTable(PGconn *conn, WorkerTaskDescription *wtd)
+{
+       int                     p;
+
+       for (p = wtd->part_start; p <= wtd->part_end; p++)
+       {
+               CopyTarget      target;
+               int64           part_start_row = (p - 1) * wtd->part_size;
+               int64           part_end_row = (p == partitions) ? (naccounts * (int64) scale) : (p * wtd->part_size);
+               char            partition_table[NAMEDATALEN];
+
+               snprintf(partition_table, sizeof(partition_table), "pgbench_accounts_%d", p);
+
+               target.table_name = partition_table;
+               target.start_row = part_start_row;
+               target.end_row = part_end_row;
+               target.use_freeze = true;
+
+               performCopy(conn, wtd, &target);
+       }
+}
+
+/* Load data into non-partitioned table */
+static void
+loadRegularTable(PGconn *conn, WorkerTaskDescription *wtd)
+{
+       CopyTarget      target;
+
+       target.table_name = wtd->table;
+       target.start_row = wtd->start_row;
+       target.end_row = wtd->end_row;
+       target.use_freeze = (wtd->worker_id == 0);
+
+       performCopy(conn, wtd, &target);
+}
+
+static THREAD_FUNC_RETURN_TYPE THREAD_FUNC_CC
+initWorkerThread(void *arg)
+{
+       WorkerTaskDescription           *wtd = (WorkerTaskDescription *) arg;
+       PGconn                  *conn;
+
+       /* Connection is pre-created, just use it */
+       conn = wtd->con;
+
+       /*
+        * Start a new transaction for this worker, except for worker 0 on
+        * non-partitioned tables. Worker 0 continues the transaction from the
+        * main thread that already did the truncate (to enable COPY FREEZE).
+        */
+       if (wtd->part_start > 0 || wtd->worker_id > 0)
+               executeStatement(conn, "begin");
+
+       if (wtd->part_start > 0)
+       {
+               createPartitions(conn, wtd->part_start, wtd->part_end);
+               loadPartitionedTable(conn, wtd);
+       }
+       else
+               loadRegularTable(conn, wtd);
+
+       executeStatement(conn, "commit");
+
+       THREAD_FUNC_RETURN;
+}
+
+static void
+initPopulateTableParallel(PGconn *connection, int num_workers,
+                                                 const char *table, int64 total_rows,
+                                                 initRowMethod append_row)
+{
+       THREAD_T   *worker_threads;
+       WorkerTaskDescription *worker_data;
+       PGconn    **connections;
+       bool            is_partitioned;
+       int                     i;
+
+       /* Allocate worker data and threads */
+       worker_threads = pg_malloc(num_workers * sizeof(THREAD_T));
+       worker_data = pg_malloc0(num_workers * sizeof(WorkerTaskDescription));
+       connections = pg_malloc(num_workers * sizeof(PGconn *));
+
+       /* Reuse main connection for worker 0, create new ones for others */
+       connections[0] = connection;
+       for (i = 1; i < num_workers; i++)
+               connections[i] = doConnect();
+
+       /* Works only for pgbench_accounts and the range partitioning option */
+       is_partitioned = strcmp(table, "pgbench_accounts") == 0 && partition_method == PART_RANGE;
+
+       /* For partitioned tables, we handle only num_workers <= partitions for now */
+       if (is_partitioned && num_workers > partitions)
+               pg_fatal("number of threads (%d) must not exceed the number of partitions (%d)",
+                                num_workers, partitions);
+
+       executeStatement(connections[0], "begin");
+       truncateTable(connections[0], table);
+
+       if (is_partitioned)
+       {
+               executeStatement(connections[0], "commit");
+       }
+
+       /* Create and start worker threads */
+       for (i = 0; i < num_workers; i++)
+       {
+               worker_data[i].con = connections[i];
+               worker_data[i].table = table;
+               worker_data[i].append_row = append_row;
+               worker_data[i].worker_id = i;
+
+               if (!is_partitioned)
+                       assignWorkerRows(&worker_data[i], num_workers, total_rows);
+               else
+               {
+                       worker_data[i].part_size = (naccounts * (int64) scale + partitions - 1) / partitions;
+                       assignWorkerPartitions(&worker_data[i], num_workers, total_rows,
+                                                                  partitions);
+               }
+
+               THREAD_CREATE(&worker_threads[i], initWorkerThread, &worker_data[i]);
+       }
+
+       for (i = 0; i < num_workers; i++)
+               THREAD_JOIN(worker_threads[i]);
+
+       /*
+        * For partitioned tables, attach all partitions now that data is loaded.
+        */
+       if (is_partitioned)
+               attachPartitions(connection);
+
+       /* Clean up worker connections (skip index 0, which is the main connection) */
+       for (i = 1; i < num_workers; i++)
+               PQfinish(connections[i]);
+
+       free(connections);
+       free(worker_threads);
+       free(worker_data);
+}
+
 /* qsort comparator for Variable array */
 static int
 compareVariableNames(const void *v1, const void *v2)
@@ -4869,14 +5156,58 @@ initDropTables(PGconn *con)
  * with a known size, so we choose to partition it.
  */
 static void
-createPartitions(PGconn *con)
+createPartitions(PGconn *con, int part_start, int part_end)
 {
        PQExpBufferData query;
 
        /* we must have to create some partitions */
        Assert(partitions > 0);
 
-       fprintf(stderr, "creating %d partitions...\n", partitions);
+       /* If called with -1, create all partitions */
+       if (part_start == -1)
+       {
+               part_start = 1;
+               part_end = partitions;
+               fprintf(stderr, "creating %d partitions...\n", partitions);
+       }
+
+       initPQExpBuffer(&query);
+
+       for (int p = part_start; p <= part_end; p++)
+       {
+               /*
+                * Create standalone tables (not attached to parent yet).
+                * This avoids AccessExclusiveLock on the parent table, allowing
+                * parallel creation. Tables will be attached after data loading.
+                */
+               printfPQExpBuffer(&query,
+                                                 "create%s table pgbench_accounts_%d\n"
+                                                 "  (aid int not null,\n"
+                                                 "   bid int,\n"
+                                                 "   abalance int,\n"
+                                                 "   filler character(84))\n"
+                                                 "  with (fillfactor=%d)",
+                                                 unlogged_tables ? " unlogged" : "", p,
+                                                 fillfactor);
+
+               executeStatement(con, query.data);
+       }
+
+       termPQExpBuffer(&query);
+}
+
+/*
+ * Attach standalone partition tables to the parent table.
+ * Called after all data has been loaded in parallel.
+ */
+static void
+attachPartitions(PGconn *con)
+{
+       PQExpBufferData query;
+
+       Assert(partitions > 0);
+
+       fprintf(stderr, "attaching %d partitions...\n", partitions);
 
        initPQExpBuffer(&query);
 
@@ -4884,13 +5215,12 @@ createPartitions(PGconn *con)
        {
                if (partition_method == PART_RANGE)
                {
-                       int64           part_size = (naccounts * (int64) scale + partitions - 1) / partitions;
+                       int64 part_size = (naccounts * (int64) scale + partitions - 1) / partitions;
 
                        printfPQExpBuffer(&query,
-                                                         "create%s table pgbench_accounts_%d\n"
-                                                         "  partition of pgbench_accounts\n"
-                                                         "  for values from (",
-                                                         unlogged_tables ? " unlogged" : "", p);
+                                                         "alter table pgbench_accounts\n"
+                                                         "  attach partition pgbench_accounts_%d\n"
+                                                         "  for values from (", p);
 
                        /*
                         * For RANGE, we use open-ended partitions at the beginning and
@@ -4913,21 +5243,16 @@ createPartitions(PGconn *con)
                        appendPQExpBufferChar(&query, ')');
                }
                else if (partition_method == PART_HASH)
+               {
                        printfPQExpBuffer(&query,
-                                                         "create%s table pgbench_accounts_%d\n"
-                                                         "  partition of pgbench_accounts\n"
+                                                         "alter table pgbench_accounts\n"
+                                                         "  attach partition pgbench_accounts_%d\n"
                                                          "  for values with (modulus %d, remainder %d)",
-                                                         unlogged_tables ? " unlogged" : "", p,
-                                                         partitions, p - 1);
+                                                         p, partitions, p - 1);
+               }
                else                                    /* cannot get there */
                        Assert(0);
 
-               /*
-                * Per ddlinfo in initCreateTables, fillfactor is needed on table
-                * pgbench_accounts.
-                */
-               appendPQExpBuffer(&query, " with (fillfactor=%d)", fillfactor);
-
                executeStatement(con, query.data);
        }
 
@@ -5025,8 +5350,8 @@ initCreateTables(PGconn *con)
 
        termPQExpBuffer(&query);
 
-       if (partition_method != PART_NONE)
-               createPartitions(con);
+       if (partition_method != PART_NONE && (nthreads == 1 || partition_method == PART_HASH))
+               createPartitions(con, -1, -1);
 }
 
 /*
@@ -5035,11 +5360,7 @@ initCreateTables(PGconn *con)
 static void
 initTruncateTables(PGconn *con)
 {
-       executeStatement(con, "truncate table "
-                                        "pgbench_accounts, "
-                                        "pgbench_branches, "
-                                        "pgbench_history, "
-                                        "pgbench_tellers");
+       truncateTable(con, "pgbench_accounts, pgbench_branches, pgbench_history, pgbench_tellers");
 }
 
 static void
@@ -5069,8 +5390,40 @@ initAccount(PQExpBufferData *sql, int64 curr)
                                          curr + 1, curr / naccounts + 1);
 }
 
+/*
+ * Append-based versions to enable batching.
+ * These use appendPQExpBuffer instead of printfPQExpBuffer to allow
+ * multiple rows to be accumulated in a single buffer.
+ */
+static void
+appendBranch(PQExpBufferData *sql, int64 curr)
+{
+       /* "filler" column uses NULL */
+       appendPQExpBuffer(sql,
+                                         INT64_FORMAT "\t0\t\\N\n",
+                                         curr + 1);
+}
+
+static void
+appendTeller(PQExpBufferData *sql, int64 curr)
+{
+       /* "filler" column uses NULL */
+       appendPQExpBuffer(sql,
+                                         INT64_FORMAT "\t" INT64_FORMAT "\t0\t\\N\n",
+                                         curr + 1, curr / ntellers + 1);
+}
+
 static void
-initPopulateTable(PGconn *con, const char *table, int64 base,
+appendAccount(PQExpBufferData *sql, int64 curr)
+{
+       /* "filler" column defaults to blank padded empty string */
+       appendPQExpBuffer(sql,
+                                         INT64_FORMAT "\t" INT64_FORMAT "\t0\t\n",
+                                         curr + 1, curr / naccounts + 1);
+}
+
+static void
+initPopulateTableSerial(PGconn *con, const char *table, int64 base,
                                  initRowMethod init_row)
 {
        int                     n;
@@ -5079,7 +5432,7 @@ initPopulateTable(PGconn *con, const char *table, int64 base,
        int                     prev_chars = 0;
        PGresult   *res;
        PQExpBufferData sql;
-       char            copy_statement[256];
+       char            copy_statement[NAMEDATALEN + 32];
        const char *copy_statement_fmt = "copy %s from stdin";
        int64           total = base * scale;
 
@@ -5188,6 +5541,27 @@ initPopulateTable(PGconn *con, const char *table, int64 base,
        termPQExpBuffer(&sql);
 }
 
+static void
+initPopulateTable(PGconn *con, const char *table, int64 total_rows,
+                                 initRowMethod init_row, initRowMethod append_row, bool use_parallel)
+{
+       bool            is_accounts = (strcmp(table, "pgbench_accounts") == 0);
+
+       if (use_parallel && nthreads > 1)
+               initPopulateTableParallel(con, nthreads, table, total_rows * scale, append_row);
+       else
+       {
+               /*
+                * For single-threaded mode with partitioned tables, attach partitions
+                * before loading data so COPY to the parent table can route rows.
+                */
+               if (is_accounts && partitions > 0 && partition_method != PART_NONE)
+                       attachPartitions(con);
+
+               initPopulateTableSerial(con, table, total_rows, init_row);
+       }
+}
+
 /*
  * Fill the standard tables with some data generated and sent from the client.
  *
@@ -5200,8 +5574,9 @@ initGenerateDataClientSide(PGconn *con)
        fprintf(stderr, "generating data (client-side)...\n");
 
        /*
-        * we do all of this in one transaction to enable the backend's
-        * data-loading optimizations
+        * For single-threaded mode, do everything in one transaction.
+        * For multi-threaded mode, do branches/tellers/history in one transaction,
+        * then accounts in parallel (each thread handles its own transaction).
         */
        executeStatement(con, "begin");
 
@@ -5212,11 +5587,16 @@ initGenerateDataClientSide(PGconn *con)
         * fill branches, tellers, accounts in that order in case foreign keys
         * already exist
         */
-       initPopulateTable(con, "pgbench_branches", nbranches, initBranch);
-       initPopulateTable(con, "pgbench_tellers", ntellers, initTeller);
-       initPopulateTable(con, "pgbench_accounts", naccounts, initAccount);
+       initPopulateTable(con, "pgbench_branches", nbranches, initBranch, appendBranch, false);
+       initPopulateTable(con, "pgbench_tellers", ntellers, initTeller, appendTeller, false);
 
-       executeStatement(con, "commit");
+       if (nthreads > 1)
+               executeStatement(con, "commit");
+
+       initPopulateTable(con, "pgbench_accounts", naccounts, initAccount, appendAccount, nthreads > 1);
+
+       if (nthreads == 1)
+               executeStatement(con, "commit");
 }
 
 /*
@@ -5242,6 +5622,9 @@ initGenerateDataServerSide(PGconn *con)
        /* truncate away any old data */
        initTruncateTables(con);
 
+       if (partitions > 0 && partition_method != PART_NONE)
+               attachPartitions(con);
+
        initPQExpBuffer(&sql);
 
        printfPQExpBuffer(&sql,
@@ -6989,7 +7372,6 @@ main(int argc, char **argv)
                                initialization_option_set = true;
                                break;
                        case 'j':                       /* jobs */
-                               benchmarking_option_set = true;
                                if (!option_parse_int(optarg, "-j/--jobs", 1, INT_MAX,
                                                                          &nthreads))
                                {
@@ -7221,7 +7603,7 @@ main(int argc, char **argv)
         * optimization; throttle_delay is calculated incorrectly below if some
         * threads have no clients assigned to them.)
         */
-       if (nthreads > nclients)
+       if (nthreads > nclients && !is_init_mode)
                nthreads = nclients;
 
        /*
@@ -7266,6 +7648,9 @@ main(int argc, char **argv)
                if (partitions > 0 && partition_method == PART_NONE)
                        partition_method = PART_RANGE;
 
+               if (partition_method == PART_HASH && nthreads > 1)
+                       pg_fatal("parallel data loading (-j) is not supported with hash partitioning");
+
                if (initialize_steps == NULL)
                        initialize_steps = pg_strdup(DEFAULT_INIT_STEPS);
 
-- 
2.39.5 (Apple Git-154)
