Hello Thomas,

Thanks!  This doesn't seem to address the complaint, though.  Don't
you need to do something like this?  (See also attached.)

+    initStats(&aggs, start - (start + epoch_shift) % 1000000);

ISTM that this is: (start + epoch_shift) / 1000000 * 1000000

That should reproduce what pgbench 13 does implicitly when it uses
time(NULL).

I understand that you are shifting the aggregate's internal start time to the Unix epoch. However, ISTM that other points in the program are not shifted consistently with this, e.g. the while comparison in doLog. Also, if the start time is already shifted, then it should not be shifted again when printed (in logAgg). The attached version tries to be consistent.
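To make the difference between the two roundings concrete, here is a minimal sketch. It assumes a plain int64_t stand-in (usec_t) for pg_time_usec_t, and the helper names round_internal and round_epoch are hypothetical, not pgbench functions. Both expressions land on the same whole second, but the first result stays on the internal clock while the second is on the Unix epoch clock, so they differ by exactly epoch_shift.

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical stand-in for pgbench's pg_time_usec_t (microseconds). */
typedef int64_t usec_t;

#define USECS_PER_SEC INT64_C(1000000)

/*
 * First proposal: subtract the sub-second part of the epoch time from the
 * internal start time.  The result is still on the internal clock.
 */
static usec_t
round_internal(usec_t start, usec_t epoch_shift)
{
	assert(start + epoch_shift >= 0);
	return start - (start + epoch_shift) % USECS_PER_SEC;
}

/*
 * Form used in the attached patch: shift to the Unix epoch first, then
 * truncate to a whole second.  The result is on the epoch clock.
 */
static usec_t
round_epoch(usec_t start, usec_t epoch_shift)
{
	assert(start + epoch_shift >= 0);
	return (start + epoch_shift) / USECS_PER_SEC * USECS_PER_SEC;
}
```

Whichever clock is chosen for agg->start_time, the loop comparison in doLog and the value printed by logAgg have to use that same clock.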

Namely, it rewinds to the start of the current *wall clock* second, so that all future aggregates also start at round number wall clock times, at the cost of making the first aggregate miss out on a fraction of a second.

ISTM that it was already wall clock time, but not epoch wall clock.
I'm okay with realigning aggregates on full seconds.

I wonder if some of the confusion on the other thread about the final
aggregate[1] was due to this difference.

Dunno. Parallel execution with threads is a pain when handling details.

By rounding down, we get a "head start" (because the first aggregate is short), so we usually manage to record the expected number of aggregates before time runs out.

Fine with me if everything is consistent.

It's a race though.  Your non-rounding version was more likely
to lose the race and finish before the final expected aggregate was
logged, so you added code to force a final aggregate to be logged.

ISTM that we always want to force because some modes can have low tps, and the aggregates should be "full".

Do I have this right? I'm not entirely sure how useful a partial final aggregate is

If you ask for 10 seconds run with 1 aggregate per second, you expect to see (at least, about) 10 lines, and I want to ensure that, otherwise people will ask questions, tools will have to look for special cases, missing rows, whatever, and it will be a pain there. We want to produce something simple, consistent, reliable, that tools can depend on.

(it's probably one you have to throw away, like the first one, no? Isn't it better if we only have to throw away the first one?).

It should be the user's decision to drop it or not, not the decision of the tool producing it, IMO.

I'm not sure, but if we keep that change, a couple of very minor nits: I found the "tx" parameter name a little confusing. Do you think it's clearer if we change it to "final" (with inverted sense)?

I agree that tx is not a very good name, but the inversion does not look right to me. The "normal" behavior is

For the final aggregate, shouldn't we call doLog() only if agg->cnt > 0?

No, I think that we should want to have all aggregates, even with zeros, so that the user can expect a deterministic number of lines.
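As a rough illustration of why the number of lines becomes deterministic, here is a minimal sketch of the interval loop. It assumes a simplified, hypothetical Agg struct and a hypothetical flush_intervals helper that counts lines instead of writing them; it is not the doLog code itself.

```c
#include <assert.h>
#include <stdint.h>

typedef int64_t usec_t;		/* hypothetical stand-in for pg_time_usec_t */

/* Hypothetical, simplified aggregate: only what this sketch needs. */
typedef struct
{
	usec_t		start_time;	/* start of the current interval, in usec */
	int64_t		cnt;		/* transactions recorded in the interval */
} Agg;

/*
 * Close every interval that ended at or before "now".  Each closed interval
 * counts as one log line, whether or not it recorded any transaction.
 * Returns the number of lines that would have been written.
 */
static int
flush_intervals(Agg *agg, int interval_secs, usec_t now)
{
	int			lines = 0;
	usec_t		next;

	assert(interval_secs > 0);

	while ((next = agg->start_time + interval_secs * INT64_C(1000000)) <= now)
	{
		lines++;			/* stands in for logAgg(logfile, agg) */
		agg->start_time = next;
		agg->cnt = 0;		/* stands in for initStats(agg, next) */
	}
	return lines;
}
```

With a start at a whole second and "now" 3.5 seconds later, the loop closes three intervals; logging the still-open one unconditionally at the end gives a fourth line, regardless of whether any transaction happened to land in it.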

I think I'd be inclined to take that change back out though, making this patch very small and net behaviour like pgbench 13, if you agree with my explanation for why you had to add it and why it's not actually necessary with the fixed rounding shown above. (And perhaps in v15 we might consider other ideas like using hi-res times in the log and not rounding, etc, a topic for later.)

I think that I'm mostly okay.

I don't really see the value in the test that checks that $delay falls
in the range 1.5s - 2.5s and then ignores the result.  If it hangs
forever, we'll find out about it, and otherwise no human or machine
will ever care about that test.  I removed it from this version.  Were
you really attached to it?

YES, REALLY! It would have caught quite a few of the regressions we are trying to address here. I want it there even if ignored, because I'll look at it to avoid regressions in the future. If the test is actually removed, recreating it is a pain. If you really want to deactivate it, use if(0), but PLEASE leave it there so that it can be reactivated for tests very simply, instead of maintaining some test outside of the tree.

Also, if farm logs show that it is okay on all animals, it can be switched on by removing the ignore trick.

I made some very minor language tweaks in comments (we don't usually
shorten "benchmark" to "bench" in English, "series" keeps the -s in
singular (blame the Romans), etc).

Thanks! My English is kind of fuzzy in the details:-)

I think we should make it clear when we mean the *Unix* epoch (a
comment "switch to epoch" isn't meaningful on its own, to me at
least), so I changed that in a few places.

Ok.

Attached v16:
 - tries to be consistent wrt epoch & aggregates, aligning to Unix epoch
   as you suggested.
 - renames tx as accumulate, but does not invert it.
 - always shows aggregates so that the user can depend on the output,
   even if stats are zero, because ISTM that cleverness must be avoided.
 - puts tests back, even if ignored, because I really want them available
   easily.

When/if you get to commit this patch, eventually, do not forget that I'm pushing forward fixes contributed by others, including Kyotaro Horiguchi and Yugo Nagata.

--
Fabien.
diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c
index 4aeccd93af..57c193d445 100644
--- a/src/bin/pgbench/pgbench.c
+++ b/src/bin/pgbench/pgbench.c
@@ -343,6 +343,12 @@ typedef struct StatsData
 	SimpleStats lag;
 } StatsData;
 
+/*
+ * For displaying Unix epoch timestamps, as some time functions may have
+ * another reference.
+ */
+pg_time_usec_t epoch_shift;
+
 /*
  * Struct to keep random state.
  */
@@ -649,7 +655,7 @@ static void setDoubleValue(PgBenchValue *pv, double dval);
 static bool evaluateExpr(CState *st, PgBenchExpr *expr,
 						 PgBenchValue *retval);
 static ConnectionStateEnum executeMetaCommand(CState *st, pg_time_usec_t *now);
-static void doLog(TState *thread, CState *st,
+static void doLog(TState *thread, CState *st, bool accumulate,
 				  StatsData *agg, bool skipped, double latency, double lag);
 static void processXactStats(TState *thread, CState *st, pg_time_usec_t *now,
 							 bool skipped, StatsData *agg);
@@ -3768,20 +3774,50 @@ executeMetaCommand(CState *st, pg_time_usec_t *now)
 	return CSTATE_END_COMMAND;
 }
 
+/* print aggregated report to logfile */
+static void
+logAgg(FILE *logfile, StatsData *agg)
+{
+	fprintf(logfile, INT64_FORMAT " " INT64_FORMAT " %.0f %.0f %.0f %.0f",
+			agg->start_time / 1000000, /* this is epoch time */
+			agg->cnt,
+			agg->latency.sum,
+			agg->latency.sum2,
+			agg->latency.min,
+			agg->latency.max);
+	if (throttle_delay)
+	{
+		fprintf(logfile, " %.0f %.0f %.0f %.0f",
+				agg->lag.sum,
+				agg->lag.sum2,
+				agg->lag.min,
+				agg->lag.max);
+		if (latency_limit)
+			fprintf(logfile, " " INT64_FORMAT, agg->skipped);
+	}
+	fputc('\n', logfile);
+}
+
 /*
  * Print log entry after completing one transaction.
  *
- * We print Unix-epoch timestamps in the log, so that entries can be
+ * The function behavior changes depending on sample_rate (a fraction of
+ * transactions is reported) and agg_interval (transactions are aggregated
+ * and reported once every agg_interval seconds).  For aggregation,
+ * "accumulate" indicates that we are recording a transaction, otherwise it
+ * is a hint that the final aggregate should be logged.
+ *
+ * We use Unix-epoch timestamps in the log, so that entries can be
  * correlated against other logs.  On some platforms this could be obtained
  * from the caller, but rather than get entangled with that, we just eat
  * the cost of an extra syscall in all cases.
  */
 static void
-doLog(TState *thread, CState *st,
+doLog(TState *thread, CState *st, bool accumulate,
 	  StatsData *agg, bool skipped, double latency, double lag)
 {
 	FILE	   *logfile = thread->logfile;
-	pg_time_usec_t now = pg_time_now();
+	pg_time_usec_t now = pg_time_now() + epoch_shift;
 
 	Assert(use_log);
 
@@ -3796,43 +3832,33 @@ doLog(TState *thread, CState *st,
 	/* should we aggregate the results or not? */
 	if (agg_interval > 0)
 	{
+		pg_time_usec_t	next;
+
 		/*
 		 * Loop until we reach the interval of the current moment, and print
 		 * any empty intervals in between (this may happen with very low tps,
 		 * e.g. --rate=0.1).
 		 */
-
-		while (agg->start_time + agg_interval <= now)
+		while ((next = agg->start_time + agg_interval * INT64CONST(1000000)) <= now)
 		{
-			/* print aggregated report to logfile */
-			fprintf(logfile, INT64_FORMAT " " INT64_FORMAT " %.0f %.0f %.0f %.0f",
-					agg->start_time,
-					agg->cnt,
-					agg->latency.sum,
-					agg->latency.sum2,
-					agg->latency.min,
-					agg->latency.max);
-			if (throttle_delay)
-			{
-				fprintf(logfile, " %.0f %.0f %.0f %.0f",
-						agg->lag.sum,
-						agg->lag.sum2,
-						agg->lag.min,
-						agg->lag.max);
-				if (latency_limit)
-					fprintf(logfile, " " INT64_FORMAT, agg->skipped);
-			}
-			fputc('\n', logfile);
+			logAgg(logfile, agg);
 
 			/* reset data and move to next interval */
-			initStats(agg, agg->start_time + agg_interval);
+			initStats(agg, next);
 		}
 
-		/* accumulate the current transaction */
-		accumStats(agg, skipped, latency, lag);
+		if (accumulate)
+			/* accumulate the current transaction */
+			accumStats(agg, skipped, latency, lag);
+		else
+			/* final call to show the last aggregate */
+			logAgg(logfile, agg);
 	}
 	else
 	{
+		/* accumulate as false is only used for aggregated data */
+		Assert(accumulate);
+
 		/* no, print raw transactions */
 		if (skipped)
 			fprintf(logfile, "%d " INT64_FORMAT " skipped %d " INT64_FORMAT " "
@@ -3892,7 +3918,7 @@ processXactStats(TState *thread, CState *st, pg_time_usec_t *now,
 	st->cnt++;
 
 	if (use_log)
-		doLog(thread, st, agg, skipped, latency, lag);
+		doLog(thread, st, true, agg, skipped, latency, lag);
 
 	/* XXX could use a mutex here, but we choose not to */
 	if (per_script_stats)
@@ -5458,7 +5484,8 @@ printProgressReport(TState *threads, int64 test_start, pg_time_usec_t now,
 
 	if (progress_timestamp)
 	{
-		snprintf(tbuf, sizeof(tbuf), "%.3f s", PG_TIME_GET_DOUBLE(now));
+		snprintf(tbuf, sizeof(tbuf), "%.3f s",
+				 PG_TIME_GET_DOUBLE(now + epoch_shift));
 	}
 	else
 	{
@@ -5789,8 +5816,8 @@ main(int argc, char **argv)
 	TState	   *threads;		/* array of thread */
 
 	pg_time_usec_t
-				start_time,		/* start up time */
-				bench_start = 0,	/* first recorded benchmarking time */
+				start_time,				/* overall start time */
+				bench_start = 0,		/* first recorded benchmarking time */
 				conn_total_duration;	/* cumulated connection time in
 										 * threads */
 	int64		latency_late = 0;
@@ -5808,6 +5835,11 @@ main(int argc, char **argv)
 	char	   *env;
 
 	int			exit_code = 0;
+	struct timeval tv;
+
+	/* record shift between Unix epoch and now() */
+	gettimeofday(&tv, NULL);
+	epoch_shift = tv.tv_sec * INT64CONST(1000000) + tv.tv_usec - pg_time_now();
 
 	pg_logging_init(argv[0]);
 	progname = get_progname(argv[0]);
@@ -6637,7 +6669,14 @@ threadRun(void *arg)
 	thread->bench_start = start;
 	thread->throttle_trigger = start;
 
-	initStats(&aggs, start);
+	/*
+	 * The log format currently has Unix epoch timestamps with whole numbers of
+	 * seconds.  Round the first aggregate's start time down to the nearest
+	 * Unix epoch second (the very first aggregate might really have started a
+	 * fraction of a second later, but later aggregates are measured from the
+	 * whole number time that is actually logged).
+	 */
+	initStats(&aggs, (start + epoch_shift) / 1000000 * 1000000);
 	last = aggs;
 
 	/* loop till all clients have terminated */
@@ -6830,8 +6869,9 @@ done:
 		if (agg_interval > 0)
 		{
 			/* log aggregated but not yet reported transactions */
-			doLog(thread, state, &aggs, false, 0, 0);
+			doLog(thread, state, false, &aggs, false, 0.0, 0.0);
 		}
+
 		fclose(thread->logfile);
 		thread->logfile = NULL;
 	}
diff --git a/src/bin/pgbench/t/001_pgbench_with_server.pl b/src/bin/pgbench/t/001_pgbench_with_server.pl
index 3aa9d5d753..41250bf91d 100644
--- a/src/bin/pgbench/t/001_pgbench_with_server.pl
+++ b/src/bin/pgbench/t/001_pgbench_with_server.pl
@@ -8,6 +8,7 @@ use PostgresNode;
 use TestLib;
 use Test::More;
 use Config;
+use Time::HiRes qw( time );
 
 # start a pgbench specific server
 my $node = get_new_node('main');
@@ -54,12 +55,14 @@ sub pgbench
 
 	push @cmd, @args;
 
+	my $start = time();
 	$node->command_checks_all(\@cmd, $stat, $out, $err, $name);
+	my $stop = time();
 
 	# cleanup?
 	#unlink @filenames or die "cannot unlink files (@filenames): $!";
 
-	return;
+	return $stop - $start;
 }
 
 # tablespace for testing, because partitioned tables cannot use pg_default
@@ -1187,7 +1190,7 @@ sub check_pgbench_logs
 
 	# $prefix is simple enough, thus does not need escaping
 	my @logs = list_files($dir, qr{^$prefix\..*$});
-	ok(@logs == $nb, "number of log files");
+	ok(@logs == $nb, "number of log files (@logs)");
 	ok(grep(/\/$prefix\.\d+(\.\d+)?$/, @logs) == $nb, "file name format");
 
 	my $log_number = 0;
@@ -1220,6 +1223,58 @@ sub check_pgbench_logs
 my $bdir = $node->basedir;
 
 # Run with sampling rate, 2 clients with 50 transactions each.
+#
+# Test time-sensitive features on a light read-only transaction:
+#
+#   -T: bench duration, 2 seconds to exercise progress & logs
+#   -P: progress report
+#   --aggregate-interval: periodic aggregated logs
+#   --rate: schedule load
+#   --latency-limit: max delay, not deeply exercised
+#
+# note: the --rate behavior is probabilistic in nature.
+# note: --progress-timestamp is not tested.
+my $delay = pgbench(
+	'-T 2 -P 1 -l --aggregate-interval=1 -S -b se@2'
+	. ' --rate=20 --latency-limit=1000 -j ' . $nthreads
+	. ' -c 3 -r',
+	0,
+	[   qr{type: multiple},
+		qr{clients: 3},
+		qr{threads: $nthreads},
+		qr{duration: 2 s},
+		qr{script 1: .* select only},
+		qr{script 2: .* select only},
+		qr{statement latencies in milliseconds},
+		qr{FROM pgbench_accounts} ],
+	[ qr{vacuum}, qr{progress: 1\b} ],
+	'pgbench progress', undef,
+	"--log-prefix=$bdir/001_pgbench_log_1");
+
+# cool check that we are around 2 seconds
+
+TODO: {
+	local $TODO = "possibly unreliable on slow hosts or unlucky runs";
+
+	# The rate may result in an unlucky schedule which triggers
+	# an early exit, hence the loose bound.
+
+	# also, the delay may totally fail on very slow or overloaded hosts,
+	# valgrind runs...
+
+	ok(1.5 < $delay && $delay < 2.5, "-T 2 run around 2 seconds");
+}
+
+# $nthreads threads, 2 seconds, but due to timing imprecision we might get
+# only 1 or as many as 3 progress reports per thread.
+# aggregate log format is:
+#   epoch #tx sum sum2 min max [sum sum2 min max [skipped]]
+# first series about latency ; second about lag (--rate) ;
+# skipped only if --latency-limit is set.
+check_pgbench_logs($bdir, '001_pgbench_log_1', $nthreads, 1, 3,
+	qr{^\d{10,} \d{1,2} \d+ \d+ \d+ \d+ \d+ \d+ \d+ \d+ \d+$});
+
+# with sampling rate, 2 clients with 50 tx each
 pgbench(
 	"-n -S -t 50 -c 2 --log --sampling-rate=0.5", 0,
 	[ qr{select only}, qr{processed: 100/100} ], [qr{^$}],
