Author: trustin
Date: Tue Nov 13 02:42:03 2007
New Revision: 594477

URL: http://svn.apache.org/viewvc?rev=594477&view=rev
Log:
Resolved issue: DIRMINA-375 (Synchronous Client API)
* Added IoSessionConfig.useReadOperation
* Added IoSession.read() - only usable when useReadOperation is true
* Added ReadFuture and its default implementation

Added:
    mina/trunk/core/src/main/java/org/apache/mina/common/DefaultReadFuture.java 
  (with props)
    mina/trunk/core/src/main/java/org/apache/mina/common/ReadFuture.java   
(with props)
Modified:
    
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoConnector.java
    mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoSession.java
    
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoSessionConfig.java
    
mina/trunk/core/src/main/java/org/apache/mina/common/DefaultIoFilterChain.java
    mina/trunk/core/src/main/java/org/apache/mina/common/IoSession.java
    mina/trunk/core/src/main/java/org/apache/mina/common/IoSessionConfig.java

Modified: 
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoConnector.java
URL: 
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoConnector.java?rev=594477&r1=594476&r2=594477&view=diff
==============================================================================
--- 
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoConnector.java 
(original)
+++ 
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoConnector.java 
Tue Nov 13 02:42:03 2007
@@ -82,7 +82,39 @@
         }
 
         if (getHandler() == null) {
-            throw new IllegalStateException("handler is not set.");
+            if (getSessionConfig().isUseReadOperation()) {
+                setHandler(new IoHandler() {
+                    public void exceptionCaught(IoSession session,
+                            Throwable cause) throws Exception {
+                    }
+
+                    public void messageReceived(IoSession session,
+                            Object message) throws Exception {
+                    }
+
+                    public void messageSent(IoSession session, Object message)
+                            throws Exception {
+                    }
+
+                    public void sessionClosed(IoSession session)
+                            throws Exception {
+                    }
+
+                    public void sessionCreated(IoSession session)
+                            throws Exception {
+                    }
+
+                    public void sessionIdle(IoSession session, IdleStatus 
status)
+                            throws Exception {
+                    }
+
+                    public void sessionOpened(IoSession session)
+                            throws Exception {
+                    }
+                });
+            } else {
+                throw new IllegalStateException("handler is not set.");
+            }
         }
 
         return doConnect(remoteAddress, localAddress);

Modified: 
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoSession.java
URL: 
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoSession.java?rev=594477&r1=594476&r2=594477&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoSession.java 
(original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoSession.java 
Tue Nov 13 02:42:03 2007
@@ -24,11 +24,14 @@
 import java.io.IOException;
 import java.net.SocketAddress;
 import java.nio.channels.FileChannel;
+import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.mina.util.CircularQueue;
+
 
 /**
  * Base implementation of [EMAIL PROTECTED] IoSession}.
@@ -38,6 +41,11 @@
  */
 public abstract class AbstractIoSession implements IoSession {
 
+    private static final AttributeKey READY_READ_FUTURES =
+        new AttributeKey(AbstractIoSession.class, "readyReadFutures");
+    private static final AttributeKey WAITING_READ_FUTURES =
+        new AttributeKey(AbstractIoSession.class, "waitingReadFutures");
+    
     private static final IoFutureListener<CloseFuture> 
SCHEDULED_COUNTER_RESETTER =
         new IoFutureListener<CloseFuture>() {
             public void operationComplete(CloseFuture future) {
@@ -152,6 +160,85 @@
         getWriteRequestQueue().offer(this, CLOSE_REQUEST);
         getProcessor().flush(this);
         return closeFuture;
+    }
+
+    public ReadFuture read() {
+        if (!getConfig().isUseReadOperation()) {
+            throw new IllegalStateException("useReadOperation is not 
enabled.");
+        }
+        
+        Queue<ReadFuture> readyReadFutures = getReadyReadFutures();
+        ReadFuture future;
+        synchronized (readyReadFutures) {
+            future = readyReadFutures.poll();
+            if (future != null) {
+                if (future.isClosed()) {
+                    // Let other readers get notified.
+                    readyReadFutures.offer(future);
+                }
+            } else {
+                future = new DefaultReadFuture(this);
+                getWaitingReadFutures().offer(future);
+            }
+        }
+        
+        return future;
+    }
+    
+    protected void offerReadFuture(Object message) {
+        newReadFuture().setRead(message);
+    }
+
+    protected void offerFailedReadFuture(Throwable exception) {
+        newReadFuture().setException(exception);
+    }
+
+    protected void offerClosedReadFuture() {
+        Queue<ReadFuture> readyReadFutures = getReadyReadFutures();
+        synchronized (readyReadFutures) {
+            newReadFuture().setClosed();
+        }
+    }
+
+    private ReadFuture newReadFuture() {
+        Queue<ReadFuture> readyReadFutures = getReadyReadFutures();
+        Queue<ReadFuture> waitingReadFutures = getWaitingReadFutures();
+        ReadFuture future;
+        synchronized (readyReadFutures) {
+            future = waitingReadFutures.poll();
+            if (future == null) {
+                future = new DefaultReadFuture(this);
+                readyReadFutures.offer(future);
+            }
+        }
+        return future;
+    }
+
+    @SuppressWarnings("unchecked")
+    private Queue<ReadFuture> getReadyReadFutures() {
+        Queue<ReadFuture> readyReadFutures = 
+            (Queue<ReadFuture>) getAttribute(READY_READ_FUTURES);
+        if (readyReadFutures == null) {
+            readyReadFutures = new CircularQueue<ReadFuture>();
+            
+            Queue<ReadFuture> oldReadyReadFutures =
+                (Queue<ReadFuture>) setAttributeIfAbsent(
+                        READY_READ_FUTURES, readyReadFutures);
+            if (oldReadyReadFutures != null) {
+                readyReadFutures = oldReadyReadFutures;
+            }
+            
+            // Initialize waitingReadFutures together.
+            Queue<ReadFuture> waitingReadFutures = 
+                new CircularQueue<ReadFuture>();
+            setAttributeIfAbsent(WAITING_READ_FUTURES, waitingReadFutures);
+        }
+        return readyReadFutures;
+    }
+
+    @SuppressWarnings("unchecked")
+    private Queue<ReadFuture> getWaitingReadFutures() {
+        return (Queue<ReadFuture>) getAttribute(WAITING_READ_FUTURES);
     }
 
     public WriteFuture write(Object message) {

Modified: 
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoSessionConfig.java
URL: 
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoSessionConfig.java?rev=594477&r1=594476&r2=594477&view=diff
==============================================================================
--- 
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoSessionConfig.java
 (original)
+++ 
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoSessionConfig.java
 Tue Nov 13 02:42:03 2007
@@ -35,6 +35,7 @@
     private int idleTimeForWrite;
     private int idleTimeForBoth;
     private int writeTimeout;
+    private boolean useReadOperation;
 
     protected AbstractIoSessionConfig() {
     }
@@ -51,6 +52,7 @@
         setIdleTime(IdleStatus.READER_IDLE, 
config.getIdleTime(IdleStatus.READER_IDLE));
         setIdleTime(IdleStatus.WRITER_IDLE, 
config.getIdleTime(IdleStatus.WRITER_IDLE));
         setWriteTimeout(config.getWriteTimeout());
+        setUseReadOperation(config.isUseReadOperation());
 
         doSetAll(config);
     }
@@ -153,5 +155,13 @@
                     + writeTimeout);
         }
         this.writeTimeout = writeTimeout;
+    }
+
+    public boolean isUseReadOperation() {
+        return useReadOperation;
+    }
+
+    public void setUseReadOperation(boolean useReadOperation) {
+        this.useReadOperation = useReadOperation;
     }
 }

Modified: 
mina/trunk/core/src/main/java/org/apache/mina/common/DefaultIoFilterChain.java
URL: 
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/DefaultIoFilterChain.java?rev=594477&r1=594476&r2=594477&view=diff
==============================================================================
--- 
mina/trunk/core/src/main/java/org/apache/mina/common/DefaultIoFilterChain.java 
(original)
+++ 
mina/trunk/core/src/main/java/org/apache/mina/common/DefaultIoFilterChain.java 
Tue Nov 13 02:42:03 2007
@@ -681,17 +681,24 @@
         @Override
         public void sessionClosed(NextFilter nextFilter, IoSession session)
                 throws Exception {
+            AbstractIoSession s = (AbstractIoSession) session;
             try {
-                session.getHandler().sessionClosed(session);
+                s.getHandler().sessionClosed(session);
             } finally {
                 try {
-                    ((AbstractIoSession) 
session).getWriteRequestQueue().dispose(session);
+                    s.getWriteRequestQueue().dispose(session);
                 } finally {
                     try {
-                        ((AbstractIoSession) 
session).getAttributeMap().dispose(session);
+                        s.getAttributeMap().dispose(session);
                     } finally {
-                        // Remove all filters.
-                        session.getFilterChain().clear();
+                        try {
+                            // Remove all filters.
+                            session.getFilterChain().clear();
+                        } finally {
+                            if (s.getConfig().isUseReadOperation()) {
+                                s.offerClosedReadFuture();
+                            }
+                        }
                     }
                 }
             }
@@ -706,14 +713,28 @@
         @Override
         public void exceptionCaught(NextFilter nextFilter, IoSession session,
                 Throwable cause) throws Exception {
-            session.getHandler().exceptionCaught(session, cause);
+            AbstractIoSession s = (AbstractIoSession) session;
+            try {
+                s.getHandler().exceptionCaught(s, cause);
+            } finally {
+                if (s.getConfig().isUseReadOperation()) {
+                    s.offerFailedReadFuture(cause);
+                }
+            }
         }
 
         @Override
         public void messageReceived(NextFilter nextFilter, IoSession session,
                 Object message) throws Exception {
-            ((AbstractIoSession) session).increaseReadMessages();
-            session.getHandler().messageReceived(session, message);
+            AbstractIoSession s = (AbstractIoSession) session;
+            s.increaseReadMessages();
+            try {
+                session.getHandler().messageReceived(s, message);
+            } finally {
+                if (s.getConfig().isUseReadOperation()) {
+                    s.offerReadFuture(message);
+                }
+            }
         }
 
         @Override

Added: 
mina/trunk/core/src/main/java/org/apache/mina/common/DefaultReadFuture.java
URL: 
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/DefaultReadFuture.java?rev=594477&view=auto
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/DefaultReadFuture.java 
(added)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/DefaultReadFuture.java 
Tue Nov 13 02:42:03 2007
@@ -0,0 +1,129 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ *
+ */
+package org.apache.mina.common;
+
+import java.io.IOException;
+
+
+/**
+ * A default implementation of [EMAIL PROTECTED] WriteFuture}.
+ *
+ * @author The Apache MINA Project ([EMAIL PROTECTED])
+ * @version $Rev$, $Date$
+ */
+public class DefaultReadFuture extends DefaultIoFuture implements ReadFuture {
+    
+    private static final Object CLOSED = new Object();
+    
+    /**
+     * Creates a new instance.
+     */
+    public DefaultReadFuture(IoSession session) {
+        super(session);
+    }
+    
+    public Object getMessage() {
+        if (isReady()) {
+            Object v = getValue();
+            if (v == CLOSED) {
+                return null;
+            }
+            
+            if (v instanceof Throwable) {
+                if (v instanceof RuntimeException) {
+                    throw (RuntimeException) v;
+                }
+                if (v instanceof Error) {
+                    throw (Error) v;
+                }
+                if (v instanceof IOException) {
+                    throw new RuntimeIoException(
+                            "Failed to read.", (IOException) v);
+                }
+                if (v instanceof Exception) {
+                    throw new RuntimeException(
+                            "Failed to read.", (Exception) v);
+                }
+            }
+            
+            return v;
+        }
+
+        return null;
+    }
+
+
+    public boolean isRead() {
+        if (isReady()) {
+            Object v = getValue();
+            return (v != CLOSED && !(v instanceof Throwable));
+        }
+        return false;
+    }
+    
+    public boolean isClosed() {
+        if (isReady()) {
+            return getValue() == CLOSED;
+        }
+        return false;
+    }
+
+    public Throwable getException() {
+        if (isReady()) {
+            Object v = getValue();
+            if (v instanceof Throwable) {
+                return (Throwable) v;
+            }
+        }
+        return null;
+    }
+
+    public void setClosed() {
+        setValue(CLOSED);
+    }
+
+    public void setRead(Object message) {
+        setValue(message);
+    }
+
+    public void setException(Throwable cause) {
+        setValue(cause);
+    }
+
+    @Override
+    public ReadFuture await() throws InterruptedException {
+        return (ReadFuture) super.await();
+    }
+
+    @Override
+    public ReadFuture awaitUninterruptibly() {
+        return (ReadFuture) super.awaitUninterruptibly();
+    }
+
+    @Override
+    public ReadFuture addListener(IoFutureListener<?> listener) {
+        return (ReadFuture) super.addListener(listener);
+    }
+
+    @Override
+    public ReadFuture removeListener(IoFutureListener<?> listener) {
+        return (ReadFuture) super.removeListener(listener);
+    }
+}

Propchange: 
mina/trunk/core/src/main/java/org/apache/mina/common/DefaultReadFuture.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: 
mina/trunk/core/src/main/java/org/apache/mina/common/DefaultReadFuture.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: mina/trunk/core/src/main/java/org/apache/mina/common/IoSession.java
URL: 
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/IoSession.java?rev=594477&r1=594476&r2=594477&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/IoSession.java 
(original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/IoSession.java Tue Nov 
13 02:42:03 2007
@@ -78,6 +78,14 @@
      * Returns the [EMAIL PROTECTED] TransportMetadata} that this session runs 
on.
      */
     TransportMetadata getTransportMetadata();
+    
+    /**
+     * 
+     * @throws IllegalStateException if
+     * [EMAIL PROTECTED] IoSessionConfig#setUseReadOperation(boolean) 
useReadOperation}
+     * option has not been enabled.
+     */
+    ReadFuture read();
 
     /**
      * Writes the specified <code>message</code> to remote peer.  This

Modified: 
mina/trunk/core/src/main/java/org/apache/mina/common/IoSessionConfig.java
URL: 
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/IoSessionConfig.java?rev=594477&r1=594476&r2=594477&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/IoSessionConfig.java 
(original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/IoSessionConfig.java 
Tue Nov 13 02:42:03 2007
@@ -19,6 +19,8 @@
  */
 package org.apache.mina.common;
 
+import java.util.concurrent.BlockingQueue;
+
 /**
  * The configuration of [EMAIL PROTECTED] IoSession}.
  *
@@ -98,6 +100,26 @@
      * Sets write timeout in seconds.
      */
     void setWriteTimeout(int writeTimeout);
+    
+    /**
+     * Returns <tt>true</tt> if and only if [EMAIL PROTECTED] 
IoSession#read()} operation
+     * is enabled.  If enabled, all received messages are stored in an internal
+     * [EMAIL PROTECTED] BlockingQueue} so you can read received messages in 
more
+     * convenient way for client applications.  Enabling this option is not
+     * useful to server applications and can cause unintended memory leak, and
+     * therefore it's disabled by default.
+     */
+    boolean isUseReadOperation();
+    
+    /**
+     * Enables or disabled [EMAIL PROTECTED] IoSession#read()} operation.  If 
enabled, all
+     * received messages are stored in an internal [EMAIL PROTECTED] 
BlockingQueue} so you
+     * can read received messages in more convenient way for client
+     * applications.  Enabling this option is not useful to server applications
+     * and can cause unintended memory leak, and therefore it's disabled by
+     * default.
+     */
+    void setUseReadOperation(boolean useReadOperation);
 
     /**
      * Sets all configuration properties retrieved from the specified

Added: mina/trunk/core/src/main/java/org/apache/mina/common/ReadFuture.java
URL: 
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/ReadFuture.java?rev=594477&view=auto
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/ReadFuture.java (added)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/ReadFuture.java Tue 
Nov 13 02:42:03 2007
@@ -0,0 +1,99 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ *
+ */
+package org.apache.mina.common;
+
+/**
+ * An [EMAIL PROTECTED] IoFuture} for [EMAIL PROTECTED] IoSession#read() 
asynchronous read requests}. 
+ *
+ * <h3>Example</h3>
+ * <pre>
+ * IoSession session = ...;
+ * // useReadOperation must be enabled to use read operation.
+ * session.getConfig().setUseReadOperation(true);
+ * 
+ * ReadFuture future = session.read();
+ * // Wait until a message is received.
+ * future.await();
+ * try {
+ *     Object message = future.getMessage();
+ * } catch (Exception e) {
+ *     ...
+ * }
+ * </pre>
+ *
+ * @author The Apache MINA Project ([EMAIL PROTECTED])
+ * @version $Rev$, $Date$
+ */
+public interface ReadFuture extends IoFuture {
+    
+    /**
+     * Returns the received message.  It returns <tt>null</tt> if this
+     * future is not ready or the associated [EMAIL PROTECTED] IoSession} has 
been closed. 
+     * 
+     * @throws RuntimeException if read or any relevant operation has failed.
+     */
+    Object getMessage();
+    
+    /**
+     * Returns <tt>true</tt> if a message was received successfully.
+     */
+    boolean isRead();
+    
+    /**
+     * Returns <tt>true</tt> if the [EMAIL PROTECTED] IoSession} associated 
with this
+     * future has been closed.
+     */
+    boolean isClosed();
+    
+    /**
+     * Returns the cause of the read failure if and only if the read
+     * operation has failed due to an [EMAIL PROTECTED] Exception}.  Otherwise,
+     * <tt>null</tt> is returned.
+     */
+    Throwable getException();
+
+    /**
+     * Sets the message is written, and notifies all threads waiting for
+     * this future.  This method is invoked by MINA internally.  Please do
+     * not call this method directly.
+     */
+    void setRead(Object message);
+    
+    /**
+     * Sets the associated [EMAIL PROTECTED] IoSession} is closed.  This 
method is invoked
+     * by MINA internally.  Please do not call this method directly.
+     */
+    void setClosed();
+    
+    /**
+     * Sets the cause of the read failure, and notifies all threads waiting
+     * for this future.  This method is invoked by MINA internally.  Please
+     * do not call this method directly.
+     */
+    void setException(Throwable cause);
+
+    ReadFuture await() throws InterruptedException;
+
+    ReadFuture awaitUninterruptibly();
+
+    ReadFuture addListener(IoFutureListener<?> listener);
+
+    ReadFuture removeListener(IoFutureListener<?> listener);
+}

Propchange: mina/trunk/core/src/main/java/org/apache/mina/common/ReadFuture.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: mina/trunk/core/src/main/java/org/apache/mina/common/ReadFuture.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date


Reply via email to