Hi there,

attached is a patch implementing merging of pgbench logs. These logs are
written by each thread, so with N threads you get N files with names

    pgbench_log.PID
    pgbench_log.PID.1
    ...
    pgbench_log.PID.(N-1)

Before analyzing these logs, the files need to be combined. I usually
ended up writing ad-hoc scripts to do that, losing them, writing them
again, and so on, over and over.

Another disadvantage of external scripts is that you have to pass them
all the information about the logs (whether the logs are aggregated,
whether there's throttling, etc.).

So integrating this into pgbench directly seems like a better approach,
and the attached patch implements that.

With '-m' or '--merge-logs' on the command line, the logs are merged at
the end: each per-thread log is folded into the main log with a simple
2-way merge, which keeps the result sorted by the time field. It should
work with all the other options that influence the log format (--rate,
--aggregate-interval and --latency-limit).

I'll add this to CF 2016-06.

-- 
Tomas Vondra                http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
diff --git a/contrib/pgbench/pgbench.c b/contrib/pgbench/pgbench.c
index 706fdf5..d6ec87e 100644
--- a/contrib/pgbench/pgbench.c
+++ b/contrib/pgbench/pgbench.c
@@ -157,6 +157,11 @@ char	   *tablespace = NULL;
 char	   *index_tablespace = NULL;
 
 /*
+ * merge logs (transaction logs, aggregated logs) at the end
+ */
+bool		merge_logs = false;
+
+/*
  * end of configurable parameters
  *********************************************************************/
 
@@ -367,6 +372,10 @@ static void *threadRun(void *arg);
 static void doLog(TState *thread, CState *st, FILE *logfile, instr_time *now,
 	  AggVals *agg, bool skipped);
 
+static void merge_log_files(int agg_interval, int nthreads);
+static void merge_aggregated_logs(FILE *infile_a, FILE *infile_b, FILE *outfile);
+static void merge_simple_logs(FILE *infile_a, FILE *infile_b, FILE *outfile);
+
 static void
 usage(void)
 {
@@ -408,6 +417,7 @@ usage(void)
 		   "  -v, --vacuum-all         vacuum all four standard tables before tests\n"
 		   "  --aggregate-interval=NUM aggregate data over NUM seconds\n"
 		   "  --sampling-rate=NUM      fraction of transactions to log (e.g. 0.01 for 1%%)\n"
+		   "  -m, --merge-logs         merge logs produced by multiple threads\n"
 		   "\nCommon options:\n"
 		   "  -d, --debug              print debugging output\n"
 	  "  -h, --host=HOSTNAME      database server host or socket directory\n"
@@ -2733,6 +2743,7 @@ main(int argc, char **argv)
 		{"aggregate-interval", required_argument, NULL, 5},
 		{"rate", required_argument, NULL, 'R'},
 		{"latency-limit", required_argument, NULL, 'L'},
+		{"merge-logs", no_argument, NULL, 'm'},
 		{NULL, 0, NULL, 0}
 	};
 
@@ -2808,7 +2819,7 @@ main(int argc, char **argv)
 	state = (CState *) pg_malloc(sizeof(CState));
 	memset(state, 0, sizeof(CState));
 
-	while ((c = getopt_long(argc, argv, "ih:nvp:dqSNc:j:Crs:t:T:U:lf:D:F:M:P:R:L:", long_options, &optindex)) != -1)
+	while ((c = getopt_long(argc, argv, "ih:nvp:dqSNc:j:Crs:t:T:U:lf:D:F:M:P:R:L:m", long_options, &optindex)) != -1)
 	{
 		switch (c)
 		{
@@ -3017,6 +3028,10 @@ main(int argc, char **argv)
 					latency_limit = (int64) (limit_ms * 1000);
 				}
 				break;
+			case 'm':
+				printf("merge logs\n");
+				merge_logs = true;
+				break;
 			case 0:
 				/* This covers long options which take no argument. */
 				if (foreign_keys || unlogged_tables)
@@ -3137,6 +3152,12 @@ main(int argc, char **argv)
 		exit(1);
 	}
 
+	if (merge_logs && (! use_log))
+	{
+		fprintf(stderr, "log merging is allowed only when actually logging transactions\n");
+		exit(1);
+	}
+
 	/*
 	 * is_latencies only works with multiple threads in thread-based
 	 * implementations, not fork-based ones, because it supposes that the
@@ -3418,6 +3439,10 @@ main(int argc, char **argv)
 				 throttle_lag, throttle_lag_max, throttle_latency_skipped,
 				 latency_late);
 
+	/* Merge logs, if needed */
+	if (merge_logs)
+		merge_log_files(agg_interval, nthreads);
+
 	return 0;
 }
 
@@ -3783,6 +3808,339 @@ done:
 	return result;
 }
 
+/*
+ * Merge the per-thread transaction logs into the main log file.
+ *
+ * Thread 0 writes "pgbench_log.<pid>" and thread i (i >= 1) writes
+ * "pgbench_log.<pid>.<i>".  Each log is sorted by time, so the main log is
+ * combined with one per-thread log at a time using a 2-way merge into
+ * "pgbench_log.<pid>.tmp".  The temporary file then replaces the main log,
+ * and the consumed per-thread log is removed.
+ *
+ * agg_interval > 0 selects the aggregated log format; otherwise the
+ * per-transaction format is used.  On any error a message is printed to
+ * stderr and merging stops.  The main log is only replaced after the merged
+ * output was written successfully, so no data is lost on failure.
+ */
+static void
+merge_log_files(int agg_interval, int nthreads)
+{
+	char		mainpath[64];
+	char		tmppath[64];
+	int			i;
+
+	snprintf(mainpath, sizeof(mainpath), "pgbench_log.%d", main_pid);
+	snprintf(tmppath, sizeof(tmppath), "pgbench_log.%d.tmp", main_pid);
+
+	/* we can do this as 2-way merges (all the logs are sorted by timestamp) */
+	for (i = 1; i < nthreads; i++)
+	{
+		char		threadpath[64];
+		FILE	   *infile_a;
+		FILE	   *infile_b;
+		FILE	   *outfile;
+		bool		write_failed;
+
+		snprintf(threadpath, sizeof(threadpath), "pgbench_log.%d.%d", main_pid, i);
+
+		/* the first input is always the 'main' logfile */
+		infile_a = fopen(mainpath, "r");
+		if (infile_a == NULL)
+		{
+			fprintf(stderr, "Couldn't open logfile \"%s\": %s\n", mainpath, strerror(errno));
+			return;
+		}
+
+		infile_b = fopen(threadpath, "r");
+		if (infile_b == NULL)
+		{
+			fprintf(stderr, "Couldn't open logfile \"%s\": %s\n", threadpath, strerror(errno));
+			fclose(infile_a);
+			return;
+		}
+
+		outfile = fopen(tmppath, "w");
+		if (outfile == NULL)
+		{
+			fprintf(stderr, "Couldn't open logfile \"%s\": %s\n", tmppath, strerror(errno));
+			fclose(infile_a);
+			fclose(infile_b);
+			return;
+		}
+
+		if (agg_interval > 0)
+			merge_aggregated_logs(infile_a, infile_b, outfile);
+		else
+			merge_simple_logs(infile_a, infile_b, outfile);
+
+		fclose(infile_a);
+		fclose(infile_b);
+
+		/* fclose() flushes buffered output, so both it and ferror() matter */
+		write_failed = (ferror(outfile) != 0);
+		if (fclose(outfile) != 0)
+			write_failed = true;
+
+		if (write_failed)
+		{
+			fprintf(stderr, "Couldn't write logfile \"%s\": %s\n", tmppath, strerror(errno));
+			unlink(tmppath);
+			return;
+		}
+
+		/*
+		 * rename() replaces an existing target, so the main log is swapped
+		 * for the merged file in one step.  Only after that succeeds is the
+		 * consumed per-thread log removed.
+		 */
+		if (rename(tmppath, mainpath) != 0)
+		{
+			fprintf(stderr, "Couldn't rename \"%s\" to \"%s\": %s\n",
+					tmppath, mainpath, strerror(errno));
+			return;
+		}
+
+		if (unlink(threadpath) != 0)
+			fprintf(stderr, "Couldn't remove logfile \"%s\": %s\n", threadpath, strerror(errno));
+	}
+}
+
+/*
+ * One record of a per-transaction log.
+ *
+ * The latency column is kept as text because it may hold a non-numeric
+ * marker (e.g. "skipped") instead of a number.  It is copied to the output
+ * verbatim.
+ */
+typedef struct SimpleLogRecord
+{
+	int			id;
+	int			cnt;
+	char		latency[64];
+	int			use_file;
+	long		tv_sec;
+	long		tv_usec;
+	double		lag;			/* only present when throttle_delay is set */
+} SimpleLogRecord;
+
+/*
+ * Read the next per-transaction record from infile into *rec.
+ *
+ * Returns true on success.  Returns false at end of file, or when the next
+ * line does not match the expected format; in that case a warning is
+ * printed and the rest of the file is ignored.
+ */
+static bool
+read_simple_record(FILE *infile, SimpleLogRecord *rec)
+{
+	int			expected;
+	int			nread;
+
+	/* %63s leaves room for the terminator in rec->latency[64] */
+	if (throttle_delay)
+	{
+		expected = 7;
+		nread = fscanf(infile, "%d %d %63s %d %ld %ld %lf",
+					   &rec->id, &rec->cnt, rec->latency, &rec->use_file,
+					   &rec->tv_sec, &rec->tv_usec, &rec->lag);
+	}
+	else
+	{
+		expected = 6;
+		nread = fscanf(infile, "%d %d %63s %d %ld %ld",
+					   &rec->id, &rec->cnt, rec->latency, &rec->use_file,
+					   &rec->tv_sec, &rec->tv_usec);
+	}
+
+	if (nread == expected)
+		return true;
+
+	if (nread != EOF)
+		fprintf(stderr, "malformed record in transaction log, ignoring the rest of the file\n");
+
+	return false;
+}
+
+/* Write one per-transaction record in the same format it was read in. */
+static void
+write_simple_record(FILE *outfile, const SimpleLogRecord *rec)
+{
+	fprintf(outfile, "%d %d %s %d %ld %ld",
+			rec->id, rec->cnt, rec->latency, rec->use_file,
+			rec->tv_sec, rec->tv_usec);
+	if (throttle_delay)
+		fprintf(outfile, " %.0f", rec->lag);
+	fputc('\n', outfile);
+}
+
+/*
+ * 2-way merge of two per-transaction logs, each sorted by (tv_sec, tv_usec),
+ * into outfile.
+ *
+ * On equal timestamps the record from infile_a is written first.  Reading
+ * from an input stops at its end or at its first unparsable line.
+ */
+static void
+merge_simple_logs(FILE *infile_a, FILE *infile_b, FILE *outfile)
+{
+	SimpleLogRecord rec_a;
+	SimpleLogRecord rec_b;
+	bool		have_a = read_simple_record(infile_a, &rec_a);
+	bool		have_b = read_simple_record(infile_b, &rec_b);
+
+	/* repeat until we exhaust data from both files */
+	while (have_a || have_b)
+	{
+		bool		take_a;
+
+		if (!have_b)
+			take_a = true;
+		else if (!have_a)
+			take_a = false;
+		else
+			/* 'a' before 'b' (or at the same time) */
+			take_a = (rec_a.tv_sec < rec_b.tv_sec) ||
+				(rec_a.tv_sec == rec_b.tv_sec && rec_a.tv_usec <= rec_b.tv_usec);
+
+		if (take_a)
+		{
+			write_simple_record(outfile, &rec_a);
+			have_a = read_simple_record(infile_a, &rec_a);
+		}
+		else
+		{
+			write_simple_record(outfile, &rec_b);
+			have_b = read_simple_record(infile_b, &rec_b);
+		}
+	}
+}
+
+/*
+ * One row of an aggregated log: statistics for one aggregation interval
+ * beginning at start_time.
+ */
+typedef struct AggLogRecord
+{
+	long		start_time;
+	int			cnt;
+	double		sum_latency;
+	double		sum2_latency;
+	double		min_latency;
+	double		max_latency;
+	/* the lag columns are only present when throttle_delay is set */
+	double		sum_lag;
+	double		sum2_lag;
+	double		min_lag;
+	double		max_lag;
+} AggLogRecord;
+
+/*
+ * Read the next aggregated row from infile into *rec.
+ *
+ * Returns true on success.  Returns false at end of file, or when the next
+ * line does not match the expected format; in that case a warning is
+ * printed and the rest of the file is ignored.
+ *
+ * NOTE(review): if doLog() appends further columns to aggregated rows
+ * (e.g. a skipped-transaction count when --latency-limit is used), this
+ * parser would misread them as the start of the next row.  Confirm against
+ * doLog().
+ */
+static bool
+read_agg_record(FILE *infile, AggLogRecord *rec)
+{
+	int			expected;
+	int			nread;
+
+	if (throttle_delay)
+	{
+		expected = 10;
+		nread = fscanf(infile, "%ld %d %lf %lf %lf %lf %lf %lf %lf %lf",
+					   &rec->start_time, &rec->cnt,
+					   &rec->sum_latency, &rec->sum2_latency,
+					   &rec->min_latency, &rec->max_latency,
+					   &rec->sum_lag, &rec->sum2_lag,
+					   &rec->min_lag, &rec->max_lag);
+	}
+	else
+	{
+		expected = 6;
+		nread = fscanf(infile, "%ld %d %lf %lf %lf %lf",
+					   &rec->start_time, &rec->cnt,
+					   &rec->sum_latency, &rec->sum2_latency,
+					   &rec->min_latency, &rec->max_latency);
+	}
+
+	if (nread == expected)
+		return true;
+
+	if (nread != EOF)
+		fprintf(stderr, "malformed record in aggregated log, ignoring the rest of the file\n");
+
+	return false;
+}
+
+/*
+ * Write one aggregated row.  Lag columns use the same "%.0f" format as the
+ * latency columns.
+ */
+static void
+write_agg_record(FILE *outfile, const AggLogRecord *rec)
+{
+	fprintf(outfile, "%ld %d %.0f %.0f %.0f %.0f",
+			rec->start_time, rec->cnt,
+			rec->sum_latency, rec->sum2_latency,
+			rec->min_latency, rec->max_latency);
+	if (throttle_delay)
+		fprintf(outfile, " %.0f %.0f %.0f %.0f",
+				rec->sum_lag, rec->sum2_lag,
+				rec->min_lag, rec->max_lag);
+	fputc('\n', outfile);
+}
+
+/*
+ * Fold the statistics of src into dst.  Both rows must have the same
+ * start_time.  Counts and sums add up, minima and maxima combine.
+ */
+static void
+combine_agg_records(AggLogRecord *dst, const AggLogRecord *src)
+{
+	dst->cnt += src->cnt;
+
+	dst->sum_latency += src->sum_latency;
+	dst->sum2_latency += src->sum2_latency;
+	if (src->min_latency < dst->min_latency)
+		dst->min_latency = src->min_latency;
+	if (src->max_latency > dst->max_latency)
+		dst->max_latency = src->max_latency;
+
+	dst->sum_lag += src->sum_lag;
+	dst->sum2_lag += src->sum2_lag;
+	if (src->min_lag < dst->min_lag)
+		dst->min_lag = src->min_lag;
+	if (src->max_lag > dst->max_lag)
+		dst->max_lag = src->max_lag;
+}
+
+/*
+ * 2-way merge of two aggregated logs, each sorted by start_time, into
+ * outfile.
+ *
+ * Rows with the same start_time in both inputs are combined into a single
+ * output row.  Any other row is copied as-is, in start_time order; either
+ * log may contain gaps.
+ *
+ * NOTE(review): combining relies on both threads using identical interval
+ * boundaries.  If threads start their intervals at different times, their
+ * rows are interleaved rather than combined.  Verify how start_time is
+ * initialized per thread.
+ */
+static void
+merge_aggregated_logs(FILE *infile_a, FILE *infile_b, FILE *outfile)
+{
+	AggLogRecord rec_a;
+	AggLogRecord rec_b;
+	bool		have_a = read_agg_record(infile_a, &rec_a);
+	bool		have_b = read_agg_record(infile_b, &rec_b);
+
+	/* repeat until we exhaust data from both files */
+	while (have_a || have_b)
+	{
+		if (have_a && have_b && rec_a.start_time == rec_b.start_time)
+		{
+			/* same interval in both logs: emit one combined row */
+			combine_agg_records(&rec_a, &rec_b);
+			write_agg_record(outfile, &rec_a);
+			have_a = read_agg_record(infile_a, &rec_a);
+			have_b = read_agg_record(infile_b, &rec_b);
+		}
+		else if (have_a && (!have_b || rec_a.start_time < rec_b.start_time))
+		{
+			write_agg_record(outfile, &rec_a);
+			have_a = read_agg_record(infile_a, &rec_a);
+		}
+		else
+		{
+			write_agg_record(outfile, &rec_b);
+			have_b = read_agg_record(infile_b, &rec_b);
+		}
+	}
+}
+
 /*
  * Support for duration option: set timer_exceeded after so many seconds.
  */
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to