Hello Marina,
1) It looks like pgbench will no longer support Windows XP due to the function DeleteSynchronizationBarrier. From https://docs.microsoft.com/en-us/windows/win32/api/synchapi/nf-synchapi-deletesynchronizationbarrier — Minimum supported client: Windows 8 [desktop apps only]; minimum supported server: Windows Server 2012 [desktop apps only].
Thanks for the test and the precise analysis! Sigh. I do not think that imposing such version requirements is worth it just for the sake of pgbench.
In the attached version, I simply comment out the call and add an explanation of why it is commented out. If PostgreSQL's overall version requirements on Windows are raised, the call could be reinstated.
-- Fabien.
diff --git a/doc/src/sgml/ref/pgbench.sgml b/doc/src/sgml/ref/pgbench.sgml index 7180fedd65..f02721da0d 100644 --- a/doc/src/sgml/ref/pgbench.sgml +++ b/doc/src/sgml/ref/pgbench.sgml @@ -58,8 +58,10 @@ number of clients: 10 number of threads: 1 number of transactions per client: 1000 number of transactions actually processed: 10000/10000 -tps = 85.184871 (including connections establishing) -tps = 85.296346 (excluding connections establishing) +latency average = 11.013 ms +latency stddev = 7.351 ms +initial connection time = 45.758 ms +tps = 896.967014 (without initial connection establishing) </screen> The first six lines report some of the most important parameter @@ -68,8 +70,7 @@ tps = 85.296346 (excluding connections establishing) and number of transactions per client); these will be equal unless the run failed before completion. (In <option>-T</option> mode, only the actual number of transactions is printed.) - The last two lines report the number of transactions per second, - figured with and without counting the time to start database sessions. + The last line reports the number of transactions per second. 
</para> <para> @@ -2234,22 +2235,22 @@ number of clients: 10 number of threads: 1 number of transactions per client: 1000 number of transactions actually processed: 10000/10000 -latency average = 15.844 ms -latency stddev = 2.715 ms -tps = 618.764555 (including connections establishing) -tps = 622.977698 (excluding connections establishing) +latency average = 10.870 ms +latency stddev = 7.341 ms +initial connection time = 30.954 ms +tps = 907.949122 (without initial connection establishing) statement latencies in milliseconds: - 0.002 \set aid random(1, 100000 * :scale) - 0.005 \set bid random(1, 1 * :scale) - 0.002 \set tid random(1, 10 * :scale) - 0.001 \set delta random(-5000, 5000) - 0.326 BEGIN; - 0.603 UPDATE pgbench_accounts SET abalance = abalance + :delta WHERE aid = :aid; - 0.454 SELECT abalance FROM pgbench_accounts WHERE aid = :aid; - 5.528 UPDATE pgbench_tellers SET tbalance = tbalance + :delta WHERE tid = :tid; - 7.335 UPDATE pgbench_branches SET bbalance = bbalance + :delta WHERE bid = :bid; - 0.371 INSERT INTO pgbench_history (tid, bid, aid, delta, mtime) VALUES (:tid, :bid, :aid, :delta, CURRENT_TIMESTAMP); - 1.212 END; + 0.001 \set aid random(1, 100000 * :scale) + 0.001 \set bid random(1, 1 * :scale) + 0.001 \set tid random(1, 10 * :scale) + 0.000 \set delta random(-5000, 5000) + 0.046 BEGIN; + 0.151 UPDATE pgbench_accounts SET abalance = abalance + :delta WHERE aid = :aid; + 0.107 SELECT abalance FROM pgbench_accounts WHERE aid = :aid; + 4.241 UPDATE pgbench_tellers SET tbalance = tbalance + :delta WHERE tid = :tid; + 5.245 UPDATE pgbench_branches SET bbalance = bbalance + :delta WHERE bid = :bid; + 0.102 INSERT INTO pgbench_history (tid, bid, aid, delta, mtime) VALUES (:tid, :bid, :aid, :delta, CURRENT_TIMESTAMP); + 0.974 END; </screen> </para> diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c index 3057665bbe..b8a3e28690 100644 --- a/src/bin/pgbench/pgbench.c +++ b/src/bin/pgbench/pgbench.c @@ -292,9 +292,9 @@ typedef struct 
SimpleStats */ typedef struct StatsData { - time_t start_time; /* interval start time, for aggregates */ - int64 cnt; /* number of transactions, including skipped */ - int64 skipped; /* number of transactions skipped under --rate + pg_time_usec_t start_time; /* interval start time, for aggregates */ + int64 cnt; /* number of transactions, including skipped */ + int64 skipped; /* number of transactions skipped under --rate * and --latency-limit */ SimpleStats latency; SimpleStats lag; @@ -417,11 +417,11 @@ typedef struct int nvariables; /* number of variables */ bool vars_sorted; /* are variables sorted by name? */ - /* various times about current transaction */ - int64 txn_scheduled; /* scheduled start time of transaction (usec) */ - int64 sleep_until; /* scheduled start time of next cmd (usec) */ - instr_time txn_begin; /* used for measuring schedule lag times */ - instr_time stmt_begin; /* used for measuring statement latencies */ + /* various times about current transaction in microseconds */ + pg_time_usec_t txn_scheduled; /* scheduled start time of transaction */ + pg_time_usec_t sleep_until; /* scheduled start time of next cmd */ + pg_time_usec_t txn_begin; /* used for measuring schedule lag times */ + pg_time_usec_t stmt_begin; /* used for measuring statement latencies */ bool prepared[MAX_SCRIPTS]; /* whether client prepared the script */ @@ -450,13 +450,16 @@ typedef struct RandomState ts_sample_rs; /* random state for log sampling */ int64 throttle_trigger; /* previous/next throttling (us) */ - FILE *logfile; /* where to log, or NULL */ + FILE *logfile; /* where to log, or NULL */ + + /* per thread collected stats in microseconds */ + pg_time_usec_t create_time; /* thread creation time */ + pg_time_usec_t started_time; /* thread is running */ + pg_time_usec_t bench_start; /* thread is benchmarking */ + pg_time_usec_t conn_duration; /* cumulated connection and deconnection delays */ - /* per thread collected stats */ - instr_time start_time; /* thread 
start time */ - instr_time conn_time; StatsData stats; - int64 latency_late; /* executed but late transactions */ + int64 latency_late; /* count executed but late transactions */ } TState; #define INVALID_THREAD ((pthread_t) 0) @@ -598,10 +601,10 @@ static void setIntValue(PgBenchValue *pv, int64 ival); static void setDoubleValue(PgBenchValue *pv, double dval); static bool evaluateExpr(CState *st, PgBenchExpr *expr, PgBenchValue *retval); -static ConnectionStateEnum executeMetaCommand(CState *st, instr_time *now); +static ConnectionStateEnum executeMetaCommand(CState *st, pg_time_usec_t *now); static void doLog(TState *thread, CState *st, StatsData *agg, bool skipped, double latency, double lag); -static void processXactStats(TState *thread, CState *st, instr_time *now, +static void processXactStats(TState *thread, CState *st, pg_time_usec_t *now, bool skipped, StatsData *agg); static void addScript(ParsedScript script); static void *threadRun(void *arg); @@ -1105,9 +1108,9 @@ mergeSimpleStats(SimpleStats *acc, SimpleStats *ss) * the given value. */ static void -initStats(StatsData *sd, time_t start_time) +initStats(StatsData *sd, pg_time_usec_t start) { - sd->start_time = start_time; + sd->start_time = start; sd->cnt = 0; sd->skipped = 0; initSimpleStats(&sd->latency); @@ -2876,7 +2879,6 @@ evaluateSleep(CState *st, int argc, char **argv, int *usecs) static void advanceConnectionState(TState *thread, CState *st, StatsData *agg) { - instr_time now; /* * gettimeofday() isn't free, so we get the current timestamp lazily the @@ -2886,7 +2888,7 @@ advanceConnectionState(TState *thread, CState *st, StatsData *agg) * means "not set yet". Reset "now" when we execute shell commands or * expressions, which might take a non-negligible amount of time, though. 
*/ - INSTR_TIME_SET_ZERO(now); + pg_time_usec_t now = 0; /* * Loop in the state machine, until we have to wait for a result from the @@ -2921,29 +2923,30 @@ advanceConnectionState(TState *thread, CState *st, StatsData *agg) /* Start new transaction (script) */ case CSTATE_START_TX: + pg_time_now_lazy(&now); /* establish connection if needed, i.e. under --connect */ if (st->con == NULL) { - instr_time start; + pg_time_usec_t start = now; - INSTR_TIME_SET_CURRENT_LAZY(now); - start = now; if ((st->con = doConnect()) == NULL) { pg_log_error("client %d aborted while establishing connection", st->id); st->state = CSTATE_ABORTED; break; } - INSTR_TIME_SET_CURRENT(now); - INSTR_TIME_ACCUM_DIFF(thread->conn_time, now, start); + + /* reset now after connection */ + now = pg_time_now(); + + thread->conn_duration += now - start; /* Reset session-local state */ memset(st->prepared, 0, sizeof(st->prepared)); } /* record transaction start time */ - INSTR_TIME_SET_CURRENT_LAZY(now); st->txn_begin = now; /* @@ -2951,7 +2954,7 @@ advanceConnectionState(TState *thread, CState *st, StatsData *agg) * scheduled start time. */ if (!throttle_delay) - st->txn_scheduled = INSTR_TIME_GET_MICROSEC(now); + st->txn_scheduled = now; /* Begin with the first command */ st->state = CSTATE_START_COMMAND; @@ -2987,12 +2990,9 @@ advanceConnectionState(TState *thread, CState *st, StatsData *agg) */ if (latency_limit) { - int64 now_us; + pg_time_now_lazy(&now); - INSTR_TIME_SET_CURRENT_LAZY(now); - now_us = INSTR_TIME_GET_MICROSEC(now); - - while (thread->throttle_trigger < now_us - latency_limit && + while (thread->throttle_trigger < now - latency_limit && (nxacts <= 0 || st->cnt < nxacts)) { processXactStats(thread, st, &now, true, agg); @@ -3025,9 +3025,9 @@ advanceConnectionState(TState *thread, CState *st, StatsData *agg) * Wait until it's time to start next transaction. 
*/ case CSTATE_THROTTLE: - INSTR_TIME_SET_CURRENT_LAZY(now); + pg_time_now_lazy(&now); - if (INSTR_TIME_GET_MICROSEC(now) < st->txn_scheduled) + if (now < st->txn_scheduled) return; /* still sleeping, nothing to do here */ /* done sleeping, but don't start transaction if we're done */ @@ -3050,7 +3050,7 @@ advanceConnectionState(TState *thread, CState *st, StatsData *agg) /* record begin time of next command, and initiate it */ if (report_per_command) { - INSTR_TIME_SET_CURRENT_LAZY(now); + pg_time_now_lazy(&now); st->stmt_begin = now; } @@ -3215,8 +3215,8 @@ advanceConnectionState(TState *thread, CState *st, StatsData *agg) * instead of CSTATE_START_TX. */ case CSTATE_SLEEP: - INSTR_TIME_SET_CURRENT_LAZY(now); - if (INSTR_TIME_GET_MICROSEC(now) < st->sleep_until) + pg_time_now_lazy(&now); + if (now < st->sleep_until) return; /* still sleeping, nothing to do here */ /* Else done sleeping. */ st->state = CSTATE_END_COMMAND; @@ -3236,13 +3236,12 @@ advanceConnectionState(TState *thread, CState *st, StatsData *agg) { Command *command; - INSTR_TIME_SET_CURRENT_LAZY(now); + pg_time_now_lazy(&now); command = sql_script[st->use_file].commands[st->command]; /* XXX could use a mutex here, but we choose not to */ addToSimpleStats(&command->stats, - INSTR_TIME_GET_DOUBLE(now) - - INSTR_TIME_GET_DOUBLE(st->stmt_begin)); + PG_TIME_GET_DOUBLE(now - st->stmt_begin)); } /* Go ahead with next command, to be executed or skipped */ @@ -3268,7 +3267,7 @@ advanceConnectionState(TState *thread, CState *st, StatsData *agg) if (is_connect) { finishCon(st); - INSTR_TIME_SET_ZERO(now); + now = 0; } if ((st->cnt >= nxacts && duration <= 0) || timer_exceeded) @@ -3306,7 +3305,7 @@ advanceConnectionState(TState *thread, CState *st, StatsData *agg) * take no time to execute. 
*/ static ConnectionStateEnum -executeMetaCommand(CState *st, instr_time *now) +executeMetaCommand(CState *st, pg_time_usec_t *now) { Command *command = sql_script[st->use_file].commands[st->command]; int argc; @@ -3348,8 +3347,8 @@ executeMetaCommand(CState *st, instr_time *now) return CSTATE_ABORTED; } - INSTR_TIME_SET_CURRENT_LAZY(*now); - st->sleep_until = INSTR_TIME_GET_MICROSEC(*now) + usec; + pg_time_now_lazy(now); + st->sleep_until = (*now) + usec; return CSTATE_SLEEP; } else if (command->meta == META_SET) @@ -3452,7 +3451,7 @@ executeMetaCommand(CState *st, instr_time *now) * executing the expression or shell command might have taken a * non-negligible amount of time, so reset 'now' */ - INSTR_TIME_SET_ZERO(*now); + *now = 0; return CSTATE_END_COMMAND; } @@ -3462,14 +3461,15 @@ executeMetaCommand(CState *st, instr_time *now) * * We print Unix-epoch timestamps in the log, so that entries can be * correlated against other logs. On some platforms this could be obtained - * from the instr_time reading the caller has, but rather than get entangled - * with that, we just eat the cost of an extra syscall in all cases. + * from the caller, but rather than get entangled with that, we just eat + * the cost of an extra syscall in all cases. */ static void doLog(TState *thread, CState *st, StatsData *agg, bool skipped, double latency, double lag) { FILE *logfile = thread->logfile; + pg_time_usec_t now = pg_time_now(); Assert(use_log); @@ -3489,13 +3489,12 @@ doLog(TState *thread, CState *st, * any empty intervals in between (this may happen with very low tps, * e.g. --rate=0.1). 
*/ - time_t now = time(NULL); while (agg->start_time + agg_interval <= now) { /* print aggregated report to logfile */ - fprintf(logfile, "%ld " INT64_FORMAT " %.0f %.0f %.0f %.0f", - (long) agg->start_time, + fprintf(logfile, INT64_FORMAT " " INT64_FORMAT " %.0f %.0f %.0f %.0f", + agg->start_time, agg->cnt, agg->latency.sum, agg->latency.sum2, @@ -3523,17 +3522,13 @@ doLog(TState *thread, CState *st, else { /* no, print raw transactions */ - struct timeval tv; - - gettimeofday(&tv, NULL); if (skipped) fprintf(logfile, "%d " INT64_FORMAT " skipped %d %ld %ld", - st->id, st->cnt, st->use_file, - (long) tv.tv_sec, (long) tv.tv_usec); + st->id, st->cnt, st->use_file, now / 1000000, now % 1000000); else fprintf(logfile, "%d " INT64_FORMAT " %.0f %d %ld %ld", st->id, st->cnt, latency, st->use_file, - (long) tv.tv_sec, (long) tv.tv_usec); + now / 1000000, now % 1000000); if (throttle_delay) fprintf(logfile, " %.0f", lag); fputc('\n', logfile); @@ -3547,7 +3542,7 @@ doLog(TState *thread, CState *st, * Note that even skipped transactions are counted in the "cnt" fields.) 
*/ static void -processXactStats(TState *thread, CState *st, instr_time *now, +processXactStats(TState *thread, CState *st, pg_time_usec_t *now, bool skipped, StatsData *agg) { double latency = 0.0, @@ -3557,11 +3552,11 @@ processXactStats(TState *thread, CState *st, instr_time *now, if (detailed && !skipped) { - INSTR_TIME_SET_CURRENT_LAZY(*now); + pg_time_now_lazy(now); /* compute latency & lag */ - latency = INSTR_TIME_GET_MICROSEC(*now) - st->txn_scheduled; - lag = INSTR_TIME_GET_MICROSEC(st->txn_begin) - st->txn_scheduled; + latency = (*now) - st->txn_scheduled; + lag = st->txn_begin - st->txn_scheduled; } if (thread_details) @@ -3812,10 +3807,7 @@ initGenerateDataClientSide(PGconn *con) int64 k; /* used to track elapsed time and estimate of the remaining time */ - instr_time start, - diff; - double elapsed_sec, - remaining_sec; + pg_time_usec_t start; int log_interval = 1; /* Stay on the same line if reporting to a terminal */ @@ -3867,7 +3859,7 @@ initGenerateDataClientSide(PGconn *con) } PQclear(res); - INSTR_TIME_SET_CURRENT(start); + start = pg_time_now(); for (k = 0; k < (int64) naccounts * scale; k++) { @@ -3892,11 +3884,8 @@ initGenerateDataClientSide(PGconn *con) */ if ((!use_quiet) && (j % 100000 == 0)) { - INSTR_TIME_SET_CURRENT(diff); - INSTR_TIME_SUBTRACT(diff, start); - - elapsed_sec = INSTR_TIME_GET_DOUBLE(diff); - remaining_sec = ((double) scale * naccounts - j) * elapsed_sec / j; + double elapsed_sec = PG_TIME_GET_DOUBLE(pg_time_now() - start); + double remaining_sec = ((double) scale * naccounts - j) * elapsed_sec / j; fprintf(stderr, INT64_FORMAT " of " INT64_FORMAT " tuples (%d%%) done (elapsed %.2f s, remaining %.2f s)%c", j, (int64) naccounts * scale, @@ -3906,11 +3895,8 @@ initGenerateDataClientSide(PGconn *con) /* let's not call the timing for each row, but only each 100 rows */ else if (use_quiet && (j % 100 == 0)) { - INSTR_TIME_SET_CURRENT(diff); - INSTR_TIME_SUBTRACT(diff, start); - - elapsed_sec = INSTR_TIME_GET_DOUBLE(diff); - 
remaining_sec = ((double) scale * naccounts - j) * elapsed_sec / j; + double elapsed_sec = PG_TIME_GET_DOUBLE(pg_time_now() - start); + double remaining_sec = ((double) scale * naccounts - j) * elapsed_sec / j; /* have we reached the next interval (or end)? */ if ((j == scale * naccounts) || (elapsed_sec >= log_interval * LOG_STEP_SECONDS)) @@ -4115,10 +4101,8 @@ runInitSteps(const char *initialize_steps) for (step = initialize_steps; *step != '\0'; step++) { - instr_time start; char *op = NULL; - - INSTR_TIME_SET_CURRENT(start); + pg_time_usec_t start = pg_time_now(); switch (*step) { @@ -4160,12 +4144,7 @@ runInitSteps(const char *initialize_steps) if (op != NULL) { - instr_time diff; - double elapsed_sec; - - INSTR_TIME_SET_CURRENT(diff); - INSTR_TIME_SUBTRACT(diff, start); - elapsed_sec = INSTR_TIME_GET_DOUBLE(diff); + double elapsed_sec = PG_TIME_GET_DOUBLE(pg_time_now() - start); if (!first) appendPQExpBufferStr(&stats, ", "); @@ -5087,12 +5066,12 @@ addScript(ParsedScript script) * progress report. On exit, they are updated with the new stats. */ static void -printProgressReport(TState *threads, int64 test_start, int64 now, +printProgressReport(TState *threads, int64 test_start, pg_time_usec_t now, StatsData *last, int64 *last_report) { /* generate and show report */ - int64 run = now - *last_report, - ntx; + pg_time_usec_t run = now - *last_report; + int64 ntx; double tps, total_run, latency, @@ -5139,16 +5118,7 @@ printProgressReport(TState *threads, int64 test_start, int64 now, if (progress_timestamp) { - /* - * On some platforms the current system timestamp is available in - * now_time, but rather than get entangled with that, we just eat the - * cost of an extra syscall in all cases. 
- */ - struct timeval tv; - - gettimeofday(&tv, NULL); - snprintf(tbuf, sizeof(tbuf), "%ld.%03ld s", - (long) tv.tv_sec, (long) (tv.tv_usec / 1000)); + snprintf(tbuf, sizeof(tbuf), "%.3f s", PG_TIME_GET_DOUBLE(now)); } else { @@ -5188,21 +5158,18 @@ printSimpleStats(const char *prefix, SimpleStats *ss) /* print out results */ static void -printResults(StatsData *total, instr_time total_time, - instr_time conn_total_time, int64 latency_late) +printResults(StatsData *total, + pg_time_usec_t total_duration, /* benchmarking time */ + pg_time_usec_t conn_total_duration, /* is_connect */ + pg_time_usec_t conn_elapsed_duration, /* !is_connect */ + int64 latency_late) { - double time_include, - tps_include, - tps_exclude; + /* tps is about actually executed transactions during benchmarking */ int64 ntx = total->cnt - total->skipped; + double bench_duration = PG_TIME_GET_DOUBLE(total_duration); + double tps = ntx / bench_duration; - time_include = INSTR_TIME_GET_DOUBLE(total_time); - - /* tps is about actually executed transactions */ - tps_include = ntx / time_include; - tps_exclude = ntx / - (time_include - (INSTR_TIME_GET_DOUBLE(conn_total_time) / nclients)); - + printf("pgbench (PostgreSQL) %d.%d\n", PG_VERSION_NUM / 10000, PG_VERSION_NUM % 100); /* Report test parameters. */ printf("transaction type: %s\n", num_scripts == 1 ? 
sql_script[0].desc : "multiple scripts"); @@ -5233,8 +5200,7 @@ printResults(StatsData *total, instr_time total_time, if (throttle_delay && latency_limit) printf("number of transactions skipped: " INT64_FORMAT " (%.3f %%)\n", - total->skipped, - 100.0 * total->skipped / total->cnt); + total->skipped, 100.0 * total->skipped / total->cnt); if (latency_limit) printf("number of transactions above the %.1f ms latency limit: " INT64_FORMAT "/" INT64_FORMAT " (%.3f %%)\n", @@ -5247,7 +5213,7 @@ printResults(StatsData *total, instr_time total_time, { /* no measurement, show average latency computed from run time */ printf("latency average = %.3f ms\n", - 1000.0 * time_include * nclients / total->cnt); + 0.001 * total_duration * nclients / total->cnt); } if (throttle_delay) @@ -5262,8 +5228,25 @@ printResults(StatsData *total, instr_time total_time, 0.001 * total->lag.sum / total->cnt, 0.001 * total->lag.max); } - printf("tps = %f (including connections establishing)\n", tps_include); - printf("tps = %f (excluding connections establishing)\n", tps_exclude); + /* + * Under -C/--connect, each transaction incurs a significant connection cost, + * it would not make much sense to ignore it in tps, and it would not be tps + * anyway. + * + * Otherwise connections are made just once at the beginning of the run + * and should not impact performance but for very short run, so they are + * (right)fully ignored in tps. 
+ */ + if (is_connect) + { + printf("average connection time = %.3f ms\n", 0.001 * conn_total_duration / total->cnt); + printf("tps = %f (including reconnection times)\n", tps); + } + else + { + printf("initial connection time = %.3f ms\n", 0.001 * conn_elapsed_duration); + printf("tps = %f (without initial connection establishing)\n", tps); + } /* Report per-script/command statistics */ if (per_script_stats || report_per_command) @@ -5284,7 +5267,7 @@ printResults(StatsData *total, instr_time total_time, 100.0 * sql_script[i].weight / total_weight, sstats->cnt, 100.0 * sstats->cnt / total->cnt, - (sstats->cnt - sstats->skipped) / time_include); + (sstats->cnt - sstats->skipped) / bench_duration); if (throttle_delay && latency_limit && sstats->cnt > 0) printf(" - number of transactions skipped: " INT64_FORMAT " (%.3f%%)\n", @@ -5332,10 +5315,7 @@ set_random_seed(const char *seed) if (seed == NULL || strcmp(seed, "time") == 0) { /* rely on current time */ - instr_time now; - - INSTR_TIME_SET_CURRENT(now); - iseed = (uint64) INSTR_TIME_GET_MICROSEC(now); + iseed = pg_time_now(); } else if (strcmp(seed, "rand") == 0) { @@ -5438,9 +5418,10 @@ main(int argc, char **argv) CState *state; /* status of clients */ TState *threads; /* array of thread */ - instr_time start_time; /* start up time */ - instr_time total_time; - instr_time conn_total_time; + pg_time_usec_t + start_time, /* start up time */ + bench_start = 0, /* first recorded benchmarking time */ + conn_total_duration; /* cumulated connection time in threads */ int64 latency_late = 0; StatsData stats; int weight; @@ -6123,67 +6104,53 @@ main(int argc, char **argv) /* all clients must be assigned to a thread */ Assert(nclients_dealt == nclients); - /* get start up time */ - INSTR_TIME_SET_CURRENT(start_time); + /* get start up time for the whole computation */ + start_time = pg_time_now(); /* set alarm if duration is specified. 
*/ if (duration > 0) setalarm(duration); - /* start threads */ #ifdef ENABLE_THREAD_SAFETY - for (i = 0; i < nthreads; i++) + /* start all threads but thread 0 which is executed directly later */ + for (i = 1; i < nthreads; i++) { TState *thread = &threads[i]; + int err; - INSTR_TIME_SET_CURRENT(thread->start_time); + thread->create_time = pg_time_now(); + err = pthread_create(&thread->thread, NULL, threadRun, thread); - /* compute when to stop */ - if (duration > 0) - end_time = INSTR_TIME_GET_MICROSEC(thread->start_time) + - (int64) 1000000 * duration; - - /* the first thread (i = 0) is executed by main thread */ - if (i > 0) - { - int err = pthread_create(&thread->thread, NULL, threadRun, thread); - - if (err != 0 || thread->thread == INVALID_THREAD) - { - pg_log_fatal("could not create thread: %m"); - exit(1); - } - } - else + if (err != 0 || thread->thread == INVALID_THREAD) { - thread->thread = INVALID_THREAD; + pg_log_fatal("could not create thread: %m"); + exit(1); } } #else - INSTR_TIME_SET_CURRENT(threads[0].start_time); + Assert(nthreads == 1); +#endif /* ENABLE_THREAD_SAFETY */ + /* compute when to stop */ + threads[0].create_time = pg_time_now(); if (duration > 0) - end_time = INSTR_TIME_GET_MICROSEC(threads[0].start_time) + - (int64) 1000000 * duration; + end_time = threads[0].create_time + (int64) 1000000 * duration; + + /* run thread 0 directly */ threads[0].thread = INVALID_THREAD; -#endif /* ENABLE_THREAD_SAFETY */ + (void) threadRun(&threads[0]); - /* wait for threads and accumulate results */ + /* wait for other threads and accumulate results */ initStats(&stats, 0); - INSTR_TIME_SET_ZERO(conn_total_time); + conn_total_duration = 0; + for (i = 0; i < nthreads; i++) { TState *thread = &threads[i]; #ifdef ENABLE_THREAD_SAFETY - if (threads[i].thread == INVALID_THREAD) - /* actually run this thread directly in the main thread */ - (void) threadRun(thread); - else - /* wait of other threads. should check that 0 is returned? 
*/ + if (i > 0) pthread_join(thread->thread, NULL); -#else - (void) threadRun(thread); #endif /* ENABLE_THREAD_SAFETY */ for (int j = 0; j < thread->nstate; j++) @@ -6196,23 +6163,24 @@ main(int argc, char **argv) stats.cnt += thread->stats.cnt; stats.skipped += thread->stats.skipped; latency_late += thread->latency_late; - INSTR_TIME_ADD(conn_total_time, thread->conn_time); + conn_total_duration += thread->conn_duration; + + /* first recorded benchmarking start time */ + if (bench_start == 0 || thread->bench_start < bench_start) + bench_start = thread->bench_start; } + + /* XXX should this be connection time? */ disconnect_all(state, nclients); /* - * XXX We compute results as though every client of every thread started - * and finished at the same time. That model can diverge noticeably from - * reality for a short benchmark run involving relatively many threads. - * The first thread may process notably many transactions before the last - * thread begins. Improving the model alone would bring limited benefit, - * because performance during those periods of partial thread count can - * easily exceed steady state performance. This is one of the many ways - * short runs convey deceptive performance figures. + * Beware that performance of short benchmarks with many threads and possibly + * long transactions can be deceptive because threads do not start and finish + * at the exact same time. The total duration computed here encompasses all + * transactions so that tps shown is somehow slightly underestimated. 
*/ - INSTR_TIME_SET_CURRENT(total_time); - INSTR_TIME_SUBTRACT(total_time, start_time); - printResults(&stats, total_time, conn_total_time, latency_late); + printResults(&stats, pg_time_now() - bench_start, conn_total_duration, + bench_start - start_time, latency_late); if (exit_code != 0) pg_log_fatal("Run was aborted; the above results are incomplete."); @@ -6225,34 +6193,16 @@ threadRun(void *arg) { TState *thread = (TState *) arg; CState *state = thread->state; - instr_time start, - end; + pg_time_usec_t start; int nstate = thread->nstate; int remains = nstate; /* number of remaining clients */ socket_set *sockets = alloc_socket_set(nstate); - int i; - - /* for reporting progress: */ - int64 thread_start = INSTR_TIME_GET_MICROSEC(thread->start_time); - int64 last_report = thread_start; - int64 next_report = last_report + (int64) progress * 1000000; + int64 thread_start, + last_report, + next_report; StatsData last, aggs; - /* - * Initialize throttling rate target for all of the thread's clients. It - * might be a little more accurate to reset thread->start_time here too. - * The possible drift seems too small relative to typical throttle delay - * times to worry about it. 
- */ - INSTR_TIME_SET_CURRENT(start); - thread->throttle_trigger = INSTR_TIME_GET_MICROSEC(start); - - INSTR_TIME_SET_ZERO(thread->conn_time); - - initStats(&aggs, time(NULL)); - last = aggs; - /* open log file if requested */ if (use_log) { @@ -6273,32 +6223,49 @@ threadRun(void *arg) } } + /* explicitly initialize the state machines */ + for (int i = 0; i < nstate; i++) + state[i].state = CSTATE_CHOOSE_SCRIPT; + + /* READY */ + thread_start = pg_time_now(); + thread->started_time = thread_start; + last_report = thread_start; + next_report = last_report + (int64) 1000000 * progress; + + /* STEADY */ if (!is_connect) { /* make connections to the database before starting */ - for (i = 0; i < nstate; i++) + for (int i = 0; i < nstate; i++) { if ((state[i].con = doConnect()) == NULL) goto done; } + + /* compute connection delay */ + thread->conn_duration = pg_time_now() - thread->started_time; } - - /* time after thread and connections set up */ - INSTR_TIME_SET_CURRENT(thread->conn_time); - INSTR_TIME_SUBTRACT(thread->conn_time, thread->start_time); - - /* explicitly initialize the state machines */ - for (i = 0; i < nstate; i++) + else { - state[i].state = CSTATE_CHOOSE_SCRIPT; + /* no connection delay to record */ + thread->conn_duration = 0; } + + start = pg_time_now(); + thread->bench_start = start; + thread->throttle_trigger = start; + + initStats(&aggs, start); + last = aggs; + /* loop till all clients have terminated */ while (remains > 0) { int nsocks; /* number of sockets to be waited for */ - int64 min_usec; - int64 now_usec = 0; /* set this only if needed */ + pg_time_usec_t min_usec; + pg_time_usec_t now = 0; /* set this only if needed */ /* * identify which client sockets should be checked for input, and @@ -6307,27 +6274,21 @@ threadRun(void *arg) clear_socket_set(sockets); nsocks = 0; min_usec = PG_INT64_MAX; - for (i = 0; i < nstate; i++) + for (int i = 0; i < nstate; i++) { CState *st = &state[i]; if (st->state == CSTATE_SLEEP || st->state == 
CSTATE_THROTTLE) { /* a nap from the script, or under throttling */ - int64 this_usec; + pg_time_usec_t this_usec; /* get current time if needed */ - if (now_usec == 0) - { - instr_time now; - - INSTR_TIME_SET_CURRENT(now); - now_usec = INSTR_TIME_GET_MICROSEC(now); - } + pg_time_now_lazy(&now); /* min_usec should be the minimum delay across all clients */ this_usec = (st->state == CSTATE_SLEEP ? - st->sleep_until : st->txn_scheduled) - now_usec; + st->sleep_until : st->txn_scheduled) - now; if (min_usec > this_usec) min_usec = this_usec; } @@ -6362,19 +6323,12 @@ threadRun(void *arg) /* also wake up to print the next progress report on time */ if (progress && min_usec > 0 && thread->tid == 0) { - /* get current time if needed */ - if (now_usec == 0) - { - instr_time now; + pg_time_now_lazy(&now); - INSTR_TIME_SET_CURRENT(now); - now_usec = INSTR_TIME_GET_MICROSEC(now); - } - - if (now_usec >= next_report) + if (now >= next_report) min_usec = 0; - else if ((next_report - now_usec) < min_usec) - min_usec = next_report - now_usec; + else if ((next_report - now) < min_usec) + min_usec = next_report - now; } /* @@ -6423,7 +6377,7 @@ threadRun(void *arg) /* ok, advance the state machine of each connection */ nsocks = 0; - for (i = 0; i < nstate; i++) + for (int i = 0; i < nstate; i++) { CState *st = &state[i]; @@ -6461,11 +6415,8 @@ threadRun(void *arg) /* progress report is made by thread 0 for all threads */ if (progress && thread->tid == 0) { - instr_time now_time; - int64 now; + pg_time_usec_t now = pg_time_now(); - INSTR_TIME_SET_CURRENT(now_time); - now = INSTR_TIME_GET_MICROSEC(now_time); if (now >= next_report) { /* @@ -6483,17 +6434,17 @@ threadRun(void *arg) */ do { - next_report += (int64) progress * 1000000; + next_report += (int64) 1000000 * progress; } while (now >= next_report); } } } done: - INSTR_TIME_SET_CURRENT(start); + start = pg_time_now(); disconnect_all(state, nstate); - INSTR_TIME_SET_CURRENT(end); - INSTR_TIME_ACCUM_DIFF(thread->conn_time, end, 
start); + thread->conn_duration += pg_time_now() - start; + if (thread->logfile) { if (agg_interval > 0) diff --git a/src/include/portability/instr_time.h b/src/include/portability/instr_time.h index d6459327cc..fcc0bdda28 100644 --- a/src/include/portability/instr_time.h +++ b/src/include/portability/instr_time.h @@ -253,4 +253,32 @@ GetTimerFrequency(void) #define INSTR_TIME_SET_CURRENT_LAZY(t) \ (INSTR_TIME_IS_ZERO(t) ? INSTR_TIME_SET_CURRENT(t), true : false) +/* + * Simpler convenient interface + * + * The instr_time type is expensive when dealing with time arithmetic. + * Define a type to hold microseconds on top of this, suitable for + * benchmarking performance measures, eg in "pgbench". + * + * Type int64 is good enough for about 584500 years. + */ +typedef int64 pg_time_usec_t; + +static inline pg_time_usec_t +pg_time_now(void) +{ + instr_time now; + + INSTR_TIME_SET_CURRENT(now); + return (pg_time_usec_t) INSTR_TIME_GET_MICROSEC(now); +} + +static inline void +pg_time_now_lazy(pg_time_usec_t *now) +{ + if ((*now) == 0) + (*now) = pg_time_now(); +} + +#define PG_TIME_GET_DOUBLE(t) (0.000001 * (t)) #endif /* INSTR_TIME_H */
diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c index b8a3e28690..f2c54e4762 100644 --- a/src/bin/pgbench/pgbench.c +++ b/src/bin/pgbench/pgbench.c @@ -114,18 +114,29 @@ typedef struct socket_set */ #ifdef WIN32 +#define PTHREAD_BARRIER_SERIAL_THREAD (-1) + /* Use native win32 threads on Windows */ typedef struct win32_pthread *pthread_t; typedef int pthread_attr_t; +typedef SYNCHRONIZATION_BARRIER pthread_barrier_t; static int pthread_create(pthread_t *thread, pthread_attr_t *attr, void *(*start_routine) (void *), void *arg); static int pthread_join(pthread_t th, void **thread_return); + +static int pthread_barrier_init(pthread_barrier_t *barrier, void *unused, int nthreads); +static int pthread_barrier_wait(pthread_barrier_t *barrier); +static int pthread_barrier_destroy(pthread_barrier_t *barrier); #elif defined(ENABLE_THREAD_SAFETY) /* Use platform-dependent pthread capability */ #include <pthread.h> #else /* No threads implementation, use none (-j 1) */ #define pthread_t void * +#define pthread_barrier_t void * +#define pthread_barrier_init(a, b, c) /* ignore */ +#define pthread_barrier_wait(a) /* ignore */ +#define pthread_barrier_destroy(a) /* ignore */ #endif @@ -311,6 +322,9 @@ typedef struct RandomState /* Various random sequences are initialized from this one. */ static RandomState base_random_sequence; +/* Synchronization barrier for start and connection */ +static pthread_barrier_t barrier; + /* * Connection state machine states.
*/ @@ -454,8 +468,8 @@ typedef struct /* per thread collected stats in microseconds */ pg_time_usec_t create_time; /* thread creation time */ - pg_time_usec_t started_time; /* thread is running */ - pg_time_usec_t bench_start; /* thread is benchmarking */ + pg_time_usec_t started_time; /* thread is running after start barrier */ + pg_time_usec_t bench_start; /* thread is benchmarking after connection barrier */ pg_time_usec_t conn_duration; /* cumulated connection and deconnection delays */ StatsData stats; @@ -6111,6 +6125,8 @@ main(int argc, char **argv) if (duration > 0) setalarm(duration); + pthread_barrier_init(&barrier, NULL, nthreads); + #ifdef ENABLE_THREAD_SAFETY /* start all threads but thread 0 which is executed directly later */ for (i = 1; i < nthreads; i++) @@ -6182,6 +6198,8 @@ main(int argc, char **argv) printResults(&stats, pg_time_now() - bench_start, conn_total_duration, bench_start - start_time, latency_late); + pthread_barrier_destroy(&barrier); + if (exit_code != 0) pg_log_fatal("Run was aborted; the above results are incomplete."); @@ -6228,6 +6246,8 @@ threadRun(void *arg) state[i].state = CSTATE_CHOOSE_SCRIPT; /* READY */ + pthread_barrier_wait(&barrier); + thread_start = pg_time_now(); thread->started_time = thread_start; last_report = thread_start; @@ -6240,7 +6260,18 @@ threadRun(void *arg) for (int i = 0; i < nstate; i++) { if ((state[i].con = doConnect()) == NULL) + { + /* + * On connection failure, wait on the barrier here in place of the + * "GO" wait below before jumping to the "done" cleanup path, so that + * the other threads are not left blocked at that barrier forever. + * + * It is unclear whether this is worth doing rather than simply + * exiting with an error message.
+ */ + pthread_barrier_wait(&barrier); goto done; + } } /* compute connection delay */ @@ -6252,6 +6283,8 @@ threadRun(void *arg) thread->conn_duration = 0; } + /* GO */ + pthread_barrier_wait(&barrier); start = pg_time_now(); thread->bench_start = start; @@ -6757,4 +6790,34 @@ pthread_join(pthread_t th, void **thread_return) return 0; } +static int +pthread_barrier_init(pthread_barrier_t *barrier, void *unused, int nthreads) +{ + /* no spinning: threads are not expected to arrive at the barrier together */ + bool ok = InitializeSynchronizationBarrier(barrier, nthreads, 0); + return ok ? 0 : -1; /* report initialization failure instead of hiding it */ +} + +static int +pthread_barrier_wait(pthread_barrier_t *barrier) +{ + bool last = EnterSynchronizationBarrier(barrier, SYNCHRONIZATION_BARRIER_FLAGS_BLOCK_ONLY); + return last ? PTHREAD_BARRIER_SERIAL_THREAD : 0; +} + +static int +pthread_barrier_destroy(pthread_barrier_t *barrier) +{ + /* + * DeleteSynchronizationBarrier() is deliberately not called. Note that + * Microsoft documents the same Windows 8 / Server 2012 minimum for + * InitializeSynchronizationBarrier() and EnterSynchronizationBarrier() + * (including its SYNCHRONIZATION_BARRIER_FLAGS_NO_DELETE flag), so + * skipping this call alone does not keep older Windows supported. + * + * (void) DeleteSynchronizationBarrier(barrier); + */ + return 0; +} + #endif /* WIN32 */