Hello Greg,

Correct patch (and the little one from me again) attached this time.

Please find an updated v15 with only comment changes:

* The new comment about "is_throttled" was inverted wrt the meaning of the variable, at least to my understanding.

* ISTM that the impact of the chosen 1000 should appear somewhere.

* The trans_need_throttle comment was slightly wrong: the first transaction is also throttled, when initially entering doCustom.


About 123456 12345 vs 123456.012345: My data parser is usually "gnuplot" or "my eyes", and in both cases the later option is better:-)


About the little patch: Parameter "ok" should be renamed to something meaningful (say "do_finish"?). Also, it seems that when timer is exceeded in doCustom it is called with true, but maybe you intended that it should be called with false?? Moreover, under normal circumstances (throttling is significantly below the maximum rate) PQfinish will be called directly by threadRun while interrupting sleeping threads. Also, it is important to remove the connection because it serves as a marker to know whether a client must run or not. So basically I do not understand how it works. Note that it does not mean that it does not work, it just means that I do not really understand:-)

--
Fabien.
diff --git a/contrib/pgbench/pgbench.c b/contrib/pgbench/pgbench.c
index 8dc81e5..4b379ab 100644
--- a/contrib/pgbench/pgbench.c
+++ b/contrib/pgbench/pgbench.c
@@ -137,6 +137,12 @@ int			unlogged_tables = 0;
 double		sample_rate = 0.0;
 
 /*
+ * When threads are throttled to a given rate limit, this is the target delay
+ * to reach that rate in usec.  0 is the default and means no throttling.
+ */
+int64		throttle_delay = 0;
+
+/*
  * tablespace selection
  */
 char	   *tablespace = NULL;
@@ -200,11 +206,13 @@ typedef struct
 	int			listen;			/* 0 indicates that an async query has been
 								 * sent */
 	int			sleeping;		/* 1 indicates that the client is napping */
+	bool        throttling;     /* whether nap is for throttling */
 	int64		until;			/* napping until (usec) */
 	Variable   *variables;		/* array of variable definitions */
 	int			nvariables;
 	instr_time	txn_begin;		/* used for measuring transaction latencies */
 	instr_time	stmt_begin;		/* used for measuring statement latencies */
+	bool		is_throttled;	/* whether transaction throttling is done */
 	int			use_file;		/* index in sql_files for this client */
 	bool		prepared[MAX_FILES];
 } CState;
@@ -222,6 +230,9 @@ typedef struct
 	instr_time *exec_elapsed;	/* time spent executing cmds (per Command) */
 	int		   *exec_count;		/* number of cmd executions (per Command) */
 	unsigned short random_state[3];		/* separate randomness for each thread */
+	int64       throttle_trigger; 	/* previous/next throttling (us) */
+	int64       throttle_lag; 		/* total transaction lag behind throttling */
+	int64       throttle_lag_max; 	/* max transaction lag */
 } TState;
 
 #define INVALID_THREAD		((pthread_t) 0)
@@ -230,6 +241,8 @@ typedef struct
 {
 	instr_time	conn_time;
 	int			xacts;
+	int64       throttle_lag;
+	int64       throttle_lag_max;
 } TResult;
 
 /*
@@ -353,6 +366,7 @@ usage(void)
 		   "  -n, --no-vacuum          do not run VACUUM before tests\n"
 		   "  -N, --skip-some-updates  skip updates of pgbench_tellers and pgbench_branches\n"
 		   "  -r, --report-latencies   report average latency per command\n"
+		   "  -R, --rate SPEC          target rate in transactions per second\n"
 		   "  -s, --scale=NUM          report this scale factor in output\n"
 		   "  -S, --select-only        perform SELECT-only transactions\n"
 		   "  -t, --transactions       number of transactions each client runs "
@@ -895,17 +909,62 @@ doCustom(TState *thread, CState *st, instr_time *conn_time, FILE *logfile, AggVa
 {
 	PGresult   *res;
 	Command   **commands;
+	bool        trans_needs_throttle = false;
 
 top:
 	commands = sql_files[st->use_file];
 
+	/*
+	 * Handle throttling once per transaction by sleeping.  It is simpler
+	 * to do this here rather than at the end, because so much complicated
+	 * logic happens below when statements finish.
+	 */
+	if (throttle_delay && ! st->is_throttled)
+	{
+		/*
+		 * Use inverse transform sampling to randomly generate a delay, such
+		 * that the series of delays will approximate a Poisson distribution
+		 * centered on the throttle_delay time.
+                 *
+                 * 1000 implies a 6.9 (-log(1/1000)) to 0.0 (log 1.0) delay multiplier.
+		 *
+		 * If transactions are too slow or a given wait is shorter than
+		 * a transaction, the next transaction will start right away.
+		 */
+		int64 wait = (int64)
+			throttle_delay * -log(getrand(thread, 1, 1000)/1000.0);
+
+		thread->throttle_trigger += wait;
+
+		st->until = thread->throttle_trigger;
+		st->sleeping = 1;
+		st->throttling = true;
+		st->is_throttled = true;
+		if (debug)
+			fprintf(stderr, "client %d throttling "INT64_FORMAT" us\n",
+					st->id, wait);
+	}
+
 	if (st->sleeping)
 	{							/* are we sleeping? */
 		instr_time	now;
+		int64 now_us;
 
 		INSTR_TIME_SET_CURRENT(now);
-		if (st->until <= INSTR_TIME_GET_MICROSEC(now))
+		now_us = INSTR_TIME_GET_MICROSEC(now);
+		if (st->until <= now_us)
+		{
 			st->sleeping = 0;	/* Done sleeping, go ahead with next command */
+			if (st->throttling)
+			{
+				/* Measure lag of throttled transaction relative to target */
+				int64 lag = now_us - st->until;
+				thread->throttle_lag += lag;
+				if (lag > thread->throttle_lag_max)
+					thread->throttle_lag_max = lag;
+				st->throttling = false;
+			}
+		}
 		else
 			return true;		/* Still sleeping, nothing to do here */
 	}
@@ -1092,6 +1151,15 @@ top:
 			st->state = 0;
 			st->use_file = (int) getrand(thread, 0, num_files - 1);
 			commands = sql_files[st->use_file];
+			st->is_throttled = false;
+			/*
+			 * No transaction is underway anymore, which means there is nothing
+			 * to listen to right now.  When throttling rate limits are active,
+			 * a sleep will happen next, as the next transaction starts.  And
+			 * then in any case the next SQL command will set listen back to 1.
+			 */
+			st->listen = 0;
+			trans_needs_throttle = (throttle_delay>0);
 		}
 	}
 
@@ -1110,6 +1178,16 @@ top:
 		INSTR_TIME_ACCUM_DIFF(*conn_time, end, start);
 	}
 
+	/*
+         * This ensures that a throttling delay is inserted before proceeding
+         * with sql commands, after the first transaction. The first transaction
+         * throttling is performed when first entering doCustom.
+	 */
+	if (trans_needs_throttle) {
+		trans_needs_throttle = false;
+		goto top;
+	}
+
 	/* Record transaction start time if logging is enabled */
 	if (logfile && st->state == 0)
 		INSTR_TIME_SET_CURRENT(st->txn_begin);
@@ -2014,7 +2092,8 @@ process_builtin(char *tb)
 static void
 printResults(int ttype, int normal_xacts, int nclients,
 			 TState *threads, int nthreads,
-			 instr_time total_time, instr_time conn_total_time)
+			 instr_time total_time, instr_time conn_total_time,
+			 int64 throttle_lag, int64 throttle_lag_max)
 {
 	double		time_include,
 				tps_include,
@@ -2052,6 +2131,19 @@ printResults(int ttype, int normal_xacts, int nclients,
 		printf("number of transactions actually processed: %d\n",
 			   normal_xacts);
 	}
+
+	if (throttle_delay)
+	{
+		/*
+		 * Report average transaction lag under rate limit throttling.  This
+		 * is the delay between scheduled and actual start times for the
+		 * transaction.  The measured lag may be caused by thread/client load,
+		 * the database load, or the Poisson throttling process.
+		 */
+		printf("average rate limit lag: %.3f ms (max %.3f ms)\n",
+			   0.001 * throttle_lag / normal_xacts, 0.001 * throttle_lag_max);
+	}
+
 	printf("tps = %f (including connections establishing)\n", tps_include);
 	printf("tps = %f (excluding connections establishing)\n", tps_exclude);
 
@@ -2136,6 +2228,7 @@ main(int argc, char **argv)
 		{"unlogged-tables", no_argument, &unlogged_tables, 1},
 		{"sampling-rate", required_argument, NULL, 4},
 		{"aggregate-interval", required_argument, NULL, 5},
+		{"rate", required_argument, NULL, 'R'},
 		{NULL, 0, NULL, 0}
 	};
 
@@ -2158,6 +2251,8 @@ main(int argc, char **argv)
 	instr_time	total_time;
 	instr_time	conn_total_time;
 	int			total_xacts;
+	int64       throttle_lag = 0;
+	int64       throttle_lag_max = 0;
 
 	int			i;
 
@@ -2202,7 +2297,7 @@ main(int argc, char **argv)
 	state = (CState *) pg_malloc(sizeof(CState));
 	memset(state, 0, sizeof(CState));
 
-	while ((c = getopt_long(argc, argv, "ih:nvp:dqSNc:j:Crs:t:T:U:lf:D:F:M:", long_options, &optindex)) != -1)
+	while ((c = getopt_long(argc, argv, "ih:nvp:dqSNc:j:Crs:t:T:U:lf:D:F:M:R:", long_options, &optindex)) != -1)
 	{
 		switch (c)
 		{
@@ -2357,6 +2452,19 @@ main(int argc, char **argv)
 					exit(1);
 				}
 				break;
+			case 'R':
+			{
+				/* get a double from the beginning of option value */
+				double throttle_value = atof(optarg);
+				if (throttle_value <= 0.0)
+				{
+					fprintf(stderr, "invalid rate limit: %s\n", optarg);
+					exit(1);
+				}
+				/* Invert rate limit into a time offset */
+				throttle_delay = (int64) (1000000.0 / throttle_value);
+			}
+				break;
 			case 0:
 				/* This covers long options which take no argument. */
 				break;
@@ -2394,6 +2502,9 @@ main(int argc, char **argv)
 		}
 	}
 
+    /* compute a per thread delay */
+	throttle_delay *= nthreads;
+
 	if (argc > optind)
 		dbName = argv[optind];
 	else
@@ -2706,6 +2817,9 @@ main(int argc, char **argv)
 			TResult    *r = (TResult *) ret;
 
 			total_xacts += r->xacts;
+			throttle_lag += r->throttle_lag;
+			if (r->throttle_lag_max > throttle_lag_max)
+				throttle_lag_max = r->throttle_lag_max;
 			INSTR_TIME_ADD(conn_total_time, r->conn_time);
 			free(ret);
 		}
@@ -2716,7 +2830,7 @@ main(int argc, char **argv)
 	INSTR_TIME_SET_CURRENT(total_time);
 	INSTR_TIME_SUBTRACT(total_time, start_time);
 	printResults(ttype, total_xacts, nclients, threads, nthreads,
-				 total_time, conn_total_time);
+				 total_time, conn_total_time, throttle_lag, throttle_lag_max);
 
 	return 0;
 }
@@ -2736,6 +2850,17 @@ threadRun(void *arg)
 
 	AggVals		aggs;
 
+	/*
+	 * Initialize throttling rate target for all of the thread's clients.  It
+	 * might be a little more accurate to reset thread->start_time here too.
+	 * The possible drift seems too small relative to typical throttle delay
+	 * times to worry about it.
+	 */
+	INSTR_TIME_SET_CURRENT(start);
+	thread->throttle_trigger = INSTR_TIME_GET_MICROSEC(start);
+	thread->throttle_lag = 0;
+	thread->throttle_lag_max = 0;
+
 	result = pg_malloc(sizeof(TResult));
 
 	INSTR_TIME_SET_ZERO(result->conn_time);
@@ -2811,25 +2936,38 @@ threadRun(void *arg)
 			Command   **commands = sql_files[st->use_file];
 			int			sock;
 
-			if (st->sleeping)
+			if (st->con == NULL)
 			{
-				int			this_usec;
-
-				if (min_usec == INT64_MAX)
+				continue;
+			}
+			else if (st->sleeping)
+			{
+				if (st->throttling && timer_exceeded)
 				{
-					instr_time	now;
-
-					INSTR_TIME_SET_CURRENT(now);
-					now_usec = INSTR_TIME_GET_MICROSEC(now);
+					/* interrupt client which has not started a transaction */
+					remains--;
+					st->sleeping = 0;
+					st->throttling = false;
+					PQfinish(st->con);
+					st->con = NULL;
+					continue;
 				}
+				else /* just a nap from the script */
+				{
+					int			this_usec;
 
-				this_usec = st->until - now_usec;
-				if (min_usec > this_usec)
-					min_usec = this_usec;
-			}
-			else if (st->con == NULL)
-			{
-				continue;
+					if (min_usec == INT64_MAX)
+					{
+						instr_time	now;
+
+						INSTR_TIME_SET_CURRENT(now);
+						now_usec = INSTR_TIME_GET_MICROSEC(now);
+					}
+
+					this_usec = st->until - now_usec;
+					if (min_usec > this_usec)
+						min_usec = this_usec;
+				}
 			}
 			else if (commands[st->state]->type == META_COMMAND)
 			{
@@ -2904,6 +3042,8 @@ done:
 	result->xacts = 0;
 	for (i = 0; i < nstate; i++)
 		result->xacts += state[i].cnt;
+	result->throttle_lag = thread->throttle_lag;
+	result->throttle_lag_max = thread->throttle_lag_max;
 	INSTR_TIME_SET_CURRENT(end);
 	INSTR_TIME_ACCUM_DIFF(result->conn_time, end, start);
 	if (logfile)
diff --git a/doc/src/sgml/pgbench.sgml b/doc/src/sgml/pgbench.sgml
index a7f41e1..a6ab1bc 100644
--- a/doc/src/sgml/pgbench.sgml
+++ b/doc/src/sgml/pgbench.sgml
@@ -408,6 +408,27 @@ pgbench <optional> <replaceable>options</> </optional> <replaceable>dbname</>
      </varlistentry>
 
      <varlistentry>
+      <term><option>-R</option> <replaceable>rate</></term>
+      <term><option>--rate</option> <replaceable>rate</></term>
+      <listitem>
+       <para>
+        Execute transactions targeting the specified rate instead of
+        running as fast as possible (the default).  The rate is given in
+        transactions per second.  If the targeted rate is
+        above the maximum possible rate these transactions can execute at,
+        the rate limit won't have any impact on results.
+
+        The rate is targeted by starting transactions along a
+        Poisson-distributed event time line.  When a rate limit is
+        active, the average and maximum transaction lag time
+        (the delay between the scheduled and actual transaction start times)
+        are reported in ms. High values indicate that the database
+        could not handle the scheduled load at some time.
+       </para>
+      </listitem>
+     </varlistentry>
+
+     <varlistentry>
       <term><option>-s</option> <replaceable>scale_factor</></term>
       <term><option>--scale=</option><replaceable>scale_factor</></term>
       <listitem>
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to