Author: trustin
Date: Mon Mar 10 02:04:45 2008
New Revision: 635494
URL: http://svn.apache.org/viewvc?rev=635494&view=rev
Log:
Resolved issue: DIRMINA-543 (Incorrect sessionCreated event order in VmPipe
transport)
* Made sure no other event can be fired while sessionCreated() is invoked.
* Added a workaround for a Sun Java compiler bug in CircularQueue.
Modified:
mina/branches/1.0/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeFilterChain.java
mina/branches/1.0/core/src/test/java/org/apache/mina/transport/vmpipe/VmPipeEventOrderTest.java
mina/branches/1.1/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeFilterChain.java
mina/branches/1.1/core/src/test/java/org/apache/mina/transport/vmpipe/VmPipeEventOrderTest.java
mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeFilterChain.java
mina/trunk/core/src/main/java/org/apache/mina/util/CircularQueue.java
mina/trunk/core/src/test/java/org/apache/mina/transport/vmpipe/VmPipeEventOrderTest.java
mina/trunk/core/src/test/java/org/apache/mina/util/CircularQueueTest.java
Modified:
mina/branches/1.0/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeFilterChain.java
URL:
http://svn.apache.org/viewvc/mina/branches/1.0/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeFilterChain.java?rev=635494&r1=635493&r2=635494&view=diff
==============================================================================
---
mina/branches/1.0/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeFilterChain.java
(original)
+++
mina/branches/1.0/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeFilterChain.java
Mon Mar 10 02:04:45 2008
@@ -65,29 +65,28 @@
}
private void fireEvent(Event e) {
- IoSession session = getSession();
+ VmPipeSessionImpl session = (VmPipeSessionImpl) getSession();
EventType type = e.getType();
Object data = e.getData();
if (type == EventType.RECEIVED) {
- VmPipeSessionImpl s = (VmPipeSessionImpl) session;
- if (sessionOpened.get() && s.getTrafficMask().isReadable() &&
s.getLock().tryLock()) {
+ if (sessionOpened.get() && session.getTrafficMask().isReadable()
&& session.getLock().tryLock()) {
try {
int byteCount = 1;
if (data instanceof ByteBuffer) {
byteCount = ((ByteBuffer) data).remaining();
}
- s.increaseReadBytes(byteCount);
+ session.increaseReadBytes(byteCount);
- super.fireMessageReceived(s, data);
+ super.fireMessageReceived(session, data);
} finally {
- s.getLock().unlock();
+ session.getLock().unlock();
}
- flushPendingDataQueues(s);
+ flushPendingDataQueues(session);
} else {
- s.pendingDataQueue.add(data);
+ session.pendingDataQueue.add(data);
}
} else if (type == EventType.WRITE) {
super.fireFilterWrite(session, (WriteRequest) data);
@@ -101,7 +100,12 @@
super.fireSessionOpened(session);
sessionOpened.set(true);
} else if (type == EventType.CREATED) {
- super.fireSessionCreated(session);
+ session.getLock().lock();
+ try {
+ super.fireSessionCreated(session);
+ } finally {
+ session.getLock().unlock();
+ }
} else if (type == EventType.CLOSED) {
super.fireSessionClosed(session);
} else if (type == EventType.CLOSE) {
Modified:
mina/branches/1.0/core/src/test/java/org/apache/mina/transport/vmpipe/VmPipeEventOrderTest.java
URL:
http://svn.apache.org/viewvc/mina/branches/1.0/core/src/test/java/org/apache/mina/transport/vmpipe/VmPipeEventOrderTest.java?rev=635494&r1=635493&r2=635494&view=diff
==============================================================================
---
mina/branches/1.0/core/src/test/java/org/apache/mina/transport/vmpipe/VmPipeEventOrderTest.java
(original)
+++
mina/branches/1.0/core/src/test/java/org/apache/mina/transport/vmpipe/VmPipeEventOrderTest.java
Mon Mar 10 02:04:45 2008
@@ -19,9 +19,13 @@
*/
package org.apache.mina.transport.vmpipe;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+
import junit.framework.Assert;
import junit.framework.TestCase;
+import org.apache.mina.common.ByteBuffer;
import org.apache.mina.common.ConnectFuture;
import org.apache.mina.common.IoAcceptor;
import org.apache.mina.common.IoConnector;
@@ -143,5 +147,50 @@
}
Assert.assertEquals("ABC", actual.toString());
+ }
+
+ public void testSessionCreated() throws Exception {
+ final Semaphore semaphore = new Semaphore(0);
+ final StringBuffer stringBuffer = new StringBuffer();
+ VmPipeAcceptor vmPipeAcceptor = new VmPipeAcceptor();
+ vmPipeAcceptor.getDefaultConfig().setThreadModel(ThreadModel.MANUAL);
+ final VmPipeAddress vmPipeAddress = new VmPipeAddress(12345);
+ vmPipeAcceptor.bind(vmPipeAddress, new IoHandlerAdapter() {
+ public void sessionCreated(IoSession session) throws Exception {
+ // pretend we are doing some time-consuming work. For
+ // performance reasons, you would never want to do time
+ // consuming work in sessionCreated.
+ // However, this increases the likelihood of the timing bug.
+ Thread.sleep(1000);
+ stringBuffer.append("A");
+ }
+
+ public void sessionOpened(IoSession session) throws Exception {
+ stringBuffer.append("B");
+ }
+
+ public void messageReceived(IoSession session, Object message)
+ throws Exception {
+ stringBuffer.append("C");
+ }
+
+ public void sessionClosed(IoSession session) throws Exception {
+ stringBuffer.append("D");
+ semaphore.release();
+ }
+ });
+
+ final VmPipeConnector vmPipeConnector = new VmPipeConnector();
+ ConnectFuture connectFuture = vmPipeConnector.connect(vmPipeAddress,
new IoHandlerAdapter() {
+ public void sessionOpened(IoSession session) throws Exception {
+ session.write(ByteBuffer.wrap(new byte[1]));
+ }
+ });
+
+ connectFuture.join();
+ connectFuture.getSession().close();
+ semaphore.tryAcquire(1, TimeUnit.SECONDS);
+ vmPipeAcceptor.unbind(vmPipeAddress);
+ Assert.assertEquals("ABCD", stringBuffer.toString());
}
}
Modified:
mina/branches/1.1/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeFilterChain.java
URL:
http://svn.apache.org/viewvc/mina/branches/1.1/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeFilterChain.java?rev=635494&r1=635493&r2=635494&view=diff
==============================================================================
---
mina/branches/1.1/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeFilterChain.java
(original)
+++
mina/branches/1.1/core/src/main/java/org/apache/mina/transport/vmpipe/support/VmPipeFilterChain.java
Mon Mar 10 02:04:45 2008
@@ -66,30 +66,28 @@
}
private void fireEvent(Event e) {
- IoSession session = getSession();
+ VmPipeSessionImpl session = (VmPipeSessionImpl) getSession();
EventType type = e.getType();
Object data = e.getData();
if (type == EventType.RECEIVED) {
- VmPipeSessionImpl s = (VmPipeSessionImpl) session;
-
- if( sessionOpened && s.getTrafficMask().isReadable() &&
s.getLock().tryLock()) {
+ if( sessionOpened && session.getTrafficMask().isReadable() &&
session.getLock().tryLock()) {
try {
int byteCount = 1;
if (data instanceof ByteBuffer) {
byteCount = ((ByteBuffer) data).remaining();
}
- s.increaseReadBytes(byteCount);
+ session.increaseReadBytes(byteCount);
- super.fireMessageReceived(s, data);
+ super.fireMessageReceived(session, data);
} finally {
- s.getLock().unlock();
+ session.getLock().unlock();
}
- flushPendingDataQueues( s );
+ flushPendingDataQueues( session );
} else {
- s.pendingDataQueue.add(data);
+ session.pendingDataQueue.add(data);
}
} else if (type == EventType.WRITE) {
super.fireFilterWrite(session, (WriteRequest) data);
@@ -103,7 +101,12 @@
super.fireSessionOpened(session);
sessionOpened = true;
} else if (type == EventType.CREATED) {
- super.fireSessionCreated(session);
+ session.getLock().lock();
+ try {
+ super.fireSessionCreated(session);
+ } finally {
+ session.getLock().unlock();
+ }
} else if (type == EventType.CLOSED) {
super.fireSessionClosed(session);
} else if (type == EventType.CLOSE) {
Modified:
mina/branches/1.1/core/src/test/java/org/apache/mina/transport/vmpipe/VmPipeEventOrderTest.java
URL:
http://svn.apache.org/viewvc/mina/branches/1.1/core/src/test/java/org/apache/mina/transport/vmpipe/VmPipeEventOrderTest.java?rev=635494&r1=635493&r2=635494&view=diff
==============================================================================
---
mina/branches/1.1/core/src/test/java/org/apache/mina/transport/vmpipe/VmPipeEventOrderTest.java
(original)
+++
mina/branches/1.1/core/src/test/java/org/apache/mina/transport/vmpipe/VmPipeEventOrderTest.java
Mon Mar 10 02:04:45 2008
@@ -19,9 +19,13 @@
*/
package org.apache.mina.transport.vmpipe;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+
import junit.framework.Assert;
import junit.framework.TestCase;
+import org.apache.mina.common.ByteBuffer;
import org.apache.mina.common.ConnectFuture;
import org.apache.mina.common.IoAcceptor;
import org.apache.mina.common.IoConnector;
@@ -143,5 +147,56 @@
}
Assert.assertEquals("ABC", actual.toString());
+ }
+
+
+ public void testSessionCreated() throws Exception {
+ final Semaphore semaphore = new Semaphore(0);
+ final StringBuffer stringBuffer = new StringBuffer();
+ VmPipeAcceptor vmPipeAcceptor = new VmPipeAcceptor();
+ vmPipeAcceptor.getDefaultConfig().setThreadModel(ThreadModel.MANUAL);
+ final VmPipeAddress vmPipeAddress = new VmPipeAddress(12345);
+ vmPipeAcceptor.bind(vmPipeAddress, new IoHandlerAdapter() {
+ @Override
+ public void sessionCreated(IoSession session) throws Exception {
+ // pretend we are doing some time-consuming work. For
+ // performance reasons, you would never want to do time
+ // consuming work in sessionCreated.
+ // However, this increases the likelihood of the timing bug.
+ Thread.sleep(1000);
+ stringBuffer.append("A");
+ }
+
+ @Override
+ public void sessionOpened(IoSession session) throws Exception {
+ stringBuffer.append("B");
+ }
+
+ @Override
+ public void messageReceived(IoSession session, Object message)
+ throws Exception {
+ stringBuffer.append("C");
+ }
+
+ @Override
+ public void sessionClosed(IoSession session) throws Exception {
+ stringBuffer.append("D");
+ semaphore.release();
+ }
+ });
+
+ final VmPipeConnector vmPipeConnector = new VmPipeConnector();
+ ConnectFuture connectFuture = vmPipeConnector.connect(vmPipeAddress,
new IoHandlerAdapter() {
+ @Override
+ public void sessionOpened(IoSession session) throws Exception {
+ session.write(ByteBuffer.wrap(new byte[1]));
+ }
+ });
+
+ connectFuture.join();
+ connectFuture.getSession().close();
+ semaphore.tryAcquire(1, TimeUnit.SECONDS);
+ vmPipeAcceptor.unbind(vmPipeAddress);
+ Assert.assertEquals("ABCD", stringBuffer.toString());
}
}
Modified:
mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeFilterChain.java
URL:
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeFilterChain.java?rev=635494&r1=635493&r2=635494&view=diff
==============================================================================
---
mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeFilterChain.java
(original)
+++
mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeFilterChain.java
Mon Mar 10 02:04:45 2008
@@ -31,7 +31,6 @@
import org.apache.mina.common.IoEvent;
import org.apache.mina.common.IoEventType;
import org.apache.mina.common.IoProcessor;
-import org.apache.mina.common.IoSession;
import org.apache.mina.common.WriteRequest;
import org.apache.mina.common.WriteRequestQueue;
import org.apache.mina.common.WriteToClosedSessionException;
@@ -77,24 +76,23 @@
}
private void fireEvent(IoEvent e) {
- IoSession session = getSession();
+ VmPipeSessionImpl session = (VmPipeSessionImpl) getSession();
IoEventType type = e.getType();
Object data = e.getParameter();
if (type == IoEventType.MESSAGE_RECEIVED) {
- VmPipeSessionImpl s = (VmPipeSessionImpl) session;
- if (sessionOpened && s.getTrafficMask().isReadable() &&
s.getLock().tryLock()) {
+ if (sessionOpened && session.getTrafficMask().isReadable() &&
session.getLock().tryLock()) {
try {
- if (!s.getTrafficMask().isReadable()) {
- s.receivedMessageQueue.add(data);
+ if (!session.getTrafficMask().isReadable()) {
+ session.receivedMessageQueue.add(data);
} else {
super.fireMessageReceived(data);
}
} finally {
- s.getLock().unlock();
+ session.getLock().unlock();
}
} else {
- s.receivedMessageQueue.add(data);
+ session.receivedMessageQueue.add(data);
}
} else if (type == IoEventType.WRITE) {
super.fireFilterWrite((WriteRequest) data);
@@ -108,7 +106,12 @@
super.fireSessionOpened();
sessionOpened = true;
} else if (type == IoEventType.SESSION_CREATED) {
- super.fireSessionCreated();
+ session.getLock().lock();
+ try {
+ super.fireSessionCreated();
+ } finally {
+ session.getLock().unlock();
+ }
} else if (type == IoEventType.SESSION_CLOSED) {
super.fireSessionClosed();
} else if (type == IoEventType.CLOSE) {
@@ -173,32 +176,19 @@
return;
}
if (session.isConnected()) {
- if (session.getLock().tryLock()) {
- try {
- WriteRequest req;
- while ((req = queue.poll(session)) != null) {
- Object message = req.getMessage();
- Object messageCopy = message;
- if (message instanceof IoBuffer) {
- IoBuffer rb = (IoBuffer) message;
- rb.mark();
- IoBuffer wb =
IoBuffer.allocate(rb.remaining());
- wb.put(rb);
- wb.flip();
- rb.reset();
- messageCopy = wb;
- }
-
-
session.getRemoteSession().getFilterChain().fireMessageReceived(
- messageCopy);
- session.getFilterChain().fireMessageSent(req);
- }
- } finally {
- session.getLock().unlock();
+ session.getLock().lock();
+ try {
+ WriteRequest req;
+ while ((req = queue.poll(session)) != null) {
+
session.getRemoteSession().getFilterChain().fireMessageReceived(
+ getMessageCopy(req.getMessage()));
+ session.getFilterChain().fireMessageSent(req);
}
-
- flushPendingDataQueues(session);
+ } finally {
+ session.getLock().unlock();
}
+
+ flushPendingDataQueues(session);
} else {
List<WriteRequest> failedRequests = new
ArrayList<WriteRequest>();
WriteRequest req;
@@ -214,6 +204,20 @@
session.getFilterChain().fireExceptionCaught(cause);
}
}
+ }
+
+ private Object getMessageCopy(Object message) {
+ Object messageCopy = message;
+ if (message instanceof IoBuffer) {
+ IoBuffer rb = (IoBuffer) message;
+ rb.mark();
+ IoBuffer wb = IoBuffer.allocate(rb.remaining());
+ wb.put(rb);
+ wb.flip();
+ rb.reset();
+ messageCopy = wb;
+ }
+ return messageCopy;
}
public void remove(VmPipeSessionImpl session) {
Modified: mina/trunk/core/src/main/java/org/apache/mina/util/CircularQueue.java
URL:
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/util/CircularQueue.java?rev=635494&r1=635493&r2=635494&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/util/CircularQueue.java
(original)
+++ mina/trunk/core/src/main/java/org/apache/mina/util/CircularQueue.java Mon
Mar 10 02:04:45 2008
@@ -39,7 +39,10 @@
private static final int DEFAULT_CAPACITY = 4;
private final int initialCapacity;
- private Object[] items;
+ // XXX: This volatile keyword here is a workaround for a SUN Java Compiler bug,
+ // which produces buggy byte code. I don't even know why adding a volatile
+ // fixes the problem. Eclipse Java Compiler seems to produce correct byte code.
+ private volatile Object[] items;
private int mask;
private int first = 0;
private int last = 0;
Modified:
mina/trunk/core/src/test/java/org/apache/mina/transport/vmpipe/VmPipeEventOrderTest.java
URL:
http://svn.apache.org/viewvc/mina/trunk/core/src/test/java/org/apache/mina/transport/vmpipe/VmPipeEventOrderTest.java?rev=635494&r1=635493&r2=635494&view=diff
==============================================================================
---
mina/trunk/core/src/test/java/org/apache/mina/transport/vmpipe/VmPipeEventOrderTest.java
(original)
+++
mina/trunk/core/src/test/java/org/apache/mina/transport/vmpipe/VmPipeEventOrderTest.java
Mon Mar 10 02:04:45 2008
@@ -19,14 +19,19 @@
*/
package org.apache.mina.transport.vmpipe;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+
import junit.framework.Assert;
import junit.framework.TestCase;
import org.apache.mina.common.ConnectFuture;
import org.apache.mina.common.IoAcceptor;
+import org.apache.mina.common.IoBuffer;
import org.apache.mina.common.IoConnector;
import org.apache.mina.common.IoHandlerAdapter;
import org.apache.mina.common.IoSession;
+import org.apache.mina.filter.executor.ExecutorFilter;
/**
* Makes sure if the order of event is correct.
@@ -152,5 +157,56 @@
}
Assert.assertEquals("ABC", actual.toString());
+ }
+
+ public void testSessionCreated() throws Exception {
+ final Semaphore semaphore = new Semaphore(0);
+ final StringBuffer stringBuffer = new StringBuffer();
+ VmPipeAcceptor vmPipeAcceptor = new VmPipeAcceptor();
+ final VmPipeAddress vmPipeAddress = new VmPipeAddress(12345);
+ vmPipeAcceptor.setHandler(new IoHandlerAdapter() {
+ @Override
+ public void sessionCreated(IoSession session) throws Exception {
+ // pretend we are doing some time-consuming work. For
+ // performance reasons, you would never want to do time
+ // consuming work in sessionCreated.
+ // However, this increases the likelihood of the timing bug.
+ Thread.sleep(1000);
+ stringBuffer.append("A");
+ }
+
+ @Override
+ public void sessionOpened(IoSession session) throws Exception {
+ stringBuffer.append("B");
+ }
+
+ @Override
+ public void messageReceived(IoSession session, Object message)
+ throws Exception {
+ stringBuffer.append("C");
+ }
+
+ @Override
+ public void sessionClosed(IoSession session) throws Exception {
+ stringBuffer.append("D");
+ semaphore.release();
+ }
+ });
+ vmPipeAcceptor.bind(vmPipeAddress);
+
+ final VmPipeConnector vmPipeConnector = new VmPipeConnector();
+ vmPipeConnector.getFilterChain().addLast("executor", new
ExecutorFilter());
+ vmPipeConnector.setHandler(new IoHandlerAdapter() {
+ @Override
+ public void sessionOpened(IoSession session) throws Exception {
+ session.write(IoBuffer.wrap(new byte[1]));
+ }
+ });
+ ConnectFuture connectFuture = vmPipeConnector.connect(vmPipeAddress);
+ connectFuture.awaitUninterruptibly();
+ connectFuture.getSession().close();
+ semaphore.tryAcquire(1, TimeUnit.SECONDS);
+ vmPipeAcceptor.unbind(vmPipeAddress);
+ Assert.assertEquals("ABCD", stringBuffer.toString());
}
}
Modified:
mina/trunk/core/src/test/java/org/apache/mina/util/CircularQueueTest.java
URL:
http://svn.apache.org/viewvc/mina/trunk/core/src/test/java/org/apache/mina/util/CircularQueueTest.java?rev=635494&r1=635493&r2=635494&view=diff
==============================================================================
--- mina/trunk/core/src/test/java/org/apache/mina/util/CircularQueueTest.java
(original)
+++ mina/trunk/core/src/test/java/org/apache/mina/util/CircularQueueTest.java
Mon Mar 10 02:04:45 2008
@@ -31,9 +31,8 @@
* @version $Rev$, $Date$
*/
public class CircularQueueTest extends TestCase {
- private int pushCount;
-
- private int popCount;
+ private volatile int pushCount;
+ private volatile int popCount;
public void setUp() {
pushCount = 0;