Author: mturk
Date: Fri Jun 24 11:16:07 2011
New Revision: 1139244

URL: http://svn.apache.org/viewvc?rev=1139244&view=rev
Log:
Implement linux epoll native part

Modified:
    commons/sandbox/runtime/trunk/src/main/native/os/linux/epoll.c

Modified: commons/sandbox/runtime/trunk/src/main/native/os/linux/epoll.c
URL: 
http://svn.apache.org/viewvc/commons/sandbox/runtime/trunk/src/main/native/os/linux/epoll.c?rev=1139244&r1=1139243&r2=1139244&view=diff
==============================================================================
--- commons/sandbox/runtime/trunk/src/main/native/os/linux/epoll.c (original)
+++ commons/sandbox/runtime/trunk/src/main/native/os/linux/epoll.c Fri Jun 24 
11:16:07 2011
@@ -21,6 +21,7 @@
 #include "acr/time.h"
 #include "acr/iodefs.h"
 #include "acr/netapi.h"
+#include "acr/ring.h"
 #include "arch_opts.h"
 #include <poll.h>
 #include <sys/epoll.h>
@@ -33,14 +34,31 @@
 
 typedef struct acr_pollfd_t {
     int            fd;
+    short          ievents;
+    short          revents;
     jobject        obj;
     acr_time_t     ttl;
     acr_time_t     exp;
 } acr_pollfd_t;
 
+typedef struct pfd_elem_t pfd_elem_t;
+struct pfd_elem_t {
+    ACR_RING_ENTRY(pfd_elem_t) link;
+    acr_pollfd_t pfd;
+};
+
 typedef struct acr_pollset_t {
+    /* A ring containing all of the pollfd_t that are active
+     */
+    ACR_RING_HEAD(pfd_eset_ring_t, pfd_elem_t) eset_ring;
+    /* A ring of pollfd_t that have been used, and then deleted
+     */
+    ACR_RING_HEAD(pfd_free_ring_t, pfd_elem_t) free_ring;
+    /* A ring of pollfd_t where rings that have been deleted but
+     * might still be inside a _epoll()
+     */
+    ACR_RING_HEAD(pfd_dead_ring_t, pfd_elem_t) dead_ring;
     struct epoll_event *epset;
-    struct epoll_event *eeset;
     acr_pollfd_t       *ooset;
     int                 epfd;
     int                 used;
@@ -91,6 +109,7 @@ ACR_NET_EXPORT(jlong, LinuxSelector, cre
 {
     int rc;
     acr_pollset_t *ps;
+    pfd_elem_t    *pe = 0;
     struct epoll_event epipe;
 
     ps = ACR_TALLOC(acr_pollset_t);
@@ -106,11 +125,10 @@ ACR_NET_EXPORT(jlong, LinuxSelector, cre
     ps->epset    = ACR_MALLOC(struct epoll_event, ps->size);
     if (ps->epset == 0)
         return 0;
-    ps->ooset    = ACR_MALLOC(acr_pollfd_t,  ps->size);
-    if (ps->epset == 0) {
-        AcrFree(ps->epset);
-        return 0;
-    }
+    ACR_RING_INIT(&ps->eset_ring, pfd_elem_t, link);
+    ACR_RING_INIT(&ps->free_ring, pfd_elem_t, link);
+    ACR_RING_INIT(&ps->dead_ring, pfd_elem_t, link);
+
     if ((rc = AcrPipePair(ps->wpipe, 0)) != 0) {
         ACR_THROW_NET_ERROR(rc);
         goto cleanup;
@@ -132,15 +150,24 @@ ACR_NET_EXPORT(jlong, LinuxSelector, cre
         goto cleanup;
     }
 #endif
+    pe = ACR_TALLOC(pfd_elem_t);
+    if (pe == 0) {
+        goto cleanup;
+    }
+    ACR_RING_ELEM_INIT(pe, link);
+
     /* Add the wakeup pipe to the pset
      */
-    ps->ooset[0].fd  = ps->wpipe[0];
-    ps->ooset[0].obj = 0;
-    ps->ooset[0].ttl = ACR_INFINITE;
-    ps->ooset[0].exp = ACR_INFINITE;
-
-    epipe.data.ptr   = &ps->ooset[0];
-    epipe.events     = EPOLLIN;
+    pe->pfd.fd      = ps->wpipe[0];
+    pe->pfd.obj     = 0;
+    pe->pfd.ttl     = ACR_INFINITE;
+    pe->pfd.exp     = ACR_INFINITE;
+    pe->pfd.ievents = EPOLLIN;
+    pe->pfd.revents = 0;
+
+    epipe.data.ptr = pe;
+    epipe.events   = EPOLLIN;
+    ACR_RING_INSERT_TAIL(&ps->eset_ring, pe, pfd_elem_t, link);
     if (epoll_ctl(ps->epfd, EPOLL_CTL_ADD, ps->ooset[0].fd, &epipe) == -1) {
         /* Failed adding pipe to the pollset
          */
@@ -159,11 +186,391 @@ ACR_NET_EXPORT(jlong, LinuxSelector, cre
     return P2J(ps);
 
 cleanup:
-    r_close(ps->epfd);
     r_close(ps->wpipe[0]);
     r_close(ps->wpipe[1]);
+    r_close(ps->epfd);
     AcrFree(ps->epset);
-    AcrFree(ps->ooset);
     AcrFree(ps);
+    AcrFree(pe);
     return 0;
 }
+
+ACR_NET_EXPORT(jint, LinuxSelector, clr0)(JNI_STDARGS, jlong pollset,
+                                          jobjectArray rs)
+{
+    int cnt = 0;
+    pfd_elem_t    *pe;
+    acr_pollset_t *ps = J2P(pollset, acr_pollset_t *);
+
+    pthread_mutex_lock(&ps->mutex);
+    while (ps->state != 0) {
+        if (ps->state == PSS_DESTROY) {
+            /* Interrupted by destroy0 */
+            pthread_mutex_unlock(&ps->mutex);
+            return 0;
+        }
+        if (ps->state == PSS_POLL) {
+            char ch   = 1;
+            ps->state = PSS_WAKEUP;
+            r_write(ps->wpipe[1], &ch, 1);
+        }
+        /* Wait until the wait0 call breaks.
+         * Since we set the state to DESTROY
+         * wait0 will return 0.
+         */
+        if (pthread_cond_wait(&ps->wakeup, &ps->mutex) != 0) {
+            pthread_mutex_unlock(&ps->mutex);
+            ACR_THROW(ACR_EX_EILLEGAL, 0);
+            return 0;
+        }
+    }
+    ACR_RING_FOREACH(pe, &ps->eset_ring, pfd_elem_t, link) {
+        if (ps->wpipe[0] != pe->pfd.fd) {
+            /* Duplicate descriptor
+             */
+            (*env)->SetObjectArrayElement(env, rs, cnt++, pe->pfd.obj);
+            /* Unref the container. */
+            (*env)->DeleteGlobalRef(env, pe->pfd.obj);
+            ACR_RING_REMOVE(pe, link);
+            ACR_RING_INSERT_TAIL(&ps->free_ring, pe, pfd_elem_t, link);
+        }
+    }
+    
+    ps->used = 1;
+    pthread_mutex_unlock(&ps->mutex);
+    return cnt;
+}
+
+ACR_NET_EXPORT(void, LinuxSelector, wakeup0)(JNI_STDARGS, jlong pollset)
+{
+    acr_pollset_t *ps = J2P(pollset, acr_pollset_t *);
+
+    pthread_mutex_lock(&ps->mutex);
+    if (ps->state == PSS_POLL) {
+        char ch   = 1;
+        ps->state = PSS_WAKEUP;
+        r_write(ps->wpipe[1], &ch, 1);
+    }
+    pthread_mutex_unlock(&ps->mutex);
+}
+
+ACR_NET_EXPORT(jint, LinuxSelector, size0)(JNI_STDARGS, jlong pollset)
+{
+    int rv;
+    acr_pollset_t *ps = J2P(pollset, acr_pollset_t *);
+
+    pthread_mutex_lock(&ps->mutex);
+    rv = ps->used - 1;
+    pthread_mutex_unlock(&ps->mutex);
+    return rv;
+}
+
+ACR_NET_EXPORT(jint, LinuxSelector, add0)(JNI_STDARGS, jlong pollset, jobject 
fo,
+                                          jlong fp, jint events, jint ttlms)
+{
+    int rc = 0;
+    struct epoll_event eevent;
+    pfd_elem_t    *pe;
+    acr_pollset_t *ps = J2P(pollset, acr_pollset_t *);
+    acr_fd_t *fd      = J2P(fp, acr_fd_t *);
+
+    pthread_mutex_lock(&ps->mutex);
+    if (ps->state == PSS_DESTROY) {
+        /* Already destroyed */
+        goto cleanup;
+    }
+    if (ps->used == ps->size) {
+        /* Overflow
+         */
+        rc = ACR_EOVERFLOW;
+        goto cleanup;
+    }
+    ACR_RING_FOREACH(pe, &ps->eset_ring, pfd_elem_t, link) {
+        if (fd->u.s == pe->pfd.fd) {
+            /* Duplicate descriptor
+             */
+            rc = ACR_EALREADY;
+            goto cleanup;
+        }
+    }
+    
+    if (!ACR_RING_EMPTY(&ps->free_ring, pfd_elem_t, link)) {
+        pe = ACR_RING_FIRST(&ps->free_ring);
+        ACR_RING_REMOVE(pe, link);
+    }
+    else {
+        pe = ACR_TALLOC(pfd_elem_t);
+        if (pe == 0) {
+            rc = ACR_ENOMEM;
+            goto cleanup;
+        }
+        ACR_RING_ELEM_INIT(pe, link);
+    }
+    
+    pe->pfd.fd      = fd->u.s;
+    pe->pfd.ievents = ieventt(events);
+    pe->pfd.revents = 0;
+    pe->pfd.obj     = (*env)->NewGlobalRef(env, fo);
+    if (pe->pfd.obj == 0) {
+        /* In case the NewGlobalRef fails,
+         * OutOfMemoryError should be thrown already by the JVM.
+         */
+        ACR_RING_INSERT_TAIL(&ps->free_ring, pe, pfd_elem_t, link);
+        rc = ACR_ENOMEM;
+        goto cleanup;
+    }
+    if (ttlms > 0) {
+        pe->pfd.ttl = AcrTimeFromMsec(ttlms);
+        pe->pfd.exp = AcrTimeNow() + pe->pfd.ttl;
+    }
+    else {
+        pe->pfd.ttl = ACR_INFINITE;
+        pe->pfd.exp = ACR_INFINITE;
+    }
+    eevent.data.ptr = pe;
+    eevent.events   = pe->pfd.ievents;
+    if (epoll_ctl(ps->epfd, EPOLL_CTL_ADD, pe->pfd.fd, &eevent) == 0) {
+        ps->used++;
+        ACR_RING_INSERT_TAIL(&ps->eset_ring, pe, pfd_elem_t, link);
+    }
+    else {
+        rc = ACR_GET_OS_ERROR();
+        ACR_RING_INSERT_TAIL(&ps->free_ring, pe, pfd_elem_t, link);
+    }
+cleanup:
+    pthread_mutex_unlock(&ps->mutex);
+    return rc;
+}
+
+ACR_NET_EXPORT(jint, LinuxSelector, del0)(JNI_STDARGS, jlong pollset,
+                                          jobject fo, jlong fp)
+{
+    int rc = ACR_EOF;
+    struct epoll_event eevent = { 0 };
+    pfd_elem_t    *pe = 0;
+    acr_pollset_t *ps = J2P(pollset, acr_pollset_t *);
+    acr_fd_t *fd      = J2P(fp, acr_fd_t *);
+
+    pthread_mutex_lock(&ps->mutex);
+    if (ps->state == PSS_DESTROY || ps->used < 2) {
+        /* Already destroyed */
+        goto cleanup;
+    }
+    if (epoll_ctl(ps->epfd, EPOLL_CTL_DEL, fd->u.s, &eevent) == 0) {
+        ACR_RING_FOREACH(pe, &ps->eset_ring, pfd_elem_t, link) {
+            if (fd->u.s == pe->pfd.fd) {
+                /* Unref descriptor */
+                (*env)->DeleteGlobalRef(env, pe->pfd.obj);
+                ACR_RING_REMOVE(pe, link);
+                ACR_RING_INSERT_TAIL(&ps->dead_ring, pe, pfd_elem_t, link);
+                ps->used--;
+                rc = 0;
+                break;
+            }
+        }
+    }
+
+cleanup:
+    pthread_mutex_unlock(&ps->mutex);
+    return rc;
+}
+
+ACR_NET_EXPORT(int, LinuxSelector, destroy0)(JNI_STDARGS, jlong pollset)
+{
+    int rc = 0;
+    pfd_elem_t *np, *pe = 0;
+    acr_pollset_t   *ps = J2P(pollset, acr_pollset_t *);
+
+    pthread_mutex_lock(&ps->mutex);
+    if (ps->state != 0) {
+        int  state = ps->state;
+        ps->state  = PSS_DESTROY;
+        if (state == PSS_POLL) {
+            char ch   = 1;
+            r_write(ps->wpipe[1], &ch, 1);
+        }
+        /* Wait until the wait0 call breaks.
+         * Since we set the state to DESTROY
+         * wait0 will return 0.
+         */
+        if ((rc = pthread_cond_wait(&ps->wakeup, &ps->mutex)) != 0) {
+            pthread_mutex_unlock(&ps->mutex);
+            return rc;
+        }
+    }
+    ps->state = PSS_DESTROY;
+    ps->used = 0;
+    pthread_mutex_unlock(&ps->mutex);
+    r_close(ps->wpipe[0]);
+    r_close(ps->wpipe[1]);
+    r_close(ps->epfd);
+    pthread_cond_destroy(&ps->wakeup);
+    pthread_mutex_destroy(&ps->mutex);
+    ACR_RING_FOREACH_SAFE(pe, np, &ps->eset_ring, pfd_elem_t, link) {
+        ACR_RING_REMOVE(pe, link);
+        AcrFree(pe);
+    }
+    ACR_RING_FOREACH_SAFE(pe, np, &ps->free_ring, pfd_elem_t, link) {
+        if (pe->pfd.obj != 0) {
+            AcrSelectionKeyReset(env, pe->pfd.obj);
+            /* Unref descriptor */
+            (*env)->DeleteGlobalRef(env, pe->pfd.obj);
+        }
+        ACR_RING_REMOVE(pe, link);
+        AcrFree(pe);
+    }
+    ACR_RING_FOREACH_SAFE(pe, np, &ps->dead_ring, pfd_elem_t, link) {
+        ACR_RING_REMOVE(pe, link);
+        AcrFree(pe);
+    }
+    AcrFree(ps->epset);
+    AcrFree(ps);
+    return rc;
+}
+
+ACR_NET_EXPORT(jint, LinuxSelector, wait0)(JNI_STDARGS, jlong pollset,
+                                           jobjectArray rs, jshortArray 
revents,
+                                           jint timeout, jboolean autocancel)
+{
+    int i, ns, rc = 0;
+    int rv = 0;
+    jshort *pevents;
+    struct epoll_event eevent = { 0 };
+    pfd_elem_t *pe = 0;
+    acr_time_t now = 0;
+    acr_time_t tmx = 0;
+    acr_pollset_t *ps = J2P(pollset, acr_pollset_t *);
+
+    pthread_mutex_lock(&ps->mutex);
+    if (ps->state != 0) {
+        /* Note that this should never happen if api is correctly used.
+         * wait cannot be run from multiple threads and cannot be run
+         * after destroy.
+         */
+        pthread_mutex_unlock(&ps->mutex);
+        ACR_THROW(ACR_EX_EILLEGAL, 0);
+        return 0;
+    }
+    if (ps->used == 1) {
+        /* We only have the wakeup pipe in the pollset
+         * so there is no point to wait.
+         */
+        pthread_mutex_unlock(&ps->mutex);
+        return 0;
+    }
+
+    ps->state = PSS_POLL;
+    pthread_mutex_unlock(&ps->mutex);
+    if (timeout > 0)
+        tmx = AcrTimeMilliseconds() + timeout;
+    for (;;) {
+        ns = epoll_wait(ps->epfd, ps->epset, ps->size, timeout);
+        if (ns == -1 && errno == EINTR) {
+            if (timeout > 0) {
+                timeout = tmx - AcrTimeMilliseconds();
+                if (timeout <= 0) {
+                    ns = 0;
+                    break;
+                }
+            }
+        }
+        else
+            break;
+    }
+
+    if (ns == -1)
+        rc = ACR_GET_OS_ERROR();
+    pthread_mutex_lock(&ps->mutex);
+    if (ps->state == PSS_DESTROY) {
+        /* Interrupted by destroy0 */
+        pthread_cond_broadcast(&ps->wakeup);
+        pthread_mutex_unlock(&ps->mutex);
+        return 0;
+    }
+    if (rc != 0) {
+        /* Error during poll */
+        ps->state = 0;
+        pthread_cond_broadcast(&ps->wakeup);
+        pthread_mutex_unlock(&ps->mutex);
+        ACR_THROW_NET_ERROR(rc);
+        return 0;
+    }
+    if (ns == 0) {
+        /* Timeout occured */
+        ps->state = 0;
+        pthread_cond_broadcast(&ps->wakeup);
+        pthread_mutex_unlock(&ps->mutex);
+        return 0;
+    }
+    if (ps->state == PSS_WAKEUP) {
+        /* Interrupted by wakeup0 */
+        //if (ps->fdset[0].revents != 0)
+        {         
+            /* Drain the wakeup pipe.
+             * Wakeup pipe is always at index zero.
+             */
+            AcrDrainPipe(ps->wpipe[0]);
+        }
+        ps->state = 0;
+        pthread_cond_broadcast(&ps->wakeup);
+        pthread_mutex_unlock(&ps->mutex);
+        return 0;
+    }
+    ps->state = PSS_WAIT;
+    pevents   = JARRAY_CRITICAL(jshort, revents);
+    /* Cycle trough the descriptors */
+    for (i = 0; i < ns; i++) {
+        pe = (pfd_elem_t *)ps->epset[i].data.ptr;
+        if (ps->epset[i].events != 0) {
+            if (pe->pfd.fd == ps->wpipe[0]) {
+                /* Drain the wakeup pipe.
+                 * Wakeup pipe is always at index zero.
+                 */
+                AcrDrainPipe(ps->wpipe[0]);
+                continue;
+            }
+            else {
+                pevents[rv] = reventt(ps->epset[i].events);
+                (*env)->SetObjectArrayElement(env, rs, rv++, pe->pfd.obj);
+                if (autocancel == JNI_TRUE) {
+                    epoll_ctl(ps->epfd, EPOLL_CTL_DEL, pe->pfd.fd, &eevent);
+                    ps->used--;
+                    /* Unref descriptor */
+                    (*env)->DeleteGlobalRef(env, pe->pfd.obj);
+                    ACR_RING_REMOVE(pe, link);
+                    ACR_RING_INSERT_TAIL(&ps->free_ring, pe, pfd_elem_t, link);
+                }
+                else if (pe->pfd.ttl > 0) {
+                    /* Reset TTL
+                     */
+                    if (now == 0)
+                        now = AcrTimeNow();
+                    pe->pfd.exp = now + pe->pfd.ttl;
+                }
+            }
+        }
+    }
+    /* Remove expired descriptors */
+    ACR_RING_FOREACH(pe, &ps->eset_ring, pfd_elem_t, link) {
+        if (pe->pfd.ttl > 0) {
+            if (now == 0)
+                now = AcrTimeNow();
+            if (pe->pfd.exp > now) {
+                /* Expired descriptor */
+                epoll_ctl(ps->epfd, EPOLL_CTL_DEL, pe->pfd.fd, &eevent);
+                pevents[rv] = ACR_OP_TIMEOUT;
+                (*env)->SetObjectArrayElement(env, rs, rv++, pe->pfd.obj);     
       
+                /* Unref descriptor */
+                (*env)->DeleteGlobalRef(env, pe->pfd.obj);
+                ACR_RING_REMOVE(pe, link);
+                ACR_RING_INSERT_TAIL(&ps->dead_ring, pe, pfd_elem_t, link);
+                ps->used--;
+            }
+        }
+    }    
+    RELEASE_CRITICAL(revents, pevents);
+    ps->state = 0;
+    pthread_cond_broadcast(&ps->wakeup);
+    pthread_mutex_unlock(&ps->mutex);
+    return rv;
+}


Reply via email to