Hello Heikki,

[...]
So threadRun() would not have the opportunity to stop the scheduled
transaction, even if beyond the end of run, because it would not have got
out of doCustom, in the case I outlined above.

I see. Instead of moving to FINISHED state, then, we could stay in THROTTLE state and 'return' out of doCustom(), to give the code in threadRun() a chance to detect that the timer is up. Something like the attached. (I moved the check after the check for latency_limit, because that code updates txn_scheduled. That seems like a more correct place, although it is a separate issue.)

Although I think it would work, I do not find it better than the previous situation. Before the change, throttling simply jumps to FINISHED if time is up. With that option, the jump cannot be seen from within doCustom and relies on threadRun to do it, which is much harder to see from the code because things happen in two functions.

I would rather move all state changes out of threadRun and into doCustom, so that they are all in one place where they are easier to check and understand.

As a PoC, see the attached patch, which does that. It also records all stats instead of trying to be clever, and tries to homogenize the comments somewhat. I find it strange that a script can be interrupted during a sleep or after a shell command but not in other states, so the patch instead documents that once a script has started it runs to its end: it can only be cut off before or after, not within. Also, there are no state changes outside doCustom, and threadRun only looks at the states to make decisions.
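
To make the intent concrete, here is a minimal standalone sketch of the idea, not the pgbench code itself. All names (DemoState, DemoClient, demo_advance, demo_count_remaining) are hypothetical, the state set is heavily reduced, and a fixed delay stands in for the Poisson draw. The point is only that every assignment to the state happens in one function, and the caller reads states without writing them.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical, reduced state set; pgbench's real enum has many more. */
typedef enum
{
    DEMO_CHOOSE_SCRIPT,
    DEMO_THROTTLE,
    DEMO_RUN_TX,
    DEMO_FINISHED
} DemoState;

typedef struct
{
    DemoState   state;
    int64_t     txn_scheduled;  /* earliest start of a throttled transaction, in us */
    int         cnt;            /* transactions completed so far */
} DemoClient;

/*
 * The only place where c->state is assigned.  Returns when the client
 * has to wait, has just completed a transaction, or is finished.
 */
static void
demo_advance(DemoClient *c, int64_t now_us, bool timer_exceeded,
             int64_t throttle_delay)
{
    for (;;)
    {
        switch (c->state)
        {
            case DEMO_CHOOSE_SCRIPT:
                if (timer_exceeded)
                    c->state = DEMO_FINISHED;
                else if (throttle_delay > 0)
                {
                    /* assumption: fixed delay instead of a Poisson draw */
                    c->txn_scheduled = now_us + throttle_delay;
                    c->state = DEMO_THROTTLE;
                }
                else
                    c->state = DEMO_RUN_TX;
                break;

            case DEMO_THROTTLE:
                if (now_us < c->txn_scheduled)
                    return;     /* still sleeping */
                /* done sleeping, but do not start if time is up */
                c->state = timer_exceeded ? DEMO_FINISHED : DEMO_RUN_TX;
                break;

            case DEMO_RUN_TX:
                /* once started, a transaction runs to its end */
                c->cnt++;
                c->state = DEMO_CHOOSE_SCRIPT;
                return;         /* always yield to the caller here */

            case DEMO_FINISHED:
                return;
        }
    }
}

/* The caller only inspects states to make decisions. */
static int
demo_count_remaining(const DemoClient *clients, int n)
{
    int         remains = 0;

    for (int i = 0; i < n; i++)
        if (clients[i].state != DEMO_FINISHED)
            remains++;
    return remains;
}
```

With this shape, a client that is throttled when the timer expires reaches the finished state from inside demo_advance on its next call, and the caller merely observes that via demo_count_remaining.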

--
Fabien.
diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c
index 41b756c089..5d79761ed3 100644
--- a/src/bin/pgbench/pgbench.c
+++ b/src/bin/pgbench/pgbench.c
@@ -273,8 +273,11 @@ typedef enum
 
        /*
         * CSTATE_START_TX performs start-of-transaction processing.  Establishes
-        * a new connection for the transaction, in --connect mode, and records
-        * the transaction start time.
+        * a new connection for the transaction, in --connect mode, records
+        * the transaction start time, and proceed to the first command.
+        *
+        * Note: once a script is started, it will either error or run till
+        * its end, where it may be interrupted.
         */
        CSTATE_START_TX,
 
@@ -307,9 +310,10 @@ typedef enum
        /*
         * CSTATE_END_TX performs end-of-transaction processing.  Calculates
         * latency, and logs the transaction.  In --connect mode, closes the
-        * current connection.  Chooses the next script to execute and starts over
-        * in CSTATE_START_THROTTLE state, or enters CSTATE_FINISHED if we have no
-        * more work to do.
+        * current connection.
+        *
+        * Then either starts over in CSTATE_CHOOSE_SCRIPT, or enters CSTATE_FINISHED
+        * if we have no more work to do.
         */
        CSTATE_END_TX,
 
@@ -2683,12 +2687,6 @@ evaluateSleep(CState *st, int argc, char **argv, int *usecs)
 static void
 doCustom(TState *thread, CState *st, StatsData *agg)
 {
-       PGresult   *res;
-       Command    *command;
-       instr_time      now;
-       bool            end_tx_processed = false;
-       int64           wait;
-
        /*
         * gettimeofday() isn't free, so we get the current timestamp lazily the
         * first time it's needed, and reuse the same value throughout this
@@ -2697,37 +2695,44 @@ doCustom(TState *thread, CState *st, StatsData *agg)
         * means "not set yet".  Reset "now" when we execute shell commands or
         * expressions, which might take a non-negligible amount of time, though.
         */
+       instr_time      now;
        INSTR_TIME_SET_ZERO(now);
 
        /*
         * Loop in the state machine, until we have to wait for a result from the
-        * server (or have to sleep, for throttling or for \sleep).
+        * server or have to sleep for throttling or \sleep.
         *
         * Note: In the switch-statement below, 'break' will loop back here,
         * meaning "continue in the state machine".  Return is used to return to
-        * the caller.
+        * the caller, giving the thread opportunity to move forward another client.
         */
        for (;;)
        {
+               PGresult   *res;
+               Command    *command;
+
                switch (st->state)
                {
                                /*
                                 * Select transaction to run.
                                 */
                        case CSTATE_CHOOSE_SCRIPT:
-
                                st->use_file = chooseScript(thread);
 
                                if (debug)
                                        fprintf(stderr, "client %d executing script \"%s\"\n", st->id,
                                                        sql_script[st->use_file].desc);
 
-                               if (throttle_delay > 0)
+                               /* check stack consistency */
+                               Assert(conditional_stack_empty(st->cstack));
+
+                               if (timer_exceeded)
+                                       st->state = CSTATE_FINISHED;
+                               else if (throttle_delay > 0)
                                        st->state = CSTATE_START_THROTTLE;
                                else
                                        st->state = CSTATE_START_TX;
-                               /* check consistency */
-                               Assert(conditional_stack_empty(st->cstack));
+
                                break;
 
                                /*
@@ -2745,21 +2750,10 @@ doCustom(TState *thread, CState *st, StatsData *agg)
                                 * away.
                                 */
                                Assert(throttle_delay > 0);
-                               wait = getPoissonRand(thread, throttle_delay);
 
-                               thread->throttle_trigger += wait;
+                               thread->throttle_trigger += getPoissonRand(thread, throttle_delay);
                                st->txn_scheduled = thread->throttle_trigger;
 
-                               /*
-                                * stop client if next transaction is beyond pgbench end of
-                                * execution
-                                */
-                               if (duration > 0 && st->txn_scheduled > end_time)
-                               {
-                                       st->state = CSTATE_FINISHED;
-                                       break;
-                               }
-
                                /*
                                 * If --latency-limit is used, and this slot is already late
                                 * so that the transaction will miss the latency limit even if
@@ -2773,17 +2767,19 @@ doCustom(TState *thread, CState *st, StatsData *agg)
 
                                        if (INSTR_TIME_IS_ZERO(now))
                                                INSTR_TIME_SET_CURRENT(now);
+
                                        now_us = INSTR_TIME_GET_MICROSEC(now);
+
                                        while (thread->throttle_trigger < now_us - latency_limit &&
                                                   (nxacts <= 0 || st->cnt < nxacts))
                                        {
                                                processXactStats(thread, st, &now, true, agg);
                                                /* next rendez-vous */
-                                               wait = getPoissonRand(thread, throttle_delay);
-                                               thread->throttle_trigger += wait;
+                                               thread->throttle_trigger += getPoissonRand(thread, throttle_delay);
                                                st->txn_scheduled = thread->throttle_trigger;
                                        }
-                                       /* stop client if -t exceeded */
+
+                                       /* stop client if -t was exceeded in the previous skip loop */
                                        if (nxacts > 0 && st->cnt >= nxacts)
                                        {
                                                st->state = CSTATE_FINISHED;
@@ -2791,23 +2787,35 @@ doCustom(TState *thread, CState *st, StatsData *agg)
                                        }
                                }
 
+                               /*
+                                * stop client if next transaction is beyond pgbench end of
+                                * execution.
+                                */
+                               if (duration > 0 && st->txn_scheduled > end_time)
+                               {
+                                       st->state = CSTATE_FINISHED;
+                                       break;
+                               }
+
                                st->state = CSTATE_THROTTLE;
-                               if (debug)
-                                       fprintf(stderr, "client %d throttling " INT64_FORMAT " us\n",
-                                                       st->id, wait);
                                break;
 
                                /*
                                 * Wait until it's time to start next transaction.
                                 */
                        case CSTATE_THROTTLE:
+
                                if (INSTR_TIME_IS_ZERO(now))
                                        INSTR_TIME_SET_CURRENT(now);
+
                                if (INSTR_TIME_GET_MICROSEC(now) < st->txn_scheduled)
-                                       return;         /* Still sleeping, nothing to do here */
+                                       return;         /* still sleeping, nothing to do here */
 
-                               /* Else done sleeping, start the transaction */
-                               st->state = CSTATE_START_TX;
+                               /* done sleeping, but do not start a transaction if we are done */
+                               if (timer_exceeded)
+                                       st->state = CSTATE_FINISHED;
+                               else
+                                       st->state = CSTATE_START_TX;
                                break;
 
                                /* Start new transaction */
@@ -2838,28 +2846,21 @@ doCustom(TState *thread, CState *st, StatsData *agg)
                                        memset(st->prepared, 0, sizeof(st->prepared));
                                }
 
+                               /* record transaction start time.  */
+                               if (INSTR_TIME_IS_ZERO(now))
+                                       INSTR_TIME_SET_CURRENT(now);
+                               st->txn_begin = now;
+
                                /*
-                                * Record transaction start time under logging, progress or
-                                * throttling.
+                                * When not throttling, this is also the transaction's
+                                * scheduled start time.
                                 */
-                               if (use_log || progress || throttle_delay || latency_limit ||
-                                       per_script_stats)
-                               {
-                                       if (INSTR_TIME_IS_ZERO(now))
-                                               INSTR_TIME_SET_CURRENT(now);
-                                       st->txn_begin = now;
-
-                                       /*
-                                        * When not throttling, this is also the transaction's
-                                        * scheduled start time.
-                                        */
-                                       if (!throttle_delay)
-                                               st->txn_scheduled = INSTR_TIME_GET_MICROSEC(now);
-                               }
+                               if (!throttle_delay)
+                                       st->txn_scheduled = INSTR_TIME_GET_MICROSEC(now);
 
                                /* Begin with the first command */
-                               st->command = 0;
                                st->state = CSTATE_START_COMMAND;
+                               st->command = 0;
                                break;
 
                                /*
@@ -2878,17 +2879,12 @@ doCustom(TState *thread, CState *st, StatsData *agg)
                                        break;
                                }
 
-                               /*
-                                * Record statement start time if per-command latencies are
-                                * requested
-                                */
-                               if (is_latencies)
-                               {
-                                       if (INSTR_TIME_IS_ZERO(now))
-                                               INSTR_TIME_SET_CURRENT(now);
-                                       st->stmt_begin = now;
-                               }
+                               /* record statement start time. */
+                               if (INSTR_TIME_IS_ZERO(now))
+                                       INSTR_TIME_SET_CURRENT(now);
+                               st->stmt_begin = now;
 
+                               /* execute the command */
                                if (command->type == SQL_COMMAND)
                                {
                                        if (!sendCommand(st, command))
@@ -2933,6 +2929,7 @@ doCustom(TState *thread, CState *st, StatsData *agg)
 
                                                if (INSTR_TIME_IS_ZERO(now))
                                                        INSTR_TIME_SET_CURRENT(now);
+
                                                st->sleep_until = INSTR_TIME_GET_MICROSEC(now) + usec;
                                                st->state = CSTATE_SLEEP;
                                                break;
@@ -2983,10 +2980,7 @@ doCustom(TState *thread, CState *st, StatsData *agg)
                                                        }
                                                        else    /* elif */
                                                        {
-                                                               /*
-                                                                * we should get here only if the "elif"
-                                                                * needed evaluation
-                                                                */
+                                                               /* we should get here only if the "elif" needed evaluation */
                                                                Assert(conditional_stack_peek(st->cstack) == IFSTATE_FALSE);
                                                                conditional_stack_poke(st->cstack, cond ? IFSTATE_TRUE : IFSTATE_FALSE);
                                                        }
@@ -3018,43 +3012,23 @@ doCustom(TState *thread, CState *st, StatsData *agg)
                                        }
                                        else if (command->meta == META_SETSHELL)
                                        {
-                                               bool            ret = runShellCommand(st, argv[1], argv + 2, argc - 2);
-
-                                               if (timer_exceeded) /* timeout */
-                                               {
-                                                       st->state = CSTATE_FINISHED;
-                                                       break;
-                                               }
-                                               else if (!ret)  /* on error */
+                                               if (!runShellCommand(st, argv[1], argv + 2, argc - 2))
                                                {
                                                        commandFailed(st, "setshell", "execution of meta-command failed");
                                                        st->state = CSTATE_ABORTED;
                                                        break;
                                                }
-                                               else
-                                               {
-                                                       /* succeeded */
-                                               }
+                                               /* else success */
                                        }
                                        else if (command->meta == META_SHELL)
                                        {
-                                               bool            ret = runShellCommand(st, NULL, argv + 1, argc - 1);
-
-                                               if (timer_exceeded) /* timeout */
-                                               {
-                                                       st->state = CSTATE_FINISHED;
-                                                       break;
-                                               }
-                                               else if (!ret)  /* on error */
+                                               if (!runShellCommand(st, NULL, argv + 1, argc - 1))
                                                {
                                                        commandFailed(st, "shell", "execution of meta-command failed");
                                                        st->state = CSTATE_ABORTED;
                                                        break;
                                                }
-                                               else
-                                               {
-                                                       /* succeeded */
-                                               }
+                                               /* else success */
                                        }
 
                        move_to_end_command:
@@ -3156,6 +3130,7 @@ doCustom(TState *thread, CState *st, StatsData *agg)
                                        }
 
                                        if (st->state != CSTATE_SKIP_COMMAND)
+                                               /* out of quick skip command loop */
                                                break;
                                }
                                break;
@@ -3208,7 +3183,7 @@ doCustom(TState *thread, CState *st, StatsData *agg)
                                if (INSTR_TIME_IS_ZERO(now))
                                        INSTR_TIME_SET_CURRENT(now);
                                if (INSTR_TIME_GET_MICROSEC(now) < st->sleep_until)
-                                       return;         /* Still sleeping, nothing to do here */
+                                       return;         /* still sleeping, nothing to do here */
                                /* Else done sleeping. */
                                st->state = CSTATE_END_COMMAND;
                                break;
@@ -3223,17 +3198,14 @@ doCustom(TState *thread, CState *st, StatsData *agg)
                                 * in thread-local data structure, if per-command latencies
                                 * are requested.
                                 */
-                               if (is_latencies)
-                               {
-                                       if (INSTR_TIME_IS_ZERO(now))
-                                               INSTR_TIME_SET_CURRENT(now);
+                               if (INSTR_TIME_IS_ZERO(now))
+                                       INSTR_TIME_SET_CURRENT(now);
 
-                                       /* XXX could use a mutex here, but we choose not to */
-                                       command = sql_script[st->use_file].commands[st->command];
-                                       addToSimpleStats(&command->stats,
-                                                                        INSTR_TIME_GET_DOUBLE(now) -
-                                                                        INSTR_TIME_GET_DOUBLE(st->stmt_begin));
-                               }
+                               /* XXX could use a mutex here, but we choose not to */
+                               command = sql_script[st->use_file].commands[st->command];
+                               addToSimpleStats(&command->stats,
+                                                                INSTR_TIME_GET_DOUBLE(now) -
+                                                                INSTR_TIME_GET_DOUBLE(st->stmt_begin));
 
                                /* Go ahead with next command, to be executed or skipped */
                                st->command++;
@@ -3242,19 +3214,15 @@ doCustom(TState *thread, CState *st, StatsData *agg)
                                break;
 
                                /*
-                                * End of transaction.
+                                * End of transaction (end of script, really).
                                 */
                        case CSTATE_END_TX:
 
                                /* transaction finished: calculate latency and do log */
                                processXactStats(thread, st, &now, false, agg);
 
-                               /* conditional stack must be empty */
-                               if (!conditional_stack_empty(st->cstack))
-                               {
-                                       fprintf(stderr, "end of script reached within a conditional, missing \\endif\n");
-                                       exit(1);
-                               }
+                               /* missing \endif... cannot happen if CheckConditional was okay */
+                               Assert(conditional_stack_empty(st->cstack));
 
                                if (is_connect)
                                {
@@ -3268,26 +3236,17 @@ doCustom(TState *thread, CState *st, StatsData *agg)
                                        st->state = CSTATE_FINISHED;
                                        break;
                                }
+                               else
+                               {
+                                       /* next transaction */
+                                       st->state = CSTATE_CHOOSE_SCRIPT;
 
-                               /*
-                                * No transaction is underway anymore.
-                                */
-                               st->state = CSTATE_CHOOSE_SCRIPT;
-
-                               /*
-                                * If we paced through all commands in the script in this
-                                * loop, without returning to the caller even once, do it now.
-                                * This gives the thread a chance to process other
-                                * connections, and to do progress reporting.  This can
-                                * currently only happen if the script consists entirely of
-                                * meta-commands.
-                                */
-                               if (end_tx_processed)
+                                       /*
+                                        * Ensure that we always return on this point, so as
+                                        * to avoid an infinite loop if the script only contains
+                                        * meta commands.
+                                        */
                                        return;
-                               else
-                               {
-                                       end_tx_processed = true;
-                                       break;
                                }
 
                                /*
@@ -5652,7 +5611,7 @@ threadRun(void *arg)
 
        if (!is_connect)
        {
-               /* make connections to the database */
+               /* make connections to the database before starting */
                for (i = 0; i < nstate; i++)
                {
                        if ((state[i].con = doConnect()) == NULL)
@@ -5686,14 +5645,7 @@ threadRun(void *arg)
                {
                        CState     *st = &state[i];
 
-                       if (st->state == CSTATE_THROTTLE && timer_exceeded)
-                       {
-                               /* interrupt client that has not started a transaction */
-                               st->state = CSTATE_FINISHED;
-                               finishCon(st);
-                               remains--;
-                       }
-                       else if (st->state == CSTATE_SLEEP || st->state == CSTATE_THROTTLE)
+                       if (st->state == CSTATE_SLEEP || st->state == CSTATE_THROTTLE)
                        {
                                /* a nap from the script, or under throttling */
                                int64           this_usec;
