Author: trustin
Date: Mon Mar 10 02:04:45 2008
New Revision: 635494

URL: http://svn.apache.org/viewvc?rev=635494&view=rev
Log:
Resolved issue: DIRMINA-543 (Incorrect sessionCreated event order in VmPipe transport)
* Made sure no other event can be fired while sessionCreated() is being invoked.

Added a workaround for a Sun Java compiler bug in CircularQueue.


Modified:
    mina/branches/1.0/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeFilterChain.java
    mina/branches/1.0/core/src/test/java/org/apache/mina/transport/vmpipe/VmPipeEventOrderTest.java
    mina/branches/1.1/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeFilterChain.java
    mina/branches/1.1/core/src/test/java/org/apache/mina/transport/vmpipe/VmPipeEventOrderTest.java
    mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeFilterChain.java
    mina/trunk/core/src/main/java/org/apache/mina/util/CircularQueue.java
    mina/trunk/core/src/test/java/org/apache/mina/transport/vmpipe/VmPipeEventOrderTest.java
    mina/trunk/core/src/test/java/org/apache/mina/util/CircularQueueTest.java

Modified: 
mina/branches/1.0/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeFilterChain.java
URL: 
http://svn.apache.org/viewvc/mina/branches/1.0/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeFilterChain.java?rev=635494&r1=635493&r2=635494&view=diff
==============================================================================
--- 
mina/branches/1.0/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeFilterChain.java
 (original)
+++ 
mina/branches/1.0/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeFilterChain.java
 Mon Mar 10 02:04:45 2008
@@ -65,29 +65,28 @@
     }
 
     private void fireEvent(Event e) {
-        IoSession session = getSession();
+        VmPipeSessionImpl session = (VmPipeSessionImpl) getSession();
         EventType type = e.getType();
         Object data = e.getData();
 
         if (type == EventType.RECEIVED) {
-            VmPipeSessionImpl s = (VmPipeSessionImpl) session;
-            if (sessionOpened.get() && s.getTrafficMask().isReadable() && 
s.getLock().tryLock()) {
+            if (sessionOpened.get() && session.getTrafficMask().isReadable() 
&& session.getLock().tryLock()) {
                 try {
                     int byteCount = 1;
                     if (data instanceof ByteBuffer) {
                         byteCount = ((ByteBuffer) data).remaining();
                     }
 
-                    s.increaseReadBytes(byteCount);
+                    session.increaseReadBytes(byteCount);
 
-                    super.fireMessageReceived(s, data);
+                    super.fireMessageReceived(session, data);
                 } finally {
-                    s.getLock().unlock();
+                    session.getLock().unlock();
                 }
                 
-                flushPendingDataQueues(s);
+                flushPendingDataQueues(session);
             } else {
-                s.pendingDataQueue.add(data);
+                session.pendingDataQueue.add(data);
             }
         } else if (type == EventType.WRITE) {
             super.fireFilterWrite(session, (WriteRequest) data);
@@ -101,7 +100,12 @@
             super.fireSessionOpened(session);
             sessionOpened.set(true);
         } else if (type == EventType.CREATED) {
-            super.fireSessionCreated(session);
+            session.getLock().lock();
+            try {
+                super.fireSessionCreated(session);
+            } finally {
+                session.getLock().unlock();
+            }
         } else if (type == EventType.CLOSED) {
             super.fireSessionClosed(session);
         } else if (type == EventType.CLOSE) {

Modified: 
mina/branches/1.0/core/src/test/java/org/apache/mina/transport/vmpipe/VmPipeEventOrderTest.java
URL: 
http://svn.apache.org/viewvc/mina/branches/1.0/core/src/test/java/org/apache/mina/transport/vmpipe/VmPipeEventOrderTest.java?rev=635494&r1=635493&r2=635494&view=diff
==============================================================================
--- 
mina/branches/1.0/core/src/test/java/org/apache/mina/transport/vmpipe/VmPipeEventOrderTest.java
 (original)
+++ 
mina/branches/1.0/core/src/test/java/org/apache/mina/transport/vmpipe/VmPipeEventOrderTest.java
 Mon Mar 10 02:04:45 2008
@@ -19,9 +19,13 @@
  */
 package org.apache.mina.transport.vmpipe;
 
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+
 import junit.framework.Assert;
 import junit.framework.TestCase;
 
+import org.apache.mina.common.ByteBuffer;
 import org.apache.mina.common.ConnectFuture;
 import org.apache.mina.common.IoAcceptor;
 import org.apache.mina.common.IoConnector;
@@ -143,5 +147,50 @@
         }
 
         Assert.assertEquals("ABC", actual.toString());
+    }
+    
+    public void testSessionCreated() throws Exception {
+        final Semaphore semaphore = new Semaphore(0);
+        final StringBuffer stringBuffer = new StringBuffer();
+        VmPipeAcceptor vmPipeAcceptor = new VmPipeAcceptor();
+        vmPipeAcceptor.getDefaultConfig().setThreadModel(ThreadModel.MANUAL);
+        final VmPipeAddress vmPipeAddress = new VmPipeAddress(12345);
+        vmPipeAcceptor.bind(vmPipeAddress, new IoHandlerAdapter() {
+            public void sessionCreated(IoSession session) throws Exception {
+                // pretend we are doing some time-consuming work. For
+                // performance reasons, you would never want to do time
+                // consuming work in sessionCreated.
+                // However, this increases the likelihood of the timing bug.
+                Thread.sleep(1000);
+                stringBuffer.append("A");
+            }
+
+            public void sessionOpened(IoSession session) throws Exception {
+                stringBuffer.append("B");
+            }
+
+            public void messageReceived(IoSession session, Object message)
+                    throws Exception {
+                stringBuffer.append("C");
+            }
+            
+            public void sessionClosed(IoSession session) throws Exception {
+                stringBuffer.append("D");
+                semaphore.release();
+            }
+        });
+
+        final VmPipeConnector vmPipeConnector = new VmPipeConnector();
+        ConnectFuture connectFuture = vmPipeConnector.connect(vmPipeAddress, 
new IoHandlerAdapter() {
+            public void sessionOpened(IoSession session) throws Exception {
+                session.write(ByteBuffer.wrap(new byte[1]));
+            }
+        });
+
+        connectFuture.join();
+        connectFuture.getSession().close();
+        semaphore.tryAcquire(1, TimeUnit.SECONDS);
+        vmPipeAcceptor.unbind(vmPipeAddress);
+        Assert.assertEquals("ABCD", stringBuffer.toString());
     }
 }

Modified: 
mina/branches/1.1/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeFilterChain.java
URL: 
http://svn.apache.org/viewvc/mina/branches/1.1/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeFilterChain.java?rev=635494&r1=635493&r2=635494&view=diff
==============================================================================
--- 
mina/branches/1.1/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeFilterChain.java
 (original)
+++ 
mina/branches/1.1/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeFilterChain.java
 Mon Mar 10 02:04:45 2008
@@ -66,30 +66,28 @@
     }
 
     private void fireEvent(Event e) {
-        IoSession session = getSession();
+        VmPipeSessionImpl session = (VmPipeSessionImpl) getSession();
         EventType type = e.getType();
         Object data = e.getData();
 
         if (type == EventType.RECEIVED) {
-            VmPipeSessionImpl s = (VmPipeSessionImpl) session;
-
-            if( sessionOpened && s.getTrafficMask().isReadable() && 
s.getLock().tryLock()) {
+            if( sessionOpened && session.getTrafficMask().isReadable() && 
session.getLock().tryLock()) {
                 try {
                     int byteCount = 1;
                     if (data instanceof ByteBuffer) {
                         byteCount = ((ByteBuffer) data).remaining();
                     }
 
-                    s.increaseReadBytes(byteCount);
+                    session.increaseReadBytes(byteCount);
 
-                    super.fireMessageReceived(s, data);
+                    super.fireMessageReceived(session, data);
                 } finally {
-                    s.getLock().unlock();
+                    session.getLock().unlock();
                 }
 
-                flushPendingDataQueues( s );
+                flushPendingDataQueues( session );
             } else {
-                s.pendingDataQueue.add(data);
+                session.pendingDataQueue.add(data);
             }
         } else if (type == EventType.WRITE) {
             super.fireFilterWrite(session, (WriteRequest) data);
@@ -103,7 +101,12 @@
             super.fireSessionOpened(session);
             sessionOpened = true;
         } else if (type == EventType.CREATED) {
-            super.fireSessionCreated(session);
+            session.getLock().lock();
+            try {
+                super.fireSessionCreated(session);
+            } finally {
+                session.getLock().unlock();
+            }
         } else if (type == EventType.CLOSED) {
             super.fireSessionClosed(session);
         } else if (type == EventType.CLOSE) {

Modified: 
mina/branches/1.1/core/src/test/java/org/apache/mina/transport/vmpipe/VmPipeEventOrderTest.java
URL: 
http://svn.apache.org/viewvc/mina/branches/1.1/core/src/test/java/org/apache/mina/transport/vmpipe/VmPipeEventOrderTest.java?rev=635494&r1=635493&r2=635494&view=diff
==============================================================================
--- 
mina/branches/1.1/core/src/test/java/org/apache/mina/transport/vmpipe/VmPipeEventOrderTest.java
 (original)
+++ 
mina/branches/1.1/core/src/test/java/org/apache/mina/transport/vmpipe/VmPipeEventOrderTest.java
 Mon Mar 10 02:04:45 2008
@@ -19,9 +19,13 @@
  */
 package org.apache.mina.transport.vmpipe;
 
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+
 import junit.framework.Assert;
 import junit.framework.TestCase;
 
+import org.apache.mina.common.ByteBuffer;
 import org.apache.mina.common.ConnectFuture;
 import org.apache.mina.common.IoAcceptor;
 import org.apache.mina.common.IoConnector;
@@ -143,5 +147,56 @@
         }
 
         Assert.assertEquals("ABC", actual.toString());
+    }
+    
+
+    public void testSessionCreated() throws Exception {
+        final Semaphore semaphore = new Semaphore(0);
+        final StringBuffer stringBuffer = new StringBuffer();
+        VmPipeAcceptor vmPipeAcceptor = new VmPipeAcceptor();
+        vmPipeAcceptor.getDefaultConfig().setThreadModel(ThreadModel.MANUAL);
+        final VmPipeAddress vmPipeAddress = new VmPipeAddress(12345);
+        vmPipeAcceptor.bind(vmPipeAddress, new IoHandlerAdapter() {
+            @Override
+            public void sessionCreated(IoSession session) throws Exception {
+                // pretend we are doing some time-consuming work. For
+                // performance reasons, you would never want to do time
+                // consuming work in sessionCreated.
+                // However, this increases the likelihood of the timing bug.
+                Thread.sleep(1000);
+                stringBuffer.append("A");
+            }
+
+            @Override
+            public void sessionOpened(IoSession session) throws Exception {
+                stringBuffer.append("B");
+            }
+
+            @Override
+            public void messageReceived(IoSession session, Object message)
+                    throws Exception {
+                stringBuffer.append("C");
+            }
+            
+            @Override
+            public void sessionClosed(IoSession session) throws Exception {
+                stringBuffer.append("D");
+                semaphore.release();
+            }
+        });
+
+        final VmPipeConnector vmPipeConnector = new VmPipeConnector();
+        ConnectFuture connectFuture = vmPipeConnector.connect(vmPipeAddress, 
new IoHandlerAdapter() {
+            @Override
+            public void sessionOpened(IoSession session) throws Exception {
+                session.write(ByteBuffer.wrap(new byte[1]));
+            }
+        });
+
+        connectFuture.join();
+        connectFuture.getSession().close();
+        semaphore.tryAcquire(1, TimeUnit.SECONDS);
+        vmPipeAcceptor.unbind(vmPipeAddress);
+        Assert.assertEquals("ABCD", stringBuffer.toString());
     }
 }

Modified: 
mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeFilterChain.java
URL: 
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeFilterChain.java?rev=635494&r1=635493&r2=635494&view=diff
==============================================================================
--- 
mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeFilterChain.java
 (original)
+++ 
mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeFilterChain.java
 Mon Mar 10 02:04:45 2008
@@ -31,7 +31,6 @@
 import org.apache.mina.common.IoEvent;
 import org.apache.mina.common.IoEventType;
 import org.apache.mina.common.IoProcessor;
-import org.apache.mina.common.IoSession;
 import org.apache.mina.common.WriteRequest;
 import org.apache.mina.common.WriteRequestQueue;
 import org.apache.mina.common.WriteToClosedSessionException;
@@ -77,24 +76,23 @@
     }
 
     private void fireEvent(IoEvent e) {
-        IoSession session = getSession();
+        VmPipeSessionImpl session = (VmPipeSessionImpl) getSession();
         IoEventType type = e.getType();
         Object data = e.getParameter();
 
         if (type == IoEventType.MESSAGE_RECEIVED) {
-            VmPipeSessionImpl s = (VmPipeSessionImpl) session;
-            if (sessionOpened && s.getTrafficMask().isReadable() && 
s.getLock().tryLock()) {
+            if (sessionOpened && session.getTrafficMask().isReadable() && 
session.getLock().tryLock()) {
                 try {
-                    if (!s.getTrafficMask().isReadable()) {
-                        s.receivedMessageQueue.add(data);
+                    if (!session.getTrafficMask().isReadable()) {
+                        session.receivedMessageQueue.add(data);
                     } else {
                         super.fireMessageReceived(data);
                     }
                 } finally {
-                    s.getLock().unlock();
+                    session.getLock().unlock();
                 }
             } else {
-                s.receivedMessageQueue.add(data);
+                session.receivedMessageQueue.add(data);
             }
         } else if (type == IoEventType.WRITE) {
             super.fireFilterWrite((WriteRequest) data);
@@ -108,7 +106,12 @@
             super.fireSessionOpened();
             sessionOpened = true;
         } else if (type == IoEventType.SESSION_CREATED) {
-            super.fireSessionCreated();
+            session.getLock().lock();
+            try {
+                super.fireSessionCreated();
+            } finally {
+                session.getLock().unlock();
+            }
         } else if (type == IoEventType.SESSION_CLOSED) {
             super.fireSessionClosed();
         } else if (type == IoEventType.CLOSE) {
@@ -173,32 +176,19 @@
                 return;
             }
             if (session.isConnected()) {
-                if (session.getLock().tryLock()) {
-                    try {
-                        WriteRequest req;
-                        while ((req = queue.poll(session)) != null) {
-                            Object message = req.getMessage();
-                            Object messageCopy = message;
-                            if (message instanceof IoBuffer) {
-                                IoBuffer rb = (IoBuffer) message;
-                                rb.mark();
-                                IoBuffer wb = 
IoBuffer.allocate(rb.remaining());
-                                wb.put(rb);
-                                wb.flip();
-                                rb.reset();
-                                messageCopy = wb;
-                            }
-
-                            
session.getRemoteSession().getFilterChain().fireMessageReceived(
-                                    messageCopy);
-                            session.getFilterChain().fireMessageSent(req);
-                        }
-                    } finally {
-                        session.getLock().unlock();
+                session.getLock().lock();
+                try {
+                    WriteRequest req;
+                    while ((req = queue.poll(session)) != null) {
+                        
session.getRemoteSession().getFilterChain().fireMessageReceived(
+                                getMessageCopy(req.getMessage()));
+                        session.getFilterChain().fireMessageSent(req);
                     }
-
-                    flushPendingDataQueues(session);
+                } finally {
+                    session.getLock().unlock();
                 }
+
+                flushPendingDataQueues(session);
             } else {
                 List<WriteRequest> failedRequests = new 
ArrayList<WriteRequest>();
                 WriteRequest req;
@@ -214,6 +204,20 @@
                     session.getFilterChain().fireExceptionCaught(cause);
                 }
             }
+        }
+
+        private Object getMessageCopy(Object message) {
+            Object messageCopy = message;
+            if (message instanceof IoBuffer) {
+                IoBuffer rb = (IoBuffer) message;
+                rb.mark();
+                IoBuffer wb = IoBuffer.allocate(rb.remaining());
+                wb.put(rb);
+                wb.flip();
+                rb.reset();
+                messageCopy = wb;
+            }
+            return messageCopy;
         }
 
         public void remove(VmPipeSessionImpl session) {

Modified: mina/trunk/core/src/main/java/org/apache/mina/util/CircularQueue.java
URL: 
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/util/CircularQueue.java?rev=635494&r1=635493&r2=635494&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/util/CircularQueue.java 
(original)
+++ mina/trunk/core/src/main/java/org/apache/mina/util/CircularQueue.java Mon 
Mar 10 02:04:45 2008
@@ -39,7 +39,10 @@
     private static final int DEFAULT_CAPACITY = 4;
 
     private final int initialCapacity;
-    private Object[] items;
+    // XXX: This volatile keyword here is a workaround for SUN Java Compiler bug,
+    //      which produces buggy byte code.  I don't even know why adding a volatile
+    //      fixes the problem.  Eclipse Java Compiler seems to produce correct byte code.
+    private volatile Object[] items;
     private int mask;
     private int first = 0;
     private int last = 0;

Modified: 
mina/trunk/core/src/test/java/org/apache/mina/transport/vmpipe/VmPipeEventOrderTest.java
URL: 
http://svn.apache.org/viewvc/mina/trunk/core/src/test/java/org/apache/mina/transport/vmpipe/VmPipeEventOrderTest.java?rev=635494&r1=635493&r2=635494&view=diff
==============================================================================
--- 
mina/trunk/core/src/test/java/org/apache/mina/transport/vmpipe/VmPipeEventOrderTest.java
 (original)
+++ 
mina/trunk/core/src/test/java/org/apache/mina/transport/vmpipe/VmPipeEventOrderTest.java
 Mon Mar 10 02:04:45 2008
@@ -19,14 +19,19 @@
  */
 package org.apache.mina.transport.vmpipe;
 
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+
 import junit.framework.Assert;
 import junit.framework.TestCase;
 
 import org.apache.mina.common.ConnectFuture;
 import org.apache.mina.common.IoAcceptor;
+import org.apache.mina.common.IoBuffer;
 import org.apache.mina.common.IoConnector;
 import org.apache.mina.common.IoHandlerAdapter;
 import org.apache.mina.common.IoSession;
+import org.apache.mina.filter.executor.ExecutorFilter;
 
 /**
  * Makes sure if the order of event is correct.
@@ -152,5 +157,56 @@
         }
 
         Assert.assertEquals("ABC", actual.toString());
+    }
+
+    public void testSessionCreated() throws Exception {
+        final Semaphore semaphore = new Semaphore(0);
+        final StringBuffer stringBuffer = new StringBuffer();
+        VmPipeAcceptor vmPipeAcceptor = new VmPipeAcceptor();
+        final VmPipeAddress vmPipeAddress = new VmPipeAddress(12345);
+        vmPipeAcceptor.setHandler(new IoHandlerAdapter() {
+            @Override
+            public void sessionCreated(IoSession session) throws Exception {
+                // pretend we are doing some time-consuming work. For
+                // performance reasons, you would never want to do time
+                // consuming work in sessionCreated.
+                // However, this increases the likelihood of the timing bug.
+                Thread.sleep(1000);
+                stringBuffer.append("A");
+            }
+
+            @Override
+            public void sessionOpened(IoSession session) throws Exception {
+                stringBuffer.append("B");
+            }
+
+            @Override
+            public void messageReceived(IoSession session, Object message)
+                    throws Exception {
+                stringBuffer.append("C");
+            }
+            
+            @Override
+            public void sessionClosed(IoSession session) throws Exception {
+                stringBuffer.append("D");
+                semaphore.release();
+            }
+        });
+        vmPipeAcceptor.bind(vmPipeAddress);
+
+        final VmPipeConnector vmPipeConnector = new VmPipeConnector();
+        vmPipeConnector.getFilterChain().addLast("executor", new 
ExecutorFilter());
+        vmPipeConnector.setHandler(new IoHandlerAdapter() {
+            @Override
+            public void sessionOpened(IoSession session) throws Exception {
+                session.write(IoBuffer.wrap(new byte[1]));
+            }
+        });
+        ConnectFuture connectFuture = vmPipeConnector.connect(vmPipeAddress);
+        connectFuture.awaitUninterruptibly();
+        connectFuture.getSession().close();
+        semaphore.tryAcquire(1, TimeUnit.SECONDS);
+        vmPipeAcceptor.unbind(vmPipeAddress);
+        Assert.assertEquals("ABCD", stringBuffer.toString());
     }
 }

Modified: 
mina/trunk/core/src/test/java/org/apache/mina/util/CircularQueueTest.java
URL: 
http://svn.apache.org/viewvc/mina/trunk/core/src/test/java/org/apache/mina/util/CircularQueueTest.java?rev=635494&r1=635493&r2=635494&view=diff
==============================================================================
--- mina/trunk/core/src/test/java/org/apache/mina/util/CircularQueueTest.java 
(original)
+++ mina/trunk/core/src/test/java/org/apache/mina/util/CircularQueueTest.java 
Mon Mar 10 02:04:45 2008
@@ -31,9 +31,8 @@
  * @version $Rev$, $Date$
  */
 public class CircularQueueTest extends TestCase {
-    private int pushCount;
-
-    private int popCount;
+    private volatile int pushCount;
+    private volatile int popCount;
 
     public void setUp() {
         pushCount = 0;


Reply via email to