On Thu, 17 Jun 2021 10:18:03 +0200 (CEST)
Fabien COELHO <[email protected]> wrote:
>
> >> Wouldn't it be better to put all those fixes into the same bag?
> >
> > Attached.
>
> Even better if the patch is not empty.
I found that you forgot to fix printProgressReport().
Also, according to the documentation, interval_start in aggregated logging
should be printed in seconds, not in microseconds.
<para>
Here is some example output:
<screen>
1345828501 5601 1542744 483552416 61 2573
1345828503 7884 1979812 565806736 60 1479
1345828505 7208 1979422 567277552 59 1391
1345828507 7685 1980268 569784714 60 1398
1345828509 7073 1979779 573489941 236 1411
</screen></para>
To follow the documentation and keep backward compatibility, we should fix
logAgg() so that it prints the interval start in seconds.
The attached patch includes these fixes.
Regards,
Yugo Nagata
--
Yugo NAGATA <[email protected]>
diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c
index d7479925cb..08ecb07b9b 100644
--- a/src/bin/pgbench/pgbench.c
+++ b/src/bin/pgbench/pgbench.c
@@ -342,6 +342,12 @@ typedef struct StatsData
SimpleStats lag;
} StatsData;
+/*
+ * Offset in microseconds between the Unix epoch and the time reference of
+ * pg_time_now(); add it to a pg_time_now() value to display epoch time.
+ */
+pg_time_usec_t epoch_shift;
+
/*
* Struct to keep random state.
*/
@@ -648,7 +654,7 @@ static void setDoubleValue(PgBenchValue *pv, double dval);
static bool evaluateExpr(CState *st, PgBenchExpr *expr,
PgBenchValue *retval);
static ConnectionStateEnum executeMetaCommand(CState *st, pg_time_usec_t *now);
-static void doLog(TState *thread, CState *st,
+static void doLog(TState *thread, CState *st, bool tx,
StatsData *agg, bool skipped, double latency, double lag);
static void processXactStats(TState *thread, CState *st, pg_time_usec_t *now,
bool skipped, StatsData *agg);
@@ -3765,16 +3771,47 @@ executeMetaCommand(CState *st, pg_time_usec_t *now)
return CSTATE_END_COMMAND;
}
+/* print one aggregated-interval line to logfile; start time in epoch secs */
+static void
+logAgg(FILE *logfile, StatsData *agg)
+{
+ fprintf(logfile, INT64_FORMAT " " INT64_FORMAT " %.0f %.0f %.0f %.0f",
+ (agg->start_time + epoch_shift) / 1000000, /* usec -> epoch seconds */
+ agg->cnt,
+ agg->latency.sum,
+ agg->latency.sum2,
+ agg->latency.min,
+ agg->latency.max);
+ if (throttle_delay)
+ {
+ fprintf(logfile, " %.0f %.0f %.0f %.0f",
+ agg->lag.sum,
+ agg->lag.sum2,
+ agg->lag.min,
+ agg->lag.max);
+ if (latency_limit)
+ fprintf(logfile, " " INT64_FORMAT, agg->skipped);
+ }
+ fputc('\n', logfile);
+}
+
/*
* Print log entry after completing one transaction.
*
+ * Param tx is true when the call represents a completed transaction, and
+ * false for the final call that flushes the pending aggregated interval.
+ *
+ * The function behavior depends on sample_rate (only a fraction of
+ * transactions is reported) and agg_interval (transactions are aggregated
+ * and reported once every agg_interval seconds).
+ *
* We print Unix-epoch timestamps in the log, so that entries can be
* correlated against other logs. On some platforms this could be obtained
* from the caller, but rather than get entangled with that, we just eat
* the cost of an extra syscall in all cases.
*/
static void
-doLog(TState *thread, CState *st,
+doLog(TState *thread, CState *st, bool tx,
StatsData *agg, bool skipped, double latency, double lag)
{
FILE *logfile = thread->logfile;
@@ -3793,43 +3830,36 @@ doLog(TState *thread, CState *st,
/* should we aggregate the results or not? */
if (agg_interval > 0)
{
+ pg_time_usec_t next; /* start time of the following interval */
+
/*
* Loop until we reach the interval of the current moment, and print
* any empty intervals in between (this may happen with very low tps,
* e.g. --rate=0.1).
*/
-
- while (agg->start_time + agg_interval <= now)
+ while ((next = agg->start_time + agg_interval * INT64CONST(1000000)) <= now)
{
- /* print aggregated report to logfile */
- fprintf(logfile, INT64_FORMAT " " INT64_FORMAT " %.0f %.0f %.0f %.0f",
- agg->start_time,
- agg->cnt,
- agg->latency.sum,
- agg->latency.sum2,
- agg->latency.min,
- agg->latency.max);
- if (throttle_delay)
- {
- fprintf(logfile, " %.0f %.0f %.0f %.0f",
- agg->lag.sum,
- agg->lag.sum2,
- agg->lag.min,
- agg->lag.max);
- if (latency_limit)
- fprintf(logfile, " " INT64_FORMAT, agg->skipped);
- }
- fputc('\n', logfile);
+ logAgg(logfile, agg);
/* reset data and move to next interval */
- initStats(agg, agg->start_time + agg_interval);
+ initStats(agg, next);
}
- /* accumulate the current transaction */
- accumStats(agg, skipped, latency, lag);
+ if (tx)
+ /* accumulate the current transaction */
+ accumStats(agg, skipped, latency, lag);
+ else
+ /* final flush: print the last, possibly partial, interval */
+ logAgg(logfile, agg);
}
else
{
+ /* convert now to Unix-epoch time for the raw log line */
+ now += epoch_shift;
+
+ /* the !tx flush call is only made when aggregating */
+ Assert(tx);
+
/* no, print raw transactions */
if (skipped)
fprintf(logfile, "%d " INT64_FORMAT " skipped %d " INT64_FORMAT " "
@@ -3889,7 +3919,7 @@ processXactStats(TState *thread, CState *st, pg_time_usec_t *now,
st->cnt++;
if (use_log)
- doLog(thread, st, agg, skipped, latency, lag);
+ doLog(thread, st, true, agg, skipped, latency, lag); /* tx: real transaction */
/* XXX could use a mutex here, but we choose not to */
if (per_script_stats)
@@ -5455,7 +5485,7 @@ printProgressReport(TState *threads, int64 test_start, pg_time_usec_t now,
if (progress_timestamp)
{
- snprintf(tbuf, sizeof(tbuf), "%.3f s", PG_TIME_GET_DOUBLE(now));
+ snprintf(tbuf, sizeof(tbuf), "%.3f s", PG_TIME_GET_DOUBLE(now + epoch_shift)); /* epoch */
}
else
{
@@ -5775,6 +5805,11 @@ main(int argc, char **argv)
char *env;
int exit_code = 0;
+ struct timeval tv;
+
+ /* record epoch vs pg_time_now() shift in usec; 64-bit math, tv_sec may be 32 bits */
+ gettimeofday(&tv, NULL);
+ epoch_shift = tv.tv_sec * INT64CONST(1000000) + tv.tv_usec - pg_time_now();
pg_logging_init(argv[0]);
progname = get_progname(argv[0]);
@@ -6794,8 +6829,9 @@ done:
if (agg_interval > 0)
{
/* log aggregated but not yet reported transactions */
- doLog(thread, state, &aggs, false, 0, 0);
+ doLog(thread, state, false, &aggs, false, 0.0, 0.0); /* tx = false: final flush */
}
+
fclose(thread->logfile);
thread->logfile = NULL;
}