Author: mturk
Date: Sun Jun 26 18:12:33 2011
New Revision: 1139892
URL: http://svn.apache.org/viewvc?rev=1139892&view=rev
Log:
Use real limits and dynamic structs with epoll
Modified:
commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/net/PollSelector.java
commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/net/UnixSelector.java
commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/platform/unix/Posix.java
commons/sandbox/runtime/trunk/src/main/native/configure
commons/sandbox/runtime/trunk/src/main/native/include/acr/misc.h
commons/sandbox/runtime/trunk/src/main/native/os/linux/epoll.c
commons/sandbox/runtime/trunk/src/main/native/os/unix/arch_opts.h
commons/sandbox/runtime/trunk/src/main/native/os/unix/poll.c
commons/sandbox/runtime/trunk/src/main/native/os/unix/pollset.c
commons/sandbox/runtime/trunk/src/main/native/os/unix/selectset.c
commons/sandbox/runtime/trunk/src/main/native/os/unix/util.c
commons/sandbox/runtime/trunk/src/main/native/os/win32/config.hw
commons/sandbox/runtime/trunk/src/main/native/shared/array.c
commons/sandbox/runtime/trunk/src/main/native/shared/sbuf.c
commons/sandbox/runtime/trunk/src/main/native/shared/select.c
commons/sandbox/runtime/trunk/src/main/test/org/apache/commons/runtime/TestPosixEndpoint.java
Modified:
commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/net/PollSelector.java
URL:
http://svn.apache.org/viewvc/commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/net/PollSelector.java?rev=1139892&r1=1139891&r2=1139892&view=diff
==============================================================================
---
commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/net/PollSelector.java
(original)
+++
commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/net/PollSelector.java
Sun Jun 26 18:12:33 2011
@@ -53,9 +53,11 @@ final class PollSelector extends Abstrac
private static native int size0(long pollset);
private static native int add0(long pollset, SelectionKeyImpl key, long
fd, int events, int ttl);
private static native int del0(long pollset, SelectionKeyImpl key, long
fd);
- private static native int clr0(long pollset, SelectionKeyImpl[] set);
+ private static native int clr0(long pollset, SelectionKeyImpl[] set)
+ throws IllegalStateException;
private static native int wait0(long pollset, SelectionKeyImpl[] set,
short[] events,
- int timeout, boolean autocancel);
+ int timeout, boolean autocancel)
+ throws IllegalStateException, IOException;
/*
* Created from native
Modified:
commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/net/UnixSelector.java
URL:
http://svn.apache.org/viewvc/commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/net/UnixSelector.java?rev=1139892&r1=1139891&r2=1139892&view=diff
==============================================================================
---
commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/net/UnixSelector.java
(original)
+++
commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/net/UnixSelector.java
Sun Jun 26 18:12:33 2011
@@ -40,12 +40,11 @@ import org.apache.commons.runtime.Status
final class UnixSelector extends AbstractSelector
{
- private short[] revents;
- private SelectionKeyImpl[] keyset;
private long pollset;
private ArrayList<SelectionKey> selected;
- private static native long create0(int size)
+ private static native void init0();
+ private static native long create0()
throws OutOfMemoryError,
SystemException;
private static native void wakeup0(long pollset);
@@ -53,19 +52,22 @@ final class UnixSelector extends Abstrac
private static native int size0(long pollset);
private static native int add0(long pollset, SelectionKeyImpl key, long
fd, int events, int ttl);
private static native int del0(long pollset, SelectionKeyImpl key, long
fd);
- private static native int clr0(long pollset, SelectionKeyImpl[] set);
- private static native int wait0(long pollset, SelectionKeyImpl[] set,
short[] events,
- int timeout, boolean autocancel);
+ private native void clr0(long pollset, ArrayList<SelectionKey> set)
+ throws IllegalStateException;
+ private native void wait0(long pollset, ArrayList<SelectionKey>
set,
+ int timeout, boolean autocancel)
+ throws IllegalStateException, IOException;
+ static {
+ init0();
+ }
/*
* Created from native
*/
public UnixSelector(int size)
{
super(size);
- pollset = create0(size);
- revents = new short[size];
- keyset = new SelectionKeyImpl[size];
+ pollset = create0();
selected = new ArrayList<SelectionKey>();
}
@@ -114,6 +116,13 @@ final class UnixSelector extends Abstrac
return key;
}
+ /**
+  * Appends {@code key} to the {@code selected} list and stamps it with
+  * the events reported for it.
+  * The native code looks this method up by name with the signature
+  * {@code (SelectionKeyImpl, short) -> void}.
+  *
+  * @param key    the selection key to add to the selected list
+  * @param events the value stored into {@code key.revents}
+  */
+ private void addSelected(SelectionKeyImpl key, short events)
+ {
+     this.selected.add(key);
+     key.selected = false;
+     key.revents  = events;
+ }
+
@Override
public List<SelectionKey> select(int timeout)
throws ClosedSelectorException,
@@ -121,13 +130,7 @@ final class UnixSelector extends Abstrac
{
ensureValid();
selected.clear();
- int ns = wait0(pollset, keyset, revents, timeout, autoCancel);
- selected.ensureCapacity(ns);
- for (int i = 0; i < ns; i++) {
- selected.add(keyset[i]);
- keyset[i].revents = revents[i];
- keyset[i].selected = false;
- }
+ wait0(pollset, selected, timeout, autoCancel);
return selected;
}
@@ -138,13 +141,7 @@ final class UnixSelector extends Abstrac
{
ensureValid();
selected.clear();
- int ns = clr0(pollset, keyset);
- selected.ensureCapacity(ns);
- for (int i = 0; i < ns; i++) {
- selected.add(keyset[i]);
- keyset[i].revents = 0;
- keyset[i].selected = false;
- }
+ clr0(pollset, selected);
return selected;
}
Modified:
commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/platform/unix/Posix.java
URL:
http://svn.apache.org/viewvc/commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/platform/unix/Posix.java?rev=1139892&r1=1139891&r2=1139892&view=diff
==============================================================================
---
commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/platform/unix/Posix.java
(original)
+++
commons/sandbox/runtime/trunk/src/main/java/org/apache/commons/runtime/platform/unix/Posix.java
Sun Jun 26 18:12:33 2011
@@ -38,6 +38,13 @@ final class Posix
// No Instance
}
+ private static native int rlim0();
+ public static final int RLIMIT_NOFILE;
+
+ static {
+ RLIMIT_NOFILE = rlim0();
+ }
+
/* open flags */
public static final int O_RDONLY = 0;
public static final int O_WRONLY = 1;
Modified: commons/sandbox/runtime/trunk/src/main/native/configure
URL:
http://svn.apache.org/viewvc/commons/sandbox/runtime/trunk/src/main/native/configure?rev=1139892&r1=1139891&r2=1139892&view=diff
==============================================================================
--- commons/sandbox/runtime/trunk/src/main/native/configure (original)
+++ commons/sandbox/runtime/trunk/src/main/native/configure Sun Jun 26 18:12:33
2011
@@ -1455,6 +1455,7 @@ extern "C" {
#define HAVE_STRTOULL `have_function x strtoull`
#define HAVE_STRTONUM `have_function x strtonum`
#define HAVE_SHM_OPEN `have_function x shm_open`
+#define HAVE_SOCKETPAIR `have_function x socketpair`
#define HAVE_GETNAMEINFO `have_working_getnameinfo`
#define HAVE_GETHOSTBYADDR_R `have_function x gethostbyaddr_r`
#define HAVE_GETHOSTBYNAME2 `have_function x gethostbyname2`
Modified: commons/sandbox/runtime/trunk/src/main/native/include/acr/misc.h
URL:
http://svn.apache.org/viewvc/commons/sandbox/runtime/trunk/src/main/native/include/acr/misc.h?rev=1139892&r1=1139891&r2=1139892&view=diff
==============================================================================
--- commons/sandbox/runtime/trunk/src/main/native/include/acr/misc.h (original)
+++ commons/sandbox/runtime/trunk/src/main/native/include/acr/misc.h Sun Jun 26
18:12:33 2011
@@ -28,6 +28,7 @@ ACR_CLASS_CTOR(HashSet);
ACR_CLASS_DTOR(HashSet);
int AcrArrayListAdd(JNI_STDARGS, jobject e);
+int AcrArrayListEnsureCapacity(JNI_STDARGS, jint size);
int AcrHashSetAdd(JNI_STDARGS, jobject e);
void AcrLibLockAcquire(void);
void AcrLibLockRelease(void);
Modified: commons/sandbox/runtime/trunk/src/main/native/os/linux/epoll.c
URL:
http://svn.apache.org/viewvc/commons/sandbox/runtime/trunk/src/main/native/os/linux/epoll.c?rev=1139892&r1=1139891&r2=1139892&view=diff
==============================================================================
--- commons/sandbox/runtime/trunk/src/main/native/os/linux/epoll.c (original)
+++ commons/sandbox/runtime/trunk/src/main/native/os/linux/epoll.c Sun Jun 26
18:12:33 2011
@@ -22,6 +22,7 @@
#include "acr/iodefs.h"
#include "acr/netapi.h"
#include "acr/ring.h"
+#include "acr/misc.h"
#include "arch_opts.h"
#include "arch_sync.h"
#include <poll.h>
@@ -32,6 +33,11 @@
#define PSS_POLL 2
#define PSS_WAIT 3
#define PSS_WAKEUP 4
+/* Max epoll_wait events for a single run
+ * Chosen to reflect the 4k page size
+ * PSS_SIZE * sizeof(struct epoll_event) ~= 4096
+ */
+#define PSS_SIZE 256
typedef struct pfd_elem_t pfd_elem_t;
struct pfd_elem_t {
@@ -58,13 +64,26 @@ typedef struct acr_pollset_t {
struct epoll_event *epset;
int epfd;
int used;
- int size;
volatile acr_atomic32_t state;
int wpipe[2];
pthread_mutex_t mutex;
pthread_cond_t wakeup;
} acr_pollset_t;
+J_DECLARE_CLAZZ = {
+ INVALID_FIELD_OFFSET,
+ 0,
+ 0,
+ 0,
+ ACR_NET_CP "UnixSelector"
+};
+
+J_DECLARE_M_ID(0000) = {
+ 0,
+ "addSelected",
+ "(L" ACR_NET_CP "SelectionKeyImpl;S)V"
+};
+
static short ieventt(int event)
{
short rv = 0;
@@ -101,7 +120,16 @@ static short reventt(short event)
return rv;
}
-ACR_NET_EXPORT(jlong, UnixSelector, create0)(JNI_STDARGS, jint size)
+ACR_NET_EXPORT(void, UnixSelector, init0)(JNI_STDARGS)
+{ /* Native setup for UnixSelector, called from the Java class's static
+   * initializer: keeps a global reference to the class and resolves
+   * method 0000 ("addSelected").
+   */
+ _clazzn.i = (jclass)(*env)->NewGlobalRef(env, obj); /* obj is stored as the class reference */
+ if (_clazzn.i == 0)
+ return; /* no global reference obtained; _clazzn.u stays unset */
+ V_LOAD_METHOD(0000); /* NOTE(review): presumably returns early when the lookup fails - confirm macro */
+ _clazzn.u = 1; /* presumably the "class data loaded" flag - TODO confirm */
+}
+
+ACR_NET_EXPORT(jlong, UnixSelector, create0)(JNI_STDARGS)
{
int rc;
acr_pollset_t *ps;
@@ -114,18 +142,17 @@ ACR_NET_EXPORT(jlong, UnixSelector, crea
ps->wpipe[0] = -1;
ps->wpipe[1] = -1;
ps->epfd = -1;
- ps->size = size + 1;
ps->used = 1;
/* Create the result epoll set.
*/
- ps->epset = ACR_MALLOC(struct epoll_event, ps->size);
+ ps->epset = ACR_MALLOC(struct epoll_event, PSS_SIZE);
if (ps->epset == 0)
return 0;
ACR_RING_INIT(&ps->eset_ring, pfd_elem_t, link);
ACR_RING_INIT(&ps->free_ring, pfd_elem_t, link);
ACR_RING_INIT(&ps->dead_ring, pfd_elem_t, link);
- if ((rc = AcrPipePair(ps->wpipe, 0)) != 0) {
+ if ((rc = AcrSocketPair(ps->wpipe, 0)) != 0) {
ACR_THROW_NET_ERROR(rc);
goto cleanup;
}
@@ -136,7 +163,7 @@ ACR_NET_EXPORT(jlong, UnixSelector, crea
goto cleanup;
}
#else
- ps->epfd = epoll_create(ps->size);
+ ps->epfd = epoll_create(PSS_SIZE);
if (ps->epfd == -1) {
ACR_THROW_NET_ERRNO();
goto cleanup;
@@ -191,10 +218,9 @@ cleanup:
return 0;
}
-ACR_NET_EXPORT(jint, UnixSelector, clr0)(JNI_STDARGS, jlong pollset,
- jobjectArray rs)
+ACR_NET_EXPORT(void, UnixSelector, clr0)(JNI_STDARGS, jlong pollset,
+ jobject sset)
{
- int cnt = 0;
struct epoll_event ev = { 0 };
pfd_elem_t *np, *pe;
acr_pollset_t *ps = J2P(pollset, acr_pollset_t *);
@@ -204,10 +230,10 @@ ACR_NET_EXPORT(jint, UnixSelector, clr0)
if (AcrAtomic32Equ(&ps->state, PSS_DESTROY)) {
/* Interrupted by destroy0 */
pthread_mutex_unlock(&ps->mutex);
- return 0;
+ return;
}
if (AcrAtomic32Equ(&ps->state, PSS_POLL)) {
- char ch = 1;
+ char ch = 1;
AcrAtomic32Set(&ps->state, PSS_WAKEUP);
r_write(ps->wpipe[1], &ch, 1);
}
@@ -218,13 +244,19 @@ ACR_NET_EXPORT(jint, UnixSelector, clr0)
if (pthread_cond_wait(&ps->wakeup, &ps->mutex) != 0) {
pthread_mutex_unlock(&ps->mutex);
ACR_THROW(ACR_EX_EILLEGAL, 0);
- return 0;
+ return;
}
}
+ if (ps->used == 1) {
+ pthread_mutex_unlock(&ps->mutex);
+ return;
+ }
+ /* Make sure we have enough storage */
+ AcrArrayListEnsureCapacity(env, sset, ps->used - 1);
ACR_RING_FOREACH_SAFE(pe, np, &ps->eset_ring, pfd_elem_t, link) {
if (ps->wpipe[0] != pe->fd) {
epoll_ctl(ps->epfd, EPOLL_CTL_DEL, pe->fd, &ev);
- (*env)->SetObjectArrayElement(env, rs, cnt++, pe->obj);
+ CALL_VMETHOD2(0000, obj, pe->obj, 0);
/* Unref the container. */
(*env)->DeleteGlobalRef(env, pe->obj);
ACR_RING_REMOVE(pe, link);
@@ -234,7 +266,6 @@ ACR_NET_EXPORT(jint, UnixSelector, clr0)
ps->used = 1;
pthread_mutex_unlock(&ps->mutex);
- return cnt;
}
ACR_NET_EXPORT(void, UnixSelector, wakeup0)(JNI_STDARGS, jlong pollset)
@@ -275,12 +306,6 @@ ACR_NET_EXPORT(jint, UnixSelector, add0)
/* Already destroyed */
goto cleanup;
}
- if (ps->used == ps->size) {
- /* Overflow
- */
- rc = ACR_EOVERFLOW;
- goto cleanup;
- }
ACR_RING_FOREACH(pe, &ps->eset_ring, pfd_elem_t, link) {
if (fd->u.s == pe->fd) {
/* Duplicate descriptor
@@ -331,6 +356,8 @@ ACR_NET_EXPORT(jint, UnixSelector, add0)
}
else {
rc = ACR_GET_OS_ERROR();
+ if (ACR_STATUS_IS_ENOSPC(rc))
+ rc = ACR_EOVERFLOW;
ACR_RING_INSERT_TAIL(&ps->free_ring, pe, pfd_elem_t, link);
}
cleanup:
@@ -352,17 +379,20 @@ ACR_NET_EXPORT(jint, UnixSelector, del0)
/* Already destroyed */
goto cleanup;
}
- if (epoll_ctl(ps->epfd, EPOLL_CTL_DEL, fd->u.s, &ev) == 0) {
- ACR_RING_FOREACH(pe, &ps->eset_ring, pfd_elem_t, link) {
- if (fd->u.s == pe->fd) {
- /* Unref descriptor */
- (*env)->DeleteGlobalRef(env, pe->obj);
- ACR_RING_REMOVE(pe, link);
- ACR_RING_INSERT_TAIL(&ps->dead_ring, pe, pfd_elem_t, link);
- ps->used--;
- rc = 0;
- break;
- }
+ /* We don't care about the epoll_ctl errors.
+ * They usually mean the fd was not registered with this
+ * epoll instance or already closed.
+ */
+ epoll_ctl(ps->epfd, EPOLL_CTL_DEL, fd->u.s, &ev);
+ ACR_RING_FOREACH(pe, &ps->eset_ring, pfd_elem_t, link) {
+ if (fd->u.s == pe->fd) {
+ /* Unref descriptor */
+ (*env)->DeleteGlobalRef(env, pe->obj);
+ ACR_RING_REMOVE(pe, link);
+ ACR_RING_INSERT_TAIL(&ps->dead_ring, pe, pfd_elem_t, link);
+ ps->used--;
+ rc = 0;
+ break;
}
}
@@ -402,10 +432,6 @@ ACR_NET_EXPORT(int, UnixSelector, destro
pthread_cond_destroy(&ps->wakeup);
pthread_mutex_destroy(&ps->mutex);
ACR_RING_FOREACH_SAFE(pe, np, &ps->eset_ring, pfd_elem_t, link) {
- ACR_RING_REMOVE(pe, link);
- AcrFree(pe);
- }
- ACR_RING_FOREACH_SAFE(pe, np, &ps->free_ring, pfd_elem_t, link) {
if (pe->obj != 0) {
AcrSelectionKeyReset(env, pe->obj);
/* Unref descriptor */
@@ -414,6 +440,10 @@ ACR_NET_EXPORT(int, UnixSelector, destro
ACR_RING_REMOVE(pe, link);
AcrFree(pe);
}
+ ACR_RING_FOREACH_SAFE(pe, np, &ps->free_ring, pfd_elem_t, link) {
+ ACR_RING_REMOVE(pe, link);
+ AcrFree(pe);
+ }
ACR_RING_FOREACH_SAFE(pe, np, &ps->dead_ring, pfd_elem_t, link) {
ACR_RING_REMOVE(pe, link);
AcrFree(pe);
@@ -423,13 +453,11 @@ ACR_NET_EXPORT(int, UnixSelector, destro
return rc;
}
-ACR_NET_EXPORT(jint, UnixSelector, wait0)(JNI_STDARGS, jlong pollset,
- jobjectArray rs, jshortArray
revents,
- jint timeout, jboolean autocancel)
+ACR_NET_EXPORT(void, UnixSelector, wait0)(JNI_STDARGS, jlong pollset,
+ jobject sset,
+ jint timeout, jboolean autocancel)
{
int i, ns, rc = 0;
- int rv = 0;
- jshort *pevents;
struct epoll_event ev = { 0 };
pfd_elem_t *np, *pe = 0;
acr_time_t now = 0;
@@ -444,14 +472,14 @@ ACR_NET_EXPORT(jint, UnixSelector, wait0
*/
pthread_mutex_unlock(&ps->mutex);
ACR_THROW(ACR_EX_EILLEGAL, 0);
- return 0;
+ return;
}
if (ps->used == 1) {
/* We only have the wakeup pipe in the pollset
* so there is no point to wait.
*/
pthread_mutex_unlock(&ps->mutex);
- return 0;
+ return;
}
AcrAtomic32Set(&ps->state, PSS_POLL);
@@ -459,11 +487,11 @@ ACR_NET_EXPORT(jint, UnixSelector, wait0
if (timeout > 0)
tmx = AcrTimeMilliseconds() + timeout;
for (;;) {
- ns = epoll_wait(ps->epfd, ps->epset, ps->size, timeout);
+ ns = epoll_wait(ps->epfd, ps->epset, PSS_SIZE, timeout);
if (ns == -1 && errno == EINTR) {
if (timeout > 0) {
- timeout = tmx - AcrTimeMilliseconds();
- if (timeout <= 0) {
+ /* Interrupted by a signal: recompute the time left until the
+  * absolute deadline tmx and stop once it has been used up.
+  * The test must look at the remaining time; tmx is an absolute
+  * timestamp and never drops to zero, so testing it would let a
+  * negative timeout reach epoll_wait() and block forever.
+  */
+ timeout = (int)(tmx - AcrTimeMilliseconds());
+ if (timeout <= 0) {
ns = 0;
break;
}
@@ -480,7 +508,7 @@ ACR_NET_EXPORT(jint, UnixSelector, wait0
/* Interrupted by destroy0 */
pthread_cond_broadcast(&ps->wakeup);
pthread_mutex_unlock(&ps->mutex);
- return 0;
+ return;
}
if (rc != 0) {
/* Error during poll */
@@ -488,59 +516,72 @@ ACR_NET_EXPORT(jint, UnixSelector, wait0
pthread_cond_broadcast(&ps->wakeup);
pthread_mutex_unlock(&ps->mutex);
ACR_THROW_NET_ERROR(rc);
- return 0;
+ return;
}
if (ns == 0) {
/* Timeout occured */
AcrAtomic32Set(&ps->state, PSS_WAIT);
- pevents = JARRAY_CRITICAL(jshort, revents);
goto cleanup;
}
if (AcrAtomic32Equ(&ps->state, PSS_WAKEUP)) {
/* Interrupted by wakeup0.
- * Drain the wakeup pipe.
+ * Only drain the wakeup pipe without returning any descriptors.
*/
AcrDrainPipe(ps->wpipe[0]);
AcrAtomic32Set(&ps->state, 0);
pthread_cond_broadcast(&ps->wakeup);
pthread_mutex_unlock(&ps->mutex);
- return 0;
+ return;
}
AcrAtomic32Set(&ps->state, PSS_WAIT);
- pevents = JARRAY_CRITICAL(jshort, revents);
- /* Cycle trough the descriptors */
- for (i = 0; i < ns; i++) {
- pe = (pfd_elem_t *)ps->epset[i].data.ptr;
- if (ps->epset[i].events != 0) {
- if (pe->fd == ps->wpipe[0]) {
- /* Drain the wakeup pipe.
- * Wakeup pipe is always at index zero.
- */
- AcrDrainPipe(ps->wpipe[0]);
- continue;
- }
- else {
- pe->revents = reventt(ps->epset[i].events);
- pevents[rv] = pe->revents;
- (*env)->SetObjectArrayElement(env, rs, rv++, pe->obj);
- if (autocancel == JNI_TRUE) {
- epoll_ctl(ps->epfd, EPOLL_CTL_DEL, pe->fd, &ev);
- ps->used--;
- /* Unref descriptor */
- (*env)->DeleteGlobalRef(env, pe->obj);
- ACR_RING_REMOVE(pe, link);
- ACR_RING_INSERT_TAIL(&ps->free_ring, pe, pfd_elem_t, link);
+ while (ns > 0) {
+ AcrArrayListEnsureCapacity(env, sset, ns);
+ /* Cycle through the descriptors */
+ for (i = 0; i < ns; i++) {
+ pe = (pfd_elem_t *)ps->epset[i].data.ptr;
+ if (ps->epset[i].events != 0) {
+ if (pe->fd == ps->wpipe[0]) {
+ /* Drain the wakeup pipe.
+ * Wakeup pipe is always at index zero.
+ */
+ AcrDrainPipe(ps->wpipe[0]);
}
- else if (pe->ttl > 0) {
- /* Reset TTL
- */
- if (now == 0)
- now = AcrTimeNow();
- pe->exp = now + pe->ttl;
+ else {
+ pe->revents = reventt(ps->epset[i].events);
+ CALL_VMETHOD2(0000, obj, pe->obj, pe->revents);
+ if (autocancel == JNI_TRUE) {
+ epoll_ctl(ps->epfd, EPOLL_CTL_DEL, pe->fd, &ev);
+ ps->used--;
+ /* Unref descriptor */
+ (*env)->DeleteGlobalRef(env, pe->obj);
+ ACR_RING_REMOVE(pe, link);
+ ACR_RING_INSERT_TAIL(&ps->free_ring, pe, pfd_elem_t,
link);
+ }
+ else if (pe->ttl > 0) {
+ /* Reset TTL
+ */
+ if (now == 0)
+ now = AcrTimeNow();
+ pe->exp = now + pe->ttl;
+ }
}
}
}
+ if (i == PSS_SIZE) {
+ /* Maximum number of descriptors selected.
+ * Try another wait with 0 timeout which should
+ * return immediately the signaled descriptors if any
+ */
+ do {
+ ns = epoll_wait(ps->epfd, ps->epset, PSS_SIZE, 0);
+ } while (ns == -1 && errno == EINTR);
+ }
+ else {
+ /* No more chunks */
+ break;
+ }
}
+
cleanup:
/* Remove expired descriptors */
ACR_RING_FOREACH_SAFE(pe, np, &ps->eset_ring, pfd_elem_t, link) {
@@ -549,8 +590,8 @@ cleanup:
now = AcrTimeNow();
if (now > pe->exp) {
/* Expired descriptor */
- pevents[rv] = ACR_OP_TIMEOUT;
- (*env)->SetObjectArrayElement(env, rs, rv++, pe->obj);
+ pe->revents = ACR_OP_TIMEOUT;
+ CALL_VMETHOD2(0000, obj, pe->obj, pe->revents);
if (autocancel == JNI_TRUE) {
/* Unref descriptor */
epoll_ctl(ps->epfd, EPOLL_CTL_DEL, pe->fd, &ev);
@@ -562,11 +603,9 @@ cleanup:
}
}
}
- RELEASE_CRITICAL(revents, pevents);
/* Shift all PFDs in the Dead Ring to the Free Ring */
ACR_RING_CONCAT(&ps->free_ring, &ps->dead_ring, pfd_elem_t, link);
AcrAtomic32Set(&ps->state, 0);
pthread_cond_broadcast(&ps->wakeup);
pthread_mutex_unlock(&ps->mutex);
- return rv;
}
Modified: commons/sandbox/runtime/trunk/src/main/native/os/unix/arch_opts.h
URL:
http://svn.apache.org/viewvc/commons/sandbox/runtime/trunk/src/main/native/os/unix/arch_opts.h?rev=1139892&r1=1139891&r2=1139892&view=diff
==============================================================================
--- commons/sandbox/runtime/trunk/src/main/native/os/unix/arch_opts.h (original)
+++ commons/sandbox/runtime/trunk/src/main/native/os/unix/arch_opts.h Sun Jun
26 18:12:33 2011
@@ -149,6 +149,7 @@ int AcrCloseOnExec(int fd, int on);
int AcrWaitIO(int fd, int timeout, int events);
int AcrNullPipe(int flags, int fd);
int AcrPipePair(int pd[2], int flags);
+int AcrSocketPair(int pd[2], int flags);
int AcrSigIgnore(int signo);
int AcrSigDefault(int signo);
void AcrDrainPipe(int fd);
Modified: commons/sandbox/runtime/trunk/src/main/native/os/unix/poll.c
URL:
http://svn.apache.org/viewvc/commons/sandbox/runtime/trunk/src/main/native/os/unix/poll.c?rev=1139892&r1=1139891&r2=1139892&view=diff
==============================================================================
--- commons/sandbox/runtime/trunk/src/main/native/os/unix/poll.c (original)
+++ commons/sandbox/runtime/trunk/src/main/native/os/unix/poll.c Sun Jun 26
18:12:33 2011
@@ -99,7 +99,7 @@ ACR_NET_EXPORT(jint, Poll, wait0)(JNI_ST
ns = poll(pfd, nevents, timeout);
if (ns == -1 && errno == EINTR) {
if (timeout > 0) {
- timeout = tmx - AcrTimeMilliseconds();
+ timeout = (int)(tmx - AcrTimeMilliseconds());
if (timeout <= 0) {
ns = 0;
break;
@@ -145,7 +145,7 @@ ACR_NET_EXPORT(jshort, Poll, wait1)(JNI_
ns = poll(&pfd, 1, timeout);
if (ns == -1 && errno == EINTR) {
if (timeout > 0) {
- timeout = tmx - AcrTimeMilliseconds();
+ timeout = (int)(tmx - AcrTimeMilliseconds());
if (timeout <= 0) {
ns = 0;
break;
Modified: commons/sandbox/runtime/trunk/src/main/native/os/unix/pollset.c
URL:
http://svn.apache.org/viewvc/commons/sandbox/runtime/trunk/src/main/native/os/unix/pollset.c?rev=1139892&r1=1139891&r2=1139892&view=diff
==============================================================================
--- commons/sandbox/runtime/trunk/src/main/native/os/unix/pollset.c (original)
+++ commons/sandbox/runtime/trunk/src/main/native/os/unix/pollset.c Sun Jun 26
18:12:33 2011
@@ -114,7 +114,7 @@ ACR_NET_EXPORT(jlong, PollSelector, crea
AcrFree(ps->fdset);
return 0;
}
- if ((rc = AcrPipePair(ps->wpipe, 0)) != 0) {
+ if ((rc = AcrSocketPair(ps->wpipe, 0)) != 0) {
ACR_THROW_NET_ERROR(rc);
goto cleanup;
}
@@ -277,7 +277,7 @@ ACR_NET_EXPORT(jint, PollSelector, wait0
ns = poll(ps->fdset, ps->used, timeout);
if (ns == -1 && errno == EINTR) {
if (timeout > 0) {
- timeout = tmx - AcrTimeMilliseconds();
+ timeout = (int)(tmx - AcrTimeMilliseconds());
if (timeout <= 0) {
ns = 0;
break;
Modified: commons/sandbox/runtime/trunk/src/main/native/os/unix/selectset.c
URL:
http://svn.apache.org/viewvc/commons/sandbox/runtime/trunk/src/main/native/os/unix/selectset.c?rev=1139892&r1=1139891&r2=1139892&view=diff
==============================================================================
--- commons/sandbox/runtime/trunk/src/main/native/os/unix/selectset.c (original)
+++ commons/sandbox/runtime/trunk/src/main/native/os/unix/selectset.c Sun Jun
26 18:12:33 2011
@@ -42,17 +42,27 @@ ACR_NET_EXPORT(jint, LocalSelectorFactor
return PS_DEFAULT_LOCAL;
}
+/* Instead of infinity (-1), use a sane absolute positive limit.
+ * Currently this is 256K open descriptors.
+ */
+#define REAL_INFINITY (1024 * 256)
static int rlimit_nofile(void)
{
- int nm = 65536;
+ int nm = REAL_INFINITY;
#if HAVE_SYS_RESOURCE_H
struct rlimit rl;
- if (getrlimit(RLIMIT_NOFILE, &rl) == 0 && rl.rlim_max != RLIM_INFINITY)
+ if (getrlimit(RLIMIT_NOFILE, &rl) == 0 && rl.rlim_max != RLIM_INFINITY) {
nm = (int)rl.rlim_max;
+ }
else
#endif
+ {
+ errno = 0;
nm = (int)sysconf(_SC_OPEN_MAX);
+ if (nm == -1 && errno == 0)
+ nm = REAL_INFINITY;
+ }
if (nm > 1)
--nm;
else
@@ -60,12 +70,49 @@ static int rlimit_nofile(void)
return nm;
}
+ACR_UNX_EXPORT(jint, Posix, rlim0)(JNI_STDARGS)
+{
+ return rlimit_nofile();
+}
+
+#if defined(LINUX)
+/* Returns the value read from /proc/sys/fs/epoll/max_user_watches,
+ * or 0 when the file cannot be opened or read.
+ * A non-zero result is cached in a function-local static, so the file
+ * is parsed only until a usable value has been obtained.
+ */
+static int epoll_nofile(void)
+{
+    FILE *f;
+    static int _enofile = 0;
+
+    if (_enofile != 0)
+        return _enofile;
+    if ((f = fopen("/proc/sys/fs/epoll/max_user_watches", "r")) != 0) {
+        char   b[32];
+        size_t n;
+        /* fread() does not terminate the buffer and atoi() needs a
+         * NUL-terminated string, so read one byte less than the buffer
+         * holds and terminate it explicitly.
+         */
+        n = fread(b, 1, sizeof(b) - 1, f);
+        if (n > 0) {
+            b[n] = '\0';
+            _enofile = atoi(b);
+        }
+        fclose(f);
+    }
+    return _enofile;
+}
+#endif
+
ACR_NET_EXPORT(jint, SocketSelectorFactory, size0)(JNI_STDARGS)
{
+#if defined(LINUX)
+ int s = epoll_nofile();
+ if (s > 1)
+ return s - 1; /* Space for wakeup pipe */
+ else
+#else
+#endif
return rlimit_nofile();
}
ACR_NET_EXPORT(jint, LocalSelectorFactory, size0)(JNI_STDARGS)
{
+#if defined(LINUX)
+ int s = epoll_nofile();
+ if (s > 1)
+ return s - 1;
+ else
+#else
+#endif
return rlimit_nofile();
}
Modified: commons/sandbox/runtime/trunk/src/main/native/os/unix/util.c
URL:
http://svn.apache.org/viewvc/commons/sandbox/runtime/trunk/src/main/native/os/unix/util.c?rev=1139892&r1=1139891&r2=1139892&view=diff
==============================================================================
--- commons/sandbox/runtime/trunk/src/main/native/os/unix/util.c (original)
+++ commons/sandbox/runtime/trunk/src/main/native/os/unix/util.c Sun Jun 26
18:12:33 2011
@@ -271,12 +271,12 @@ AcrPipePair(int pd[2], int flags)
flags == ACR_PIPE_FULL_NONBLOCK) {
if ((rc = AcrNonblock(pd[0], 1)))
goto finally;
- }
- if (flags == ACR_PIPE_READ_BLOCK ||
- flags == ACR_PIPE_FULL_NONBLOCK) {
- if ((rc = AcrNonblock(pd[1], 1)))
- goto finally;
- }
+ }
+ if (flags == ACR_PIPE_READ_BLOCK ||
+ flags == ACR_PIPE_FULL_NONBLOCK) {
+ if ((rc = AcrNonblock(pd[1], 1)))
+ goto finally;
+ }
#endif
FD_ABOVE_STDFILENO(pd[0]);
if (pd[0] == -1) {
@@ -297,6 +297,61 @@ finally:
return rc;
}
+int
+AcrSocketPair(int pd[2], int flags)
+{ /* Creates a connected pair of descriptors in pd[0]/pd[1]. With
+   * HAVE_SOCKETPAIR an AF_LOCAL stream socketpair is used, otherwise
+   * the call falls back to AcrPipePair(). Close-on-exec is requested
+   * for both descriptors on every path, and flags selects which ends
+   * are switched to non-blocking mode. Returns 0 on success or an
+   * error code.
+   */
+ int rc = 0;
+#if HAVE_SOCKETPAIR
+ int type = SOCK_STREAM;
+
+# if HAVE_SOCK_NONBLOCK
+ if (flags == ACR_PIPE_FULL_NONBLOCK)
+ type |= SOCK_NONBLOCK;
+# endif
+# if HAVE_SOCK_CLOEXEC
+ type |= SOCK_CLOEXEC;
+# endif
+ if (socketpair(AF_LOCAL, type, 0, pd) == -1)
+ return ACR_GET_OS_ERROR(); /* nothing was created, so nothing to close */
+ if (flags == ACR_PIPE_WRITE_BLOCK) {
+ if ((rc = AcrNonblock(pd[0], 1)))
+ goto finally;
+ }
+ if (flags == ACR_PIPE_READ_BLOCK) {
+ if ((rc = AcrNonblock(pd[1], 1)))
+ goto finally;
+ }
+# if !HAVE_SOCK_NONBLOCK
+ if (flags == ACR_PIPE_FULL_NONBLOCK) {
+ if ((rc = AcrNonblock(pd[0], 1)))
+ goto finally;
+ if ((rc = AcrNonblock(pd[1], 1)))
+ goto finally;
+ }
+# endif
+# if !HAVE_SOCK_CLOEXEC
+ if ((rc = AcrCloseOnExec(pd[0], 1)))
+ goto finally;
+ if ((rc = AcrCloseOnExec(pd[1], 1)))
+ goto finally;
+# endif
+#else
+ rc = AcrPipePair(pd, flags); /* NOTE(review): on failure the code
+   * below jumps to finally, which closes pd[0]/pd[1]; confirm that
+   * AcrPipePair() does not already close them (or leave them unset)
+   * on its own error path.
+   */
+ if (rc != 0)
+ goto finally;
+ if ((rc = AcrCloseOnExec(pd[0], 1)))
+ goto finally;
+ if ((rc = AcrCloseOnExec(pd[1], 1)))
+ goto finally;
+#endif
+ return 0;
+finally: /* error path: close both descriptors and report rc */
+ s_close(pd[0]);
+ s_close(pd[1]);
+
+ return rc;
+}
+
void
AcrDrainPipe(int fd)
{
Modified: commons/sandbox/runtime/trunk/src/main/native/os/win32/config.hw
URL:
http://svn.apache.org/viewvc/commons/sandbox/runtime/trunk/src/main/native/os/win32/config.hw?rev=1139892&r1=1139891&r2=1139892&view=diff
==============================================================================
--- commons/sandbox/runtime/trunk/src/main/native/os/win32/config.hw (original)
+++ commons/sandbox/runtime/trunk/src/main/native/os/win32/config.hw Sun Jun 26
18:12:33 2011
@@ -109,6 +109,7 @@
#define HAVE_STRTOULL 0
#define HAVE_STRTONUM 0
#define HAVE_SHM_OPEN 0
+#define HAVE_SOCKETPAIR 0
#define HAVE_GETHOSTBYADDR_R 0
#define HAVE_GETHOSTBYNAME2 0
#define HAVE_GETHOSTBYNAME_R 0
Modified: commons/sandbox/runtime/trunk/src/main/native/shared/array.c
URL:
http://svn.apache.org/viewvc/commons/sandbox/runtime/trunk/src/main/native/shared/array.c?rev=1139892&r1=1139891&r2=1139892&view=diff
==============================================================================
--- commons/sandbox/runtime/trunk/src/main/native/shared/array.c (original)
+++ commons/sandbox/runtime/trunk/src/main/native/shared/array.c Sun Jun 26
18:12:33 2011
@@ -37,11 +37,19 @@ J_DECLARE_M_ID(0000) = {
"(Ljava/lang/Object;)Z"
};
+J_DECLARE_M_ID(0001) = {
+ 0,
+ "ensureCapacity",
+ "(I)V"
+};
+
+
ACR_CLASS_CTOR(ArrayList)
{
if (AcrLoadClass(env, &_clazzn, 0) == JNI_FALSE)
return JNI_FALSE;
J_LOAD_METHOD(0000);
+ J_LOAD_METHOD(0001);
_clazzn.u = 1;
return JNI_TRUE;
}
@@ -65,6 +73,20 @@ AcrArrayListAdd(JNI_STDARGS, jobject e)
return rv;
}
+int
+AcrArrayListEnsureCapacity(JNI_STDARGS, jint size)
+{ /* Invokes method 0001 ("ensureCapacity", "(I)V") on obj with the
+   * requested size. Returns ACR_EINIT when the class has not been
+   * loaded, ACR_EGENERAL when the call left a Java exception pending,
+   * and 0 otherwise.
+   */
+ int rv = ACR_EINIT;
+ if (CLAZZ_LOADED) {
+ CALL_METHOD1(Boolean, 0001, obj, size); /* NOTE(review): method 0001
+   * is declared with the void signature "(I)V" but is called through
+   * the Boolean variant of the call macro; presumably the void-call
+   * macro should be used instead - TODO confirm against the macro
+   * definitions.
+   */
+ if ((*env)->ExceptionCheck(env) == JNI_TRUE)
+ rv = ACR_EGENERAL;
+ else
+ rv = 0;
+ }
+ return rv;
+}
+
ACR_UTIL_EXPORT(jboolean, Array, memcpy0)(JNI_STDARGS,
jarray src,
jint srcPos,
Modified: commons/sandbox/runtime/trunk/src/main/native/shared/sbuf.c
URL:
http://svn.apache.org/viewvc/commons/sandbox/runtime/trunk/src/main/native/shared/sbuf.c?rev=1139892&r1=1139891&r2=1139892&view=diff
==============================================================================
--- commons/sandbox/runtime/trunk/src/main/native/shared/sbuf.c (original)
+++ commons/sandbox/runtime/trunk/src/main/native/shared/sbuf.c Sun Jun 26
18:12:33 2011
@@ -40,7 +40,7 @@ static int sbuf_extend_size(int size)
{
int newsize;
if (size > 65536)
- newsize = 65536;
+ newsize = ACR_ALIGN(size, 65536);
else
newsize = SBUF_MINEXTEND_SIZE;
while (newsize < size) {
@@ -67,14 +67,18 @@ sbuf_extend(acr_sb_t *s, int addlen)
newsize = sbuf_extend_size(s->s_size + addlen);
if (newsize > s->s_max)
return -1;
- newbuf = malloc(newsize);
- if (newbuf == 0)
- return -1;
- memcpy(newbuf, s->s_buf, s->s_size);
- if (SBUF_ISDYNAMIC(s))
- AcrFree(s->s_buf);
- else
+ if (!SBUF_ISDYNAMIC(s)) {
+ newbuf = malloc(newsize);
+ if (newbuf == 0)
+ return -1;
+ memcpy(newbuf, s->s_buf, s->s_size);
SBUF_SETFLAG(s, ACR_SB_DYNAMIC);
+ }
+ else {
+ newbuf = realloc(s->s_buf, newsize);
+ if (newbuf == 0)
+ return -1;
+ }
s->s_buf = newbuf;
s->s_size = newsize;
@@ -154,17 +158,17 @@ AcrSbInit(acr_sb_t *s, char *buf, int le
}
flags &= ACR_SB_USRFLAGMSK;
memset(s, 0, sizeof(acr_sb_t));
- s->s_size = length;
if (buf != 0) {
s->s_buf = buf;
- s->s_max = length;
+ s->s_size = length;
+ s->s_max = s->s_size;
s->s_flags = flags;
return 0;
}
+ s->s_max = INT_MAX - 65536;
s->s_flags = flags | ACR_SB_AUTOEXTEND;
- s->s_size = sbuf_extend_size(s->s_size);
+ s->s_size = sbuf_extend_size(length);
s->s_buf = malloc(s->s_size);
- s->s_max = INT_MAX - 65536;
if (s->s_buf == 0)
return ACR_ENOMEM;
SBUF_SETFLAG(s, ACR_SB_DYNAMIC);
Modified: commons/sandbox/runtime/trunk/src/main/native/shared/select.c
URL:
http://svn.apache.org/viewvc/commons/sandbox/runtime/trunk/src/main/native/shared/select.c?rev=1139892&r1=1139891&r2=1139892&view=diff
==============================================================================
--- commons/sandbox/runtime/trunk/src/main/native/shared/select.c (original)
+++ commons/sandbox/runtime/trunk/src/main/native/shared/select.c Sun Jun 26
18:12:33 2011
@@ -32,6 +32,7 @@ ACR_NET_EXPORT(jint, Select, wait0)(JNI_
#if !defined(WINDOWS)
acr_time_t tmx = 0;
#endif
+ acr_time_t tmo = 0;
acr_fd_t *fd;
struct timeval tv = { 0, 0 };
struct timeval *tp = 0;
@@ -75,24 +76,24 @@ ACR_NET_EXPORT(jint, Select, wait0)(JNI_
}
RELEASE_CRITICAL(events, pevents);
if (timeout >= 0) {
+ tmo = AcrTimeFromMsec(timeout);
#if !defined(WINDOWS)
if (timeout > 0)
- tmx = AcrTimeNow() + AcrTimeFromMsec(timeout);
+ tmx = AcrTimeNow() + tmo;
#endif
tp = &tv;
}
for (;;) {
- if (timeout > 0) {
- acr_time_t us = AcrTimeFromMsec(timeout);
- tv.tv_sec = (long)AcrTimeSec(us);
- tv.tv_usec = (long)AcrTimeUsec(us);
+ if (tp != 0) {
+ tp->tv_sec = (long)AcrTimeSec(tmo);
+ tp->tv_usec = (long)AcrTimeUsec(tmo);
}
ns = select(nmax + 1, &rdset, &wrset, &exset, tp);
#if !defined(WINDOWS)
if (ns == -1 && errno == EINTR) {
- if (timeout > 0) {
- timeout = tmx - AcrTimeNow();
- if (timeout <= 0) {
+ if (tmo > 0) {
+ tmo = tmx - AcrTimeNow();
+ if (tmo <= 0) {
ns = 0;
break;
}
@@ -157,6 +158,7 @@ ACR_NET_EXPORT(jshort, Select, wait1)(JN
#if !defined(WINDOWS)
acr_time_t tmx = 0;
#endif
+ acr_time_t tmo = 0;
acr_fd_t *fd = J2P(fp, acr_fd_t *);
struct timeval tv;
struct timeval *tp = 0;
@@ -176,23 +178,23 @@ ACR_NET_EXPORT(jshort, Select, wait1)(JN
FD_SET(fd->u.s, &exset);
}
if (timeout >= 0) {
+ tmo = AcrTimeFromMsec(timeout);
#if !defined(WINDOWS)
- tmx = AcrTimeNow() + AcrTimeFromMsec(timeout);
+ tmx = AcrTimeNow() + tmo;
#endif
tp = &tv;
}
for (;;) {
- if (timeout >= 0) {
- acr_time_t us = AcrTimeFromMsec(timeout);
- tv.tv_sec = (long)AcrTimeSec(us);
- tv.tv_usec = (long)AcrTimeUsec(us);
+ if (tp != 0) {
+ tp->tv_sec = (long)AcrTimeSec(tmo);
+ tp->tv_usec = (long)AcrTimeUsec(tmo);
}
ns = select(fd->u.s + 1, &rdset, &wrset, &exset, tp);
#if !defined(WINDOWS)
if (ns == -1 && errno == EINTR) {
- if (timeout >= 0) {
- timeout = tmx - AcrTimeNow();
- if (timeout <= 0) {
+ if (tmo >= 0) {
+ tmo = tmx - AcrTimeNow();
+ if (tmo <= 0) {
ns = 0;
break;
}
Modified:
commons/sandbox/runtime/trunk/src/main/test/org/apache/commons/runtime/TestPosixEndpoint.java
URL:
http://svn.apache.org/viewvc/commons/sandbox/runtime/trunk/src/main/test/org/apache/commons/runtime/TestPosixEndpoint.java?rev=1139892&r1=1139891&r2=1139892&view=diff
==============================================================================
---
commons/sandbox/runtime/trunk/src/main/test/org/apache/commons/runtime/TestPosixEndpoint.java
(original)
+++
commons/sandbox/runtime/trunk/src/main/test/org/apache/commons/runtime/TestPosixEndpoint.java
Sun Jun 26 18:12:33 2011
@@ -58,16 +58,20 @@ public class TestPosixEndpoint extends A
} catch (Exception x) {
fail("Acceptor setup failed " + x.toString());
}
- synchronized (sync) {
- // Notify that we are ready to
- // accept the connections
- sync.notifyAll();
- }
while (true) {
try {
- // System.out.println("Accepting connection ...");
+ System.out.println("Accepting connection ...");
+ synchronized (sync) {
+ // Notify that we are ready to
+ // accept the connections
+ sync.notifyAll();
+ }
List<SelectionKey> set = ps.select();
- // System.out.println("Accepted connection: " +
set.size());
+ if (set.size() == 0) {
+ System.out.println("Interrupted.");
+ break;
+ }
+ System.out.println("Accepted " + set.size() + "
connection");
assertEquals(set.size(), 1);
sk = set.get(0);
LocalEndpoint e =
((LocalServerEndpoint)sk.endpoint()).accept();
@@ -78,8 +82,6 @@ public class TestPosixEndpoint extends A
} catch (Exception x) {
fail("Accept failed " + x.toString());
}
- if (!running)
- break;
}
try {
ss.close();
@@ -112,18 +114,29 @@ public class TestPosixEndpoint extends A
//
sync.wait();
}
+ Thread.sleep(200);
} catch (InterruptedException x) {
// Ignore
}
// By this time the key is set up.
assertNotNull(ss.keyFor(ps));
- aw.running = false;
// Connect to the Acceptor
//
LocalEndpoint cs = new LocalEndpoint();
cs.connect(sa);
assertTrue(cs.isBlocking());
cs.close();
+ try {
+ synchronized (sync) {
+ // Wait until Acceptor process the accepted connection
+ //
+ sync.wait();
+ }
+ Thread.sleep(200);
+ } catch (InterruptedException x) {
+ // Ignore
+ }
+ ps.interrupt();
// Wait for the Acceptor thread to finish
aw.join();
List<SelectionKey> set = ps.clear();