Based on the "slow Apache 2.0" thread earlier today,
and my observation therein that it's possible for a
worker child process to block on a full file descriptor
queue (all threads busy) while other child processes have
idle threads, I decided to revive the idea of switching
the worker thread management to a leader/followers
pattern.
The way it works is:
* There's no dedicated listener thread. The workers
take turns serving as the listener.
* Idle threads are listed in a stack. Each thread has
a condition variable. When the current listener
accepts a connection, it pops the next idle thread
from the stack and wakes it up using the condition
variable. The newly awakened thread becomes the
new listener.
* If there is no idle thread available to become
the new listener, the next thread to finish handling
its current connection takes over as listener.
(Thus a process that's already saturated with
connections won't call accept() until it actually
has an idle thread available.)
To implement the patch quickly, I've used a
mutex to guard the stack for now, rather than the
atomic compare-and-swap operations I'd once
proposed. To improve scalability, though,
this mutex is *not* used for the condition variable
signaling. Instead, each worker thread has a private
mutex for use with its condition variable. This
thread-private mutex is locked at thread creation,
and the only subsequent operations on it are those
done implicitly by the cond_signal/cond_wait. Thus
only the thread associated with that mutex ever locks
or unlocks it, which should help to reduce synchronization
overhead. (The design depends on the semantics
of the one-listener-at-a-time model to synchronize
the cond_signal with the cond_wait.)
Can I get a few volunteers to test/review this?
Thanks,
--Brian
Index: server/mpm/worker/worker.c
===================================================================
RCS file: /home/cvs/httpd-2.0/server/mpm/worker/worker.c,v
retrieving revision 1.114
diff -u -r1.114 worker.c
--- server/mpm/worker/worker.c 8 Apr 2002 16:57:06 -0000 1.114
+++ server/mpm/worker/worker.c 10 Apr 2002 05:22:03 -0000
@@ -69,6 +69,7 @@
#include "apr_file_io.h"
#include "apr_thread_proc.h"
#include "apr_signal.h"
+#include "apr_thread_cond.h"
#include "apr_thread_mutex.h"
#include "apr_proc_mutex.h"
#define APR_WANT_STRFUNC
@@ -105,7 +106,6 @@
#include "mpm_common.h"
#include "ap_listen.h"
#include "scoreboard.h"
-#include "fdqueue.h"
#include "mpm_default.h"
#include <signal.h>
@@ -168,11 +168,9 @@
static int dying = 0;
static int workers_may_exit = 0;
static int start_thread_may_exit = 0;
-static int listener_may_exit = 0;
static int requests_this_child;
static int num_listensocks = 0;
static int resource_shortage = 0;
-static fd_queue_t *worker_queue;
/* The structure used to pass unique initialization info to each thread */
typedef struct {
@@ -181,12 +179,12 @@
int sd;
} proc_info;
+
/* Structure used to pass information to the thread responsible for
* creating the rest of the threads.
*/
typedef struct {
apr_thread_t **threads;
- apr_thread_t *listener;
int child_num_arg;
apr_threadattr_t *threadattr;
} thread_starter;
@@ -232,7 +230,6 @@
static pid_t ap_my_pid; /* Linux getpid() doesn't work except in main
thread. Use this instead */
static pid_t parent_pid;
-static apr_os_thread_t *listener_os_thread;
/* Locks for accept serialization */
static apr_proc_mutex_t *accept_mutex;
@@ -243,37 +240,103 @@
#define SAFE_ACCEPT(stmt) (stmt)
#endif
-/* The LISTENER_SIGNAL signal will be sent from the main thread to the
- * listener thread to wake it up for graceful termination (what a child
- * process from an old generation does when the admin does "apachectl
- * graceful"). This signal will be blocked in all threads of a child
- * process except for the listener thread.
+
+/* Structure used to wake up an idle worker thread
+ */
+typedef struct {
+ apr_thread_cond_t *cond; /* signaled by worker_stack_awaken_next() to wake the waiter */
+ apr_thread_mutex_t *mutex; /* handed to apr_thread_cond_wait() together with cond */
+} worker_wakeup_info;
+
+/* Structure used to hold a stack of idle worker threads
*/
-#define LISTENER_SIGNAL SIGHUP
+typedef struct {
+ apr_thread_mutex_t *mutex; /* locked by worker_stack_wait/awaken_next around the fields below */
+ int no_listener; /* set when the next thread to wait should become the listener at once */
+ worker_wakeup_info **stack; /* wakeup records of idle workers; the last pushed is popped first */
+ apr_size_t nelts; /* number of entries currently on the stack */
+ apr_size_t nalloc; /* capacity of the stack array */
+} worker_stack;
+
+/* Allocate an empty idle-worker stack from pool with room for max entries.
+ * The stack starts with no_listener set, so the first thread that waits on
+ * it becomes the listener.  Returns NULL if the mutex cannot be created.
+ */
+static worker_stack* worker_stack_create(apr_pool_t *pool, apr_size_t max)
+{
+    worker_stack *ws = apr_palloc(pool, sizeof(*ws));
+    if (apr_thread_mutex_create(&ws->mutex, APR_THREAD_MUTEX_DEFAULT,
+                                pool) != APR_SUCCESS) {
+        return NULL;
+    }
+    ws->no_listener = 1;
+    ws->nelts = 0;
+    ws->nalloc = max;
+    ws->stack = apr_palloc(pool, max * sizeof(*ws->stack));
+    return ws;
+}
-static void wakeup_listener(void)
+/* Block the calling worker until it is its turn to be the listener.
+ * Returns APR_SUCCESS once this thread may listen, APR_ENOSPC if the idle
+ * stack is already full, or the status of a failing mutex/condition call.
+ */
+static apr_status_t worker_stack_wait(worker_stack *stack,
+                                      worker_wakeup_info *wakeup)
{
- listener_may_exit = 1;
- if (!listener_os_thread) {
- /* XXX there is an obscure path that this doesn't handle perfectly:
- * right after listener thread is created but before
- * listener_os_thread is set, the first worker thread hits an
- * error and starts graceful termination
+    apr_status_t rv;
+    if ((rv = apr_thread_mutex_lock(stack->mutex)) != APR_SUCCESS) {
+        return rv;
+    }
+    if (stack->no_listener) {
+        /* this thread should become the new listener immediately */
+        stack->no_listener = 0;
+        return apr_thread_mutex_unlock(stack->mutex);
+    }
+    else {
+        /* push this thread onto the stack of idle workers, and block
+         * on the condition variable until awoken
*/
- return;
+        if (stack->nelts == stack->nalloc) {
+            /* full: release stack->mutex instead of returning with it held */
+            (void)apr_thread_mutex_unlock(stack->mutex);
+            return APR_ENOSPC;
+        }
+        stack->stack[stack->nelts++] = wakeup;
+        if ((rv = apr_thread_mutex_unlock(stack->mutex)) != APR_SUCCESS) {
+            return rv;
+        }
+        /* NOTE(review): a signal sent before this wait starts looks lost */
+        return apr_thread_cond_wait(wakeup->cond, wakeup->mutex);
}
- /*
- * we should just be able to "kill(ap_my_pid, LISTENER_SIGNAL)" on all
- * platforms and wake up the listener thread since it is the only thread
- * with SIGHUP unblocked, but that doesn't work on Linux
- */
-#ifdef HAVE_PTHREAD_KILL
- pthread_kill(*listener_os_thread, LISTENER_SIGNAL);
-#else
- kill(ap_my_pid, LISTENER_SIGNAL);
-#endif
}
+/* Hand the listener role to the most recently idled worker, or record that
+ * there is no listener if the idle stack is empty.  Returns APR_SUCCESS or
+ * the status of the failing mutex/condition call.
+ */
+static apr_status_t worker_stack_awaken_next(worker_stack *stack)
+{
+    worker_wakeup_info *wakeup = NULL;
+    apr_status_t rv;
+    if ((rv = apr_thread_mutex_lock(stack->mutex)) != APR_SUCCESS) {
+        return rv;
+    }
+    if (stack->nelts) {
+        wakeup = stack->stack[--stack->nelts];
+    }
+    else {
+        stack->no_listener = 1;
+    }
+    if ((rv = apr_thread_mutex_unlock(stack->mutex)) != APR_SUCCESS) {
+        return rv;
+    }
+    /* stack->mutex is already released here, so a failed signal must not
+     * unlock it a second time; just report the status to the caller. */
+    return wakeup ? apr_thread_cond_signal(wakeup->cond) : APR_SUCCESS;
+}
+
+static worker_stack *idle_worker_stack;
+
#define ST_INIT 0
#define ST_GRACEFUL 1
#define ST_UNGRACEFUL 2
@@ -282,23 +345,15 @@
static void signal_threads(int mode)
{
+ int i; /* counts wakeup attempts in the loop below */
if (terminate_mode == mode) {
return;
}
terminate_mode = mode;
- /* in case we weren't called from the listener thread, wake up the
- * listener thread
- */
- wakeup_listener();
-
- /* for ungraceful termination, let the workers exit now;
- * for graceful termination, the listener thread will notify the
- * workers to exit once it has stopped accepting new connections
- */
- if (mode == ST_UNGRACEFUL) {
- workers_may_exit = 1;
- ap_queue_interrupt_all(worker_queue);
+ workers_may_exit = 1; /* set for every termination mode */
+ for (i = 0; i < ap_threads_per_child; i++) { /* one wakeup attempt per worker thread */
+ (void)worker_stack_awaken_next(idle_worker_stack); /* status deliberately ignored */
}
}
@@ -576,10 +631,7 @@
* maybe it should be ap_mpm_process_exiting?
*/
{
- /* note: for a graceful termination, listener_may_exit will be set before
- * workers_may_exit, so check listener_may_exit
- */
- return listener_may_exit;
+ return workers_may_exit;
}
/*****************************************************************
@@ -659,45 +711,83 @@
*/
}
-static void *listener_thread(apr_thread_t *thd, void * dummy)
+static void *worker_thread(apr_thread_t *thd, void * dummy)
{
proc_info * ti = dummy;
int process_slot = ti->pid;
+ int thread_slot = ti->tid;
apr_pool_t *tpool = apr_thread_pool_get(thd);
void *csd = NULL;
+ apr_allocator_t *allocator;
apr_pool_t *ptrans; /* Pool for per-transaction stuff */
- apr_pool_t *recycled_pool = NULL;
+ apr_bucket_alloc_t *bucket_alloc;
int n;
apr_pollfd_t *pollset;
apr_status_t rv;
ap_listen_rec *lr, *last_lr = ap_listeners;
+ worker_wakeup_info *wakeup;
+ int is_listener;
+
+ ap_update_child_status_from_indexes(process_slot, thread_slot, SERVER_STARTING,
+NULL);
free(ti);
+ apr_allocator_create(&allocator);
+ apr_pool_create_ex(&ptrans, NULL, NULL, allocator);
+ apr_allocator_set_owner(allocator, ptrans);
+ bucket_alloc = apr_bucket_alloc_create(tpool);
+
+ wakeup = (worker_wakeup_info *)apr_palloc(tpool, sizeof(*wakeup));
+ if ((rv = apr_thread_cond_create(&wakeup->cond, tpool)) != APR_SUCCESS) {
+ ap_log_error(APLOG_MARK, APLOG_EMERG, rv, ap_server_conf,
+ "apr_thread_cond_create failed. Attempting to shutdown "
+ "process gracefully.");
+ signal_threads(ST_GRACEFUL);
+ goto done;
+ }
+ if ((rv = apr_thread_mutex_create(&wakeup->mutex, APR_THREAD_MUTEX_DEFAULT,
+ tpool)) != APR_SUCCESS) {
+ ap_log_error(APLOG_MARK, APLOG_EMERG, rv, ap_server_conf,
+ "apr_thread_mutex_create failed. Attempting to shutdown "
+ "process gracefully.");
+ signal_threads(ST_GRACEFUL);
+ goto done;
+ }
+ apr_thread_mutex_lock(wakeup->mutex);
+
apr_poll_setup(&pollset, num_listensocks, tpool);
for(lr = ap_listeners ; lr != NULL ; lr = lr->next)
apr_poll_socket_add(pollset, lr->sd, APR_POLLIN);
- /* Unblock the signal used to wake this thread up, and set a handler for
- * it.
- */
- unblock_signal(LISTENER_SIGNAL);
- apr_signal(LISTENER_SIGNAL, dummy_signal_handler);
-
/* TODO: Switch to a system where threads reuse the results from earlier
poll calls - manoj */
- while (1) {
+ is_listener = 0;
+ while (!workers_may_exit) {
+
+ ap_update_child_status_from_indexes(process_slot, thread_slot,
+ SERVER_READY, NULL);
+ if (!is_listener) {
+ /* Wait until it's our turn to become the listener */
+ if ((rv = worker_stack_wait(idle_worker_stack, wakeup)) !=
+ APR_SUCCESS) {
+ ap_log_error(APLOG_MARK, APLOG_EMERG, rv, ap_server_conf,
+ "worker_stack_wait failed. Shutting down");
+ break;
+ }
+ is_listener = 1;
+ }
+
/* TODO: requests_this_child should be synchronized - aaron */
if (requests_this_child <= 0) {
check_infinite_requests();
}
- if (listener_may_exit) break;
+ if (workers_may_exit) break;
if ((rv = SAFE_ACCEPT(apr_proc_mutex_lock(accept_mutex)))
!= APR_SUCCESS) {
int level = APLOG_EMERG;
- if (listener_may_exit) {
+ if (workers_may_exit) {
break;
}
if (ap_scoreboard_image->parent[process_slot].generation !=
@@ -716,7 +806,7 @@
lr = ap_listeners;
}
else {
- while (!listener_may_exit) {
+ while (!workers_may_exit) {
apr_status_t ret;
apr_int16_t event;
@@ -733,7 +823,7 @@
signal_threads(ST_GRACEFUL);
}
- if (listener_may_exit) break;
+ if (workers_may_exit) break;
/* find a listener */
lr = last_lr;
@@ -752,19 +842,7 @@
}
}
got_fd:
- if (!listener_may_exit) {
- /* create a new transaction pool for each accepted socket */
- if (recycled_pool == NULL) {
- apr_allocator_t *allocator;
-
- apr_allocator_create(&allocator);
- apr_pool_create_ex(&ptrans, NULL, NULL, allocator);
- apr_allocator_set_owner(allocator, ptrans);
- }
- else {
- ptrans = recycled_pool;
- }
- apr_pool_tag(ptrans, "transaction");
+ if (!workers_may_exit) {
rv = lr->accept_func(&csd, lr, ptrans);
/* If we were interrupted for whatever reason, just start
@@ -782,7 +860,7 @@
!= APR_SUCCESS) {
int level = APLOG_EMERG;
- if (listener_may_exit) {
+ if (workers_may_exit) {
break;
}
if (ap_scoreboard_image->parent[process_slot].generation !=
@@ -795,16 +873,13 @@
signal_threads(ST_GRACEFUL);
}
if (csd != NULL) {
- rv = ap_queue_push(worker_queue, csd, ptrans,
- &recycled_pool);
- if (rv) {
- /* trash the connection; we couldn't queue the connected
- * socket to a worker
- */
- apr_socket_close(csd);
- ap_log_error(APLOG_MARK, APLOG_CRIT, rv, ap_server_conf,
- "ap_queue_push failed");
- }
+ is_listener = 0;
+ worker_stack_awaken_next(idle_worker_stack);
+ process_socket(ptrans, csd, process_slot,
+ thread_slot, bucket_alloc);
+ apr_pool_clear(ptrans);
+ requests_this_child--;
+ apr_socket_close(csd); /* Debug only */
}
}
else {
@@ -819,75 +894,10 @@
}
}
- ap_queue_term(worker_queue);
+ done:
dying = 1;
ap_scoreboard_image->parent[process_slot].quiescing = 1;
-
- /* wake up the main thread */
- kill(ap_my_pid, SIGTERM);
-
- apr_thread_exit(thd, APR_SUCCESS);
- return NULL;
-}
-
-/* XXX For ungraceful termination/restart, we definitely don't want to
- * wait for active connections to finish but we may want to wait
- * for idle workers to get out of the queue code and release mutexes,
- * since those mutexes are cleaned up pretty soon and some systems
- * may not react favorably (i.e., segfault) if operations are attempted
- * on cleaned-up mutexes.
- */
-static void * APR_THREAD_FUNC worker_thread(apr_thread_t *thd, void * dummy)
-{
- proc_info * ti = dummy;
- int process_slot = ti->pid;
- int thread_slot = ti->tid;
- apr_socket_t *csd = NULL;
- apr_bucket_alloc_t *bucket_alloc;
- apr_pool_t *last_ptrans = NULL;
- apr_pool_t *ptrans; /* Pool for per-transaction stuff */
- apr_status_t rv;
-
- free(ti);
-
- ap_update_child_status_from_indexes(process_slot, thread_slot, SERVER_STARTING,
NULL);
-
- bucket_alloc = apr_bucket_alloc_create(apr_thread_pool_get(thd));
-
- while (!workers_may_exit) {
- ap_update_child_status_from_indexes(process_slot, thread_slot, SERVER_READY,
NULL);
- rv = ap_queue_pop(worker_queue, &csd, &ptrans, last_ptrans);
- last_ptrans = NULL;
-
- if (rv != APR_SUCCESS) {
- /* We get APR_EOF during a graceful shutdown once all the connections
- * accepted by this server process have been handled.
- */
- if (rv == APR_EOF) {
- break;
- }
- /* We get APR_EINTR whenever ap_queue_pop() has been interrupted
- * from an explicit call to ap_queue_interrupt_all(). This allows
- * us to unblock threads stuck in ap_queue_pop() when a shutdown
- * is pending.
- *
- * If workers_may_exit is set and this is ungraceful termination/
- * restart, we are bound to get an error on some systems (e.g.,
- * AIX, which sanity-checks mutex operations) since the queue
- * may have already been cleaned up. Don't log the "error" if
- * workers_may_exit is set.
- */
- if (rv != APR_EINTR && !workers_may_exit) {
- ap_log_error(APLOG_MARK, APLOG_CRIT, rv, ap_server_conf,
- "ap_queue_pop failed");
- }
- continue;
- }
- process_socket(ptrans, csd, process_slot, thread_slot, bucket_alloc);
- requests_this_child--; /* FIXME: should be synchronized - aaron */
- apr_pool_clear(ptrans);
- last_ptrans = ptrans;
- }
+ worker_stack_awaken_next(idle_worker_stack);
ap_update_child_status_from_indexes(process_slot, thread_slot,
(dying) ? SERVER_DEAD : SERVER_GRACEFUL, (request_rec *) NULL);
@@ -908,34 +918,6 @@
return 0;
}
-static void create_listener_thread(thread_starter *ts)
-{
- int my_child_num = ts->child_num_arg;
- apr_threadattr_t *thread_attr = ts->threadattr;
- proc_info *my_info;
- apr_status_t rv;
-
- my_info = (proc_info *)malloc(sizeof(proc_info));
- my_info->pid = my_child_num;
- my_info->tid = -1; /* listener thread doesn't have a thread slot */
- my_info->sd = 0;
- rv = apr_thread_create(&ts->listener, thread_attr, listener_thread,
- my_info, pchild);
- if (rv != APR_SUCCESS) {
- ap_log_error(APLOG_MARK, APLOG_ALERT, rv, ap_server_conf,
- "apr_thread_create: unable to create listener thread");
- /* In case system resources are maxxed out, we don't want
- * Apache running away with the CPU trying to fork over and
- * over and over again if we exit.
- * XXX Jeff doesn't see how Apache is going to try to fork again since
- * the exit code is APEXIT_CHILDFATAL
- */
- apr_sleep(10 * APR_USEC_PER_SEC);
- clean_child_exit(APEXIT_CHILDFATAL);
- }
- apr_os_thread_get(&listener_os_thread, ts->listener);
-}
-
/* XXX under some circumstances not understood, children can get stuck
* in start_threads forever trying to take over slots which will
* never be cleaned up; for now there is an APLOG_DEBUG message issued
@@ -955,19 +937,15 @@
int loops;
int prev_threads_created;
- /* We must create the fd queues before we start up the listener
- * and worker threads. */
- worker_queue = apr_pcalloc(pchild, sizeof(*worker_queue));
- rv = ap_queue_init(worker_queue, ap_threads_per_child, pchild);
- if (rv != APR_SUCCESS) {
+ idle_worker_stack = worker_stack_create(pchild, ap_threads_per_child);
+ if (idle_worker_stack == NULL) {
ap_log_error(APLOG_MARK, APLOG_ALERT, rv, ap_server_conf,
- "ap_queue_init() failed");
+ "worker_stack_create() failed");
clean_child_exit(APEXIT_CHILDFATAL);
}
loops = prev_threads_created = 0;
while (1) {
- /* ap_threads_per_child does not include the listener thread */
for (i = 0; i < ap_threads_per_child; i++) {
int status = ap_scoreboard_image->servers[child_num_arg][i].status;
@@ -1003,12 +981,6 @@
clean_child_exit(APEXIT_CHILDFATAL);
}
threads_created++;
- if (threads_created == 1) {
- /* now that we have a worker thread, it makes sense to create
- * a listener thread (we don't want a listener without a worker!)
- */
- create_listener_thread(ts);
- }
}
if (start_thread_may_exit || threads_created == ap_threads_per_child) {
break;
@@ -1041,48 +1013,11 @@
return NULL;
}
-static void join_workers(apr_thread_t *listener, apr_thread_t **threads)
+static void join_workers(apr_thread_t **threads)
{
int i;
apr_status_t rv, thread_rv;
- if (listener) {
- int iter;
-
- /* deal with a rare timing window which affects waking up the
- * listener thread... if the signal sent to the listener thread
- * is delivered between the time it verifies that the
- * listener_may_exit flag is clear and the time it enters a
- * blocking syscall, the signal didn't do any good... work around
- * that by sleeping briefly and sending it again
- */
-
- iter = 0;
- while (iter < 10 &&
-#ifdef HAVE_PTHREAD_KILL
- pthread_kill(*listener_os_thread, 0)
-#else
- kill(ap_my_pid, 0)
-#endif
- == 0) {
- /* listener not dead yet */
- apr_sleep(APR_USEC_PER_SEC / 2);
- wakeup_listener();
- ++iter;
- }
- if (iter >= 10) {
- ap_log_error(APLOG_MARK, APLOG_CRIT, 0, ap_server_conf,
- "the listener thread didn't exit");
- }
- else {
- rv = apr_thread_join(&thread_rv, listener);
- if (rv != APR_SUCCESS) {
- ap_log_error(APLOG_MARK, APLOG_CRIT, rv, ap_server_conf,
- "apr_thread_join: unable to join listener thread");
- }
- }
- }
-
for (i = 0; i < ap_threads_per_child; i++) {
if (threads[i]) { /* if we ever created this thread */
rv = apr_thread_join(&thread_rv, threads[i]);
@@ -1181,7 +1116,6 @@
apr_threadattr_detach_set(thread_attr, 0);
ts->threads = threads;
- ts->listener = NULL;
ts->child_num_arg = child_num_arg;
ts->threadattr = thread_attr;
@@ -1221,7 +1155,7 @@
* If the worker hasn't exited, then this blocks until
* they have (then cleans up).
*/
- join_workers(ts->listener, threads);
+ join_workers(threads);
}
else { /* !one_process */
/* remove SIGTERM from the set of blocked signals... if one of
@@ -1262,7 +1196,7 @@
* If the worker hasn't exited, then this blocks until
* they have (then cleans up).
*/
- join_workers(ts->listener, threads);
+ join_workers(threads);
}
}