mturk       2005/04/18 05:33:42

  Modified:    jni/native/src poll.c
  Log:
  Make poll threadsafe and with add queue that is added to pollset
  before the actuall poll is called.
  
  Revision  Changes    Path
  1.6       +101 -20   jakarta-tomcat-connectors/jni/native/src/poll.c
  
  Index: poll.c
  ===================================================================
  RCS file: /home/cvs/jakarta-tomcat-connectors/jni/native/src/poll.c,v
  retrieving revision 1.5
  retrieving revision 1.6
  diff -u -r1.5 -r1.6
  --- poll.c    15 Apr 2005 10:14:46 -0000      1.5
  +++ poll.c    18 Apr 2005 12:33:42 -0000      1.6
  @@ -16,6 +16,7 @@
   #include "apr.h"
   #include "apr_pools.h"
   #include "apr_poll.h"
  +#include "apr_thread_mutex.h"
   #include "tcn.h"
   
   /* Internal poll structure for queryset
  @@ -24,10 +25,13 @@
   typedef struct tcn_pollset {
       apr_pool_t    *pool;
       apr_int32_t   nelts;
  +    apr_int32_t   nadds;
       apr_int32_t   nalloc;
       apr_pollset_t *pollset;
  +    apr_thread_mutex_t *mutex;
       apr_pollfd_t  *query_set;
  -    apr_time_t    *query_add;
  +    apr_pollfd_t  *query_add;
  +    apr_time_t    *query_ttl;
       apr_interval_time_t max_ttl;
   } tcn_pollset_t;
   
  @@ -38,24 +42,44 @@
       apr_pool_t *p = J2P(pool, apr_pool_t *);
       apr_pollset_t *pollset = NULL;
       tcn_pollset_t *tps = NULL;
  +    apr_thread_mutex_t *mutex = NULL;
  +    apr_uint32_t f = (apr_uint32_t)flags;
       UNREFERENCED(o);
  +    TCN_ASSERT(pool != 0);
   
  -    TCN_THROW_IF_ERR(apr_pollset_create(&pollset,
  -                     (apr_uint32_t)size, p, (apr_uint32_t)flags),
  -                     pollset);
  +    TCN_THROW_IF_ERR(apr_thread_mutex_create(&mutex,
  +                     APR_THREAD_MUTEX_DEFAULT, p),  mutex);
  +
  +    if (f & APR_POLLSET_THREADSAFE) {
  +        apr_status_t rv = apr_pollset_create(&pollset, (apr_uint32_t)size, 
p, f);
  +        if (rv == APR_ENOTIMPL)
  +            f &= ~APR_POLLSET_THREADSAFE;
  +        else if (rv != APR_SUCCESS) {
  +            tcn_ThrowAPRException(e, rv);
  +            goto cleanup;
  +        }
  +    }
  +    if (pollset == NULL) {
  +        TCN_THROW_IF_ERR(apr_pollset_create(&pollset,
  +                         (apr_uint32_t)size, p, f), pollset);
  +    }
   
       tps = apr_palloc(p, sizeof(tcn_pollset_t));
       tps->pollset = pollset;
  +    tps->mutex   = mutex;
       tps->query_set = apr_palloc(p, size * sizeof(apr_pollfd_t));
  -    tps->query_add = apr_palloc(p, size * sizeof(apr_time_t));
  +    tps->query_add = apr_palloc(p, size * sizeof(apr_pollfd_t));
  +    tps->query_ttl = apr_palloc(p, size * sizeof(apr_time_t));
       tps->nelts  = 0;
  +    tps->nadds  = 0;
       tps->nalloc = size;
       tps->pool   = p;
       tps->max_ttl = J2T(ttl);
  -
  -cleanup:
       return P2J(tps);
  -
  +cleanup:
  +    if (mutex)
  +        apr_thread_mutex_destroy(mutex);
  +    return 0;
   }
   
   TCN_IMPLEMENT_CALL(jint, Poll, destroy)(TCN_STDARGS, jlong pollset)
  @@ -63,6 +87,8 @@
       tcn_pollset_t *p = J2P(pollset,  tcn_pollset_t *);
   
       UNREFERENCED_STDARGS;
  +    TCN_ASSERT(pollset != 0);
  +    apr_thread_mutex_destroy(p->mutex);
       return (jint)apr_pollset_destroy(p->pollset);
   }
   
  @@ -75,20 +101,20 @@
       apr_status_t rv;
   
       UNREFERENCED_STDARGS;
  -    if (p->nelts == p->nalloc) {
  -        return APR_ENOMEM;
  -    }
  +    TCN_ASSERT(socket != 0);
   
  +    if (p->nadds == p->nalloc)
  +        return APR_ENOMEM;
  +    if ((rv = apr_thread_mutex_lock(p->mutex)) != APR_SUCCESS)
  +        return rv;
       memset(&fd, 0, sizeof(apr_pollfd_t));
       fd.desc_type = APR_POLL_SOCKET;
       fd.reqevents = (apr_int16_t)reqevents;
       fd.desc.s = J2P(socket, apr_socket_t *);
       fd.client_data = J2P(data, void *);
  -    if ((rv = apr_pollset_add(p->pollset, &fd)) == APR_SUCCESS) {
  -        p->query_set[p->nelts] = fd;
  -        p->query_add[p->nelts] = apr_time_now();
  -        p->nelts++;
  -    }
  +    p->query_add[p->nadds] = fd;
  +    p->nadds++;
  +    apr_thread_mutex_unlock(p->mutex);
       return (jint)rv;
   }
   
  @@ -98,13 +124,17 @@
       tcn_pollset_t *p = J2P(pollset,  tcn_pollset_t *);
       apr_pollfd_t fd;
       apr_int32_t i;
  +    apr_status_t rv;
   
       UNREFERENCED_STDARGS;
  +    TCN_ASSERT(socket != 0);
   
       memset(&fd, 0, sizeof(apr_pollfd_t));
       fd.desc_type = APR_POLL_SOCKET;
       fd.desc.s = J2P(socket, apr_socket_t *);
   
  +    if ((rv = apr_thread_mutex_lock(p->mutex)) != APR_SUCCESS)
  +        return (jint)rv;
       for (i = 0; i < p->nelts; i++) {
           if (fd.desc.s == p->query_set[i].desc.s) {
               /* Found an instance of the fd: remove this and any other copies 
*/
  @@ -123,8 +153,31 @@
               break;
           }
       }
  +    /* Remove from add queue if present
  +     * This is unlikely to happen, but do it anyway.
  +     */
  +    for (i = 0; i < p->nadds; i++) {
  +        if (fd.desc.s == p->query_add[i].desc.s) {
  +            /* Found an instance of the fd: remove this and any other copies 
*/
  +            apr_int32_t dst = i;
  +            apr_int32_t old_nelts = p->nadds;
  +            p->nadds--;
  +            for (i++; i < old_nelts; i++) {
  +                if (fd.desc.s == p->query_add[i].desc.s) {
  +                    p->nadds--;
  +                }
  +                else {
  +                    p->query_add[dst] = p->query_add[i];
  +                    dst++;
  +                }
  +            }
  +            break;
  +        }
  +    }
   
  -    return (jint)apr_pollset_remove(p->pollset, &fd);
  +    rv = apr_pollset_remove(p->pollset, &fd);
  +    apr_thread_mutex_unlock(p->mutex);
  +    return (jint)rv;
   }
   
   TCN_IMPLEMENT_CALL(jint, Poll, poll)(TCN_STDARGS, jlong pollset,
  @@ -134,12 +187,35 @@
       tcn_pollset_t *p = J2P(pollset,  tcn_pollset_t *);
       jlong *pset = (*e)->GetLongArrayElements(e, set, NULL);
       apr_int32_t  n, i = 0, num = 0;
  -    apr_status_t rv;
  +    apr_status_t rv = APR_SUCCESS;
   
       UNREFERENCED(o);
  +    TCN_ASSERT(pollset != 0);
  +
  +    if ((rv = apr_thread_mutex_lock(p->mutex)) != APR_SUCCESS)
  +        return (jint)(-rv);
  +    /* Add what is present in add queue */
  +    for (n = 0; n < p->nadds; n++) {
  +        apr_pollfd_t pf = p->query_add[n];
  +        if (p->nelts == p->nalloc) {
  +            rv = APR_ENOMEM;
  +            break;
  +        }
  +        if ((rv = apr_pollset_add(p->pollset, &pf)) != APR_SUCCESS)
  +            break;
  +        p->query_ttl[p->nelts] = apr_time_now();
  +        p->query_set[p->nelts] = pf;
  +        p->nelts++;
  +    }
  +    p->nadds = 0;
  +    apr_thread_mutex_unlock(p->mutex);
  +    if (rv != APR_SUCCESS)
  +        return (jint)(-rv);
       rv = apr_pollset_poll(p->pollset, J2T(timeout), &num, &fd);
  +    if (rv != APR_SUCCESS)
  +        num = 0;
   
  -    if (rv == APR_SUCCESS && num > 0) {
  +    if (num > 0) {
           for (i = 0; i < num; i++) {
               pset[i] = P2J(fd);
               fd ++;
  @@ -151,8 +227,9 @@
           /* TODO: Add thread mutex protection
            * or make sure the Java part is synchronized.
            */
  +        apr_thread_mutex_lock(p->mutex);
           for (n = 0; n < p->nelts; n++) {
  -            if ((now - p->query_add[n]) > p->max_ttl) {
  +            if ((now - p->query_ttl[n]) > p->max_ttl) {
                   p->query_set[n].rtnevents = APR_POLLHUP | APR_POLLIN;
                   if (i < p->nelts) {
                       pset[i++] = P2J(&(p->query_set[n]));
  @@ -160,6 +237,7 @@
                   }
               }
           }
  +        apr_thread_mutex_unlock(p->mutex);
       }
       if (num)
           (*e)->ReleaseLongArrayElements(e, set, pset, 0);
  @@ -173,6 +251,7 @@
   {
       apr_pollfd_t *fd = J2P(pollfd,  apr_pollfd_t *);
       UNREFERENCED_STDARGS;
  +    TCN_ASSERT(pollfd != 0);
       return P2J(fd->desc.s);
   }
   
  @@ -180,6 +259,7 @@
   {
       apr_pollfd_t *fd = J2P(pollfd,  apr_pollfd_t *);
       UNREFERENCED_STDARGS;
  +    TCN_ASSERT(pollfd != 0);
       return P2J(fd->client_data);
   }
   
  @@ -187,6 +267,7 @@
   {
       apr_pollfd_t *fd = J2P(pollfd,  apr_pollfd_t *);
       UNREFERENCED_STDARGS;
  +    TCN_ASSERT(pollfd != 0);
       return (jint)fd->rtnevents;
   }
   
  
  
  

---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to