On 13 Mar 2016, at 10:55 PM, Eric Covener <cove...@gmail.com> wrote: > I also meant the original feature never made it, so we can do whatever we > want to it.
What do you think of this? It includes a cleanup to the original pool to remove any unfired pollsets from the core. Index: include/ap_mpm.h =================================================================== --- include/ap_mpm.h (revision 1734657) +++ include/ap_mpm.h (working copy) @@ -207,50 +207,48 @@ void *baton); /** - * Register a callback on the readability or writability on a group of sockets - * @param s Null-terminated list of sockets + * Register a callback on the readability or writability on a group of + * sockets/pipes. + * @param pds Null-terminated list of apr_pollfd_t * @param p pool for use between registration and callback - * @param for_read Whether the sockets are monitored for read or writability * @param cbfn The callback function * @param baton userdata for the callback function - * @return APR_SUCCESS if all sockets could be added to a pollset, + * @return APR_SUCCESS if all sockets/pipes could be added to a pollset, * APR_ENOTIMPL if no asynch support, or an apr_pollset_add error. - * @remark When activity is found on any 1 socket in the list, all are removed + * @remark When activity is found on any 1 socket/pipe in the list, all are removed * from the pollset and only 1 callback is issued. */ -AP_DECLARE(apr_status_t) ap_mpm_register_socket_callback(apr_socket_t **s, - apr_pool_t *p, - int for_read, - ap_mpm_callback_fn_t *cbfn, - void *baton); +AP_DECLARE(apr_status_t) ap_mpm_register_poll_callback(apr_pollfd_t **pds, + apr_pool_t *p, + ap_mpm_callback_fn_t *cbfn, + void *baton); /** - * Register a callback on the readability or writability on a group of sockets, with a timeout - * @param s Null-terminated list of sockets + * Register a callback on the readability or writability on a group of sockets/pipes, + * with a timeout. 
+ * @param pds Null-terminated list of apr_pollfd_t * @param p pool for use between registration and callback - * @param for_read Whether the sockets are monitored for read or writability * @param cbfn The callback function * @param tofn The callback function if the timeout expires * @param baton userdata for the callback function * @param timeout timeout for I/O in microseconds, unlimited if <= 0 - * @return APR_SUCCESS if all sockets could be added to a pollset, + * @return APR_SUCCESS if all sockets/pipes could be added to a pollset, * APR_ENOTIMPL if no asynch support, or an apr_pollset_add error. - * @remark When activity is found on any 1 socket in the list, all are removed + * @remark When activity is found on any 1 socket/pipe in the list, all are removed * from the pollset and only 1 callback is issued. * @remark For each call, only one of tofn or cbfn will be called, never both. */ -AP_DECLARE(apr_status_t) ap_mpm_register_socket_callback_timeout(apr_socket_t **s, +AP_DECLARE(apr_status_t) ap_mpm_register_poll_callback_timeout(apr_pollfd_t **pfds, apr_pool_t *p, - int for_read, ap_mpm_callback_fn_t *cbfn, ap_mpm_callback_fn_t *tofn, void *baton, apr_time_t timeout); -AP_DECLARE(apr_status_t) ap_mpm_unregister_socket_callback(apr_socket_t **s, - apr_pool_t *p); +AP_DECLARE(apr_status_t) ap_mpm_unregister_poll_callback(apr_pollfd_t **pfds, + apr_pool_t *p); typedef enum mpm_child_status { MPM_CHILD_STARTED, Index: include/mpm_common.h =================================================================== --- include/mpm_common.h (revision 1734657) +++ include/mpm_common.h (working copy) @@ -426,15 +426,15 @@ * register the specified callback * @ingroup hooks */ -AP_DECLARE_HOOK(apr_status_t, mpm_register_socket_callback, - (apr_socket_t **s, apr_pool_t *p, int for_read, ap_mpm_callback_fn_t *cbfn, void *baton)) +AP_DECLARE_HOOK(apr_status_t, mpm_register_poll_callback, + (apr_pollfd_t **pds, apr_pool_t *p, ap_mpm_callback_fn_t *cbfn, void *baton)) /* register
the specified callback, with timeout * @ingroup hooks * */ -AP_DECLARE_HOOK(apr_status_t, mpm_register_socket_callback_timeout, - (apr_socket_t **s, apr_pool_t *p, int for_read, +AP_DECLARE_HOOK(apr_status_t, mpm_register_poll_callback_timeout, + (apr_pollfd_t **pds, apr_pool_t *p, ap_mpm_callback_fn_t *cbfn, ap_mpm_callback_fn_t *tofn, void *baton, @@ -443,8 +443,8 @@ * Unregister the specified callback * @ingroup hooks */ -AP_DECLARE_HOOK(apr_status_t, mpm_unregister_socket_callback, - (apr_socket_t **s, apr_pool_t *p)) +AP_DECLARE_HOOK(apr_status_t, mpm_unregister_poll_callback, + (apr_pollfd_t **pds, apr_pool_t *p)) /** Resume the suspended connection * @ingroup hooks Index: server/mpm/event/event.c =================================================================== --- server/mpm/event/event.c (revision 1734657) +++ server/mpm/event/event.c (working copy) @@ -1528,9 +1528,27 @@ return APR_SUCCESS; } -static apr_status_t event_register_socket_callback_ex(apr_socket_t **s, +static apr_status_t event_cleanup_poll_callback(void *data) +{ + apr_status_t final_rc = APR_SUCCESS; + apr_pollfd_t **pfds = data; + + while (*pfds) { + if ((*pfds)->client_data) { + apr_status_t rc; + rc = apr_pollset_remove(event_pollset, (*pfds)); + if (rc != APR_SUCCESS && !APR_STATUS_IS_NOTFOUND(rc)) { + final_rc = rc; + } + } + pfds++; + } + + return final_rc; +} + +static apr_status_t event_register_poll_callback_ex(apr_pollfd_t **pfds, apr_pool_t *p, - int for_read, ap_mpm_callback_fn_t *cbfn, ap_mpm_callback_fn_t *tofn, void *baton, @@ -1540,15 +1558,12 @@ int i = 0, nsock; socket_callback_baton_t *scb = apr_pcalloc(p, sizeof(*scb)); listener_poll_type *pt = apr_palloc(p, sizeof(*pt)); - apr_pollfd_t **pfds = NULL; - while(s[i] != NULL) { - i++; + while(pfds[i] != NULL) { + i++; } nsock = i; - pfds = apr_pcalloc(p, (nsock+1) * sizeof(apr_pollfd_t*)); - pt->type = PT_USER; pt->baton = scb; @@ -1557,12 +1572,10 @@ scb->nsock = nsock; scb->pfds = pfds; + 
apr_pool_pre_cleanup_register(p, pfds, event_cleanup_poll_callback); + for (i = 0; i<nsock; i++) { - pfds[i] = apr_pcalloc(p, sizeof(apr_pollfd_t)); - pfds[i]->desc_type = APR_POLL_SOCKET; - pfds[i]->reqevents = (for_read ? APR_POLLIN : APR_POLLOUT) | APR_POLLERR | APR_POLLHUP; - pfds[i]->desc.s = s[i]; - pfds[i]->p = p; + pfds[i]->reqevents = (pfds[i]->reqevents) | APR_POLLERR | APR_POLLHUP; pfds[i]->client_data = pt; } @@ -1578,45 +1591,21 @@ } return final_rc; } -static apr_status_t event_register_socket_callback(apr_socket_t **s, - apr_pool_t *p, - int for_read, - ap_mpm_callback_fn_t *cbfn, - void *baton) + +static apr_status_t event_register_poll_callback(apr_pollfd_t **pfds, + apr_pool_t *p, + ap_mpm_callback_fn_t *cbfn, + void *baton) { - return event_register_socket_callback_ex(s, p, for_read, - cbfn, - NULL, /* no timeout function */ - baton, - 0 /* no timeout */); + return event_register_poll_callback_ex(pfds, p, + cbfn, + NULL, /* no timeout function */ + baton, + 0 /* no timeout */); } -static apr_status_t event_unregister_socket_callback(apr_socket_t **s, apr_pool_t *p) +static apr_status_t event_unregister_poll_callback(apr_pollfd_t **pfds, apr_pool_t *p) { - int i = 0, nsock; - apr_status_t final_rc = APR_SUCCESS; - apr_pollfd_t **pfds = NULL; - - while(s[i] != NULL) { - i++; - } - nsock = i; - - pfds = apr_palloc(p, nsock * sizeof(apr_pollfd_t*)); - - for (i = 0; i<nsock; i++) { - apr_status_t rc; - pfds[i] = apr_pcalloc(p, sizeof(apr_pollfd_t)); - pfds[i]->desc_type = APR_POLL_SOCKET; - pfds[i]->reqevents = APR_POLLERR | APR_POLLHUP; - pfds[i]->desc.s = s[i]; - pfds[i]->client_data = NULL; - rc = apr_pollset_remove(event_pollset, pfds[i]); - if (rc != APR_SUCCESS && !APR_STATUS_IS_NOTFOUND(rc)) { - final_rc = rc; - } - } - - return final_rc; + return apr_pool_cleanup_run(p, pfds, event_cleanup_poll_callback); } /* @@ -2057,7 +2046,8 @@ NULL /* no associated socket callback */); /* remove all sockets in my set */ for (i = 0; i < baton->nsock; i++) 
{ - apr_pollset_remove(event_pollset, baton->pfds[i]); + apr_pollset_remove(event_pollset, baton->pfds[i]); + baton->pfds[i]->client_data = NULL; } push_timer2worker(te); @@ -3789,11 +3779,11 @@ ap_hook_mpm_query(event_query, NULL, NULL, APR_HOOK_MIDDLE); ap_hook_mpm_register_timed_callback(event_register_timed_callback, NULL, NULL, APR_HOOK_MIDDLE); - ap_hook_mpm_register_socket_callback(event_register_socket_callback, NULL, NULL, + ap_hook_mpm_register_poll_callback(event_register_poll_callback, NULL, NULL, APR_HOOK_MIDDLE); - ap_hook_mpm_register_socket_callback_timeout(event_register_socket_callback_ex, NULL, NULL, + ap_hook_mpm_register_poll_callback_timeout(event_register_poll_callback_ex, NULL, NULL, APR_HOOK_MIDDLE); - ap_hook_mpm_unregister_socket_callback(event_unregister_socket_callback, NULL, NULL, + ap_hook_mpm_unregister_poll_callback(event_unregister_poll_callback, NULL, NULL, APR_HOOK_MIDDLE); ap_hook_pre_read_request(event_pre_read_request, NULL, NULL, APR_HOOK_MIDDLE); ap_hook_post_read_request(event_post_read_request, NULL, NULL, APR_HOOK_MIDDLE); Index: server/mpm_common.c =================================================================== --- server/mpm_common.c (revision 1734657) +++ server/mpm_common.c (working copy) @@ -68,9 +68,9 @@ APR_HOOK_LINK(mpm) \ APR_HOOK_LINK(mpm_query) \ APR_HOOK_LINK(mpm_register_timed_callback) \ - APR_HOOK_LINK(mpm_register_socket_callback) \ - APR_HOOK_LINK(mpm_register_socket_callback_timeout) \ - APR_HOOK_LINK(mpm_unregister_socket_callback) \ + APR_HOOK_LINK(mpm_register_poll_callback) \ + APR_HOOK_LINK(mpm_register_poll_callback_timeout) \ + APR_HOOK_LINK(mpm_unregister_poll_callback) \ APR_HOOK_LINK(mpm_get_name) \ APR_HOOK_LINK(mpm_resume_suspended) \ APR_HOOK_LINK(end_generation) \ @@ -110,15 +110,15 @@ AP_IMPLEMENT_HOOK_RUN_FIRST(apr_status_t, mpm_resume_suspended, (conn_rec *c), (c), APR_ENOTIMPL) -AP_IMPLEMENT_HOOK_RUN_FIRST(apr_status_t, mpm_register_socket_callback, - (apr_socket_t **s, apr_pool_t 
*p, int for_read, ap_mpm_callback_fn_t *cbfn, void *baton), - (s, p, for_read, cbfn, baton), APR_ENOTIMPL) -AP_IMPLEMENT_HOOK_RUN_FIRST(apr_status_t, mpm_register_socket_callback_timeout, - (apr_socket_t **s, apr_pool_t *p, int for_read, ap_mpm_callback_fn_t *cbfn, ap_mpm_callback_fn_t *tofn, void *baton, apr_time_t timeout), - (s, p, for_read, cbfn, tofn, baton, timeout), APR_ENOTIMPL) -AP_IMPLEMENT_HOOK_RUN_FIRST(apr_status_t, mpm_unregister_socket_callback, - (apr_socket_t **s, apr_pool_t *p), - (s, p), APR_ENOTIMPL) +AP_IMPLEMENT_HOOK_RUN_FIRST(apr_status_t, mpm_register_poll_callback, + (apr_pollfd_t **pds, apr_pool_t *p, ap_mpm_callback_fn_t *cbfn, void *baton), + (pds, p, cbfn, baton), APR_ENOTIMPL) +AP_IMPLEMENT_HOOK_RUN_FIRST(apr_status_t, mpm_register_poll_callback_timeout, + (apr_pollfd_t **pds, apr_pool_t *p, ap_mpm_callback_fn_t *cbfn, ap_mpm_callback_fn_t *tofn, void *baton, apr_time_t timeout), + (pds, p, cbfn, tofn, baton, timeout), APR_ENOTIMPL) +AP_IMPLEMENT_HOOK_RUN_FIRST(apr_status_t, mpm_unregister_poll_callback, + (apr_pollfd_t **pds, apr_pool_t *p), + (pds, p), APR_ENOTIMPL) AP_IMPLEMENT_HOOK_RUN_FIRST(int, output_pending, (conn_rec *c), (c), DECLINED) AP_IMPLEMENT_HOOK_RUN_FIRST(int, input_pending, @@ -565,23 +565,22 @@ { return ap_run_mpm_register_timed_callback(t, cbfn, baton); } -AP_DECLARE(apr_status_t) ap_mpm_register_socket_callback(apr_socket_t **s, apr_pool_t *p, int for_read, ap_mpm_callback_fn_t *cbfn, void *baton) +AP_DECLARE(apr_status_t) ap_mpm_register_poll_callback(apr_pollfd_t **pds, apr_pool_t *p, ap_mpm_callback_fn_t *cbfn, void *baton) { - return ap_run_mpm_register_socket_callback(s, p, for_read, cbfn, baton); + return ap_run_mpm_register_poll_callback(pds, p, cbfn, baton); } -AP_DECLARE(apr_status_t) ap_mpm_register_socket_callback_timeout(apr_socket_t **s, - apr_pool_t *p, - int for_read, - ap_mpm_callback_fn_t *cbfn, - ap_mpm_callback_fn_t *tofn, - void *baton, - apr_time_t timeout) +AP_DECLARE(apr_status_t) 
ap_mpm_register_poll_callback_timeout(apr_pollfd_t **pds, + apr_pool_t *p, + ap_mpm_callback_fn_t *cbfn, + ap_mpm_callback_fn_t *tofn, + void *baton, + apr_time_t timeout) { - return ap_run_mpm_register_socket_callback_timeout(s, p, for_read, cbfn, tofn, baton, timeout); + return ap_run_mpm_register_poll_callback_timeout(pds, p, cbfn, tofn, baton, timeout); } -AP_DECLARE(apr_status_t) ap_mpm_unregister_socket_callback(apr_socket_t **s, apr_pool_t *p) +AP_DECLARE(apr_status_t) ap_mpm_unregister_poll_callback(apr_pollfd_t **pds, apr_pool_t *p) { - return ap_run_mpm_unregister_socket_callback(s, p); + return ap_run_mpm_unregister_poll_callback(pds, p); } AP_DECLARE(const char *)ap_show_mpm(void) Index: modules/proxy/mod_proxy_wstunnel.c =================================================================== --- modules/proxy/mod_proxy_wstunnel.c (revision 1734657) +++ modules/proxy/mod_proxy_wstunnel.c (working copy) @@ -182,15 +182,29 @@ */ static void proxy_wstunnel_callback(void *b) { int status; - apr_socket_t *sockets[3] = {NULL, NULL, NULL}; ws_baton_t *baton = (ws_baton_t*)b; proxyws_dir_conf *dconf = ap_get_module_config(baton->r->per_dir_config, &proxy_wstunnel_module); apr_pool_clear(baton->subpool); status = proxy_wstunnel_pump(baton, dconf->async_delay, dconf->is_async); if (status == SUSPENDED) { - sockets[0] = baton->client_soc; - sockets[1] = baton->server_soc; - ap_mpm_register_socket_callback_timeout(sockets, baton->subpool, 1, + + apr_pollfd_t **pfds = apr_palloc(baton->subpool, (3) * sizeof(apr_pollfd_t*)); + + pfds[0] = apr_pcalloc(baton->subpool, sizeof(apr_pollfd_t)); + pfds[0]->desc_type = APR_POLL_SOCKET; + pfds[0]->reqevents = APR_POLLIN | APR_POLLERR | APR_POLLHUP; + pfds[0]->desc.s = baton->client_soc; + pfds[0]->p = baton->subpool; + + pfds[1] = apr_pcalloc(baton->subpool, sizeof(apr_pollfd_t)); + pfds[1]->desc_type = APR_POLL_SOCKET; + pfds[1]->reqevents = APR_POLLIN | APR_POLLERR | APR_POLLHUP; + pfds[1]->desc.s = baton->server_soc; + 
pfds[1]->p = baton->subpool; + + pfds[2] = NULL; + + ap_mpm_register_poll_callback_timeout(pfds, baton->subpool, proxy_wstunnel_callback, proxy_wstunnel_cancel_callback, baton, @@ -298,7 +312,6 @@ apr_bucket_brigade *bb = apr_brigade_create(p, c->bucket_alloc); apr_socket_t *client_socket = ap_get_conn_socket(c); ws_baton_t *baton = apr_pcalloc(r->pool, sizeof(ws_baton_t)); - apr_socket_t *sockets[3] = {NULL, NULL, NULL}; int status; proxyws_dir_conf *dconf = ap_get_module_config(r->per_dir_config, &proxy_wstunnel_module); @@ -377,9 +390,24 @@ status = proxy_wstunnel_pump(baton, dconf->async_delay, dconf->is_async); apr_pool_clear(baton->subpool); if (status == SUSPENDED) { - sockets[0] = baton->client_soc; - sockets[1] = baton->server_soc; - rv = ap_mpm_register_socket_callback_timeout(sockets, baton->subpool, 1, + + apr_pollfd_t **pfds = apr_palloc(baton->subpool, (3) * sizeof(apr_pollfd_t*)); + + pfds[0] = apr_pcalloc(baton->subpool, sizeof(apr_pollfd_t)); + pfds[0]->desc_type = APR_POLL_SOCKET; + pfds[0]->reqevents = APR_POLLIN | APR_POLLERR | APR_POLLHUP; + pfds[0]->desc.s = baton->client_soc; + pfds[0]->p = baton->subpool; + + pfds[1] = apr_pcalloc(baton->subpool, sizeof(apr_pollfd_t)); + pfds[1]->desc_type = APR_POLL_SOCKET; + pfds[1]->reqevents = APR_POLLIN | APR_POLLERR | APR_POLLHUP; + pfds[1]->desc.s = baton->server_soc; + pfds[1]->p = baton->subpool; + + pfds[2] = NULL; + + rv = ap_mpm_register_poll_callback_timeout(pfds, baton->subpool, proxy_wstunnel_callback, proxy_wstunnel_cancel_callback, baton, Regards, Graham —