[...]

Yeah, something like that. I don't think it would be necessary to set statement_timeout, you can inject that in your script or postgresql.conf if you want. I don't think aborting a transaction that's already started is necessary either. You could count it as LATE, but let it finish first.

I've implemented something along these simplified lines. The latency is not limited as such, but slow (over the limit) queries are counted and reported.

--
Fabien.
diff --git a/contrib/pgbench/pgbench.c b/contrib/pgbench/pgbench.c
index 2f7d80e..96e5fb9 100644
--- a/contrib/pgbench/pgbench.c
+++ b/contrib/pgbench/pgbench.c
@@ -141,6 +141,18 @@ double		sample_rate = 0.0;
 int64		throttle_delay = 0;
 
 /*
+ * Transactions which take longer than this limit are counted as late
+ * and reported as such, although they are completed anyway.
+ *
+ * When under throttling: execution time slots which are more than
+ * this late (in us) are simply skipped, and the corresponding transaction
+ * is counted as such... it is not even started;
+ * otherwise above the limit transactions are counted as such, with the latency
+ * measured wrt the transaction schedule, not its actual start.
+ */
+int64		latency_limit = 0;
+
+/*
  * tablespace selection
  */
 char	   *tablespace = NULL;
@@ -238,6 +250,8 @@ typedef struct
 	int64		throttle_trigger;		/* previous/next throttling (us) */
 	int64		throttle_lag;	/* total transaction lag behind throttling */
 	int64		throttle_lag_max;		/* max transaction lag */
+	int64		throttle_latency_skipped; /* lagging transactions skipped */
+	int64		latency_late; /* late transactions */
 } TState;
 
 #define INVALID_THREAD		((pthread_t) 0)
@@ -250,6 +264,8 @@ typedef struct
 	int64		sqlats;
 	int64		throttle_lag;
 	int64		throttle_lag_max;
+	int64		throttle_latency_skipped;
+	int64		latency_late;
 } TResult;
 
 /*
@@ -367,6 +383,10 @@ usage(void)
 		 "  -f, --file=FILENAME      read transaction script from FILENAME\n"
 		   "  -j, --jobs=NUM           number of threads (default: 1)\n"
 		   "  -l, --log                write transaction times to log file\n"
+		   "  -L, --limit=NUM          count transactions lasting more than NUM ms.\n"
+		   "                           under throttling (--rate), transactions behind schedule\n"
+		   "                           more than NUM ms are skipped, and those finishing more\n"
+		   "                           than NUM ms after their scheduled start are counted.\n"
 		   "  -M, --protocol=simple|extended|prepared\n"
 		   "                           protocol for submitting queries (default: simple)\n"
 		   "  -n, --no-vacuum          do not run VACUUM before tests\n"
@@ -1016,6 +1036,24 @@ top:
 
 		thread->throttle_trigger += wait;
 
+		if (latency_limit)
+		{
+			instr_time	now;
+			int64		now_us;
+			INSTR_TIME_SET_CURRENT(now);
+			now_us = INSTR_TIME_GET_MICROSEC(now);
+			while (thread->throttle_trigger < now_us - latency_limit)
+			{
+				/* if too far behind, this slot is skipped, and we
+				 * iterate till the next nearly on time slot.
+				 */
+				int64 wait = (int64) (throttle_delay *
+					1.00055271703 * -log(getrand(thread, 1, 10000) / 10000.0));
+				thread->throttle_trigger += wait;
+				thread->throttle_latency_skipped ++;
+			}
+		}
+
 		st->until = thread->throttle_trigger;
 		st->sleeping = 1;
 		st->throttling = true;
@@ -1080,13 +1118,17 @@ top:
 			thread->exec_count[cnum]++;
 		}
 
-		/* transaction finished: record latency under progress or throttling */
-		if ((progress || throttle_delay) && commands[st->state + 1] == NULL)
+		/* transaction finished: record latency under progress or throttling,
+		 * or if a latency limit is set
+		 */
+		if ((progress || throttle_delay || latency_limit) &&
+			commands[st->state + 1] == NULL)
 		{
 			instr_time	diff;
-			int64		latency;
+			int64		latency, now;
 
 			INSTR_TIME_SET_CURRENT(diff);
+			now = INSTR_TIME_GET_MICROSEC(diff);
 			INSTR_TIME_SUBTRACT(diff, st->txn_begin);
 			latency = INSTR_TIME_GET_MICROSEC(diff);
 			st->txn_latencies += latency;
@@ -1099,6 +1141,19 @@ top:
 			 * would take 256 hours.
 			 */
 			st->txn_sqlats += latency * latency;
+
+			/* record over the limit transactions if needed.
+			 */
+			if (latency_limit)
+			{
+				/* Under throttling, late means with respect to the initial schedule,
+				 * not the actual transaction start
+				 */
+				if (throttle_delay)
+					latency = now - thread->throttle_trigger;
+				if (latency > latency_limit)
+					thread->latency_late++;
+			}
 		}
 
 		/*
@@ -1294,7 +1349,7 @@ top:
 	}
 
 	/* Record transaction start time under logging, progress or throttling */
-	if ((logfile || progress || throttle_delay) && st->state == 0)
+	if ((logfile || progress || throttle_delay || latency_limit) && st->state == 0)
 		INSTR_TIME_SET_CURRENT(st->txn_begin);
 
 	/* Record statement start time if per-command latencies are requested */
@@ -2351,7 +2406,8 @@ printResults(int ttype, int64 normal_xacts, int nclients,
 			 TState *threads, int nthreads,
 			 instr_time total_time, instr_time conn_total_time,
 			 int64 total_latencies, int64 total_sqlats,
-			 int64 throttle_lag, int64 throttle_lag_max)
+			 int64 throttle_lag, int64 throttle_lag_max,
+			 int64 throttle_latency_skipped, int64 latency_late)
 {
 	double		time_include,
 				tps_include,
@@ -2407,6 +2463,10 @@ printResults(int ttype, int64 normal_xacts, int nclients,
 			   1000.0 * duration * nclients / normal_xacts);
 	}
 
+	if (latency_limit)
+		printf("number of transactions above the %.1f ms latency limit: " INT64_FORMAT "\n",
+			   latency_limit / 1000.0, latency_late);
+
 	if (throttle_delay)
 	{
 		/*
@@ -2417,6 +2477,10 @@ printResults(int ttype, int64 normal_xacts, int nclients,
 		 */
 		printf("rate limit schedule lag: avg %.3f (max %.3f) ms\n",
 			   0.001 * throttle_lag / normal_xacts, 0.001 * throttle_lag_max);
+		if (latency_limit)
+			printf("number of transactions skipped: " INT64_FORMAT " (%.3f %%)\n",
+				   throttle_latency_skipped,
+				   100.0 * throttle_latency_skipped / (throttle_latency_skipped + normal_xacts));
 	}
 
 	printf("tps = %f (including connections establishing)\n", tps_include);
@@ -2505,6 +2569,7 @@ main(int argc, char **argv)
 		{"sampling-rate", required_argument, NULL, 4},
 		{"aggregate-interval", required_argument, NULL, 5},
 		{"rate", required_argument, NULL, 'R'},
+		{"limit", required_argument, NULL, 'L'},
 		{NULL, 0, NULL, 0}
 	};
 
@@ -2534,6 +2599,8 @@ main(int argc, char **argv)
 	int64		total_sqlats = 0;
 	int64		throttle_lag = 0;
 	int64		throttle_lag_max = 0;
+	int64		throttle_latency_skipped = 0;
+	int64		latency_late = 0;
 
 	int			i;
 
@@ -2578,7 +2645,7 @@ main(int argc, char **argv)
 	state = (CState *) pg_malloc(sizeof(CState));
 	memset(state, 0, sizeof(CState));
 
-	while ((c = getopt_long(argc, argv, "ih:nvp:dqSNc:j:Crs:t:T:U:lf:D:F:M:P:R:", long_options, &optindex)) != -1)
+	while ((c = getopt_long(argc, argv, "ih:nvp:dqSNc:j:Crs:t:T:U:lf:D:F:M:P:R:L:", long_options, &optindex)) != -1)
 	{
 		switch (c)
 		{
@@ -2775,6 +2842,18 @@ main(int argc, char **argv)
 					throttle_delay = (int64) (1000000.0 / throttle_value);
 				}
 				break;
+			case 'L':
+				{
+					double limit_ms = atof(optarg);
+					if (limit_ms <= 0.0)
+					{
+						fprintf(stderr, "invalid latency limit: %s\n", optarg);
+						exit(1);
+					}
+					benchmarking_option_set = true;
+					latency_limit = (int64) (limit_ms * 1000);
+				}
+				break;
 			case 0:
 				/* This covers long options which take no argument. */
 				if (foreign_keys || unlogged_tables)
@@ -3070,6 +3149,8 @@ main(int argc, char **argv)
 		thread->random_state[0] = random();
 		thread->random_state[1] = random();
 		thread->random_state[2] = random();
+		thread->throttle_latency_skipped = 0;
+		thread->latency_late = 0;
 
 		if (is_latencies)
 		{
@@ -3144,6 +3225,8 @@ main(int argc, char **argv)
 			total_latencies += r->latencies;
 			total_sqlats += r->sqlats;
 			throttle_lag += r->throttle_lag;
+			throttle_latency_skipped += r->throttle_latency_skipped;
+			latency_late += r->latency_late;
 			if (r->throttle_lag_max > throttle_lag_max)
 				throttle_lag_max = r->throttle_lag_max;
 			INSTR_TIME_ADD(conn_total_time, r->conn_time);
@@ -3166,7 +3249,8 @@ main(int argc, char **argv)
 	INSTR_TIME_SUBTRACT(total_time, start_time);
 	printResults(ttype, total_xacts, nclients, threads, nthreads,
 				 total_time, conn_total_time, total_latencies, total_sqlats,
-				 throttle_lag, throttle_lag_max);
+				 throttle_lag, throttle_lag_max, throttle_latency_skipped,
+				 latency_late);
 
 	return 0;
 }
@@ -3191,7 +3275,8 @@ threadRun(void *arg)
 	int64		last_count = 0,
 				last_lats = 0,
 				last_sqlats = 0,
-				last_lags = 0;
+				last_lags = 0,
+				last_skipped = 0;
 
 	AggVals		aggs;
 
@@ -3394,7 +3479,8 @@ threadRun(void *arg)
 				/* generate and show report */
 				int64		count = 0,
 							lats = 0,
-							sqlats = 0;
+							sqlats = 0,
+							skipped = 0;
 				int64		lags = thread->throttle_lag;
 				int64		run = now - last_report;
 				double		tps,
@@ -3417,23 +3503,26 @@ threadRun(void *arg)
 				sqlat = 1.0 * (sqlats - last_sqlats) / (count - last_count);
 				stdev = 0.001 * sqrt(sqlat - 1000000.0 * latency * latency);
 				lag = 0.001 * (lags - last_lags) / (count - last_count);
+				skipped = thread->throttle_latency_skipped - last_skipped;
 
+				fprintf(stderr,
+						"progress %d: %.1f s, %.1f tps, "
+						"lat %.3f ms stddev %.3f",
+						thread->tid, total_run, tps, latency, stdev);
 				if (throttle_delay)
-					fprintf(stderr,
-							"progress %d: %.1f s, %.1f tps, "
-							"lat %.3f ms stddev %.3f, lag %.3f ms\n",
-							thread->tid, total_run, tps, latency, stdev, lag);
-				else
-					fprintf(stderr,
-							"progress %d: %.1f s, %.1f tps, "
-							"lat %.3f ms stddev %.3f\n",
-							thread->tid, total_run, tps, latency, stdev);
+				{
+					fprintf(stderr, ", lag %.3f ms", lag);
+					if (latency_limit)
+						fprintf(stderr, ", " INT64_FORMAT " skipped", skipped);
+				}
+				fprintf(stderr, "\n");
 
 				last_count = count;
 				last_lats = lats;
 				last_sqlats = sqlats;
 				last_lags = lags;
 				last_report = now;
+				last_skipped = thread->throttle_latency_skipped;
 				next_report += (int64) progress *1000000;
 			}
 		}
@@ -3452,7 +3541,8 @@ threadRun(void *arg)
 				int64		count = 0,
 							lats = 0,
 							sqlats = 0,
-							lags = 0;
+							lags = 0,
+							skipped = 0;
 				int64		run = now - last_report;
 				double		tps,
 							total_run,
@@ -3477,23 +3567,26 @@ threadRun(void *arg)
 				sqlat = 1.0 * (sqlats - last_sqlats) / (count - last_count);
 				stdev = 0.001 * sqrt(sqlat - 1000000.0 * latency * latency);
 				lag = 0.001 * (lags - last_lags) / (count - last_count);
+				skipped = thread->throttle_latency_skipped - last_skipped;
 
+				fprintf(stderr,
+						"progress: %.1f s, %.1f tps, "
+						"lat %.3f ms stddev %.3f",
+						total_run, tps, latency, stdev);
 				if (throttle_delay)
-					fprintf(stderr,
-							"progress: %.1f s, %.1f tps, "
-							"lat %.3f ms stddev %.3f, lag %.3f ms\n",
-							total_run, tps, latency, stdev, lag);
-				else
-					fprintf(stderr,
-							"progress: %.1f s, %.1f tps, "
-							"lat %.3f ms stddev %.3f\n",
-							total_run, tps, latency, stdev);
+				{
+					fprintf(stderr, ", lag %.3f ms", lag);
+					if (latency_limit)
+						fprintf(stderr, ", " INT64_FORMAT " skipped", skipped);
+				}
+				fprintf(stderr, "\n");
 
 				last_count = count;
 				last_lats = lats;
 				last_sqlats = sqlats;
 				last_lags = lags;
 				last_report = now;
+				last_skipped = thread->throttle_latency_skipped;
 				next_report += (int64) progress *1000000;
 			}
 		}
@@ -3514,6 +3607,9 @@ done:
 	}
 	result->throttle_lag = thread->throttle_lag;
 	result->throttle_lag_max = thread->throttle_lag_max;
+	result->throttle_latency_skipped = thread->throttle_latency_skipped;
+	result->latency_late = thread->latency_late;
+
 	INSTR_TIME_SET_CURRENT(end);
 	INSTR_TIME_ACCUM_DIFF(result->conn_time, end, start);
 	if (logfile)
diff --git a/doc/src/sgml/pgbench.sgml b/doc/src/sgml/pgbench.sgml
index 23bfa9e..f80116f 100644
--- a/doc/src/sgml/pgbench.sgml
+++ b/doc/src/sgml/pgbench.sgml
@@ -345,6 +345,23 @@ pgbench <optional> <replaceable>options</> </optional> <replaceable>dbname</>
      </varlistentry>
 
      <varlistentry>
+      <term><option>-L</option> <replaceable>limit</></term>
+      <term><option>--limit=</option><replaceable>limit</></term>
+      <listitem>
+       <para>
+        Under normal operations, transactions which last more than
+        <replaceable>limit</> milliseconds are counted and reported.
+       </para>
+       <para>
+        Under throttling option <option>--rate=...</>, transactions behind
+        schedule by more than <replaceable>limit</> ms are simply skipped and
+        counted as such; transactions that finish later than the scheduled time
+        plus <replaceable>limit</> ms are counted and reported as late.
+       </para>
+       </listitem>
+     </varlistentry>
+
+     <varlistentry>
       <term><option>-M</option> <replaceable>querymode</></term>
       <term><option>--protocol=</option><replaceable>querymode</></term>
       <listitem>
@@ -437,7 +454,9 @@ pgbench <optional> <replaceable>options</> </optional> <replaceable>dbname</>
         schedule moves forward based on when the client first started, not
         when the previous transaction ended.  That approach means that when
         transactions go past their original scheduled end time, it is
-        possible for later ones to catch up again.
+        possible for later ones to catch up again. This behavior can be
+        changed with option <option>--limit</>, which skips and counts
+        transactions that are behind schedule by more than the given limit.
        </para>
        <para>
         When throttling is active, the average and maximum transaction
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to