Author: mturk Date: Mon Jul 18 09:52:13 2011 New Revision: 1147802 URL: http://svn.apache.org/viewvc?rev=1147802&view=rev Log: Add Solaris port selector
Added: commons/sandbox/runtime/trunk/src/main/native/os/solaris/port.c (with props) Modified: commons/sandbox/runtime/trunk/src/main/native/Makefile.unx.in commons/sandbox/runtime/trunk/src/main/native/os/unix/arch_defs.h Modified: commons/sandbox/runtime/trunk/src/main/native/Makefile.unx.in URL: http://svn.apache.org/viewvc/commons/sandbox/runtime/trunk/src/main/native/Makefile.unx.in?rev=1147802&r1=1147801&r2=1147802&view=diff ============================================================================== --- commons/sandbox/runtime/trunk/src/main/native/Makefile.unx.in (original) +++ commons/sandbox/runtime/trunk/src/main/native/Makefile.unx.in Mon Jul 18 09:52:13 2011 @@ -90,6 +90,7 @@ LINUX_SOURCES=\ $(TOPDIR)/os/linux/misc.c \ $(TOPDIR)/os/linux/os.c SOLARIS_SOURCES=\ + $(TOPDIR)/os/solaris/port.c \ $(TOPDIR)/os/solaris/os.c LIBSOURCES=\ Added: commons/sandbox/runtime/trunk/src/main/native/os/solaris/port.c URL: http://svn.apache.org/viewvc/commons/sandbox/runtime/trunk/src/main/native/os/solaris/port.c?rev=1147802&view=auto ============================================================================== --- commons/sandbox/runtime/trunk/src/main/native/os/solaris/port.c (added) +++ commons/sandbox/runtime/trunk/src/main/native/os/solaris/port.c Mon Jul 18 09:52:13 2011 @@ -0,0 +1,628 @@ +/* Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "acr/clazz.h" +#include "acr/memory.h" +#include "acr/jniapi.h" +#include "acr/port.h" +#include "acr/time.h" +#include "acr/iodefs.h" +#include "acr/netapi.h" +#include "acr/ring.h" +#include "acr/misc.h" +#include "arch_opts.h" +#include "arch_sync.h" +#include <poll.h> +#include <port.h> + +/* pollset operation states */ +#define PSS_DESTROY 1 +#define PSS_POLL 2 +#define PSS_WAIT 3 +#define PSS_WAKEUP 4 +/* Max events for a single run + * Choosen to reflect the 4k page size + * PSS_SIZE * sizeof(port_event_t) ~= 4096 + */ +#define PSS_SIZE 256 + +typedef struct pfd_elem_t pfd_elem_t; +struct pfd_elem_t { + ACR_RING_ENTRY(pfd_elem_t) link; + int fd; + short ievents; + short revents; + jobject obj; + acr_time_t ttl; + acr_time_t exp; +}; + +typedef struct acr_pollset_t { + /* A ring containing all of the pfd_elem_t that are active + */ + ACR_RING_HEAD(pfd_eset_ring_t, pfd_elem_t) eset_ring; + /* A ring of pfd_elem_t that have been used, and then deleted + */ + ACR_RING_HEAD(pfd_free_ring_t, pfd_elem_t) free_ring; + /* A ring of pfd_elem_t where rings that have been deleted but + * might still be inside a _epoll() + */ + ACR_RING_HEAD(pfd_dead_ring_t, pfd_elem_t) dead_ring; + port_event_t *peset; + int spfd; + int used; + volatile acr_atomic32_t state; + int wpipe[2]; + pthread_mutex_t mutex; + pthread_cond_t wakeup; +} acr_pollset_t; + +J_DECLARE_CLAZZ = { + INVALID_FIELD_OFFSET, + 0, + 0, + 0, + ACR_NET_CP "UnixSelector" +}; + +J_DECLARE_M_ID(0000) = { + 0, + "addSelected", + "(L" ACR_NET_CP "SelectionKeyImpl;S)V" +}; + +static short ieventt(int event) +{ + short rv = 0; + + if (event & ACR_OP_INP) + rv |= POLLIN; + if (event & ACR_OP_OUT) + rv |= POLLOUT; + if (event & ACR_OP_PRI) + rv |= POLLPRI; + /* POLLERR, POLLHUP, and POLLNVAL aren't valid as requested events + */ + return rv; +} + +static short reventt(short event) +{ + short rv = 0; + + if (event & POLLIN) + rv |= ACR_OP_INP; + if (event & POLLOUT) + rv |= ACR_OP_OUT; + if (event & POLLPRI) + rv |= ACR_OP_PRI; + if (event & POLLERR) + rv |= ACR_OP_ERROR; + if (event & POLLHUP) + rv |= ACR_OP_HANGUP; + if (event & POLLNVAL) + rv |= ACR_OP_NVAL; + return rv; +} + +ACR_NET_EXPORT(void, UnixSelector, init0)(JNI_STDARGS) +{ + _clazzn.i = (jclass)(*env)->NewGlobalRef(env, obj); + if (_clazzn.i == 0) + return; + V_LOAD_METHOD(0000); + _clazzn.u = 1; +} + +ACR_NET_EXPORT(jlong, UnixSelector, create0)(JNI_STDARGS) +{ + int rc; + acr_pollset_t *ps; + pfd_elem_t *pe = 0; + port_event_t ev; + + ps = ACR_TALLOC(acr_pollset_t); + if (ps == 0) + return 0; + ps->wpipe[0] = -1; + ps->wpipe[1] = -1; + ps->spfd = -1; + ps->used = 1; + /* Create the result epoll set. + */ + ps->peset = ACR_MALLOC(port_event_t, PSS_SIZE); + if (ps->peset == 0) + return 0; + ACR_RING_INIT(&ps->eset_ring, pfd_elem_t, link); + ACR_RING_INIT(&ps->free_ring, pfd_elem_t, link); + ACR_RING_INIT(&ps->dead_ring, pfd_elem_t, link); + + if ((rc = AcrSocketPair(ps->wpipe, 0)) != 0) { + ACR_THROW_NET_ERROR(rc); + goto cleanup; + } + ps->spfd = port_create(); + if (ps->spfd == -1) { + ACR_THROW_NET_ERRNO(); + goto cleanup; + } + if ((rc = AcrCloseOnExec(ps->spfd, 1)) != 0) { + ACR_THROW_NET_ERROR(rc); + goto cleanup; + } + pe = ACR_TALLOC(pfd_elem_t); + if (pe == 0) { + goto cleanup; + } + ACR_RING_ELEM_INIT(pe, link); + + /* Add the wakeup pipe to the pset + */ + pe->fd = ps->wpipe[0]; + pe->obj = 0; + pe->ttl = ACR_INFINITE; + pe->exp = ACR_INFINITE; + pe->ievents = POLLIN; + pe->revents = 0; + + ACR_RING_INSERT_TAIL(&ps->eset_ring, pe, pfd_elem_t, link); + if (port_associate(ps->spfd, PORT_SOURCE_FD, pe->fd, pe->ievents, (void *)pe) == -1) { + /* Failed adding pipe to the pollset + */ + ACR_THROW_NET_ERRNO(); + goto cleanup; + } + if (pthread_cond_init(&ps->wakeup, 0) != 0) { + ACR_THROW_NET_ERRNO(); + goto cleanup; + } + if (pthread_mutex_init(&ps->mutex, 0) != 0) { + ACR_THROW_NET_ERRNO(); + pthread_cond_destroy(&ps->wakeup); + goto cleanup; + } + return P2J(ps); + +cleanup: + r_close(ps->wpipe[0]); + r_close(ps->wpipe[1]); + r_close(ps->spfd); + AcrFree(ps->peset); + AcrFree(ps); + AcrFree(pe); + return 0; +} + +ACR_NET_EXPORT(void, UnixSelector, clr0)(JNI_STDARGS, jlong pollset, + jobject sset) +{ + pfd_elem_t *np, *pe; + acr_pollset_t *ps = J2P(pollset, acr_pollset_t *); + + pthread_mutex_lock(&ps->mutex); + while (!AcrAtomic32Equ(&ps->state, 0)) { + if (AcrAtomic32Equ(&ps->state, PSS_DESTROY)) { + /* Interrupted by destroy0 */ + pthread_mutex_unlock(&ps->mutex); + return; + } + if (AcrAtomic32Equ(&ps->state, PSS_POLL)) { + char ch = 1; + AcrAtomic32Set(&ps->state, PSS_WAKEUP); + r_write(ps->wpipe[1], &ch, 1); + } + /* Wait until the wait0 call breaks. + * Since we set the state to DESTROY + * wait0 will return 0. + */ + if (pthread_cond_wait(&ps->wakeup, &ps->mutex) != 0) { + pthread_mutex_unlock(&ps->mutex); + ACR_THROW(ACR_EX_EILLEGAL, 0); + return; + } + } + if (ps->used == 1) { + pthread_mutex_unlock(&ps->mutex); + return; + } + /* Make sure we have enough storage */ + AcrArrayListEnsureCapacity(env, sset, ps->used - 1); + ACR_RING_FOREACH_SAFE(pe, np, &ps->eset_ring, pfd_elem_t, link) { + if (ps->wpipe[0] != pe->fd) { + port_dissociate(ps->spfd, PORT_SOURCE_FD, pe->fd); + CALL_VMETHOD2(0000, obj, pe->obj, 0); + /* Unref the container. */ + (*env)->DeleteGlobalRef(env, pe->obj); + ACR_RING_REMOVE(pe, link); + ACR_RING_INSERT_TAIL(&ps->free_ring, pe, pfd_elem_t, link); + } + } + + ps->used = 1; + pthread_mutex_unlock(&ps->mutex); +} + +ACR_NET_EXPORT(void, UnixSelector, wakeup0)(JNI_STDARGS, jlong pollset) +{ + acr_pollset_t *ps = J2P(pollset, acr_pollset_t *); + + pthread_mutex_lock(&ps->mutex); + if (AcrAtomic32Equ(&ps->state, PSS_POLL)) { + char ch = 1; + AcrAtomic32Set(&ps->state, PSS_WAKEUP); + r_write(ps->wpipe[1], &ch, 1); + } + pthread_mutex_unlock(&ps->mutex); +} + +ACR_NET_EXPORT(jint, UnixSelector, size0)(JNI_STDARGS, jlong pollset) +{ + int rv; + acr_pollset_t *ps = J2P(pollset, acr_pollset_t *); + + pthread_mutex_lock(&ps->mutex); + rv = ps->used - 1; + pthread_mutex_unlock(&ps->mutex); + return rv; +} + +ACR_NET_EXPORT(jint, UnixSelector, add0)(JNI_STDARGS, jlong pollset, jobject fo, + jlong fp, jint events, jint ttlms) +{ + int rc = 0; + pfd_elem_t *pe; + acr_pollset_t *ps = J2P(pollset, acr_pollset_t *); + acr_sd_t *fd = J2P(fp, acr_sd_t *); + + pthread_mutex_lock(&ps->mutex); + if (AcrAtomic32Equ(&ps->state, PSS_DESTROY)) { + /* Already destroyed */ + goto cleanup; + } + ACR_RING_FOREACH(pe, &ps->eset_ring, pfd_elem_t, link) { + if (fd->s == pe->fd) { + /* Duplicate descriptor + */ + rc = ACR_EALREADY; + goto cleanup; + } + } + + if (!ACR_RING_EMPTY(&ps->free_ring, pfd_elem_t, link)) { + pe = ACR_RING_FIRST(&ps->free_ring); + ACR_RING_REMOVE(pe, link); + } + else { + pe = ACR_TALLOC(pfd_elem_t); + if (pe == 0) { + rc = ACR_ENOMEM; + goto cleanup; + } + ACR_RING_ELEM_INIT(pe, link); + } + + pe->fd = fd->s; + pe->ievents = ieventt(events); + pe->revents = 0; + pe->obj = (*env)->NewGlobalRef(env, fo); + if (pe->obj == 0) { + /* In case the NewGlobalRef fails, + * OutOfMemoryError should be thrown already by the JVM. + */ + ACR_RING_INSERT_TAIL(&ps->free_ring, pe, pfd_elem_t, link); + rc = ACR_ENOMEM; + goto cleanup; + } + if (ttlms > 0) { + pe->ttl = AcrTimeFromMsec(ttlms); + pe->exp = AcrTimeNow() + pe->ttl; + } + else { + pe->ttl = ACR_INFINITE; + pe->exp = ACR_INFINITE; + } + if (port_associate(ps->spfd, PORT_SOURCE_FD, pe->fd, pe->ievents, (void *)pe) == 0) { + ps->used++; + ACR_RING_INSERT_TAIL(&ps->eset_ring, pe, pfd_elem_t, link); + } + else { + rc = ACR_GET_OS_ERROR(); + if (ACR_STATUS_IS_ENOSPC(rc)) + rc = ACR_EOVERFLOW; + ACR_RING_INSERT_TAIL(&ps->free_ring, pe, pfd_elem_t, link); + } +cleanup: + pthread_mutex_unlock(&ps->mutex); + return rc; +} + +ACR_NET_EXPORT(jint, UnixSelector, del0)(JNI_STDARGS, jlong pollset, + jobject fo, jlong fp) +{ + int rc = ACR_EOF; + pfd_elem_t *pe = 0; + acr_pollset_t *ps = J2P(pollset, acr_pollset_t *); + acr_sd_t *fd = J2P(fp, acr_sd_t *); + + pthread_mutex_lock(&ps->mutex); + if (AcrAtomic32Equ(&ps->state, PSS_DESTROY) || ps->used < 2) { + /* Already destroyed */ + goto cleanup; + } + /* We don't care about the epoll_ctl errors. + * They usually mean the fd was not registered with this + * epoll instance or already closed. + */ + port_dissociate(ps->spfd, PORT_SOURCE_FD, fd->s); + ACR_RING_FOREACH(pe, &ps->eset_ring, pfd_elem_t, link) { + if (fd->s == pe->fd) { + /* Unref descriptor */ + (*env)->DeleteGlobalRef(env, pe->obj); + ACR_RING_REMOVE(pe, link); + ACR_RING_INSERT_TAIL(&ps->dead_ring, pe, pfd_elem_t, link); + ps->used--; + rc = 0; + break; + } + } + +cleanup: + pthread_mutex_unlock(&ps->mutex); + return rc; +} + +ACR_NET_EXPORT(int, UnixSelector, destroy0)(JNI_STDARGS, jlong pollset) +{ + int rc = 0; + pfd_elem_t *np, *pe = 0; + acr_pollset_t *ps = J2P(pollset, acr_pollset_t *); + + pthread_mutex_lock(&ps->mutex); + if (!AcrAtomic32Equ(&ps->state, 0)) { + int state = AcrAtomic32Set(&ps->state, PSS_DESTROY); + if (state == PSS_POLL) { + char ch = 1; + r_write(ps->wpipe[1], &ch, 1); + } + /* Wait until the wait0 call breaks. + * Since we set the state to DESTROY + * wait0 will return 0. + */ + if ((rc = pthread_cond_wait(&ps->wakeup, &ps->mutex)) != 0) { + pthread_mutex_unlock(&ps->mutex); + return rc; + } + } + AcrAtomic32Set(&ps->state, PSS_DESTROY); + ps->used = 0; + pthread_mutex_unlock(&ps->mutex); + r_close(ps->wpipe[0]); + r_close(ps->wpipe[1]); + r_close(ps->spfd); + pthread_cond_destroy(&ps->wakeup); + pthread_mutex_destroy(&ps->mutex); + ACR_RING_FOREACH_SAFE(pe, np, &ps->eset_ring, pfd_elem_t, link) { + if (pe->obj != 0) { + AcrSelectionKeyReset(env, pe->obj); + /* Unref descriptor */ + (*env)->DeleteGlobalRef(env, pe->obj); + } + ACR_RING_REMOVE(pe, link); + AcrFree(pe); + } + ACR_RING_FOREACH_SAFE(pe, np, &ps->free_ring, pfd_elem_t, link) { + ACR_RING_REMOVE(pe, link); + AcrFree(pe); + } + ACR_RING_FOREACH_SAFE(pe, np, &ps->dead_ring, pfd_elem_t, link) { + ACR_RING_REMOVE(pe, link); + AcrFree(pe); + } + AcrFree(ps->peset); + AcrFree(ps); + return rc; +} + +static int call_port_getn(int port, port_event_t list[], + unsigned int max, unsigned int *nget, + int timeout) +{ + struct timespec tv, *tp = 0; + int ret; + int rv = 0; + + if (timeout >= 0) { + tp = &tv; + tp->tv_sec = (long)(timeout / 1000); + tp->tv_nsec = (long)(timeout % 1000) * 1000000; + } + + list[0].portev_user = (void *)-1; /* so we can double check that an + * event was returned + */ + ret = port_getn(port, list, max, nget, tp); + /* Note: 32-bit port_getn() on Solaris 10 x86 returns large negative + * values instead of 0 when returning immediately. + */ + if (ret == -1) { + rv = ACR_GET_NETOS_ERROR(); + switch(rv) { + case EINTR: + case ETIME: + if (*nget > 0 && list[0].portev_user != (void *)-1) { + /* This confusing API can return an event at the same time + * that it reports EINTR or ETIME. If that occurs, just + * report the event. With EINTR, nget can be > 0 without + * any event, so check that portev_user was filled in. + * + * (Maybe it will be simplified; see thread + * http://mail.opensolaris.org + * /pipermail/networking-discuss/2009-August/011979.html + * This code will still work afterwards.) + */ + rv = 0; + break; + } + if (rv == ETIME) + rv = ACR_TIMEUP; + /* fall-through */ + default: + *nget = 0; + break; + } + } + else if (*nget == 0) + rv = ACR_TIMEUP; + return rv; +} + +ACR_NET_EXPORT(void, UnixSelector, wait0)(JNI_STDARGS, jlong pollset, + jobject sset, + jint timeout, jboolean autocancel) +{ + int i, rc = 0; + unsigned int ns = 1; + pfd_elem_t *np, *pe = 0; + acr_time_t now = 0; + acr_pollset_t *ps = J2P(pollset, acr_pollset_t *); + + pthread_mutex_lock(&ps->mutex); + if (!AcrAtomic32Equ(&ps->state, 0)) { + /* Note that this should never happen if api is correctly used. + * wait cannot be run from multiple threads and cannot be run + * after destroy. + */ + pthread_mutex_unlock(&ps->mutex); + ACR_THROW(ACR_EX_EILLEGAL, 0); + return; + } + if (ps->used == 1) { + /* We only have the wakeup pipe in the pollset + * so there is no point to wait. + */ + pthread_mutex_unlock(&ps->mutex); + return; + } + + AcrAtomic32Set(&ps->state, PSS_POLL); + pthread_mutex_unlock(&ps->mutex); + rc = call_port_getn(ps->spfd, ps->peset, PSS_SIZE, &ns, timeout); + pthread_mutex_lock(&ps->mutex); + if (AcrAtomic32Equ(&ps->state, PSS_DESTROY)) { + /* Interrupted by destroy0 */ + pthread_cond_broadcast(&ps->wakeup); + pthread_mutex_unlock(&ps->mutex); + return; + } + if (rc != 0 && rc != ACR_TIMEUP) { + /* Error during poll */ + AcrAtomic32Set(&ps->state, 0); + pthread_cond_broadcast(&ps->wakeup); + pthread_mutex_unlock(&ps->mutex); + ACR_THROW_NET_ERROR(rc); + return; + } + if (ns == 0 || rc == ACR_TIMEUP) { + /* Timeout occured */ + AcrAtomic32Set(&ps->state, PSS_WAIT); + goto cleanup; + } + if (AcrAtomic32Equ(&ps->state, PSS_WAKEUP)) { + /* Interrupted by wakeup0. + * Only drain the wakeup pipe without returning any descriptors. + */ + AcrDrainPipe(ps->wpipe[0]); + AcrAtomic32Set(&ps->state, 0); + pthread_cond_broadcast(&ps->wakeup); + pthread_mutex_unlock(&ps->mutex); + return; + } + AcrAtomic32Set(&ps->state, PSS_WAIT); + while (ns > 0) { + AcrArrayListEnsureCapacity(env, sset, ns); + /* Cycle trough the descriptors */ + for (i = 0; i < ns; i++) { + pe = (pfd_elem_t*)ps->peset[i].portev_user; + if (ps->peset[i].portev_events != 0) { + if (pe->fd == ps->wpipe[0]) { + /* Drain the wakeup pipe. + * Wakeup pipe is always at index zero. + */ + AcrDrainPipe(ps->wpipe[0]); + } + else { + pe->revents = reventt(ps->peset[i].portev_events); + CALL_VMETHOD2(0000, obj, pe->obj, pe->revents); + if (autocancel == JNI_TRUE) { + port_dissociate(ps->spfd, PORT_SOURCE_FD, pe->fd); + ps->used--; + /* Unref descriptor */ + (*env)->DeleteGlobalRef(env, pe->obj); + ACR_RING_REMOVE(pe, link); + ACR_RING_INSERT_TAIL(&ps->free_ring, pe, pfd_elem_t, link); + } + else if (pe->ttl > 0) { + /* Reset TTL + */ + if (now == 0) + now = AcrTimeNow(); + pe->exp = now + pe->ttl; + } + } + } + } + if (i == PSS_SIZE) { + /* Maximum number of descriptors selected. + * Try another wait with 0 timeout which should + * return immediately the signaled descriptors if any + */ + ns = 1; + rc = call_port_getn(ps->spfd, ps->peset, PSS_SIZE, &ns, 0); + if (rc == 0) + continue; + } + /* No more chunks */ + break; + } + +cleanup: + /* Remove expired descriptors */ + ACR_RING_FOREACH_SAFE(pe, np, &ps->eset_ring, pfd_elem_t, link) { + if (pe->ttl > 0 && pe->revents == 0) { + if (now == 0) + now = AcrTimeNow(); + if (now > pe->exp) { + /* Expired descriptor */ + pe->revents = ACR_OP_TIMEOUT; + CALL_VMETHOD2(0000, obj, pe->obj, pe->revents); + if (autocancel == JNI_TRUE) { + /* Unref descriptor */ + port_dissociate(ps->spfd, PORT_SOURCE_FD, pe->fd); + (*env)->DeleteGlobalRef(env, pe->obj); + ACR_RING_REMOVE(pe, link); + ACR_RING_INSERT_TAIL(&ps->free_ring, pe, pfd_elem_t, link); + ps->used--; + } + } + } + } + /* Shift all PFDs in the Dead Ring to the Free Ring */ + ACR_RING_CONCAT(&ps->free_ring, &ps->dead_ring, pfd_elem_t, link); + AcrAtomic32Set(&ps->state, 0); + pthread_cond_broadcast(&ps->wakeup); + pthread_mutex_unlock(&ps->mutex); +} + Propchange: commons/sandbox/runtime/trunk/src/main/native/os/solaris/port.c ------------------------------------------------------------------------------ svn:eol-style = native Modified: commons/sandbox/runtime/trunk/src/main/native/os/unix/arch_defs.h URL: http://svn.apache.org/viewvc/commons/sandbox/runtime/trunk/src/main/native/os/unix/arch_defs.h?rev=1147802&r1=1147801&r2=1147802&view=diff ============================================================================== --- commons/sandbox/runtime/trunk/src/main/native/os/unix/arch_defs.h (original) +++ commons/sandbox/runtime/trunk/src/main/native/os/unix/arch_defs.h Mon Jul 18 09:52:13 2011 @@ -137,7 +137,7 @@ typedef struct stat struct_stat_ #endif /* F_DUPFD */ -#if defined(LINUX) +#if defined(LINUX) || defined(SOLARIS) # define PS_DEFAULT_TYPE ACR_PS_TYPE_UNIX #else # define PS_DEFAULT_TYPE ACR_PS_TYPE_POLL