Hello Heikki,
> Yeah, it really is quite a mess. I tried to review your patch, and I think
> it's correct, but I couldn't totally convince myself, because of the existing
> messiness of the logic. So I bit the bullet and started refactoring.
> I came up with the attached. It refactors the logic in doCustom() into a
> state machine. I think this is much clearer, what do you think?
The patch did not apply to master because you committed the sleep fix
in between. I updated the patch so that the fix is included as well.
I think that this is really needed. The code is much clearer and simpler to
understand with the state machine and the additional functions. This is a
definite improvement to the code base.
I've done quite a lot of testing with various options (-r, --rate,
--latency-limit, -C...) and got reasonable results.
Although I cannot be absolutely sure that the refactoring does not
introduce any new bugs, I'm convinced that they will be much easier to
find :-)
Attached are some small changes to your version:
I have added the sleep_until fix.
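Judging from the threadRun hunk below, the fix is about which timestamp a sleeping client is woken on: sleep_until for a script \sleep, txn_scheduled for throttling (the removed line used txn_scheduled for every nap). A standalone sketch of that selection, using a hypothetical simplified client struct rather than pgbench's CState:

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical, simplified stand-ins for the patch's sleeping states. */
typedef enum { SK_SLEEP, SK_THROTTLE, SK_OTHER } SketchState;

typedef struct
{
	SketchState state;
	int64_t		sleep_until;	/* end of a script \sleep (usec) */
	int64_t		txn_scheduled;	/* scheduled start of next transaction (usec) */
} SketchClient;

/*
 * How long (usec) the thread may wait before some sleeping client needs
 * attention; INT64_MAX if no client is sleeping.  A script \sleep is timed
 * from sleep_until, throttling from txn_scheduled.
 */
static int64_t
min_wait_usec(const SketchClient *clients, int n, int64_t now_usec)
{
	int64_t		min_usec = INT64_MAX;

	assert(n >= 0);
	for (int i = 0; i < n; i++)
	{
		int64_t		wake;

		if (clients[i].state == SK_SLEEP)
			wake = clients[i].sleep_until;
		else if (clients[i].state == SK_THROTTLE)
			wake = clients[i].txn_scheduled;
		else
			continue;			/* not sleeping: handled elsewhere */

		if (wake - now_usec < min_usec)
			min_usec = wake - now_usec;
	}
	return min_usec;
}
```

With one client in a \sleep ending at 1500 and another throttled until 3000, at now = 1000 this returns 500. Reading the stale txn_scheduled (say 9000) of the sleeping client instead would let the thread wait 2000 usec and overshoot the \sleep.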
I have fixed a bug introduced in the patch by changing && to || in the
(min_sec > 0 && maxsock != -1) condition, which was causing errors with
multiple threads and clients.
I have factored out several error messages into "commandFailed", in place of
"metaCommandFailed", and added the script number to the error
messages. All messages are now specific to the failed command.
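For illustration, the message built by the patch's commandFailed() can be reproduced into a buffer. This is a sketch only: format_command_failed() and the three-field struct are hypothetical, and only the format string is copied from the patch.

```c
#include <assert.h>
#include <stdio.h>

/* Hypothetical minimal client record; pgbench's CState has many more fields. */
typedef struct
{
	int			id;				/* client number */
	int			command;		/* index of the current command in the script */
	int			use_file;		/* index of the script being executed */
} SketchClient;

/*
 * Same format string as commandFailed() in the patch, written into a
 * caller-supplied buffer instead of stderr.  Returns snprintf()'s result.
 */
static int
format_command_failed(char *buf, size_t len, const SketchClient *st,
					  const char *message)
{
	assert(message != NULL);
	return snprintf(buf, len,
					"client %d aborted in command %d of script %d; %s\n",
					st->id, st->command, st->use_file, message);
}
```

Note that the command index is printed as-is (0-based), whereas the debug() output in evalFunc prints st->command + 1.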
I have added two states to the machine:
- CSTATE_CHOOSE_SCRIPT, which simplifies threadRun: there is now one call
to chooseScript instead of two.
- CSTATE_END_COMMAND, which manages is_latencies and proceeding to the
next command, thus merging the three places where the stats were updated
in the first version.
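The resulting control flow can be sketched as a toy machine. State names are hypothetical, and there are no server round-trips, sleeps or throttling. It only shows CSTATE_CHOOSE_SCRIPT as the single place where a script is picked and CSTATE_END_COMMAND as the single place for per-command bookkeeping.

```c
#include <assert.h>

/* Hypothetical, stripped-down version of the patch's client states. */
typedef enum
{
	ST_CHOOSE_SCRIPT,
	ST_START_TX,
	ST_START_COMMAND,
	ST_END_COMMAND,
	ST_END_TX,
	ST_FINISHED
} SketchState;

typedef struct
{
	SketchState state;
	int			command;		/* index of current command */
	int			ncommands;		/* commands per script (hypothetical) */
	int			txns_done;
	int			txns_wanted;
	int			commands_run;	/* updated only in ST_END_COMMAND */
} SketchClient;

/*
 * 'break' inside the switch loops back to the top of for(;;), i.e.
 * "continue in the state machine"; 'return' hands control back.
 */
static void
run_client(SketchClient *st)
{
	assert(st->txns_wanted > 0);
	for (;;)
	{
		switch (st->state)
		{
			case ST_CHOOSE_SCRIPT:
				/* the only place a script would be chosen */
				st->state = ST_START_TX;
				break;
			case ST_START_TX:
				st->command = 0;
				st->state = ST_START_COMMAND;
				break;
			case ST_START_COMMAND:
				if (st->command >= st->ncommands)
				{
					st->state = ST_END_TX;	/* end of script */
					break;
				}
				/* a real client would send or execute the command here */
				st->state = ST_END_COMMAND;
				break;
			case ST_END_COMMAND:
				/* single place for per-command bookkeeping */
				st->commands_run++;
				st->command++;
				st->state = ST_START_COMMAND;
				break;
			case ST_END_TX:
				st->txns_done++;
				st->state = (st->txns_done >= st->txns_wanted) ?
					ST_FINISHED : ST_CHOOSE_SCRIPT;
				break;
			case ST_FINISHED:
				return;
		}
	}
}
```

A client with ncommands = 3 and txns_wanted = 2 ends in ST_FINISHED with commands_run == 6. The real doCustom() also returns to threadRun whenever it must wait for the server or a timer, and after one full transaction without waiting (the end_tx_processed check).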
The latter state means that processing query results is included in the
per-statement latency. This is an improvement: before, I was getting
some transaction latencies significantly larger than the apparent sum of the
per-statement latencies, which did not make much sense.
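The arithmetic behind this can be checked with hypothetical timestamps for two back-to-back statements. If each statement's latency stops when the reply is ready but before it is read, the result-reading time falls between measurements and the per-statement sum undershoots the transaction latency. Ending each measurement after the result is consumed, as CSTATE_END_COMMAND does, makes them add up. This assumes each statement starts exactly where the previous one ended.

```c
#include <assert.h>

/*
 * Hypothetical per-statement timestamps (usec):
 *   begin - statement sent
 *   ready - reply fully received (PQisBusy() would return false)
 *   done  - reply read and discarded; the next statement starts here
 */
typedef struct
{
	long		begin;
	long		ready;
	long		done;
} StmtTimes;

/* Sum of per-statement latencies when each one ends at 'ready'. */
static long
sum_until_ready(const StmtTimes *s, int n)
{
	long		sum = 0;

	for (int i = 0; i < n; i++)
		sum += s[i].ready - s[i].begin;
	return sum;
}

/* Sum of per-statement latencies when each one ends at 'done'. */
static long
sum_until_done(const StmtTimes *s, int n)
{
	long		sum = 0;

	for (int i = 0; i < n; i++)
		sum += s[i].done - s[i].begin;
	return sum;
}

/* Transaction latency: first send to last result consumed. */
static long
txn_latency(const StmtTimes *s, int n)
{
	assert(n > 0);
	return s[n - 1].done - s[0].begin;
}
```

With {0, 100, 130} and {130, 200, 260}, the transaction latency is 260 usec. Statements measured up to 'ready' add up to only 170 usec, while measuring up to 'done' gives 260 usec.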
I have added and updated a few comments. There are some places where the
break could be a fall-through instead; I'm not sure how desirable that is,
and I'm fine with break.
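The break vs. fall-through choice, and the XXX about 'continue' in doCustom(), can be illustrated with a toy two-step machine that is not pgbench code. Inside a switch that is the last statement of a for(;;) body, 'break' and 'continue' behave the same: both re-dispatch on the state. A fall-through skips that re-dispatch.

```c
#include <assert.h>

typedef enum { S_A, S_B, S_DONE } ToyState;

/* Every transition goes back through the loop; returns the dispatch count. */
static int
run_with_break(void)
{
	ToyState	st = S_A;
	int			dispatches = 0;

	for (;;)
	{
		dispatches++;
		assert(dispatches <= 3);	/* sanity guard for this toy machine */
		switch (st)
		{
			case S_A:
				st = S_B;
				break;			/* leaves the switch; the loop re-dispatches */
			case S_B:
				st = S_DONE;
				continue;		/* same effect: switch is last in the loop */
			case S_DONE:
				return dispatches;
		}
	}
}

/* Same machine, but S_A falls through into S_B: one dispatch fewer. */
static int
run_with_fallthrough(void)
{
	ToyState	st = S_A;
	int			dispatches = 0;

	for (;;)
	{
		dispatches++;
		assert(dispatches <= 3);	/* sanity guard for this toy machine */
		switch (st)
		{
			case S_A:
				st = S_B;
				/* FALLTHROUGH */
			case S_B:
				st = S_DONE;
				break;
			case S_DONE:
				return dispatches;
		}
	}
}
```

run_with_break() dispatches 3 times and run_with_fallthrough() twice. The saving is one switch evaluation per transition. In exchange, a fall-through silently depends on case ordering, so it breaks if the cases are ever reordered.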
> Well, the comment right there says "note this is not included in the
> statement latency numbers", so apparently it's intentional. Whether it's a
> good idea or not, I don't know :-). It does seem a bit surprising.
Indeed, it also results in apparently inconsistent numbers, and it creates
a mess for recording the statement latency, because it meant that in some
cases the latency was collected before the actual end of the command; see
the discussion about CSTATE_END_COMMAND above.
> But what seems more bogus to me is that we do that after recording the
> *transaction* stats, if this was the last command. So the PQgetResult() of
> the last command in the transaction is not included in the transaction stats,
> even though the PQgetResult() calls for any previous commands are. (Perhaps
> that's what you meant too?)
> I changed that in my patch, it would've been inconvenient to keep that old
> behavior, and it doesn't make any sense to me anyway.
Fine with me.
--
Fabien.
diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c
index 8b24ad5..502e644 100644
--- a/src/bin/pgbench/pgbench.c
+++ b/src/bin/pgbench/pgbench.c
@@ -235,25 +235,97 @@ typedef struct StatsData
} StatsData;
/*
- * Connection state
+ * Connection state machine states.
+ */
+typedef enum
+{
+ /*
+ * The client must first choose a script to execute. Once chosen, it can
+ * either be throttled (state CSTATE_START_THROTTLE under --rate) or start
+ * right away (state CSTATE_START_TX).
+ */
+ CSTATE_CHOOSE_SCRIPT,
+
+ /*
+ * In CSTATE_START_THROTTLE state, we calculate when to begin the next
+ * transaction, and advance to CSTATE_THROTTLE. CSTATE_THROTTLE state
+ * sleeps until that moment. (If throttling is not enabled,
+ * CSTATE_CHOOSE_SCRIPT goes directly to CSTATE_START_TX instead.)
+ */
+ CSTATE_START_THROTTLE,
+ CSTATE_THROTTLE,
+
+ /*
+ * CSTATE_START_TX performs start-of-transaction processing. Establishes
+ * a new connection for the transaction, in --connect mode, and records
+ * the transaction start time.
+ */
+ CSTATE_START_TX,
+
+ /*
+ * We loop through these states, to process each command in the
+ * script:
+ *
+ * CSTATE_START_COMMAND starts the execution of a command. On a SQL
+ * command, the command is sent to the server, and we move to
+ * CSTATE_WAIT_RESULT state. On a \sleep meta-command, the timer is
+ * set, and we enter the CSTATE_SLEEP state to wait for it to expire.
+ * Other meta-commands are executed immediately, and we proceed to next
+ * command.
+ *
+ * CSTATE_WAIT_RESULT waits until we get a result set back from the server
+ * for the current command, and proceeds to CSTATE_END_COMMAND.
+ *
+ * CSTATE_SLEEP waits until the end of \sleep and proceeds to CSTATE_END_COMMAND.
+ *
+ * CSTATE_END_COMMAND does some statistics housekeeping, then jumps back to
+ * start the next command with CSTATE_START_COMMAND.
+ */
+ CSTATE_START_COMMAND,
+ CSTATE_WAIT_RESULT,
+ CSTATE_SLEEP,
+ CSTATE_END_COMMAND,
+
+ /*
+ * CSTATE_END_TX performs end-of-transaction processing. Calculates
+ * latency, and logs the transaction. In --connect mode, closes the
+ * current connection. Then starts over in CSTATE_CHOOSE_SCRIPT state
+ * to choose the next script, or enters CSTATE_FINISHED if we
+ * have no more work to do.
+ */
+ CSTATE_END_TX,
+
+ /*
+ * Final states. CSTATE_ABORTED means that the script execution was
+ * aborted because a command failed, CSTATE_FINISHED means success.
+ */
+ CSTATE_ABORTED,
+ CSTATE_FINISHED,
+} ConnectionStateEnum;
+
+/*
+ * Connection state.
*/
typedef struct
{
PGconn *con; /* connection handle to DB */
int id; /* client No. */
- int state; /* state No. */
- bool listen; /* whether an async query has been sent */
- bool sleeping; /* whether the client is napping */
- bool throttling; /* whether nap is for throttling */
- bool is_throttled; /* whether transaction throttling is done */
+ ConnectionStateEnum state; /* state machine's current state. */
+
+ int use_file; /* index in sql_script for this client */
+ int command; /* command number in script */
+
+ /* client variables */
Variable *variables; /* array of variable definitions */
int nvariables; /* number of variables */
bool vars_sorted; /* are variables sorted by name? */
+
+ /* various times about current transaction */
int64 txn_scheduled; /* scheduled start time of transaction (usec) */
int64 sleep_until; /* scheduled start time of next cmd (usec) */
instr_time txn_begin; /* used for measuring schedule lag times */
instr_time stmt_begin; /* used for measuring statement latencies */
- int use_file; /* index in sql_scripts for this client */
+
bool prepared[MAX_SCRIPTS]; /* whether client prepared the script */
/* per client collected stats */
@@ -1382,7 +1454,7 @@ evalFunc(TState *thread, CState *st,
Assert(nargs == 1);
fprintf(stderr, "debug(script=%d,command=%d): ",
- st->use_file, st->state + 1);
+ st->use_file, st->command + 1);
if (varg->type == PGBT_INT)
fprintf(stderr, "int " INT64_FORMAT "\n", varg->u.ival);
@@ -1733,15 +1805,12 @@ preparedStatementName(char *buffer, int file, int state)
sprintf(buffer, "P%d_%d", file, state);
}
-static bool
-clientDone(CState *st)
+static void
+commandFailed(CState *st, char *message)
{
- if (st->con != NULL)
- {
- PQfinish(st->con);
- st->con = NULL;
- }
- return false; /* always false */
+ fprintf(stderr,
+ "client %d aborted in command %d of script %d; %s\n",
+ st->id, st->command, st->use_file, message);
}
/* return a script number with a weighted choice. */
@@ -1763,425 +1832,593 @@ chooseScript(TState *thread)
return i - 1;
}
-/* return false iff client should be disconnected */
+/* Send a SQL command, using the chosen querymode */
static bool
+sendCommand(CState *st, Command *command)
+{
+ int r;
+
+ if (querymode == QUERY_SIMPLE)
+ {
+ char *sql;
+
+ sql = pg_strdup(command->argv[0]);
+ sql = assignVariables(st, sql);
+
+ if (debug)
+ fprintf(stderr, "client %d sending %s\n", st->id, sql);
+ r = PQsendQuery(st->con, sql);
+ free(sql);
+ }
+ else if (querymode == QUERY_EXTENDED)
+ {
+ const char *sql = command->argv[0];
+ const char *params[MAX_ARGS];
+
+ getQueryParams(st, command, params);
+
+ if (debug)
+ fprintf(stderr, "client %d sending %s\n", st->id, sql);
+ r = PQsendQueryParams(st->con, sql, command->argc - 1,
+ NULL, params, NULL, NULL, 0);
+ }
+ else if (querymode == QUERY_PREPARED)
+ {
+ char name[MAX_PREPARE_NAME];
+ const char *params[MAX_ARGS];
+
+ if (!st->prepared[st->use_file])
+ {
+ int j;
+ Command **commands = sql_script[st->use_file].commands;
+
+ for (j = 0; commands[j] != NULL; j++)
+ {
+ PGresult *res;
+ char name[MAX_PREPARE_NAME];
+
+ if (commands[j]->type != SQL_COMMAND)
+ continue;
+ preparedStatementName(name, st->use_file, j);
+ res = PQprepare(st->con, name,
+ commands[j]->argv[0], commands[j]->argc - 1, NULL);
+ if (PQresultStatus(res) != PGRES_COMMAND_OK)
+ fprintf(stderr, "%s", PQerrorMessage(st->con));
+ PQclear(res);
+ }
+ st->prepared[st->use_file] = true;
+ }
+
+ getQueryParams(st, command, params);
+ preparedStatementName(name, st->use_file, st->command);
+
+ if (debug)
+ fprintf(stderr, "client %d sending %s\n", st->id, name);
+ r = PQsendQueryPrepared(st->con, name, command->argc - 1,
+ params, NULL, NULL, 0);
+ }
+ else /* unknown sql mode */
+ r = 0;
+
+ if (r == 0)
+ {
+ if (debug)
+ fprintf(stderr, "client %d could not send %s\n",
+ st->id, command->argv[0]);
+ st->ecnt++;
+ return false;
+ }
+ else
+ return true;
+}
+
+/*
+ * "execute" a sleep command. Parses the argument, and returns the
+ * requested amount of delay, in microseconds. Returns true on
+ * success, false on error.
+ */
+static bool
+evaluateSleep(CState *st, int argc, char **argv, int *usecs)
+{
+ char *var;
+ int usec;
+
+ if (*argv[1] == ':')
+ {
+ if ((var = getVariable(st, argv[1] + 1)) == NULL)
+ {
+ fprintf(stderr, "%s: undefined variable \"%s\"\n",
+ argv[0], argv[1]);
+ return false;
+ }
+ usec = atoi(var);
+ }
+ else
+ usec = atoi(argv[1]);
+
+ if (argc > 2)
+ {
+ if (pg_strcasecmp(argv[2], "ms") == 0)
+ usec *= 1000;
+ else if (pg_strcasecmp(argv[2], "s") == 0)
+ usec *= 1000000;
+ }
+ else
+ usec *= 1000000;
+
+ *usecs = usec;
+ return true;
+}
+
+/*
+ * Advance the state machine of a connection, if possible.
+ */
+static void
doCustom(TState *thread, CState *st, StatsData *agg)
{
PGresult *res;
- Command **commands;
- bool trans_needs_throttle = false;
+ Command *command;
instr_time now;
+ bool end_tx_processed = false;
+ int64 wait;
/*
* gettimeofday() isn't free, so we get the current timestamp lazily the
* first time it's needed, and reuse the same value throughout this
- * function after that. This also ensures that e.g. the calculated latency
+ * function after that. This also ensures that e.g. the calculated latency
* reported in the log file and in the totals are the same. Zero means
- * "not set yet". Reset "now" when we step to the next command with "goto
- * top", though.
+ * "not set yet". Reset "now" when we execute shell commands or expressions,
+ * which might take a non-negligible amount of time, though.
*/
-top:
INSTR_TIME_SET_ZERO(now);
- commands = sql_script[st->use_file].commands;
-
/*
- * Handle throttling once per transaction by sleeping. It is simpler to
- * do this here rather than at the end, because so much complicated logic
- * happens below when statements finish.
+ * Loop in the state machine, until we have to wait for a result from the
+ * server (or have to sleep, for throttling or for \sleep).
+ *
+ * Note: In the switch-statement below, 'break' will loop back here,
+ * meaning "continue in the state machine". Return is used to return to
+ * the caller. (XXX: Would it be better to replace the 'break's with
+ * 'continue's?)
*/
- if (throttle_delay && !st->is_throttled)
+ for (;;)
{
- /*
- * Generate a delay such that the series of delays will approximate a
- * Poisson distribution centered on the throttle_delay time.
- *
- * If transactions are too slow or a given wait is shorter than a
- * transaction, the next transaction will start right away.
- */
- int64 wait = getPoissonRand(thread, throttle_delay);
-
- thread->throttle_trigger += wait;
- st->txn_scheduled = thread->throttle_trigger;
-
- /* stop client if next transaction is beyond pgbench end of execution */
- if (duration > 0 && st->txn_scheduled > end_time)
- return clientDone(st);
-
- /*
- * If this --latency-limit is used, and this slot is already late so
- * that the transaction will miss the latency limit even if it
- * completed immediately, we skip this time slot and iterate till the
- * next slot that isn't late yet.
- */
- if (latency_limit)
+ switch(st->state)
{
- int64 now_us;
-
- if (INSTR_TIME_IS_ZERO(now))
- INSTR_TIME_SET_CURRENT(now);
- now_us = INSTR_TIME_GET_MICROSEC(now);
- while (thread->throttle_trigger < now_us - latency_limit)
- {
- processXactStats(thread, st, &now, true, agg);
- /* next rendez-vous */
+ /*
+ * Select transaction to run.
+ */
+ case CSTATE_CHOOSE_SCRIPT:
+
+ st->use_file = chooseScript(thread);
+
+ if (debug)
+ fprintf(stderr, "client %d executing script \"%s\"\n", st->id,
+ sql_script[st->use_file].desc);
+
+ st->state = (throttle_delay > 0) ? CSTATE_START_THROTTLE : CSTATE_START_TX;
+
+ break;
+
+ /*
+ * Handle throttling once per transaction by sleeping.
+ */
+ case CSTATE_START_THROTTLE:
+ /*
+ * Generate a delay such that the series of delays will
+ * approximate a Poisson distribution centered on the
+ * throttle_delay time.
+ *
+ * If transactions are too slow or a given wait is shorter
+ * than a transaction, the next transaction will start
+ * right away.
+ */
+ /* Assert(throttle_delay > 0); */
wait = getPoissonRand(thread, throttle_delay);
+
thread->throttle_trigger += wait;
st->txn_scheduled = thread->throttle_trigger;
- }
- }
-
- st->sleep_until = st->txn_scheduled;
- st->sleeping = true;
- st->throttling = true;
- st->is_throttled = true;
- if (debug)
- fprintf(stderr, "client %d throttling " INT64_FORMAT " us\n",
- st->id, wait);
- }
- if (st->sleeping)
- { /* are we sleeping? */
- if (INSTR_TIME_IS_ZERO(now))
- INSTR_TIME_SET_CURRENT(now);
- if (INSTR_TIME_GET_MICROSEC(now) < st->sleep_until)
- return true; /* Still sleeping, nothing to do here */
- /* Else done sleeping, go ahead with next command */
- st->sleeping = false;
- st->throttling = false;
- }
+ /*
+ * stop client if next transaction is beyond pgbench end
+ * of execution
+ */
+ if (duration > 0 && st->txn_scheduled > end_time)
+ {
+ st->state = CSTATE_FINISHED;
+ break;
+ }
- if (st->listen)
- { /* are we receiver? */
- if (commands[st->state]->type == SQL_COMMAND)
- {
- if (debug)
- fprintf(stderr, "client %d receiving\n", st->id);
- if (!PQconsumeInput(st->con))
- { /* there's something wrong */
- fprintf(stderr, "client %d aborted in state %d; perhaps the backend died while processing\n", st->id, st->state);
- return clientDone(st);
- }
- if (PQisBusy(st->con))
- return true; /* don't have the whole result yet */
- }
+ /*
+ * If this --latency-limit is used, and this slot is
+ * already late so that the transaction will miss the
+ * latency limit even if it completed immediately, we skip
+ * this time slot and iterate till the next slot that
+ * isn't late yet.
+ */
+ if (latency_limit)
+ {
+ int64 now_us;
+
+ if (INSTR_TIME_IS_ZERO(now))
+ INSTR_TIME_SET_CURRENT(now);
+ now_us = INSTR_TIME_GET_MICROSEC(now);
+ while (thread->throttle_trigger < now_us - latency_limit)
+ {
+ processXactStats(thread, st, &now, true, agg);
+ /* next rendez-vous */
+ wait = getPoissonRand(thread, throttle_delay);
+ thread->throttle_trigger += wait;
+ st->txn_scheduled = thread->throttle_trigger;
+ }
+ }
- /*
- * command finished: accumulate per-command execution times in
- * thread-local data structure, if per-command latencies are requested
- */
- if (is_latencies)
- {
- if (INSTR_TIME_IS_ZERO(now))
- INSTR_TIME_SET_CURRENT(now);
-
- /* XXX could use a mutex here, but we choose not to */
- addToSimpleStats(&commands[st->state]->stats,
- INSTR_TIME_GET_DOUBLE(now) -
- INSTR_TIME_GET_DOUBLE(st->stmt_begin));
- }
+ st->state = CSTATE_THROTTLE;
+ if (debug)
+ fprintf(stderr, "client %d throttling " INT64_FORMAT " us\n",
+ st->id, wait);
- /* transaction finished: calculate latency and log the transaction */
- if (commands[st->state + 1] == NULL)
- {
- if (progress || throttle_delay || latency_limit ||
- per_script_stats || use_log)
- processXactStats(thread, st, &now, false, agg);
- else
- thread->stats.cnt++;
- }
+ /* could pass through? could return? */
+ break;
- if (commands[st->state]->type == SQL_COMMAND)
- {
/*
- * Read and discard the query result; note this is not included in
- * the statement latency numbers.
+ * Wait until it's time to start next transaction.
*/
- res = PQgetResult(st->con);
- switch (PQresultStatus(res))
- {
- case PGRES_COMMAND_OK:
- case PGRES_TUPLES_OK:
- case PGRES_EMPTY_QUERY:
- break; /* OK */
- default:
- fprintf(stderr, "client %d aborted in state %d: %s",
- st->id, st->state, PQerrorMessage(st->con));
- PQclear(res);
- return clientDone(st);
- }
- PQclear(res);
- discard_response(st);
- }
+ case CSTATE_THROTTLE:
+ if (INSTR_TIME_IS_ZERO(now))
+ INSTR_TIME_SET_CURRENT(now);
+ if (INSTR_TIME_GET_MICROSEC(now) < st->txn_scheduled)
+ return; /* Still sleeping, nothing to do here */
+
+ /* Else done sleeping, start the transaction */
+ st->state = CSTATE_START_TX;
+
+ /* could pass through */
+ break;
+
+ /* Start new transaction */
+ case CSTATE_START_TX:
+ /*
+ * Establish connection on first call, or if is_connect is
+ * true.
+ */
+ if (st->con == NULL)
+ {
+ instr_time start;
+
+ if (INSTR_TIME_IS_ZERO(now))
+ INSTR_TIME_SET_CURRENT(now);
+ start = now;
+ if ((st->con = doConnect()) == NULL)
+ {
+ fprintf(stderr, "client %d aborted while establishing connection\n",
+ st->id);
+ st->state = CSTATE_ABORTED;
+ break;
+ }
+ INSTR_TIME_SET_CURRENT(now);
+ INSTR_TIME_ACCUM_DIFF(thread->conn_time, now, start);
+
+ /* Reset session-local state */
+ memset(st->prepared, 0, sizeof(st->prepared));
+ }
- if (commands[st->state + 1] == NULL)
- {
- if (is_connect)
- {
- PQfinish(st->con);
- st->con = NULL;
- }
-
- ++st->cnt;
- if ((st->cnt >= nxacts && duration <= 0) || timer_exceeded)
- return clientDone(st); /* exit success */
- }
+ /*
+ * Record transaction start time under logging, progress or
+ * throttling.
+ */
+ if (use_log || progress || throttle_delay || latency_limit ||
+ per_script_stats)
+ {
+ if (INSTR_TIME_IS_ZERO(now))
+ INSTR_TIME_SET_CURRENT(now);
+ st->txn_begin = now;
+
+ /*
+ * When not throttling, this is also the transaction's
+ * scheduled start time.
+ */
+ if (!throttle_delay)
+ st->txn_scheduled = INSTR_TIME_GET_MICROSEC(now);
+ }
+
+ /* Begin with the first command */
+ st->command = 0;
+ st->state = CSTATE_START_COMMAND;
- /* increment state counter */
- st->state++;
- if (commands[st->state] == NULL)
- {
- st->state = 0;
- st->use_file = chooseScript(thread);
- commands = sql_script[st->use_file].commands;
- if (debug)
- fprintf(stderr, "client %d executing script \"%s\"\n", st->id,
- sql_script[st->use_file].desc);
- st->is_throttled = false;
+ /* could pass through */
+ break;
/*
- * No transaction is underway anymore, which means there is
- * nothing to listen to right now. When throttling rate limits
- * are active, a sleep will happen next, as the next transaction
- * starts. And then in any case the next SQL command will set
- * listen back to true.
+ * Send a command to server (or execute a meta-command)
*/
- st->listen = false;
- trans_needs_throttle = (throttle_delay > 0);
- }
- }
-
- if (st->con == NULL)
- {
- instr_time start,
- end;
-
- INSTR_TIME_SET_CURRENT(start);
- if ((st->con = doConnect()) == NULL)
- {
- fprintf(stderr, "client %d aborted while establishing connection\n",
- st->id);
- return clientDone(st);
- }
- INSTR_TIME_SET_CURRENT(end);
- INSTR_TIME_ACCUM_DIFF(thread->conn_time, end, start);
-
- /* Reset session-local state */
- st->listen = false;
- st->sleeping = false;
- st->throttling = false;
- st->is_throttled = false;
- memset(st->prepared, 0, sizeof(st->prepared));
- }
-
- /*
- * This ensures that a throttling delay is inserted before proceeding with
- * sql commands, after the first transaction. The first transaction
- * throttling is performed when first entering doCustom.
- */
- if (trans_needs_throttle)
- {
- trans_needs_throttle = false;
- goto top;
- }
-
- /* Record transaction start time under logging, progress or throttling */
- if ((use_log || progress || throttle_delay || latency_limit ||
- per_script_stats) && st->state == 0)
- {
- INSTR_TIME_SET_CURRENT(st->txn_begin);
-
- /*
- * When not throttling, this is also the transaction's scheduled start
- * time.
- */
- if (!throttle_delay)
- st->txn_scheduled = INSTR_TIME_GET_MICROSEC(st->txn_begin);
- }
-
- /* Record statement start time if per-command latencies are requested */
- if (is_latencies)
- INSTR_TIME_SET_CURRENT(st->stmt_begin);
-
- if (commands[st->state]->type == SQL_COMMAND)
- {
- const Command *command = commands[st->state];
- int r;
-
- if (querymode == QUERY_SIMPLE)
- {
- char *sql;
+ case CSTATE_START_COMMAND:
+ command = sql_script[st->use_file].commands[st->command];
+
+ /*
+ * If we reached the end of the script, move to end-of-xact
+ * processing.
+ */
+ if (command == NULL)
+ {
+ st->state = CSTATE_END_TX;
+ break;
+ }
- sql = pg_strdup(command->argv[0]);
- sql = assignVariables(st, sql);
+ /*
+ * Record statement start time if per-command latencies
+ * are requested
+ */
+ if (is_latencies)
+ {
+ if (INSTR_TIME_IS_ZERO(now))
+ INSTR_TIME_SET_CURRENT(now);
+ st->stmt_begin = now;
+ }
- if (debug)
- fprintf(stderr, "client %d sending %s\n", st->id, sql);
- r = PQsendQuery(st->con, sql);
- free(sql);
- }
- else if (querymode == QUERY_EXTENDED)
- {
- const char *sql = command->argv[0];
- const char *params[MAX_ARGS];
+ if (command->type == SQL_COMMAND)
+ {
+ if (!sendCommand(st, command))
+ {
+ /*
+ * Failed. Stay in CSTATE_START_COMMAND state, to retry.
+ * ??? What's the point of retrying? Should we rather abort?
+ */
+ return;
+ }
+ else
+ st->state = CSTATE_WAIT_RESULT;
+ }
+ else if (command->type == META_COMMAND)
+ {
+ int argc = command->argc,
+ i;
+ char **argv = command->argv;
+
+ if (debug)
+ {
+ fprintf(stderr, "client %d executing \\%s", st->id, argv[0]);
+ for (i = 1; i < argc; i++)
+ fprintf(stderr, " %s", argv[i]);
+ fprintf(stderr, "\n");
+ }
+
+ if (pg_strcasecmp(argv[0], "sleep") == 0)
+ {
+ /*
+ * A \sleep doesn't execute anything, we just get the
+ * delay from the argument, and enter the CSTATE_SLEEP
+ * state. (The per-command latency will be recorded
+ * in CSTATE_SLEEP state, not here, after the delay has
+ * elapsed.)
+ */
+ int usec;
+
+ if (!evaluateSleep(st, argc, argv, &usec))
+ {
+ commandFailed(st, "execution of meta-command 'sleep' failed");
+ st->state = CSTATE_ABORTED;
+ break;
+ }
+
+ if (INSTR_TIME_IS_ZERO(now))
+ INSTR_TIME_SET_CURRENT(now);
+ st->sleep_until = INSTR_TIME_GET_MICROSEC(now) + usec;
+ st->state = CSTATE_SLEEP;
+ break;
+ }
+ else
+ {
+ if (pg_strcasecmp(argv[0], "set") == 0)
+ {
+ PgBenchExpr *expr = command->expr;
+ PgBenchValue result;
+
+ if (!evaluateExpr(thread, st, expr, &result))
+ {
+ commandFailed(st, "evaluation of meta-command 'set' failed");
+ st->state = CSTATE_ABORTED;
+ break;
+ }
+
+ if (!putVariableNumber(st, argv[0], argv[1], &result))
+ {
+ commandFailed(st, "assignment of meta-command 'set' failed");
+ st->state = CSTATE_ABORTED;
+ break;
+ }
+ }
+ else if (pg_strcasecmp(argv[0], "setshell") == 0)
+ {
+ bool ret = runShellCommand(st, argv[1], argv + 2, argc - 2);
+
+ if (timer_exceeded) /* timeout */
+ {
+ st->state = CSTATE_FINISHED;
+ break;
+ }
+ else if (!ret) /* on error */
+ {
+ commandFailed(st, "execution of meta-command 'setshell' failed");
+ st->state = CSTATE_ABORTED;
+ break;
+ }
+ else
+ {
+ /* succeeded */
+ }
+ }
+ else if (pg_strcasecmp(argv[0], "shell") == 0)
+ {
+ bool ret = runShellCommand(st, NULL, argv + 1, argc - 1);
+
+ if (timer_exceeded) /* timeout */
+ {
+ st->state = CSTATE_FINISHED;
+ break;
+ }
+ else if (!ret) /* on error */
+ {
+ commandFailed(st, "execution of meta-command 'shell' failed");
+ st->state = CSTATE_ABORTED;
+ break;
+ }
+ else
+ {
+ /* succeeded */
+ }
+ }
+ /*
+ * executing the expression or shell command might
+ * take a non-negligible amount of time, so reset
+ * 'now'
+ */
+ INSTR_TIME_SET_ZERO(now);
+
+ st->state = CSTATE_END_COMMAND;
+ }
+ }
+ break;
- getQueryParams(st, command, params);
+ /*
+ * Wait for the current SQL command to complete
+ */
+ case CSTATE_WAIT_RESULT:
+ command = sql_script[st->use_file].commands[st->command];
+ if (debug)
+ fprintf(stderr, "client %d receiving\n", st->id);
+ if (!PQconsumeInput(st->con))
+ { /* there's something wrong */
+ commandFailed(st, "perhaps the backend died while processing");
+ st->state = CSTATE_ABORTED;
+ break;
+ }
+ if (PQisBusy(st->con))
+ return; /* don't have the whole result yet */
+
+ /*
+ * Read and discard the query result.
+ */
+ res = PQgetResult(st->con);
+ switch (PQresultStatus(res))
+ {
+ case PGRES_COMMAND_OK:
+ case PGRES_TUPLES_OK:
+ case PGRES_EMPTY_QUERY:
+ /* OK */
+ PQclear(res);
+ discard_response(st);
+ st->state = CSTATE_END_COMMAND;
+ break;
+ default:
+ commandFailed(st, PQerrorMessage(st->con));
+ PQclear(res);
+ st->state = CSTATE_ABORTED;
+ break;
+ }
- if (debug)
- fprintf(stderr, "client %d sending %s\n", st->id, sql);
- r = PQsendQueryParams(st->con, sql, command->argc - 1,
- NULL, params, NULL, NULL, 0);
- }
- else if (querymode == QUERY_PREPARED)
- {
- char name[MAX_PREPARE_NAME];
- const char *params[MAX_ARGS];
+ break;
- if (!st->prepared[st->use_file])
- {
- int j;
+ /*
+ * Wait until sleep is done.
+ * This state is entered after a \sleep metacommand.
+ * The behavior is similar to CSTATE_THROTTLE, but proceeds
+ * to CSTATE_END_COMMAND instead of CSTATE_START_TX.
+ */
+ case CSTATE_SLEEP:
+ if (INSTR_TIME_IS_ZERO(now))
+ INSTR_TIME_SET_CURRENT(now);
+ if (INSTR_TIME_GET_MICROSEC(now) < st->sleep_until)
+ return; /* Still sleeping, nothing to do here */
+ /* Else done sleeping. */
+ st->state = CSTATE_END_COMMAND;
+ break;
- for (j = 0; commands[j] != NULL; j++)
+ /*
+ * End of command: do some housekeeping and proceed to next.
+ */
+ case CSTATE_END_COMMAND:
+ /*
+ * command executed: accumulate per-command execution
+ * times in thread-local data structure, if per-command
+ * latencies are requested.
+ */
+ if (is_latencies)
{
- PGresult *res;
- char name[MAX_PREPARE_NAME];
-
- if (commands[j]->type != SQL_COMMAND)
- continue;
- preparedStatementName(name, st->use_file, j);
- res = PQprepare(st->con, name,
- commands[j]->argv[0], commands[j]->argc - 1, NULL);
- if (PQresultStatus(res) != PGRES_COMMAND_OK)
- fprintf(stderr, "%s", PQerrorMessage(st->con));
- PQclear(res);
+ if (INSTR_TIME_IS_ZERO(now))
+ INSTR_TIME_SET_CURRENT(now);
+
+ /* XXX could use a mutex here, but we choose not to */
+ command = sql_script[st->use_file].commands[st->command];
+ addToSimpleStats(&command->stats,
+ INSTR_TIME_GET_DOUBLE(now) -
+ INSTR_TIME_GET_DOUBLE(st->stmt_begin));
}
- st->prepared[st->use_file] = true;
- }
-
- getQueryParams(st, command, params);
- preparedStatementName(name, st->use_file, st->state);
- if (debug)
- fprintf(stderr, "client %d sending %s\n", st->id, name);
- r = PQsendQueryPrepared(st->con, name, command->argc - 1,
- params, NULL, NULL, 0);
- }
- else /* unknown sql mode */
- r = 0;
-
- if (r == 0)
- {
- if (debug)
- fprintf(stderr, "client %d could not send %s\n",
- st->id, command->argv[0]);
- st->ecnt++;
- }
- else
- st->listen = true; /* flags that should be listened */
- }
- else if (commands[st->state]->type == META_COMMAND)
- {
- int argc = commands[st->state]->argc,
- i;
- char **argv = commands[st->state]->argv;
-
- if (debug)
- {
- fprintf(stderr, "client %d executing \\%s", st->id, argv[0]);
- for (i = 1; i < argc; i++)
- fprintf(stderr, " %s", argv[i]);
- fprintf(stderr, "\n");
- }
+ /* Go ahead with next command */
+ st->command++;
+ st->state = CSTATE_START_COMMAND;
+ break;
- if (pg_strcasecmp(argv[0], "set") == 0)
- {
- PgBenchExpr *expr = commands[st->state]->expr;
- PgBenchValue result;
+ /*
+ * End of transaction.
+ */
+ case CSTATE_END_TX:
+ /* transaction finished: calculate latency and log the transaction */
+ if (progress || throttle_delay || latency_limit ||
+ per_script_stats || use_log)
+ processXactStats(thread, st, &now, false, agg);
+ else
+ thread->stats.cnt++;
+
+ if (is_connect)
+ {
+ PQfinish(st->con);
+ st->con = NULL;
+ INSTR_TIME_SET_ZERO(now);
+ }
- if (!evaluateExpr(thread, st, expr, &result))
- {
- st->ecnt++;
- return true;
- }
+ ++st->cnt;
+ if ((st->cnt >= nxacts && duration <= 0) || timer_exceeded)
+ {
+ /* exit success */
+ st->state = CSTATE_FINISHED;
+ break;
+ }
- if (!putVariableNumber(st, argv[0], argv[1], &result))
- {
- st->ecnt++;
- return true;
- }
+ /*
+ * No transaction is underway anymore.
+ */
+ st->state = CSTATE_CHOOSE_SCRIPT;
+
+ /*
+ * If we paced through all commands in the script in this loop,
+ * without returning to the caller even once, do it now. This
+ * gives the thread a chance to process other connections, and
+ * to do progress reporting. This can currently only happen
+ * if the script consists entirely of meta-commands.
+ */
+ if (end_tx_processed)
+ return;
+ else
+ {
+ end_tx_processed = true;
+ break;
+ }
- st->listen = true;
- }
- else if (pg_strcasecmp(argv[0], "sleep") == 0)
- {
- char *var;
- int usec;
- instr_time now;
-
- if (*argv[1] == ':')
- {
- if ((var = getVariable(st, argv[1] + 1)) == NULL)
+ /*
+ * Final states. Close the connection if it's still open.
+ */
+ case CSTATE_ABORTED:
+ case CSTATE_FINISHED:
+ if (st->con != NULL)
{
- fprintf(stderr, "%s: undefined variable \"%s\"\n",
- argv[0], argv[1]);
- st->ecnt++;
- return true;
+ PQfinish(st->con);
+ st->con = NULL;
}
- usec = atoi(var);
- }
- else
- usec = atoi(argv[1]);
-
- if (argc > 2)
- {
- if (pg_strcasecmp(argv[2], "ms") == 0)
- usec *= 1000;
- else if (pg_strcasecmp(argv[2], "s") == 0)
- usec *= 1000000;
- }
- else
- usec *= 1000000;
-
- INSTR_TIME_SET_CURRENT(now);
- st->sleep_until = INSTR_TIME_GET_MICROSEC(now) + usec;
- st->sleeping = true;
-
- st->listen = true;
+ return;
}
- else if (pg_strcasecmp(argv[0], "setshell") == 0)
- {
- bool ret = runShellCommand(st, argv[1], argv + 2, argc - 2);
-
- if (timer_exceeded) /* timeout */
- return clientDone(st);
- else if (!ret) /* on error */
- {
- st->ecnt++;
- return true;
- }
- else /* succeeded */
- st->listen = true;
- }
- else if (pg_strcasecmp(argv[0], "shell") == 0)
- {
- bool ret = runShellCommand(st, NULL, argv + 1, argc - 1);
-
- if (timer_exceeded) /* timeout */
- return clientDone(st);
- else if (!ret) /* on error */
- {
- st->ecnt++;
- return true;
- }
- else /* succeeded */
- st->listen = true;
- }
-
- /* after a meta command, immediately proceed with next command */
- goto top;
}
-
- return true;
}
/*
@@ -4183,29 +4420,10 @@ threadRun(void *arg)
initStats(&aggs, INSTR_TIME_GET_DOUBLE(thread->start_time));
last = aggs;
- /* send start up queries in async manner */
+ /* explicitly initialize the state machines */
for (i = 0; i < nstate; i++)
{
- CState *st = &state[i];
- int prev_ecnt = st->ecnt;
- Command **commands;
-
- st->use_file = chooseScript(thread);
- commands = sql_script[st->use_file].commands;
- if (debug)
- fprintf(stderr, "client %d executing script \"%s\"\n", st->id,
- sql_script[st->use_file].desc);
- if (!doCustom(thread, st, &aggs))
- remains--; /* I've aborted */
-
- if (st->ecnt > prev_ecnt && commands[st->state]->type == META_COMMAND)
- {
- fprintf(stderr, "client %d aborted in state %d; execution of meta-command failed\n",
- i, st->state);
- remains--; /* I've aborted */
- PQfinish(st->con);
- st->con = NULL;
- }
+ state[i].state = CSTATE_CHOOSE_SCRIPT;
}
while (remains > 0)
@@ -4222,59 +4440,60 @@ threadRun(void *arg)
for (i = 0; i < nstate; i++)
{
CState *st = &state[i];
- Command **commands = sql_script[st->use_file].commands;
int sock;
- if (st->con == NULL)
+ if (st->state == CSTATE_THROTTLE && timer_exceeded)
{
+ /* interrupt client which has not started a transaction */
+ st->state = CSTATE_FINISHED;
+ remains--;
+ PQfinish(st->con);
+ st->con = NULL;
continue;
}
- else if (st->sleeping)
+ else if (st->state == CSTATE_SLEEP || st->state == CSTATE_THROTTLE)
{
- if (st->throttling && timer_exceeded)
- {
- /* interrupt client which has not started a transaction */
- remains--;
- st->sleeping = false;
- st->throttling = false;
- PQfinish(st->con);
- st->con = NULL;
- continue;
- }
- else /* just a nap from the script */
- {
- int this_usec;
-
- if (min_usec == PG_INT64_MAX)
- {
- instr_time now;
+ /* a nap from the script, or under throttling */
+ int this_usec;
- INSTR_TIME_SET_CURRENT(now);
- now_usec = INSTR_TIME_GET_MICROSEC(now);
- }
+ if (min_usec == PG_INT64_MAX)
+ {
+ instr_time now;
- this_usec = st->txn_scheduled - now_usec;
- if (min_usec > this_usec)
- min_usec = this_usec;
+ INSTR_TIME_SET_CURRENT(now);
+ now_usec = INSTR_TIME_GET_MICROSEC(now);
}
+
+ this_usec = (st->state == CSTATE_SLEEP ?
+ st->sleep_until : st->txn_scheduled) - now_usec;
+ if (min_usec > this_usec)
+ min_usec = this_usec;
}
- else if (commands[st->state]->type == META_COMMAND)
+ else if (st->state == CSTATE_WAIT_RESULT)
{
- min_usec = 0; /* the connection is ready to run */
+ /*
+ * waiting for result from server - nothing to do unless the
+ * socket is readable
+ */
+ sock = PQsocket(st->con);
+ if (sock < 0)
+ {
+ fprintf(stderr, "invalid socket: %s", PQerrorMessage(st->con));
+ goto done;
+ }
+
+ FD_SET(sock, &input_mask);
+
+ if (maxsock < sock)
+ maxsock = sock;
break;
}
-
- sock = PQsocket(st->con);
- if (sock < 0)
+ else if (st->state != CSTATE_ABORTED && st->state != CSTATE_FINISHED)
{
- fprintf(stderr, "invalid socket: %s", PQerrorMessage(st->con));
- goto done;
+ /* the connection is ready to run */
+ min_usec = 0;
+ break;
}
-
- FD_SET(sock, &input_mask);
-
- if (maxsock < sock)
- maxsock = sock;
}
/* also wake up to print the next progress report on time */
@@ -4324,14 +4543,13 @@ threadRun(void *arg)
}
}
- /* ok, backend returns reply */
+ /* ok, advance the state machine of each connection */
for (i = 0; i < nstate; i++)
{
CState *st = &state[i];
- Command **commands = sql_script[st->use_file].commands;
- int prev_ecnt = st->ecnt;
+ bool ready;
- if (st->con)
+ if (st->state == CSTATE_WAIT_RESULT && st->con)
{
int sock = PQsocket(st->con);
@@ -4341,21 +4559,19 @@ threadRun(void *arg)
PQerrorMessage(st->con));
goto done;
}
- if (FD_ISSET(sock, &input_mask) ||
- commands[st->state]->type == META_COMMAND)
- {
- if (!doCustom(thread, st, &aggs))
- remains--; /* I've aborted */
- }
+
+ ready = FD_ISSET(sock, &input_mask);
}
+ else if (st->state == CSTATE_FINISHED || st->state == CSTATE_ABORTED)
+ ready = false;
+ else
+ ready = true;
- if (st->ecnt > prev_ecnt && commands[st->state]->type == META_COMMAND)
+ if (ready)
{
- fprintf(stderr, "client %d aborted in state %d; execution of meta-command failed\n",
- i, st->state);
- remains--; /* I've aborted */
- PQfinish(st->con);
- st->con = NULL;
+ doCustom(thread, st, &aggs);
+ if (st->state == CSTATE_FINISHED || st->state == CSTATE_ABORTED)
+ remains--;
}
}
--