Hello,
I've merged all time-related stuff (time_t, instr_time, int64) to use a
unique type (pg_time_usec_t) and set of functions/macros, which simplifies
the code somehow.
Hm. I'm not convinced it's a good idea for pgbench to do its own thing
here.
I really think that the refactoring part is a good thing because both the
line count (cloc) and the cost are reduced (time arithmetic with instr_time
is an ugly pain).
I have split the patch.
* First patch reworks time measurements in pgbench.
It creates a convenient pg_time_usec_t type and uses it everywhere, getting
rid of "instr_time". The code is somewhat simplified with respect to which
times are taken and what they mean.
Instead of displaying two tps figures at the end, which is basically insane,
it shows a single tps figure: under --connect it includes reconnection times,
and in the usual case of one connection made at startup it simply ignores
the initial connection time.
This patch, which is mostly refactoring, reduces the line count (cloc).
* Second patch adds a barrier before starting the bench
It applies on top of the previous one. The initial imbalance due to thread
creation times is smoothed.
I may add a --start-on option afterwards so that several pgbench instances
(running on distinct hosts) can be synchronized, which would be implemented
as a delay inserted by thread 0 before the barrier.
The Windows implementation was written more or less blind; it would be nice
if someone could confirm that it works.
--
Fabien.
diff --git a/doc/src/sgml/ref/pgbench.sgml b/doc/src/sgml/ref/pgbench.sgml
index f5a51d3732..26b4c4f61c 100644
--- a/doc/src/sgml/ref/pgbench.sgml
+++ b/doc/src/sgml/ref/pgbench.sgml
@@ -58,8 +58,10 @@ number of clients: 10
number of threads: 1
number of transactions per client: 1000
number of transactions actually processed: 10000/10000
-tps = 85.184871 (including connections establishing)
-tps = 85.296346 (excluding connections establishing)
+latency average = 11.013 ms
+latency stddev = 7.351 ms
+initial connection time = 45.758 ms
+tps = 896.967014 (without initial connection establishing)
</screen>
The first six lines report some of the most important parameter
@@ -2228,7 +2230,6 @@ END;
<para>
For the default script, the output will look similar to this:
<screen>
-starting vacuum...end.
transaction type: <builtin: TPC-B (sort of)>
scaling factor: 1
query mode: simple
@@ -2236,22 +2237,22 @@ number of clients: 10
number of threads: 1
number of transactions per client: 1000
number of transactions actually processed: 10000/10000
-latency average = 15.844 ms
-latency stddev = 2.715 ms
-tps = 618.764555 (including connections establishing)
-tps = 622.977698 (excluding connections establishing)
+latency average = 10.870 ms
+latency stddev = 7.341 ms
+initial connection time = 30.954 ms
+tps = 907.949122 (without initial connection establishing)
statement latencies in milliseconds:
- 0.002 \set aid random(1, 100000 * :scale)
- 0.005 \set bid random(1, 1 * :scale)
- 0.002 \set tid random(1, 10 * :scale)
- 0.001 \set delta random(-5000, 5000)
- 0.326 BEGIN;
- 0.603 UPDATE pgbench_accounts SET abalance = abalance + :delta WHERE aid = :aid;
- 0.454 SELECT abalance FROM pgbench_accounts WHERE aid = :aid;
- 5.528 UPDATE pgbench_tellers SET tbalance = tbalance + :delta WHERE tid = :tid;
- 7.335 UPDATE pgbench_branches SET bbalance = bbalance + :delta WHERE bid = :bid;
- 0.371 INSERT INTO pgbench_history (tid, bid, aid, delta, mtime) VALUES (:tid, :bid, :aid, :delta, CURRENT_TIMESTAMP);
- 1.212 END;
+ 0.001 \set aid random(1, 100000 * :scale)
+ 0.001 \set bid random(1, 1 * :scale)
+ 0.001 \set tid random(1, 10 * :scale)
+ 0.000 \set delta random(-5000, 5000)
+ 0.046 BEGIN;
+ 0.151 UPDATE pgbench_accounts SET abalance = abalance + :delta WHERE aid = :aid;
+ 0.107 SELECT abalance FROM pgbench_accounts WHERE aid = :aid;
+ 4.241 UPDATE pgbench_tellers SET tbalance = tbalance + :delta WHERE tid = :tid;
+ 5.245 UPDATE pgbench_branches SET bbalance = bbalance + :delta WHERE bid = :bid;
+ 0.102 INSERT INTO pgbench_history (tid, bid, aid, delta, mtime) VALUES (:tid, :bid, :aid, :delta, CURRENT_TIMESTAMP);
+ 0.974 END;
</screen>
</para>
diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c
index 08a5947a9e..46be67adaf 100644
--- a/src/bin/pgbench/pgbench.c
+++ b/src/bin/pgbench/pgbench.c
@@ -291,9 +291,9 @@ typedef struct SimpleStats
*/
typedef struct StatsData
{
- time_t start_time; /* interval start time, for aggregates */
- int64 cnt; /* number of transactions, including skipped */
- int64 skipped; /* number of transactions skipped under --rate
+ pg_time_usec_t start_time; /* interval start time, for aggregates */
+ int64 cnt; /* number of transactions, including skipped */
+ int64 skipped; /* number of transactions skipped under --rate
* and --latency-limit */
SimpleStats latency;
SimpleStats lag;
@@ -416,11 +416,11 @@ typedef struct
int nvariables; /* number of variables */
bool vars_sorted; /* are variables sorted by name? */
- /* various times about current transaction */
- int64 txn_scheduled; /* scheduled start time of transaction (usec) */
- int64 sleep_until; /* scheduled start time of next cmd (usec) */
- instr_time txn_begin; /* used for measuring schedule lag times */
- instr_time stmt_begin; /* used for measuring statement latencies */
+ /* various times about current transaction in microseconds */
+ pg_time_usec_t txn_scheduled; /* scheduled start time of transaction */
+ pg_time_usec_t sleep_until; /* scheduled start time of next cmd */
+ pg_time_usec_t txn_begin; /* used for measuring schedule lag times */
+ pg_time_usec_t stmt_begin; /* used for measuring statement latencies */
bool prepared[MAX_SCRIPTS]; /* whether client prepared the script */
@@ -449,13 +449,16 @@ typedef struct
RandomState ts_sample_rs; /* random state for log sampling */
int64 throttle_trigger; /* previous/next throttling (us) */
- FILE *logfile; /* where to log, or NULL */
+ FILE *logfile; /* where to log, or NULL */
+
+ /* per thread collected stats in microseconds */
+ pg_time_usec_t create_time; /* thread creation time */
+ pg_time_usec_t started_time; /* thread is running */
+ pg_time_usec_t bench_start; /* thread is benchmarking */
+ pg_time_usec_t conn_duration; /* cumulated connection and deconnection delays */
- /* per thread collected stats */
- instr_time start_time; /* thread start time */
- instr_time conn_time;
StatsData stats;
- int64 latency_late; /* executed but late transactions */
+ int64 latency_late; /* count executed but late transactions */
} TState;
#define INVALID_THREAD ((pthread_t) 0)
@@ -597,10 +600,10 @@ static void setIntValue(PgBenchValue *pv, int64 ival);
static void setDoubleValue(PgBenchValue *pv, double dval);
static bool evaluateExpr(CState *st, PgBenchExpr *expr,
PgBenchValue *retval);
-static ConnectionStateEnum executeMetaCommand(CState *st, instr_time *now);
+static ConnectionStateEnum executeMetaCommand(CState *st, pg_time_usec_t *now);
static void doLog(TState *thread, CState *st,
StatsData *agg, bool skipped, double latency, double lag);
-static void processXactStats(TState *thread, CState *st, instr_time *now,
+static void processXactStats(TState *thread, CState *st, pg_time_usec_t *now,
bool skipped, StatsData *agg);
static void append_fillfactor(char *opts, int len);
static void addScript(ParsedScript script);
@@ -1105,9 +1108,9 @@ mergeSimpleStats(SimpleStats *acc, SimpleStats *ss)
* the given value.
*/
static void
-initStats(StatsData *sd, time_t start_time)
+initStats(StatsData *sd, pg_time_usec_t start)
{
- sd->start_time = start_time;
+ sd->start_time = start;
sd->cnt = 0;
sd->skipped = 0;
initSimpleStats(&sd->latency);
@@ -2878,7 +2881,6 @@ evaluateSleep(CState *st, int argc, char **argv, int *usecs)
static void
advanceConnectionState(TState *thread, CState *st, StatsData *agg)
{
- instr_time now;
/*
* gettimeofday() isn't free, so we get the current timestamp lazily the
@@ -2888,7 +2890,7 @@ advanceConnectionState(TState *thread, CState *st, StatsData *agg)
* means "not set yet". Reset "now" when we execute shell commands or
* expressions, which might take a non-negligible amount of time, though.
*/
- INSTR_TIME_SET_ZERO(now);
+ pg_time_usec_t now = 0;
/*
* Loop in the state machine, until we have to wait for a result from the
@@ -2923,29 +2925,30 @@ advanceConnectionState(TState *thread, CState *st, StatsData *agg)
/* Start new transaction (script) */
case CSTATE_START_TX:
+ pg_time_now_lazy(&now);
/* establish connection if needed, i.e. under --connect */
if (st->con == NULL)
{
- instr_time start;
+ pg_time_usec_t start = now;
- INSTR_TIME_SET_CURRENT_LAZY(now);
- start = now;
if ((st->con = doConnect()) == NULL)
{
pg_log_error("client %d aborted while establishing connection", st->id);
st->state = CSTATE_ABORTED;
break;
}
- INSTR_TIME_SET_CURRENT(now);
- INSTR_TIME_ACCUM_DIFF(thread->conn_time, now, start);
+
+ /* reset now after connection */
+ now = pg_time_now();
+
+ thread->conn_duration += now - start;
/* Reset session-local state */
memset(st->prepared, 0, sizeof(st->prepared));
}
/* record transaction start time */
- INSTR_TIME_SET_CURRENT_LAZY(now);
st->txn_begin = now;
/*
@@ -2953,7 +2956,7 @@ advanceConnectionState(TState *thread, CState *st, StatsData *agg)
* scheduled start time.
*/
if (!throttle_delay)
- st->txn_scheduled = INSTR_TIME_GET_MICROSEC(now);
+ st->txn_scheduled = now;
/* Begin with the first command */
st->state = CSTATE_START_COMMAND;
@@ -2989,12 +2992,9 @@ advanceConnectionState(TState *thread, CState *st, StatsData *agg)
*/
if (latency_limit)
{
- int64 now_us;
+ pg_time_now_lazy(&now);
- INSTR_TIME_SET_CURRENT_LAZY(now);
- now_us = INSTR_TIME_GET_MICROSEC(now);
-
- while (thread->throttle_trigger < now_us - latency_limit &&
+ while (thread->throttle_trigger < now - latency_limit &&
(nxacts <= 0 || st->cnt < nxacts))
{
processXactStats(thread, st, &now, true, agg);
@@ -3027,9 +3027,9 @@ advanceConnectionState(TState *thread, CState *st, StatsData *agg)
* Wait until it's time to start next transaction.
*/
case CSTATE_THROTTLE:
- INSTR_TIME_SET_CURRENT_LAZY(now);
+ pg_time_now_lazy(&now);
- if (INSTR_TIME_GET_MICROSEC(now) < st->txn_scheduled)
+ if (now < st->txn_scheduled)
return; /* still sleeping, nothing to do here */
/* done sleeping, but don't start transaction if we're done */
@@ -3052,7 +3052,7 @@ advanceConnectionState(TState *thread, CState *st, StatsData *agg)
/* record begin time of next command, and initiate it */
if (report_per_command)
{
- INSTR_TIME_SET_CURRENT_LAZY(now);
+ pg_time_now_lazy(&now);
st->stmt_begin = now;
}
@@ -3217,8 +3217,8 @@ advanceConnectionState(TState *thread, CState *st, StatsData *agg)
* instead of CSTATE_START_TX.
*/
case CSTATE_SLEEP:
- INSTR_TIME_SET_CURRENT_LAZY(now);
- if (INSTR_TIME_GET_MICROSEC(now) < st->sleep_until)
+ pg_time_now_lazy(&now);
+ if (now < st->sleep_until)
return; /* still sleeping, nothing to do here */
/* Else done sleeping. */
st->state = CSTATE_END_COMMAND;
@@ -3238,13 +3238,12 @@ advanceConnectionState(TState *thread, CState *st, StatsData *agg)
{
Command *command;
- INSTR_TIME_SET_CURRENT_LAZY(now);
+ pg_time_now_lazy(&now);
command = sql_script[st->use_file].commands[st->command];
/* XXX could use a mutex here, but we choose not to */
addToSimpleStats(&command->stats,
- INSTR_TIME_GET_DOUBLE(now) -
- INSTR_TIME_GET_DOUBLE(st->stmt_begin));
+ PG_TIME_GET_DOUBLE(now - st->stmt_begin));
}
/* Go ahead with next command, to be executed or skipped */
@@ -3270,7 +3269,7 @@ advanceConnectionState(TState *thread, CState *st, StatsData *agg)
if (is_connect)
{
finishCon(st);
- INSTR_TIME_SET_ZERO(now);
+ now = 0;
}
if ((st->cnt >= nxacts && duration <= 0) || timer_exceeded)
@@ -3308,7 +3307,7 @@ advanceConnectionState(TState *thread, CState *st, StatsData *agg)
* take no time to execute.
*/
static ConnectionStateEnum
-executeMetaCommand(CState *st, instr_time *now)
+executeMetaCommand(CState *st, pg_time_usec_t *now)
{
Command *command = sql_script[st->use_file].commands[st->command];
int argc;
@@ -3350,8 +3349,8 @@ executeMetaCommand(CState *st, instr_time *now)
return CSTATE_ABORTED;
}
- INSTR_TIME_SET_CURRENT_LAZY(*now);
- st->sleep_until = INSTR_TIME_GET_MICROSEC(*now) + usec;
+ pg_time_now_lazy(now);
+ st->sleep_until = (*now) + usec;
return CSTATE_SLEEP;
}
else if (command->meta == META_SET)
@@ -3454,7 +3453,7 @@ executeMetaCommand(CState *st, instr_time *now)
* executing the expression or shell command might have taken a
* non-negligible amount of time, so reset 'now'
*/
- INSTR_TIME_SET_ZERO(*now);
+ *now = 0;
return CSTATE_END_COMMAND;
}
@@ -3464,14 +3463,15 @@ executeMetaCommand(CState *st, instr_time *now)
*
* We print Unix-epoch timestamps in the log, so that entries can be
* correlated against other logs. On some platforms this could be obtained
- * from the instr_time reading the caller has, but rather than get entangled
- * with that, we just eat the cost of an extra syscall in all cases.
+ * from the caller, but rather than get entangled with that, we just eat
+ * the cost of an extra syscall in all cases.
*/
static void
doLog(TState *thread, CState *st,
StatsData *agg, bool skipped, double latency, double lag)
{
FILE *logfile = thread->logfile;
+ pg_time_usec_t now = pg_time_now();
Assert(use_log);
@@ -3491,13 +3491,12 @@ doLog(TState *thread, CState *st,
* any empty intervals in between (this may happen with very low tps,
* e.g. --rate=0.1).
*/
- time_t now = time(NULL);
while (agg->start_time + agg_interval <= now)
{
/* print aggregated report to logfile */
- fprintf(logfile, "%ld " INT64_FORMAT " %.0f %.0f %.0f %.0f",
- (long) agg->start_time,
+ fprintf(logfile, INT64_FORMAT " " INT64_FORMAT " %.0f %.0f %.0f %.0f",
+ agg->start_time,
agg->cnt,
agg->latency.sum,
agg->latency.sum2,
@@ -3525,17 +3524,13 @@ doLog(TState *thread, CState *st,
else
{
/* no, print raw transactions */
- struct timeval tv;
-
- gettimeofday(&tv, NULL);
if (skipped)
fprintf(logfile, "%d " INT64_FORMAT " skipped %d %ld %ld",
- st->id, st->cnt, st->use_file,
- (long) tv.tv_sec, (long) tv.tv_usec);
+ st->id, st->cnt, st->use_file, now / 1000000, now % 1000000);
else
fprintf(logfile, "%d " INT64_FORMAT " %.0f %d %ld %ld",
st->id, st->cnt, latency, st->use_file,
- (long) tv.tv_sec, (long) tv.tv_usec);
+ now / 1000000, now % 1000000);
if (throttle_delay)
fprintf(logfile, " %.0f", lag);
fputc('\n', logfile);
@@ -3549,7 +3544,7 @@ doLog(TState *thread, CState *st,
* Note that even skipped transactions are counted in the "cnt" fields.)
*/
static void
-processXactStats(TState *thread, CState *st, instr_time *now,
+processXactStats(TState *thread, CState *st, pg_time_usec_t *now,
bool skipped, StatsData *agg)
{
double latency = 0.0,
@@ -3559,11 +3554,11 @@ processXactStats(TState *thread, CState *st, instr_time *now,
if (detailed && !skipped)
{
- INSTR_TIME_SET_CURRENT_LAZY(*now);
+ pg_time_now_lazy(now);
/* compute latency & lag */
- latency = INSTR_TIME_GET_MICROSEC(*now) - st->txn_scheduled;
- lag = INSTR_TIME_GET_MICROSEC(st->txn_begin) - st->txn_scheduled;
+ latency = (*now) - st->txn_scheduled;
+ lag = st->txn_begin - st->txn_scheduled;
}
if (thread_details)
@@ -3826,10 +3821,7 @@ initGenerateDataClientSide(PGconn *con)
int64 k;
/* used to track elapsed time and estimate of the remaining time */
- instr_time start,
- diff;
- double elapsed_sec,
- remaining_sec;
+ pg_time_usec_t start;
int log_interval = 1;
/* Stay on the same line if reporting to a terminal */
@@ -3879,7 +3871,7 @@ initGenerateDataClientSide(PGconn *con)
}
PQclear(res);
- INSTR_TIME_SET_CURRENT(start);
+ start = pg_time_now();
for (k = 0; k < (int64) naccounts * scale; k++)
{
@@ -3904,11 +3896,8 @@ initGenerateDataClientSide(PGconn *con)
*/
if ((!use_quiet) && (j % 100000 == 0))
{
- INSTR_TIME_SET_CURRENT(diff);
- INSTR_TIME_SUBTRACT(diff, start);
-
- elapsed_sec = INSTR_TIME_GET_DOUBLE(diff);
- remaining_sec = ((double) scale * naccounts - j) * elapsed_sec / j;
+ double elapsed_sec = PG_TIME_GET_DOUBLE(pg_time_now() - start);
+ double remaining_sec = ((double) scale * naccounts - j) * elapsed_sec / j;
fprintf(stderr, INT64_FORMAT " of " INT64_FORMAT " tuples (%d%%) done (elapsed %.2f s, remaining %.2f s)%c",
j, (int64) naccounts * scale,
@@ -3918,11 +3907,8 @@ initGenerateDataClientSide(PGconn *con)
/* let's not call the timing for each row, but only each 100 rows */
else if (use_quiet && (j % 100 == 0))
{
- INSTR_TIME_SET_CURRENT(diff);
- INSTR_TIME_SUBTRACT(diff, start);
-
- elapsed_sec = INSTR_TIME_GET_DOUBLE(diff);
- remaining_sec = ((double) scale * naccounts - j) * elapsed_sec / j;
+ double elapsed_sec = PG_TIME_GET_DOUBLE(pg_time_now() - start);
+ double remaining_sec = ((double) scale * naccounts - j) * elapsed_sec / j;
/* have we reached the next interval (or end)? */
if ((j == scale * naccounts) || (elapsed_sec >= log_interval * LOG_STEP_SECONDS))
@@ -4118,10 +4104,8 @@ runInitSteps(const char *initialize_steps)
for (step = initialize_steps; *step != '\0'; step++)
{
- instr_time start;
char *op = NULL;
-
- INSTR_TIME_SET_CURRENT(start);
+ pg_time_usec_t start = pg_time_now();
switch (*step)
{
@@ -4163,12 +4147,7 @@ runInitSteps(const char *initialize_steps)
if (op != NULL)
{
- instr_time diff;
- double elapsed_sec;
-
- INSTR_TIME_SET_CURRENT(diff);
- INSTR_TIME_SUBTRACT(diff, start);
- elapsed_sec = INSTR_TIME_GET_DOUBLE(diff);
+ double elapsed_sec = PG_TIME_GET_DOUBLE(pg_time_now() - start);
if (!first)
appendPQExpBufferStr(&stats, ", ");
@@ -5090,12 +5069,12 @@ addScript(ParsedScript script)
* progress report. On exit, they are updated with the new stats.
*/
static void
-printProgressReport(TState *threads, int64 test_start, int64 now,
+printProgressReport(TState *threads, int64 test_start, pg_time_usec_t now,
StatsData *last, int64 *last_report)
{
/* generate and show report */
- int64 run = now - *last_report,
- ntx;
+ pg_time_usec_t run = now - *last_report;
+ int64 ntx;
double tps,
total_run,
latency,
@@ -5142,16 +5121,7 @@ printProgressReport(TState *threads, int64 test_start, int64 now,
if (progress_timestamp)
{
- /*
- * On some platforms the current system timestamp is available in
- * now_time, but rather than get entangled with that, we just eat the
- * cost of an extra syscall in all cases.
- */
- struct timeval tv;
-
- gettimeofday(&tv, NULL);
- snprintf(tbuf, sizeof(tbuf), "%ld.%03ld s",
- (long) tv.tv_sec, (long) (tv.tv_usec / 1000));
+ snprintf(tbuf, sizeof(tbuf), "%.3f s", PG_TIME_GET_DOUBLE(now));
}
else
{
@@ -5191,21 +5161,18 @@ printSimpleStats(const char *prefix, SimpleStats *ss)
/* print out results */
static void
-printResults(StatsData *total, instr_time total_time,
- instr_time conn_total_time, int64 latency_late)
+printResults(StatsData *total,
+ pg_time_usec_t total_duration, /* benchmarking time */
+ pg_time_usec_t conn_total_duration, /* is_connect */
+ pg_time_usec_t conn_elapsed_duration, /* !is_connect */
+ int64 latency_late)
{
- double time_include,
- tps_include,
- tps_exclude;
+ /* tps is about actually executed transactions during benchmarking */
int64 ntx = total->cnt - total->skipped;
+ double bench_duration = PG_TIME_GET_DOUBLE(total_duration);
+ double tps = ntx / bench_duration;
- time_include = INSTR_TIME_GET_DOUBLE(total_time);
-
- /* tps is about actually executed transactions */
- tps_include = ntx / time_include;
- tps_exclude = ntx /
- (time_include - (INSTR_TIME_GET_DOUBLE(conn_total_time) / nclients));
-
+ printf("pgbench (PostgreSQL) %d.%d\n", PG_VERSION_NUM / 10000, PG_VERSION_NUM % 100);
/* Report test parameters. */
printf("transaction type: %s\n",
num_scripts == 1 ? sql_script[0].desc : "multiple scripts");
@@ -5236,8 +5203,7 @@ printResults(StatsData *total, instr_time total_time,
if (throttle_delay && latency_limit)
printf("number of transactions skipped: " INT64_FORMAT " (%.3f %%)\n",
- total->skipped,
- 100.0 * total->skipped / total->cnt);
+ total->skipped, 100.0 * total->skipped / total->cnt);
if (latency_limit)
printf("number of transactions above the %.1f ms latency limit: " INT64_FORMAT "/" INT64_FORMAT " (%.3f %%)\n",
@@ -5250,7 +5216,7 @@ printResults(StatsData *total, instr_time total_time,
{
/* no measurement, show average latency computed from run time */
printf("latency average = %.3f ms\n",
- 1000.0 * time_include * nclients / total->cnt);
+ 0.001 * total_duration * nclients / total->cnt);
}
if (throttle_delay)
@@ -5265,8 +5231,25 @@ printResults(StatsData *total, instr_time total_time,
0.001 * total->lag.sum / total->cnt, 0.001 * total->lag.max);
}
- printf("tps = %f (including connections establishing)\n", tps_include);
- printf("tps = %f (excluding connections establishing)\n", tps_exclude);
+ /*
+ * Under -C/--connect, each transaction incurs a significant connection cost,
+ * it would not make much sense to ignore it in tps, and it would not be tps
+ * anyway.
+ *
+ * Otherwise connections are made just once at the beginning of the run
+ * and should not impact performance but for very short run, so they are
+ * (right)fully ignored in tps.
+ */
+ if (is_connect)
+ {
+ printf("average connection time = %.3f ms\n", 0.001 * conn_total_duration / total->cnt);
+ printf("tps = %f (including reconnection times)\n", tps);
+ }
+ else
+ {
+ printf("initial connection time = %.3f ms\n", 0.001 * conn_elapsed_duration);
+ printf("tps = %f (without initial connection establishing)\n", tps);
+ }
/* Report per-script/command statistics */
if (per_script_stats || report_per_command)
@@ -5287,7 +5270,7 @@ printResults(StatsData *total, instr_time total_time,
100.0 * sql_script[i].weight / total_weight,
sstats->cnt,
100.0 * sstats->cnt / total->cnt,
- (sstats->cnt - sstats->skipped) / time_include);
+ (sstats->cnt - sstats->skipped) / bench_duration);
if (throttle_delay && latency_limit && sstats->cnt > 0)
printf(" - number of transactions skipped: " INT64_FORMAT " (%.3f%%)\n",
@@ -5335,10 +5318,7 @@ set_random_seed(const char *seed)
if (seed == NULL || strcmp(seed, "time") == 0)
{
/* rely on current time */
- instr_time now;
-
- INSTR_TIME_SET_CURRENT(now);
- iseed = (uint64) INSTR_TIME_GET_MICROSEC(now);
+ iseed = pg_time_now();
}
else if (strcmp(seed, "rand") == 0)
{
@@ -5441,9 +5421,10 @@ main(int argc, char **argv)
CState *state; /* status of clients */
TState *threads; /* array of thread */
- instr_time start_time; /* start up time */
- instr_time total_time;
- instr_time conn_total_time;
+ pg_time_usec_t
+ start_time, /* start up time */
+ bench_start = 0, /* first recorded benchmarking time */
+ conn_total_duration; /* cumulated connection time in threads */
int64 latency_late = 0;
StatsData stats;
int weight;
@@ -6126,67 +6107,53 @@ main(int argc, char **argv)
/* all clients must be assigned to a thread */
Assert(nclients_dealt == nclients);
- /* get start up time */
- INSTR_TIME_SET_CURRENT(start_time);
+ /* get start up time for the whole computation */
+ start_time = pg_time_now();
/* set alarm if duration is specified. */
if (duration > 0)
setalarm(duration);
- /* start threads */
#ifdef ENABLE_THREAD_SAFETY
- for (i = 0; i < nthreads; i++)
+ /* start all threads but thread 0 which is executed directly later */
+ for (i = 1; i < nthreads; i++)
{
TState *thread = &threads[i];
+ int err;
- INSTR_TIME_SET_CURRENT(thread->start_time);
+ thread->create_time = pg_time_now();
+ err = pthread_create(&thread->thread, NULL, threadRun, thread);
- /* compute when to stop */
- if (duration > 0)
- end_time = INSTR_TIME_GET_MICROSEC(thread->start_time) +
- (int64) 1000000 * duration;
-
- /* the first thread (i = 0) is executed by main thread */
- if (i > 0)
- {
- int err = pthread_create(&thread->thread, NULL, threadRun, thread);
-
- if (err != 0 || thread->thread == INVALID_THREAD)
- {
- pg_log_fatal("could not create thread: %m");
- exit(1);
- }
- }
- else
+ if (err != 0 || thread->thread == INVALID_THREAD)
{
- thread->thread = INVALID_THREAD;
+ pg_log_fatal("could not create thread: %m");
+ exit(1);
}
}
#else
- INSTR_TIME_SET_CURRENT(threads[0].start_time);
+ Assert(nthreads == 1);
+#endif /* ENABLE_THREAD_SAFETY */
+
/* compute when to stop */
+ threads[0].create_time = pg_time_now();
if (duration > 0)
- end_time = INSTR_TIME_GET_MICROSEC(threads[0].start_time) +
- (int64) 1000000 * duration;
+ end_time = threads[0].create_time + (int64) 1000000 * duration;
+
+ /* run thread 0 directly */
threads[0].thread = INVALID_THREAD;
-#endif /* ENABLE_THREAD_SAFETY */
+ (void) threadRun(&threads[0]);
- /* wait for threads and accumulate results */
+ /* wait for other threads and accumulate results */
initStats(&stats, 0);
- INSTR_TIME_SET_ZERO(conn_total_time);
+ conn_total_duration = 0;
+
for (i = 0; i < nthreads; i++)
{
TState *thread = &threads[i];
#ifdef ENABLE_THREAD_SAFETY
- if (threads[i].thread == INVALID_THREAD)
- /* actually run this thread directly in the main thread */
- (void) threadRun(thread);
- else
- /* wait of other threads. should check that 0 is returned? */
+ if (i > 0)
pthread_join(thread->thread, NULL);
-#else
- (void) threadRun(thread);
#endif /* ENABLE_THREAD_SAFETY */
for (int j = 0; j < thread->nstate; j++)
@@ -6199,23 +6166,24 @@ main(int argc, char **argv)
stats.cnt += thread->stats.cnt;
stats.skipped += thread->stats.skipped;
latency_late += thread->latency_late;
- INSTR_TIME_ADD(conn_total_time, thread->conn_time);
+ conn_total_duration += thread->conn_duration;
+
+ /* first recorded benchmarking start time */
+ if (bench_start == 0 || thread->bench_start < bench_start)
+ bench_start = thread->bench_start;
}
+
+ /* XXX should this be connection time? */
disconnect_all(state, nclients);
/*
- * XXX We compute results as though every client of every thread started
- * and finished at the same time. That model can diverge noticeably from
- * reality for a short benchmark run involving relatively many threads.
- * The first thread may process notably many transactions before the last
- * thread begins. Improving the model alone would bring limited benefit,
- * because performance during those periods of partial thread count can
- * easily exceed steady state performance. This is one of the many ways
- * short runs convey deceptive performance figures.
+ * Beware that performance of short benchmarks with many threads and possibly
+ * long transactions can be deceptive because threads do not start and finish
+ * at the exact same time. The total duration computed here encompasses all
+ * transactions so that tps shown is somehow slightly underestimated.
*/
- INSTR_TIME_SET_CURRENT(total_time);
- INSTR_TIME_SUBTRACT(total_time, start_time);
- printResults(&stats, total_time, conn_total_time, latency_late);
+ printResults(&stats, pg_time_now() - bench_start, conn_total_duration,
+ bench_start - start_time, latency_late);
if (exit_code != 0)
pg_log_fatal("Run was aborted; the above results are incomplete.");
@@ -6228,34 +6196,16 @@ threadRun(void *arg)
{
TState *thread = (TState *) arg;
CState *state = thread->state;
- instr_time start,
- end;
+ pg_time_usec_t start;
int nstate = thread->nstate;
int remains = nstate; /* number of remaining clients */
socket_set *sockets = alloc_socket_set(nstate);
- int i;
-
- /* for reporting progress: */
- int64 thread_start = INSTR_TIME_GET_MICROSEC(thread->start_time);
- int64 last_report = thread_start;
- int64 next_report = last_report + (int64) progress * 1000000;
+ int64 thread_start,
+ last_report,
+ next_report;
StatsData last,
aggs;
- /*
- * Initialize throttling rate target for all of the thread's clients. It
- * might be a little more accurate to reset thread->start_time here too.
- * The possible drift seems too small relative to typical throttle delay
- * times to worry about it.
- */
- INSTR_TIME_SET_CURRENT(start);
- thread->throttle_trigger = INSTR_TIME_GET_MICROSEC(start);
-
- INSTR_TIME_SET_ZERO(thread->conn_time);
-
- initStats(&aggs, time(NULL));
- last = aggs;
-
/* open log file if requested */
if (use_log)
{
@@ -6276,32 +6226,51 @@ threadRun(void *arg)
}
}
+ /* explicitly initialize the state machines */
+ for (int i = 0; i < nstate; i++)
+ {
+ state[i].state = CSTATE_CHOOSE_SCRIPT;
+ }
+
+ /* READY */
+ thread_start = pg_time_now();
+ thread->started_time = thread_start;
+ last_report = thread_start;
+ next_report = last_report + (int64) 1000000 * progress;
+
+ /* STEADY */
if (!is_connect)
{
/* make connections to the database before starting */
- for (i = 0; i < nstate; i++)
+ for (int i = 0; i < nstate; i++)
{
if ((state[i].con = doConnect()) == NULL)
goto done;
}
+
+ /* compute connection delay */
+ thread->conn_duration = pg_time_now() - thread->started_time;
}
-
- /* time after thread and connections set up */
- INSTR_TIME_SET_CURRENT(thread->conn_time);
- INSTR_TIME_SUBTRACT(thread->conn_time, thread->start_time);
-
- /* explicitly initialize the state machines */
- for (i = 0; i < nstate; i++)
+ else
{
- state[i].state = CSTATE_CHOOSE_SCRIPT;
+ /* no connection delay to record */
+ thread->conn_duration = 0;
}
+
+ start = pg_time_now();
+ thread->bench_start = start;
+ thread->throttle_trigger = start;
+
+ initStats(&aggs, start);
+ last = aggs;
+
/* loop till all clients have terminated */
while (remains > 0)
{
int nsocks; /* number of sockets to be waited for */
- int64 min_usec;
- int64 now_usec = 0; /* set this only if needed */
+ pg_time_usec_t min_usec;
+ pg_time_usec_t now = 0; /* set this only if needed */
/*
* identify which client sockets should be checked for input, and
@@ -6310,27 +6279,21 @@ threadRun(void *arg)
clear_socket_set(sockets);
nsocks = 0;
min_usec = PG_INT64_MAX;
- for (i = 0; i < nstate; i++)
+ for (int i = 0; i < nstate; i++)
{
CState *st = &state[i];
if (st->state == CSTATE_SLEEP || st->state == CSTATE_THROTTLE)
{
/* a nap from the script, or under throttling */
- int64 this_usec;
+ pg_time_usec_t this_usec;
/* get current time if needed */
- if (now_usec == 0)
- {
- instr_time now;
-
- INSTR_TIME_SET_CURRENT(now);
- now_usec = INSTR_TIME_GET_MICROSEC(now);
- }
+ pg_time_now_lazy(&now);
/* min_usec should be the minimum delay across all clients */
this_usec = (st->state == CSTATE_SLEEP ?
- st->sleep_until : st->txn_scheduled) - now_usec;
+ st->sleep_until : st->txn_scheduled) - now;
if (min_usec > this_usec)
min_usec = this_usec;
}
@@ -6365,19 +6328,12 @@ threadRun(void *arg)
/* also wake up to print the next progress report on time */
if (progress && min_usec > 0 && thread->tid == 0)
{
- /* get current time if needed */
- if (now_usec == 0)
- {
- instr_time now;
+ pg_time_now_lazy(&now);
- INSTR_TIME_SET_CURRENT(now);
- now_usec = INSTR_TIME_GET_MICROSEC(now);
- }
-
- if (now_usec >= next_report)
+ if (now >= next_report)
min_usec = 0;
- else if ((next_report - now_usec) < min_usec)
- min_usec = next_report - now_usec;
+ else if ((next_report - now) < min_usec)
+ min_usec = next_report - now;
}
/*
@@ -6426,7 +6382,7 @@ threadRun(void *arg)
/* ok, advance the state machine of each connection */
nsocks = 0;
- for (i = 0; i < nstate; i++)
+ for (int i = 0; i < nstate; i++)
{
CState *st = &state[i];
@@ -6464,11 +6420,8 @@ threadRun(void *arg)
/* progress report is made by thread 0 for all threads */
if (progress && thread->tid == 0)
{
- instr_time now_time;
- int64 now;
+ pg_time_usec_t now = pg_time_now();
- INSTR_TIME_SET_CURRENT(now_time);
- now = INSTR_TIME_GET_MICROSEC(now_time);
if (now >= next_report)
{
/*
@@ -6486,17 +6439,17 @@ threadRun(void *arg)
*/
do
{
- next_report += (int64) progress * 1000000;
+ next_report += (int64) 1000000 * progress;
} while (now >= next_report);
}
}
}
done:
- INSTR_TIME_SET_CURRENT(start);
+ start = pg_time_now();
disconnect_all(state, nstate);
- INSTR_TIME_SET_CURRENT(end);
- INSTR_TIME_ACCUM_DIFF(thread->conn_time, end, start);
+ thread->conn_duration += pg_time_now() - start;
+
if (thread->logfile)
{
if (agg_interval > 0)
diff --git a/src/include/portability/instr_time.h b/src/include/portability/instr_time.h
index d6459327cc..fcc0bdda28 100644
--- a/src/include/portability/instr_time.h
+++ b/src/include/portability/instr_time.h
@@ -253,4 +253,32 @@ GetTimerFrequency(void)
#define INSTR_TIME_SET_CURRENT_LAZY(t) \
(INSTR_TIME_IS_ZERO(t) ? INSTR_TIME_SET_CURRENT(t), true : false)
+/*
+ * Simpler, more convenient interface
+ *
+ * The instr_time type is expensive when dealing with time arithmetic.
+ * Define a type to hold microseconds on top of this, suitable for
+ * benchmarking performance measures, e.g. in "pgbench".
+ *
+ * A signed 64-bit count of microseconds is good enough for about
+ * 292000 years.
+ */
+typedef int64 pg_time_usec_t;
+
+/*
+ * Sample the current time with INSTR_TIME_SET_CURRENT and return it
+ * converted to microseconds.
+ */
+static inline pg_time_usec_t
+pg_time_now(void)
+{
+	instr_time	current;
+
+	INSTR_TIME_SET_CURRENT(current);
+
+	return (pg_time_usec_t) INSTR_TIME_GET_MICROSEC(current);
+}
+
+/*
+ * Store the current time into *now, but only if *now is still zero.
+ *
+ * Zero is treated as "not sampled yet"; a non-zero value is left untouched.
+ */
+static inline void
+pg_time_now_lazy(pg_time_usec_t *now)
+{
+	/* already holds a timestamp: nothing to do */
+	if (*now != 0)
+		return;
+
+	*now = pg_time_now();
+}
+
+/* scale a microsecond count by 1e-6, yielding seconds as a double */
+#define PG_TIME_GET_DOUBLE(t) ((t) * 0.000001)
#endif /* INSTR_TIME_H */
diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c
index 46be67adaf..79a2a10dee 100644
--- a/src/bin/pgbench/pgbench.c
+++ b/src/bin/pgbench/pgbench.c
@@ -113,18 +113,29 @@ typedef struct socket_set
*/
#ifdef WIN32
+#define PTHREAD_BARRIER_SERIAL_THREAD (-1)
+
/* Use native win32 threads on Windows */
typedef struct win32_pthread *pthread_t;
typedef int pthread_attr_t;
+typedef SYNCHRONIZATION_BARRIER pthread_barrier_t;
static int pthread_create(pthread_t *thread, pthread_attr_t *attr, void *(*start_routine) (void *), void *arg);
static int pthread_join(pthread_t th, void **thread_return);
+
+static int pthread_barrier_init(pthread_barrier_t *barrier, void *unused, int nthreads);
+static int pthread_barrier_wait(pthread_barrier_t *barrier);
+static int pthread_barrier_destroy(pthread_barrier_t *barrier);
#elif defined(ENABLE_THREAD_SAFETY)
/* Use platform-dependent pthread capability */
#include <pthread.h>
#else
/* No threads implementation, use none (-j 1) */
#define pthread_t void *
+#define pthread_barrier_t void *
+#define pthread_barrier_init(a, b, c) /* ignore */
+#define pthread_barrier_wait(a) /* ignore */
+#define pthread_barrier_destroy(a) /* ignore */
#endif
@@ -310,6 +321,9 @@ typedef struct RandomState
/* Various random sequences are initialized from this one. */
static RandomState base_random_sequence;
+/* Synchronization barrier for start and connection */
+static pthread_barrier_t barrier;
+
/*
* Connection state machine states.
*/
@@ -453,8 +467,8 @@ typedef struct
/* per thread collected stats in microseconds */
pg_time_usec_t create_time; /* thread creation time */
- pg_time_usec_t started_time; /* thread is running */
- pg_time_usec_t bench_start; /* thread is benchmarking */
+ pg_time_usec_t started_time; /* thread is running after start barrier */
+ pg_time_usec_t bench_start; /* thread is benchmarking after connection barrier */
pg_time_usec_t conn_duration; /* cumulated connection and deconnection delays */
StatsData stats;
@@ -6114,6 +6128,8 @@ main(int argc, char **argv)
if (duration > 0)
setalarm(duration);
+ pthread_barrier_init(&barrier, NULL, nthreads);
+
#ifdef ENABLE_THREAD_SAFETY
/* start all threads but thread 0 which is executed directly later */
for (i = 1; i < nthreads; i++)
@@ -6185,6 +6201,8 @@ main(int argc, char **argv)
printResults(&stats, pg_time_now() - bench_start, conn_total_duration,
bench_start - start_time, latency_late);
+ pthread_barrier_destroy(&barrier);
+
if (exit_code != 0)
pg_log_fatal("Run was aborted; the above results are incomplete.");
@@ -6233,6 +6251,8 @@ threadRun(void *arg)
}
/* READY */
+ pthread_barrier_wait(&barrier);
+
thread_start = pg_time_now();
thread->started_time = thread_start;
last_report = thread_start;
@@ -6257,6 +6277,8 @@ threadRun(void *arg)
thread->conn_duration = 0;
}
+ /* GO */
+ pthread_barrier_wait(&barrier);
start = pg_time_now();
thread->bench_start = start;
@@ -6762,4 +6784,26 @@ pthread_join(pthread_t th, void **thread_return)
return 0;
}
+/*
+ * Win32 stand-in for pthread_barrier_init(), built on
+ * InitializeSynchronizationBarrier().
+ *
+ * "unused" takes the place of the pthread barrier attributes and is ignored.
+ * Returns 0 on success and -1 if the barrier could not be initialized,
+ * instead of unconditionally reporting success.
+ */
+static int
+pthread_barrier_init(pthread_barrier_t *barrier, void *unused, int nthreads)
+{
+	/* no spinning: threads are not expected to arrive at the barrier together */
+	if (!InitializeSynchronizationBarrier(barrier, nthreads, 0))
+		return -1;
+
+	return 0;
+}
+
+/*
+ * Win32 stand-in for pthread_barrier_wait(): block (without spinning) in
+ * EnterSynchronizationBarrier() until the barrier is released.
+ *
+ * Returns PTHREAD_BARRIER_SERIAL_THREAD when EnterSynchronizationBarrier()
+ * reports true, i.e. for the last thread to reach the barrier, and 0
+ * otherwise.
+ */
+static int
+pthread_barrier_wait(pthread_barrier_t *barrier)
+{
+	if (EnterSynchronizationBarrier(barrier, SYNCHRONIZATION_BARRIER_FLAGS_BLOCK_ONLY))
+		return PTHREAD_BARRIER_SERIAL_THREAD;
+
+	return 0;
+}
+
+/*
+ * Win32 stand-in for pthread_barrier_destroy(): release the barrier with
+ * DeleteSynchronizationBarrier().  Always returns 0.
+ */
+static int
+pthread_barrier_destroy(pthread_barrier_t *barrier)
+{
+	/* the result of the delete call is deliberately discarded */
+	(void) DeleteSynchronizationBarrier(barrier);
+
+	return 0;
+}
+
#endif /* WIN32 */