Author: tabish
Date: Mon Oct 17 14:33:41 2011
New Revision: 1185210
URL: http://svn.apache.org/viewvc?rev=1185210&view=rev
Log:
Push enough data through the socket so that the socket close gets detected.
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/SoWriteTimeoutTest.java
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/SoWriteTimeoutTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/SoWriteTimeoutTest.java?rev=1185210&r1=1185209&r2=1185210&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/SoWriteTimeoutTest.java
(original)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/SoWriteTimeoutTest.java
Mon Oct 17 14:33:41 2011
@@ -42,10 +42,10 @@ import org.slf4j.LoggerFactory;
public class SoWriteTimeoutTest extends JmsTestSupport {
private static final Logger LOG =
LoggerFactory.getLogger(SoWriteTimeoutTest.class);
-
+
final int receiveBufferSize = 16*1024;
public String brokerTransportScheme = "nio";
-
+
protected BrokerService createBroker() throws Exception {
BrokerService broker = super.createBroker();
broker.addConnector(brokerTransportScheme +
"://localhost:0?transport.soWriteTimeout=1000&transport.sleep=1000&socketBufferSize="+
receiveBufferSize);
@@ -54,30 +54,30 @@ public class SoWriteTimeoutTest extends
}
return broker;
}
-
+
public void initCombosForTestWriteTimeout() {
addCombinationValues("brokerTransportScheme", new Object[]{"tcp",
"nio"});
}
-
+
public void testWriteTimeout() throws Exception {
-
+
Destination dest = new ActiveMQQueue("testWriteTimeout");
messageTextPrefix = initMessagePrefix(8*1024);
sendMessages(dest, 500);
-
+
URI tcpBrokerUri =
URISupport.removeQuery(broker.getTransportConnectors().get(0).getConnectUri());
LOG.info("consuming using uri: " + tcpBrokerUri);
-
+
SocketProxy proxy = new SocketProxy();
proxy.setTarget(tcpBrokerUri);
proxy.setReceiveBufferSize(receiveBufferSize);
proxy.open();
-
+
ActiveMQConnectionFactory factory = new
ActiveMQConnectionFactory(proxy.getUrl());
Connection c = factory.createConnection();
c.start();
Session session = c.createSession(true, Session.SESSION_TRANSACTED);
- MessageConsumer consumer = session.createConsumer(dest);
+ MessageConsumer consumer = session.createConsumer(dest);
proxy.pause();
// writes should back up... writeTimeout will kick in a abort the
connection
TimeUnit.SECONDS.sleep(10);
@@ -89,23 +89,24 @@ public class SoWriteTimeoutTest extends
} catch (JMSException expected) {
}
}
-
+
public void testWriteTimeoutStompNio() throws Exception {
ActiveMQQueue dest = new ActiveMQQueue("testWriteTimeout");
messageTextPrefix = initMessagePrefix(8*1024);
sendMessages(dest, 500);
-
+
URI stompBrokerUri =
URISupport.removeQuery(broker.getTransportConnectors().get(1).getConnectUri());
LOG.info("consuming using uri: " + stompBrokerUri);
-
+
SocketProxy proxy = new SocketProxy();
proxy.setTarget(new URI("tcp://localhost:" +
stompBrokerUri.getPort()));
proxy.setReceiveBufferSize(receiveBufferSize);
proxy.open();
-
+
StompConnection stompConnection = new StompConnection();
stompConnection.open(new Socket("localhost",
proxy.getUrl().getPort()));
-
+ stompConnection.getStompSocket().setTcpNoDelay(true);
+
String frame = "CONNECT\n" + "login: system\n" + "passcode:
manager\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame();
@@ -113,31 +114,31 @@ public class SoWriteTimeoutTest extends
frame = "SUBSCRIBE\n" + "destination:/queue/" + dest.getQueueName() +
"\n" + "ack:client\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
-
+
// ensure dispatch has started before pause
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("MESSAGE"));
-
+
proxy.pause();
-
+
// writes should back up... writeTimeout will kick in a abort the
connection
TimeUnit.SECONDS.sleep(1);
// see the blocked threads
//dumpAllThreads("blocked on write");
-
+
// abort should be done after this
TimeUnit.SECONDS.sleep(10);
proxy.goOn();
-
+
// get a buffered message
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("MESSAGE"));
-
+
// verify connection is dead
try {
- for (int i=0; i<100; i++) {
+ for (int i=0; i<200; i++) {
stompConnection.send("/queue/" + dest.getPhysicalName(),
"ShouldBeDeadConnectionText" + i);
}
fail("expected send to fail with timeout out connection");
@@ -145,7 +146,7 @@ public class SoWriteTimeoutTest extends
LOG.info("got exception on send after timeout: " + expected);
}
}
-
+
private String initMessagePrefix(int i) {
byte[] content = new byte[i];
return new String(content);