On 7/14/13 2:48 PM, Fabien COELHO wrote:
You attached my v13. Could you send your v14?

Correct patch (and the little one from me again) attached this time.

--
Greg Smith   2ndQuadrant US    g...@2ndquadrant.com   Baltimore, MD
PostgreSQL Training, Services, and 24x7 Support www.2ndQuadrant.com
diff --git a/contrib/pgbench/pgbench.c b/contrib/pgbench/pgbench.c
new file mode 100644
index 08095a9..6564057
*** a/contrib/pgbench/pgbench.c
--- b/contrib/pgbench/pgbench.c
*************** int                     unlogged_tables = 0;
*** 137,142 ****
--- 137,148 ----
  double                sample_rate = 0.0;
  
  /*
+  * When threads are throttled to a given rate limit, this is the target delay
+  * to reach that rate in usec.  0 is the default and means no throttling.
+  */
+ int64         throttle_delay = 0;
+ 
+ /*
   * tablespace selection
   */
  char     *tablespace = NULL;
*************** typedef struct
*** 200,210 ****
--- 206,218 ----
        int                     listen;                 /* 0 indicates that an 
async query has been
                                                                 * sent */
        int                     sleeping;               /* 1 indicates that the 
client is napping */
+       bool        throttling;     /* whether nap is for throttling */
        int64           until;                  /* napping until (usec) */
        Variable   *variables;          /* array of variable definitions */
        int                     nvariables;
        instr_time      txn_begin;              /* used for measuring 
transaction latencies */
        instr_time      stmt_begin;             /* used for measuring statement 
latencies */
+       bool            is_throttled;   /* whether transaction should be 
throttled */
        int                     use_file;               /* index in sql_files 
for this client */
        bool            prepared[MAX_FILES];
  } CState;
*************** typedef struct
*** 222,227 ****
--- 230,238 ----
        instr_time *exec_elapsed;       /* time spent executing cmds (per 
Command) */
        int                *exec_count;         /* number of cmd executions 
(per Command) */
        unsigned short random_state[3];         /* separate randomness for each 
thread */
+       int64       throttle_trigger;   /* previous/next throttling (us) */
+       int64       throttle_lag;               /* total transaction lag behind 
throttling */
+       int64       throttle_lag_max;   /* max transaction lag */
  } TState;
  
  #define INVALID_THREAD                ((pthread_t) 0)
*************** typedef struct
*** 230,235 ****
--- 241,248 ----
  {
        instr_time      conn_time;
        int                     xacts;
+       int64       throttle_lag;
+       int64       throttle_lag_max;
  } TResult;
  
  /*
*************** usage(void)
*** 353,358 ****
--- 366,372 ----
                   "  -n, --no-vacuum          do not run VACUUM before tests\n"
                   "  -N, --skip-some-updates  skip updates of pgbench_tellers 
and pgbench_branches\n"
                   "  -r, --report-latencies   report average latency per 
command\n"
+                  "  -R, --rate SPEC          target rate in transactions per 
second\n"
                   "  -s, --scale=NUM          report this scale factor in 
output\n"
                   "  -S, --select-only        perform SELECT-only 
transactions\n"
                   "  -t, --transactions       number of transactions each 
client runs "
*************** doCustom(TState *thread, CState *st, ins
*** 900,916 ****
  {
        PGresult   *res;
        Command   **commands;
  
  top:
        commands = sql_files[st->use_file];
  
        if (st->sleeping)
        {                                                       /* are we 
sleeping? */
                instr_time      now;
  
                INSTR_TIME_SET_CURRENT(now);
!               if (st->until <= INSTR_TIME_GET_MICROSEC(now))
                        st->sleeping = 0;       /* Done sleeping, go ahead with 
next command */
                else
                        return true;            /* Still sleeping, nothing to 
do here */
        }
--- 914,973 ----
  {
        PGresult   *res;
        Command   **commands;
+       bool        trans_needs_throttle = false;
  
  top:
        commands = sql_files[st->use_file];
  
+       /*
+        * Handle throttling once per transaction by sleeping.  It is simpler
+        * to do this here rather than at the end, because so much complicated
+        * logic happens below when statements finish.
+        */
+       if (throttle_delay && ! st->is_throttled)
+       {
+               /*
+                * Use inverse transform sampling to randomly generate a delay, 
such
+                * that the series of delays will approximate a Poisson 
distribution
+                * centered on the throttle_delay time.
+                *
+                * If transactions are too slow or a given wait is shorter than
+                * a transaction, the next transaction will start right away.
+                */
+               int64 wait = (int64)
+                       throttle_delay * -log(getrand(thread, 1, 1000)/1000.0);
+ 
+               thread->throttle_trigger += wait;
+ 
+               st->until = thread->throttle_trigger;
+               st->sleeping = 1;
+               st->throttling = true;
+               st->is_throttled = true;
+               if (debug)
+                       fprintf(stderr, "client %d throttling "INT64_FORMAT" 
us\n",
+                                       st->id, wait);
+       }
+ 
        if (st->sleeping)
        {                                                       /* are we 
sleeping? */
                instr_time      now;
+               int64 now_us;
  
                INSTR_TIME_SET_CURRENT(now);
!               now_us = INSTR_TIME_GET_MICROSEC(now);
!               if (st->until <= now_us)
!               {
                        st->sleeping = 0;       /* Done sleeping, go ahead with 
next command */
+                       if (st->throttling)
+                       {
+                               /* Measure lag of throttled transaction 
relative to target */
+                               int64 lag = now_us - st->until;
+                               thread->throttle_lag += lag;
+                               if (lag > thread->throttle_lag_max)
+                                       thread->throttle_lag_max = lag;
+                               st->throttling = false;
+                       }
+               }
                else
                        return true;            /* Still sleeping, nothing to 
do here */
        }
*************** top:
*** 1097,1102 ****
--- 1154,1168 ----
                        st->state = 0;
                        st->use_file = (int) getrand(thread, 0, num_files - 1);
                        commands = sql_files[st->use_file];
+                       st->is_throttled = false;
+                       /*
+                        * No transaction is underway anymore, which means 
there is nothing
+                        * to listen to right now.  When throttling rate limits 
are active,
+                        * a sleep will happen next, as the next transaction 
starts.  And
+                        * then in any case the next SQL command will set 
listen back to 1.
+                        */
+                       st->listen = 0;
+                       trans_needs_throttle = (throttle_delay>0);
                }
        }
  
*************** top:
*** 1115,1120 ****
--- 1181,1195 ----
                INSTR_TIME_ACCUM_DIFF(*conn_time, end, start);
        }
  
+       /*
+        * This won't add a throttling delay during the first transaction after
+        * opening a connection.  They will be inserted starting with the 
second.
+        */
+       if (trans_needs_throttle) {
+               trans_needs_throttle = false;
+               goto top;
+       }
+ 
        /* Record transaction start time if logging is enabled */
        if (logfile && st->state == 0)
                INSTR_TIME_SET_CURRENT(st->txn_begin);
*************** process_builtin(char *tb)
*** 2019,2025 ****
  static void
  printResults(int ttype, int normal_xacts, int nclients,
                         TState *threads, int nthreads,
!                        instr_time total_time, instr_time conn_total_time)
  {
        double          time_include,
                                tps_include,
--- 2094,2101 ----
  static void
  printResults(int ttype, int normal_xacts, int nclients,
                         TState *threads, int nthreads,
!                        instr_time total_time, instr_time conn_total_time,
!                        int64 throttle_lag, int64 throttle_lag_max)
  {
        double          time_include,
                                tps_include,
*************** printResults(int ttype, int normal_xacts
*** 2057,2062 ****
--- 2133,2151 ----
                printf("number of transactions actually processed: %d\n",
                           normal_xacts);
        }
+ 
+       if (throttle_delay)
+       {
+               /*
+                * Report average transaction lag under rate limit throttling.  
This
+                * is the delay between scheduled and actual start times for the
+                * transaction.  The measured lag may be caused by 
thread/client load,
+                * the database load, or the Poisson throttling process.
+                */
+               printf("average rate limit lag: %.3f ms (max %.3f ms)\n",
+                          0.001 * throttle_lag / normal_xacts, 0.001 * 
throttle_lag_max);
+       }
+ 
        printf("tps = %f (including connections establishing)\n", tps_include);
        printf("tps = %f (excluding connections establishing)\n", tps_exclude);
  
*************** main(int argc, char **argv)
*** 2141,2146 ****
--- 2230,2236 ----
                {"unlogged-tables", no_argument, &unlogged_tables, 1},
                {"sampling-rate", required_argument, NULL, 4},
                {"aggregate-interval", required_argument, NULL, 5},
+               {"rate", required_argument, NULL, 'R'},
                {NULL, 0, NULL, 0}
        };
  
*************** main(int argc, char **argv)
*** 2163,2168 ****
--- 2253,2260 ----
        instr_time      total_time;
        instr_time      conn_total_time;
        int                     total_xacts;
+       int64       throttle_lag = 0;
+       int64       throttle_lag_max = 0;
  
        int                     i;
  
*************** main(int argc, char **argv)
*** 2207,2213 ****
        state = (CState *) pg_malloc(sizeof(CState));
        memset(state, 0, sizeof(CState));
  
!       while ((c = getopt_long(argc, argv, 
"ih:nvp:dqSNc:j:Crs:t:T:U:lf:D:F:M:", long_options, &optindex)) != -1)
        {
                switch (c)
                {
--- 2299,2305 ----
        state = (CState *) pg_malloc(sizeof(CState));
        memset(state, 0, sizeof(CState));
  
!       while ((c = getopt_long(argc, argv, 
"ih:nvp:dqSNc:j:Crs:t:T:U:lf:D:F:M:R:", long_options, &optindex)) != -1)
        {
                switch (c)
                {
*************** main(int argc, char **argv)
*** 2362,2367 ****
--- 2454,2472 ----
                                        exit(1);
                                }
                                break;
+                       case 'R':
+                       {
+                               /* get a double from the beginning of option 
value */
+                               double throttle_value = atof(optarg);
+                               if (throttle_value <= 0.0)
+                               {
+                                       fprintf(stderr, "invalid rate limit: 
%s\n", optarg);
+                                       exit(1);
+                               }
+                               /* Invert rate limit into a time offset */
+                               throttle_delay = (int64) (1000000.0 / 
throttle_value);
+                       }
+                               break;
                        case 0:
                                /* This covers long options which take no 
argument. */
                                break;
*************** main(int argc, char **argv)
*** 2399,2404 ****
--- 2504,2512 ----
                }
        }
  
+     /* compute a per thread delay */
+       throttle_delay *= nthreads;
+ 
        if (argc > optind)
                dbName = argv[optind];
        else
*************** main(int argc, char **argv)
*** 2711,2716 ****
--- 2819,2827 ----
                        TResult    *r = (TResult *) ret;
  
                        total_xacts += r->xacts;
+                       throttle_lag += r->throttle_lag;
+                       if (r->throttle_lag_max > throttle_lag_max)
+                               throttle_lag_max = r->throttle_lag_max;
                        INSTR_TIME_ADD(conn_total_time, r->conn_time);
                        free(ret);
                }
*************** main(int argc, char **argv)
*** 2721,2727 ****
        INSTR_TIME_SET_CURRENT(total_time);
        INSTR_TIME_SUBTRACT(total_time, start_time);
        printResults(ttype, total_xacts, nclients, threads, nthreads,
!                                total_time, conn_total_time);
  
        return 0;
  }
--- 2832,2838 ----
        INSTR_TIME_SET_CURRENT(total_time);
        INSTR_TIME_SUBTRACT(total_time, start_time);
        printResults(ttype, total_xacts, nclients, threads, nthreads,
!                                total_time, conn_total_time, throttle_lag, 
throttle_lag_max);
  
        return 0;
  }
*************** threadRun(void *arg)
*** 2741,2746 ****
--- 2852,2868 ----
  
        AggVals         aggs;
  
+       /*
+        * Initialize throttling rate target for all of the thread's clients.  
It
+        * might be a little more accurate to reset thread->start_time here too.
+        * The possible drift seems too small relative to typical throttle delay
+        * times to worry about it.
+        */
+       INSTR_TIME_SET_CURRENT(start);
+       thread->throttle_trigger = INSTR_TIME_GET_MICROSEC(start);
+       thread->throttle_lag = 0;
+       thread->throttle_lag_max = 0;
+ 
        result = pg_malloc(sizeof(TResult));
  
        INSTR_TIME_SET_ZERO(result->conn_time);
*************** threadRun(void *arg)
*** 2816,2840 ****
                        Command   **commands = sql_files[st->use_file];
                        int                     sock;
  
!                       if (st->sleeping)
                        {
!                               int                     this_usec;
! 
!                               if (min_usec == INT64_MAX)
                                {
!                                       instr_time      now;
! 
!                                       INSTR_TIME_SET_CURRENT(now);
!                                       now_usec = INSTR_TIME_GET_MICROSEC(now);
                                }
  
!                               this_usec = st->until - now_usec;
!                               if (min_usec > this_usec)
!                                       min_usec = this_usec;
!                       }
!                       else if (st->con == NULL)
!                       {
!                               continue;
                        }
                        else if (commands[st->state]->type == META_COMMAND)
                        {
--- 2938,2975 ----
                        Command   **commands = sql_files[st->use_file];
                        int                     sock;
  
!                       if (st->con == NULL)
                        {
!                               continue;
!                       }
!                       else if (st->sleeping)
!                       {
!                               if (st->throttling && timer_exceeded)
                                {
!                                       /* interrupt client which has not 
started a transaction */
!                                       remains--;
!                                       st->sleeping = 0;
!                                       st->throttling = false;
!                                       PQfinish(st->con);
!                                       st->con = NULL;
!                                       continue;
                                }
+                               else /* just a nap from the script */
+                               {
+                                       int                     this_usec;
  
!                                       if (min_usec == INT64_MAX)
!                                       {
!                                               instr_time      now;
! 
!                                               INSTR_TIME_SET_CURRENT(now);
!                                               now_usec = 
INSTR_TIME_GET_MICROSEC(now);
!                                       }
! 
!                                       this_usec = st->until - now_usec;
!                                       if (min_usec > this_usec)
!                                               min_usec = this_usec;
!                               }
                        }
                        else if (commands[st->state]->type == META_COMMAND)
                        {
*************** done:
*** 2909,2914 ****
--- 3044,3051 ----
        result->xacts = 0;
        for (i = 0; i < nstate; i++)
                result->xacts += state[i].cnt;
+       result->throttle_lag = thread->throttle_lag;
+       result->throttle_lag_max = thread->throttle_lag_max;
        INSTR_TIME_SET_CURRENT(end);
        INSTR_TIME_ACCUM_DIFF(result->conn_time, end, start);
        if (logfile)
diff --git a/doc/src/sgml/pgbench.sgml b/doc/src/sgml/pgbench.sgml
new file mode 100644
index a7f41e1..a6ab1bc
*** a/doc/src/sgml/pgbench.sgml
--- b/doc/src/sgml/pgbench.sgml
*************** pgbench <optional> <replaceable>options<
*** 408,413 ****
--- 408,434 ----
       </varlistentry>
  
       <varlistentry>
+       <term><option>-R</option> <replaceable>rate</></term>
+       <term><option>--rate</option> <replaceable>rate</></term>
+       <listitem>
+        <para>
+         Execute transactions targeting the specified rate instead of
+         running as fast as possible (the default).  The rate is given in
+         transactions per second.  If the targeted rate is
+         above the maximum possible rate these transactions can execute at,
+         the rate limit won't have any impact on results.
+ 
+         The rate is targeted by starting transactions along a
+         Poisson-distributed event time line.  When a rate limit is
+         active, the average and maximum transaction lag time
+         (the delay between the scheduled and actual transaction start times)
+         are reported in ms. High values indicate that the database
+         could not handle the scheduled load at some time.
+        </para>
+       </listitem>
+      </varlistentry>
+ 
+      <varlistentry>
        <term><option>-s</option> <replaceable>scale_factor</></term>
        <term><option>--scale=</option><replaceable>scale_factor</></term>
        <listitem>
diff --git a/contrib/pgbench/pgbench.c b/contrib/pgbench/pgbench.c
new file mode 100644
index 08095a9..8dc81e5
*** a/contrib/pgbench/pgbench.c
--- b/contrib/pgbench/pgbench.c
*************** preparedStatementName(char *buffer, int 
*** 862,875 ****
  static bool
  clientDone(CState *st, bool ok)
  {
!       /*
!        * When the connection finishes normally, don't call PQfinish yet.
!        * PQfinish can cause significant delays in other clients that are
!        * still running.  Rather than finishing all of them here, in the
!        * normal case clients are instead closed in bulk by disconnect_all,
!        * after they have all stopped.
!        */
!       if ((st->con != NULL) && ok)
        {
                PQfinish(st->con);
                st->con = NULL;
--- 862,870 ----
  static bool
  clientDone(CState *st, bool ok)
  {
!       (void) ok;                                      /* unused */
! 
!       if (st->con != NULL)
        {
                PQfinish(st->con);
                st->con = NULL;
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to