Here is submission v9 based on your v8 version.

 - the tps is global, with a mutex to share the global stochastic process
 - there is an adaptation for the "fork" emulation
 - I do not know whether this works with Win32 pthread stuff.
 - reduced multiplier ln(1000000) -> ln(1000)
 - avg & max throttling lag are reported

There's a little more play outside of the target than ideal for this case. Maybe it's worth tightening the Poisson curve a bit around its center?

A stochastic process moves around the target value, but is not right on it.

--
Fabien.
diff --git a/contrib/pgbench/pgbench.c b/contrib/pgbench/pgbench.c
index 8c202bf..6e52dee 100644
--- a/contrib/pgbench/pgbench.c
+++ b/contrib/pgbench/pgbench.c
@@ -75,6 +75,7 @@ static int	pthread_join(pthread_t th, void **thread_return);
 #else
 /* Use emulation with fork. Rename pthread identifiers to avoid conflicts */
 
+#define PTHREAD_FORK_EMULATION
 #include <sys/wait.h>
 
 #define pthread_t				pg_pthread_t
@@ -82,6 +83,11 @@ static int	pthread_join(pthread_t th, void **thread_return);
 #define pthread_create			pg_pthread_create
 #define pthread_join			pg_pthread_join
 
+#define pthread_mutex_t int
+#define PTHREAD_MUTEX_INITIALIZER 0
+#define pthread_mutex_lock(m)
+#define pthread_mutex_unlock(m)
+
 typedef struct fork_pthread *pthread_t;
 typedef int pthread_attr_t;
 
@@ -137,6 +143,12 @@ int			unlogged_tables = 0;
 double		sample_rate = 0.0;
 
 /*
+ * When clients are throttled to a given rate limit, this is the target delay
+ * to reach that rate in usec.  0 is the default and means no throttling.
+ */
+int64		throttle_delay = 0;
+
+/*
  * tablespace selection
  */
 char	   *tablespace = NULL;
@@ -205,6 +217,7 @@ typedef struct
 	int			nvariables;
 	instr_time	txn_begin;		/* used for measuring transaction latencies */
 	instr_time	stmt_begin;		/* used for measuring statement latencies */
+	bool		throttled;      /* whether current transaction was throttled */
 	int			use_file;		/* index in sql_files for this client */
 	bool		prepared[MAX_FILES];
 } CState;
@@ -222,6 +235,8 @@ typedef struct
 	instr_time *exec_elapsed;	/* time spent executing cmds (per Command) */
 	int		   *exec_count;		/* number of cmd executions (per Command) */
 	unsigned short random_state[3];		/* separate randomness for each thread */
+	int64       throttle_lag;   /* transaction lag behind throttling */
+	int64       throttle_lag_max;
 } TState;
 
 #define INVALID_THREAD		((pthread_t) 0)
@@ -230,9 +245,17 @@ typedef struct
 {
 	instr_time	conn_time;
 	int			xacts;
+	int64       throttle_lag;
+	int64       throttle_lag_max;
 } TResult;
 
 /*
+ * throttling management
+ */
+pthread_mutex_t throttle_trigger_mutex = PTHREAD_MUTEX_INITIALIZER;
+int64 throttle_trigger;	    /* previous/next throttling (us) */
+
+/*
  * queries read from files
  */
 #define SQL_COMMAND		1
@@ -355,6 +378,8 @@ usage(void)
 		   "  -n           do not run VACUUM before tests\n"
 		   "  -N           do not update tables \"pgbench_tellers\" and \"pgbench_branches\"\n"
 		   "  -r           report average latency per command\n"
+		   "  -R SPEC, --rate SPEC\n"
+		   "               target rate in transactions per second\n"
 		   "  -s NUM       report this scale factor in output\n"
 		   "  -S           perform SELECT-only transactions\n"
 	 "  -t NUM       number of transactions each client runs (default: 10)\n"
@@ -902,13 +927,53 @@ doCustom(TState *thread, CState *st, instr_time *conn_time, FILE *logfile, AggVa
 top:
 	commands = sql_files[st->use_file];
 
+	/* handle throttling once per transaction by inserting a sleep.
+	 * this is simpler than doing it at the end.
+	 */
+	if (throttle_delay && ! st->throttled)
+	{
+		/* compute delay to approximate a Poisson distribution
+		 * 1000000 => 13.8 .. 0 multiplier
+		 *  100000 => 11.5 .. 0
+		 *   10000 =>  9.2 .. 0
+		 *    1000 =>  6.9 .. 0
+		 * if transactions are too slow or a given wait shorter than
+		 * a transaction, the next transaction will start right away.
+		 */
+		int64 wait = (int64)
+			throttle_delay * -log(getrand(thread, 1, 1000)/1000.0);
+
+		pthread_mutex_lock(&throttle_trigger_mutex);
+		throttle_trigger += wait;
+		st->until = throttle_trigger;
+		pthread_mutex_unlock(&throttle_trigger_mutex);
+
+		st->sleeping = 1;
+		st->throttled = true;
+		if (debug)
+			fprintf(stderr, "client %d throttling "INT64_FORMAT" us\n",
+					st->id, wait);
+	}
+
 	if (st->sleeping)
 	{							/* are we sleeping? */
 		instr_time	now;
+		int64 now_us;
 
 		INSTR_TIME_SET_CURRENT(now);
-		if (st->until <= INSTR_TIME_GET_MICROSEC(now))
+		now_us = INSTR_TIME_GET_MICROSEC(now);
+		if (st->until <= now_us)
+		{
 			st->sleeping = 0;	/* Done sleeping, go ahead with next command */
+			if (throttle_delay && st->state==0)
+			{
+				/* measure lag of throttled transaction */
+				int64 lag = now_us - st->until;
+				thread->throttle_lag += lag;
+				if (lag > thread->throttle_lag_max)
+					thread->throttle_lag_max = lag;
+			}
+		}
 		else
 			return true;		/* Still sleeping, nothing to do here */
 	}
@@ -1095,6 +1160,7 @@ top:
 			st->state = 0;
 			st->use_file = (int) getrand(thread, 0, num_files - 1);
 			commands = sql_files[st->use_file];
+			st->throttled = false;
 		}
 	}
 
@@ -2015,9 +2081,11 @@ process_builtin(char *tb)
 
 /* print out results */
 static void
-printResults(int ttype, int normal_xacts, int nclients,
+printResults(int ttype, int normal_xacts,
+			 CState *clients, int nclients,
 			 TState *threads, int nthreads,
-			 instr_time total_time, instr_time conn_total_time)
+			 instr_time total_time, instr_time conn_total_time,
+			 int64 throttle_lag, int64 throttle_lag_max)
 {
 	double		time_include,
 				tps_include,
@@ -2055,6 +2123,18 @@ printResults(int ttype, int normal_xacts, int nclients,
 		printf("number of transactions actually processed: %d\n",
 			   normal_xacts);
 	}
+
+	if (throttle_delay)
+	{
+		/* Report average transaction lag under throttling, i.e. the delay
+		   between scheduled and actual start times for the transaction.
+		   The measured lag may be linked to the thread/client load,
+		   the database load, or the Poisson throttling process.
+		 */
+		printf("average transaction lag: %.3f ms (max %.3f ms)\n",
+			   0.001 * throttle_lag / normal_xacts, 0.001 * throttle_lag_max);
+	}
+
 	printf("tps = %f (including connections establishing)\n", tps_include);
 	printf("tps = %f (excluding connections establishing)\n", tps_exclude);
 
@@ -2115,6 +2195,7 @@ main(int argc, char **argv)
 		{"unlogged-tables", no_argument, &unlogged_tables, 1},
 		{"sampling-rate", required_argument, NULL, 4},
 		{"aggregate-interval", required_argument, NULL, 5},
+		{"rate", required_argument, NULL, 'R'},
 		{NULL, 0, NULL, 0}
 	};
 
@@ -2137,6 +2218,8 @@ main(int argc, char **argv)
 	instr_time	total_time;
 	instr_time	conn_total_time;
 	int			total_xacts;
+	int64       throttle_lag = 0;
+	int64       throttle_lag_max = 0;
 
 	int			i;
 
@@ -2181,7 +2264,7 @@ main(int argc, char **argv)
 	state = (CState *) pg_malloc(sizeof(CState));
 	memset(state, 0, sizeof(CState));
 
-	while ((c = getopt_long(argc, argv, "ih:nvp:dqSNc:j:Crs:t:T:U:lf:D:F:M:", long_options, &optindex)) != -1)
+	while ((c = getopt_long(argc, argv, "ih:nvp:dqSNc:j:Crs:t:T:U:lf:D:F:M:R:", long_options, &optindex)) != -1)
 	{
 		switch (c)
 		{
@@ -2336,6 +2419,19 @@ main(int argc, char **argv)
 					exit(1);
 				}
 				break;
+			case 'R':
+			{
+				/* get a double from the beginning of option value */
+				double throttle_value = atof(optarg);
+				if (throttle_value <= 0.0)
+				{
+					fprintf(stderr, "invalid rate limit: %s\n", optarg);
+					exit(1);
+				}
+				/* Invert rate limit into a time offset */
+				throttle_delay = (int64) (1000000.0 / throttle_value);
+			}
+				break;
 			case 0:
 				/* This covers long options which take no argument. */
 				break;
@@ -2373,6 +2469,12 @@ main(int argc, char **argv)
 		}
 	}
 
+#ifdef PTHREAD_FORK_EMULATION
+    /* each process has an independent delay */
+	if (throttle_delay)
+		throttle_delay *= nthreads;
+#endif
+
 	if (argc > optind)
 		dbName = argv[optind];
 	else
@@ -2626,6 +2728,9 @@ main(int argc, char **argv)
 	/* get start up time */
 	INSTR_TIME_SET_CURRENT(start_time);
 
+	/* set initial throttling trigger */
+	throttle_trigger = INSTR_TIME_GET_MICROSEC(start_time);
+
 	/* set alarm if duration is specified. */
 	if (duration > 0)
 		setalarm(duration);
@@ -2636,6 +2741,8 @@ main(int argc, char **argv)
 		TState	   *thread = &threads[i];
 
 		INSTR_TIME_SET_CURRENT(thread->start_time);
+		thread->throttle_lag = 0;
+		thread->throttle_lag_max = 0;
 
 		/* the first thread (i = 0) is executed by main thread */
 		if (i > 0)
@@ -2671,6 +2778,9 @@ main(int argc, char **argv)
 			TResult    *r = (TResult *) ret;
 
 			total_xacts += r->xacts;
+			throttle_lag += r->throttle_lag;
+			if (r->throttle_lag_max > throttle_lag_max)
+				throttle_lag_max = r->throttle_lag_max;
 			INSTR_TIME_ADD(conn_total_time, r->conn_time);
 			free(ret);
 		}
@@ -2680,8 +2790,8 @@ main(int argc, char **argv)
 	/* get end time */
 	INSTR_TIME_SET_CURRENT(total_time);
 	INSTR_TIME_SUBTRACT(total_time, start_time);
-	printResults(ttype, total_xacts, nclients, threads, nthreads,
-				 total_time, conn_total_time);
+	printResults(ttype, total_xacts, state, nclients, threads, nthreads,
+				 total_time, conn_total_time, throttle_lag, throttle_lag_max);
 
 	return 0;
 }
@@ -2869,6 +2979,8 @@ done:
 	result->xacts = 0;
 	for (i = 0; i < nstate; i++)
 		result->xacts += state[i].cnt;
+	result->throttle_lag = thread->throttle_lag;
+	result->throttle_lag_max = thread->throttle_lag_max;
 	INSTR_TIME_SET_CURRENT(end);
 	INSTR_TIME_ACCUM_DIFF(result->conn_time, end, start);
 	if (logfile)
diff --git a/doc/src/sgml/pgbench.sgml b/doc/src/sgml/pgbench.sgml
index e9900d3..c6af1ba 100644
--- a/doc/src/sgml/pgbench.sgml
+++ b/doc/src/sgml/pgbench.sgml
@@ -392,6 +392,27 @@ pgbench <optional> <replaceable>options</> </optional> <replaceable>dbname</>
      </varlistentry>
 
      <varlistentry>
+      <term><option>-R</option> <replaceable>rate</></term>
+      <term><option>--rate</option> <replaceable>rate</></term>
+      <listitem>
+       <para>
+	Execute transactions targeting the specified rate instead of
+	running as fast as possible (the default).  The rate is given in
+        transactions per second.  If the targeted rate is
+        above the maximum possible rate these transactions can execute at,
+        the rate limit won't have any impact on results.
+
+	The rate is targeted by starting transactions along a
+	Poisson-distributed event time line.  When a rate limit is
+        active, the average and maximum transaction lag time
+	(the delay between the scheduled and actual transaction start times)
+	are reported in ms. High values indicate that the database
+	could not handle the scheduled load at some point in time.
+       </para>
+      </listitem>
+     </varlistentry>
+
+     <varlistentry>
       <term><option>-s</option> <replaceable>scale_factor</></term>
       <listitem>
        <para>
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to