Hello again,
Obviously this would work. I did not think the special case was worth the
extra argument. This one has some oddity too, because the second argument is
ignored depending on the third. Do as you feel.
Actually my question was whether keeping the original start_time was the
intended design.
Sorry, I misunderstood the question.
The answer is essentially yes: the field is needed for the "aggregated"
mode, where this specific behavior is used.
However, after looking at the code, I think that it is possible to do
without it.
I also spotted a small issue under low tps where the last aggregation was
not shown.
With the attached version these problems have been removed: there is no
conditional initialization anymore. A small diff against the version you
sent is also attached.
--
Fabien.
diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c
index d5f242c..b3fe994 100644
--- a/src/bin/pgbench/pgbench.c
+++ b/src/bin/pgbench/pgbench.c
@@ -166,10 +166,8 @@ int agg_interval; /* log aggregates instead of individual
* transactions */
int progress = 0; /* thread progress report every this seconds */
bool progress_timestamp = false; /* progress report with Unix time */
-int progress_nclients = 0; /* number of clients for progress
- * report */
-int progress_nthreads = 0; /* number of threads for progress
- * report */
+int nclients = 1; /* number of clients */
+int nthreads = 1; /* number of threads */
bool is_connect; /* establish connection for each transaction */
bool is_latencies; /* report per-command latencies */
int main_pid; /* main process id used in log filename */
@@ -193,6 +191,35 @@ typedef struct
#define SHELL_COMMAND_SIZE 256 /* maximum size allowed for shell command */
/*
+ * Simple data structure to keep stats about something.
+ *
+ * XXX probably the first value should be kept and used as an offset for
+ * better numerical stability...
+ */
+typedef struct
+{
+ int64 count; /* how many values were encountered */
+ double min; /* the minimum seen */
+ double max; /* the maximum seen */
+ double sum; /* sum of values */
+ double sum2; /* sum of squared values */
+} SimpleStats;
+
+/*
+ * Data structure to hold various statistics, used for interval statistics as
+ * well as file statistics.
+ */
+typedef struct
+{
+ long start_time; /* interval start time, for aggregates */
+ int64 cnt; /* number of transactions */
+ int64 skipped; /* number of transactions skipped under --rate
+ * and --latency-limit */
+ SimpleStats latency;
+ SimpleStats lag;
+} StatsData;
+
+/*
* Connection state
*/
typedef struct
@@ -213,10 +240,8 @@ typedef struct
bool prepared[MAX_SCRIPTS]; /* whether client prepared the script */
/* per client collected stats */
- int cnt; /* xacts count */
+ int64 cnt; /* transaction count */
int ecnt; /* error count */
- int64 txn_latencies; /* cumulated latencies */
- int64 txn_sqlats; /* cumulated square latencies */
} CState;
/*
@@ -228,19 +253,14 @@ typedef struct
pthread_t thread; /* thread handle */
CState *state; /* array of CState */
int nstate; /* length of state[] */
- instr_time start_time; /* thread start time */
- instr_time *exec_elapsed; /* time spent executing cmds (per Command) */
- int *exec_count; /* number of cmd executions (per Command) */
unsigned short random_state[3]; /* separate randomness for each thread */
int64 throttle_trigger; /* previous/next throttling (us) */
/* per thread collected stats */
+ instr_time start_time; /* thread start time */
instr_time conn_time;
- int64 throttle_lag; /* total transaction lag behind throttling */
- int64 throttle_lag_max; /* max transaction lag */
- int64 throttle_latency_skipped; /* lagging transactions
- * skipped */
- int64 latency_late; /* late transactions */
+ StatsData stats;
+ int64 latency_late; /* executed but late transactions */
} TState;
#define INVALID_THREAD ((pthread_t) 0)
@@ -272,33 +292,14 @@ typedef struct
char *argv[MAX_ARGS]; /* command word list */
int cols[MAX_ARGS]; /* corresponding column starting from 1 */
PgBenchExpr *expr; /* parsed expression */
+ SimpleStats stats; /* time spent in this command */
} Command;
-typedef struct
-{
-
- long start_time; /* when does the interval start */
- int cnt; /* number of transactions */
- int skipped; /* number of transactions skipped under --rate
- * and --latency-limit */
-
- double min_latency; /* min/max latencies */
- double max_latency;
- double sum_latency; /* sum(latency), sum(latency^2) - for
- * estimates */
- double sum2_latency;
-
- double min_lag;
- double max_lag;
- double sum_lag; /* sum(lag) */
- double sum2_lag; /* sum(lag*lag) */
-} AggVals;
-
static struct
{
const char *name;
- Command **commands;
-} sql_script[MAX_SCRIPTS]; /* SQL script files */
+ Command **commands;
+} sql_script[MAX_SCRIPTS]; /* SQL script files */
static int num_scripts; /* number of scripts in sql_script[] */
static int num_commands = 0; /* total number of Command structs */
static int debug = 0; /* debug flag */
@@ -361,9 +362,8 @@ static struct
/* Function prototypes */
static void setalarm(int seconds);
static void *threadRun(void *arg);
+static void doTxStats(TState *, CState *, instr_time *, bool, FILE *, StatsData *);
-static void doLog(TState *thread, CState *st, FILE *logfile, instr_time *now,
- AggVals *agg, bool skipped);
static void
usage(void)
@@ -602,6 +602,61 @@ getPoissonRand(TState *thread, int64 center)
return (int64) (-log(uniform) * ((double) center) + 0.5);
}
+/*
+ * Initialize the given SimpleStats struct to all zeroes
+ */
+static void
+initSimpleStats(SimpleStats *ss)
+{
+ memset(ss, 0, sizeof(SimpleStats));
+}
+
+/*
+ * Accumulate one value into a SimpleStats struct.
+ */
+static void
+addToSimpleStats(SimpleStats *ss, double val)
+{
+ if (ss->count == 0 || val < ss->min)
+ ss->min = val;
+ if (ss->count == 0 || val > ss->max)
+ ss->max = val;
+ ss->count++;
+ ss->sum += val;
+ ss->sum2 += val * val;
+}
+
+/*
+ * Merge two SimpleStats objects
+ */
+static void
+mergeSimpleStats(SimpleStats *acc, SimpleStats *ss)
+{
+ if (acc->count == 0 || ss->min < acc->min)
+ acc->min = ss->min;
+ if (acc->count == 0 || ss->max > acc->max)
+ acc->max = ss->max;
+ acc->count += ss->count;
+ acc->sum += ss->sum;
+ acc->sum2 += ss->sum2;
+}
+
+/*
+ * Initialize a StatsData struct to all zeroes.
+ *
+ * The start_time field is always set to the given value; the counters and
+ * the latency and lag statistics are reset to zero.
+ */
+static void
+initStats(StatsData *sd, double start_time)
+{
+ sd->start_time = start_time;
+ sd->cnt = 0;
+ sd->skipped = 0;
+ initSimpleStats(&sd->latency);
+ initSimpleStats(&sd->lag);
+}
+
/* call PQexec() and exit() on failure */
static void
executeStatement(PGconn *con, const char *sql)
@@ -1121,30 +1176,6 @@ clientDone(CState *st, bool ok)
return false; /* always false */
}
-static void
-agg_vals_init(AggVals *aggs, instr_time start)
-{
- /* basic counters */
- aggs->cnt = 0; /* number of transactions (includes skipped) */
- aggs->skipped = 0; /* xacts skipped under --rate --latency-limit */
-
- aggs->sum_latency = 0; /* SUM(latency) */
- aggs->sum2_latency = 0; /* SUM(latency*latency) */
-
- /* min and max transaction duration */
- aggs->min_latency = 0;
- aggs->max_latency = 0;
-
- /* schedule lag counters */
- aggs->sum_lag = 0;
- aggs->sum2_lag = 0;
- aggs->min_lag = 0;
- aggs->max_lag = 0;
-
- /* start of the current interval */
- aggs->start_time = INSTR_TIME_GET_DOUBLE(start);
-}
-
static int
chooseScript(TState *thread)
{
@@ -1156,7 +1187,7 @@ chooseScript(TState *thread)
/* return false iff client should be disconnected */
static bool
-doCustom(TState *thread, CState *st, instr_time *conn_time, FILE *logfile, AggVals *agg)
+doCustom(TState *thread, CState *st, FILE *logfile, StatsData *agg)
{
PGresult *res;
Command **commands;
@@ -1210,11 +1241,8 @@ top:
now_us = INSTR_TIME_GET_MICROSEC(now);
while (thread->throttle_trigger < now_us - latency_limit)
{
- thread->throttle_latency_skipped++;
-
- if (logfile)
- doLog(thread, st, logfile, &now, agg, true);
-
+ doTxStats(thread, st, &now, true, logfile, agg);
+ /* next rendez-vous */
wait = getPoissonRand(thread, throttle_delay);
thread->throttle_trigger += wait;
st->txn_scheduled = thread->throttle_trigger;
@@ -1231,28 +1259,13 @@ top:
if (st->sleeping)
{ /* are we sleeping? */
- int64 now_us;
-
if (INSTR_TIME_IS_ZERO(now))
INSTR_TIME_SET_CURRENT(now);
- now_us = INSTR_TIME_GET_MICROSEC(now);
- if (st->txn_scheduled <= now_us)
- {
- /* Done sleeping, go ahead with next command */
- st->sleeping = false;
- if (st->throttling)
- {
- /* Measure lag of throttled transaction relative to target */
- int64 lag = now_us - st->txn_scheduled;
-
- thread->throttle_lag += lag;
- if (lag > thread->throttle_lag_max)
- thread->throttle_lag_max = lag;
- st->throttling = false;
- }
- }
- else
+ if (INSTR_TIME_GET_MICROSEC(now) < st->txn_scheduled)
return true; /* Still sleeping, nothing to do here */
+ /* Else done sleeping, go ahead with next command */
+ st->sleeping = 0;
+ st->throttling = false;
}
if (st->listen)
@@ -1276,47 +1289,27 @@ top:
*/
if (is_latencies)
{
- int cnum = commands[st->state]->command_num;
-
if (INSTR_TIME_IS_ZERO(now))
INSTR_TIME_SET_CURRENT(now);
- INSTR_TIME_ACCUM_DIFF(thread->exec_elapsed[cnum],
- now, st->stmt_begin);
- thread->exec_count[cnum]++;
+
+ /*
+ * XXX When doing multiple threads, more than one could try to
+ * update the stats for one command simultaneously. Instead of
+ * adding the overhead of a mutex, we accept the very small
+ * probability of getting slightly wrong values.
+ */
+ addToSimpleStats(&commands[st->state]->stats,
+ INSTR_TIME_GET_DOUBLE(now) -
+ INSTR_TIME_GET_DOUBLE(st->stmt_begin));
}
/* transaction finished: calculate latency and log the transaction */
if (commands[st->state + 1] == NULL)
{
- /* only calculate latency if an option is used that needs it */
- if (progress || throttle_delay || latency_limit)
- {
- int64 latency;
-
- if (INSTR_TIME_IS_ZERO(now))
- INSTR_TIME_SET_CURRENT(now);
-
- latency = INSTR_TIME_GET_MICROSEC(now) - st->txn_scheduled;
-
- st->txn_latencies += latency;
-
- /*
- * XXX In a long benchmark run of high-latency transactions,
- * this int64 addition eventually overflows. For example, 100
- * threads running 10s transactions will overflow it in 2.56
- * hours. With a more-typical OLTP workload of .1s
- * transactions, overflow would take 256 hours.
- */
- st->txn_sqlats += latency * latency;
-
- /* record over the limit transactions if needed. */
- if (latency_limit && latency > latency_limit)
- thread->latency_late++;
- }
-
- /* record the time it took in the log */
- if (logfile)
- doLog(thread, st, logfile, &now, agg, false);
+ if (progress || throttle_delay || latency_limit || logfile)
+ doTxStats(thread, st, &now, false, logfile, agg);
+ else
+ thread->stats.cnt++;
}
if (commands[st->state]->type == SQL_COMMAND)
@@ -1391,7 +1384,7 @@ top:
return clientDone(st, false);
}
INSTR_TIME_SET_CURRENT(end);
- INSTR_TIME_ACCUM_DIFF(*conn_time, end, start);
+ INSTR_TIME_ACCUM_DIFF(thread->conn_time, end, start);
}
/*
@@ -1734,22 +1727,41 @@ top:
else /* succeeded */
st->listen = true;
}
+
+ /* after a meta command, immediately proceed with next command */
goto top;
}
return true;
}
+static void
+doStats(StatsData *stats, bool skipped, double lat, double lag)
+{
+ stats->cnt++;
+
+ if (skipped)
+ {
+ /* no latency to record on skipped transactions */
+ stats->skipped++;
+ }
+ else
+ {
+ addToSimpleStats(&stats->latency, lat);
+
+ /* and possibly the same for schedule lag */
+ if (throttle_delay)
+ addToSimpleStats(&stats->lag, lag);
+ }
+}
+
/*
* print log entry after completing one transaction.
*/
static void
-doLog(TState *thread, CState *st, FILE *logfile, instr_time *now, AggVals *agg,
- bool skipped)
+doLog(TState *thread, CState *st, FILE *logfile, instr_time *now,
+ StatsData *agg, bool skipped, double latency, double lag)
{
- double lag;
- double latency;
-
/*
* Skip the log entry if sampling is enabled and this row doesn't belong
* to the random sample.
@@ -1758,118 +1770,42 @@ doLog(TState *thread, CState *st, FILE *logfile, instr_time *now, AggVals *agg,
pg_erand48(thread->random_state) > sample_rate)
return;
- if (INSTR_TIME_IS_ZERO(*now))
- INSTR_TIME_SET_CURRENT(*now);
-
- latency = (double) (INSTR_TIME_GET_MICROSEC(*now) - st->txn_scheduled);
- if (skipped)
- lag = latency;
- else
- lag = (double) (INSTR_TIME_GET_MICROSEC(st->txn_begin) - st->txn_scheduled);
-
/* should we aggregate the results or not? */
if (agg_interval > 0)
{
/*
- * Are we still in the same interval? If yes, accumulate the values
- * (print them otherwise)
+ * Loop until we reach the interval of the current transaction,
+ * and print all the empty intervals in between (this may happen
+ * with very low tps, e.g. --rate=0.1).
*/
- if (agg->start_time + agg_interval >= INSTR_TIME_GET_DOUBLE(*now))
+ while (agg->start_time + agg_interval < INSTR_TIME_GET_DOUBLE(*now))
{
- agg->cnt += 1;
- if (skipped)
+ /* print aggregated report to logfile */
+ fprintf(logfile, "%ld " INT64_FORMAT " %.0f %.0f %.0f %.0f",
+ agg->start_time,
+ agg->cnt,
+ agg->latency.sum,
+ agg->latency.sum2,
+ agg->latency.min,
+ agg->latency.max);
+ if (throttle_delay)
{
- /*
- * there is no latency to record if the transaction was
- * skipped
- */
- agg->skipped += 1;
+ fprintf(logfile, " %.0f %.0f %.0f %.0f",
+ agg->lag.sum,
+ agg->lag.sum2,
+ agg->lag.min,
+ agg->lag.max);
+ if (latency_limit)
+ fprintf(logfile, " " INT64_FORMAT, agg->skipped);
}
- else
- {
- agg->sum_latency += latency;
- agg->sum2_latency += latency * latency;
-
- /* first in this aggregation interval */
- if ((agg->cnt == 1) || (latency < agg->min_latency))
- agg->min_latency = latency;
-
- if ((agg->cnt == 1) || (latency > agg->max_latency))
- agg->max_latency = latency;
+ fputc('\n', logfile);
- /* and the same for schedule lag */
- if (throttle_delay)
- {
- agg->sum_lag += lag;
- agg->sum2_lag += lag * lag;
-
- if ((agg->cnt == 1) || (lag < agg->min_lag))
- agg->min_lag = lag;
- if ((agg->cnt == 1) || (lag > agg->max_lag))
- agg->max_lag = lag;
- }
- }
+ /* reset data and move to next interval */
+ initStats(agg, agg->start_time + agg_interval);
}
- else
- {
- /*
- * Loop until we reach the interval of the current transaction
- * (and print all the empty intervals in between).
- */
- while (agg->start_time + agg_interval < INSTR_TIME_GET_DOUBLE(*now))
- {
- /*
- * This is a non-Windows branch (thanks to the ifdef in
- * usage), so we don't need to handle this in a special way
- * (see below).
- */
- fprintf(logfile, "%ld %d %.0f %.0f %.0f %.0f",
- agg->start_time,
- agg->cnt,
- agg->sum_latency,
- agg->sum2_latency,
- agg->min_latency,
- agg->max_latency);
- if (throttle_delay)
- {
- fprintf(logfile, " %.0f %.0f %.0f %.0f",
- agg->sum_lag,
- agg->sum2_lag,
- agg->min_lag,
- agg->max_lag);
- if (latency_limit)
- fprintf(logfile, " %d", agg->skipped);
- }
- fputc('\n', logfile);
-
- /* move to the next inteval */
- agg->start_time = agg->start_time + agg_interval;
- /* reset for "no transaction" intervals */
- agg->cnt = 0;
- agg->skipped = 0;
- agg->min_latency = 0;
- agg->max_latency = 0;
- agg->sum_latency = 0;
- agg->sum2_latency = 0;
- agg->min_lag = 0;
- agg->max_lag = 0;
- agg->sum_lag = 0;
- agg->sum2_lag = 0;
- }
-
- /* reset the values to include only the current transaction. */
- agg->cnt = 1;
- agg->skipped = skipped ? 1 : 0;
- agg->min_latency = latency;
- agg->max_latency = latency;
- agg->sum_latency = skipped ? 0.0 : latency;
- agg->sum2_latency = skipped ? 0.0 : latency * latency;
- agg->min_lag = lag;
- agg->max_lag = lag;
- agg->sum_lag = lag;
- agg->sum2_lag = lag * lag;
- }
+ /* accumulate the current transaction */
+ doStats(agg, skipped, latency, lag);
}
else
{
@@ -1878,21 +1814,21 @@ doLog(TState *thread, CState *st, FILE *logfile, instr_time *now, AggVals *agg,
/* This is more than we really ought to know about instr_time */
if (skipped)
- fprintf(logfile, "%d %d skipped %d %ld %ld",
+ fprintf(logfile, "%d " INT64_FORMAT " skipped %d %ld %ld",
st->id, st->cnt, st->use_file,
(long) now->tv_sec, (long) now->tv_usec);
else
- fprintf(logfile, "%d %d %.0f %d %ld %ld",
+ fprintf(logfile, "%d " INT64_FORMAT " %.0f %d %ld %ld",
st->id, st->cnt, latency, st->use_file,
(long) now->tv_sec, (long) now->tv_usec);
#else
/* On Windows, instr_time doesn't provide a timestamp anyway */
if (skipped)
- fprintf(logfile, "%d %d skipped %d 0 0",
+ fprintf(logfile, "%d " INT64_FORMAT " skipped %d 0 0",
st->id, st->cnt, st->use_file);
else
- fprintf(logfile, "%d %d %.0f %d 0 0",
+ fprintf(logfile, "%d " INT64_FORMAT " %.0f %d 0 0",
st->id, st->cnt, latency, st->use_file);
#endif
if (throttle_delay)
@@ -1901,6 +1837,42 @@ doLog(TState *thread, CState *st, FILE *logfile, instr_time *now, AggVals *agg,
}
}
+/*
+ * end of transaction statistics
+ */
+static void
+doTxStats(TState *thread, CState *st, instr_time *now,
+ bool skipped, FILE *logfile, StatsData *agg)
+{
+ double latency = 0.0,
+ lag = 0.0;
+
+ if ((!skipped || agg_interval) && INSTR_TIME_IS_ZERO(*now))
+ INSTR_TIME_SET_CURRENT(*now);
+
+ if (!skipped)
+ {
+ /* compute latency & lag if needed */
+ latency = INSTR_TIME_GET_MICROSEC(*now) - st->txn_scheduled;
+ lag = INSTR_TIME_GET_MICROSEC(st->txn_begin) - st->txn_scheduled;
+ }
+
+ if (progress || throttle_delay || latency_limit)
+ {
+ doStats(&thread->stats, skipped, latency, lag);
+
+ /* record over the limit transactions if needed. */
+ if (latency_limit && latency > latency_limit)
+ thread->latency_late++;
+ }
+ else
+ thread->stats.cnt++;
+
+ if (use_log)
+ doLog(thread, st, logfile, now, agg, skipped, latency, lag);
+}
+
+
/* discard connections */
static void
disconnect_all(CState *state, int length)
@@ -2297,6 +2269,7 @@ process_commands(char *buf, const char *source, const int lineno)
my_commands->command_num = num_commands++;
my_commands->type = 0; /* until set */
my_commands->argc = 0;
+ initSimpleStats(&my_commands->stats);
if (*p == '\\')
{
@@ -2641,7 +2614,7 @@ process_builtin(const char *tb, const char *source)
static void
listAvailableScripts(void)
{
- int i;
+ int i;
fprintf(stderr, "Available builtin scripts:\n");
for (i = 0; i < N_BUILTIN; i++)
@@ -2689,22 +2662,29 @@ addScript(const char *name, Command **commands)
num_scripts++;
}
+static void
+printSimpleStats(char *prefix, SimpleStats *ss)
+{
+ /* print NaN if no transactions were executed */
+ double latency = ss->sum / ss->count;
+ double stddev = sqrt(ss->sum2 / ss->count - latency * latency);
+
+ printf("%s average = %.3f ms\n", prefix, 0.001 * latency);
+ printf("%s stddev = %.3f ms\n", prefix, 0.001 * stddev);
+}
+
/* print out results */
static void
-printResults(int64 normal_xacts, int nclients,
- TState *threads, int nthreads,
- instr_time total_time, instr_time conn_total_time,
- int64 total_latencies, int64 total_sqlats,
- int64 throttle_lag, int64 throttle_lag_max,
- int64 throttle_latency_skipped, int64 latency_late)
+printResults(TState *threads, StatsData *total, instr_time total_time,
+ instr_time conn_total_time, int latency_late)
{
double time_include,
tps_include,
tps_exclude;
time_include = INSTR_TIME_GET_DOUBLE(total_time);
- tps_include = normal_xacts / time_include;
- tps_exclude = normal_xacts / (time_include -
+ tps_include = total->cnt / time_include;
+ tps_exclude = total->cnt / (time_include -
(INSTR_TIME_GET_DOUBLE(conn_total_time) / nclients));
printf("transaction type: %s\n",
@@ -2716,46 +2696,36 @@ printResults(int64 normal_xacts, int nclients,
if (duration <= 0)
{
printf("number of transactions per client: %d\n", nxacts);
- printf("number of transactions actually processed: " INT64_FORMAT "/" INT64_FORMAT "\n",
- normal_xacts, (int64) nxacts * nclients);
+ printf("number of transactions actually processed: " INT64_FORMAT "/%d\n",
+ total->cnt, nxacts * nclients);
}
else
{
printf("duration: %d s\n", duration);
printf("number of transactions actually processed: " INT64_FORMAT "\n",
- normal_xacts);
+ total->cnt);
}
/* Remaining stats are nonsensical if we failed to execute any xacts */
- if (normal_xacts <= 0)
+ if (total->cnt <= 0)
return;
if (throttle_delay && latency_limit)
printf("number of transactions skipped: " INT64_FORMAT " (%.3f %%)\n",
- throttle_latency_skipped,
- 100.0 * throttle_latency_skipped / (throttle_latency_skipped + normal_xacts));
+ total->skipped,
+ 100.0 * total->skipped / (total->skipped + total->cnt));
if (latency_limit)
- printf("number of transactions above the %.1f ms latency limit: " INT64_FORMAT " (%.3f %%)\n",
+ printf("number of transactions above the %.1f ms latency limit: %d (%.3f %%)\n",
latency_limit / 1000.0, latency_late,
- 100.0 * latency_late / (throttle_latency_skipped + normal_xacts));
+ 100.0 * latency_late / (total->skipped + total->cnt));
if (throttle_delay || progress || latency_limit)
- {
- /* compute and show latency average and standard deviation */
- double latency = 0.001 * total_latencies / normal_xacts;
- double sqlat = (double) total_sqlats / normal_xacts;
-
- printf("latency average: %.3f ms\n"
- "latency stddev: %.3f ms\n",
- latency, 0.001 * sqrt(sqlat - 1000000.0 * latency * latency));
- }
+ printSimpleStats("latency", &total->latency);
else
- {
/* only an average latency computed from the duration is available */
printf("latency average: %.3f ms\n",
- 1000.0 * duration * nclients / normal_xacts);
- }
+ 1000.0 * duration * nclients / total->cnt);
if (throttle_delay)
{
@@ -2766,7 +2736,7 @@ printResults(int64 normal_xacts, int nclients,
* the database load, or the Poisson throttling process.
*/
printf("rate limit schedule lag: avg %.3f (max %.3f) ms\n",
- 0.001 * throttle_lag / normal_xacts, 0.001 * throttle_lag_max);
+ 0.001 * total->lag.sum / total->cnt, 0.001 * total->lag.max);
}
printf("tps = %f (including connections establishing)\n", tps_include);
@@ -2785,33 +2755,9 @@ printResults(int64 normal_xacts, int nclients,
printf(" - statement latencies in milliseconds:\n");
for (commands = sql_script[i].commands; *commands != NULL; commands++)
- {
- Command *command = *commands;
- int cnum = command->command_num;
- double total_time;
- instr_time total_exec_elapsed;
- int total_exec_count;
- int t;
-
- /* Accumulate per-thread data for command */
- INSTR_TIME_SET_ZERO(total_exec_elapsed);
- total_exec_count = 0;
- for (t = 0; t < nthreads; t++)
- {
- TState *thread = &threads[t];
-
- INSTR_TIME_ADD(total_exec_elapsed,
- thread->exec_elapsed[cnum]);
- total_exec_count += thread->exec_count[cnum];
- }
-
- if (total_exec_count > 0)
- total_time = INSTR_TIME_GET_MILLISEC(total_exec_elapsed) / (double) total_exec_count;
- else
- total_time = 0.0;
-
- printf("\t%f\t%s\n", total_time, command->line);
- }
+ printf(" %11.3f %s\n",
+ 1000.0 * (*commands)->stats.sum / (*commands)->stats.count,
+ (*commands)->line);
}
}
}
@@ -2860,8 +2806,6 @@ main(int argc, char **argv)
};
int c;
- int nclients = 1; /* default number of simulated clients */
- int nthreads = 1; /* default number of threads */
int is_init_mode = 0; /* initialize mode? */
int is_no_vacuum = 0; /* no vacuum at all before testing? */
int do_vacuum_accounts = 0; /* do vacuum accounts before testing? */
@@ -2878,13 +2822,8 @@ main(int argc, char **argv)
instr_time start_time; /* start up time */
instr_time total_time;
instr_time conn_total_time;
- int64 total_xacts = 0;
- int64 total_latencies = 0;
- int64 total_sqlats = 0;
- int64 throttle_lag = 0;
- int64 throttle_lag_max = 0;
- int64 throttle_latency_skipped = 0;
int64 latency_late = 0;
+ StatsData stats;
char *desc;
int i;
@@ -3071,14 +3010,14 @@ main(int argc, char **argv)
case 'S':
addScript(desc,
process_builtin(findBuiltin("select-only", &desc),
- desc));
+ desc));
benchmarking_option_set = true;
internal_script_used = true;
break;
case 'N':
addScript(desc,
process_builtin(findBuiltin("simple-update", &desc),
- desc));
+ desc));
benchmarking_option_set = true;
internal_script_used = true;
break;
@@ -3311,8 +3250,6 @@ main(int argc, char **argv)
* changed after fork.
*/
main_pid = (int) getpid();
- progress_nclients = nclients;
- progress_nthreads = nthreads;
if (nclients > 1)
{
@@ -3454,32 +3391,10 @@ main(int argc, char **argv)
thread->random_state[0] = random();
thread->random_state[1] = random();
thread->random_state[2] = random();
- thread->throttle_latency_skipped = 0;
thread->latency_late = 0;
+ initStats(&thread->stats, 0.0);
nclients_dealt += thread->nstate;
-
- if (is_latencies)
- {
- /* Reserve memory for the thread to store per-command latencies */
- int t;
-
- thread->exec_elapsed = (instr_time *)
- pg_malloc(sizeof(instr_time) * num_commands);
- thread->exec_count = (int *)
- pg_malloc(sizeof(int) * num_commands);
-
- for (t = 0; t < num_commands; t++)
- {
- INSTR_TIME_SET_ZERO(thread->exec_elapsed[t]);
- thread->exec_count[t] = 0;
- }
- }
- else
- {
- thread->exec_elapsed = NULL;
- thread->exec_count = NULL;
- }
}
/* all clients must be assigned to a thread */
@@ -3522,11 +3437,11 @@ main(int argc, char **argv)
#endif /* ENABLE_THREAD_SAFETY */
/* wait for threads and accumulate results */
+ initStats(&stats, 0.0);
INSTR_TIME_SET_ZERO(conn_total_time);
for (i = 0; i < nthreads; i++)
{
TState *thread = &threads[i];
- int j;
#ifdef ENABLE_THREAD_SAFETY
if (threads[i].thread == INVALID_THREAD)
@@ -3539,21 +3454,13 @@ main(int argc, char **argv)
(void) threadRun(thread);
#endif /* ENABLE_THREAD_SAFETY */
- /* thread level stats */
- throttle_lag += thread->throttle_lag;
- throttle_latency_skipped += threads->throttle_latency_skipped;
+ /* aggregate thread level stats */
+ mergeSimpleStats(&stats.latency, &thread->stats.latency);
+ mergeSimpleStats(&stats.lag, &thread->stats.lag);
+ stats.cnt += thread->stats.cnt;
+ stats.skipped += thread->stats.skipped;
latency_late += thread->latency_late;
- if (throttle_lag_max > thread->throttle_lag_max)
- throttle_lag_max = thread->throttle_lag_max;
INSTR_TIME_ADD(conn_total_time, thread->conn_time);
-
- /* client-level stats */
- for (j = 0; j < thread->nstate; j++)
- {
- total_xacts += thread->state[j].cnt;
- total_latencies += thread->state[j].txn_latencies;
- total_sqlats += thread->state[j].txn_sqlats;
- }
}
disconnect_all(state, nclients);
@@ -3569,10 +3476,7 @@ main(int argc, char **argv)
*/
INSTR_TIME_SET_CURRENT(total_time);
INSTR_TIME_SUBTRACT(total_time, start_time);
- printResults(total_xacts, nclients, threads, nthreads,
- total_time, conn_total_time, total_latencies, total_sqlats,
- throttle_lag, throttle_lag_max, throttle_latency_skipped,
- latency_late);
+ printResults(threads, &stats, total_time, conn_total_time, latency_late);
return 0;
}
@@ -3593,13 +3497,8 @@ threadRun(void *arg)
int64 thread_start = INSTR_TIME_GET_MICROSEC(thread->start_time);
int64 last_report = thread_start;
int64 next_report = last_report + (int64) progress * 1000000;
- int64 last_count = 0,
- last_lats = 0,
- last_sqlats = 0,
- last_lags = 0,
- last_skipped = 0;
-
- AggVals aggs;
+ StatsData last,
+ aggs;
/*
* Initialize throttling rate target for all of the thread's clients. It
@@ -3609,8 +3508,6 @@ threadRun(void *arg)
*/
INSTR_TIME_SET_CURRENT(start);
thread->throttle_trigger = INSTR_TIME_GET_MICROSEC(start);
- thread->throttle_lag = 0;
- thread->throttle_lag_max = 0;
INSTR_TIME_SET_ZERO(thread->conn_time);
@@ -3647,7 +3544,8 @@ threadRun(void *arg)
INSTR_TIME_SET_CURRENT(thread->conn_time);
INSTR_TIME_SUBTRACT(thread->conn_time, thread->start_time);
- agg_vals_init(&aggs, thread->start_time);
+ initStats(&aggs, INSTR_TIME_GET_DOUBLE(thread->start_time));
+ last = aggs;
/* send start up queries in async manner */
for (i = 0; i < nstate; i++)
@@ -3661,7 +3559,7 @@ threadRun(void *arg)
if (debug)
fprintf(stderr, "client %d executing script \"%s\"\n", st->id,
sql_script[st->use_file].name);
- if (!doCustom(thread, st, &thread->conn_time, logfile, &aggs))
+ if (!doCustom(thread, st, logfile, &aggs))
remains--; /* I've aborted */
if (st->ecnt > prev_ecnt && commands[st->state]->type == META_COMMAND)
@@ -3800,7 +3698,7 @@ threadRun(void *arg)
if (st->con && (FD_ISSET(PQsocket(st->con), &input_mask)
|| commands[st->state]->type == META_COMMAND))
{
- if (!doCustom(thread, st, &thread->conn_time, logfile, &aggs))
+ if (!doCustom(thread, st, logfile, &aggs))
remains--; /* I've aborted */
}
@@ -3825,11 +3723,7 @@ threadRun(void *arg)
if (now >= next_report)
{
/* generate and show report */
- int64 count = 0,
- lats = 0,
- sqlats = 0,
- lags = 0,
- skipped = 0;
+ StatsData cur;
int64 run = now - last_report;
double tps,
total_run,
@@ -3850,25 +3744,24 @@ threadRun(void *arg)
* (If a read from a 64-bit integer is not atomic, you might
* get a "torn" read and completely bogus latencies though!)
*/
- for (i = 0; i < progress_nclients; i++)
+ initStats(&cur, 0.0);
+ for (i = 0; i < nthreads; i++)
{
- count += state[i].cnt;
- lats += state[i].txn_latencies;
- sqlats += state[i].txn_sqlats;
- }
-
- for (i = 0; i < progress_nthreads; i++)
- {
- skipped += thread[i].throttle_latency_skipped;
- lags += thread[i].throttle_lag;
+ mergeSimpleStats(&cur.latency, &thread[i].stats.latency);
+ mergeSimpleStats(&cur.lag, &thread[i].stats.lag);
+ cur.cnt += thread[i].stats.cnt;
+ cur.skipped += thread[i].stats.skipped;
}
total_run = (now - thread_start) / 1000000.0;
- tps = 1000000.0 * (count - last_count) / run;
- latency = 0.001 * (lats - last_lats) / (count - last_count);
- sqlat = 1.0 * (sqlats - last_sqlats) / (count - last_count);
+ tps = 1000000.0 * (cur.cnt - last.cnt) / run;
+ latency = 0.001 * (cur.latency.sum - last.latency.sum) /
+ (cur.cnt - last.cnt);
+ sqlat = 1.0 * (cur.latency.sum2 - last.latency.sum2)
+ / (cur.cnt - last.cnt);
stdev = 0.001 * sqrt(sqlat - 1000000.0 * latency * latency);
- lag = 0.001 * (lags - last_lags) / (count - last_count);
+ lag = 0.001 * (cur.lag.sum - last.lag.sum) /
+ (cur.cnt - last.cnt);
if (progress_timestamp)
sprintf(tbuf, "%.03f s",
@@ -3885,16 +3778,12 @@ threadRun(void *arg)
fprintf(stderr, ", lag %.3f ms", lag);
if (latency_limit)
fprintf(stderr, ", " INT64_FORMAT " skipped",
- skipped - last_skipped);
+ cur.skipped - last.skipped);
}
fprintf(stderr, "\n");
- last_count = count;
- last_lats = lats;
- last_sqlats = sqlats;
- last_lags = lags;
+ last = cur;
last_report = now;
- last_skipped = skipped;
/*
* Ensure that the next report is in the future, in case
@@ -3914,7 +3803,12 @@ done:
INSTR_TIME_SET_CURRENT(end);
INSTR_TIME_ACCUM_DIFF(thread->conn_time, end, start);
if (logfile)
+ {
+ if (agg_interval)
+ /* log aggregated but not yet reported transactions */
+ doLog(thread, state, logfile, &end, &aggs, false, 0, 0);
fclose(logfile);
+ }
return NULL;
}
diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c
index 305c319..b3fe994 100644
--- a/src/bin/pgbench/pgbench.c
+++ b/src/bin/pgbench/pgbench.c
@@ -650,9 +650,7 @@ mergeSimpleStats(SimpleStats *acc, SimpleStats *ss)
static void
initStats(StatsData *sd, double start_time)
{
- if (start_time != 0.0)
- sd->start_time = start_time;
-
+ sd->start_time = start_time;
sd->cnt = 0;
sd->skipped = 0;
initSimpleStats(&sd->latency);
@@ -1776,55 +1774,38 @@ doLog(TState *thread, CState *st, FILE *logfile, instr_time *now,
if (agg_interval > 0)
{
/*
- * Are we still in the same interval? If yes, accumulate the values
- * (print them otherwise)
+ * Loop until we reach the interval of the current transaction,
+ * and print all the empty intervals in between (this may happen
+ * with very low tps, e.g. --rate=0.1).
*/
- if (agg->start_time + agg_interval >= INSTR_TIME_GET_DOUBLE(*now))
+ while (agg->start_time + agg_interval < INSTR_TIME_GET_DOUBLE(*now))
{
- doStats(agg, skipped, latency, lag);
- }
- else
- {
- /*
- * Loop until we reach the interval of the current transaction
- * (and print all the empty intervals in between).
- */
- while (agg->start_time + agg_interval < INSTR_TIME_GET_DOUBLE(*now))
+ /* print aggregated report to logfile */
+ fprintf(logfile, "%ld " INT64_FORMAT " %.0f %.0f %.0f %.0f",
+ agg->start_time,
+ agg->cnt,
+ agg->latency.sum,
+ agg->latency.sum2,
+ agg->latency.min,
+ agg->latency.max);
+ if (throttle_delay)
{
- /*
- * This is a non-Windows branch (thanks to the ifdef in
- * usage), so we don't need to handle this in a special way
- * (see below).
- */
- fprintf(logfile, "%ld " INT64_FORMAT " %.0f %.0f %.0f %.0f",
- agg->start_time,
- agg->cnt,
- agg->latency.sum,
- agg->latency.sum2,
- agg->latency.min,
- agg->latency.max);
- if (throttle_delay)
- {
- fprintf(logfile, " %.0f %.0f %.0f %.0f",
- agg->lag.sum,
- agg->lag.sum2,
- agg->lag.min,
- agg->lag.max);
- if (latency_limit)
- fprintf(logfile, " " INT64_FORMAT, agg->skipped);
- }
- fputc('\n', logfile);
-
- /* move to the next interval */
- agg->start_time += agg_interval;
-
- /* reset for "no transaction" intervals */
- initStats(agg, 0.0);
+ fprintf(logfile, " %.0f %.0f %.0f %.0f",
+ agg->lag.sum,
+ agg->lag.sum2,
+ agg->lag.min,
+ agg->lag.max);
+ if (latency_limit)
+ fprintf(logfile, " " INT64_FORMAT, agg->skipped);
}
+ fputc('\n', logfile);
- /* reset the values to include only the current transaction. */
- doStats(agg, skipped, latency, lag);
+ /* reset data and move to next interval */
+ initStats(agg, agg->start_time + agg_interval);
}
+
+ /* accumulate the current transaction */
+ doStats(agg, skipped, latency, lag);
}
else
{
@@ -3822,7 +3803,12 @@ done:
INSTR_TIME_SET_CURRENT(end);
INSTR_TIME_ACCUM_DIFF(thread->conn_time, end, start);
if (logfile)
+ {
+ if (agg_interval)
+ /* log aggregated but not yet reported transactions */
+ doLog(thread, state, logfile, &end, &aggs, false, 0, 0);
fclose(logfile);
+ }
return NULL;
}
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers