This is an automated email from the ASF dual-hosted git repository. aconway pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
commit 48e62a11e67cf3a37b5e1f9862d07a939101cdb4 Author: Alan Conway <acon...@redhat.com> AuthorDate: Fri Mar 1 10:49:54 2019 -0500 DISPATCH-1274: Optimize qd_timer_schedule(0) Introduced qd_immediate_t, a simpler schedule for immediate requests. qd_timer_schedule delegates schedule(0) requests. --- src/CMakeLists.txt | 1 + src/immediate.c | 96 ++++++++++++++++++++++++++++ src/{timer_private.h => immediate_private.h} | 39 +++++------ src/server.c | 20 ++++-- src/server_private.h | 1 + src/timer.c | 15 ++++- src/timer_private.h | 2 + 7 files changed, 150 insertions(+), 24 deletions(-) diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 07ae846..5f4f745 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -51,6 +51,7 @@ set(qpid_dispatch_SOURCES entity_cache.c failoverlist.c hash.c + immediate.c iterator.c log.c message.c diff --git a/src/immediate.c b/src/immediate.c new file mode 100644 index 0000000..6149be7 --- /dev/null +++ b/src/immediate.c @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +#include "immediate_private.h" +#include "server_private.h" + +#include <qpid/dispatch/threading.h> +#include <assert.h> + +struct qd_immediate_t { + qd_server_t *server; + void (*handler)(void* context); + void *context; + bool armed; +}; + +/* Array rather than list for fast access and cache locality */ +static qd_immediate_t immediates[256] = {0}; +static size_t count = 0; +static sys_mutex_t *lock = NULL; + +void qd_immediate_initialize(void) { + lock = sys_mutex(); +} + +void qd_immediate_finalize(void) { + sys_mutex_free(lock); + lock = 0; +} + +qd_immediate_t *qd_immediate(qd_dispatch_t *qd, void (*handler)(void*), void* context) { + sys_mutex_lock(lock); + if (count >= sizeof(immediates)/sizeof(immediates[0])) { + assert("exceeded max number of qd_immediate_t objects" == 0); + return 0; + } + qd_immediate_t *i = &immediates[count++]; + i->server = qd ? qd->server : NULL; + i->handler = handler; + i->context = context; + i->armed = false; + sys_mutex_unlock(lock); + return i; +} + +void qd_immediate_arm(qd_immediate_t *i) { + bool interrupt = false; + sys_mutex_lock(lock); + if (!i->armed) { + interrupt = i->armed = true; + } + sys_mutex_unlock(lock); + if (interrupt && i->server) { + qd_server_interrupt(i->server); + } +} + +void qd_immediate_disarm(qd_immediate_t *i) { + sys_mutex_lock(lock); + i->armed = false; + sys_mutex_unlock(lock); +} + +void qd_immediate_free(qd_immediate_t *i) { + /* Just disarm, it's harmless to leave it in place.
*/ + qd_immediate_disarm(i); +} + +void qd_immediate_visit() { + sys_mutex_lock(lock); + for (qd_immediate_t *i = immediates; i < immediates + count; ++i) { + if (i->armed) { + i->armed = false; + sys_mutex_unlock(lock); + i->handler(i->context); + sys_mutex_lock(lock); + } + } + sys_mutex_unlock(lock); +} diff --git a/src/timer_private.h b/src/immediate_private.h similarity index 52% copy from src/timer_private.h copy to src/immediate_private.h index 537eb4b..cd8d11b 100644 --- a/src/timer_private.h +++ b/src/immediate_private.h @@ -1,5 +1,5 @@ -#ifndef __timer_private_h__ -#define __timer_private_h__ 1 +#ifndef __immediate_private_h__ +#define __immediate_private_h__ 1 /* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -19,26 +19,27 @@ * under the License. */ -#include <qpid/dispatch/ctools.h> -#include <qpid/dispatch/timer.h> -#include <qpid/dispatch/threading.h> -struct qd_timer_t { - DEQ_LINKS(qd_timer_t); - qd_server_t *server; - qd_timer_cb_t handler; - void *context; - qd_timestamp_t delta_time; - bool scheduled; /* true means on scheduled list, false on idle list */ -}; +#include <qpid/dispatch/dispatch.h> +#include <qpid/dispatch/server.h> +#include <stdint.h> -DEQ_DECLARE(qd_timer_t, qd_timer_list_t); +/* Immediate actions - used by timer to optimize schedule(0) */ -void qd_timer_initialize(sys_mutex_t *server_lock); -void qd_timer_finalize(void); -void qd_timer_visit(); +void qd_immediate_initialize(void); +void qd_immediate_finalize(void); +void qd_immediate_visit(void); -/// For tests only -sys_mutex_t* qd_timer_lock(); +typedef struct qd_immediate_t qd_immediate_t; + +qd_immediate_t *qd_immediate(qd_dispatch_t *qd, void (*handler)(void*), void* context); + +/* Arm causes a call to handler(context) ASAP in a server thread. */ +void qd_immediate_arm(qd_immediate_t *); + +/* After disarm() returns, there will be no handler() call unless re-armed. 
*/ +void qd_immediate_disarm(qd_immediate_t *); + +void qd_immediate_free(qd_immediate_t *); #endif diff --git a/src/server.c b/src/server.c index 24add4e..1863546 100644 --- a/src/server.c +++ b/src/server.c @@ -38,6 +38,7 @@ #include "entity.h" #include "entity_cache.h" #include "dispatch_private.h" +#include "immediate_private.h" #include "policy.h" #include "server_private.h" #include "timer_private.h" @@ -68,6 +69,7 @@ struct qd_server_t { uint64_t next_connection_id; void *py_displayname_obj; qd_http_server_t *http; + bool stopping; }; #define HEARTBEAT_INTERVAL 1000 @@ -905,10 +907,15 @@ static bool handle(qd_server_t *qd_server, pn_event_t *e, pn_connection_t *pn_co switch (pn_event_type(e)) { case PN_PROACTOR_INTERRUPT: - /* Interrupt the next thread */ - pn_proactor_interrupt(qd_server->proactor); - /* Stop the current thread */ - return false; + if (qd_server->stopping) { + /* Interrupt the next thread */ + pn_proactor_interrupt(qd_server->proactor); + /* Stop the current thread */ + return false; + } else { + /* Check for immediate tasks */ + qd_immediate_visit(); + } case PN_PROACTOR_TIMEOUT: qd_timer_visit(); @@ -1296,6 +1303,7 @@ void qd_server_run(qd_dispatch_t *qd) void qd_server_stop(qd_dispatch_t *qd) { + qd->server->stopping = true; /* Interrupt the proactor, async-signal-safe */ pn_proactor_interrupt(qd->server->proactor); } @@ -1505,6 +1513,10 @@ void qd_server_timeout(qd_server_t *server, qd_duration_t duration) { pn_proactor_set_timeout(server->proactor, duration); } +void qd_server_interrupt(qd_server_t *server) { + pn_proactor_interrupt(server->proactor); +} + qd_dispatch_t* qd_server_dispatch(qd_server_t *server) { return server->qd; } const char* qd_connection_name(const qd_connection_t *c) { diff --git a/src/server_private.h b/src/server_private.h index 9f3c75c..9fcec6c 100644 --- a/src/server_private.h +++ b/src/server_private.h @@ -38,6 +38,7 @@ qd_dispatch_t* qd_server_dispatch(qd_server_t *server); void qd_server_timeout(qd_server_t 
*server, qd_duration_t delay); +void qd_server_interrupt(qd_server_t *server); qd_connection_t *qd_server_connection(qd_server_t *server, qd_server_config_t* config); diff --git a/src/timer.c b/src/timer.c index a4aae2a..0fd87c7 100644 --- a/src/timer.c +++ b/src/timer.c @@ -18,8 +18,9 @@ */ #include "dispatch_private.h" -#include "timer_private.h" +#include "immediate_private.h" #include "server_private.h" +#include "timer_private.h" #include <qpid/dispatch/ctools.h> #include <qpid/dispatch/threading.h> #include <qpid/dispatch/alloc.h> @@ -54,6 +55,7 @@ static void timer_cancel_LH(qd_timer_t *timer) DEQ_INSERT_TAIL(idle_timers, timer); timer->scheduled = false; } + qd_immediate_disarm(timer->immediate); } /* Adjust timer's time_base and delays for the current time. */ @@ -95,6 +97,7 @@ qd_timer_t *qd_timer(qd_dispatch_t *qd, qd_timer_cb_t cb, void* context) timer->context = context; timer->delta_time = 0; timer->scheduled = false; + timer->immediate = qd_immediate(qd, cb, context); sys_mutex_lock(lock); DEQ_INSERT_TAIL(idle_timers, timer); sys_mutex_unlock(lock); @@ -109,6 +112,7 @@ void qd_timer_free(qd_timer_t *timer) sys_mutex_lock(lock); timer_cancel_LH(timer); DEQ_REMOVE(idle_timers, timer); + qd_immediate_free(timer->immediate); sys_mutex_unlock(lock); free_qd_timer_t(timer); } @@ -124,6 +128,11 @@ qd_timestamp_t qd_timer_now() { void qd_timer_schedule(qd_timer_t *timer, qd_duration_t duration) { sys_mutex_lock(lock); + if (duration == 0) { + qd_immediate_arm(timer->immediate); + sys_mutex_unlock(lock); + return; + } timer_cancel_LH(timer); // Timer is now on the idle list DEQ_REMOVE(idle_timers, timer); @@ -175,6 +184,7 @@ void qd_timer_cancel(qd_timer_t *timer) void qd_timer_initialize(sys_mutex_t *server_lock) { + qd_immediate_initialize(); lock = server_lock; DEQ_INIT(idle_timers); DEQ_INIT(scheduled_timers); @@ -185,6 +195,7 @@ void qd_timer_initialize(sys_mutex_t *server_lock) void qd_timer_finalize(void) { lock = 0; + qd_immediate_finalize(); } @@ 
-196,6 +207,7 @@ void qd_timer_visit() qd_timer_t *timer = DEQ_HEAD(scheduled_timers); while (timer && timer->delta_time == 0) { timer_cancel_LH(timer); /* Removes timer from scheduled_timers */ + qd_immediate_disarm(timer->immediate); sys_mutex_unlock(lock); timer->handler(timer->context); /* Call the handler outside the lock, may re-schedule */ sys_mutex_lock(lock); @@ -206,4 +218,5 @@ void qd_timer_visit() qd_server_timeout(first->server, first->delta_time); } sys_mutex_unlock(lock); + qd_immediate_visit(); } diff --git a/src/timer_private.h b/src/timer_private.h index 537eb4b..263fca5 100644 --- a/src/timer_private.h +++ b/src/timer_private.h @@ -19,6 +19,7 @@ * under the License. */ +#include "immediate_private.h" #include <qpid/dispatch/ctools.h> #include <qpid/dispatch/timer.h> #include <qpid/dispatch/threading.h> @@ -29,6 +30,7 @@ struct qd_timer_t { qd_timer_cb_t handler; void *context; qd_timestamp_t delta_time; + qd_immediate_t *immediate; /* Optimized path for schedule(0) */ bool scheduled; /* true means on scheduled list, false on idle list */ }; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org