mturk       2005/04/13 23:47:56

  Modified:    jni/java/org/apache/tomcat/jni Poll.java
               jni/native/src network.c poll.c
  Log:
  Add time to live to socket Poller, so we can maintain resources.
  The returned descriptor will be returned with
   APR_POOLIN + APR_POLLHUP in case the recycle is needed.
  
  Revision  Changes    Path
  1.5       +3 -3      
jakarta-tomcat-connectors/jni/java/org/apache/tomcat/jni/Poll.java
  
  Index: Poll.java
  ===================================================================
  RCS file: 
/home/cvs/jakarta-tomcat-connectors/jni/java/org/apache/tomcat/jni/Poll.java,v
  retrieving revision 1.4
  retrieving revision 1.5
  diff -u -r1.4 -r1.5
  --- Poll.java 13 Apr 2005 13:16:55 -0000      1.4
  +++ Poll.java 14 Apr 2005 06:47:56 -0000      1.5
  @@ -60,9 +60,10 @@
        * @param size The maximum number of descriptors that this pollset can 
hold
        * @param p The pool from which to allocate the pollset
        * @param flags Optional flags to modify the operation of the pollset.
  +     * @param ttl Maximum time to live for a particular socket.
        * @return  The pointer in which to return the newly created object
        */
  -    public static native long create(int size, long p, int flags)
  +    public static native long create(int size, long p, int flags, long ttl)
           throws Error;
       /**
        * Destroy a pollset object
  @@ -98,8 +99,7 @@
        * @return Number of signalled descriptors (output parameter)
        */
       public static native int poll(long pollset, long timeout,
  -                                  long [] descriptors)
  -        throws Error;
  +                                  long [] descriptors);
   
       /**
        * Return socket from poll descriptor
  
  
  
  1.4       +6 -6      jakarta-tomcat-connectors/jni/native/src/network.c
  
  Index: network.c
  ===================================================================
  RCS file: /home/cvs/jakarta-tomcat-connectors/jni/native/src/network.c,v
  retrieving revision 1.3
  retrieving revision 1.4
  diff -u -r1.3 -r1.4
  --- network.c 13 Apr 2005 13:17:42 -0000      1.3
  +++ network.c 14 Apr 2005 06:47:56 -0000      1.4
  @@ -183,7 +183,7 @@
       UNREFERENCED(o);
       apr_socket_opt_get(s, APR_SO_NONBLOCK, &nb);
       if (tosend > 0)
  -        nbytes = min(nbytes, (apr_size_t)tosend);
  +        nbytes = min(nbytes - offset, (apr_size_t)tosend);
       if (nb)
            bytes = (*e)->GetPrimitiveArrayCritical(e, buf, NULL);
       else
  @@ -213,7 +213,7 @@
           goto cleanup;
       }
       if (len > 0)
  -        nbytes = min(nbytes, (apr_size_t)len);
  +        nbytes = min(nbytes - offset, (apr_size_t)len);
       TCN_THROW_IF_ERR(apr_socket_send(s, bytes + offset, &nbytes), nbytes);
   
   cleanup:
  @@ -264,7 +264,7 @@
       UNREFERENCED(o);
       apr_socket_opt_get(s, APR_SO_NONBLOCK, &nb);
       if (tosend > 0)
  -        nbytes = min(nbytes, (apr_size_t)tosend);
  +        nbytes = min(nbytes - offset, (apr_size_t)tosend);
       if (nb)
            bytes = (*e)->GetPrimitiveArrayCritical(e, buf, NULL);
       else
  @@ -288,7 +288,7 @@
   
       UNREFERENCED(o);
       if (toread > 0)
  -        nbytes = min(nbytes, (apr_size_t)toread);
  +        nbytes = min(nbytes - offset, (apr_size_t)toread);
   
       TCN_THROW_IF_ERR(apr_socket_recv(s, bytes + offset, &nbytes), nbytes);
   
  @@ -314,7 +314,7 @@
           goto cleanup;
       }
       if (len > 0)
  -        nbytes = min(nbytes, (apr_size_t)len);
  +        nbytes = min(nbytes - offset, (apr_size_t)len);
   
       TCN_THROW_IF_ERR(apr_socket_recv(s, bytes + offset, &nbytes), nbytes);
   
  @@ -333,7 +333,7 @@
   
       UNREFERENCED(o);
       if (toread > 0)
  -        nbytes = min(nbytes, (apr_size_t)toread);
  +        nbytes = min(nbytes - offset, (apr_size_t)toread);
   
       TCN_THROW_IF_ERR(apr_socket_recvfrom(f, s,
               (apr_int32_t)flags, bytes + offset, &nbytes), nbytes);
  
  
  
  1.4       +83 -17    jakarta-tomcat-connectors/jni/native/src/poll.c
  
  Index: poll.c
  ===================================================================
  RCS file: /home/cvs/jakarta-tomcat-connectors/jni/native/src/poll.c,v
  retrieving revision 1.3
  retrieving revision 1.4
  diff -u -r1.3 -r1.4
  --- poll.c    13 Apr 2005 13:17:42 -0000      1.3
  +++ poll.c    14 Apr 2005 06:47:56 -0000      1.4
  @@ -18,55 +18,86 @@
   #include "apr_poll.h"
   #include "tcn.h"
   
  +/* Internal poll structure for queryset
  + */
  +
  +typedef struct tcn_pollset {
  +    apr_pool_t    *pool;
  +    apr_int32_t   nelts;
  +    apr_int32_t   nalloc;
  +    apr_pollset_t *pollset;
  +    apr_pollfd_t  *query_set;
  +    apr_time_t    *query_add;
  +    apr_interval_time_t max_ttl;
  +} tcn_pollset_t;
   
   TCN_IMPLEMENT_CALL(jlong, Poll, create)(TCN_STDARGS, jint size,
  -                                        jlong pool, jint flags)
  +                                        jlong pool, jint flags,
  +                                        jlong ttl)
   {
       apr_pool_t *p = J2P(pool, apr_pool_t *);
       apr_pollset_t *pollset = NULL;
  -
  +    tcn_pollset_t *tps = NULL;
       UNREFERENCED(o);
   
       TCN_THROW_IF_ERR(apr_pollset_create(&pollset,
                        (apr_uint32_t)size, p, (apr_uint32_t)flags),
                        pollset);
   
  +    tps = apr_palloc(p, sizeof(tcn_pollset_t));
  +    tps->pollset = pollset;
  +    tps->query_set = apr_palloc(p, size * sizeof(apr_pollfd_t));
  +    tps->query_add = apr_palloc(p, size * sizeof(apr_time_t));
  +    tps->nelts  = 0;
  +    tps->nalloc = size;
  +    tps->pool   = p;
  +    tps->max_ttl = J2T(ttl);
  +
   cleanup:
  -    return P2J(pollset);
  +    return P2J(tps);
   
   }
   
   TCN_IMPLEMENT_CALL(jint, Poll, destroy)(TCN_STDARGS, jlong pollset)
   {
  -    apr_pollset_t *p = J2P(pollset,  apr_pollset_t *);
  +    tcn_pollset_t *p = J2P(pollset,  tcn_pollset_t *);;
   
       UNREFERENCED_STDARGS;;
  -    return (jint)apr_pollset_destroy(p);
  +    return (jint)apr_pollset_destroy(p->pollset);
   }
   
   TCN_IMPLEMENT_CALL(jint, Poll, add)(TCN_STDARGS, jlong pollset,
                                       jlong socket, jlong data,
                                       jint reqevents)
   {
  -    apr_pollset_t *p = J2P(pollset,  apr_pollset_t *);
  +    tcn_pollset_t *p = J2P(pollset,  tcn_pollset_t *);
       apr_pollfd_t fd;
  +    apr_status_t rv;
   
       UNREFERENCED_STDARGS;
  +    if (p->nelts == p->nalloc) {
  +        return APR_ENOMEM;
  +    }
   
       memset(&fd, 0, sizeof(apr_pollfd_t));
       fd.desc_type = APR_POLL_SOCKET;
       fd.reqevents = (apr_int16_t)reqevents;
       fd.desc.s = J2P(socket, apr_socket_t *);
       fd.client_data = J2P(data, void *);
  -
  -    return (jint)apr_pollset_add(p, &fd);
  +    if ((rv = apr_pollset_add(p->pollset, &fd)) == APR_SUCCESS) {
  +        p->query_set[p->nelts] = fd;
  +        p->query_add[p->nelts] = apr_time_now();
  +        p->nelts++;
  +    }
  +    return (jint)rv;
   }
   
   TCN_IMPLEMENT_CALL(jint, Poll, remove)(TCN_STDARGS, jlong pollset,
                                          jlong socket)
   {
  -    apr_pollset_t *p = J2P(pollset,  apr_pollset_t *);
  +    tcn_pollset_t *p = J2P(pollset,  tcn_pollset_t *);
       apr_pollfd_t fd;
  +    apr_int32_t i;
   
       UNREFERENCED_STDARGS;;
   
  @@ -74,29 +105,64 @@
       fd.desc_type = APR_POLL_SOCKET;
       fd.desc.s = J2P(socket, apr_socket_t *);
   
  -    return (jint)apr_pollset_remove(p, &fd);
  +    for (i = 0; i < p->nelts; i++) {
  +        if (fd.desc.s == p->query_set[i].desc.s) {
  +            /* Found an instance of the fd: remove this and any other copies 
*/
  +            apr_int32_t dst = i;
  +            apr_int32_t old_nelts = p->nelts;
  +            p->nelts--;
  +            for (i++; i < old_nelts; i++) {
  +                if (fd.desc.s == p->query_set[i].desc.s) {
  +                    p->nelts--;
  +                }
  +                else {
  +                    p->query_set[dst] = p->query_set[i];
  +                    dst++;
  +                }
  +            }
  +            break;
  +        }
  +    }
  +
  +    return (jint)apr_pollset_remove(p->pollset, &fd);
   }
   
   TCN_IMPLEMENT_CALL(jint, Poll, poll)(TCN_STDARGS, jlong pollset,
                                        jlong timeout, jlongArray set)
   {
       const apr_pollfd_t *fd = NULL;
  -    apr_pollset_t *p = J2P(pollset,  apr_pollset_t *);
  +    tcn_pollset_t *p = J2P(pollset,  tcn_pollset_t *);
       jlong *pset = (*e)->GetLongArrayElements(e, set, NULL);
  -    apr_int32_t i, num = 0;
  +    apr_int32_t  n, i = 0, num = 0;
  +    apr_status_t rv;
   
       UNREFERENCED(o);
  -    TCN_THROW_IF_ERR(apr_pollset_poll(p, J2T(timeout),
  -                        &num, &fd), num);
  +    rv = apr_pollset_poll(p->pollset, J2T(timeout), &num, &fd);
   
  -cleanup:
  -    if (num) {
  +    if (rv == APR_SUCCESS && num > 0) {
           for (i = 0; i < num; i++) {
               pset[i] = P2J(fd);
               fd ++;
           }
  -        (*e)->ReleaseLongArrayElements(e, set, pset, 0);
       }
  +    /* In any case check for timeout sockets */
  +    if (p->max_ttl > 0) {
  +        apr_time_t now = apr_time_now();
  +        /* TODO: Add thread mutex protection
  +         * or make sure the Java part is synchronized.
  +         */
  +        for (n = 0; n < p->nelts; n++) {
  +            if ((now - p->query_add[n]) > p->max_ttl) {
  +                p->query_set[n].rtnevents = APR_POLLHUP | APR_POLLIN;
  +                if (i < p->nelts) {
  +                    pset[i++] = P2J(&(p->query_set[n]));
  +                    num++;
  +                }
  +            }
  +        }
  +    }
  +    if (num)
  +        (*e)->ReleaseLongArrayElements(e, set, pset, 0);
       else
           (*e)->ReleaseLongArrayElements(e, set, pset, JNI_ABORT);
   
  
  
  

---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to