Bonjour Michaël,

If this were core server code threatening data integrity I would be
inclined to be more strict, but after all pg_bench is a utility program,
and I think we can allow a little more latitude.

+1.  Let's be flexible here.  It looks better to not rush a fix, and
we still have some time ahead.

Attached is an updated v8 patch which adds (reinstates) an improved TAP test that would have caught the various regressions on logs.

Given that such tests were removed once before, I'm unsure whether they will be acceptable, even though their usefulness has been clearly demonstrated. At least this is for the record. Sigh :-(

--
Fabien.
diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c
index e61055b6b7..dfb2ff6859 100644
--- a/src/bin/pgbench/pgbench.c
+++ b/src/bin/pgbench/pgbench.c
@@ -343,6 +343,12 @@ typedef struct StatsData
 	SimpleStats lag;
 } StatsData;
 
+/*
+ * For displaying epoch timestamp, as some time functions may have
+ * another reference.
+ */
+pg_time_usec_t epoch_shift;
+
 /*
  * Struct to keep random state.
  */
@@ -649,7 +655,7 @@ static void setDoubleValue(PgBenchValue *pv, double dval);
 static bool evaluateExpr(CState *st, PgBenchExpr *expr,
 						 PgBenchValue *retval);
 static ConnectionStateEnum executeMetaCommand(CState *st, pg_time_usec_t *now);
-static void doLog(TState *thread, CState *st,
+static void doLog(TState *thread, CState *st, bool tx,
 				  StatsData *agg, bool skipped, double latency, double lag);
 static void processXactStats(TState *thread, CState *st, pg_time_usec_t *now,
 							 bool skipped, StatsData *agg);
@@ -3766,16 +3772,47 @@ executeMetaCommand(CState *st, pg_time_usec_t *now)
 	return CSTATE_END_COMMAND;
 }
 
+/* print aggregated report to logfile */
+static void
+logAgg(FILE *logfile, StatsData *agg)
+{
+	fprintf(logfile, INT64_FORMAT " " INT64_FORMAT " %.0f %.0f %.0f %.0f",
+			(agg->start_time + epoch_shift) / 1000000,
+			agg->cnt,
+			agg->latency.sum,
+			agg->latency.sum2,
+			agg->latency.min,
+			agg->latency.max);
+	if (throttle_delay)
+	{
+		fprintf(logfile, " %.0f %.0f %.0f %.0f",
+				agg->lag.sum,
+				agg->lag.sum2,
+				agg->lag.min,
+				agg->lag.max);
+		if (latency_limit)
+			fprintf(logfile, " " INT64_FORMAT, agg->skipped);
+	}
+	fputc('\n', logfile);
+}
+
 /*
  * Print log entry after completing one transaction.
  *
+ * Param tx tells whether the call actually represents a transaction,
+ * or if it is used to flush aggregated logs.
+ *
+ * The function behavior changes depending on sample_rate (a fraction of
+ * transactions is reported) and agg_interval (transactions are aggregated
+ * and reported once every agg_interval seconds).
+ *
  * We print Unix-epoch timestamps in the log, so that entries can be
  * correlated against other logs.  On some platforms this could be obtained
  * from the caller, but rather than get entangled with that, we just eat
  * the cost of an extra syscall in all cases.
  */
 static void
-doLog(TState *thread, CState *st,
+doLog(TState *thread, CState *st, bool tx,
 	  StatsData *agg, bool skipped, double latency, double lag)
 {
 	FILE	   *logfile = thread->logfile;
@@ -3794,43 +3831,36 @@ doLog(TState *thread, CState *st,
 	/* should we aggregate the results or not? */
 	if (agg_interval > 0)
 	{
+		pg_time_usec_t	next;
+
 		/*
 		 * Loop until we reach the interval of the current moment, and print
 		 * any empty intervals in between (this may happen with very low tps,
 		 * e.g. --rate=0.1).
 		 */
-
-		while (agg->start_time + agg_interval <= now)
+		while ((next = agg->start_time + agg_interval * INT64CONST(1000000)) <= now)
 		{
-			/* print aggregated report to logfile */
-			fprintf(logfile, INT64_FORMAT " " INT64_FORMAT " %.0f %.0f %.0f %.0f",
-					agg->start_time,
-					agg->cnt,
-					agg->latency.sum,
-					agg->latency.sum2,
-					agg->latency.min,
-					agg->latency.max);
-			if (throttle_delay)
-			{
-				fprintf(logfile, " %.0f %.0f %.0f %.0f",
-						agg->lag.sum,
-						agg->lag.sum2,
-						agg->lag.min,
-						agg->lag.max);
-				if (latency_limit)
-					fprintf(logfile, " " INT64_FORMAT, agg->skipped);
-			}
-			fputc('\n', logfile);
+			logAgg(logfile, agg);
 
 			/* reset data and move to next interval */
-			initStats(agg, agg->start_time + agg_interval);
+			initStats(agg, next);
 		}
 
-		/* accumulate the current transaction */
-		accumStats(agg, skipped, latency, lag);
+		if (tx)
+			/* accumulate the current transaction */
+			accumStats(agg, skipped, latency, lag);
+		else
+			/* final call to show the last aggregate */
+			logAgg(logfile, agg);
 	}
 	else
 	{
+		/* switch to epoch */
+		now += epoch_shift;
+
+		/* !tx only used for aggregated data */
+		Assert(tx);
+
 		/* no, print raw transactions */
 		if (skipped)
 			fprintf(logfile, "%d " INT64_FORMAT " skipped %d " INT64_FORMAT " "
@@ -3890,7 +3920,7 @@ processXactStats(TState *thread, CState *st, pg_time_usec_t *now,
 	st->cnt++;
 
 	if (use_log)
-		doLog(thread, st, agg, skipped, latency, lag);
+		doLog(thread, st, true, agg, skipped, latency, lag);
 
 	/* XXX could use a mutex here, but we choose not to */
 	if (per_script_stats)
@@ -5456,7 +5486,7 @@ printProgressReport(TState *threads, int64 test_start, pg_time_usec_t now,
 
 	if (progress_timestamp)
 	{
-		snprintf(tbuf, sizeof(tbuf), "%.3f s", PG_TIME_GET_DOUBLE(now));
+		snprintf(tbuf, sizeof(tbuf), "%.3f s", PG_TIME_GET_DOUBLE(now + epoch_shift));
 	}
 	else
 	{
@@ -5806,6 +5836,11 @@ main(int argc, char **argv)
 	char	   *env;
 
 	int			exit_code = 0;
+	struct timeval tv;
+
+	/* record shift between epoch and now() */
+	gettimeofday(&tv, NULL);
+	epoch_shift = tv.tv_sec * 1000000 + tv.tv_usec - pg_time_now();
 
 	pg_logging_init(argv[0]);
 	progname = get_progname(argv[0]);
@@ -6828,8 +6863,9 @@ done:
 		if (agg_interval > 0)
 		{
 			/* log aggregated but not yet reported transactions */
-			doLog(thread, state, &aggs, false, 0, 0);
+			doLog(thread, state, false, &aggs, false, 0.0, 0.0);
 		}
+
 		fclose(thread->logfile);
 		thread->logfile = NULL;
 	}
diff --git a/src/bin/pgbench/t/001_pgbench_with_server.pl b/src/bin/pgbench/t/001_pgbench_with_server.pl
index 923203ea51..2e34c1abf6 100644
--- a/src/bin/pgbench/t/001_pgbench_with_server.pl
+++ b/src/bin/pgbench/t/001_pgbench_with_server.pl
@@ -8,6 +8,7 @@ use PostgresNode;
 use TestLib;
 use Test::More;
 use Config;
+use Time::HiRes qw( time );
 
 # start a pgbench specific server
 my $node = get_new_node('main');
@@ -54,12 +55,14 @@ sub pgbench
 
 	push @cmd, @args;
 
+	my $start = time();
 	$node->command_checks_all(\@cmd, $stat, $out, $err, $name);
+	my $stop = time();
 
 	# cleanup?
 	#unlink @filenames or die "cannot unlink files (@filenames): $!";
 
-	return;
+	return $stop - $start;
 }
 
 # tablespace for testing, because partitioned tables cannot use pg_default
@@ -1173,7 +1176,14 @@ sub list_files
 	return map { $dir . '/' . $_ } @files;
 }
 
+#
 # check log contents and cleanup
+#   $dir: directory holding logs
+#   $prefix: file prefix for per-thread logs
+#   $nb: number of expected files
+#   $min/$max: minimum/maximum number of lines in log files
+#   $re: regular expression each line should match
+#
 sub check_pgbench_logs
 {
 	local $Test::Builder::Level = $Test::Builder::Level + 1;
@@ -1182,7 +1192,7 @@ sub check_pgbench_logs
 
 	# $prefix is simple enough, thus does not need escaping
 	my @logs = list_files($dir, qr{^$prefix\..*$});
-	ok(@logs == $nb, "number of log files");
+	ok(@logs == $nb, "number of log files (@logs)");
 	ok(grep(/\/$prefix\.\d+(\.\d+)?$/, @logs) == $nb, "file name format");
 
 	my $log_number = 0;
@@ -1205,6 +1215,36 @@ sub check_pgbench_logs
 
 my $bdir = $node->basedir;
 
+# note: this test is time sensitive, and may fail on a very
+#       loaded host.
+# note: --progress-timestamp is not tested
+my $delay = pgbench(
+	'-T 2 -P 1 -l --aggregate-interval=1 -S -b se@2'
+	. ' --rate=20 --latency-limit=1000 -j ' . $nthreads
+	. ' -c 3 -r',
+	0,
+	[   qr{type: multiple},
+		qr{clients: 3},
+		qr{threads: $nthreads},
+		qr{duration: 2 s},
+		qr{script 1: .* select only},
+		qr{script 2: .* select only},
+		qr{statement latencies in milliseconds},
+		qr{FROM pgbench_accounts} ],
+	[ qr{vacuum}, qr{progress: 1\b} ],
+	'pgbench progress', undef,
+	"--log-prefix=$bdir/001_pgbench_log_1");
+
+# loose check that we are around 2 seconds
+# The rate may result in an unlucky schedule which triggers
+# an early exit, hence the loose bound.
+ok(0.0 < $delay && $delay < 4.0, "-T 2 run around 2 seconds");
+
+# $nthreads threads, 2 seconds, but due to timing imprecision we might get
+# only 1 or as many as 3 progress reports per thread.
+check_pgbench_logs($bdir, '001_pgbench_log_1', $nthreads, 1, 3,
+	qr{^\d{10,} \d{1,2} \d+ \d+ \d+ \d+ \d+ \d+ \d+ \d+ \d+$});
+
 # with sampling rate
 pgbench(
 	"-n -S -t 50 -c 2 --log --sampling-rate=0.5", 0,

Reply via email to