Hi

This is a first attempt at a lockless scheduler: it uses two 64-bit counters 
per worker thread instead of the single active_connections variable.

This way, the worker thread and the manager thread each write only to their own 
counter, so there is no write-write data race between them.

I have measured performance to be within measurement variance of the original code on a 6-core CPU.

- Lauri
>From a02332bf19ea52a75fcf00545211722e1993cb8e Mon Sep 17 00:00:00 2001
From: Lauri Kasanen <[email protected]>
Date: Fri, 4 May 2012 19:11:07 +0300
Subject: [PATCH] First try at lockless sched


Signed-off-by: Lauri Kasanen <[email protected]>
---
 src/include/mk_scheduler.h |    2 +-
 src/include/monkey.h       |    3 ---
 src/mk_scheduler.c         |   42 +++++++++++++++++++++++-------------------
 src/monkey.c               |    3 ---
 4 files changed, 24 insertions(+), 26 deletions(-)

diff --git a/src/include/mk_scheduler.h b/src/include/mk_scheduler.h
index 7e74638..661f13b 100644
--- a/src/include/mk_scheduler.h
+++ b/src/include/mk_scheduler.h
@@ -46,7 +46,7 @@ struct sched_connection
 /* Global struct */
 struct sched_list_node
 {
-    unsigned short int active_connections;
+    unsigned long long closed_connections, accepted_connections;
 
     struct mk_list busy_queue;
     struct mk_list av_queue;
diff --git a/src/include/monkey.h b/src/include/monkey.h
index 097da93..e3f9b79 100644
--- a/src/include/monkey.h
+++ b/src/include/monkey.h
@@ -31,9 +31,6 @@
 #define SH_NOCGI 0
 #define SH_CGI 1
 
-
-/* Thread mutexes */
-pthread_mutex_t mutex_sched_active_connections;
 mk_pointer mk_monkey_protocol;
 
 /* Process UID/GID */
diff --git a/src/mk_scheduler.c b/src/mk_scheduler.c
index 6e75ba2..f3ba736 100644
--- a/src/mk_scheduler.c
+++ b/src/mk_scheduler.c
@@ -43,23 +43,33 @@
 #include "mk_macros.h"
 
 /*
- * Returns the worker id which should take a new incomming connection,
- * it returns the worker id with less active connections
+ * Returns the worker id which should take a new incoming connection,
+ * the one with the least active connections.
  */
 static inline int _next_target()
 {
     int i;
     int target = 0;
+    unsigned long long tmp = 0, cur = 0;
+
+    cur = sched_list[0].accepted_connections - 
sched_list[0].closed_connections;
+    if (cur == 0)
+        return 0;
 
     /* Finds the lowest load worker */
     for (i = 1; i < config->workers; i++) {
-        if (sched_list[i].active_connections < 
sched_list[target].active_connections) {
+        tmp = sched_list[i].accepted_connections - 
sched_list[i].closed_connections;
+        if (tmp < cur) {
             target = i;
+            cur = tmp;
+
+            if (cur == 0)
+                break;
         }
     }
 
     /* If sched_list[target] worker is full then the whole server too, because 
it has the lowest load. */
-    if(sched_list[target].active_connections >= config->worker_capacity) {
+    if(cur >= config->worker_capacity) {
         MK_TRACE("Too many clients: %i", config->worker_capacity * 
config->workers);
         return -1;
     }
@@ -88,18 +98,11 @@ inline int mk_sched_add_client(int remote_fd)
 
     MK_TRACE("[FD %i] Balance to WID %i", remote_fd, sched->idx);
 
-    pthread_mutex_lock(&mutex_sched_active_connections);
-    sched->active_connections += 1;
-    pthread_mutex_unlock(&mutex_sched_active_connections);
-
     r  = mk_epoll_add(sched->epoll_fd, remote_fd, MK_EPOLL_WRITE,
                       MK_EPOLL_LEVEL_TRIGGERED);
 
-    /* If epoll has failed, decrement the active connections counter */
-    if (r != 0) {
-        pthread_mutex_lock(&mutex_sched_active_connections);
-        sched->active_connections -= 1;
-        pthread_mutex_unlock(&mutex_sched_active_connections);
+    if (r == 0) {
+        sched->accepted_connections++;
     }
 
     return r;
@@ -115,7 +118,11 @@ int mk_sched_register_client(int remote_fd, struct 
sched_list_node *sched)
     struct sched_connection *sched_conn;
     struct mk_list *av_queue = &sched->av_queue;
 
-    if (sched->active_connections < config->worker_capacity) {
+    unsigned long long active_connections;
+
+    active_connections = sched->accepted_connections - 
sched->closed_connections;
+
+    if (active_connections < config->worker_capacity) {
         sched_conn = mk_list_entry_first(av_queue, struct sched_connection, 
_head);
 
         /* Before to continue, we need to run plugin stage 10 */
@@ -215,7 +222,6 @@ int mk_sched_register_thread(int efd)
     static int wid = 0;
 
     sl = &sched_list[wid];
-    sl->active_connections = 0;
     sl->idx = wid++;
     sl->tid = pthread_self();
 
@@ -265,7 +271,7 @@ int mk_sched_launch_thread(int max_events)
         return -1;
     }
 
-    thconf = mk_mem_malloc(sizeof(sched_thread_conf));
+    thconf = mk_mem_malloc_z(sizeof(sched_thread_conf));
     thconf->epoll_fd = efd;
     thconf->epoll_max_events = max_events*2;
     thconf->max_events = max_events;
@@ -335,9 +341,7 @@ int mk_sched_remove_client(struct sched_list_node *sched, 
int remote_fd)
         /* Invoke plugins in stage 50 */
         mk_plugin_stage_run(MK_PLUGIN_STAGE_50, remote_fd, NULL, NULL, NULL);
 
-        pthread_mutex_lock(&mutex_sched_active_connections);
-        sched->active_connections -= 1;
-        pthread_mutex_unlock(&mutex_sched_active_connections);
+        sched->closed_connections++;
 
         /* Change node status */
         sc->status = MK_SCHEDULER_CONN_AVAILABLE;
diff --git a/src/monkey.c b/src/monkey.c
index 1c70257..630d23d 100644
--- a/src/monkey.c
+++ b/src/monkey.c
@@ -153,9 +153,6 @@ int main(int argc, char **argv)
     mk_sched_init();
     mk_plugin_init();
 
-    /* FIXME: temporal mutex */
-    pthread_mutex_init(&mutex_sched_active_connections, (pthread_mutexattr_t 
*) NULL);
-
     /* Server listening socket */
     config->server_fd = mk_socket_server(config->serverport, 
config->listen_addr);
 
-- 
1.7.2.1

_______________________________________________
Monkey mailing list
[email protected]
http://lists.monkey-project.com/listinfo/monkey

Reply via email to