On 12/01/2008 03:55 AM, [EMAIL PROTECTED] wrote: > Author: pquerna > Date: Sun Nov 30 18:55:14 2008 > New Revision: 721952 > > URL: http://svn.apache.org/viewvc?rev=721952&view=rev > Log: > Add two new modules, originally written at Joost, to handle load balancing > across > multiple apache servers within the same datacenter. > > mod_heartbeat generates multicast status messages with the current number of > clients connected, but the formated can easily be extended to include other > things. > > mod_heartmonitor collects these messages into a static file, which then can > be > used for other modules to make load balancing decisions on. > > Added: > httpd/httpd/trunk/modules/cluster/ (with props) > httpd/httpd/trunk/modules/cluster/Makefile.in (with props) > httpd/httpd/trunk/modules/cluster/README.heartbeat > httpd/httpd/trunk/modules/cluster/README.heartmonitor > httpd/httpd/trunk/modules/cluster/config.m4 > httpd/httpd/trunk/modules/cluster/mod_heartbeat.c (with props) > httpd/httpd/trunk/modules/cluster/mod_heartmonitor.c (with props) > Modified: > httpd/httpd/trunk/CHANGES > httpd/httpd/trunk/modules/README > > Modified: httpd/httpd/trunk/CHANGES > URL: > http://svn.apache.org/viewvc/httpd/httpd/trunk/CHANGES?rev=721952&r1=721951&r2=721952&view=diff > ============================================================================== > --- httpd/httpd/trunk/CHANGES [utf-8] (original) > +++ httpd/httpd/trunk/CHANGES [utf-8] Sun Nov 30 18:55:14 2008 > @@ -2,6 +2,12 @@ > Changes with Apache 2.3.0 > [ When backported to 2.2.x, remove entry from this file ] > > + *) mod_heartmonitor: New module to collect heartbeats, and write out a file > + so that other modules can load balance traffic as needed. [Paul Querna] > + > + *) mod_heartbeat: New module to genarate multicast heartbeats to konw if a > + server is online. [Paul Querna] > +
s/konw/know/ In addition to the later-adjusted svn log message, I would propose that you add Sander's and Justin's names to this change entry as well. > Added: httpd/httpd/trunk/modules/cluster/mod_heartbeat.c > URL: > http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/cluster/mod_heartbeat.c?rev=721952&view=auto > ============================================================================== > --- httpd/httpd/trunk/modules/cluster/mod_heartbeat.c (added) > +++ httpd/httpd/trunk/modules/cluster/mod_heartbeat.c Sun Nov 30 18:55:14 2008 > @@ -0,0 +1,354 @@ > +/* Licensed to the Apache Software Foundation (ASF) under one or more > + * contributor license agreements. See the NOTICE file distributed with > + * this work for additional information regarding copyright ownership. > + * The ASF licenses this file to You under the Apache License, Version 2.0 > + * (the "License"); you may not use this file except in compliance with > + * the License. You may obtain a copy of the License at > + * > + * http://www.apache.org/licenses/LICENSE-2.0 > + * > + * Unless required by applicable law or agreed to in writing, software > + * distributed under the License is distributed on an "AS IS" BASIS, > + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. > + * See the License for the specific language governing permissions and > + * limitations under the License. > + */ > + > +#include "httpd.h" > +#include "http_config.h" > +#include "http_log.h" > +#include "apr_strings.h" > + > +#include "ap_mpm.h" > +#include "scoreboard.h" > + > +#ifndef HEARTBEAT_INTERVAL > +#define HEARTBEAT_INTERVAL (1) > +#endif > + > +module AP_MODULE_DECLARE_DATA heartbeat_module; > + > +typedef struct hb_ctx_t > +{ > + int active; > + apr_sockaddr_t *mcast_addr; > + int server_limit; > + int thread_limit; > + int status; > + int keep_running; Shouldn't this be volatile? 
> + apr_proc_mutex_t *mutex; > + const char *mutex_path; > + apr_thread_mutex_t *start_mtx; > + apr_thread_t *thread; > + apr_file_t *lockf; > +} hb_ctx_t; > + > +static const char *msg_format = "v=%u&ready=%u&busy=%u"; > + > +#define MSG_VERSION (1) > + > +static int hb_monitor(hb_ctx_t *ctx, apr_pool_t *p) > +{ > + int i, j; > + apr_uint32_t ready = 0; > + apr_uint32_t busy = 0; > + > + for (i = 0; i < ctx->server_limit; i++) { > + process_score *ps; > + ps = ap_get_scoreboard_process(i); > + > + for (j = 0; j < ctx->thread_limit; j++) { > + worker_score *ws = NULL; > + > + ws = &ap_scoreboard_image->servers[i][j]; > + > + int res = ws->status; > + > + if (res == SERVER_READY && ps->generation == ap_my_generation) { > + ready++; > + } > + else if (res != SERVER_DEAD && > + res != SERVER_STARTING && res != SERVER_IDLE_KILL) { > + busy++; Is this correct even if ps->generation != ap_my_generation? > + } > + } > + } > + > + char buf[256]; > + apr_size_t len = > + apr_snprintf(buf, sizeof(buf), msg_format, MSG_VERSION, ready, busy); > + > + apr_socket_t *sock = NULL; > + do { > + apr_status_t rv; > + rv = apr_socket_create(&sock, ctx->mcast_addr->family, > + SOCK_DGRAM, APR_PROTO_UDP, p); > + if (rv) { > + ap_log_error(APLOG_MARK, APLOG_WARNING, rv, > + NULL, "Heartbeat: apr_socket_create failed"); > + break; > + } > + > + rv = apr_mcast_loopback(sock, 1); > + if (rv) { > + ap_log_error(APLOG_MARK, APLOG_WARNING, rv, > + NULL, "Heartbeat: apr_mcast_loopback failed"); > + break; > + } > + > + rv = apr_socket_sendto(sock, ctx->mcast_addr, 0, buf, &len); > + if (rv) { > + ap_log_error(APLOG_MARK, APLOG_WARNING, rv, > + NULL, "Heartbeat: apr_socket_sendto failed"); > + break; > + } > + } while (0); > + > + if (sock) { > + apr_socket_close(sock); > + } > + > + return OK; > +} > + > +#ifndef apr_time_from_msec > +#define apr_time_from_msec(x) (x * 1000) > +#endif > + > +static void *hb_worker(apr_thread_t *thd, void *data) > +{ > + hb_ctx_t *ctx = (hb_ctx_t *) data; > + 
apr_status_t rv; > + > + apr_pool_t *pool = apr_thread_pool_get(thd); > + apr_pool_tag(pool, "heartbeat_worker"); > + ctx->status = 0; > + ctx->keep_running = 1; > + apr_thread_mutex_unlock(ctx->start_mtx); > + > + while (ctx->keep_running) { > + rv = apr_proc_mutex_trylock(ctx->mutex); > + if (rv == APR_SUCCESS) { > + break; > + } > + apr_sleep(apr_time_from_msec(200)); > + } > + > + while (ctx->keep_running) { > + int mpm_state = 0; > + rv = ap_mpm_query(AP_MPMQ_MPM_STATE, &mpm_state); > + > + if (rv != APR_SUCCESS) { > + break; > + } > + > + if (mpm_state == AP_MPMQ_STOPPING) { > + ctx->keep_running = 0; > + break; > + } > + > + apr_pool_t *tpool; > + apr_pool_create(&tpool, pool); > + apr_pool_tag(tpool, "heartbeat_worker_temp"); > + hb_monitor(ctx, tpool); > + apr_pool_destroy(tpool); Why create / destroy and not simply create once and call apr_pool_clear in the loop? > + apr_sleep(apr_time_from_sec(HEARTBEAT_INTERVAL)); > + } > + > + apr_proc_mutex_unlock(ctx->mutex); > + apr_thread_exit(ctx->thread, APR_SUCCESS); > + > + return NULL; > +} > + > +static apr_status_t hb_pool_cleanup(void *baton) > +{ > + apr_status_t rv; > + hb_ctx_t *ctx = (hb_ctx_t *) baton; > + > + ctx->keep_running = 0; > + > + apr_thread_join(&rv, ctx->thread); > + > + return rv; > +} > + > +static void start_hb_worker(apr_pool_t *p, hb_ctx_t *ctx) > +{ > + apr_status_t rv; > + > + rv = apr_thread_mutex_create(&ctx->start_mtx, APR_THREAD_MUTEX_UNNESTED, > + p); > + > + if (rv) { > + ap_log_error(APLOG_MARK, APLOG_CRIT, rv, NULL, > + "Heartbeat: apr_thread_cond_create failed"); You create a thread mutex above, not a thread cond. 
> + ctx->status = rv; > + return; > + } > + > + apr_thread_mutex_lock(ctx->start_mtx); > + > + apr_pool_cleanup_register(p, ctx, hb_pool_cleanup, > apr_pool_cleanup_null); > + > + rv = apr_thread_create(&ctx->thread, NULL, hb_worker, ctx, p); > + if (rv) { > + apr_pool_cleanup_kill(p, ctx, hb_pool_cleanup); > + ap_log_error(APLOG_MARK, APLOG_CRIT, rv, NULL, > + "Heartbeat: apr_thread_create failed"); > + ctx->status = rv; > + } > + > + apr_thread_mutex_lock(ctx->start_mtx); > + apr_thread_mutex_unlock(ctx->start_mtx); This deserves a comment. As far as I understand, the intent is to wait until the hb_worker thread is up. But to be honest I do not understand the need for the start_mutex at all. Also note that if apr_thread_create fails, the second apr_thread_mutex_lock() tries to re-lock an unnested mutex that this thread already holds, and no worker thread exists to unlock it, so the call will never return. In the success case the mutex is unlocked by hb_worker, i.e. by a different thread than the one that locked it, which is undefined behavior for pthread mutexes. > + apr_thread_mutex_destroy(ctx->start_mtx); > +} > + > +static void hb_child_init(apr_pool_t *p, server_rec *s) > +{ > + hb_ctx_t *ctx = ap_get_module_config(s->module_config, > &heartbeat_module); > + > + apr_proc_mutex_child_init(&ctx->mutex, ctx->mutex_path, p); > + > + ctx->status = -1; > + > + if (ctx->active) { > + start_hb_worker(p, ctx); > + if (ctx->status != 0) { > + ap_log_error(APLOG_MARK, APLOG_CRIT, 0, s, > + "Heartbeat: Failed to start worker thread."); > + return; > + } > + } > + > + return; > +} > + > +static int hb_init(apr_pool_t *p, apr_pool_t *plog, apr_pool_t *ptemp, > + server_rec *s) > +{ > + apr_status_t rv; > + hb_ctx_t *ctx = ap_get_module_config(s->module_config, > &heartbeat_module); > + > + ap_mpm_query(AP_MPMQ_HARD_LIMIT_THREADS, &ctx->thread_limit); > + ap_mpm_query(AP_MPMQ_HARD_LIMIT_DAEMONS, &ctx->server_limit); > + > + rv = apr_proc_mutex_create(&ctx->mutex, ctx->mutex_path, > +#if APR_HAS_FCNTL_SERIALIZE > + APR_LOCK_FCNTL, > +#else > +#if APR_HAS_FLOCK_SERIALIZE > + APR_LOCK_FLOCK, > +#else > +#error port me to a non crap platform. > +#endif > +#endif > + p); Is there any reason why we must use either APR_LOCK_FCNTL or APR_LOCK_FLOCK? Wouldn't the default mutex (APR_LOCK_DEFAULT) work? 
> + > + if (rv) { > + ap_log_error(APLOG_MARK, APLOG_CRIT, rv, s, > + "Heartbeat: mutex failed creation at %s (type=%s)", > + ctx->mutex_path, apr_proc_mutex_defname()); And how do you know that apr_proc_mutex_defname is either APR_LOCK_FCNTL or APR_LOCK_FLOCK? Maybe the default mutex on this platform is something different. > > Added: httpd/httpd/trunk/modules/cluster/mod_heartmonitor.c > URL: > http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/cluster/mod_heartmonitor.c?rev=721952&view=auto > ============================================================================== > --- httpd/httpd/trunk/modules/cluster/mod_heartmonitor.c (added) > +++ httpd/httpd/trunk/modules/cluster/mod_heartmonitor.c Sun Nov 30 18:55:14 > 2008 > @@ -0,0 +1,551 @@ > +/* Licensed to the Apache Software Foundation (ASF) under one or more > + * contributor license agreements. See the NOTICE file distributed with > + * this work for additional information regarding copyright ownership. > + * The ASF licenses this file to You under the Apache License, Version 2.0 > + * (the "License"); you may not use this file except in compliance with > + * the License. You may obtain a copy of the License at > + * > + * http://www.apache.org/licenses/LICENSE-2.0 > + * > + * Unless required by applicable law or agreed to in writing, software > + * distributed under the License is distributed on an "AS IS" BASIS, > + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. > + * See the License for the specific language governing permissions and > + * limitations under the License. 
> + */ > + > +#include "httpd.h" > +#include "http_config.h" > +#include "http_log.h" > +#include "apr_strings.h" > +#include "apr_hash.h" > +#include "ap_mpm.h" > +#include "scoreboard.h" > + > +module AP_MODULE_DECLARE_DATA heartmonitor_module; > + > +typedef struct hm_server_t > +{ > + const char *ip; > + int busy; > + int ready; > + apr_time_t seen; > +} hm_server_t; > + > +typedef struct hm_ctx_t > +{ > + int active; > + const char *storage_path; > + apr_proc_mutex_t *mutex; > + const char *mutex_path; > + apr_sockaddr_t *mcast_addr; > + int status; > + int keep_running; Shouldn't this be volatile? > + apr_thread_mutex_t *start_mtx; > + apr_thread_t *thread; > + apr_socket_t *sock; > + apr_pool_t *p; > + apr_hash_t *servers; > +} hm_ctx_t; > + > + > +static void *hm_worker(apr_thread_t *thd, void *data) > +{ > + hm_ctx_t *ctx = (hm_ctx_t *) data; > + apr_status_t rv; > + > + ctx->p = apr_thread_pool_get(thd); > + ctx->status = 0; > + ctx->keep_running = 1; > + apr_thread_mutex_unlock(ctx->start_mtx); > + > + while (ctx->keep_running) { > + rv = apr_proc_mutex_trylock(ctx->mutex); > + if (rv == APR_SUCCESS) { > + break; > + } > + apr_sleep(apr_time_from_msec(200)); > + } > + > + rv = hm_listen(ctx); > + > + if (rv) { > + ctx->status = rv; > + ap_log_error(APLOG_MARK, APLOG_CRIT, rv, NULL, > + "Heartmonitor: Unable to listen for connections!"); > + apr_proc_mutex_unlock(ctx->mutex); > + apr_thread_exit(ctx->thread, rv); > + return NULL; > + } > + > + > + apr_time_t last = apr_time_now(); > + while (ctx->keep_running) { > + int n; > + apr_pool_t *p; > + apr_pollfd_t pfd; > + apr_interval_time_t timeout; > + apr_pool_create(&p, ctx->p); > + > + apr_time_t now = apr_time_now(); > + > + if (apr_time_sec((now - last)) > 5) { Hardcoded 5 seconds? Bah!! 
> + hm_update_stats(ctx, p); > + apr_pool_clear(p); > + last = now; > + } > + > + pfd.desc_type = APR_POLL_SOCKET; > + pfd.desc.s = ctx->sock; > + pfd.p = p; > + pfd.reqevents = APR_POLLIN; > + > + timeout = apr_time_from_sec(1); > + > + rv = apr_poll(&pfd, 1, &n, timeout); > + > + if (!ctx->keep_running) { > + break; > + } > + > + if (rv) { > + apr_pool_destroy(p); > + continue; > + } > + > + if (pfd.rtnevents & APR_POLLIN) { > + hm_recv(ctx, p); > + } > + > + apr_pool_destroy(p); Why not just clear the pool? > + } > + > + apr_proc_mutex_unlock(ctx->mutex); > + apr_thread_exit(ctx->thread, APR_SUCCESS); > + > + return NULL; > +} > + > +static apr_status_t hm_pool_cleanup(void *baton) > +{ > + apr_status_t rv; > + hm_ctx_t *ctx = (hm_ctx_t *) baton; > + > + ctx->keep_running = 0; > + > + apr_thread_join(&rv, ctx->thread); > + > + return rv; > +} > + > +static void start_hm_worker(apr_pool_t *p, hm_ctx_t *ctx) > +{ > + apr_status_t rv; > + > + rv = apr_thread_mutex_create(&ctx->start_mtx, APR_THREAD_MUTEX_UNNESTED, > + p); > + > + if (rv) { > + ap_log_error(APLOG_MARK, APLOG_CRIT, rv, NULL, > + "Heartmonitor: apr_thread_cond_create failed"); You create a thread mutex above, not a thread cond, so the message should say apr_thread_mutex_create. > + ctx->status = rv; > + return; > + } > + > + apr_thread_mutex_lock(ctx->start_mtx); > + > + apr_pool_cleanup_register(p, ctx, hm_pool_cleanup, > apr_pool_cleanup_null); > + > + rv = apr_thread_create(&ctx->thread, NULL, hm_worker, ctx, p); > + if (rv) { > + apr_pool_cleanup_kill(p, ctx, hm_pool_cleanup); > + ap_log_error(APLOG_MARK, APLOG_CRIT, rv, NULL, > + "Heartmonitor: apr_thread_create failed"); > + ctx->status = rv; > + } > + > + apr_thread_mutex_lock(ctx->start_mtx); > + apr_thread_mutex_unlock(ctx->start_mtx); This deserves a comment. As far as I understand, the intent is to wait until the hm_worker thread is up. But to be honest I do not understand the need for the start_mutex at all. Also note that if apr_thread_create fails, the second apr_thread_mutex_lock() tries to re-lock an unnested mutex that this thread already holds, and no worker thread exists to unlock it, so the call will never return. In the success case the mutex is unlocked by hm_worker, i.e. by a different thread than the one that locked it, which is undefined behavior for pthread mutexes. 
> + apr_thread_mutex_destroy(ctx->start_mtx); > +} > + > +static void hm_child_init(apr_pool_t *p, server_rec *s) > +{ > + hm_ctx_t *ctx = > + ap_get_module_config(s->module_config, &heartmonitor_module); > + > + apr_proc_mutex_child_init(&ctx->mutex, ctx->mutex_path, p); > + > + ap_log_error(APLOG_MARK, APLOG_NOTICE, 0, s, > + "Heartmonitor: Starting Listener Thread. mcast=%pI", > + ctx->mcast_addr); > + > + ctx->status = -1; > + > + start_hm_worker(p, ctx); > + > + if (ctx->status != 0) { > + ap_log_error(APLOG_MARK, APLOG_CRIT, 0, s, > + "Heartmonitor: Failed to start listener thread."); > + return; > + } > + > + return; > +} > + > +static int hm_post_config(apr_pool_t *p, apr_pool_t *plog, > + apr_pool_t *ptemp, server_rec *s) > +{ > + hm_ctx_t *ctx = ap_get_module_config(s->module_config, > + &heartmonitor_module); > + > + apr_status_t rv = apr_proc_mutex_create(&ctx->mutex, > + ctx->mutex_path, > +#if APR_HAS_FCNTL_SERIALIZE > + > + APR_LOCK_FCNTL, > +#else > +#if APR_HAS_FLOCK_SERIALIZE > + APR_LOCK_FLOCK, > +#else > +#error port me to a non crap platform. > +#endif > +#endif > + p); Is there any reason why we must use either APR_LOCK_FCNTL or APR_LOCK_FLOCK? Wouldn't the default mutex (APR_LOCK_DEFAULT) work? > + > + if (rv) { > + ap_log_error(APLOG_MARK, APLOG_CRIT, rv, s, > + "Heartmonitor: Failed to create listener " > + "mutex at %s (type=%s)", ctx->mutex_path, > + apr_proc_mutex_defname()); And how do you know that apr_proc_mutex_defname is either APR_LOCK_FCNTL or APR_LOCK_FLOCK? Maybe the default mutex on this platform is something different. 
> + return !OK; > + } > + > + return OK; > +} > + > +static void hm_register_hooks(apr_pool_t *p) > +{ > + ap_hook_post_config(hm_post_config, NULL, NULL, APR_HOOK_MIDDLE); > + ap_hook_child_init(hm_child_init, NULL, NULL, APR_HOOK_MIDDLE); > +} > + > +static void *hm_create_config(apr_pool_t *p, server_rec *s) > +{ > + hm_ctx_t *ctx = (hm_ctx_t *) apr_palloc(p, sizeof(hm_ctx_t)); > + > + ctx->active = 0; > + ctx->storage_path = ap_server_root_relative(p, "logs/hb.dat"); Why doesn't ctx->mutex_path get initialized here? Since apr_palloc does not zero the memory, every field not set here (mutex_path, mcast_addr, ...) starts out uninitialized; apr_pcalloc would avoid that. > + > + return ctx; > +} > + Regards Rüdiger
