On Tue Jul 11, 2023 at 12:03 AM CDT, Michael Paquier wrote:
> On Wed, Jun 14, 2023 at 10:58:06AM -0500, Tristan Partin wrote:
>  static void
> -initGenerateDataClientSide(PGconn *con)
> +initBranch(PGconn *con, PQExpBufferData *sql, int64 curr)
> +{
> +       /* "filler" column defaults to NULL */
> +       printfPQExpBuffer(sql,
> +                                         INT64_FORMAT "\t0\t\n",
> +                                         curr + 1);
> +}
> +
> +static void
> +initTeller(PGconn *con, PQExpBufferData *sql, int64 curr)
> +{
> +       /* "filler" column defaults to NULL */
> +       printfPQExpBuffer(sql,
> +                                         INT64_FORMAT "\t" INT64_FORMAT 
> "\t0\t\n",
> +                                         curr + 1, curr / ntellers + 1);
> +}
> +
> +static void
> +initAccount(PGconn *con, PQExpBufferData *sql, int64 curr)
> +{
> +       /* "filler" column defaults to blank padded empty string */
> +       printfPQExpBuffer(sql,
> +                                         INT64_FORMAT "\t" INT64_FORMAT 
> "\t0\t\n",
> +                                         curr + 1, curr / naccounts + 1);
> +}
>
> These routines don't need a PGconn argument, by the way.

Nice catch!

>     /* use COPY with FREEZE on v14 and later without partitioning */
>     if (partitions == 0 && PQserverVersion(con) >= 140000)
> -       copy_statement = "copy pgbench_accounts from stdin with (freeze on)";
> +       copy_statement_fmt = "copy %s from stdin with (freeze on)";
>     else
> -       copy_statement = "copy pgbench_accounts from stdin";
> +       copy_statement_fmt = "copy %s from stdin";
>
> This seems a bit incorrect because partitioning only applies to
> pgbench_accounts, no?  This change means that the teller and branch
> tables would not benefit from FREEZE under a pgbench compiled with
> this patch and a backend version of 14 or newer if --partitions is
> used.  So, perhaps "partitions" should be an argument of
> initPopulateTable() specified for each table? 

Whoops, looks like I didn't read the comment for what the partitions
variable means. It only applies to pgbench_accounts as you said. I don't
think passing it in as an argument would make much sense. Let me know
what you think about the change in this new version, which only hits the
first portion of the `if` statement if the table is pgbench_accounts.

-- 
Tristan Partin
Neon (https://neon.tech)
From af242f39aefd4d9c77271ac897b53aaf17601f85 Mon Sep 17 00:00:00 2001
From: Tristan Partin <tris...@neon.tech>
Date: Tue, 23 May 2023 11:48:16 -0500
Subject: [PATCH v3] Use COPY instead of INSERT for populating all tables

COPY is a better interface for bulk loading and/or high latency
connections. Previously COPY was only used for the pgbench_accounts
table because the amount of data was so much larger than the other
tables. However, as already said there are also gains to be had in the
high latency case, such as loading data across continents.
---
 src/bin/pgbench/pgbench.c | 155 ++++++++++++++++++++++----------------
 1 file changed, 90 insertions(+), 65 deletions(-)

diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c
index 320d348a0f..b77b6c3704 100644
--- a/src/bin/pgbench/pgbench.c
+++ b/src/bin/pgbench/pgbench.c
@@ -4869,17 +4869,46 @@ initTruncateTables(PGconn *con)
 					 "pgbench_tellers");
 }
 
-/*
- * Fill the standard tables with some data generated and sent from the client
- */
+typedef void initRow(PQExpBufferData *sql, int64 curr);
+
 static void
-initGenerateDataClientSide(PGconn *con)
+initBranch(PQExpBufferData *sql, int64 curr)
+{
+	/* "filler" column defaults to NULL */
+	printfPQExpBuffer(sql,
+					  INT64_FORMAT "\t0\t\n",
+					  curr + 1);
+}
+
+static void
+initTeller(PQExpBufferData *sql, int64 curr)
+{
+	/* "filler" column defaults to NULL */
+	printfPQExpBuffer(sql,
+					  INT64_FORMAT "\t" INT64_FORMAT "\t0\t\n",
+					  curr + 1, curr / ntellers + 1);
+}
+
+static void
+initAccount(PQExpBufferData *sql, int64 curr)
+{
+	/* "filler" column defaults to blank padded empty string */
+	printfPQExpBuffer(sql,
+					  INT64_FORMAT "\t" INT64_FORMAT "\t0\t\n",
+					  curr + 1, curr / naccounts + 1);
+}
+
+static void
+initPopulateTable(PGconn *con, const char *table, int64 base, initRow *init_row)
 {
+	int			n;
+	int			k;
+	int			chars = 0;
+	PGresult	*res;
 	PQExpBufferData sql;
-	PGresult   *res;
-	int			i;
-	int64		k;
-	char	   *copy_statement;
+	char 		copy_statement[256];
+	const char *copy_statement_fmt;
+	const int64 total = base * scale;
 
 	/* used to track elapsed time and estimate of the remaining time */
 	pg_time_usec_t start;
@@ -4888,50 +4917,22 @@ initGenerateDataClientSide(PGconn *con)
 	/* Stay on the same line if reporting to a terminal */
 	char		eol = isatty(fileno(stderr)) ? '\r' : '\n';
 
-	fprintf(stderr, "generating data (client-side)...\n");
-
-	/*
-	 * we do all of this in one transaction to enable the backend's
-	 * data-loading optimizations
-	 */
-	executeStatement(con, "begin");
-
-	/* truncate away any old data */
-	initTruncateTables(con);
-
 	initPQExpBuffer(&sql);
 
 	/*
-	 * fill branches, tellers, accounts in that order in case foreign keys
-	 * already exist
+	 * Use COPY with FREEZE on v14 and later without partitioning.  Remember
+	 * that partitions only applies to pgbench_accounts.
 	 */
-	for (i = 0; i < nbranches * scale; i++)
-	{
-		/* "filler" column defaults to NULL */
-		printfPQExpBuffer(&sql,
-						  "insert into pgbench_branches(bid,bbalance) values(%d,0)",
-						  i + 1);
-		executeStatement(con, sql.data);
-	}
-
-	for (i = 0; i < ntellers * scale; i++)
-	{
-		/* "filler" column defaults to NULL */
-		printfPQExpBuffer(&sql,
-						  "insert into pgbench_tellers(tid,bid,tbalance) values (%d,%d,0)",
-						  i + 1, i / ntellers + 1);
-		executeStatement(con, sql.data);
-	}
-
-	/*
-	 * accounts is big enough to be worth using COPY and tracking runtime
-	 */
-
-	/* use COPY with FREEZE on v14 and later without partitioning */
-	if (partitions == 0 && PQserverVersion(con) >= 140000)
-		copy_statement = "copy pgbench_accounts from stdin with (freeze on)";
+	if (partitions == 0 && strcmp(table, "pgbench_accounts") == 0 && PQserverVersion(con) >= 140000)
+		copy_statement_fmt = "copy %s from stdin with (freeze on)";
 	else
-		copy_statement = "copy pgbench_accounts from stdin";
+		copy_statement_fmt = "copy %s from stdin";
+
+	n = pg_snprintf(copy_statement, sizeof(copy_statement), copy_statement_fmt, table);
+	if (n >= sizeof(copy_statement))
+		pg_fatal("invalid buffer size: must be at least %d characters long", n);
+	else if (n == -1)
+		pg_fatal("invalid format string");
 
 	res = PQexec(con, copy_statement);
 
@@ -4941,14 +4942,11 @@ initGenerateDataClientSide(PGconn *con)
 
 	start = pg_time_now();
 
-	for (k = 0; k < (int64) naccounts * scale; k++)
+	for (k = 0; k < total; k++)
 	{
-		int64		j = k + 1;
+		const int64		j = k + 1;
 
-		/* "filler" column defaults to blank padded empty string */
-		printfPQExpBuffer(&sql,
-						  INT64_FORMAT "\t" INT64_FORMAT "\t0\t\n",
-						  j, k / naccounts + 1);
+		init_row(&sql, k);
 		if (PQputline(con, sql.data))
 			pg_fatal("PQputline failed");
 
@@ -4962,25 +4960,26 @@ initGenerateDataClientSide(PGconn *con)
 		if ((!use_quiet) && (j % 100000 == 0))
 		{
 			double		elapsed_sec = PG_TIME_GET_DOUBLE(pg_time_now() - start);
-			double		remaining_sec = ((double) scale * naccounts - j) * elapsed_sec / j;
+			double		remaining_sec = ((double) total - j) * elapsed_sec / j;
 
-			fprintf(stderr, INT64_FORMAT " of " INT64_FORMAT " tuples (%d%%) done (elapsed %.2f s, remaining %.2f s)%c",
-					j, (int64) naccounts * scale,
-					(int) (((int64) j * 100) / (naccounts * (int64) scale)),
-					elapsed_sec, remaining_sec, eol);
+			chars = fprintf(stderr, INT64_FORMAT " of " INT64_FORMAT " tuples (%d%%) of %s done (elapsed %.2f s, remaining %.2f s)%c",
+					j, total,
+					(int) ((j * 100) / total),
+					table, elapsed_sec, remaining_sec, eol);
 		}
 		/* let's not call the timing for each row, but only each 100 rows */
 		else if (use_quiet && (j % 100 == 0))
 		{
 			double		elapsed_sec = PG_TIME_GET_DOUBLE(pg_time_now() - start);
-			double		remaining_sec = ((double) scale * naccounts - j) * elapsed_sec / j;
+			double		remaining_sec = ((double) total - j) * elapsed_sec / j;
 
 			/* have we reached the next interval (or end)? */
-			if ((j == scale * naccounts) || (elapsed_sec >= log_interval * LOG_STEP_SECONDS))
+			if ((j == total) || (elapsed_sec >= log_interval * LOG_STEP_SECONDS))
 			{
-				fprintf(stderr, INT64_FORMAT " of " INT64_FORMAT " tuples (%d%%) done (elapsed %.2f s, remaining %.2f s)%c",
-						j, (int64) naccounts * scale,
-						(int) (((int64) j * 100) / (naccounts * (int64) scale)), elapsed_sec, remaining_sec, eol);
+				chars = fprintf(stderr, INT64_FORMAT " of " INT64_FORMAT " tuples (%d%%) of %s done (elapsed %.2f s, remaining %.2f s)%c",
+						j, total,
+						(int) ((j * 100) / total),
+						table, elapsed_sec, remaining_sec, eol);
 
 				/* skip to the next interval */
 				log_interval = (int) ceil(elapsed_sec / LOG_STEP_SECONDS);
@@ -4988,8 +4987,8 @@ initGenerateDataClientSide(PGconn *con)
 		}
 	}
 
-	if (eol != '\n')
-		fputc('\n', stderr);	/* Need to move to next line */
+	if (chars != 0 && eol != '\n')
+		fprintf(stderr, "%*c\r", chars - 1, ' '); /* Clear the current line */
 
 	if (PQputline(con, "\\.\n"))
 		pg_fatal("very last PQputline failed");
@@ -4997,6 +4996,32 @@ initGenerateDataClientSide(PGconn *con)
 		pg_fatal("PQendcopy failed");
 
 	termPQExpBuffer(&sql);
+}
+
+/*
+ * Fill the standard tables with some data generated and sent from the client
+ */
+static void
+initGenerateDataClientSide(PGconn *con)
+{
+	fprintf(stderr, "generating data (client-side)...\n");
+
+	/*
+	 * we do all of this in one transaction to enable the backend's
+	 * data-loading optimizations
+	 */
+	executeStatement(con, "begin");
+
+	/* truncate away any old data */
+	initTruncateTables(con);
+
+	/*
+	 * fill branches, tellers, accounts in that order in case foreign keys
+	 * already exist
+	 */
+	initPopulateTable(con, "pgbench_branches", nbranches, initBranch);
+	initPopulateTable(con, "pgbench_tellers", ntellers, initTeller);
+	initPopulateTable(con, "pgbench_accounts", naccounts, initAccount);
 
 	executeStatement(con, "commit");
 }
-- 
Tristan Partin
Neon (https://neon.tech)

Reply via email to