On 7/13/13 12:13 PM, Fabien COELHO wrote:
> My 0.02€: if it means adding complexity to the pgbench code, I think
> that it is not worth it. The point of pgbench is to look at a steady
> state, not to end in the most graceful possible way as far as measures
> are concerned.

That's how some people use pgbench. I'm just as likely to use it to characterize system latency. If there's a source of latency that's specific to the pgbench code, I want that out of there even if it's hard.

But we don't have to argue about that, because it isn't hard. The attached new patch seems to fix the latency spikes at the end, with -2 lines of new code! With that resolved, I also did a final pass over the rate limit code, attached as v14 and ready for a committer. I don't really care in what order these two changes are committed, since there's no hard dependency, but I would like to see them both go in eventually.

No functional code was changed from your v13 except for tweaking the output. The main thing I did was expand and edit comments and rename a few variables to try to make this easier to read. If you have any objections to my cosmetic changes, feel free to post an update. I've put a good bit of time into trying to simplify this further, thinking it can't really be this hard. But this seems to be the minimum complexity that still works given the mess of the pgbench state machine. Every change I try now breaks something.

To wrap up the test results motivating my little pgbench-delay-finish patch: the throttled cases that always showed more than 100 ms of latency clustered at the end of the run now look like this:

average rate limit lag: 0.181 ms (max 53.108 ms)
tps = 10088.727398 (including connections establishing)
tps = 10105.775864 (excluding connections establishing)

There are still some of these cases where latency spikes, but they're not as big and they're randomly distributed throughout the run. The problem I had with the ones at the end is how they tended to happen a few times in a row. I kept seeing multiple of these ~50ms lulls adding up to a huge one, because the source of the lag kept triggering at every connection close.

pgbench was already cleaning up all of its connections at the end, after all the transactions were finished. It looks safe to me to just rely on that for calling PQfinish in the normal case. And calls to clientDone already label themselves as ok or aborted; the code just didn't do anything with that state before. I tried adding some more complicated state tracking, but that added complexity while doing exactly the same thing as this simple implementation.
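A rough model of that split, in plain C with no libpq: the `Client` struct, `close_conn()` and the two counters are hypothetical stand-ins for `CState`, `PQfinish()` and real timing, so this only illustrates the control flow, not pgbench itself. An aborted client is closed immediately; a client that ends normally keeps its connection until a single cleanup pass, modeled on `disconnect_all`, runs after every client has stopped.

```c
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-in for pgbench's CState; con plays the role of PGconn *. */
typedef struct
{
    int  id;
    int *con;                     /* non-NULL while the client is "connected" */
} Client;

static int closes_during_run = 0; /* closes that could stall still-running clients */
static int closes_in_bulk = 0;    /* closes done after every client has stopped */

/* Hypothetical stand-in for PQfinish(): drop the handle and count the close. */
static void close_conn(Client *c, int *counter)
{
    c->con = NULL;
    (*counter)++;
}

/* Modeled on the proposed clientDone(): only an aborted client is closed here. */
static bool client_done(Client *c, bool ok)
{
    if (c->con != NULL && !ok)
        close_conn(c, &closes_during_run);
    return false;                 /* the client is finished either way */
}

/* Modeled on disconnect_all(): close whatever is still open at the very end. */
static void disconnect_all(Client *clients, size_t n)
{
    for (size_t i = 0; i < n; i++)
        if (clients[i].con != NULL)
            close_conn(&clients[i], &closes_in_bulk);
}
```

Calling `client_done()` for four clients where only one passes `ok = false` leaves one early close and three for `disconnect_all()`; in the real program those three are the closes that would no longer stall clients that are still running.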

The only part of your code change I reverted was altering the latency log transaction timestamps to read like "1373821907.065702" instead of "1373821907 65702". Both formats were considered when I added that feature, and I completely understand why you'd like to change it. One problem is that doing so introduces a new class of float parsing and rounding issues for consumers of that data. I'd rather not revisit that without a better reason to break the output format. Parsing tools like my pgbench-tools already struggle to support multiple versions of pgbench, and I don't think the float format offers enough benefit to justify breaking them today.
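To make the parsing concern concrete, here is a minimal sketch of a consumer that accepts both layouts and keeps the timestamp as integer microseconds. `parse_log_timestamp()` is a hypothetical helper, not code from pgbench or pgbench-tools, and it assumes the timestamp field has already been cut out of the log line. In the dotted layout the fraction is only unambiguous because the patch prints it with `%06ld`, so the sketch insists on exactly six digits.

```c
#include <ctype.h>
#include <stdint.h>
#include <stdio.h>

/*
 * Hypothetical helper: convert the timestamp at the end of a pgbench
 * per-transaction log line into integer microseconds since the epoch.
 * Accepts "sec usec" (the existing format) and "sec.usec" (the proposed
 * one, whose fraction is printed with %06ld).  Returns 0 on success and
 * -1 if the text matches neither layout.
 */
static int parse_log_timestamp(const char *s, int64_t *usec_out)
{
    long sec, frac;
    char sep;
    int  start = 0, end = 0;

    /* %n records offsets without counting toward sscanf's return value */
    if (sscanf(s, "%ld%c%n%ld%n", &sec, &sep, &start, &frac, &end) != 3)
        return -1;

    if (sep == ' ')
    {
        /* existing layout: the microseconds are a separate integer field */
        if (frac < 0 || frac > 999999)
            return -1;
    }
    else if (sep == '.')
    {
        /* proposed layout: demand exactly six digits, e.g. "065702" */
        if (!isdigit((unsigned char) s[start]) || end - start != 6)
            return -1;
    }
    else
        return -1;

    *usec_out = (int64_t) sec * 1000000 + frac;
    return 0;
}
```

With this helper, "1373821907 65702" and "1373821907.065702" both yield 1373821907065702, while an unpadded "1373821907.65702" is rejected instead of being misread. Reading the dotted form with `strtod()` and truncating after multiplying by 1e6 can instead land a microsecond off, which is one of the rounding hazards a float format invites.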

--
Greg Smith   2ndQuadrant US    g...@2ndquadrant.com   Baltimore, MD
PostgreSQL Training, Services, and 24x7 Support www.2ndQuadrant.com
diff --git a/contrib/pgbench/pgbench.c b/contrib/pgbench/pgbench.c
new file mode 100644
index 08095a9..8dc81e5
*** a/contrib/pgbench/pgbench.c
--- b/contrib/pgbench/pgbench.c
*************** preparedStatementName(char *buffer, int 
*** 862,870 ****
  static bool
  clientDone(CState *st, bool ok)
  {
!       (void) ok;                                      /* unused */
! 
!       if (st->con != NULL)
        {
                PQfinish(st->con);
                st->con = NULL;
--- 862,875 ----
  static bool
  clientDone(CState *st, bool ok)
  {
!       /*
!        * When the connection finishes normally, don't call PQfinish yet.
!        * PQfinish can cause significant delays in other clients that are
!        * still running.  Rather than finishing all of them here, in the
!        * normal case clients are instead closed in bulk by disconnect_all,
!        * after they have all stopped.
!        */
!       if ((st->con != NULL) && !ok)
        {
                PQfinish(st->con);
                st->con = NULL;
diff --git a/contrib/pgbench/pgbench.c b/contrib/pgbench/pgbench.c
index 80203d6..da88bd7 100644
--- a/contrib/pgbench/pgbench.c
+++ b/contrib/pgbench/pgbench.c
@@ -137,6 +137,12 @@ int                        unlogged_tables = 0;
 double         sample_rate = 0.0;
 
 /*
+ * When threads are throttled to a given rate limit, this is the target delay
+ * to reach that rate in usec.  0 is the default and means no throttling.
+ */
+int64          throttle_delay = 0;
+
+/*
  * tablespace selection
  */
 char      *tablespace = NULL;
@@ -200,11 +206,13 @@ typedef struct
       int                     listen;                 /* 0 indicates that an async query has been
                                                                 * sent */
       int                     sleeping;               /* 1 indicates that the client is napping */
+    bool        throttling;     /* whether nap is for throttling */
        int64           until;                  /* napping until (usec) */
        Variable   *variables;          /* array of variable definitions */
        int                     nvariables;
       instr_time      txn_begin;              /* used for measuring transaction latencies */
       instr_time      stmt_begin;             /* used for measuring statement latencies */
+       bool            throttled;      /* whether current transaction was throttled */
       int                     use_file;               /* index in sql_files for this client */
        bool            prepared[MAX_FILES];
 } CState;
@@ -222,6 +230,10 @@ typedef struct
       instr_time *exec_elapsed;       /* time spent executing cmds (per Command) */
       int                *exec_count;         /* number of cmd executions (per Command) */
       unsigned short random_state[3];         /* separate randomness for each thread */
+    int64       throttle_trigger;  /* previous/next throttling (us) */
+       int64       throttle_lag;      /* total transaction lag behind throttling */
+       int64       throttle_lag_max;  /* max transaction lag */
+
 } TState;
 
 #define INVALID_THREAD         ((pthread_t) 0)
@@ -230,6 +242,8 @@ typedef struct
 {
        instr_time      conn_time;
        int                     xacts;
+       int64       throttle_lag;
+       int64       throttle_lag_max;
 } TResult;
 
 /*
@@ -353,6 +367,7 @@ usage(void)
                   "  -n, --no-vacuum          do not run VACUUM before tests\n"
                   "  -N, --skip-some-updates  skip updates of pgbench_tellers and pgbench_branches\n"
                   "  -r, --report-latencies   report average latency per command\n"
+                  "  -R, --rate SPEC          target rate in transactions per second\n"
                   "  -s, --scale=NUM          report this scale factor in output\n"
                   "  -S, --select-only        perform SELECT-only transactions\n"
                   "  -t, --transactions       number of transactions each client runs "
@@ -895,17 +910,58 @@ doCustom(TState *thread, CState *st, instr_time *conn_time, FILE *logfile, AggVa
 {
        PGresult   *res;
        Command   **commands;
+       bool        do_throttle = false;
 
 top:
        commands = sql_files[st->use_file];
 
+       /* handle throttling once per transaction by inserting a sleep.
+        * this is simpler than doing it at the end.
+        */
+       if (throttle_delay && ! st->throttled)
+       {
+               /* compute delay to approximate a Poisson distribution
+                * 1000000 => 13.8 .. 0 multiplier
+                *  100000 => 11.5 .. 0
+                *   10000 =>  9.2 .. 0
+                *    1000 =>  6.9 .. 0
+                * if transactions are too slow or a given wait shorter than
+                * a transaction, the next transaction will start right away.
+                */
+               int64 wait = (int64)
+                       throttle_delay * -log(getrand(thread, 1, 1000)/1000.0);
+
+               thread->throttle_trigger += wait;
+
+               st->until = thread->throttle_trigger;
+               st->sleeping = 1;
+               st->throttling = true;
+               st->throttled = true;
+               if (debug)
+                       fprintf(stderr, "client %d throttling "INT64_FORMAT" us\n",
+                                       st->id, wait);
+       }
+
        if (st->sleeping)
        {                                                       /* are we sleeping? */
                instr_time      now;
+               int64 now_us;
 
                INSTR_TIME_SET_CURRENT(now);
-               if (st->until <= INSTR_TIME_GET_MICROSEC(now))
+               now_us = INSTR_TIME_GET_MICROSEC(now);
+               if (st->until <= now_us)
+               {
                        st->sleeping = 0;       /* Done sleeping, go ahead with next command */
+                       if (st->throttling)
+                       {
+                               /* measure lag of throttled transaction */
+                               int64 lag = now_us - st->until;
+                               thread->throttle_lag += lag;
+                               if (lag > thread->throttle_lag_max)
+                                       thread->throttle_lag_max = lag;
+                               st->throttling = false;
+                       }
+               }
                else
                        return true;            /* Still sleeping, nothing to do here */
        }
@@ -1034,7 +1090,7 @@ top:
                                         * This is more than we really ought to know about
                                         * instr_time
                                         */
-                                       fprintf(logfile, "%d %d %.0f %d %ld %ld\n",
+                                       fprintf(logfile, "%d %d %.0f %d %ld.%06ld\n",
                                                        st->id, st->cnt, usec, st->use_file,
                                                        (long) now.tv_sec, (long) now.tv_usec);
 #else
@@ -1043,7 +1099,7 @@ top:
                                         * On Windows, instr_time doesn't provide a timestamp
                                         * anyway
                                         */
-                                       fprintf(logfile, "%d %d %.0f %d 0 0\n",
+                                       fprintf(logfile, "%d %d %.0f %d 0.0\n",
                                                        st->id, st->cnt, usec, st->use_file);
 #endif
                                }
@@ -1092,6 +1148,13 @@ top:
                        st->state = 0;
                        st->use_file = (int) getrand(thread, 0, num_files - 1);
                        commands = sql_files[st->use_file];
+                       st->throttled = false;
+                       /* no transaction is underway, there is nothing to listen any more.
+                        * under throttling, a sleep is going to be inserted, and then
+                        * some SQL command will set listen back to 1.
+                        */
+                       st->listen = 0;
+                       do_throttle = (throttle_delay>0);
                }
        }
 
@@ -1110,6 +1173,12 @@ top:
                INSTR_TIME_ACCUM_DIFF(*conn_time, end, start);
        }
 
+       if (do_throttle) {
+               /* delay throttling after reopening the connection */
+               do_throttle = false;
+               goto top;
+       }
+
        /* Record transaction start time if logging is enabled */
        if (logfile && st->state == 0)
                INSTR_TIME_SET_CURRENT(st->txn_begin);
@@ -2014,7 +2083,8 @@ process_builtin(char *tb)
 static void
 printResults(int ttype, int normal_xacts, int nclients,
                         TState *threads, int nthreads,
-                        instr_time total_time, instr_time conn_total_time)
+                        instr_time total_time, instr_time conn_total_time,
+                        int64 throttle_lag, int64 throttle_lag_max)
 {
        double          time_include,
                                tps_include,
@@ -2052,6 +2122,18 @@ printResults(int ttype, int normal_xacts, int nclients,
                printf("number of transactions actually processed: %d\n",
                           normal_xacts);
        }
+
+       if (throttle_delay)
+       {
+               /* Report average transaction lag under throttling, i.e. the delay
+                  between scheduled and actual start times for the transaction.
+                  The measured lag may be linked to the thread/client load,
+                  the database load, or the Poisson throttling process.
+                */
+               printf("average transaction lag: %.3f ms (max %.3f ms)\n",
+                          0.001 * throttle_lag / normal_xacts, 0.001 * throttle_lag_max);
+       }
+
        printf("tps = %f (including connections establishing)\n", tps_include);
        printf("tps = %f (excluding connections establishing)\n", tps_exclude);
 
@@ -2136,6 +2218,7 @@ main(int argc, char **argv)
                {"unlogged-tables", no_argument, &unlogged_tables, 1},
                {"sampling-rate", required_argument, NULL, 4},
                {"aggregate-interval", required_argument, NULL, 5},
+               {"rate", required_argument, NULL, 'R'},
                {NULL, 0, NULL, 0}
        };
 
@@ -2158,6 +2241,8 @@ main(int argc, char **argv)
        instr_time      total_time;
        instr_time      conn_total_time;
        int                     total_xacts;
+       int64       throttle_lag = 0;
+       int64       throttle_lag_max = 0;
 
        int                     i;
 
@@ -2202,7 +2287,7 @@ main(int argc, char **argv)
        state = (CState *) pg_malloc(sizeof(CState));
        memset(state, 0, sizeof(CState));
 
-       while ((c = getopt_long(argc, argv, "ih:nvp:dqSNc:j:Crs:t:T:U:lf:D:F:M:", long_options, &optindex)) != -1)
+       while ((c = getopt_long(argc, argv, "ih:nvp:dqSNc:j:Crs:t:T:U:lf:D:F:M:R:", long_options, &optindex)) != -1)
        {
                switch (c)
                {
@@ -2357,6 +2442,19 @@ main(int argc, char **argv)
                                        exit(1);
                                }
                                break;
+                       case 'R':
+                       {
+                               /* get a double from the beginning of option value */
+                               double throttle_value = atof(optarg);
+                               if (throttle_value <= 0.0)
+                               {
+                                       fprintf(stderr, "invalid rate limit: %s\n", optarg);
+                                       exit(1);
+                               }
+                               /* Invert rate limit into a time offset */
+                               throttle_delay = (int64) (1000000.0 / throttle_value);
+                       }
+                               break;
                        case 0:
                                /* This covers long options which take no argument. */
                                break;
@@ -2394,6 +2492,9 @@ main(int argc, char **argv)
                }
        }
 
+    /* compute a per thread delay */
+       throttle_delay *= nthreads;
+
        if (argc > optind)
                dbName = argv[optind];
        else
@@ -2706,6 +2807,9 @@ main(int argc, char **argv)
                        TResult    *r = (TResult *) ret;
 
                        total_xacts += r->xacts;
+                       throttle_lag += r->throttle_lag;
+                       if (r->throttle_lag_max > throttle_lag_max)
+                               throttle_lag_max = r->throttle_lag_max;
                        INSTR_TIME_ADD(conn_total_time, r->conn_time);
                        free(ret);
                }
@@ -2716,7 +2820,7 @@ main(int argc, char **argv)
        INSTR_TIME_SET_CURRENT(total_time);
        INSTR_TIME_SUBTRACT(total_time, start_time);
        printResults(ttype, total_xacts, nclients, threads, nthreads,
-                                total_time, conn_total_time);
+                                total_time, conn_total_time, throttle_lag, throttle_lag_max);
 
        return 0;
 }
@@ -2736,6 +2840,15 @@ threadRun(void *arg)
 
        AggVals         aggs;
 
+       /* SHOULD take actual thread start time when the thread is running? */
+       /* INSTR_TIME_SET_CURRENT(thread->start_time); */
+
+       /* throttling for all thread's clients */
+       INSTR_TIME_SET_CURRENT(start);
+       thread->throttle_trigger = INSTR_TIME_GET_MICROSEC(start);
+       thread->throttle_lag = 0;
+       thread->throttle_lag_max = 0;
+
        result = pg_malloc(sizeof(TResult));
 
        INSTR_TIME_SET_ZERO(result->conn_time);
@@ -2811,25 +2924,38 @@ threadRun(void *arg)
                        Command   **commands = sql_files[st->use_file];
                        int                     sock;
 
-                       if (st->sleeping)
+                       if (st->con == NULL)
                        {
-                               int                     this_usec;
-
-                               if (min_usec == INT64_MAX)
+                               continue;
+                       }
+                       else if (st->sleeping)
+                       {
+                               if (st->throttling && timer_exceeded)
                                {
-                                       instr_time      now;
-
-                                       INSTR_TIME_SET_CURRENT(now);
-                                       now_usec = INSTR_TIME_GET_MICROSEC(now);
+                                       /* interrupt client which has not started a transaction */
+                                       remains--;
+                                       st->sleeping = 0;
+                                       st->throttling = false;
+                                       PQfinish(st->con);
+                                       st->con = NULL;
+                                       continue;
                                }
+                               else /* just a nap from the script */
+                               {
+                                       int                     this_usec;
 
-                               this_usec = st->until - now_usec;
-                               if (min_usec > this_usec)
-                                       min_usec = this_usec;
-                       }
-                       else if (st->con == NULL)
-                       {
-                               continue;
+                                       if (min_usec == INT64_MAX)
+                                       {
+                                               instr_time      now;
+
+                                               INSTR_TIME_SET_CURRENT(now);
+                                               now_usec = INSTR_TIME_GET_MICROSEC(now);
+                                       }
+
+                                       this_usec = st->until - now_usec;
+                                       if (min_usec > this_usec)
+                                               min_usec = this_usec;
+                               }
                        }
                        else if (commands[st->state]->type == META_COMMAND)
                        {
@@ -2904,6 +3030,8 @@ done:
        result->xacts = 0;
        for (i = 0; i < nstate; i++)
                result->xacts += state[i].cnt;
+       result->throttle_lag = thread->throttle_lag;
+       result->throttle_lag_max = thread->throttle_lag_max;
        INSTR_TIME_SET_CURRENT(end);
        INSTR_TIME_ACCUM_DIFF(result->conn_time, end, start);
        if (logfile)
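
The lag bookkeeping that the patch spreads across `doCustom()`, `threadRun()`, `main()` and `printResults()` can be modeled on its own. This is a sketch under simplifying assumptions: the waits and the actual wake-up times are passed in as arrays of microseconds instead of coming from `getrand()` and the clock, and `LagStats`, `account_lag()`, `merge_lag()` and `avg_lag_ms()` are hypothetical names. Each transaction's scheduled start advances a per-thread trigger by its wait, as `thread->throttle_trigger += wait` does, so a late transaction does not push back the ones after it; its lag is the actual start minus the scheduled start.

```c
#include <stddef.h>
#include <stdint.h>

/* Hypothetical per-thread summary, mirroring TResult's throttle_lag fields. */
typedef struct
{
    int64_t lag_sum_us;   /* like throttle_lag: total lag over all transactions */
    int64_t lag_max_us;   /* like throttle_lag_max: worst single lag */
    int     xacts;        /* transactions that were throttled */
} LagStats;

/*
 * t0:        thread start (us), the initial value of throttle_trigger
 * waits[i]:  delay drawn for transaction i (us)
 * starts[i]: time transaction i actually woke up and started (us)
 */
static LagStats account_lag(int64_t t0, const int64_t *waits,
                            const int64_t *starts, size_t n)
{
    LagStats s = {0, 0, 0};
    int64_t  trigger = t0;

    for (size_t i = 0; i < n; i++)
    {
        int64_t lag;

        trigger += waits[i];          /* schedule advances even if we are late */
        lag = starts[i] - trigger;    /* actual minus scheduled start */
        if (lag < 0)
            lag = 0;                  /* defensive: a sleeper never wakes early */
        s.lag_sum_us += lag;
        if (lag > s.lag_max_us)
            s.lag_max_us = lag;
        s.xacts++;
    }
    return s;
}

/* Combine thread results the way main() folds TResult values together. */
static void merge_lag(LagStats *total, const LagStats *t)
{
    total->lag_sum_us += t->lag_sum_us;
    if (t->lag_max_us > total->lag_max_us)
        total->lag_max_us = t->lag_max_us;
    total->xacts += t->xacts;
}

/* Average lag in ms, as printed by printResults(); 0 if nothing ran. */
static double avg_lag_ms(const LagStats *s)
{
    return s->xacts > 0 ? 0.001 * (double) s->lag_sum_us / s->xacts : 0.0;
}
```

For two threads, one with lags of 0, 50 and 0 microseconds and one with a single 200 microsecond lag, the merged result is a 250 microsecond total, a 200 microsecond maximum and an average of 0.0625 ms over four transactions.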
diff --git a/doc/src/sgml/pgbench.sgml b/doc/src/sgml/pgbench.sgml
index a7f41e1..a5fd218 100644
--- a/doc/src/sgml/pgbench.sgml
+++ b/doc/src/sgml/pgbench.sgml
@@ -408,6 +408,27 @@ pgbench <optional> <replaceable>options</> </optional> <replaceable>dbname</>
      </varlistentry>
 
      <varlistentry>
+      <term><option>-R</option> <replaceable>rate</></term>
+      <term><option>--rate</option> <replaceable>rate</></term>
+      <listitem>
+       <para>
+       Execute transactions targeting the specified rate instead of
+       running as fast as possible (the default).  The rate is given in
+        transactions per second.  If the targeted rate is
+        above the maximum possible rate these transactions can execute at,
+        the rate limit won't have any impact on results.
+
+       The rate is targeted by starting transactions along a
+       Poisson-distributed event time line.  When a rate limit is
+        active, the average and maximum transaction lag time
+       (the delay between the scheduled and actual transaction start times)
+       are reported in ms. High values indicate that the database
+       could not handle the scheduled load at some time.
+       </para>
+      </listitem>
+     </varlistentry>
+
+     <varlistentry>
       <term><option>-s</option> <replaceable>scale_factor</></term>
       <term><option>--scale=</option><replaceable>scale_factor</></term>
       <listitem>
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)