This is an automated email from the ASF dual-hosted git repository.

aconway pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git

commit 48e62a11e67cf3a37b5e1f9862d07a939101cdb4
Author: Alan Conway <acon...@redhat.com>
AuthorDate: Fri Mar 1 10:49:54 2019 -0500

    DISPATCH-1274: Optimize qd_timer_schedule(0)
    
    Introduced pn_immediate_t, a simpler schedule for immediate requests.
    qd_timer_schedule delegates schedule(0) requests.
---
 src/CMakeLists.txt                           |  1 +
 src/immediate.c                              | 96 ++++++++++++++++++++++++++++
 src/{timer_private.h => immediate_private.h} | 39 +++++------
 src/server.c                                 | 20 ++++--
 src/server_private.h                         |  1 +
 src/timer.c                                  | 15 ++++-
 src/timer_private.h                          |  2 +
 7 files changed, 150 insertions(+), 24 deletions(-)

diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 07ae846..5f4f745 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -51,6 +51,7 @@ set(qpid_dispatch_SOURCES
   entity_cache.c
   failoverlist.c
   hash.c
+  immediate.c
   iterator.c
   log.c
   message.c
diff --git a/src/immediate.c b/src/immediate.c
new file mode 100644
index 0000000..6149be7
--- /dev/null
+++ b/src/immediate.c
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "immediate_private.h"
+#include "server_private.h"
+
+#include <qpid/dispatch/threading.h>
+#include <assert.h>
+
+struct qd_immediate_t {
+    qd_server_t *server;
+    void (*handler)(void* context);
+    void *context;
+    bool armed;
+};
+
+/* Array rather than list for fast access and cache-coherence */
+static qd_immediate_t immediates[256] = {0};
+static size_t count = 0;
+static sys_mutex_t *lock = NULL;
+
+void qd_immediate_initialize(void) {
+    lock = sys_mutex();
+}
+
+void qd_immediate_finalize(void) {
+    sys_mutex_free(lock);
+    lock = 0;
+}
+
+qd_immediate_t *qd_immediate(qd_dispatch_t *qd, void (*handler)(void*), void* 
context) {
+    sys_mutex_lock(lock);
+    if (count >= sizeof(immediates)/sizeof(immediates[0])) {
+        assert("exceeded max number of qd_immediate_t objects" == 0);
+        return 0;
+    }
+    qd_immediate_t *i = &immediates[count++];
+    i->server = qd ? qd->server : NULL;
+    i->handler = handler;
+    i->context = context;
+    i->armed = false;
+    sys_mutex_unlock(lock);
+    return i;
+}
+
+void qd_immediate_arm(qd_immediate_t *i) {
+    bool interrupt = false;
+    sys_mutex_lock(lock);
+    if (!i->armed) {
+        interrupt = i->armed = true;
+    }
+    sys_mutex_unlock(lock);
+    if (interrupt && i->server) {
+        qd_server_interrupt(i->server);
+    }
+}
+
+void qd_immediate_disarm(qd_immediate_t *i) {
+    sys_mutex_lock(lock);
+    i->armed = false;
+    sys_mutex_unlock(lock);
+}
+
+void qd_immediate_free(qd_immediate_t *i) {
+    /* Just disarm, its harmless to leave it in place. */
+    qd_immediate_disarm(i);
+}
+
+void qd_immediate_visit() {
+    sys_mutex_lock(lock);
+    for (qd_immediate_t *i = immediates; i < immediates + count; ++i) {
+        if (i->armed) {
+            i->armed = false;
+            sys_mutex_unlock(lock);
+            i->handler(i->context);
+            sys_mutex_lock(lock);
+        }
+    }
+    sys_mutex_unlock(lock);
+}
diff --git a/src/timer_private.h b/src/immediate_private.h
similarity index 52%
copy from src/timer_private.h
copy to src/immediate_private.h
index 537eb4b..cd8d11b 100644
--- a/src/timer_private.h
+++ b/src/immediate_private.h
@@ -1,5 +1,5 @@
-#ifndef __timer_private_h__
-#define __timer_private_h__ 1
+#ifndef __immediate_private_h__
+#define __immediate_private_h__ 1
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -19,26 +19,27 @@
  * under the License.
  */
 
-#include <qpid/dispatch/ctools.h>
-#include <qpid/dispatch/timer.h>
-#include <qpid/dispatch/threading.h>
 
-struct qd_timer_t {
-    DEQ_LINKS(qd_timer_t);
-    qd_server_t      *server;
-    qd_timer_cb_t     handler;
-    void             *context;
-    qd_timestamp_t    delta_time;
-    bool              scheduled; /* true means on scheduled list, false on 
idle list */
-};
+#include <qpid/dispatch/dispatch.h>
+#include <qpid/dispatch/server.h>
+#include <stdint.h>
 
-DEQ_DECLARE(qd_timer_t, qd_timer_list_t);
+/* Immediate actions - used by timer to optimize schedule(0) */
 
-void qd_timer_initialize(sys_mutex_t *server_lock);
-void qd_timer_finalize(void);
-void qd_timer_visit();
+void qd_immediate_initialize(void);
+void qd_immediate_finalize(void);
+void qd_immediate_visit(void);
 
-/// For tests only
-sys_mutex_t* qd_timer_lock();
+typedef struct qd_immediate_t qd_immediate_t;
+
+qd_immediate_t *qd_immediate(qd_dispatch_t *qd, void (*handler)(void*), void* 
context);
+
+/* Arm causes a call to handler(context) ASAP in a server thread. */
+void qd_immediate_arm(qd_immediate_t *);
+
+/* After disarm() returns, there will be no handler() call unless re-armed. */
+void qd_immediate_disarm(qd_immediate_t *);
+
+void qd_immediate_free(qd_immediate_t *);
 
 #endif
diff --git a/src/server.c b/src/server.c
index 24add4e..1863546 100644
--- a/src/server.c
+++ b/src/server.c
@@ -38,6 +38,7 @@
 #include "entity.h"
 #include "entity_cache.h"
 #include "dispatch_private.h"
+#include "immediate_private.h"
 #include "policy.h"
 #include "server_private.h"
 #include "timer_private.h"
@@ -68,6 +69,7 @@ struct qd_server_t {
     uint64_t                  next_connection_id;
     void                     *py_displayname_obj;
     qd_http_server_t         *http;
+    bool                      stopping;
 };
 
 #define HEARTBEAT_INTERVAL 1000
@@ -905,10 +907,15 @@ static bool handle(qd_server_t *qd_server, pn_event_t *e, 
pn_connection_t *pn_co
     switch (pn_event_type(e)) {
 
     case PN_PROACTOR_INTERRUPT:
-        /* Interrupt the next thread */
-        pn_proactor_interrupt(qd_server->proactor);
-        /* Stop the current thread */
-        return false;
+        if (qd_server->stopping) {
+            /* Interrupt the next thread */
+            pn_proactor_interrupt(qd_server->proactor);
+            /* Stop the current thread */
+            return false;
+        } else {
+            /* Check for immediate tasks */
+            qd_immediate_visit();
+        }
 
     case PN_PROACTOR_TIMEOUT:
         qd_timer_visit();
@@ -1296,6 +1303,7 @@ void qd_server_run(qd_dispatch_t *qd)
 
 void qd_server_stop(qd_dispatch_t *qd)
 {
+    qd->server->stopping = true;
     /* Interrupt the proactor, async-signal-safe */
     pn_proactor_interrupt(qd->server->proactor);
 }
@@ -1505,6 +1513,10 @@ void qd_server_timeout(qd_server_t *server, 
qd_duration_t duration) {
     pn_proactor_set_timeout(server->proactor, duration);
 }
 
+void qd_server_interrupt(qd_server_t *server) {
+    pn_proactor_interrupt(server->proactor);
+}
+
 qd_dispatch_t* qd_server_dispatch(qd_server_t *server) { return server->qd; }
 
 const char* qd_connection_name(const qd_connection_t *c) {
diff --git a/src/server_private.h b/src/server_private.h
index 9f3c75c..9fcec6c 100644
--- a/src/server_private.h
+++ b/src/server_private.h
@@ -38,6 +38,7 @@
 
 qd_dispatch_t* qd_server_dispatch(qd_server_t *server);
 void qd_server_timeout(qd_server_t *server, qd_duration_t delay);
+void qd_server_interrupt(qd_server_t *server);
 
 qd_connection_t *qd_server_connection(qd_server_t *server, qd_server_config_t* 
config);
 
diff --git a/src/timer.c b/src/timer.c
index a4aae2a..0fd87c7 100644
--- a/src/timer.c
+++ b/src/timer.c
@@ -18,8 +18,9 @@
  */
 
 #include "dispatch_private.h"
-#include "timer_private.h"
+#include "immediate_private.h"
 #include "server_private.h"
+#include "timer_private.h"
 #include <qpid/dispatch/ctools.h>
 #include <qpid/dispatch/threading.h>
 #include <qpid/dispatch/alloc.h>
@@ -54,6 +55,7 @@ static void timer_cancel_LH(qd_timer_t *timer)
         DEQ_INSERT_TAIL(idle_timers, timer);
         timer->scheduled = false;
     }
+    qd_immediate_disarm(timer->immediate);
 }
 
 /* Adjust timer's time_base and delays for the current time. */
@@ -95,6 +97,7 @@ qd_timer_t *qd_timer(qd_dispatch_t *qd, qd_timer_cb_t cb, 
void* context)
     timer->context    = context;
     timer->delta_time = 0;
     timer->scheduled  = false;
+    timer->immediate  = qd_immediate(qd, cb, context);
     sys_mutex_lock(lock);
     DEQ_INSERT_TAIL(idle_timers, timer);
     sys_mutex_unlock(lock);
@@ -109,6 +112,7 @@ void qd_timer_free(qd_timer_t *timer)
     sys_mutex_lock(lock);
     timer_cancel_LH(timer);
     DEQ_REMOVE(idle_timers, timer);
+    qd_immediate_free(timer->immediate);
     sys_mutex_unlock(lock);
     free_qd_timer_t(timer);
 }
@@ -124,6 +128,11 @@ qd_timestamp_t qd_timer_now() {
 void qd_timer_schedule(qd_timer_t *timer, qd_duration_t duration)
 {
     sys_mutex_lock(lock);
+    if (duration == 0) {
+        qd_immediate_arm(timer->immediate);
+        sys_mutex_unlock(lock);
+        return;
+    }
     timer_cancel_LH(timer);  // Timer is now on the idle list
     DEQ_REMOVE(idle_timers, timer);
 
@@ -175,6 +184,7 @@ void qd_timer_cancel(qd_timer_t *timer)
 
 void qd_timer_initialize(sys_mutex_t *server_lock)
 {
+    qd_immediate_initialize();
     lock = server_lock;
     DEQ_INIT(idle_timers);
     DEQ_INIT(scheduled_timers);
@@ -185,6 +195,7 @@ void qd_timer_initialize(sys_mutex_t *server_lock)
 void qd_timer_finalize(void)
 {
     lock = 0;
+    qd_immediate_finalize();
 }
 
 
@@ -196,6 +207,7 @@ void qd_timer_visit()
     qd_timer_t *timer = DEQ_HEAD(scheduled_timers);
     while (timer && timer->delta_time == 0) {
         timer_cancel_LH(timer); /* Removes timer from scheduled_timers */
+        qd_immediate_disarm(timer->immediate);
         sys_mutex_unlock(lock);
         timer->handler(timer->context); /* Call the handler outside the lock, 
may re-schedule */
         sys_mutex_lock(lock);
@@ -206,4 +218,5 @@ void qd_timer_visit()
         qd_server_timeout(first->server, first->delta_time);
     }
     sys_mutex_unlock(lock);
+    qd_immediate_visit();
 }
diff --git a/src/timer_private.h b/src/timer_private.h
index 537eb4b..263fca5 100644
--- a/src/timer_private.h
+++ b/src/timer_private.h
@@ -19,6 +19,7 @@
  * under the License.
  */
 
+#include "immediate_private.h"
 #include <qpid/dispatch/ctools.h>
 #include <qpid/dispatch/timer.h>
 #include <qpid/dispatch/threading.h>
@@ -29,6 +30,7 @@ struct qd_timer_t {
     qd_timer_cb_t     handler;
     void             *context;
     qd_timestamp_t    delta_time;
+    qd_immediate_t   *immediate; /* Optimized path for schedule(0) */
     bool              scheduled; /* true means on scheduled list, false on 
idle list */
 };
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to