Hello Heikki,

> Yeah, it really is quite a mess. I tried to review your patch, and I think it's correct, but I couldn't totally convince myself, because of the existing messiness of the logic. So I bit the bullet and started refactoring.
>
> I came up with the attached. It refactors the logic in doCustom() into a state machine. I think this is much clearer, what do you think?

The patch did not apply to master because you committed the sleep fix in the meantime. I updated the patch so that the fix is included as well.

I think that this is really needed. The code is much clearer and simpler to understand with the state machine and the additional functions. This is a definite improvement to the code base.
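For anyone skimming the thread, the shape of the refactoring is roughly the following. This is only a minimal standalone sketch, not the code in the patch: the DemoState enum, the DemoClient struct and demo_advance() are hypothetical names, and the result_ready flag merely stands in for the real "is the result available yet" check. What it tries to show is the control-flow convention: 'break' loops back into the switch, 'return' hands control back to the caller when the client has to wait.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical states, loosely modeled on the CSTATE_* names in the patch. */
typedef enum
{
	DEMO_START_COMMAND,
	DEMO_WAIT_RESULT,
	DEMO_END_COMMAND,
	DEMO_FINISHED
} DemoState;

typedef struct
{
	DemoState	state;
	int			command;		/* index of the current command */
	int			ncommands;		/* number of commands in the "script" */
	bool		result_ready;	/* stand-in for "result has arrived" */
} DemoClient;

/*
 * Advance the client as far as possible.  'break' loops back into the
 * switch; 'return' gives control back to the caller.
 */
static void
demo_advance(DemoClient *c)
{
	assert(c != NULL);

	for (;;)
	{
		switch (c->state)
		{
			case DEMO_START_COMMAND:
				if (c->command >= c->ncommands)
				{
					c->state = DEMO_FINISHED;
					break;
				}
				c->result_ready = false;	/* pretend to send the query */
				c->state = DEMO_WAIT_RESULT;
				break;

			case DEMO_WAIT_RESULT:
				if (!c->result_ready)
					return;		/* must wait, let the caller poll */
				c->state = DEMO_END_COMMAND;
				break;

			case DEMO_END_COMMAND:
				/* single place for per-command bookkeeping */
				c->command++;
				c->state = DEMO_START_COMMAND;
				break;

			case DEMO_FINISHED:
				return;
		}
	}
}
```

The caller would set result_ready when the (simulated) result arrives and call demo_advance() again; each call runs until the client has to wait or is finished.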

I've done quite a bit of testing with various options (-r, --rate, --latency-limit, -C...) and got pretty reasonable results.

Although I cannot be absolutely sure that the refactoring does not introduce any new bugs, I'm convinced that it will be much easier to find them :-)


Attached are some small changes to your version:

I have added the sleep_until fix.

I have fixed a bug introduced in the patch by replacing && with || in the (min_sec > 0 && maxsock != -1) condition, which was causing errors with multiple threads and clients.

I have factored out several error messages into "commandFailed", in place of "metaCommandFailed", and added the script number to the error messages. All messages are now specific to the failed command.

I have added two states to the machine:

 - CSTATE_CHOOSE_SCRIPT, which simplifies threadRun: there is now a single
   call to chooseScript instead of two.

 - CSTATE_END_COMMAND, which handles is_latencies and proceeds to the
   next command, thus merging the three places that updated the stats
   in the first version.

The latter state means that processing query results is included in the per-statement latency, which is an improvement: before, I was getting some transaction latencies significantly larger than the apparent sum of the per-statement latencies, which did not make much sense...
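To illustrate the accounting issue with made-up numbers: the sketch below is hypothetical (DemoStmt, demo_sum_stmt_latency() and demo_txn_latency() are not pgbench code, and it assumes the transaction consists of nothing but waiting for and reading each result). It only shows that, if result processing is left out of the per-statement figures, their sum falls short of the transaction latency, while including it makes the two agree.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical per-statement timings, in microseconds. */
typedef struct
{
	int64_t		wait_usec;		/* from send until the result is available */
	int64_t		process_usec;	/* reading and discarding the result */
} DemoStmt;

/* Sum of the per-statement latencies under either accounting rule. */
static int64_t
demo_sum_stmt_latency(const DemoStmt *stmts, int n, bool include_processing)
{
	int64_t		sum = 0;

	assert(n >= 0);
	for (int i = 0; i < n; i++)
	{
		sum += stmts[i].wait_usec;
		if (include_processing)
			sum += stmts[i].process_usec;
	}
	return sum;
}

/*
 * Transaction latency in this simplified model: all the waiting plus all
 * the result processing, with no other client-side overhead.
 */
static int64_t
demo_txn_latency(const DemoStmt *stmts, int n)
{
	return demo_sum_stmt_latency(stmts, n, true);
}
```

With three statements of (100, 20), (200, 30) and (50, 10) microseconds, the old-style sum is 350 us against a transaction latency of 410 us; counting result processing per statement brings the sum to 410 us as well.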

I have added and updated a few comments. There are some places where the break could be a fall-through instead; I'm not sure how desirable that is, and I'm fine with break.


> Well, the comment right there says "note this is not included in the statement latency numbers", so apparently it's intentional. Whether it's a good idea or not, I don't know :-). It does seem a bit surprising.

Indeed, it also results in apparently inconsistent numbers, and it makes a mess of recording the statement latency, because in some cases the latency was collected before the actual end of the command; see the discussion about CSTATE_END_COMMAND above.

> But what seems more bogus to me is that we do that after recording the *transaction* stats, if this was the last command. So the PQgetResult() of the last command in the transaction is not included in the transaction stats, even though the PQgetResult() calls for any previous commands are. (Perhaps that's what you meant too?)
>
> I changed that in my patch, it would've been inconvenient to keep that old behavior, and it doesn't make any sense to me anyway.

Fine with me.

--
Fabien.
diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c
index 8b24ad5..502e644 100644
--- a/src/bin/pgbench/pgbench.c
+++ b/src/bin/pgbench/pgbench.c
@@ -235,25 +235,97 @@ typedef struct StatsData
 } StatsData;
 
 /*
- * Connection state
+ * Connection state machine states.
+ */
+typedef enum
+{
+	/*
+	 * The client must first choose a script to execute.  Once chosen, it can
+	 * either be throttled (state CSTATE_START_THROTTLE under --rate) or start
+	 * right away (state CSTATE_START_TX).
+	 */
+	CSTATE_CHOOSE_SCRIPT,
+
+	/*
+	 * In CSTATE_START_THROTTLE state, we calculate when to begin the next
+	 * transaction, and advance to CSTATE_THROTTLE.  CSTATE_THROTTLE state
+	 * sleeps until that moment.  (If throttling is not enabled, both states
+	 * are skipped: CSTATE_CHOOSE_SCRIPT moves directly to CSTATE_START_TX.)
+	 */
+	CSTATE_START_THROTTLE,
+	CSTATE_THROTTLE,
+
+	/*
+	 * CSTATE_START_TX performs start-of-transaction processing.  Establishes
+	 * a new connection for the transaction, in --connect mode, and records
+	 * the transaction start time.
+	 */
+	CSTATE_START_TX,
+
+	/*
+	 * We loop through these states, to process each command in the
+	 * script:
+	 *
+	 * CSTATE_START_COMMAND starts the execution of a command.  On a SQL
+	 * command, the command is sent to the server, and we move to
+	 * CSTATE_WAIT_RESULT state.  On a \sleep meta-command, the timer is
+	 * set, and we enter the CSTATE_SLEEP state to wait for it to expire.
+	 * Other meta-commands are executed immediately, and we proceed to next
+	 * command.
+	 *
+	 * CSTATE_WAIT_RESULT waits until we get a result set back from the server
+	 * for the current command, and proceeds to CSTATE_END_COMMAND.
+	 *
+	 * CSTATE_SLEEP waits until the end of \sleep and proceeds to CSTATE_END_COMMAND.
+	 *
+	 * CSTATE_END_COMMAND does some stats housekeeping, then jumps back to
+	 * start the next command with CSTATE_START_COMMAND.
+	 */
+	CSTATE_START_COMMAND,
+	CSTATE_WAIT_RESULT,
+	CSTATE_SLEEP,
+	CSTATE_END_COMMAND,
+
+	/*
+	 * CSTATE_END_TX performs end-of-transaction processing.  Calculates
+	 * latency, and logs the transaction.  In --connect mode, closes the
+	 * current connection.  Then starts over in CSTATE_CHOOSE_SCRIPT state to
+	 * pick the next script, or enters CSTATE_FINISHED if we have no more
+	 * work to do.
+	 */
+	CSTATE_END_TX,
+
+	/*
+	 * Final states.  CSTATE_ABORTED means that the script execution was
+	 * aborted because a command failed, CSTATE_FINISHED means success.
+	 */
+	CSTATE_ABORTED,
+	CSTATE_FINISHED,
+} ConnectionStateEnum;
+
+/*
+ * Connection state.
  */
 typedef struct
 {
 	PGconn	   *con;			/* connection handle to DB */
 	int			id;				/* client No. */
-	int			state;			/* state No. */
-	bool		listen;			/* whether an async query has been sent */
-	bool		sleeping;		/* whether the client is napping */
-	bool		throttling;		/* whether nap is for throttling */
-	bool		is_throttled;	/* whether transaction throttling is done */
+	ConnectionStateEnum state;	/* state machine's current state. */
+
+	int			use_file;		/* index in sql_script for this client */
+	int			command;		/* command number in script */
+
+	/* client variables */
 	Variable   *variables;		/* array of variable definitions */
 	int			nvariables;		/* number of variables */
 	bool		vars_sorted;	/* are variables sorted by name? */
+
+	/* various times about current transaction */
 	int64		txn_scheduled;	/* scheduled start time of transaction (usec) */
 	int64		sleep_until;	/* scheduled start time of next cmd (usec) */
 	instr_time	txn_begin;		/* used for measuring schedule lag times */
 	instr_time	stmt_begin;		/* used for measuring statement latencies */
-	int			use_file;		/* index in sql_scripts for this client */
+
 	bool		prepared[MAX_SCRIPTS];	/* whether client prepared the script */
 
 	/* per client collected stats */
@@ -1382,7 +1454,7 @@ evalFunc(TState *thread, CState *st,
 				Assert(nargs == 1);
 
 				fprintf(stderr, "debug(script=%d,command=%d): ",
-						st->use_file, st->state + 1);
+						st->use_file, st->command + 1);
 
 				if (varg->type == PGBT_INT)
 					fprintf(stderr, "int " INT64_FORMAT "\n", varg->u.ival);
@@ -1733,15 +1805,12 @@ preparedStatementName(char *buffer, int file, int state)
 	sprintf(buffer, "P%d_%d", file, state);
 }
 
-static bool
-clientDone(CState *st)
+static void
+commandFailed(CState *st, char *message)
 {
-	if (st->con != NULL)
-	{
-		PQfinish(st->con);
-		st->con = NULL;
-	}
-	return false;				/* always false */
+	fprintf(stderr,
+			"client %d aborted in command %d of script %d; %s\n",
+			st->id, st->command, st->use_file, message);
 }
 
 /* return a script number with a weighted choice. */
@@ -1763,425 +1832,593 @@ chooseScript(TState *thread)
 	return i - 1;
 }
 
-/* return false iff client should be disconnected */
+/* Send a SQL command, using the chosen querymode */
 static bool
+sendCommand(CState *st, Command *command)
+{
+	int			r;
+
+	if (querymode == QUERY_SIMPLE)
+	{
+		char	   *sql;
+
+		sql = pg_strdup(command->argv[0]);
+		sql = assignVariables(st, sql);
+
+		if (debug)
+			fprintf(stderr, "client %d sending %s\n", st->id, sql);
+		r = PQsendQuery(st->con, sql);
+		free(sql);
+	}
+	else if (querymode == QUERY_EXTENDED)
+	{
+		const char *sql = command->argv[0];
+		const char *params[MAX_ARGS];
+
+		getQueryParams(st, command, params);
+
+		if (debug)
+			fprintf(stderr, "client %d sending %s\n", st->id, sql);
+		r = PQsendQueryParams(st->con, sql, command->argc - 1,
+							  NULL, params, NULL, NULL, 0);
+	}
+	else if (querymode == QUERY_PREPARED)
+	{
+		char		name[MAX_PREPARE_NAME];
+		const char *params[MAX_ARGS];
+
+		if (!st->prepared[st->use_file])
+		{
+			int			j;
+			Command **commands = sql_script[st->use_file].commands;
+
+			for (j = 0; commands[j] != NULL; j++)
+			{
+				PGresult   *res;
+				char		name[MAX_PREPARE_NAME];
+
+				if (commands[j]->type != SQL_COMMAND)
+					continue;
+				preparedStatementName(name, st->use_file, j);
+				res = PQprepare(st->con, name,
+								commands[j]->argv[0], commands[j]->argc - 1, NULL);
+				if (PQresultStatus(res) != PGRES_COMMAND_OK)
+					fprintf(stderr, "%s", PQerrorMessage(st->con));
+				PQclear(res);
+			}
+			st->prepared[st->use_file] = true;
+		}
+
+		getQueryParams(st, command, params);
+		preparedStatementName(name, st->use_file, st->command);
+
+		if (debug)
+			fprintf(stderr, "client %d sending %s\n", st->id, name);
+		r = PQsendQueryPrepared(st->con, name, command->argc - 1,
+								params, NULL, NULL, 0);
+	}
+	else	/* unknown sql mode */
+		r = 0;
+
+	if (r == 0)
+	{
+		if (debug)
+			fprintf(stderr, "client %d could not send %s\n",
+					st->id, command->argv[0]);
+		st->ecnt++;
+		return false;
+	}
+	else
+		return true;
+}
+
+/*
+ * "execute" a sleep command. Parses the argument, and returns the
+ * requested amount of delay, in microseconds. Returns true on
+ * success, false on error.
+ */
+static bool
+evaluateSleep(CState *st, int argc, char **argv, int *usecs)
+{
+	char	   *var;
+	int			usec;
+
+	if (*argv[1] == ':')
+	{
+		if ((var = getVariable(st, argv[1] + 1)) == NULL)
+		{
+			fprintf(stderr, "%s: undefined variable \"%s\"\n",
+					argv[0], argv[1]);
+			return false;
+		}
+		usec = atoi(var);
+	}
+	else
+		usec = atoi(argv[1]);
+
+	if (argc > 2)
+	{
+		if (pg_strcasecmp(argv[2], "ms") == 0)
+			usec *= 1000;
+		else if (pg_strcasecmp(argv[2], "s") == 0)
+			usec *= 1000000;
+	}
+	else
+		usec *= 1000000;
+
+	*usecs = usec;
+	return true;
+}
+
+/*
+ * Advance the state machine of a connection, if possible.
+ */
+static void
 doCustom(TState *thread, CState *st, StatsData *agg)
 {
 	PGresult   *res;
-	Command   **commands;
-	bool		trans_needs_throttle = false;
+	Command	   *command;
 	instr_time	now;
+	bool		end_tx_processed = false;
+	int64		wait;
 
 	/*
 	 * gettimeofday() isn't free, so we get the current timestamp lazily the
 	 * first time it's needed, and reuse the same value throughout this
-	 * function after that. This also ensures that e.g. the calculated latency
+	 * function after that.  This also ensures that e.g. the calculated latency
 	 * reported in the log file and in the totals are the same. Zero means
-	 * "not set yet". Reset "now" when we step to the next command with "goto
-	 * top", though.
+	 * "not set yet".  Reset "now" when we execute shell commands or expressions,
+	 * which might take a non-negligible amount of time, though.
 	 */
-top:
 	INSTR_TIME_SET_ZERO(now);
 
-	commands = sql_script[st->use_file].commands;
-
 	/*
-	 * Handle throttling once per transaction by sleeping.  It is simpler to
-	 * do this here rather than at the end, because so much complicated logic
-	 * happens below when statements finish.
+	 * Loop in the state machine, until we have to wait for a result from the
+	 * server (or have to sleep, for throttling or for \sleep).
+	 *
+	 * Note: In the switch-statement below, 'break' will loop back here,
+	 * meaning "continue in the state machine".  Return is used to return to
+	 * the caller.  (XXX: Would it be better to replace the 'break's with
+	 * 'continue's?)
 	 */
-	if (throttle_delay && !st->is_throttled)
+	for (;;)
 	{
-		/*
-		 * Generate a delay such that the series of delays will approximate a
-		 * Poisson distribution centered on the throttle_delay time.
-		 *
-		 * If transactions are too slow or a given wait is shorter than a
-		 * transaction, the next transaction will start right away.
-		 */
-		int64		wait = getPoissonRand(thread, throttle_delay);
-
-		thread->throttle_trigger += wait;
-		st->txn_scheduled = thread->throttle_trigger;
-
-		/* stop client if next transaction is beyond pgbench end of execution */
-		if (duration > 0 && st->txn_scheduled > end_time)
-			return clientDone(st);
-
-		/*
-		 * If this --latency-limit is used, and this slot is already late so
-		 * that the transaction will miss the latency limit even if it
-		 * completed immediately, we skip this time slot and iterate till the
-		 * next slot that isn't late yet.
-		 */
-		if (latency_limit)
+		switch(st->state)
 		{
-			int64		now_us;
-
-			if (INSTR_TIME_IS_ZERO(now))
-				INSTR_TIME_SET_CURRENT(now);
-			now_us = INSTR_TIME_GET_MICROSEC(now);
-			while (thread->throttle_trigger < now_us - latency_limit)
-			{
-				processXactStats(thread, st, &now, true, agg);
-				/* next rendez-vous */
+			/*
+			 * Select transaction to run.
+			 */
+			case CSTATE_CHOOSE_SCRIPT:
+
+				st->use_file = chooseScript(thread);
+
+				if (debug)
+					fprintf(stderr, "client %d executing script \"%s\"\n", st->id,
+							sql_script[st->use_file].desc);
+
+				st->state = (throttle_delay > 0) ? CSTATE_START_THROTTLE : CSTATE_START_TX;
+
+				break;
+
+			/*
+			 * Handle throttling once per transaction by sleeping.
+			 */
+			case CSTATE_START_THROTTLE:
+				/*
+				 * Generate a delay such that the series of delays will
+				 * approximate a Poisson distribution centered on the
+				 * throttle_delay time.
+				 *
+				 * If transactions are too slow or a given wait is shorter
+				 * than a transaction, the next transaction will start
+				 * right away.
+				 */
+				/* Assert(throttle_delay > 0); */
 				wait = getPoissonRand(thread, throttle_delay);
+
 				thread->throttle_trigger += wait;
 				st->txn_scheduled = thread->throttle_trigger;
-			}
-		}
-
-		st->sleep_until = st->txn_scheduled;
-		st->sleeping = true;
-		st->throttling = true;
-		st->is_throttled = true;
-		if (debug)
-			fprintf(stderr, "client %d throttling " INT64_FORMAT " us\n",
-					st->id, wait);
-	}
 
-	if (st->sleeping)
-	{							/* are we sleeping? */
-		if (INSTR_TIME_IS_ZERO(now))
-			INSTR_TIME_SET_CURRENT(now);
-		if (INSTR_TIME_GET_MICROSEC(now) < st->sleep_until)
-			return true;		/* Still sleeping, nothing to do here */
-		/* Else done sleeping, go ahead with next command */
-		st->sleeping = false;
-		st->throttling = false;
-	}
+				/*
+				 * stop client if next transaction is beyond pgbench end
+				 * of execution
+				 */
+				if (duration > 0 && st->txn_scheduled > end_time)
+				{
+					st->state = CSTATE_FINISHED;
+					break;
+				}
 
-	if (st->listen)
-	{							/* are we receiver? */
-		if (commands[st->state]->type == SQL_COMMAND)
-		{
-			if (debug)
-				fprintf(stderr, "client %d receiving\n", st->id);
-			if (!PQconsumeInput(st->con))
-			{					/* there's something wrong */
-				fprintf(stderr, "client %d aborted in state %d; perhaps the backend died while processing\n", st->id, st->state);
-				return clientDone(st);
-			}
-			if (PQisBusy(st->con))
-				return true;	/* don't have the whole result yet */
-		}
+				/*
+				 * If this --latency-limit is used, and this slot is
+				 * already late so that the transaction will miss the
+				 * latency limit even if it completed immediately, we skip
+				 * this time slot and iterate till the next slot that
+				 * isn't late yet.
+				 */
+				if (latency_limit)
+				{
+					int64		now_us;
+
+					if (INSTR_TIME_IS_ZERO(now))
+						INSTR_TIME_SET_CURRENT(now);
+					now_us = INSTR_TIME_GET_MICROSEC(now);
+					while (thread->throttle_trigger < now_us - latency_limit)
+					{
+						processXactStats(thread, st, &now, true, agg);
+						/* next rendez-vous */
+						wait = getPoissonRand(thread, throttle_delay);
+						thread->throttle_trigger += wait;
+						st->txn_scheduled = thread->throttle_trigger;
+					}
+				}
 
-		/*
-		 * command finished: accumulate per-command execution times in
-		 * thread-local data structure, if per-command latencies are requested
-		 */
-		if (is_latencies)
-		{
-			if (INSTR_TIME_IS_ZERO(now))
-				INSTR_TIME_SET_CURRENT(now);
-
-			/* XXX could use a mutex here, but we choose not to */
-			addToSimpleStats(&commands[st->state]->stats,
-							 INSTR_TIME_GET_DOUBLE(now) -
-							 INSTR_TIME_GET_DOUBLE(st->stmt_begin));
-		}
+				st->state = CSTATE_THROTTLE;
+				if (debug)
+					fprintf(stderr, "client %d throttling " INT64_FORMAT " us\n",
+							st->id, wait);
 
-		/* transaction finished: calculate latency and log the transaction */
-		if (commands[st->state + 1] == NULL)
-		{
-			if (progress || throttle_delay || latency_limit ||
-				per_script_stats || use_log)
-				processXactStats(thread, st, &now, false, agg);
-			else
-				thread->stats.cnt++;
-		}
+				/* could pass through? could return? */
+				break;
 
-		if (commands[st->state]->type == SQL_COMMAND)
-		{
 			/*
-			 * Read and discard the query result; note this is not included in
-			 * the statement latency numbers.
+			 * Wait until it's time to start next transaction.
 			 */
-			res = PQgetResult(st->con);
-			switch (PQresultStatus(res))
-			{
-				case PGRES_COMMAND_OK:
-				case PGRES_TUPLES_OK:
-				case PGRES_EMPTY_QUERY:
-					break;		/* OK */
-				default:
-					fprintf(stderr, "client %d aborted in state %d: %s",
-							st->id, st->state, PQerrorMessage(st->con));
-					PQclear(res);
-					return clientDone(st);
-			}
-			PQclear(res);
-			discard_response(st);
-		}
+			case CSTATE_THROTTLE:
+				if (INSTR_TIME_IS_ZERO(now))
+					INSTR_TIME_SET_CURRENT(now);
+				if (INSTR_TIME_GET_MICROSEC(now) < st->txn_scheduled)
+					return;			/* Still sleeping, nothing to do here */
+
+				/* Else done sleeping, start the transaction */
+				st->state = CSTATE_START_TX;
+
+				/* could pass through */
+				break;
+
+			/* Start new transaction */
+			case CSTATE_START_TX:
+				/*
+				 * Establish connection on first call, or if is_connect is
+				 * true.
+				 */
+				if (st->con == NULL)
+				{
+					instr_time	start;
+
+					if (INSTR_TIME_IS_ZERO(now))
+						INSTR_TIME_SET_CURRENT(now);
+					start = now;
+					if ((st->con = doConnect()) == NULL)
+					{
+						fprintf(stderr, "client %d aborted while establishing connection\n",
+							st->id);
+						st->state = CSTATE_ABORTED;
+						break;
+					}
+					INSTR_TIME_SET_CURRENT(now);
+					INSTR_TIME_ACCUM_DIFF(thread->conn_time, now, start);
+
+					/* Reset session-local state */
+					memset(st->prepared, 0, sizeof(st->prepared));
+				}
 
-		if (commands[st->state + 1] == NULL)
-		{
-			if (is_connect)
-			{
-				PQfinish(st->con);
-				st->con = NULL;
-			}
-
-			++st->cnt;
-			if ((st->cnt >= nxacts && duration <= 0) || timer_exceeded)
-				return clientDone(st);	/* exit success */
-		}
+				/*
+				 * Record transaction start time under logging, progress or
+				 * throttling.
+				 */
+				if (use_log || progress || throttle_delay || latency_limit ||
+					per_script_stats)
+				{
+					if (INSTR_TIME_IS_ZERO(now))
+						INSTR_TIME_SET_CURRENT(now);
+					st->txn_begin = now;
+
+					/*
+					 * When not throttling, this is also the transaction's
+					 * scheduled start time.
+					 */
+					if (!throttle_delay)
+						st->txn_scheduled = INSTR_TIME_GET_MICROSEC(now);
+				}
+
+				/* Begin with the first command */
+				st->command = 0;
+				st->state = CSTATE_START_COMMAND;
 
-		/* increment state counter */
-		st->state++;
-		if (commands[st->state] == NULL)
-		{
-			st->state = 0;
-			st->use_file = chooseScript(thread);
-			commands = sql_script[st->use_file].commands;
-			if (debug)
-				fprintf(stderr, "client %d executing script \"%s\"\n", st->id,
-						sql_script[st->use_file].desc);
-			st->is_throttled = false;
+				/* could pass through */
+				break;
 
 			/*
-			 * No transaction is underway anymore, which means there is
-			 * nothing to listen to right now.  When throttling rate limits
-			 * are active, a sleep will happen next, as the next transaction
-			 * starts.  And then in any case the next SQL command will set
-			 * listen back to true.
+			 * Send a command to server (or execute a meta-command)
 			 */
-			st->listen = false;
-			trans_needs_throttle = (throttle_delay > 0);
-		}
-	}
-
-	if (st->con == NULL)
-	{
-		instr_time	start,
-					end;
-
-		INSTR_TIME_SET_CURRENT(start);
-		if ((st->con = doConnect()) == NULL)
-		{
-			fprintf(stderr, "client %d aborted while establishing connection\n",
-					st->id);
-			return clientDone(st);
-		}
-		INSTR_TIME_SET_CURRENT(end);
-		INSTR_TIME_ACCUM_DIFF(thread->conn_time, end, start);
-
-		/* Reset session-local state */
-		st->listen = false;
-		st->sleeping = false;
-		st->throttling = false;
-		st->is_throttled = false;
-		memset(st->prepared, 0, sizeof(st->prepared));
-	}
-
-	/*
-	 * This ensures that a throttling delay is inserted before proceeding with
-	 * sql commands, after the first transaction. The first transaction
-	 * throttling is performed when first entering doCustom.
-	 */
-	if (trans_needs_throttle)
-	{
-		trans_needs_throttle = false;
-		goto top;
-	}
-
-	/* Record transaction start time under logging, progress or throttling */
-	if ((use_log || progress || throttle_delay || latency_limit ||
-		 per_script_stats) && st->state == 0)
-	{
-		INSTR_TIME_SET_CURRENT(st->txn_begin);
-
-		/*
-		 * When not throttling, this is also the transaction's scheduled start
-		 * time.
-		 */
-		if (!throttle_delay)
-			st->txn_scheduled = INSTR_TIME_GET_MICROSEC(st->txn_begin);
-	}
-
-	/* Record statement start time if per-command latencies are requested */
-	if (is_latencies)
-		INSTR_TIME_SET_CURRENT(st->stmt_begin);
-
-	if (commands[st->state]->type == SQL_COMMAND)
-	{
-		const Command *command = commands[st->state];
-		int			r;
-
-		if (querymode == QUERY_SIMPLE)
-		{
-			char	   *sql;
+			case CSTATE_START_COMMAND:
+				command = sql_script[st->use_file].commands[st->command];
+
+				/*
+				 * If we reached the end of the script, move to end-of-xact
+				 * processing.
+				 */
+				if (command == NULL)
+				{
+					st->state = CSTATE_END_TX;
+					break;
+				}
 
-			sql = pg_strdup(command->argv[0]);
-			sql = assignVariables(st, sql);
+				/*
+				 * Record statement start time if per-command latencies
+				 * are requested
+				 */
+				if (is_latencies)
+				{
+					if (INSTR_TIME_IS_ZERO(now))
+						INSTR_TIME_SET_CURRENT(now);
+					st->stmt_begin = now;
+				}
 
-			if (debug)
-				fprintf(stderr, "client %d sending %s\n", st->id, sql);
-			r = PQsendQuery(st->con, sql);
-			free(sql);
-		}
-		else if (querymode == QUERY_EXTENDED)
-		{
-			const char *sql = command->argv[0];
-			const char *params[MAX_ARGS];
+				if (command->type == SQL_COMMAND)
+				{
+					if (!sendCommand(st, command))
+					{
+						/*
+						 * Failed. Stay in CSTATE_START_COMMAND state, to retry.
+						 * ??? What is the point of retrying? Should we rather abort?
+						 */
+						return;
+					}
+					else
+						st->state = CSTATE_WAIT_RESULT;
+				}
+				else if (command->type == META_COMMAND)
+				{
+					int			argc = command->argc,
+								i;
+					char	  **argv = command->argv;
+
+					if (debug)
+					{
+						fprintf(stderr, "client %d executing \\%s", st->id, argv[0]);
+						for (i = 1; i < argc; i++)
+							fprintf(stderr, " %s", argv[i]);
+						fprintf(stderr, "\n");
+					}
+
+					if (pg_strcasecmp(argv[0], "sleep") == 0)
+					{
+						/*
+						 * A \sleep doesn't execute anything, we just get the
+						 * delay from the argument, and enter the CSTATE_SLEEP
+						 * state.  (The per-command latency will be recorded
+						 * in CSTATE_END_COMMAND state, not here, after the delay has
+						 * elapsed.)
+						 */
+						int			usec;
+
+						if (!evaluateSleep(st, argc, argv, &usec))
+						{
+							commandFailed(st, "execution of meta-command 'sleep' failed");
+							st->state = CSTATE_ABORTED;
+							break;
+						}
+
+						if (INSTR_TIME_IS_ZERO(now))
+							INSTR_TIME_SET_CURRENT(now);
+						st->sleep_until = INSTR_TIME_GET_MICROSEC(now) + usec;
+						st->state = CSTATE_SLEEP;
+						break;
+					}
+					else
+					{
+						if (pg_strcasecmp(argv[0], "set") == 0)
+						{
+							PgBenchExpr *expr = command->expr;
+							PgBenchValue result;
+
+							if (!evaluateExpr(thread, st, expr, &result))
+							{
+								commandFailed(st, "evaluation of meta-command 'set' failed");
+								st->state = CSTATE_ABORTED;
+								break;
+							}
+
+							if (!putVariableNumber(st, argv[0], argv[1], &result))
+							{
+								commandFailed(st, "assignment of meta-command 'set' failed");
+								st->state = CSTATE_ABORTED;
+								break;
+							}
+						}
+						else if (pg_strcasecmp(argv[0], "setshell") == 0)
+						{
+							bool		ret = runShellCommand(st, argv[1], argv + 2, argc - 2);
+
+							if (timer_exceeded) /* timeout */
+							{
+								st->state = CSTATE_FINISHED;
+								break;
+							}
+							else if (!ret)		/* on error */
+							{
+								commandFailed(st, "execution of meta-command 'setshell' failed");
+								st->state = CSTATE_ABORTED;
+								break;
+							}
+							else
+							{
+								/* succeeded */
+							}
+						}
+						else if (pg_strcasecmp(argv[0], "shell") == 0)
+						{
+							bool		ret = runShellCommand(st, NULL, argv + 1, argc - 1);
+
+							if (timer_exceeded) /* timeout */
+							{
+								st->state = CSTATE_FINISHED;
+								break;
+							}
+							else if (!ret)		/* on error */
+							{
+								commandFailed(st, "execution of meta-command 'shell' failed");
+								st->state = CSTATE_ABORTED;
+								break;
+							}
+							else
+							{
+								/* succeeded */
+							}
+						}
+						/*
+						 * executing the expression or shell command might
+						 * take a non-negligible amount of time, so reset
+						 * 'now'
+						 */
+						INSTR_TIME_SET_ZERO(now);
+
+						st->state = CSTATE_END_COMMAND;
+					}
+				}
+				break;
 
-			getQueryParams(st, command, params);
+			/*
+			 * Wait for the current SQL command to complete
+			 */
+			case CSTATE_WAIT_RESULT:
+				command = sql_script[st->use_file].commands[st->command];
+				if (debug)
+					fprintf(stderr, "client %d receiving\n", st->id);
+				if (!PQconsumeInput(st->con))
+				{					/* there's something wrong */
+					commandFailed(st, "perhaps the backend died while processing");
+					st->state = CSTATE_ABORTED;
+					break;
+				}
+				if (PQisBusy(st->con))
+					return;			/* don't have the whole result yet */
+
+				/*
+				 * Read and discard the query result;
+				 */
+				res = PQgetResult(st->con);
+				switch (PQresultStatus(res))
+				{
+					case PGRES_COMMAND_OK:
+					case PGRES_TUPLES_OK:
+					case PGRES_EMPTY_QUERY:
+						/* OK */
+						PQclear(res);
+						discard_response(st);
+						st->state = CSTATE_END_COMMAND;
+						break;
+					default:
+						commandFailed(st, PQerrorMessage(st->con));
+						PQclear(res);
+						st->state = CSTATE_ABORTED;
+						break;
+				}
 
-			if (debug)
-				fprintf(stderr, "client %d sending %s\n", st->id, sql);
-			r = PQsendQueryParams(st->con, sql, command->argc - 1,
-								  NULL, params, NULL, NULL, 0);
-		}
-		else if (querymode == QUERY_PREPARED)
-		{
-			char		name[MAX_PREPARE_NAME];
-			const char *params[MAX_ARGS];
+				break;
 
-			if (!st->prepared[st->use_file])
-			{
-				int			j;
+			/*
+			 * Wait until sleep is done.
+			 * This state is entered after a \sleep metacommand.
+			 * The behavior is similar to CSTATE_THROTTLE, but proceeds
+			 * to CSTATE_END_COMMAND instead of CSTATE_START_TX.
+			 */
+			case CSTATE_SLEEP:
+				if (INSTR_TIME_IS_ZERO(now))
+					INSTR_TIME_SET_CURRENT(now);
+				if (INSTR_TIME_GET_MICROSEC(now) < st->sleep_until)
+					return;			/* Still sleeping, nothing to do here */
+				/* Else done sleeping. */
+				st->state = CSTATE_END_COMMAND;
+				break;
 
-				for (j = 0; commands[j] != NULL; j++)
+			/*
+			 * End of command: do some house keeping and proceed to next.
+			 */
+			case CSTATE_END_COMMAND:
+				/*
+				 * command executed: accumulate per-command execution
+				 * times in thread-local data structure, if per-command
+				 * latencies are requested.
+				 */
+				if (is_latencies)
 				{
-					PGresult   *res;
-					char		name[MAX_PREPARE_NAME];
-
-					if (commands[j]->type != SQL_COMMAND)
-						continue;
-					preparedStatementName(name, st->use_file, j);
-					res = PQprepare(st->con, name,
-						  commands[j]->argv[0], commands[j]->argc - 1, NULL);
-					if (PQresultStatus(res) != PGRES_COMMAND_OK)
-						fprintf(stderr, "%s", PQerrorMessage(st->con));
-					PQclear(res);
+					if (INSTR_TIME_IS_ZERO(now))
+						INSTR_TIME_SET_CURRENT(now);
+
+					/* XXX could use a mutex here, but we choose not to */
+					command = sql_script[st->use_file].commands[st->command];
+					addToSimpleStats(&command->stats,
+									 INSTR_TIME_GET_DOUBLE(now) -
+									 INSTR_TIME_GET_DOUBLE(st->stmt_begin));
 				}
-				st->prepared[st->use_file] = true;
-			}
-
-			getQueryParams(st, command, params);
-			preparedStatementName(name, st->use_file, st->state);
 
-			if (debug)
-				fprintf(stderr, "client %d sending %s\n", st->id, name);
-			r = PQsendQueryPrepared(st->con, name, command->argc - 1,
-									params, NULL, NULL, 0);
-		}
-		else	/* unknown sql mode */
-			r = 0;
-
-		if (r == 0)
-		{
-			if (debug)
-				fprintf(stderr, "client %d could not send %s\n",
-						st->id, command->argv[0]);
-			st->ecnt++;
-		}
-		else
-			st->listen = true;	/* flags that should be listened */
-	}
-	else if (commands[st->state]->type == META_COMMAND)
-	{
-		int			argc = commands[st->state]->argc,
-					i;
-		char	  **argv = commands[st->state]->argv;
-
-		if (debug)
-		{
-			fprintf(stderr, "client %d executing \\%s", st->id, argv[0]);
-			for (i = 1; i < argc; i++)
-				fprintf(stderr, " %s", argv[i]);
-			fprintf(stderr, "\n");
-		}
+				/* Go ahead with next command */
+				st->command++;
+				st->state = CSTATE_START_COMMAND;
+				break;
 
-		if (pg_strcasecmp(argv[0], "set") == 0)
-		{
-			PgBenchExpr *expr = commands[st->state]->expr;
-			PgBenchValue result;
+			/*
+			 * End of transaction.
+			 */
+			case CSTATE_END_TX:
+				/* transaction finished: calculate latency and log the transaction */
+				if (progress || throttle_delay || latency_limit ||
+					per_script_stats || use_log)
+					processXactStats(thread, st, &now, false, agg);
+				else
+					thread->stats.cnt++;
+
+				if (is_connect)
+				{
+					PQfinish(st->con);
+					st->con = NULL;
+					INSTR_TIME_SET_ZERO(now);
+				}
 
-			if (!evaluateExpr(thread, st, expr, &result))
-			{
-				st->ecnt++;
-				return true;
-			}
+				++st->cnt;
+				if ((st->cnt >= nxacts && duration <= 0) || timer_exceeded)
+				{
+					/* exit success */
+					st->state = CSTATE_FINISHED;
+					break;
+				}
 
-			if (!putVariableNumber(st, argv[0], argv[1], &result))
-			{
-				st->ecnt++;
-				return true;
-			}
+				/*
+				 * No transaction is underway anymore.
+				 */
+				st->state = CSTATE_CHOOSE_SCRIPT;
+
+				/*
+				 * If we passed through all commands in the script in this loop,
+				 * without returning to the caller even once, do it now.  This
+				 * gives the thread a chance to process other connections, and
+				 * to do progress reporting.  This can currently only happen
+				 * if the script consists entirely of meta-commands.
+				 */
+				if (end_tx_processed)
+					return;
+				else
+				{
+					end_tx_processed = true;
+					break;
+				}
 
-			st->listen = true;
-		}
-		else if (pg_strcasecmp(argv[0], "sleep") == 0)
-		{
-			char	   *var;
-			int			usec;
-			instr_time	now;
-
-			if (*argv[1] == ':')
-			{
-				if ((var = getVariable(st, argv[1] + 1)) == NULL)
+			/*
+			 * Final states.  Close the connection if it's still open.
+			 */
+			case CSTATE_ABORTED:
+			case CSTATE_FINISHED:
+				if (st->con != NULL)
 				{
-					fprintf(stderr, "%s: undefined variable \"%s\"\n",
-							argv[0], argv[1]);
-					st->ecnt++;
-					return true;
+					PQfinish(st->con);
+					st->con = NULL;
 				}
-				usec = atoi(var);
-			}
-			else
-				usec = atoi(argv[1]);
-
-			if (argc > 2)
-			{
-				if (pg_strcasecmp(argv[2], "ms") == 0)
-					usec *= 1000;
-				else if (pg_strcasecmp(argv[2], "s") == 0)
-					usec *= 1000000;
-			}
-			else
-				usec *= 1000000;
-
-			INSTR_TIME_SET_CURRENT(now);
-			st->sleep_until = INSTR_TIME_GET_MICROSEC(now) + usec;
-			st->sleeping = true;
-
-			st->listen = true;
+				return;
 		}
-		else if (pg_strcasecmp(argv[0], "setshell") == 0)
-		{
-			bool		ret = runShellCommand(st, argv[1], argv + 2, argc - 2);
-
-			if (timer_exceeded) /* timeout */
-				return clientDone(st);
-			else if (!ret)		/* on error */
-			{
-				st->ecnt++;
-				return true;
-			}
-			else	/* succeeded */
-				st->listen = true;
-		}
-		else if (pg_strcasecmp(argv[0], "shell") == 0)
-		{
-			bool		ret = runShellCommand(st, NULL, argv + 1, argc - 1);
-
-			if (timer_exceeded) /* timeout */
-				return clientDone(st);
-			else if (!ret)		/* on error */
-			{
-				st->ecnt++;
-				return true;
-			}
-			else	/* succeeded */
-				st->listen = true;
-		}
-
-		/* after a meta command, immediately proceed with next command */
-		goto top;
 	}
-
-	return true;
 }
 
 /*
@@ -4183,29 +4420,10 @@ threadRun(void *arg)
 	initStats(&aggs, INSTR_TIME_GET_DOUBLE(thread->start_time));
 	last = aggs;
 
-	/* send start up queries in async manner */
+	/* explicitly initialize the state machines */
 	for (i = 0; i < nstate; i++)
 	{
-		CState	   *st = &state[i];
-		int			prev_ecnt = st->ecnt;
-		Command   **commands;
-
-		st->use_file = chooseScript(thread);
-		commands = sql_script[st->use_file].commands;
-		if (debug)
-			fprintf(stderr, "client %d executing script \"%s\"\n", st->id,
-					sql_script[st->use_file].desc);
-		if (!doCustom(thread, st, &aggs))
-			remains--;			/* I've aborted */
-
-		if (st->ecnt > prev_ecnt && commands[st->state]->type == META_COMMAND)
-		{
-			fprintf(stderr, "client %d aborted in state %d; execution of meta-command failed\n",
-					i, st->state);
-			remains--;			/* I've aborted */
-			PQfinish(st->con);
-			st->con = NULL;
-		}
+		state[i].state = CSTATE_CHOOSE_SCRIPT;
 	}
 
 	while (remains > 0)
@@ -4222,59 +4440,60 @@ threadRun(void *arg)
 		for (i = 0; i < nstate; i++)
 		{
 			CState	   *st = &state[i];
-			Command   **commands = sql_script[st->use_file].commands;
 			int			sock;
 
-			if (st->con == NULL)
+			if (st->state == CSTATE_THROTTLE && timer_exceeded)
 			{
+				/* interrupt client which has not started a transaction */
+				st->state = CSTATE_FINISHED;
+				remains--;
+				PQfinish(st->con);
+				st->con = NULL;
 				continue;
 			}
-			else if (st->sleeping)
+			else if (st->state == CSTATE_SLEEP || st->state == CSTATE_THROTTLE)
 			{
-				if (st->throttling && timer_exceeded)
-				{
-					/* interrupt client which has not started a transaction */
-					remains--;
-					st->sleeping = false;
-					st->throttling = false;
-					PQfinish(st->con);
-					st->con = NULL;
-					continue;
-				}
-				else	/* just a nap from the script */
-				{
-					int			this_usec;
-
-					if (min_usec == PG_INT64_MAX)
-					{
-						instr_time	now;
+				/* a nap from the script, or under throttling */
+				int			this_usec;
 
-						INSTR_TIME_SET_CURRENT(now);
-						now_usec = INSTR_TIME_GET_MICROSEC(now);
-					}
+				if (min_usec == PG_INT64_MAX)
+				{
+					instr_time	now;
 
-					this_usec = st->txn_scheduled - now_usec;
-					if (min_usec > this_usec)
-						min_usec = this_usec;
+					INSTR_TIME_SET_CURRENT(now);
+					now_usec = INSTR_TIME_GET_MICROSEC(now);
 				}
+
+				this_usec = (st->state == CSTATE_SLEEP ?
+							 st->sleep_until : st->txn_scheduled) - now_usec;
+				if (min_usec > this_usec)
+					min_usec = this_usec;
 			}
-			else if (commands[st->state]->type == META_COMMAND)
+			else if (st->state == CSTATE_WAIT_RESULT)
 			{
-				min_usec = 0;	/* the connection is ready to run */
+				/*
+				 * waiting for result from server - nothing to do unless the
+				 * socket is readable
+				 */
+				sock = PQsocket(st->con);
+				if (sock < 0)
+				{
+					fprintf(stderr, "invalid socket: %s", PQerrorMessage(st->con));
+					goto done;
+				}
+
+				FD_SET(sock, &input_mask);
+
+				if (maxsock < sock)
+					maxsock = sock;
 				break;
 			}
-
-			sock = PQsocket(st->con);
-			if (sock < 0)
+			else if (st->state != CSTATE_ABORTED && st->state != CSTATE_FINISHED)
 			{
-				fprintf(stderr, "invalid socket: %s", PQerrorMessage(st->con));
-				goto done;
+				/* the connection is ready to run */
+				min_usec = 0;
+				break;
 			}
-
-			FD_SET(sock, &input_mask);
-
-			if (maxsock < sock)
-				maxsock = sock;
 		}
 
 		/* also wake up to print the next progress report on time */
@@ -4324,14 +4543,13 @@ threadRun(void *arg)
 			}
 		}
 
-		/* ok, backend returns reply */
+		/* ok, advance the state machine of each connection */
 		for (i = 0; i < nstate; i++)
 		{
 			CState	   *st = &state[i];
-			Command   **commands = sql_script[st->use_file].commands;
-			int			prev_ecnt = st->ecnt;
+			bool		ready;
 
-			if (st->con)
+			if (st->state == CSTATE_WAIT_RESULT && st->con)
 			{
 				int			sock = PQsocket(st->con);
 
@@ -4341,21 +4559,19 @@ threadRun(void *arg)
 							PQerrorMessage(st->con));
 					goto done;
 				}
-				if (FD_ISSET(sock, &input_mask) ||
-					commands[st->state]->type == META_COMMAND)
-				{
-					if (!doCustom(thread, st, &aggs))
-						remains--;		/* I've aborted */
-				}
+
+				ready = FD_ISSET(sock, &input_mask);
 			}
+			else if (st->state == CSTATE_FINISHED || st->state == CSTATE_ABORTED)
+				ready = false;
+			else
+				ready = true;
 
-			if (st->ecnt > prev_ecnt && commands[st->state]->type == META_COMMAND)
+			if (ready)
 			{
-				fprintf(stderr, "client %d aborted in state %d; execution of meta-command failed\n",
-						i, st->state);
-				remains--;		/* I've aborted */
-				PQfinish(st->con);
-				st->con = NULL;
+				doCustom(thread, st, &aggs);
+				if (st->state == CSTATE_FINISHED || st->state == CSTATE_ABORTED)
+					remains--;
 			}
 		}
 
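
As a reading aid for the threadRun changes above, here is a minimal standalone sketch of the same idea. It is not part of the patch: the Demo* names, the trimmed-down state list and the "socket_readable" flag are all made up for illustration, and the real doCustom() deals with throttling, sleeps, meta-commands and statistics that are left out here. It only shows how the caller decides whether a client is "ready", and how the machine runs until it has to wait for the server or is finished.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical, trimmed-down version of the client states in the patch. */
typedef enum
{
	DEMO_CHOOSE_SCRIPT,
	DEMO_START_COMMAND,
	DEMO_WAIT_RESULT,
	DEMO_END_TX,
	DEMO_FINISHED
} DemoState;

typedef struct
{
	DemoState	state;
	int			command;	/* index of the current command in the script */
	int			ncommands;	/* number of commands in the (single) script */
	int			cnt;		/* transactions completed so far */
} DemoClient;

/*
 * Should the caller advance this client now?  A client waiting for a
 * result is ready only if its socket is readable, a finished client is
 * never ready, and every other state can make progress immediately.
 */
static bool
demo_is_ready(const DemoClient *st, bool socket_readable)
{
	assert(st != NULL);

	if (st->state == DEMO_WAIT_RESULT)
		return socket_readable;
	if (st->state == DEMO_FINISHED)
		return false;
	return true;
}

/*
 * Advance the machine until it must wait for the server or is done.
 * When called in DEMO_WAIT_RESULT, the result is assumed to have arrived.
 */
static void
demo_advance(DemoClient *st, int nxacts)
{
	assert(st != NULL);

	for (;;)
	{
		switch (st->state)
		{
			case DEMO_CHOOSE_SCRIPT:
				st->command = 0;
				st->state = DEMO_START_COMMAND;
				break;

			case DEMO_START_COMMAND:
				if (st->command >= st->ncommands)
				{
					st->state = DEMO_END_TX;
					break;
				}
				/* pretend the query was sent; wait for its result */
				st->state = DEMO_WAIT_RESULT;
				return;

			case DEMO_WAIT_RESULT:
				/* result consumed, go ahead with the next command */
				st->command++;
				st->state = DEMO_START_COMMAND;
				break;

			case DEMO_END_TX:
				st->cnt++;
				st->state = (st->cnt >= nxacts) ?
					DEMO_FINISHED : DEMO_CHOOSE_SCRIPT;
				break;

			case DEMO_FINISHED:
				return;
		}
	}
}
```

A driver would call demo_is_ready() for each client after select() returns, call demo_advance() on the ready ones, and decrement its count of remaining clients once a client's state becomes DEMO_FINISHED, which mirrors the shape of the last hunk.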
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
