Vadim Tkachenko has proposed merging lp:~vadim-tk/sysbench/inj-rate into 
lp:sysbench.

Requested reviews:
  Alexey Kopytov (akopytov)

For more details, see:
https://code.launchpad.net/~vadim-tk/sysbench/inj-rate/+merge/94662

This is rather a proof of concept than a final proposal.
I am doing merge proposal to seek a feedback what changes needs to be done.
-- 
https://code.launchpad.net/~vadim-tk/sysbench/inj-rate/+merge/94662
Your team sysbench-developers is subscribed to branch lp:sysbench.
=== modified file 'sysbench/db_driver.c'
--- sysbench/db_driver.c	2011-09-26 13:54:56 +0000
+++ sysbench/db_driver.c	2012-02-25 18:13:19 +0000
@@ -77,6 +77,9 @@
 static void db_update_thread_stats(int, db_query_type_t);
 static void db_reset_stats(void);
 
+extern int queue_counter;
+extern int current_concurrency;
+
 /* DB layer arguments */
 
 static sb_arg_t db_args[] =
@@ -830,15 +833,15 @@
     seconds = NS2SEC(sb_timer_split(&sb_globals.exec_timer));
 
     log_timestamp(LOG_NOTICE, &sb_globals.exec_timer,
-                  "threads: %d, tps: %4.2f, reads/s: %4.2f, writes/s: %4.2f "
-                  "response time: %4.2fms (%u%%)",
+                  "threads: %d, tps: %4.2f, reads/s: %4.2f, writes/s: %4.2f, "
+                  "response time: %4.2fms (%u%%), queue size: %d, concurrency: %d",
                   sb_globals.num_threads,
                   (transactions - last_transactions) / seconds,
                   (read_ops - last_read_ops) / seconds,
                   (write_ops - last_write_ops) / seconds,
                   NS2MS(sb_percentile_calculate(&local_percentile,
                                                 sb_globals.percentile_rank)),
-                  sb_globals.percentile_rank);
+                  sb_globals.percentile_rank, queue_counter, current_concurrency);
 
     SB_THREAD_MUTEX_LOCK();
     last_transactions = transactions;

=== modified file 'sysbench/sb_timer.c'
--- sysbench/sb_timer.c	2011-07-21 16:45:22 +0000
+++ sysbench/sb_timer.c	2012-02-25 18:13:19 +0000
@@ -35,7 +35,7 @@
 static inline void sb_timer_update(sb_timer_t *t)
 {
   SB_GETTIME(&t->time_end);
-  t->elapsed = TIMESPEC_DIFF(t->time_end, t->time_start);
+  t->elapsed = TIMESPEC_DIFF(t->time_end, t->time_start) + t->queue_time;
 }
 
 /* initialize timer */
@@ -59,6 +59,7 @@
   t->sum_time = 0;
   t->events = 0;
   t->elapsed = 0;
+  t->queue_time = 0;
 }
 
 

=== modified file 'sysbench/sb_timer.h'
--- sysbench/sb_timer.h	2011-07-21 16:45:22 +0000
+++ sysbench/sb_timer.h	2012-02-25 18:13:19 +0000
@@ -77,6 +77,7 @@
   unsigned long long max_time;
   unsigned long long sum_time;
   unsigned long long events;
+  unsigned long long queue_time;
   timer_state_t      state;
 } sb_timer_t;
 

=== modified file 'sysbench/sysbench.c'
--- sysbench/sysbench.c	2011-09-26 13:54:56 +0000
+++ sysbench/sysbench.c	2012-02-25 18:13:19 +0000
@@ -108,8 +108,6 @@
    SB_ARG_TYPE_STRING, "off"},
   {"thread-stack-size", "size of stack per thread", SB_ARG_TYPE_SIZE, "64K"},
   {"tx-rate", "target transaction rate (tps)", SB_ARG_TYPE_INT, "0"},
-  {"tx-jitter", "target transaction variation, in microseconds",
-    SB_ARG_TYPE_INT, "0"},
   {"report-interval", "periodically report intermediate statistics "
    "with a specified interval in seconds. 0 disables intermediate reports",
     SB_ARG_TYPE_INT, "0"},
@@ -151,6 +149,15 @@
 static pthread_mutex_t thread_start_mutex;
 static pthread_attr_t  thread_attr;
 
+/* structures to handle queue of events, needed for tx_rate mode */
+static pthread_mutex_t queue_mutex;
+static pthread_cond_t queue_cv;
+int queue_counter = 0;
+int current_concurrency = 0;
+static int queue_is_full = 0;
+#define MAX_QUEUE_LEN 100000
+static unsigned long long queue_data[MAX_QUEUE_LEN];
+
 static void print_header(void);
 static void print_usage(void);
 static void print_run_mode(sb_test_t *);
@@ -199,6 +206,8 @@
 static int execute_request(sb_test_t *test, sb_request_t *r,int thread_id)
 {
   unsigned int rc;
+
+
   
   if (test->ops.execute_request != NULL)
     rc = test->ops.execute_request(r, thread_id);
@@ -361,8 +370,7 @@
   if (sb_globals.tx_rate > 0)
   {
     log_text(LOG_NOTICE,
-            "Target transaction rate: %d/sec, with jitter %d usec",
-             sb_globals.tx_rate, sb_globals.tx_jitter);
+            "Target transaction rate: %d/sec", sb_globals.tx_rate);
   }
 
   if (sb_globals.report_interval)
@@ -434,15 +442,13 @@
   sb_thread_ctxt_t *ctxt;
   sb_test_t        *test;
   unsigned int     thread_id;
-  long long        period_ns = 0;
-  long long        jitter_ns = 0;
-  long long        pause_ns;
-  struct timespec  target_tv, now_tv;
-  
+  int     queue_loop;  
+  unsigned long long queue_start_time;
+
   ctxt = (sb_thread_ctxt_t *)arg;
   test = ctxt->test;
   thread_id = ctxt->id;
-  
+
   log_text(LOG_DEBUG, "Runner thread started (%d)!", thread_id);
   if (test->ops.thread_init != NULL && test->ops.thread_init(thread_id) != 0)
   {
@@ -450,17 +456,6 @@
     return NULL; /* thread initialization failed  */
   }
 
-  if (sb_globals.tx_rate > 0)
-  {
-    /* initialize tx_rate variables */
-    period_ns = floor(1e9 / sb_globals.tx_rate * sb_globals.num_threads + 0.5);
-    if (sb_globals.tx_jitter > 0)
-      jitter_ns = sb_globals.tx_jitter * 1000;
-    else
-      /* Default jitter is 1/10th of the period */
-      jitter_ns = period_ns / 10;
-  }
- 
   /* 
     We do this to make sure all threads get to this barrier 
     about the same time 
@@ -469,25 +464,54 @@
   sb_globals.num_running++;
   pthread_mutex_unlock(&thread_start_mutex);
 
-  if (sb_globals.tx_rate > 0)
-  {
-    /* we are time-rating transactions */
-    SB_GETTIME(&target_tv);
-    /* For the first transaction - ramp up */
-    pause_ns = period_ns / sb_globals.num_threads * thread_id;
-    add_ns_to_timespec(&target_tv, period_ns);
-    usleep(pause_ns / 1000);
-  }
-
   do
   {
+
+    /* If we are in tx_rate mode, we take events from queue */
+    if (sb_globals.tx_rate > 0)
+    {
+      if (queue_is_full)
+      {
+	log_errno(LOG_FATAL, "Event queue is full.");
+	break;
+      }
+      pthread_mutex_lock (&queue_mutex);
+      while(!queue_counter)
+	pthread_cond_wait (&queue_cv, &queue_mutex);
+
+      queue_start_time = queue_data[0];
+      /* This is probably a quite uneffective way to handle queue, 
+	 may need to use copy() function */
+
+      for (queue_loop=0; queue_loop < queue_counter; queue_loop++)
+	queue_data[queue_loop] = queue_data[queue_loop+1]; 
+
+      queue_counter--;
+
+      pthread_mutex_unlock(&queue_mutex);
+
+      (&timers[thread_id])->queue_time = sb_timer_value(&sb_globals.exec_timer) - queue_start_time;
+      
+      /* we do it without mutex protection, that's fine to have racing */
+      current_concurrency++;
+    }
+    
+
     request = get_request(test, thread_id);
+
     /* check if we shall execute it */
     if (request.type != SB_REQ_TYPE_NULL)
     {
       if (execute_request(test, &request, thread_id))
         break; /* break if error returned (terminates only one thread) */
     }
+
+    if (sb_globals.tx_rate > 0)
+    {
+      /* we do it without mutex protection, that's fine to have racing */
+      current_concurrency--;
+    }
+
     /* Check if we have a time limit */
     if (sb_globals.max_time != 0 &&
         sb_timer_value(&sb_globals.exec_timer) >= SEC2NS(sb_globals.max_time))
@@ -496,17 +520,6 @@
       break;
     }
 
-    /* check if we are time-rating transactions and need to pause */
-    if (sb_globals.tx_rate > 0)
-    {
-      add_ns_to_timespec(&target_tv, period_ns);
-      SB_GETTIME(&now_tv);
-      pause_ns = TIMESPEC_DIFF(target_tv, now_tv) - (jitter_ns / 2) +
-	(sb_rnd() % jitter_ns);
-      if (pause_ns > 5000)
-        usleep(pause_ns / 1000);
-    }
-
   } while ((request.type != SB_REQ_TYPE_NULL) && (!sb_globals.error) );
 
   if (test->ops.thread_done != NULL)
@@ -519,6 +532,60 @@
   return NULL; 
 }
 
+static void *eventgen_thread_proc(void *arg)
+{
+  unsigned long long       pause_ns;
+  unsigned long long       prev_ns;
+  unsigned long long       next_ns;
+  unsigned long long       curr_ns;
+  unsigned long long       intr_ns;
+
+  (void)arg; /* unused */
+
+  log_text(LOG_DEBUG, "Event generating  thread started");
+
+  pthread_mutex_lock(&thread_start_mutex);
+  pthread_mutex_unlock(&thread_start_mutex);
+
+  curr_ns = sb_timer_value(&sb_globals.exec_timer);
+  /* emulate exponential distribution with Lambda = tx_rate */
+  intr_ns = (long)(log(1-(double)sb_rnd() / (double)SB_MAX_RND)/(-(double)sb_globals.tx_rate)*1000000);
+  next_ns = curr_ns + intr_ns*1000;
+
+  for (;;)
+  {
+    prev_ns = curr_ns;
+
+    curr_ns = sb_timer_value(&sb_globals.exec_timer);
+  
+    /* emulate exponential distribution with Lambda = tx_rate */
+    intr_ns = (long)(log(1-(double)sb_rnd() / (double)SB_MAX_RND)/(-(double)sb_globals.tx_rate)*1000000);
+   
+    next_ns = next_ns + intr_ns*1000;
+    if (next_ns > curr_ns)
+      pause_ns = next_ns - curr_ns;    
+    else
+      pause_ns = 1000;
+
+    usleep(pause_ns/1000);
+
+    pthread_mutex_lock(&queue_mutex);
+    queue_data[queue_counter]=sb_timer_value(&sb_globals.exec_timer);
+    queue_counter++;
+    if (queue_counter >= MAX_QUEUE_LEN)
+      queue_is_full = 1;
+    pthread_cond_signal(&queue_cv);
+    pthread_mutex_unlock(&queue_mutex);
+
+    if (queue_is_full)
+    {
+      log_errno(LOG_FATAL, "Event queue is full.");
+      return NULL;
+    }
+  }
+
+  return NULL;
+}
 
 /* Intermediate reports thread */
 
@@ -627,6 +694,7 @@
   int          err;
   pthread_t    report_thread;
   pthread_t    checkpoints_thread;
+  pthread_t    eventgen_thread;
   int          report_thread_created = 0;
   int          checkpoints_thread_created = 0;
 
@@ -652,6 +720,12 @@
     return 1;
 
   pthread_mutex_init(&sb_globals.exec_mutex, NULL);
+ 
+   
+  pthread_mutex_init(&queue_mutex, NULL);    
+  pthread_cond_init(&queue_cv, NULL);
+  queue_counter = 0;
+  queue_is_full = 0;
 
   /* start mutex used for barrier */
   pthread_mutex_init(&thread_start_mutex,NULL);    
@@ -686,6 +760,13 @@
     report_thread_created = 1;
   }
 
+  if ((err = pthread_create(&eventgen_thread, &thread_attr, &eventgen_thread_proc,
+                            NULL)) != 0)
+  {
+    log_errno(LOG_FATAL, "pthread_create() for the reporting thread failed.");
+    return 1;
+  }
+
   if (sb_globals.n_checkpoints > 0)
   {
     /* Create a thread for checkpoint statistic reports */
@@ -768,6 +849,10 @@
     if (pthread_cancel(report_thread) || pthread_join(report_thread, NULL))
       log_errno(LOG_FATAL, "Terminating the reporting thread failed.");
   }
+
+  if (pthread_cancel(eventgen_thread) || pthread_join(eventgen_thread, NULL))
+    log_errno(LOG_FATAL, "Terminating the event generator thread failed.");
+
   if (checkpoints_thread_created)
   {
     if (pthread_cancel(checkpoints_thread) ||
@@ -914,7 +999,6 @@
   rand_res = sb_get_value_int("rand-spec-res");
 
   sb_globals.tx_rate = sb_get_value_int("tx-rate");
-  sb_globals.tx_jitter = sb_get_value_int("tx-jitter");
   sb_globals.report_interval = sb_get_value_int("report-interval");
 
   sb_globals.n_checkpoints = 0;

=== modified file 'sysbench/sysbench.h'
--- sysbench/sysbench.h	2011-07-21 17:02:14 +0000
+++ sysbench/sysbench.h	2012-02-25 18:13:19 +0000
@@ -97,6 +97,7 @@
 {
   int              type;
   struct sb_test_t *test;
+  unsigned long long start_time_queue; /* bad hack, need to look how to fix */
   
   /* type-specific data */
   union

_______________________________________________
Mailing list: https://launchpad.net/~sysbench-developers
Post to     : [email protected]
Unsubscribe : https://launchpad.net/~sysbench-developers
More help   : https://help.launchpad.net/ListHelp

Reply via email to