Author: mturk
Date: Fri Jun 24 11:16:07 2011
New Revision: 1139244
URL: http://svn.apache.org/viewvc?rev=1139244&view=rev
Log:
Implement linux epoll native part
Modified:
commons/sandbox/runtime/trunk/src/main/native/os/linux/epoll.c
Modified: commons/sandbox/runtime/trunk/src/main/native/os/linux/epoll.c
URL:
http://svn.apache.org/viewvc/commons/sandbox/runtime/trunk/src/main/native/os/linux/epoll.c?rev=1139244&r1=1139243&r2=1139244&view=diff
==============================================================================
--- commons/sandbox/runtime/trunk/src/main/native/os/linux/epoll.c (original)
+++ commons/sandbox/runtime/trunk/src/main/native/os/linux/epoll.c Fri Jun 24
11:16:07 2011
@@ -21,6 +21,7 @@
#include "acr/time.h"
#include "acr/iodefs.h"
#include "acr/netapi.h"
+#include "acr/ring.h"
#include "arch_opts.h"
#include <poll.h>
#include <sys/epoll.h>
@@ -33,14 +34,31 @@
typedef struct acr_pollfd_t {
int fd;
+ short ievents;
+ short revents;
jobject obj;
acr_time_t ttl;
acr_time_t exp;
} acr_pollfd_t;
+typedef struct pfd_elem_t pfd_elem_t;
+struct pfd_elem_t {
+ ACR_RING_ENTRY(pfd_elem_t) link;
+ acr_pollfd_t pfd;
+};
+
typedef struct acr_pollset_t {
+ /* A ring containing all of the pollfd_t that are active
+ */
+ ACR_RING_HEAD(pfd_eset_ring_t, pfd_elem_t) eset_ring;
+ /* A ring of pollfd_t that have been used, and then deleted
+ */
+ ACR_RING_HEAD(pfd_free_ring_t, pfd_elem_t) free_ring;
+ /* A ring of pollfd_t where rings that have been deleted but
+ * might still be inside a _epoll()
+ */
+ ACR_RING_HEAD(pfd_dead_ring_t, pfd_elem_t) dead_ring;
struct epoll_event *epset;
- struct epoll_event *eeset;
acr_pollfd_t *ooset;
int epfd;
int used;
@@ -91,6 +109,7 @@ ACR_NET_EXPORT(jlong, LinuxSelector, cre
{
int rc;
acr_pollset_t *ps;
+ pfd_elem_t *pe = 0;
struct epoll_event epipe;
ps = ACR_TALLOC(acr_pollset_t);
@@ -106,11 +125,10 @@ ACR_NET_EXPORT(jlong, LinuxSelector, cre
ps->epset = ACR_MALLOC(struct epoll_event, ps->size);
if (ps->epset == 0)
return 0;
- ps->ooset = ACR_MALLOC(acr_pollfd_t, ps->size);
- if (ps->epset == 0) {
- AcrFree(ps->epset);
- return 0;
- }
+ ACR_RING_INIT(&ps->eset_ring, pfd_elem_t, link);
+ ACR_RING_INIT(&ps->free_ring, pfd_elem_t, link);
+ ACR_RING_INIT(&ps->dead_ring, pfd_elem_t, link);
+
if ((rc = AcrPipePair(ps->wpipe, 0)) != 0) {
ACR_THROW_NET_ERROR(rc);
goto cleanup;
@@ -132,15 +150,24 @@ ACR_NET_EXPORT(jlong, LinuxSelector, cre
goto cleanup;
}
#endif
+ pe = ACR_TALLOC(pfd_elem_t);
+ if (pe == 0) {
+ goto cleanup;
+ }
+ ACR_RING_ELEM_INIT(pe, link);
+
/* Add the wakeup pipe to the pset
*/
- ps->ooset[0].fd = ps->wpipe[0];
- ps->ooset[0].obj = 0;
- ps->ooset[0].ttl = ACR_INFINITE;
- ps->ooset[0].exp = ACR_INFINITE;
-
- epipe.data.ptr = &ps->ooset[0];
- epipe.events = EPOLLIN;
+ pe->pfd.fd = ps->wpipe[0];
+ pe->pfd.obj = 0;
+ pe->pfd.ttl = ACR_INFINITE;
+ pe->pfd.exp = ACR_INFINITE;
+ pe->pfd.ievents = EPOLLIN;
+ pe->pfd.revents = 0;
+
+ epipe.data.ptr = pe;
+ epipe.events = EPOLLIN;
+ ACR_RING_INSERT_TAIL(&ps->eset_ring, pe, pfd_elem_t, link);
if (epoll_ctl(ps->epfd, EPOLL_CTL_ADD, ps->ooset[0].fd, &epipe) == -1) {
/* Failed adding pipe to the pollset
*/
@@ -159,11 +186,391 @@ ACR_NET_EXPORT(jlong, LinuxSelector, cre
return P2J(ps);
cleanup:
- r_close(ps->epfd);
r_close(ps->wpipe[0]);
r_close(ps->wpipe[1]);
+ r_close(ps->epfd);
AcrFree(ps->epset);
- AcrFree(ps->ooset);
AcrFree(ps);
+ AcrFree(pe);
return 0;
}
+
+ACR_NET_EXPORT(jint, LinuxSelector, clr0)(JNI_STDARGS, jlong pollset,
+ jobjectArray rs)
+{
+ int cnt = 0;
+ pfd_elem_t *pe;
+ acr_pollset_t *ps = J2P(pollset, acr_pollset_t *);
+
+ pthread_mutex_lock(&ps->mutex);
+ while (ps->state != 0) {
+ if (ps->state == PSS_DESTROY) {
+ /* Interrupted by destroy0 */
+ pthread_mutex_unlock(&ps->mutex);
+ return 0;
+ }
+ if (ps->state == PSS_POLL) {
+ char ch = 1;
+ ps->state = PSS_WAKEUP;
+ r_write(ps->wpipe[1], &ch, 1);
+ }
+ /* Wait until the wait0 call breaks.
+ * Since we set the state to DESTROY
+ * wait0 will return 0.
+ */
+ if (pthread_cond_wait(&ps->wakeup, &ps->mutex) != 0) {
+ pthread_mutex_unlock(&ps->mutex);
+ ACR_THROW(ACR_EX_EILLEGAL, 0);
+ return 0;
+ }
+ }
+ ACR_RING_FOREACH(pe, &ps->eset_ring, pfd_elem_t, link) {
+ if (ps->wpipe[0] != pe->pfd.fd) {
+ /* Duplicate descriptor
+ */
+ (*env)->SetObjectArrayElement(env, rs, cnt++, pe->pfd.obj);
+ /* Unref the container. */
+ (*env)->DeleteGlobalRef(env, pe->pfd.obj);
+ ACR_RING_REMOVE(pe, link);
+ ACR_RING_INSERT_TAIL(&ps->free_ring, pe, pfd_elem_t, link);
+ }
+ }
+
+ ps->used = 1;
+ pthread_mutex_unlock(&ps->mutex);
+ return cnt;
+}
+
+ACR_NET_EXPORT(void, LinuxSelector, wakeup0)(JNI_STDARGS, jlong pollset)
+{
+ acr_pollset_t *ps = J2P(pollset, acr_pollset_t *);
+
+ pthread_mutex_lock(&ps->mutex);
+ if (ps->state == PSS_POLL) {
+ char ch = 1;
+ ps->state = PSS_WAKEUP;
+ r_write(ps->wpipe[1], &ch, 1);
+ }
+ pthread_mutex_unlock(&ps->mutex);
+}
+
+ACR_NET_EXPORT(jint, LinuxSelector, size0)(JNI_STDARGS, jlong pollset)
+{
+ int rv;
+ acr_pollset_t *ps = J2P(pollset, acr_pollset_t *);
+
+ pthread_mutex_lock(&ps->mutex);
+ rv = ps->used - 1;
+ pthread_mutex_unlock(&ps->mutex);
+ return rv;
+}
+
+ACR_NET_EXPORT(jint, LinuxSelector, add0)(JNI_STDARGS, jlong pollset, jobject
fo,
+ jlong fp, jint events, jint ttlms)
+{
+ int rc = 0;
+ struct epoll_event eevent;
+ pfd_elem_t *pe;
+ acr_pollset_t *ps = J2P(pollset, acr_pollset_t *);
+ acr_fd_t *fd = J2P(fp, acr_fd_t *);
+
+ pthread_mutex_lock(&ps->mutex);
+ if (ps->state == PSS_DESTROY) {
+ /* Already destroyed */
+ goto cleanup;
+ }
+ if (ps->used == ps->size) {
+ /* Overflow
+ */
+ rc = ACR_EOVERFLOW;
+ goto cleanup;
+ }
+ ACR_RING_FOREACH(pe, &ps->eset_ring, pfd_elem_t, link) {
+ if (fd->u.s == pe->pfd.fd) {
+ /* Duplicate descriptor
+ */
+ rc = ACR_EALREADY;
+ goto cleanup;
+ }
+ }
+
+ if (!ACR_RING_EMPTY(&ps->free_ring, pfd_elem_t, link)) {
+ pe = ACR_RING_FIRST(&ps->free_ring);
+ ACR_RING_REMOVE(pe, link);
+ }
+ else {
+ pe = ACR_TALLOC(pfd_elem_t);
+ if (pe == 0) {
+ rc = ACR_ENOMEM;
+ goto cleanup;
+ }
+ ACR_RING_ELEM_INIT(pe, link);
+ }
+
+ pe->pfd.fd = fd->u.s;
+ pe->pfd.ievents = ieventt(events);
+ pe->pfd.revents = 0;
+ pe->pfd.obj = (*env)->NewGlobalRef(env, fo);
+ if (pe->pfd.obj == 0) {
+ /* In case the NewGlobalRef fails,
+ * OutOfMemoryError should be thrown already by the JVM.
+ */
+ ACR_RING_INSERT_TAIL(&ps->free_ring, pe, pfd_elem_t, link);
+ rc = ACR_ENOMEM;
+ goto cleanup;
+ }
+ if (ttlms > 0) {
+ pe->pfd.ttl = AcrTimeFromMsec(ttlms);
+ pe->pfd.exp = AcrTimeNow() + pe->pfd.ttl;
+ }
+ else {
+ pe->pfd.ttl = ACR_INFINITE;
+ pe->pfd.exp = ACR_INFINITE;
+ }
+ eevent.data.ptr = pe;
+ eevent.events = pe->pfd.ievents;
+ if (epoll_ctl(ps->epfd, EPOLL_CTL_ADD, pe->pfd.fd, &eevent) == 0) {
+ ps->used++;
+ ACR_RING_INSERT_TAIL(&ps->eset_ring, pe, pfd_elem_t, link);
+ }
+ else {
+ rc = ACR_GET_OS_ERROR();
+ ACR_RING_INSERT_TAIL(&ps->free_ring, pe, pfd_elem_t, link);
+ }
+cleanup:
+ pthread_mutex_unlock(&ps->mutex);
+ return rc;
+}
+
+ACR_NET_EXPORT(jint, LinuxSelector, del0)(JNI_STDARGS, jlong pollset,
+ jobject fo, jlong fp)
+{
+ int rc = ACR_EOF;
+ struct epoll_event eevent = { 0 };
+ pfd_elem_t *pe = 0;
+ acr_pollset_t *ps = J2P(pollset, acr_pollset_t *);
+ acr_fd_t *fd = J2P(fp, acr_fd_t *);
+
+ pthread_mutex_lock(&ps->mutex);
+ if (ps->state == PSS_DESTROY || ps->used < 2) {
+ /* Already destroyed */
+ goto cleanup;
+ }
+ if (epoll_ctl(ps->epfd, EPOLL_CTL_DEL, fd->u.s, &eevent) == 0) {
+ ACR_RING_FOREACH(pe, &ps->eset_ring, pfd_elem_t, link) {
+ if (fd->u.s == pe->pfd.fd) {
+ /* Unref descriptor */
+ (*env)->DeleteGlobalRef(env, pe->pfd.obj);
+ ACR_RING_REMOVE(pe, link);
+ ACR_RING_INSERT_TAIL(&ps->dead_ring, pe, pfd_elem_t, link);
+ ps->used--;
+ rc = 0;
+ break;
+ }
+ }
+ }
+
+cleanup:
+ pthread_mutex_unlock(&ps->mutex);
+ return rc;
+}
+
+ACR_NET_EXPORT(int, LinuxSelector, destroy0)(JNI_STDARGS, jlong pollset)
+{
+ int rc = 0;
+ pfd_elem_t *np, *pe = 0;
+ acr_pollset_t *ps = J2P(pollset, acr_pollset_t *);
+
+ pthread_mutex_lock(&ps->mutex);
+ if (ps->state != 0) {
+ int state = ps->state;
+ ps->state = PSS_DESTROY;
+ if (state == PSS_POLL) {
+ char ch = 1;
+ r_write(ps->wpipe[1], &ch, 1);
+ }
+ /* Wait until the wait0 call breaks.
+ * Since we set the state to DESTROY
+ * wait0 will return 0.
+ */
+ if ((rc = pthread_cond_wait(&ps->wakeup, &ps->mutex)) != 0) {
+ pthread_mutex_unlock(&ps->mutex);
+ return rc;
+ }
+ }
+ ps->state = PSS_DESTROY;
+ ps->used = 0;
+ pthread_mutex_unlock(&ps->mutex);
+ r_close(ps->wpipe[0]);
+ r_close(ps->wpipe[1]);
+ r_close(ps->epfd);
+ pthread_cond_destroy(&ps->wakeup);
+ pthread_mutex_destroy(&ps->mutex);
+ ACR_RING_FOREACH_SAFE(pe, np, &ps->eset_ring, pfd_elem_t, link) {
+ ACR_RING_REMOVE(pe, link);
+ AcrFree(pe);
+ }
+ ACR_RING_FOREACH_SAFE(pe, np, &ps->free_ring, pfd_elem_t, link) {
+ if (pe->pfd.obj != 0) {
+ AcrSelectionKeyReset(env, pe->pfd.obj);
+ /* Unref descriptor */
+ (*env)->DeleteGlobalRef(env, pe->pfd.obj);
+ }
+ ACR_RING_REMOVE(pe, link);
+ AcrFree(pe);
+ }
+ ACR_RING_FOREACH_SAFE(pe, np, &ps->dead_ring, pfd_elem_t, link) {
+ ACR_RING_REMOVE(pe, link);
+ AcrFree(pe);
+ }
+ AcrFree(ps->epset);
+ AcrFree(ps);
+ return rc;
+}
+
+ACR_NET_EXPORT(jint, LinuxSelector, wait0)(JNI_STDARGS, jlong pollset,
+ jobjectArray rs, jshortArray
revents,
+ jint timeout, jboolean autocancel)
+{
+ int i, ns, rc = 0;
+ int rv = 0;
+ jshort *pevents;
+ struct epoll_event eevent = { 0 };
+ pfd_elem_t *pe = 0;
+ acr_time_t now = 0;
+ acr_time_t tmx = 0;
+ acr_pollset_t *ps = J2P(pollset, acr_pollset_t *);
+
+ pthread_mutex_lock(&ps->mutex);
+ if (ps->state != 0) {
+ /* Note that this should never happen if api is correctly used.
+ * wait cannot be run from multiple threads and cannot be run
+ * after destroy.
+ */
+ pthread_mutex_unlock(&ps->mutex);
+ ACR_THROW(ACR_EX_EILLEGAL, 0);
+ return 0;
+ }
+ if (ps->used == 1) {
+ /* We only have the wakeup pipe in the pollset
+ * so there is no point to wait.
+ */
+ pthread_mutex_unlock(&ps->mutex);
+ return 0;
+ }
+
+ ps->state = PSS_POLL;
+ pthread_mutex_unlock(&ps->mutex);
+ if (timeout > 0)
+ tmx = AcrTimeMilliseconds() + timeout;
+ for (;;) {
+ ns = epoll_wait(ps->epfd, ps->epset, ps->size, timeout);
+ if (ns == -1 && errno == EINTR) {
+ if (timeout > 0) {
+ timeout = tmx - AcrTimeMilliseconds();
+ if (timeout <= 0) {
+ ns = 0;
+ break;
+ }
+ }
+ }
+ else
+ break;
+ }
+
+ if (ns == -1)
+ rc = ACR_GET_OS_ERROR();
+ pthread_mutex_lock(&ps->mutex);
+ if (ps->state == PSS_DESTROY) {
+ /* Interrupted by destroy0 */
+ pthread_cond_broadcast(&ps->wakeup);
+ pthread_mutex_unlock(&ps->mutex);
+ return 0;
+ }
+ if (rc != 0) {
+ /* Error during poll */
+ ps->state = 0;
+ pthread_cond_broadcast(&ps->wakeup);
+ pthread_mutex_unlock(&ps->mutex);
+ ACR_THROW_NET_ERROR(rc);
+ return 0;
+ }
+ if (ns == 0) {
+ /* Timeout occured */
+ ps->state = 0;
+ pthread_cond_broadcast(&ps->wakeup);
+ pthread_mutex_unlock(&ps->mutex);
+ return 0;
+ }
+ if (ps->state == PSS_WAKEUP) {
+ /* Interrupted by wakeup0 */
+ //if (ps->fdset[0].revents != 0)
+ {
+ /* Drain the wakeup pipe.
+ * Wakeup pipe is always at index zero.
+ */
+ AcrDrainPipe(ps->wpipe[0]);
+ }
+ ps->state = 0;
+ pthread_cond_broadcast(&ps->wakeup);
+ pthread_mutex_unlock(&ps->mutex);
+ return 0;
+ }
+ ps->state = PSS_WAIT;
+ pevents = JARRAY_CRITICAL(jshort, revents);
+ /* Cycle trough the descriptors */
+ for (i = 0; i < ns; i++) {
+ pe = (pfd_elem_t *)ps->epset[i].data.ptr;
+ if (ps->epset[i].events != 0) {
+ if (pe->pfd.fd == ps->wpipe[0]) {
+ /* Drain the wakeup pipe.
+ * Wakeup pipe is always at index zero.
+ */
+ AcrDrainPipe(ps->wpipe[0]);
+ continue;
+ }
+ else {
+ pevents[rv] = reventt(ps->epset[i].events);
+ (*env)->SetObjectArrayElement(env, rs, rv++, pe->pfd.obj);
+ if (autocancel == JNI_TRUE) {
+ epoll_ctl(ps->epfd, EPOLL_CTL_DEL, pe->pfd.fd, &eevent);
+ ps->used--;
+ /* Unref descriptor */
+ (*env)->DeleteGlobalRef(env, pe->pfd.obj);
+ ACR_RING_REMOVE(pe, link);
+ ACR_RING_INSERT_TAIL(&ps->free_ring, pe, pfd_elem_t, link);
+ }
+ else if (pe->pfd.ttl > 0) {
+ /* Reset TTL
+ */
+ if (now == 0)
+ now = AcrTimeNow();
+ pe->pfd.exp = now + pe->pfd.ttl;
+ }
+ }
+ }
+ }
+ /* Remove expired descriptors */
+ ACR_RING_FOREACH(pe, &ps->eset_ring, pfd_elem_t, link) {
+ if (pe->pfd.ttl > 0) {
+ if (now == 0)
+ now = AcrTimeNow();
+ if (pe->pfd.exp > now) {
+ /* Expired descriptor */
+ epoll_ctl(ps->epfd, EPOLL_CTL_DEL, pe->pfd.fd, &eevent);
+ pevents[rv] = ACR_OP_TIMEOUT;
+ (*env)->SetObjectArrayElement(env, rs, rv++, pe->pfd.obj);
+ /* Unref descriptor */
+ (*env)->DeleteGlobalRef(env, pe->pfd.obj);
+ ACR_RING_REMOVE(pe, link);
+ ACR_RING_INSERT_TAIL(&ps->dead_ring, pe, pfd_elem_t, link);
+ ps->used--;
+ }
+ }
+ }
+ RELEASE_CRITICAL(revents, pevents);
+ ps->state = 0;
+ pthread_cond_broadcast(&ps->wakeup);
+ pthread_mutex_unlock(&ps->mutex);
+ return rv;
+}