Author: trustin
Date: Tue Nov 13 02:42:03 2007
New Revision: 594477
URL: http://svn.apache.org/viewvc?rev=594477&view=rev
Log:
Resolved issue: DIRMINA-375 (Synchronous Client API)
* Added IoSessionConfig.useReadOperation
* Added IoSession.read() - only usable when useReadOperation is true
* Added ReadFuture and its default implementation
Added:
mina/trunk/core/src/main/java/org/apache/mina/common/DefaultReadFuture.java
(with props)
mina/trunk/core/src/main/java/org/apache/mina/common/ReadFuture.java
(with props)
Modified:
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoConnector.java
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoSession.java
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoSessionConfig.java
mina/trunk/core/src/main/java/org/apache/mina/common/DefaultIoFilterChain.java
mina/trunk/core/src/main/java/org/apache/mina/common/IoSession.java
mina/trunk/core/src/main/java/org/apache/mina/common/IoSessionConfig.java
Modified:
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoConnector.java
URL:
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoConnector.java?rev=594477&r1=594476&r2=594477&view=diff
==============================================================================
---
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoConnector.java
(original)
+++
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoConnector.java
Tue Nov 13 02:42:03 2007
@@ -82,7 +82,39 @@
}
if (getHandler() == null) {
- throw new IllegalStateException("handler is not set.");
+ if (getSessionConfig().isUseReadOperation()) {
+ setHandler(new IoHandler() {
+ public void exceptionCaught(IoSession session,
+ Throwable cause) throws Exception {
+ }
+
+ public void messageReceived(IoSession session,
+ Object message) throws Exception {
+ }
+
+ public void messageSent(IoSession session, Object message)
+ throws Exception {
+ }
+
+ public void sessionClosed(IoSession session)
+ throws Exception {
+ }
+
+ public void sessionCreated(IoSession session)
+ throws Exception {
+ }
+
+ public void sessionIdle(IoSession session, IdleStatus
status)
+ throws Exception {
+ }
+
+ public void sessionOpened(IoSession session)
+ throws Exception {
+ }
+ });
+ } else {
+ throw new IllegalStateException("handler is not set.");
+ }
}
return doConnect(remoteAddress, localAddress);
Modified:
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoSession.java
URL:
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoSession.java?rev=594477&r1=594476&r2=594477&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoSession.java
(original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoSession.java
Tue Nov 13 02:42:03 2007
@@ -24,11 +24,14 @@
import java.io.IOException;
import java.net.SocketAddress;
import java.nio.channels.FileChannel;
+import java.util.Queue;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.mina.util.CircularQueue;
+
/**
* Base implementation of [EMAIL PROTECTED] IoSession}.
@@ -38,6 +41,11 @@
*/
public abstract class AbstractIoSession implements IoSession {
+ private static final AttributeKey READY_READ_FUTURES =
+ new AttributeKey(AbstractIoSession.class, "readyReadFutures");
+ private static final AttributeKey WAITING_READ_FUTURES =
+ new AttributeKey(AbstractIoSession.class, "waitingReadFutures");
+
private static final IoFutureListener<CloseFuture>
SCHEDULED_COUNTER_RESETTER =
new IoFutureListener<CloseFuture>() {
public void operationComplete(CloseFuture future) {
@@ -152,6 +160,85 @@
getWriteRequestQueue().offer(this, CLOSE_REQUEST);
getProcessor().flush(this);
return closeFuture;
+ }
+
+ public ReadFuture read() {
+ if (!getConfig().isUseReadOperation()) {
+ throw new IllegalStateException("useReadOperation is not
enabled.");
+ }
+
+ Queue<ReadFuture> readyReadFutures = getReadyReadFutures();
+ ReadFuture future;
+ synchronized (readyReadFutures) {
+ future = readyReadFutures.poll();
+ if (future != null) {
+ if (future.isClosed()) {
+ // Let other readers get notified.
+ readyReadFutures.offer(future);
+ }
+ } else {
+ future = new DefaultReadFuture(this);
+ getWaitingReadFutures().offer(future);
+ }
+ }
+
+ return future;
+ }
+
+ protected void offerReadFuture(Object message) {
+ newReadFuture().setRead(message);
+ }
+
+ protected void offerFailedReadFuture(Throwable exception) {
+ newReadFuture().setException(exception);
+ }
+
+ protected void offerClosedReadFuture() {
+ Queue<ReadFuture> readyReadFutures = getReadyReadFutures();
+ synchronized (readyReadFutures) {
+ newReadFuture().setClosed();
+ }
+ }
+
+ private ReadFuture newReadFuture() {
+ Queue<ReadFuture> readyReadFutures = getReadyReadFutures();
+ Queue<ReadFuture> waitingReadFutures = getWaitingReadFutures();
+ ReadFuture future;
+ synchronized (readyReadFutures) {
+ future = waitingReadFutures.poll();
+ if (future == null) {
+ future = new DefaultReadFuture(this);
+ readyReadFutures.offer(future);
+ }
+ }
+ return future;
+ }
+
+ @SuppressWarnings("unchecked")
+ private Queue<ReadFuture> getReadyReadFutures() {
+ Queue<ReadFuture> readyReadFutures =
+ (Queue<ReadFuture>) getAttribute(READY_READ_FUTURES);
+ if (readyReadFutures == null) {
+ readyReadFutures = new CircularQueue<ReadFuture>();
+
+ Queue<ReadFuture> oldReadyReadFutures =
+ (Queue<ReadFuture>) setAttributeIfAbsent(
+ READY_READ_FUTURES, readyReadFutures);
+ if (oldReadyReadFutures != null) {
+ readyReadFutures = oldReadyReadFutures;
+ }
+
+ // Initialize waitingReadFutures together.
+ Queue<ReadFuture> waitingReadFutures =
+ new CircularQueue<ReadFuture>();
+ setAttributeIfAbsent(WAITING_READ_FUTURES, waitingReadFutures);
+ }
+ return readyReadFutures;
+ }
+
+ @SuppressWarnings("unchecked")
+ private Queue<ReadFuture> getWaitingReadFutures() {
+ return (Queue<ReadFuture>) getAttribute(WAITING_READ_FUTURES);
}
public WriteFuture write(Object message) {
Modified:
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoSessionConfig.java
URL:
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoSessionConfig.java?rev=594477&r1=594476&r2=594477&view=diff
==============================================================================
---
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoSessionConfig.java
(original)
+++
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoSessionConfig.java
Tue Nov 13 02:42:03 2007
@@ -35,6 +35,7 @@
private int idleTimeForWrite;
private int idleTimeForBoth;
private int writeTimeout;
+ private boolean useReadOperation;
protected AbstractIoSessionConfig() {
}
@@ -51,6 +52,7 @@
setIdleTime(IdleStatus.READER_IDLE,
config.getIdleTime(IdleStatus.READER_IDLE));
setIdleTime(IdleStatus.WRITER_IDLE,
config.getIdleTime(IdleStatus.WRITER_IDLE));
setWriteTimeout(config.getWriteTimeout());
+ setUseReadOperation(config.isUseReadOperation());
doSetAll(config);
}
@@ -153,5 +155,13 @@
+ writeTimeout);
}
this.writeTimeout = writeTimeout;
+ }
+
+ public boolean isUseReadOperation() {
+ return useReadOperation;
+ }
+
+ public void setUseReadOperation(boolean useReadOperation) {
+ this.useReadOperation = useReadOperation;
}
}
Modified:
mina/trunk/core/src/main/java/org/apache/mina/common/DefaultIoFilterChain.java
URL:
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/DefaultIoFilterChain.java?rev=594477&r1=594476&r2=594477&view=diff
==============================================================================
---
mina/trunk/core/src/main/java/org/apache/mina/common/DefaultIoFilterChain.java
(original)
+++
mina/trunk/core/src/main/java/org/apache/mina/common/DefaultIoFilterChain.java
Tue Nov 13 02:42:03 2007
@@ -681,17 +681,24 @@
@Override
public void sessionClosed(NextFilter nextFilter, IoSession session)
throws Exception {
+ AbstractIoSession s = (AbstractIoSession) session;
try {
- session.getHandler().sessionClosed(session);
+ s.getHandler().sessionClosed(session);
} finally {
try {
- ((AbstractIoSession)
session).getWriteRequestQueue().dispose(session);
+ s.getWriteRequestQueue().dispose(session);
} finally {
try {
- ((AbstractIoSession)
session).getAttributeMap().dispose(session);
+ s.getAttributeMap().dispose(session);
} finally {
- // Remove all filters.
- session.getFilterChain().clear();
+ try {
+ // Remove all filters.
+ session.getFilterChain().clear();
+ } finally {
+ if (s.getConfig().isUseReadOperation()) {
+ s.offerClosedReadFuture();
+ }
+ }
}
}
}
@@ -706,14 +713,28 @@
@Override
public void exceptionCaught(NextFilter nextFilter, IoSession session,
Throwable cause) throws Exception {
- session.getHandler().exceptionCaught(session, cause);
+ AbstractIoSession s = (AbstractIoSession) session;
+ try {
+ s.getHandler().exceptionCaught(s, cause);
+ } finally {
+ if (s.getConfig().isUseReadOperation()) {
+ s.offerFailedReadFuture(cause);
+ }
+ }
}
@Override
public void messageReceived(NextFilter nextFilter, IoSession session,
Object message) throws Exception {
- ((AbstractIoSession) session).increaseReadMessages();
- session.getHandler().messageReceived(session, message);
+ AbstractIoSession s = (AbstractIoSession) session;
+ s.increaseReadMessages();
+ try {
+ session.getHandler().messageReceived(s, message);
+ } finally {
+ if (s.getConfig().isUseReadOperation()) {
+ s.offerReadFuture(message);
+ }
+ }
}
@Override
Added:
mina/trunk/core/src/main/java/org/apache/mina/common/DefaultReadFuture.java
URL:
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/DefaultReadFuture.java?rev=594477&view=auto
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/DefaultReadFuture.java
(added)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/DefaultReadFuture.java
Tue Nov 13 02:42:03 2007
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.mina.common;
+
+import java.io.IOException;
+
+
+/**
+ * A default implementation of [EMAIL PROTECTED] WriteFuture}.
+ *
+ * @author The Apache MINA Project ([EMAIL PROTECTED])
+ * @version $Rev$, $Date$
+ */
+public class DefaultReadFuture extends DefaultIoFuture implements ReadFuture {
+
+ private static final Object CLOSED = new Object();
+
+ /**
+ * Creates a new instance.
+ */
+ public DefaultReadFuture(IoSession session) {
+ super(session);
+ }
+
+ public Object getMessage() {
+ if (isReady()) {
+ Object v = getValue();
+ if (v == CLOSED) {
+ return null;
+ }
+
+ if (v instanceof Throwable) {
+ if (v instanceof RuntimeException) {
+ throw (RuntimeException) v;
+ }
+ if (v instanceof Error) {
+ throw (Error) v;
+ }
+ if (v instanceof IOException) {
+ throw new RuntimeIoException(
+ "Failed to read.", (IOException) v);
+ }
+ if (v instanceof Exception) {
+ throw new RuntimeException(
+ "Failed to read.", (Exception) v);
+ }
+ }
+
+ return v;
+ }
+
+ return null;
+ }
+
+
+ public boolean isRead() {
+ if (isReady()) {
+ Object v = getValue();
+ return (v != CLOSED && !(v instanceof Throwable));
+ }
+ return false;
+ }
+
+ public boolean isClosed() {
+ if (isReady()) {
+ return getValue() == CLOSED;
+ }
+ return false;
+ }
+
+ public Throwable getException() {
+ if (isReady()) {
+ Object v = getValue();
+ if (v instanceof Throwable) {
+ return (Throwable) v;
+ }
+ }
+ return null;
+ }
+
+ public void setClosed() {
+ setValue(CLOSED);
+ }
+
+ public void setRead(Object message) {
+ setValue(message);
+ }
+
+ public void setException(Throwable cause) {
+ setValue(cause);
+ }
+
+ @Override
+ public ReadFuture await() throws InterruptedException {
+ return (ReadFuture) super.await();
+ }
+
+ @Override
+ public ReadFuture awaitUninterruptibly() {
+ return (ReadFuture) super.awaitUninterruptibly();
+ }
+
+ @Override
+ public ReadFuture addListener(IoFutureListener<?> listener) {
+ return (ReadFuture) super.addListener(listener);
+ }
+
+ @Override
+ public ReadFuture removeListener(IoFutureListener<?> listener) {
+ return (ReadFuture) super.removeListener(listener);
+ }
+}
Propchange:
mina/trunk/core/src/main/java/org/apache/mina/common/DefaultReadFuture.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
mina/trunk/core/src/main/java/org/apache/mina/common/DefaultReadFuture.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: mina/trunk/core/src/main/java/org/apache/mina/common/IoSession.java
URL:
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/IoSession.java?rev=594477&r1=594476&r2=594477&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/IoSession.java
(original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/IoSession.java Tue Nov
13 02:42:03 2007
@@ -78,6 +78,14 @@
* Returns the [EMAIL PROTECTED] TransportMetadata} that this session runs
on.
*/
TransportMetadata getTransportMetadata();
+
+ /**
+ *
+ * @throws IllegalStateException if
+ * [EMAIL PROTECTED] IoSessionConfig#setUseReadOperation(boolean)
useReadOperation}
+ * option has not been enabled.
+ */
+ ReadFuture read();
/**
* Writes the specified <code>message</code> to remote peer. This
Modified:
mina/trunk/core/src/main/java/org/apache/mina/common/IoSessionConfig.java
URL:
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/IoSessionConfig.java?rev=594477&r1=594476&r2=594477&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/IoSessionConfig.java
(original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/IoSessionConfig.java
Tue Nov 13 02:42:03 2007
@@ -19,6 +19,8 @@
*/
package org.apache.mina.common;
+import java.util.concurrent.BlockingQueue;
+
/**
* The configuration of [EMAIL PROTECTED] IoSession}.
*
@@ -98,6 +100,26 @@
* Sets write timeout in seconds.
*/
void setWriteTimeout(int writeTimeout);
+
+ /**
+ * Returns <tt>true</tt> if and only if [EMAIL PROTECTED]
IoSession#read()} operation
+ * is enabled. If enabled, all received messages are stored in an internal
+ * [EMAIL PROTECTED] BlockingQueue} so you can read received messages in
more
+ * convenient way for client applications. Enabling this option is not
+ * useful to server applications and can cause unintended memory leak, and
+ * therefore it's disabled by default.
+ */
+ boolean isUseReadOperation();
+
+ /**
+ * Enables or disabled [EMAIL PROTECTED] IoSession#read()} operation. If
enabled, all
+ * received messages are stored in an internal [EMAIL PROTECTED]
BlockingQueue} so you
+ * can read received messages in more convenient way for client
+ * applications. Enabling this option is not useful to server applications
+ * and can cause unintended memory leak, and therefore it's disabled by
+ * default.
+ */
+ void setUseReadOperation(boolean useReadOperation);
/**
* Sets all configuration properties retrieved from the specified
Added: mina/trunk/core/src/main/java/org/apache/mina/common/ReadFuture.java
URL:
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/ReadFuture.java?rev=594477&view=auto
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/ReadFuture.java (added)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/ReadFuture.java Tue
Nov 13 02:42:03 2007
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.mina.common;
+
+/**
+ * An [EMAIL PROTECTED] IoFuture} for [EMAIL PROTECTED] IoSession#read()
asynchronous read requests}.
+ *
+ * <h3>Example</h3>
+ * <pre>
+ * IoSession session = ...;
+ * // useReadOperation must be enabled to use read operation.
+ * session.getConfig().setUseReadOperation(true);
+ *
+ * ReadFuture future = session.read();
+ * // Wait until a message is received.
+ * future.await();
+ * try {
+ * Object message = future.getMessage();
+ * } catch (Exception e) {
+ * ...
+ * }
+ * </pre>
+ *
+ * @author The Apache MINA Project ([EMAIL PROTECTED])
+ * @version $Rev$, $Date$
+ */
+public interface ReadFuture extends IoFuture {
+
+ /**
+ * Returns the received message. It returns <tt>null</tt> if this
+ * future is not ready or the associated [EMAIL PROTECTED] IoSession} has
been closed.
+ *
+ * @throws RuntimeException if read or any relevant operation has failed.
+ */
+ Object getMessage();
+
+ /**
+ * Returns <tt>true</tt> if a message was received successfully.
+ */
+ boolean isRead();
+
+ /**
+ * Returns <tt>true</tt> if the [EMAIL PROTECTED] IoSession} associated
with this
+ * future has been closed.
+ */
+ boolean isClosed();
+
+ /**
+ * Returns the cause of the read failure if and only if the read
+ * operation has failed due to an [EMAIL PROTECTED] Exception}. Otherwise,
+ * <tt>null</tt> is returned.
+ */
+ Throwable getException();
+
+ /**
+ * Sets the message is written, and notifies all threads waiting for
+ * this future. This method is invoked by MINA internally. Please do
+ * not call this method directly.
+ */
+ void setRead(Object message);
+
+ /**
+ * Sets the associated [EMAIL PROTECTED] IoSession} is closed. This
method is invoked
+ * by MINA internally. Please do not call this method directly.
+ */
+ void setClosed();
+
+ /**
+ * Sets the cause of the read failure, and notifies all threads waiting
+ * for this future. This method is invoked by MINA internally. Please
+ * do not call this method directly.
+ */
+ void setException(Throwable cause);
+
+ ReadFuture await() throws InterruptedException;
+
+ ReadFuture awaitUninterruptibly();
+
+ ReadFuture addListener(IoFutureListener<?> listener);
+
+ ReadFuture removeListener(IoFutureListener<?> listener);
+}
Propchange: mina/trunk/core/src/main/java/org/apache/mina/common/ReadFuture.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: mina/trunk/core/src/main/java/org/apache/mina/common/ReadFuture.java
------------------------------------------------------------------------------
svn:keywords = Rev Date