mturk 2005/04/18 05:33:42 Modified: jni/native/src poll.c Log: Make poll thread-safe and add a queue whose entries are added to the pollset before the actual poll is called. Revision Changes Path 1.6 +101 -20 jakarta-tomcat-connectors/jni/native/src/poll.c Index: poll.c =================================================================== RCS file: /home/cvs/jakarta-tomcat-connectors/jni/native/src/poll.c,v retrieving revision 1.5 retrieving revision 1.6 diff -u -r1.5 -r1.6 --- poll.c 15 Apr 2005 10:14:46 -0000 1.5 +++ poll.c 18 Apr 2005 12:33:42 -0000 1.6 @@ -16,6 +16,7 @@ #include "apr.h" #include "apr_pools.h" #include "apr_poll.h" +#include "apr_thread_mutex.h" #include "tcn.h" /* Internal poll structure for queryset @@ -24,10 +25,13 @@ typedef struct tcn_pollset { apr_pool_t *pool; apr_int32_t nelts; + apr_int32_t nadds; apr_int32_t nalloc; apr_pollset_t *pollset; + apr_thread_mutex_t *mutex; apr_pollfd_t *query_set; - apr_time_t *query_add; + apr_pollfd_t *query_add; + apr_time_t *query_ttl; apr_interval_time_t max_ttl; } tcn_pollset_t; @@ -38,24 +42,44 @@ apr_pool_t *p = J2P(pool, apr_pool_t *); apr_pollset_t *pollset = NULL; tcn_pollset_t *tps = NULL; + apr_thread_mutex_t *mutex = NULL; + apr_uint32_t f = (apr_uint32_t)flags; UNREFERENCED(o); + TCN_ASSERT(pool != 0); - TCN_THROW_IF_ERR(apr_pollset_create(&pollset, - (apr_uint32_t)size, p, (apr_uint32_t)flags), - pollset); + TCN_THROW_IF_ERR(apr_thread_mutex_create(&mutex, + APR_THREAD_MUTEX_DEFAULT, p), mutex); + + if (f & APR_POLLSET_THREADSAFE) { + apr_status_t rv = apr_pollset_create(&pollset, (apr_uint32_t)size, p, f); + if (rv == APR_ENOTIMPL) + f &= ~APR_POLLSET_THREADSAFE; + else if (rv != APR_SUCCESS) { + tcn_ThrowAPRException(e, rv); + goto cleanup; + } + } + if (pollset == NULL) { + TCN_THROW_IF_ERR(apr_pollset_create(&pollset, + (apr_uint32_t)size, p, f), pollset); + } tps = apr_palloc(p, sizeof(tcn_pollset_t)); tps->pollset = pollset; + tps->mutex = mutex; tps->query_set = apr_palloc(p, size * 
sizeof(apr_pollfd_t)); - tps->query_add = apr_palloc(p, size * sizeof(apr_time_t)); + tps->query_add = apr_palloc(p, size * sizeof(apr_pollfd_t)); + tps->query_ttl = apr_palloc(p, size * sizeof(apr_time_t)); tps->nelts = 0; + tps->nadds = 0; tps->nalloc = size; tps->pool = p; tps->max_ttl = J2T(ttl); - -cleanup: return P2J(tps); - +cleanup: + if (mutex) + apr_thread_mutex_destroy(mutex); + return 0; } TCN_IMPLEMENT_CALL(jint, Poll, destroy)(TCN_STDARGS, jlong pollset) @@ -63,6 +87,8 @@ tcn_pollset_t *p = J2P(pollset, tcn_pollset_t *); UNREFERENCED_STDARGS; + TCN_ASSERT(pollset != 0); + apr_thread_mutex_destroy(p->mutex); return (jint)apr_pollset_destroy(p->pollset); } @@ -75,20 +101,20 @@ apr_status_t rv; UNREFERENCED_STDARGS; - if (p->nelts == p->nalloc) { - return APR_ENOMEM; - } + TCN_ASSERT(socket != 0); + if (p->nadds == p->nalloc) + return APR_ENOMEM; + if ((rv = apr_thread_mutex_lock(p->mutex)) != APR_SUCCESS) + return rv; memset(&fd, 0, sizeof(apr_pollfd_t)); fd.desc_type = APR_POLL_SOCKET; fd.reqevents = (apr_int16_t)reqevents; fd.desc.s = J2P(socket, apr_socket_t *); fd.client_data = J2P(data, void *); - if ((rv = apr_pollset_add(p->pollset, &fd)) == APR_SUCCESS) { - p->query_set[p->nelts] = fd; - p->query_add[p->nelts] = apr_time_now(); - p->nelts++; - } + p->query_add[p->nadds] = fd; + p->nadds++; + apr_thread_mutex_unlock(p->mutex); return (jint)rv; } @@ -98,13 +124,17 @@ tcn_pollset_t *p = J2P(pollset, tcn_pollset_t *); apr_pollfd_t fd; apr_int32_t i; + apr_status_t rv; UNREFERENCED_STDARGS; + TCN_ASSERT(socket != 0); memset(&fd, 0, sizeof(apr_pollfd_t)); fd.desc_type = APR_POLL_SOCKET; fd.desc.s = J2P(socket, apr_socket_t *); + if ((rv = apr_thread_mutex_lock(p->mutex)) != APR_SUCCESS) + return (jint)rv; for (i = 0; i < p->nelts; i++) { if (fd.desc.s == p->query_set[i].desc.s) { /* Found an instance of the fd: remove this and any other copies */ @@ -123,8 +153,31 @@ break; } } + /* Remove from add queue if present + * This is unlikely to happen, but 
do it anyway. + */ + for (i = 0; i < p->nadds; i++) { + if (fd.desc.s == p->query_add[i].desc.s) { + /* Found an instance of the fd: remove this and any other copies */ + apr_int32_t dst = i; + apr_int32_t old_nelts = p->nadds; + p->nadds--; + for (i++; i < old_nelts; i++) { + if (fd.desc.s == p->query_add[i].desc.s) { + p->nadds--; + } + else { + p->query_add[dst] = p->query_add[i]; + dst++; + } + } + break; + } + } - return (jint)apr_pollset_remove(p->pollset, &fd); + rv = apr_pollset_remove(p->pollset, &fd); + apr_thread_mutex_unlock(p->mutex); + return (jint)rv; } TCN_IMPLEMENT_CALL(jint, Poll, poll)(TCN_STDARGS, jlong pollset, @@ -134,12 +187,35 @@ tcn_pollset_t *p = J2P(pollset, tcn_pollset_t *); jlong *pset = (*e)->GetLongArrayElements(e, set, NULL); apr_int32_t n, i = 0, num = 0; - apr_status_t rv; + apr_status_t rv = APR_SUCCESS; UNREFERENCED(o); + TCN_ASSERT(pollset != 0); + + if ((rv = apr_thread_mutex_lock(p->mutex)) != APR_SUCCESS) + return (jint)(-rv); + /* Add what is present in add queue */ + for (n = 0; n < p->nadds; n++) { + apr_pollfd_t pf = p->query_add[n]; + if (p->nelts == p->nalloc) { + rv = APR_ENOMEM; + break; + } + if ((rv = apr_pollset_add(p->pollset, &pf)) != APR_SUCCESS) + break; + p->query_ttl[p->nelts] = apr_time_now(); + p->query_set[p->nelts] = pf; + p->nelts++; + } + p->nadds = 0; + apr_thread_mutex_unlock(p->mutex); + if (rv != APR_SUCCESS) + return (jint)(-rv); rv = apr_pollset_poll(p->pollset, J2T(timeout), &num, &fd); + if (rv != APR_SUCCESS) + num = 0; - if (rv == APR_SUCCESS && num > 0) { + if (num > 0) { for (i = 0; i < num; i++) { pset[i] = P2J(fd); fd ++; @@ -151,8 +227,9 @@ /* TODO: Add thread mutex protection * or make sure the Java part is synchronized. 
*/ + apr_thread_mutex_lock(p->mutex); for (n = 0; n < p->nelts; n++) { - if ((now - p->query_add[n]) > p->max_ttl) { + if ((now - p->query_ttl[n]) > p->max_ttl) { p->query_set[n].rtnevents = APR_POLLHUP | APR_POLLIN; if (i < p->nelts) { pset[i++] = P2J(&(p->query_set[n])); @@ -160,6 +237,7 @@ } } } + apr_thread_mutex_unlock(p->mutex); } if (num) (*e)->ReleaseLongArrayElements(e, set, pset, 0); @@ -173,6 +251,7 @@ { apr_pollfd_t *fd = J2P(pollfd, apr_pollfd_t *); UNREFERENCED_STDARGS; + TCN_ASSERT(pollfd != 0); return P2J(fd->desc.s); } @@ -180,6 +259,7 @@ { apr_pollfd_t *fd = J2P(pollfd, apr_pollfd_t *); UNREFERENCED_STDARGS; + TCN_ASSERT(pollfd != 0); return P2J(fd->client_data); } @@ -187,6 +267,7 @@ { apr_pollfd_t *fd = J2P(pollfd, apr_pollfd_t *); UNREFERENCED_STDARGS; + TCN_ASSERT(pollfd != 0); return (jint)fd->rtnevents; }
--------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]