Hello Greg,

I think that the weirdness really comes from the way transactions times are measured, their interactions with throttling, and latent bugs in the code.

One issue is that the throttling time was included in the measure, but not the first time because "txn_begin" is not set at the beginning of doCustom.

Also, flag st->listen is set to 1 but *never* set back to 0...
 sh> grep listen pgbench.c
        int                     listen;
        if (st->listen)
                        st->listen = 1;
                        st->listen = 1;
                        st->listen = 1;
                        st->listen = 1;
                                st->listen = 1;
                                st->listen = 1;

ISTM that I can fix the "weirdness" by inserting an ugly "goto top;", but I would feel better about it by removing all gotos and reordering some actions in doCustom in a more logical way. However that would be a bigger patch.

Please find attached 2 patches:

 - the first is the full throttle patch which ensures that the
   txn_begin is taken at a consistent point, after throttling,
   which requires resetting "listen". There is an ugly goto.
   I've also put times in a consistent format in the log,
   "789.012345" instead of "789 12345".

 - the second patch just shows the diff between v10 and the first one.

--
Fabien.
diff --git a/contrib/pgbench/pgbench.c b/contrib/pgbench/pgbench.c
index 8c202bf..dc4f819 100644
--- a/contrib/pgbench/pgbench.c
+++ b/contrib/pgbench/pgbench.c
@@ -137,6 +137,12 @@ int			unlogged_tables = 0;
 double		sample_rate = 0.0;
 
 /*
+ * When threads are throttled to a given rate limit, this is the target delay
+ * to reach that rate in usec.  0 is the default and means no throttling.
+ */
+int64		throttle_delay = 0;
+
+/*
  * tablespace selection
  */
 char	   *tablespace = NULL;
@@ -205,6 +211,7 @@ typedef struct
 	int			nvariables;
 	instr_time	txn_begin;		/* used for measuring transaction latencies */
 	instr_time	stmt_begin;		/* used for measuring statement latencies */
+	bool		throttled;      /* whether current transaction was throttled */
 	int			use_file;		/* index in sql_files for this client */
 	bool		prepared[MAX_FILES];
 } CState;
@@ -222,6 +229,10 @@ typedef struct
 	instr_time *exec_elapsed;	/* time spent executing cmds (per Command) */
 	int		   *exec_count;		/* number of cmd executions (per Command) */
 	unsigned short random_state[3];		/* separate randomness for each thread */
+    int64       throttle_trigger;  /* previous/next throttling (us) */
+	int64       throttle_lag;      /* total transaction lag behind throttling */
+	int64       throttle_lag_max;  /* max transaction lag */
+
 } TState;
 
 #define INVALID_THREAD		((pthread_t) 0)
@@ -230,6 +241,8 @@ typedef struct
 {
 	instr_time	conn_time;
 	int			xacts;
+	int64       throttle_lag;
+	int64       throttle_lag_max;
 } TResult;
 
 /*
@@ -355,6 +368,8 @@ usage(void)
 		   "  -n           do not run VACUUM before tests\n"
 		   "  -N           do not update tables \"pgbench_tellers\" and \"pgbench_branches\"\n"
 		   "  -r           report average latency per command\n"
+		   "  -R SPEC, --rate SPEC\n"
+		   "               target rate in transactions per second\n"
 		   "  -s NUM       report this scale factor in output\n"
 		   "  -S           perform SELECT-only transactions\n"
 	 "  -t NUM       number of transactions each client runs (default: 10)\n"
@@ -898,17 +913,56 @@ doCustom(TState *thread, CState *st, instr_time *conn_time, FILE *logfile, AggVa
 {
 	PGresult   *res;
 	Command   **commands;
+	bool        do_throttle = false;
 
 top:
 	commands = sql_files[st->use_file];
 
+	/* handle throttling once per transaction by inserting a sleep.
+	 * this is simpler than doing it at the end.
+	 */
+	if (throttle_delay && ! st->throttled)
+	{
+		/* compute delay to approximate a Poisson distribution
+		 * 1000000 => 13.8 .. 0 multiplier
+		 *  100000 => 11.5 .. 0
+		 *   10000 =>  9.2 .. 0
+		 *    1000 =>  6.9 .. 0
+		 * if transactions are too slow or a given wait shorter than
+		 * a transaction, the next transaction will start right away.
+		 */
+		int64 wait = (int64)
+			throttle_delay * -log(getrand(thread, 1, 1000)/1000.0);
+
+		thread->throttle_trigger += wait;
+
+		st->until = thread->throttle_trigger;
+		st->sleeping = 1;
+		st->throttled = true;
+		if (debug)
+			fprintf(stderr, "client %d throttling "INT64_FORMAT" us\n",
+					st->id, wait);
+	}
+
 	if (st->sleeping)
 	{							/* are we sleeping? */
 		instr_time	now;
+		int64 now_us;
 
 		INSTR_TIME_SET_CURRENT(now);
-		if (st->until <= INSTR_TIME_GET_MICROSEC(now))
+		now_us = INSTR_TIME_GET_MICROSEC(now);
+		if (st->until <= now_us)
+		{
 			st->sleeping = 0;	/* Done sleeping, go ahead with next command */
+			if (throttle_delay && st->state==0)
+			{
+				/* measure lag of throttled transaction */
+				int64 lag = now_us - st->until;
+				thread->throttle_lag += lag;
+				if (lag > thread->throttle_lag_max)
+					thread->throttle_lag_max = lag;
+			}
+		}
 		else
 			return true;		/* Still sleeping, nothing to do here */
 	}
@@ -1037,7 +1091,7 @@ top:
 					 * This is more than we really ought to know about
 					 * instr_time
 					 */
-					fprintf(logfile, "%d %d %.0f %d %ld %ld\n",
+					fprintf(logfile, "%d %d %.0f %d %ld.%06ld\n",
 							st->id, st->cnt, usec, st->use_file,
 							(long) now.tv_sec, (long) now.tv_usec);
 #else
@@ -1046,7 +1100,7 @@ top:
 					 * On Windows, instr_time doesn't provide a timestamp
 					 * anyway
 					 */
-					fprintf(logfile, "%d %d %.0f %d 0 0\n",
+					fprintf(logfile, "%d %d %.0f %d 0.0\n",
 							st->id, st->cnt, usec, st->use_file);
 #endif
 				}
@@ -1095,6 +1149,9 @@ top:
 			st->state = 0;
 			st->use_file = (int) getrand(thread, 0, num_files - 1);
 			commands = sql_files[st->use_file];
+			st->throttled = false;
+			st->listen = 0;
+			do_throttle = (throttle_delay>0);
 		}
 	}
 
@@ -1113,6 +1170,12 @@ top:
 		INSTR_TIME_ACCUM_DIFF(*conn_time, end, start);
 	}
 
+	if (do_throttle) {
+		/* delay throttling after reopenning the connection */
+		do_throttle = false;
+		goto top;
+	}
+
 	/* Record transaction start time if logging is enabled */
 	if (logfile && st->state == 0)
 		INSTR_TIME_SET_CURRENT(st->txn_begin);
@@ -2017,7 +2080,8 @@ process_builtin(char *tb)
 static void
 printResults(int ttype, int normal_xacts, int nclients,
 			 TState *threads, int nthreads,
-			 instr_time total_time, instr_time conn_total_time)
+			 instr_time total_time, instr_time conn_total_time,
+			 int64 throttle_lag, int64 throttle_lag_max)
 {
 	double		time_include,
 				tps_include,
@@ -2055,6 +2119,18 @@ printResults(int ttype, int normal_xacts, int nclients,
 		printf("number of transactions actually processed: %d\n",
 			   normal_xacts);
 	}
+
+	if (throttle_delay)
+	{
+		/* Report average transaction lag under throttling, i.e. the delay
+		   between scheduled and actual start times for the transaction.
+		   The measured lag may be linked to the thread/client load,
+		   the database load, or the Poisson throttling process.
+		 */
+		printf("average transaction lag: %.3f ms (max %.3f ms)\n",
+			   0.001 * throttle_lag / normal_xacts, 0.001 * throttle_lag_max);
+	}
+
 	printf("tps = %f (including connections establishing)\n", tps_include);
 	printf("tps = %f (excluding connections establishing)\n", tps_exclude);
 
@@ -2115,6 +2191,7 @@ main(int argc, char **argv)
 		{"unlogged-tables", no_argument, &unlogged_tables, 1},
 		{"sampling-rate", required_argument, NULL, 4},
 		{"aggregate-interval", required_argument, NULL, 5},
+		{"rate", required_argument, NULL, 'R'},
 		{NULL, 0, NULL, 0}
 	};
 
@@ -2137,6 +2214,8 @@ main(int argc, char **argv)
 	instr_time	total_time;
 	instr_time	conn_total_time;
 	int			total_xacts;
+	int64       throttle_lag = 0;
+	int64       throttle_lag_max = 0;
 
 	int			i;
 
@@ -2181,7 +2260,7 @@ main(int argc, char **argv)
 	state = (CState *) pg_malloc(sizeof(CState));
 	memset(state, 0, sizeof(CState));
 
-	while ((c = getopt_long(argc, argv, "ih:nvp:dqSNc:j:Crs:t:T:U:lf:D:F:M:", long_options, &optindex)) != -1)
+	while ((c = getopt_long(argc, argv, "ih:nvp:dqSNc:j:Crs:t:T:U:lf:D:F:M:R:", long_options, &optindex)) != -1)
 	{
 		switch (c)
 		{
@@ -2336,6 +2415,19 @@ main(int argc, char **argv)
 					exit(1);
 				}
 				break;
+			case 'R':
+			{
+				/* get a double from the beginning of option value */
+				double throttle_value = atof(optarg);
+				if (throttle_value <= 0.0)
+				{
+					fprintf(stderr, "invalid rate limit: %s\n", optarg);
+					exit(1);
+				}
+				/* Invert rate limit into a time offset */
+				throttle_delay = (int64) (1000000.0 / throttle_value);
+			}
+				break;
 			case 0:
 				/* This covers long options which take no argument. */
 				break;
@@ -2373,6 +2465,9 @@ main(int argc, char **argv)
 		}
 	}
 
+    /* compute a per thread delay */
+	throttle_delay *= nthreads;
+
 	if (argc > optind)
 		dbName = argv[optind];
 	else
@@ -2671,6 +2766,9 @@ main(int argc, char **argv)
 			TResult    *r = (TResult *) ret;
 
 			total_xacts += r->xacts;
+			throttle_lag += r->throttle_lag;
+			if (r->throttle_lag_max > throttle_lag_max)
+				throttle_lag_max = r->throttle_lag_max;
 			INSTR_TIME_ADD(conn_total_time, r->conn_time);
 			free(ret);
 		}
@@ -2681,7 +2779,7 @@ main(int argc, char **argv)
 	INSTR_TIME_SET_CURRENT(total_time);
 	INSTR_TIME_SUBTRACT(total_time, start_time);
 	printResults(ttype, total_xacts, nclients, threads, nthreads,
-				 total_time, conn_total_time);
+				 total_time, conn_total_time, throttle_lag, throttle_lag_max);
 
 	return 0;
 }
@@ -2701,6 +2799,15 @@ threadRun(void *arg)
 
 	AggVals		aggs;
 
+	/* SHOULD take actual thread start time when the thread is running? */
+	/* INSTR_TIME_SET_CURRENT(thread->start_time); */
+
+	/* throttling for all thread's clients */
+	INSTR_TIME_SET_CURRENT(start);
+	thread->throttle_trigger = INSTR_TIME_GET_MICROSEC(start);
+	thread->throttle_lag = 0;
+	thread->throttle_lag_max = 0;
+
 	result = pg_malloc(sizeof(TResult));
 
 	INSTR_TIME_SET_ZERO(result->conn_time);
@@ -2869,6 +2976,8 @@ done:
 	result->xacts = 0;
 	for (i = 0; i < nstate; i++)
 		result->xacts += state[i].cnt;
+	result->throttle_lag = thread->throttle_lag;
+	result->throttle_lag_max = thread->throttle_lag_max;
 	INSTR_TIME_SET_CURRENT(end);
 	INSTR_TIME_ACCUM_DIFF(result->conn_time, end, start);
 	if (logfile)
diff --git a/doc/src/sgml/pgbench.sgml b/doc/src/sgml/pgbench.sgml
index e9900d3..c23dc7d 100644
--- a/doc/src/sgml/pgbench.sgml
+++ b/doc/src/sgml/pgbench.sgml
@@ -392,6 +392,27 @@ pgbench <optional> <replaceable>options</> </optional> <replaceable>dbname</>
      </varlistentry>
 
      <varlistentry>
+      <term><option>-R</option> <replaceable>rate</></term>
+      <term><option>--rate</option> <replaceable>rate</></term>
+      <listitem>
+       <para>
+	Execute transactions targeting the specified rate instead of
+	running as fast as possible (the default).  The rate is given in
+        transactions per second.  If the targeted rate is
+        above the maximum possible rate these transactions can execute at,
+        the rate limit won't have any impact on results.
+
+	The rate is targeted by starting transactions along a
+	Poisson-distributed event time line.  When a rate limit is
+        active, the average and maximum transaction lag time
+	(the delay between the scheduled and actual transaction start times)
+	are reported in ms. High values indicate that the database
+	could not handle the scheduled load at some time.
+       </para>
+      </listitem>
+     </varlistentry>
+
+     <varlistentry>
       <term><option>-s</option> <replaceable>scale_factor</></term>
       <listitem>
        <para>
diff --git a/contrib/pgbench/pgbench.c b/contrib/pgbench/pgbench.c
index 136a2bf..dc4f819 100644
--- a/contrib/pgbench/pgbench.c
+++ b/contrib/pgbench/pgbench.c
@@ -913,6 +913,7 @@ doCustom(TState *thread, CState *st, instr_time *conn_time, FILE *logfile, AggVa
 {
 	PGresult   *res;
 	Command   **commands;
+	bool        do_throttle = false;
 
 top:
 	commands = sql_files[st->use_file];
@@ -1090,7 +1091,7 @@ top:
 					 * This is more than we really ought to know about
 					 * instr_time
 					 */
-					fprintf(logfile, "%d %d %.0f %d %ld %ld\n",
+					fprintf(logfile, "%d %d %.0f %d %ld.%06ld\n",
 							st->id, st->cnt, usec, st->use_file,
 							(long) now.tv_sec, (long) now.tv_usec);
 #else
@@ -1099,7 +1100,7 @@ top:
 					 * On Windows, instr_time doesn't provide a timestamp
 					 * anyway
 					 */
-					fprintf(logfile, "%d %d %.0f %d 0 0\n",
+					fprintf(logfile, "%d %d %.0f %d 0.0\n",
 							st->id, st->cnt, usec, st->use_file);
 #endif
 				}
@@ -1149,6 +1150,8 @@ top:
 			st->use_file = (int) getrand(thread, 0, num_files - 1);
 			commands = sql_files[st->use_file];
 			st->throttled = false;
+			st->listen = 0;
+			do_throttle = (throttle_delay>0);
 		}
 	}
 
@@ -1167,6 +1170,12 @@ top:
 		INSTR_TIME_ACCUM_DIFF(*conn_time, end, start);
 	}
 
+	if (do_throttle) {
+		/* delay throttling after reopenning the connection */
+		do_throttle = false;
+		goto top;
+	}
+
 	/* Record transaction start time if logging is enabled */
 	if (logfile && st->state == 0)
 		INSTR_TIME_SET_CURRENT(st->txn_begin);
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to