Hi,

I would like to commit this module to trunk.
It's a watchdog module that creates worker threads
in either the parent or the child via an API. It is not
for standalone use, but for use by other modules such as
mod_heartbeat, probably mod_jk, and others that need
maintenance threads.

The reason is that a couple of modules would benefit
from this functionality, and this adds a common API.

Usage is very simple:
1. call ap_watchdog_get_instance
2. set singleton mode (only one child will execute)
3. register callback(s) with the desired interval

A module can either use a pre-created watchdog instance
or, in special cases, create its own, thus creating
additional thread(s).

For example, heartbeat would be an order of magnitude
simpler (I took some concepts from it).
Also, I'd love to register callbacks for the balancer
so that the transferred method can normalize the load.

Comments?

Regards
--
^(TM)
/* Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#ifndef WATCHDOG_H
#define WATCHDOG_H

/**
 * @file  ap_watchdog.h
 * @brief Watchdog module for Apache
 *
 * @defgroup MOD_WATCHDOG watchdog
 * @ingroup  APACHE_MODS
 * @{
 */

#include "httpd.h"
#include "http_config.h"
#include "http_log.h"
#include "ap_provider.h"

#include "apr.h"
#include "apr_strings.h"
#include "apr_pools.h"
#include "apr_shm.h"
#include "apr_hash.h"
#include "apr_file_io.h"
#include "apr_time.h"
#include "apr_thread_proc.h"
#include "apr_global_mutex.h"
#include "apr_thread_mutex.h"

#ifdef AP_NEED_SET_MUTEX_PERMS
#include "unixd.h"
#endif

#if APR_HAVE_UNISTD_H
#include <unistd.h>         /* for getpid() */
#endif

#ifdef __cplusplus
extern "C" {
#endif

/**
 * Default watchdog name
 */
#define AP_WATCHDOG_DEFAULT         "_default_"

/**
 * Watchdog create flags
 * NOTE(review): these flags are not referenced by the API declared below.
 */
#define AP_WATCHDOG_PARENT          0x01    /**< Run in parent process */
#define AP_WATCHDOG_SINGLETON       0x02    /**< Run only in one process */

/* Watchdog timing constants, expressed in microseconds (the unit of
 * apr_time_t / apr_interval_time_t).  The values now match the documented
 * intent: previously they were ten times smaller than the comments stated.
 */
#define AP_WD_TM_RESOLUTION         APR_TIME_C(1000000) /* 1 second */
#define AP_WD_TM_SLICE              APR_TIME_C(100000)  /* 100 ms   */


/** Opaque watchdog instance; the structure is defined by the implementation. */
typedef struct ap_watchdog_t ap_watchdog_t;

/**
 * Callback function used for watchdog.
 * @param data The data pointer that was supplied when the callback was
 *             registered.
 * @param pool Temporary pool handed to the callback; it is destroyed by the
 *             watchdog after the callbacks have been run.
 * @return APR_SUCCESS to continue calling the callback.
 */
typedef apr_status_t ap_watchdog_callback_fn_t(void *data, apr_pool_t *pool);

/**
 * Get watchdog instance.
 * @param watchdog Storage for watchdog instance.
 * @param name Watchdog name.
 * @param parent Non zero to get the parent process watchdog instance.
 * @param p The pool to use.
 * @return APR_SUCCESS if all went well
 */
AP_DECLARE(apr_status_t) ap_watchdog_get_instance(ap_watchdog_t **watchdog,
                                                  const char *name,
                                                  int parent,
                                                  apr_pool_t *p);

/**
 * Set watchdog singleton mode.
 * @param watchdog Watchdog instance.
 * @param on Nonzero to enable singleton mode, zero to disable it.
 */
AP_DECLARE(void) ap_watchdog_set_singleton(ap_watchdog_t *watchdog, int on);

/**
 * Register watchdog callback.
 * @param watchdog Watchdog to use
 * @param interval Interval at which the callback should be invoked
 * @param data The data to pass to the callback function.
 * @param callback The function to call on watchdog event.
 * @return APR_SUCCESS if all went well
 */
AP_DECLARE(apr_status_t) ap_watchdog_register_callback(ap_watchdog_t *watchdog,
                            apr_interval_time_t interval,
                            const void *data,
                            ap_watchdog_callback_fn_t *callback);

#ifdef __cplusplus
}
#endif

#endif /* WATCHDOG_H */
/** @} */
/* Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

/* Watchdog module.
 */

#define CORE_PRIVATE
#include "ap_watchdog.h"
#include "ap_provider.h"

#ifdef AP_NEED_SET_MUTEX_PERMS
#include "unixd.h"
#endif

/* Provider group and provider "version" strings under which watchdog
 * instances are registered and looked up.  The version string is used to
 * tell parent-process watchdogs apart from child-process watchdogs.
 * NOTE: the "WATCHODG" spelling is kept because the names are used
 * throughout this file.
 */
#define AP_WATCHODG_PGROUP    "watchdog"
#define AP_WATCHODG_PVERSION  "parent"
#define AP_WATCHODG_CVERSION  "child"

typedef struct watchdog_list_t watchdog_list_t;

/* One registered callback; entries form a singly linked list per watchdog. */
struct watchdog_list_t
{
    /* Next registered callback, or NULL at the end of the list */
    struct watchdog_list_t *next;
    /* Watchdog this callback was registered with */
    ap_watchdog_t *wd;
    /* Last status returned by the callback; the worker only invokes the
     * callback while this is APR_SUCCESS */
    apr_status_t status;
    /* Requested interval between invocations */
    apr_interval_time_t interval;
    /* Time accumulated by the worker since the last invocation; reset to
     * zero each time the callback is run */
    apr_interval_time_t step;
    /* Opaque pointer handed to the callback */
    const void *data;
    /* The callback itself */
    ap_watchdog_callback_fn_t *callback_fn;
};

/* State of a single named watchdog instance. */
struct ap_watchdog_t
{
    /* Thread mutex used only while the worker thread is being started */
    apr_thread_mutex_t   *startup;
    /* Process mutex the worker must acquire before running callbacks;
     * NULL when no mutex has been created for this watchdog */
    apr_proc_mutex_t     *mutex;
    /* File name backing the process mutex */
    const char           *mutex_path;
    /* Name the watchdog was registered under */
    const char           *name;
    /* Head of the list of registered callbacks */
    watchdog_list_t      *callbacks;
    /* Set to 1 by the worker when it starts, cleared to ask it to stop */
    int                   is_running;
    /* Non zero when the watchdog runs in singleton mode */
    int                   singleton;
    /* Worker thread handle */
    apr_thread_t         *thread;
    /* Pool used for callback list allocations; initially the pool passed at
     * creation, replaced by the thread's pool once the worker runs */
    apr_pool_t           *pool;
};

/* Module-wide state stored as userdata on the process pool. */
typedef struct wd_server_conf_t wd_server_conf_t;
struct wd_server_conf_t
{
    /* Counter incremented by the post config hook for watchdogs it set up;
     * the child init hook does nothing while it is zero */
    int active;
    /* Sub-pool of the process pool used for mutexes and worker threads */
    apr_pool_t *pool;
    /* Server record used by the worker thread for logging */
    server_rec *s;
};

/* NULL terminated list of the provider version strings (child, parent).
 * NOTE(review): not referenced anywhere else in the code visible here.
 */
static const char *wd_provider_ver[] = {
    AP_WATCHODG_CVERSION,
    AP_WATCHODG_PVERSION,
    NULL
};

/* Directory set by the WatchdogMutexPath directive; NULL when the directive
 * was not given, in which case "logs" is used. */
static char *wd_mutex_path = NULL;
/* Module-wide state; looked up or created in the post config hook. */
static wd_server_conf_t *wd_server_conf = NULL;

static apr_status_t wd_worker_cleanup(void *data)
{
    apr_status_t rv;
    ap_watchdog_t *w = (ap_watchdog_t *)data;
    w->is_running = 0;
    apr_thread_join(&rv, w->thread);
    return rv;
}

/**
 * Watchdog worker thread entry point.
 * Signals the starter that it is running, acquires the watchdog's process
 * mutex when one exists, and then wakes up every AP_WD_TM_SLICE to run the
 * registered callbacks whose interval has elapsed.  The loops end once
 * is_running is cleared.
 * @param thread The thread handle created for this worker.
 * @param data The ap_watchdog_t served by this worker.
 * @return NULL (the thread normally ends through apr_thread_exit()).
 * NOTE(review): is_running is a plain int written by another thread; no
 * synchronisation of that flag is visible in this file.
 */
static void* APR_THREAD_FUNC wd_worker(apr_thread_t *thread, void *data)
{
    ap_watchdog_t *w = (ap_watchdog_t *)data;
    apr_status_t rv;
    int locked = 0;

    /* From now on the watchdog's pool is the pool of this thread */
    w->pool = apr_thread_pool_get(thread);
    w->is_running = 1;

    /* Let wd_startup(), which is waiting on this mutex, continue */
    apr_thread_mutex_unlock(w->startup);
    if (w->mutex) {
        /* Poll for the process mutex so that a stop request (is_running
         * cleared) is noticed while some other process holds the lock.
         */
        while (w->is_running) {
            rv = apr_proc_mutex_trylock(w->mutex);
            if (rv == APR_SUCCESS) {
                locked = 1;
                break;
            }
            apr_sleep(AP_WD_TM_SLICE);
        }
    }
    if (w->is_running) {
        ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, wd_server_conf->s,
                    "%sWatchdog (%s) running",
                    w->singleton ? "Singleton" : "",
                    w->name);
    }
    /* Main execution loop */
    while (w->is_running) {
        /* Created lazily, only when at least one callback is due */
        apr_pool_t *ctx = NULL;
        apr_time_t curr;
        watchdog_list_t *wl = w->callbacks;

        apr_sleep(AP_WD_TM_SLICE);
        if (!w->is_running) {
            break;
        }
        /* Nominal start of this slice; each callback's step grows by the
         * time elapsed since then, which includes the time spent in the
         * callbacks that ran earlier in this pass.
         */
        curr = apr_time_now() - AP_WD_TM_SLICE;
        while (wl && w->is_running) {
            /* A callback that returned an error is never called again */
            if (wl->status == APR_SUCCESS) {
                wl->step += (apr_time_now() - curr);
                if (wl->step >= wl->interval) {
                    if (!ctx)
                        apr_pool_create(&ctx, w->pool);
                    wl->step = 0;
                    /* Execute watchdog callback */
                    wl->status = (*wl->callback_fn)((void *)wl->data, ctx);
                }
            }
            wl = wl->next;
        }
        /* The temporary pool is shared by all callbacks run in this pass */
        if (ctx)
            apr_pool_destroy(ctx);
        if (!w->is_running) {
            break;
        }
    }
    if (locked)
        apr_proc_mutex_unlock(w->mutex);
    apr_thread_exit(w->thread, APR_SUCCESS);

    return NULL;
}

/**
 * Start the worker thread of a watchdog.
 * For singleton watchdogs the process mutex is first initialised for use in
 * this process.  A cleanup that stops and joins the thread is registered on
 * the given pool.
 * @param w The watchdog to start.
 * @param p Pool used for the startup mutex, the thread and the cleanup.
 * @return APR_SUCCESS once the worker thread has started, otherwise the
 *         status of the step that failed.
 */
static apr_status_t wd_startup(ap_watchdog_t *w, apr_pool_t *p)
{
    apr_status_t rc;

    /* Create thread startup mutex */
    rc = apr_thread_mutex_create(&w->startup, APR_THREAD_MUTEX_UNNESTED, p);
    if (rc != APR_SUCCESS)
        return rc;

    if (w->singleton) {
        /* Initialize singleton mutex in child */
        rc = apr_proc_mutex_child_init(&w->mutex, w->mutex_path, p);
        if (rc != APR_SUCCESS) {
            /* Do not leave the startup mutex behind on failure */
            apr_thread_mutex_destroy(w->startup);
            return rc;
        }
    }
    /* This mutex fixes problems with a fast start/fast end, where the pool
     * cleanup was being invoked before the thread completely spawned.
     */
    apr_thread_mutex_lock(w->startup);
    apr_pool_pre_cleanup_register(p, w, wd_worker_cleanup);

    /* Start the newly created watchdog */
    rc = apr_thread_create(&w->thread, NULL, wd_worker, w, p);
    if (rc != APR_SUCCESS) {
        /* No worker exists that would release the startup mutex, so
         * locking it again below would block forever.  Undo the setup and
         * report the failure instead.
         */
        apr_pool_cleanup_kill(p, w, wd_worker_cleanup);
        apr_thread_mutex_unlock(w->startup);
        apr_thread_mutex_destroy(w->startup);
        return rc;
    }

    /* Wait until the worker has released the startup mutex */
    apr_thread_mutex_lock(w->startup);
    apr_thread_mutex_unlock(w->startup);
    apr_thread_mutex_destroy(w->startup);

    return rc;
}

/**
 * Look up the watchdog registered under the given name, creating and
 * registering a new one when none exists yet.
 * @param watchdog Receives the watchdog instance.
 * @param name Watchdog name; the pointer itself is stored in a new instance.
 * @param parent Non zero selects the parent process watchdog, zero the
 *               child process watchdog.
 * @param p Pool used to allocate and register a new instance.
 * @return APR_SUCCESS when an existing instance was found, otherwise the
 *         result of registering the new instance as a provider.
 */
AP_DECLARE(apr_status_t) ap_watchdog_get_instance(ap_watchdog_t **watchdog,
                                                  const char *name,
                                                  int parent,
                                                  apr_pool_t *p)
{
    const char *version;
    ap_watchdog_t *existing;
    ap_watchdog_t *created;

    if (parent) {
        version = AP_WATCHODG_PVERSION;
    }
    else {
        version = AP_WATCHODG_CVERSION;
    }

    existing = ap_lookup_provider(AP_WATCHODG_PGROUP, name, version);
    if (existing != NULL) {
        *watchdog = existing;
        return APR_SUCCESS;
    }

    /* Nothing registered yet: build a zeroed instance and publish it */
    created = apr_pcalloc(p, sizeof(ap_watchdog_t));
    created->pool = p;
    created->name = name;
    *watchdog = created;

    return ap_register_provider(p, AP_WATCHODG_PGROUP, name,
                                version, created);
}

/**
 * Store the singleton setting of a watchdog.
 * @param watchdog Watchdog instance to modify.
 * @param on Value stored in the singleton field; non zero means singleton.
 */
AP_DECLARE(void) ap_watchdog_set_singleton(ap_watchdog_t *watchdog, int on)
{
    watchdog->singleton = on;
}

/**
 * Add a callback to a watchdog.
 * The new entry is allocated from the watchdog's pool and placed at the
 * head of the watchdog's callback list.
 * @param w Watchdog to register with.
 * @param interval Interval at which the callback should be invoked.
 * @param data Pointer handed to the callback on each invocation.
 * @param callback Function to invoke.
 * @return Always APR_SUCCESS.
 */
AP_DECLARE(apr_status_t) ap_watchdog_register_callback(ap_watchdog_t *w,
                            apr_interval_time_t interval,
                            const void *data,
                            ap_watchdog_callback_fn_t *callback)
{
    watchdog_list_t *entry = apr_palloc(w->pool, sizeof(watchdog_list_t));

    /* What to call, with what, and how often */
    entry->wd          = w;
    entry->callback_fn = callback;
    entry->data        = data;
    entry->interval    = interval;

    /* A fresh entry is enabled and has no elapsed time yet */
    entry->status      = APR_SUCCESS;
    entry->step        = 0;

    /* Push onto the front of the list */
    entry->next  = w->callbacks;
    w->callbacks = entry;

    return APR_SUCCESS;
}

/**
 * Create the process mutex of a watchdog.
 * The mutex file is named ".wdc-<name>.mutex" and placed in the directory
 * given by WatchdogMutexPath, or in "logs" when the directive is not set;
 * the resulting path is resolved relative to the server root.
 * @param w Watchdog receiving the mutex and its path.
 * @param p Pool used for the path and the mutex.
 * @return APR_SUCCESS, or the status of the step that failed.
 */
static apr_status_t wd_create_mutex(ap_watchdog_t *w, apr_pool_t *p)
{
    const char *dir;
    const char *file;
    apr_status_t status;
    /* TODO: Check the mutex mechanisms */
#if APR_HAS_FCNTL_SERIALIZE
    apr_lockmech_e lock_type = APR_LOCK_FCNTL;
#elif APR_HAS_FLOCK_SERIALIZE
    apr_lockmech_e lock_type = APR_LOCK_FLOCK;
#else
    apr_lockmech_e lock_type = APR_LOCK_DEFAULT;
#endif

    dir = (wd_mutex_path != NULL) ? wd_mutex_path : "logs";
    file = apr_pstrcat(p, dir, "/.wdc-", w->name, ".mutex", NULL);
    w->mutex_path = ap_server_root_relative(p, file);

    status = apr_proc_mutex_create(&w->mutex, w->mutex_path, lock_type, p);
#ifdef AP_NEED_SET_MUTEX_PERMS
    if (status != APR_SUCCESS) {
        return status;
    }
    status = ap_unixd_set_proc_mutex_perms(w->mutex);
    if (status != APR_SUCCESS) {
        /* Destroy the mutex early */
        apr_proc_mutex_destroy(w->mutex);
        w->mutex = NULL;
    }
#endif
    return status;
}

/*--------------------------------------------------------------------------*/
/*                                                                          */
/* Pre config hook.                                                         */
/* Create default watchdogs for parent and child                            */
/* Parent watchdog executes inside parent process so it doesn't need the    */
/* singleton mutex                                                          */
/*                                                                          */
/*--------------------------------------------------------------------------*/
/**
 * @param pconf Configuration pool; the default watchdogs are allocated and
 *              registered from it.
 * @param plog Unused.
 * @param ptemp Unused.
 * @return OK on success, otherwise the status returned by
 *         ap_watchdog_get_instance().
 */
static int wd_pre_config_hook(apr_pool_t *pconf, apr_pool_t *plog,
                              apr_pool_t *ptemp)
{
    apr_status_t rv;
    ap_watchdog_t *w;

    /* Forget the WatchdogMutexPath value of an earlier configuration pass.
     * The string was allocated from that pass's command pool, and leaving
     * the static set would also make the directive handler reject the
     * directive as a duplicate when the configuration is read again.
     */
    wd_mutex_path = NULL;

    /* Create parent process watchdog */
    if ((rv = ap_watchdog_get_instance(&w,
                AP_WATCHDOG_DEFAULT, 1, pconf)) != APR_SUCCESS) {
        return rv;
    }
    /* Create child process watchdog */
    if ((rv = ap_watchdog_get_instance(&w,
                AP_WATCHDOG_DEFAULT, 0, pconf)) != APR_SUCCESS) {
        return rv;
    }
    /* The default child watchdog runs in one process only */
    w->singleton = 1;
    return OK;
}

/*--------------------------------------------------------------------------*/
/*                                                                          */
/* Post config hook.                                                        */
/* Create watchdog thread in parent and initializes Watchdog module         */
/*                                                                          */
/*--------------------------------------------------------------------------*/
/**
 * On the first invocation only the module-wide state is created and stored
 * as userdata on the process pool.  On later invocations the worker threads
 * of parent watchdogs that have callbacks are started, and the process
 * mutexes of singleton child watchdogs that have callbacks are created.
 * @param pconf Configuration pool, used to list the registered watchdogs.
 * @param plog Unused.
 * @param ptemp Unused.
 * @param s Server record; kept for logging from the worker threads.
 * @return OK on success, otherwise the status of the step that failed.
 */
static int wd_post_config_hook(apr_pool_t *pconf, apr_pool_t *plog,
                               apr_pool_t *ptemp, server_rec *s)
{
    apr_status_t rv;
    const char *pk = "watchdog_init_module_tag";
    apr_pool_t *pproc = s->process->pool;
    const apr_array_header_t *wl;

    apr_pool_userdata_get((void *)&wd_server_conf, pk, pproc);
    if (!wd_server_conf) {
        if (!(wd_server_conf = apr_pcalloc(pproc, sizeof(wd_server_conf_t))))
            return apr_get_os_error();
        apr_pool_create(&wd_server_conf->pool, pproc);
        wd_server_conf->s = s;
        apr_pool_userdata_set(wd_server_conf, pk, apr_pool_cleanup_null, pproc);
        /* First time config phase -- skip. */
        return OK;
    }
    /* The state outlives a single configuration pass, so always remember
     * the server record of the current pass rather than the one that was
     * passed in when the state was created.
     */
    wd_server_conf->s = s;

    if ((wl = ap_list_provider_names(pconf, AP_WATCHODG_PGROUP,
                                            AP_WATCHODG_PVERSION))) {
        const ap_list_provider_names_t *wn;
        int i;

        wn = (ap_list_provider_names_t *)wl->elts;
        for (i = 0; i < wl->nelts; i++) {
            ap_watchdog_t *w = ap_lookup_provider(AP_WATCHODG_PGROUP,
                                                  wn[i].provider_name,
                                                  AP_WATCHODG_PVERSION);
            if (w && w->callbacks) {
                /* We have some callbacks registered.
                 * Create the watchdog thread
                 */
                w->singleton = 0;
                if ((rv = wd_startup(w, wd_server_conf->pool)) != APR_SUCCESS) {
                    ap_log_error(APLOG_MARK, APLOG_CRIT, rv, s,
                            "Watchdog: Failed to create parent worker thread.");
                    return rv;
                }
                wd_server_conf->active++;
            }
        }
    }
    if ((wl = ap_list_provider_names(pconf, AP_WATCHODG_PGROUP,
                                            AP_WATCHODG_CVERSION))) {
        const ap_list_provider_names_t *wn;
        int i;

        wn = (ap_list_provider_names_t *)wl->elts;
        for (i = 0; i < wl->nelts; i++) {
            ap_watchdog_t *w = ap_lookup_provider(AP_WATCHODG_PGROUP,
                                                  wn[i].provider_name,
                                                  AP_WATCHODG_CVERSION);
            if (w && w->callbacks) {
                /* We have some callbacks registered.
                 * Create mutexes for singleton watchdogs
                 */
                if (w->singleton) {
                    rv = wd_create_mutex(w, wd_server_conf->pool);
                    if (rv != APR_SUCCESS) {
                        ap_log_error(APLOG_MARK, APLOG_CRIT, rv, s,
                                "Watchdog: Failed to create mutex.");
                        return rv;
                    }
                }
                /* Count every child watchdog that has callbacks, singleton
                 * or not: the child init hook starts all of them but does
                 * nothing at all while the counter is zero.
                 */
                wd_server_conf->active++;
            }
        }
    }
    return OK;
}

/*--------------------------------------------------------------------------*/
/*                                                                          */
/* Child init hook.                                                         */
/* Starts the worker thread of every child watchdog that has callbacks;     */
/* singleton mutexes are initialized as part of that startup.               */
/*                                                                          */
/*--------------------------------------------------------------------------*/
static void wd_child_init_hook(apr_pool_t *p, server_rec *s)
{
    const apr_array_header_t *names;
    const ap_list_provider_names_t *entries;
    int idx;

    /* We don't have anything configured, bail out. */
    if (!wd_server_conf->active) {
        return;
    }

    names = ap_list_provider_names(p, AP_WATCHODG_PGROUP,
                                      AP_WATCHODG_CVERSION);
    if (!names) {
        return;
    }

    entries = (ap_list_provider_names_t *)names->elts;
    for (idx = 0; idx < names->nelts; idx++) {
        apr_status_t status;
        ap_watchdog_t *w = ap_lookup_provider(AP_WATCHODG_PGROUP,
                                              entries[idx].provider_name,
                                              AP_WATCHODG_CVERSION);

        /* Only watchdogs with registered callbacks get a thread */
        if (!w || !w->callbacks) {
            continue;
        }

        /* Kick off the watchdog */
        status = wd_startup(w, wd_server_conf->pool);
        if (status != APR_SUCCESS) {
            ap_log_error(APLOG_MARK, APLOG_CRIT, status, s,
                         "Watchdog: Failed to create worker thread.");
            /* No point to continue */
            return;
        }
    }
}

/*--------------------------------------------------------------------------*/
/*                                                                          */
/* WatchdogMutexPath directive                                              */
/*                                                                          */
/*--------------------------------------------------------------------------*/
/**
 * Store the directory in which watchdog mutex files are created.
 * A single trailing '/' is removed from the value.
 * @param cmd Directive context; the value is copied into cmd->pool.
 * @param dummy Unused.
 * @param arg The directive argument.
 * @return NULL on success, otherwise an error message.
 */
static const char *wd_cmd_mutex_path(cmd_parms *cmd, void *dummy,
                                     const char *arg)
{
    const char *errs = ap_check_cmd_context(cmd, GLOBAL_ONLY);
    size_t len;

    if (errs != NULL)
        return errs;

    if (wd_mutex_path != NULL)
       return "Duplicate WatchdogMutexPath directives are not allowed";

    /* An empty value must be rejected before looking at its last
     * character: with a length of zero, "length - 1" wraps around and
     * indexes far outside the string.
     */
    if (arg == NULL || *arg == '\0')
        return "Invalid WatchdogMutexPath name";

    wd_mutex_path = apr_pstrdup(cmd->pool, arg);
    if (wd_mutex_path == NULL)
        return "Invalid WatchdogMutexPath name";
    len = strlen(wd_mutex_path);
    if (wd_mutex_path[len - 1] == '/')
        wd_mutex_path[len - 1] = '\0';
    return NULL;
}

/*--------------------------------------------------------------------------*/
/*                                                                          */
/* List of directives specific to our module.                               */
/* WatchdogMutexPath takes exactly one argument and is handled by           */
/* wd_cmd_mutex_path(); the {NULL} entry terminates the list.               */
/*                                                                          */
/*--------------------------------------------------------------------------*/
static const command_rec wd_directives[] =
{
    AP_INIT_TAKE1(
        "WatchdogMutexPath",                /* directive name               */
        wd_cmd_mutex_path,                  /* config action routine        */
        NULL,                               /* argument to include in call  */
        RSRC_CONF,                          /* where available              */
        "Path where the Watchdog mutexes will be created"
    ),
    {NULL}
};

/*--------------------------------------------------------------------------*/
/*                                                                          */
/* Hook registration.                                                       */
/* Connects the pre config, post config and child init handlers of this     */
/* module to the server.                                                    */
/*                                                                          */
/*--------------------------------------------------------------------------*/
static void wd_register_hooks(apr_pool_t *p)
{
    /* Only the mpm_winnt has child init hook handler.
     * Listing it as a predecessor makes sure that we are called after the
     * mpm child init handler initializes.
     */
    static const char *const run_after[] = { "mpm_winnt.c", NULL };

    /* Pre config handling: run first */
    ap_hook_pre_config(wd_pre_config_hook, NULL, NULL, APR_HOOK_FIRST);

    /* Post config handling: run last */
    ap_hook_post_config(wd_post_config_hook, NULL, NULL, APR_HOOK_LAST);

    /* Child init hook: ordered after the mpm's own handler */
    ap_hook_child_init(wd_child_init_hook, run_after, NULL, APR_HOOK_MIDDLE);
}

/*--------------------------------------------------------------------------*/
/*                                                                          */
/* The list of callback routines and data structures that provide           */
/* the static hooks into our module from the other parts of the server.     */
/* The module keeps no per-directory or per-server configuration.           */
/*                                                                          */
/*--------------------------------------------------------------------------*/
module AP_MODULE_DECLARE_DATA watchdog_module = {
    STANDARD20_MODULE_STUFF,
    NULL,                       /* create per-directory config structure */
    NULL,                       /* merge per-directory config structures */
    NULL,                       /* create per-server config structure */
    NULL,                       /* merge per-server config structures */
    wd_directives,              /* table of configuration directives */
    wd_register_hooks           /* register hooks */
};

Reply via email to