Attached is an update that I think sorts out all of the documentation concerns. I broke this section into paragraphs now that it's getting so long too.

The only code change is that this now labels the controversial lag here "average rate limit schedule lag". That way if someone wants to introduce other measures of rate limit lag, like a more transaction oriented one, you might call that "average rate limit transaction lag" and tell the two apart.

The rewritten documentation here tries to communicate that there is a schedule that acts like it was pre-computed at the start of each client too. It's not ever adjusted based on what individual transactions do. I also noted the way this can cause schedule lag for some time after a slow transaction finishes, since that's the main issue observed so far.

--
Greg Smith   2ndQuadrant US    g...@2ndquadrant.com   Baltimore, MD
PostgreSQL Training, Services, and 24x7 Support www.2ndQuadrant.com
diff --git a/contrib/pgbench/pgbench.c b/contrib/pgbench/pgbench.c
new file mode 100644
index 2ad8f0b..4e6b608
*** a/contrib/pgbench/pgbench.c
--- b/contrib/pgbench/pgbench.c
*************** int                     unlogged_tables = 0;
*** 137,142 ****
--- 137,148 ----
  double                sample_rate = 0.0;
  
  /*
+  * When threads are throttled to a given rate limit, this is the target delay
+  * to reach that rate in usec.  0 is the default and means no throttling.
+  */
+ int64         throttle_delay = 0;
+ 
+ /*
   * tablespace selection
   */
  char     *tablespace = NULL;
*************** typedef struct
*** 202,212 ****
--- 208,220 ----
        int                     listen;                 /* 0 indicates that an 
async query has been
                                                                 * sent */
        int                     sleeping;               /* 1 indicates that the 
client is napping */
+       bool        throttling;     /* whether nap is for throttling */
        int64           until;                  /* napping until (usec) */
        Variable   *variables;          /* array of variable definitions */
        int                     nvariables;
        instr_time      txn_begin;              /* used for measuring 
transaction latencies */
        instr_time      stmt_begin;             /* used for measuring statement 
latencies */
+       bool            is_throttled;   /* whether transaction throttling is 
done */
        int                     use_file;               /* index in sql_files 
for this client */
        bool            prepared[MAX_FILES];
  } CState;
*************** typedef struct
*** 224,229 ****
--- 232,240 ----
        instr_time *exec_elapsed;       /* time spent executing cmds (per 
Command) */
        int                *exec_count;         /* number of cmd executions 
(per Command) */
        unsigned short random_state[3];         /* separate randomness for each 
thread */
+       int64       throttle_trigger;   /* previous/next throttling (us) */
+       int64       throttle_lag;               /* total transaction lag behind 
throttling */
+       int64       throttle_lag_max;   /* max transaction lag */
  } TState;
  
  #define INVALID_THREAD                ((pthread_t) 0)
*************** typedef struct
*** 232,237 ****
--- 243,250 ----
  {
        instr_time      conn_time;
        int                     xacts;
+       int64       throttle_lag;
+       int64       throttle_lag_max;
  } TResult;
  
  /*
*************** usage(void)
*** 356,361 ****
--- 369,375 ----
                   "  -N, --skip-some-updates  skip updates of pgbench_tellers 
and pgbench_branches\n"
                   "  -P, --progress=NUM       show thread progress report 
every NUM seconds\n"
                   "  -r, --report-latencies   report average latency per 
command\n"
+                  "  -R, --rate SPEC          target rate in transactions per 
second\n"
                   "  -s, --scale=NUM          report this scale factor in 
output\n"
                   "  -S, --select-only        perform SELECT-only 
transactions\n"
                   "  -t, --transactions       number of transactions each 
client runs "
*************** doCustom(TState *thread, CState *st, ins
*** 898,914 ****
  {
        PGresult   *res;
        Command   **commands;
  
  top:
        commands = sql_files[st->use_file];
  
        if (st->sleeping)
        {                                                       /* are we 
sleeping? */
                instr_time      now;
  
                INSTR_TIME_SET_CURRENT(now);
!               if (st->until <= INSTR_TIME_GET_MICROSEC(now))
                        st->sleeping = 0;       /* Done sleeping, go ahead with 
next command */
                else
                        return true;            /* Still sleeping, nothing to 
do here */
        }
--- 912,973 ----
  {
        PGresult   *res;
        Command   **commands;
+       bool        trans_needs_throttle = false;
  
  top:
        commands = sql_files[st->use_file];
  
+       /*
+        * Handle throttling once per transaction by sleeping.  It is simpler
+        * to do this here rather than at the end, because so much complicated
+        * logic happens below when statements finish.
+        */
+       if (throttle_delay && ! st->is_throttled)
+       {
+               /*
+                * Use inverse transform sampling to randomly generate a delay, 
such
+                * that the series of delays will approximate a Poisson 
distribution
+                * centered on the throttle_delay time.
+                  *
+                  * 1000 implies a 6.9 (-log(1/1000)) to 0.0 (log 1.0) delay 
multiplier.
+                *
+                * If transactions are too slow or a given wait is shorter than
+                * a transaction, the next transaction will start right away.
+                */
+               int64 wait = (int64)
+                       throttle_delay * -log(getrand(thread, 1, 1000)/1000.0);
+ 
+               thread->throttle_trigger += wait;
+ 
+               st->until = thread->throttle_trigger;
+               st->sleeping = 1;
+               st->throttling = true;
+               st->is_throttled = true;
+               if (debug)
+                       fprintf(stderr, "client %d throttling "INT64_FORMAT" 
us\n",
+                                       st->id, wait);
+       }
+ 
        if (st->sleeping)
        {                                                       /* are we 
sleeping? */
                instr_time      now;
+               int64 now_us;
  
                INSTR_TIME_SET_CURRENT(now);
!               now_us = INSTR_TIME_GET_MICROSEC(now);
!               if (st->until <= now_us)
!               {
                        st->sleeping = 0;       /* Done sleeping, go ahead with 
next command */
+                       if (st->throttling)
+                       {
+                               /* Measure lag of throttled transaction 
relative to target */
+                               int64 lag = now_us - st->until;
+                               thread->throttle_lag += lag;
+                               if (lag > thread->throttle_lag_max)
+                                       thread->throttle_lag_max = lag;
+                               st->throttling = false;
+                       }
+               }
                else
                        return true;            /* Still sleeping, nothing to 
do here */
        }
*************** top:
*** 1095,1100 ****
--- 1154,1168 ----
                        st->state = 0;
                        st->use_file = (int) getrand(thread, 0, num_files - 1);
                        commands = sql_files[st->use_file];
+                       st->is_throttled = false;
+                       /*
+                        * No transaction is underway anymore, which means 
there is nothing
+                        * to listen to right now.  When throttling rate limits 
are active,
+                        * a sleep will happen next, as the next transaction 
starts.  And
+                        * then in any case the next SQL command will set 
listen back to 1.
+                        */
+                       st->listen = 0;
+                       trans_needs_throttle = (throttle_delay>0);
                }
        }
  
*************** top:
*** 1113,1118 ****
--- 1181,1196 ----
                INSTR_TIME_ACCUM_DIFF(*conn_time, end, start);
        }
  
+       /*
+          * This ensures that a throttling delay is inserted before proceeding
+          * with sql commands, after the first transaction. The first 
transaction
+          * throttling is performed when first entering doCustom.
+        */
+       if (trans_needs_throttle) {
+               trans_needs_throttle = false;
+               goto top;
+       }
+ 
        /* Record transaction start time if logging is enabled */
        if (logfile && st->state == 0)
                INSTR_TIME_SET_CURRENT(st->txn_begin);
*************** process_builtin(char *tb)
*** 2017,2023 ****
  static void
  printResults(int ttype, int normal_xacts, int nclients,
                         TState *threads, int nthreads,
!                        instr_time total_time, instr_time conn_total_time)
  {
        double          time_include,
                                tps_include,
--- 2095,2102 ----
  static void
  printResults(int ttype, int normal_xacts, int nclients,
                         TState *threads, int nthreads,
!                        instr_time total_time, instr_time conn_total_time,
!                        int64 throttle_lag, int64 throttle_lag_max)
  {
        double          time_include,
                                tps_include,
*************** printResults(int ttype, int normal_xacts
*** 2055,2060 ****
--- 2134,2152 ----
                printf("number of transactions actually processed: %d\n",
                           normal_xacts);
        }
+ 
+       if (throttle_delay)
+       {
+               /*
+                * Report average transaction lag under rate limit throttling.  
This
+                * is the delay between scheduled and actual start times for the
+                * transaction.  The measured lag may be caused by 
thread/client load,
+                * the database load, or the Poisson throttling process.
+                */
+               printf("average rate limit schedule lag: %.3f ms (max %.3f 
ms)\n",
+                          0.001 * throttle_lag / normal_xacts, 0.001 * 
throttle_lag_max);
+       }
+ 
        printf("tps = %f (including connections establishing)\n", tps_include);
        printf("tps = %f (excluding connections establishing)\n", tps_exclude);
  
*************** main(int argc, char **argv)
*** 2140,2145 ****
--- 2232,2238 ----
                {"unlogged-tables", no_argument, &unlogged_tables, 1},
                {"sampling-rate", required_argument, NULL, 4},
                {"aggregate-interval", required_argument, NULL, 5},
+               {"rate", required_argument, NULL, 'R'},
                {NULL, 0, NULL, 0}
        };
  
*************** main(int argc, char **argv)
*** 2162,2167 ****
--- 2255,2262 ----
        instr_time      total_time;
        instr_time      conn_total_time;
        int                     total_xacts;
+       int64       throttle_lag = 0;
+       int64       throttle_lag_max = 0;
  
        int                     i;
  
*************** main(int argc, char **argv)
*** 2206,2212 ****
        state = (CState *) pg_malloc(sizeof(CState));
        memset(state, 0, sizeof(CState));
  
!       while ((c = getopt_long(argc, argv, 
"ih:nvp:dqSNc:j:Crs:t:T:U:lf:D:F:M:P:", long_options, &optindex)) != -1)
        {
                switch (c)
                {
--- 2301,2307 ----
        state = (CState *) pg_malloc(sizeof(CState));
        memset(state, 0, sizeof(CState));
  
!       while ((c = getopt_long(argc, argv, 
"ih:nvp:dqSNc:j:Crs:t:T:U:lf:D:F:M:P:R:", long_options, &optindex)) != -1)
        {
                switch (c)
                {
*************** main(int argc, char **argv)
*** 2371,2376 ****
--- 2466,2484 ----
                                        exit(1);
                                }
                                break;
+                       case 'R':
+                       {
+                               /* get a double from the beginning of option 
value */
+                               double throttle_value = atof(optarg);
+                               if (throttle_value <= 0.0)
+                               {
+                                       fprintf(stderr, "invalid rate limit: 
%s\n", optarg);
+                                       exit(1);
+                               }
+                               /* Invert rate limit into a time offset */
+                               throttle_delay = (int64) (1000000.0 / 
throttle_value);
+                       }
+                               break;
                        case 0:
                                /* This covers long options which take no 
argument. */
                                break;
*************** main(int argc, char **argv)
*** 2408,2413 ****
--- 2516,2524 ----
                }
        }
  
+     /* compute a per thread delay */
+       throttle_delay *= nthreads;
+ 
        if (argc > optind)
                dbName = argv[optind];
        else
*************** main(int argc, char **argv)
*** 2721,2726 ****
--- 2832,2840 ----
                        TResult    *r = (TResult *) ret;
  
                        total_xacts += r->xacts;
+                       throttle_lag += r->throttle_lag;
+                       if (r->throttle_lag_max > throttle_lag_max)
+                               throttle_lag_max = r->throttle_lag_max;
                        INSTR_TIME_ADD(conn_total_time, r->conn_time);
                        free(ret);
                }
*************** main(int argc, char **argv)
*** 2731,2737 ****
        INSTR_TIME_SET_CURRENT(total_time);
        INSTR_TIME_SUBTRACT(total_time, start_time);
        printResults(ttype, total_xacts, nclients, threads, nthreads,
!                                total_time, conn_total_time);
  
        return 0;
  }
--- 2845,2851 ----
        INSTR_TIME_SET_CURRENT(total_time);
        INSTR_TIME_SUBTRACT(total_time, start_time);
        printResults(ttype, total_xacts, nclients, threads, nthreads,
!                                total_time, conn_total_time, throttle_lag, 
throttle_lag_max);
  
        return 0;
  }
*************** threadRun(void *arg)
*** 2756,2761 ****
--- 2870,2886 ----
  
        AggVals         aggs;
  
+       /*
+        * Initialize throttling rate target for all of the thread's clients.  
It
+        * might be a little more accurate to reset thread->start_time here too.
+        * The possible drift seems too small relative to typical throttle delay
+        * times to worry about it.
+        */
+       INSTR_TIME_SET_CURRENT(start);
+       thread->throttle_trigger = INSTR_TIME_GET_MICROSEC(start);
+       thread->throttle_lag = 0;
+       thread->throttle_lag_max = 0;
+ 
        result = pg_malloc(sizeof(TResult));
  
        INSTR_TIME_SET_ZERO(result->conn_time);
*************** threadRun(void *arg)
*** 2831,2855 ****
                        Command   **commands = sql_files[st->use_file];
                        int                     sock;
  
!                       if (st->sleeping)
                        {
!                               int                     this_usec;
! 
!                               if (min_usec == INT64_MAX)
                                {
!                                       instr_time      now;
! 
!                                       INSTR_TIME_SET_CURRENT(now);
!                                       now_usec = INSTR_TIME_GET_MICROSEC(now);
                                }
  
!                               this_usec = st->until - now_usec;
!                               if (min_usec > this_usec)
!                                       min_usec = this_usec;
!                       }
!                       else if (st->con == NULL)
!                       {
!                               continue;
                        }
                        else if (commands[st->state]->type == META_COMMAND)
                        {
--- 2956,2993 ----
                        Command   **commands = sql_files[st->use_file];
                        int                     sock;
  
!                       if (st->con == NULL)
                        {
!                               continue;
!                       }
!                       else if (st->sleeping)
!                       {
!                               if (st->throttling && timer_exceeded)
                                {
!                                       /* interrupt client which has not 
started a transaction */
!                                       remains--;
!                                       st->sleeping = 0;
!                                       st->throttling = false;
!                                       PQfinish(st->con);
!                                       st->con = NULL;
!                                       continue;
                                }
+                               else /* just a nap from the script */
+                               {
+                                       int                     this_usec;
  
!                                       if (min_usec == INT64_MAX)
!                                       {
!                                               instr_time      now;
! 
!                                               INSTR_TIME_SET_CURRENT(now);
!                                               now_usec = 
INSTR_TIME_GET_MICROSEC(now);
!                                       }
! 
!                                       this_usec = st->until - now_usec;
!                                       if (min_usec > this_usec)
!                                               min_usec = this_usec;
!                               }
                        }
                        else if (commands[st->state]->type == META_COMMAND)
                        {
*************** done:
*** 2986,2991 ****
--- 3124,3131 ----
        result->xacts = 0;
        for (i = 0; i < nstate; i++)
                result->xacts += state[i].cnt;
+       result->throttle_lag = thread->throttle_lag;
+       result->throttle_lag_max = thread->throttle_lag_max;
        INSTR_TIME_SET_CURRENT(end);
        INSTR_TIME_ACCUM_DIFF(result->conn_time, end, start);
        if (logfile)
diff --git a/doc/src/sgml/pgbench.sgml b/doc/src/sgml/pgbench.sgml
new file mode 100644
index 62555e1..84be750
*** a/doc/src/sgml/pgbench.sgml
--- b/doc/src/sgml/pgbench.sgml
*************** pgbench <optional> <replaceable>options<
*** 418,423 ****
--- 418,469 ----
       </varlistentry>
  
       <varlistentry>
+       <term><option>-R</option> <replaceable>rate</></term>
+       <term><option>--rate</option> <replaceable>rate</></term>
+       <listitem>
+        <para>
+         Execute transactions targeting the specified rate instead of running
+         as fast as possible (the default).  The rate is given in transactions
+         per second.  If the targeted rate is above the maximum possible rate,
+         the rate limit won't impact the results.
+        </para>
+        <para>
+         The rate is targeted by starting transactions along a
+         Poisson-distributed schedule time line.  The expected finish time
+         schedule moves forward based on when the client first started, not
+         when the previous transaction ended.  That approach means that when
+         transactions go past their original scheduled end time, it is
+         possible for later ones to catch up again.
+        </para>
+        <para>        
+         When throttling is active, the average and maximum transaction
+         schedule lag time are reported in ms.  This is the delay between
+         the original scheduled transaction time and the actual transaction
+         start times.  The schedule lag shows whether a transaction was
+         started on time or late.  Once a client starts running behind its
+         schedule, every following transaction can continue to be penalized
+         for schedule lag.  If faster transactions are able to catch up, it's
+         possible for them to get back on schedule again.  The lag measurement
+         of every transaction is shown when pgbench is run with debugging
+         output.
+        </para>
+        <para>
+         High rate limit schedule lag values, that is values not small with
+         respect to the actual transaction latency, indicate that something is
+         amiss in the throttling process.  High lag can highlight a subtle
+         problem there even if the target rate limit is met in the end.  One
+         possible cause of schedule lage is insufficient pgbench threads to
+         handle all of the clients.  To improve that, consider reducing the
+         number of clients, increasing the number of threads in pgbench, or
+         running pgbench on a separate host.  Another possibility is that the
+         database is not keeping up with the load at some point.  When that
+         happens, you will have to reduce the expected transaction rate to
+         lower schedule lag.
+        </para>
+       </listitem>
+      </varlistentry>
+ 
+      <varlistentry>
        <term><option>-s</option> <replaceable>scale_factor</></term>
        <term><option>--scale=</option><replaceable>scale_factor</></term>
        <listitem>
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to