>> Sorry about that; with your clarification I see what you were trying
>> to explain now.  The code initializes the target time like this:
>> 
>> thread->throttle_trigger = INSTR_TIME_GET_MICROSEC(start);
>> 
>> And then each time a transaction fires, it advances the reference time
>> forward based on the expected rate:
>> 
>> thread->throttle_trigger += wait;
>> 
>> It does *not* reset thread->throttle_trigger based on when the
>> previous transaction ended / when the next transaction started.  If
>> the goal is 10us transaction times, it beats a steady drum saying the
>> transactions should come at 10us, 20us, 30us (on average--there's some
>> randomness in the goals).  It does not pay any attention to when the
>> previous transactions finished.
>> 
>> That means that if an early transaction takes an extra 1000us, every
>> transaction after that will also show as 1000us late--even if all of
>> them take 10us.  You expect that those later transactions will show 0
>> lag, since they took the right duration.  For that to happen,
>> thread->throttle_trigger would need to be re-initialized with the
>> current time at the end of each completed transaction.
> 
> Yes, that's exactly what I understand from the code.
> 
>> The lag computation was not the interesting part of this feature to
>> me.  As I said before, I considered it more of a debugging level thing
>> than a number people would analyze as much as you did.  I understand
>> why you don't like it though.  If the reference time was moved forward
>> to match the transaction end each time, I think that would give the
>> lag definition you're looking for.  That's fine to me too, if Fabien
>> doesn't have a good reason to reject the idea.  We would need to make
>> sure that doesn't break some part of the design too.
> 
> I would like to hear from Fabien about the issue too.

For your information, attached is a patch against git master HEAD that
implements the lag in the way I proposed. With the patch, I get more
consistent numbers on Linux (and Mac OS X).
--
Tatsuo Ishii
SRA OSS, Inc. Japan
English: http://www.sraoss.co.jp/index_en.php
Japanese: http://www.sraoss.co.jp
diff --git a/contrib/pgbench/pgbench.c b/contrib/pgbench/pgbench.c
index 2ad8f0b..57e62dc 100644
--- a/contrib/pgbench/pgbench.c
+++ b/contrib/pgbench/pgbench.c
@@ -137,6 +137,12 @@ int			unlogged_tables = 0;
 double		sample_rate = 0.0;
 
 /*
+ * When threads are throttled to a given rate limit, this is the target delay
+ * to reach that rate in usec.  0 is the default and means no throttling.
+ */
+int64		throttle_delay = 0;
+
+/*
  * tablespace selection
  */
 char	   *tablespace = NULL;
@@ -202,11 +208,15 @@ typedef struct
 	int			listen;			/* 0 indicates that an async query has been
 								 * sent */
 	int			sleeping;		/* 1 indicates that the client is napping */
+	bool        throttling;     /* whether nap is for throttling */
 	int64		until;			/* napping until (usec) */
+	int64		wait;			/* randomly generated delay (usec) */
 	Variable   *variables;		/* array of variable definitions */
 	int			nvariables;
 	instr_time	txn_begin;		/* used for measuring transaction latencies */
+	instr_time  txn_begin_throttle;		/* tx start time used when transaction throttling enabled */
 	instr_time	stmt_begin;		/* used for measuring statement latencies */
+	bool		is_throttled;	/* whether transaction throttling is done */
 	int			use_file;		/* index in sql_files for this client */
 	bool		prepared[MAX_FILES];
 } CState;
@@ -224,6 +234,9 @@ typedef struct
 	instr_time *exec_elapsed;	/* time spent executing cmds (per Command) */
 	int		   *exec_count;		/* number of cmd executions (per Command) */
 	unsigned short random_state[3];		/* separate randomness for each thread */
+	int64       throttle_trigger; 	/* previous/next throttling (us) */
+	int64       throttle_lag; 		/* total transaction lag behind throttling */
+	int64       throttle_lag_max; 	/* max transaction lag */
 } TState;
 
 #define INVALID_THREAD		((pthread_t) 0)
@@ -232,6 +245,8 @@ typedef struct
 {
 	instr_time	conn_time;
 	int			xacts;
+	int64       throttle_lag;
+	int64       throttle_lag_max;
 } TResult;
 
 /*
@@ -356,6 +371,7 @@ usage(void)
 		   "  -N, --skip-some-updates  skip updates of pgbench_tellers and pgbench_branches\n"
 		   "  -P, --progress=NUM       show thread progress report every NUM seconds\n"
 		   "  -r, --report-latencies   report average latency per command\n"
+		   "  -R, --rate=SPEC          target rate in transactions per second\n"
 		   "  -s, --scale=NUM          report this scale factor in output\n"
 		   "  -S, --select-only        perform SELECT-only transactions\n"
 		   "  -t, --transactions       number of transactions each client runs "
@@ -898,19 +914,80 @@ doCustom(TState *thread, CState *st, instr_time *conn_time, FILE *logfile, AggVa
 {
 	PGresult   *res;
 	Command   **commands;
+	bool        trans_needs_throttle = false;
 
 top:
 	commands = sql_files[st->use_file];
 
+	/*
+	 * Handle throttling once per transaction by sleeping.  It is simpler
+	 * to do this here rather than at the end, because so much complicated
+	 * logic happens below when statements finish.
+	 */
+	if (throttle_delay && ! st->is_throttled)
+	{
+		/*
+		 * Use inverse transform sampling to randomly generate a delay, such
+		 * that the series of delays will approximate a Poisson distribution
+		 * centered on the throttle_delay time.
+		 *
+		 * 1000 implies a 6.9 (-log(1/1000)) to 0.0 (log 1.0) delay multiplier.
+		 *
+		 * If transactions are too slow or a given wait is shorter than
+		 * a transaction, the next transaction will start right away.
+		 */
+		int64 wait = (int64)
+			throttle_delay * -log(getrand(thread, 1, 1000)/1000.0);
+
+		thread->throttle_trigger += wait;
+
+		st->until = thread->throttle_trigger;
+		st->wait = wait;
+		st->sleeping = 1;
+		st->throttling = true;
+		st->is_throttled = true;
+		if (debug)
+			fprintf(stderr, "client %d throttling "INT64_FORMAT" us\n",
+					st->id, wait);
+
+	}
+
 	if (st->sleeping)
 	{							/* are we sleeping? */
 		instr_time	now;
+		int64 now_us;
+		int64 start_us;
 
 		INSTR_TIME_SET_CURRENT(now);
-		if (st->until <= INSTR_TIME_GET_MICROSEC(now))
+		now_us = INSTR_TIME_GET_MICROSEC(now);
+		if (st->until <= now_us)
+		{
 			st->sleeping = 0;	/* Done sleeping, go ahead with next command */
+			start_us = INSTR_TIME_GET_MICROSEC(st->txn_begin_throttle);
+			if (start_us <= 0)
+				start_us = INSTR_TIME_GET_MICROSEC(thread->start_time);
+
+			if (st->throttling)
+			{
+				/* Measure lag of throttled transaction relative to target */
+				int64 lag = now_us - start_us - st->wait;
+
+				if (debug)
+					fprintf(stderr, "stmt_begin: "INT64_FORMAT" now_us: "INT64_FORMAT" wait:"INT64_FORMAT" until:"INT64_FORMAT" lag:"INT64_FORMAT"\n", start_us, now_us, st->wait, st->until, lag);
+
+				thread->throttle_lag += lag;
+				if (lag > thread->throttle_lag_max)
+					thread->throttle_lag_max = lag;
+				st->throttling = false;
+			}
+		}
 		else
+		{
+			if (debug)
+				fprintf(stderr, "still sleeping\n");
+
 			return true;		/* Still sleeping, nothing to do here */
+		}
 	}
 
 	if (st->listen)
@@ -1095,6 +1172,15 @@ top:
 			st->state = 0;
 			st->use_file = (int) getrand(thread, 0, num_files - 1);
 			commands = sql_files[st->use_file];
+			st->is_throttled = false;
+			/*
+			 * No transaction is underway anymore, which means there is nothing
+			 * to listen to right now.  When throttling rate limits are active,
+			 * a sleep will happen next, as the next transaction starts.  And
+			 * then in any case the next SQL command will set listen back to 1.
+			 */
+			st->listen = 0;
+			trans_needs_throttle = (throttle_delay>0);
 		}
 	}
 
@@ -1113,6 +1199,16 @@ top:
 		INSTR_TIME_ACCUM_DIFF(*conn_time, end, start);
 	}
 
+	/*
+	 * This ensures that a throttling delay is inserted before proceeding
+	 * with sql commands, after the first transaction. The first transaction
+	 * throttling is performed when first entering doCustom.
+	 */
+	if (trans_needs_throttle) {
+		trans_needs_throttle = false;
+		goto top;
+	}
+
 	/* Record transaction start time if logging is enabled */
 	if (logfile && st->state == 0)
 		INSTR_TIME_SET_CURRENT(st->txn_begin);
@@ -1121,6 +1217,9 @@ top:
 	if (is_latencies)
 		INSTR_TIME_SET_CURRENT(st->stmt_begin);
 
+	if (throttle_delay)
+		INSTR_TIME_SET_CURRENT(st->txn_begin_throttle);
+
 	if (commands[st->state]->type == SQL_COMMAND)
 	{
 		const Command *command = commands[st->state];
@@ -2017,7 +2116,8 @@ process_builtin(char *tb)
 static void
 printResults(int ttype, int normal_xacts, int nclients,
 			 TState *threads, int nthreads,
-			 instr_time total_time, instr_time conn_total_time)
+			 instr_time total_time, instr_time conn_total_time,
+			 int64 throttle_lag, int64 throttle_lag_max)
 {
 	double		time_include,
 				tps_include,
@@ -2055,6 +2155,19 @@ printResults(int ttype, int normal_xacts, int nclients,
 		printf("number of transactions actually processed: %d\n",
 			   normal_xacts);
 	}
+
+	if (throttle_delay)
+	{
+		/*
+		 * Report average transaction lag under rate limit throttling.  This
+		 * is the delay between scheduled and actual start times for the
+		 * transaction.  The measured lag may be caused by thread/client load,
+		 * the database load, or the Poisson throttling process.
+		 */
+		printf("average rate limit lag: %.3f ms (max %.3f ms)\n",
+			   0.001 * throttle_lag / normal_xacts, 0.001 * throttle_lag_max);
+	}
+
 	printf("tps = %f (including connections establishing)\n", tps_include);
 	printf("tps = %f (excluding connections establishing)\n", tps_exclude);
 
@@ -2140,6 +2253,7 @@ main(int argc, char **argv)
 		{"unlogged-tables", no_argument, &unlogged_tables, 1},
 		{"sampling-rate", required_argument, NULL, 4},
 		{"aggregate-interval", required_argument, NULL, 5},
+		{"rate", required_argument, NULL, 'R'},
 		{NULL, 0, NULL, 0}
 	};
 
@@ -2162,6 +2276,8 @@ main(int argc, char **argv)
 	instr_time	total_time;
 	instr_time	conn_total_time;
 	int			total_xacts;
+	int64       throttle_lag = 0;
+	int64       throttle_lag_max = 0;
 
 	int			i;
 
@@ -2206,7 +2322,7 @@ main(int argc, char **argv)
 	state = (CState *) pg_malloc(sizeof(CState));
 	memset(state, 0, sizeof(CState));
 
-	while ((c = getopt_long(argc, argv, "ih:nvp:dqSNc:j:Crs:t:T:U:lf:D:F:M:P:", long_options, &optindex)) != -1)
+	while ((c = getopt_long(argc, argv, "ih:nvp:dqSNc:j:Crs:t:T:U:lf:D:F:M:P:R:", long_options, &optindex)) != -1)
 	{
 		switch (c)
 		{
@@ -2371,6 +2487,19 @@ main(int argc, char **argv)
 					exit(1);
 				}
 				break;
+			case 'R':
+			{
+				/* get a double from the beginning of option value */
+				double throttle_value = atof(optarg);
+				if (throttle_value <= 0.0)
+				{
+					fprintf(stderr, "invalid rate limit: %s\n", optarg);
+					exit(1);
+				}
+				/* Invert rate limit into a time offset */
+				throttle_delay = (int64) (1000000.0 / throttle_value);
+			}
+				break;
 			case 0:
 				/* This covers long options which take no argument. */
 				break;
@@ -2408,6 +2537,9 @@ main(int argc, char **argv)
 		}
 	}
 
+    /* compute a per thread delay */
+	throttle_delay *= nthreads;
+
 	if (argc > optind)
 		dbName = argv[optind];
 	else
@@ -2721,6 +2853,9 @@ main(int argc, char **argv)
 			TResult    *r = (TResult *) ret;
 
 			total_xacts += r->xacts;
+			throttle_lag += r->throttle_lag;
+			if (r->throttle_lag_max > throttle_lag_max)
+				throttle_lag_max = r->throttle_lag_max;
 			INSTR_TIME_ADD(conn_total_time, r->conn_time);
 			free(ret);
 		}
@@ -2731,7 +2866,7 @@ main(int argc, char **argv)
 	INSTR_TIME_SET_CURRENT(total_time);
 	INSTR_TIME_SUBTRACT(total_time, start_time);
 	printResults(ttype, total_xacts, nclients, threads, nthreads,
-				 total_time, conn_total_time);
+				 total_time, conn_total_time, throttle_lag, throttle_lag_max);
 
 	return 0;
 }
@@ -2756,6 +2891,17 @@ threadRun(void *arg)
 
 	AggVals		aggs;
 
+	/*
+	 * Initialize throttling rate target for all of the thread's clients.  It
+	 * might be a little more accurate to reset thread->start_time here too.
+	 * The possible drift seems too small relative to typical throttle delay
+	 * times to worry about it.
+	 */
+	INSTR_TIME_SET_CURRENT(start);
+	thread->throttle_trigger = INSTR_TIME_GET_MICROSEC(start);
+	thread->throttle_lag = 0;
+	thread->throttle_lag_max = 0;
+
 	result = pg_malloc(sizeof(TResult));
 
 	INSTR_TIME_SET_ZERO(result->conn_time);
@@ -2831,25 +2977,38 @@ threadRun(void *arg)
 			Command   **commands = sql_files[st->use_file];
 			int			sock;
 
-			if (st->sleeping)
+			if (st->con == NULL)
 			{
-				int			this_usec;
-
-				if (min_usec == INT64_MAX)
+				continue;
+			}
+			else if (st->sleeping)
+			{
+				if (st->throttling && timer_exceeded)
 				{
-					instr_time	now;
-
-					INSTR_TIME_SET_CURRENT(now);
-					now_usec = INSTR_TIME_GET_MICROSEC(now);
+					/* interrupt client which has not started a transaction */
+					remains--;
+					st->sleeping = 0;
+					st->throttling = false;
+					PQfinish(st->con);
+					st->con = NULL;
+					continue;
 				}
+				else /* just a nap from the script */
+				{
+					int			this_usec;
 
-				this_usec = st->until - now_usec;
-				if (min_usec > this_usec)
-					min_usec = this_usec;
-			}
-			else if (st->con == NULL)
-			{
-				continue;
+					if (min_usec == INT64_MAX)
+					{
+						instr_time	now;
+
+						INSTR_TIME_SET_CURRENT(now);
+						now_usec = INSTR_TIME_GET_MICROSEC(now);
+					}
+
+					this_usec = st->until - now_usec;
+					if (min_usec > this_usec)
+						min_usec = this_usec;
+				}
 			}
 			else if (commands[st->state]->type == META_COMMAND)
 			{
@@ -2986,6 +3145,8 @@ done:
 	result->xacts = 0;
 	for (i = 0; i < nstate; i++)
 		result->xacts += state[i].cnt;
+	result->throttle_lag = thread->throttle_lag;
+	result->throttle_lag_max = thread->throttle_lag_max;
 	INSTR_TIME_SET_CURRENT(end);
 	INSTR_TIME_ACCUM_DIFF(result->conn_time, end, start);
 	if (logfile)
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to