Repository: activemq Updated Branches: refs/heads/trunk b97fa15d5 -> da63f3f20
Fix for https://issues.apache.org/jira/browse/AMQ-5042. Handles receiving multiple frames at once from an nio channel Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/da63f3f2 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/da63f3f2 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/da63f3f2 Branch: refs/heads/trunk Commit: da63f3f20a348b29b43ef84bb7a4f6b02d2cd35c Parents: b97fa15 Author: Kevin Earls <[email protected]> Authored: Tue Feb 18 10:52:37 2014 +0100 Committer: Kevin Earls <[email protected]> Committed: Tue Feb 18 10:52:37 2014 +0100 ---------------------------------------------------------------------- .../transport/amqp/AmqpNioTransport.java | 28 ++++- .../transport/amqp/JMSClientNioTest.java | 112 ++----------------- .../activemq/transport/amqp/JMSClientTest.java | 25 ++++- 3 files changed, 52 insertions(+), 113 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/da63f3f2/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransport.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransport.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransport.java index 665bf88..dfb5f60 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransport.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpNioTransport.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.transport.amqp; +import java.io.ByteArrayInputStream; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.EOFException; @@ -29,7 +30,6 @@ import java.nio.channels.SocketChannel; import javax.net.SocketFactory; -import org.apache.activemq.transport.nio.NIOInputStream; import org.apache.activemq.transport.nio.NIOOutputStream; import org.apache.activemq.transport.nio.SelectorManager; import org.apache.activemq.transport.nio.SelectorSelection; @@ -38,11 +38,15 @@ import org.apache.activemq.util.IOExceptionSupport; import org.apache.activemq.util.ServiceStopper; import org.apache.activemq.wireformat.WireFormat; import org.fusesource.hawtbuf.Buffer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * An implementation of the {@link org.apache.activemq.transport.Transport} interface for using AMQP over NIO */ public class AmqpNioTransport extends TcpTransport { + private DataInputStream amqpHeaderValue = new DataInputStream(new ByteArrayInputStream(new byte[]{'A', 'M', 'Q', 'P'})); + private final Integer AMQP_HEADER_VALUE = amqpHeaderValue.readInt(); private SocketChannel channel; private SelectorSelection selection; @@ -120,10 +124,28 @@ public class AmqpNioTransport extends TcpTransport { } } - doConsume(AmqpSupport.toBuffer(inputBuffer)); + while(inputBuffer.position() < inputBuffer.limit()) { + inputBuffer.mark(); + int commandSize = inputBuffer.getInt(); + inputBuffer.reset(); + + // handles buffers starting with 'A','M','Q','P' rather than size + if (commandSize == AMQP_HEADER_VALUE) { + doConsume(AmqpSupport.toBuffer(inputBuffer)); + break; + } + + byte[] bytes = new byte[commandSize]; + ByteBuffer commandBuffer = ByteBuffer.allocate(commandSize); + inputBuffer.get(bytes, 0, commandSize); + commandBuffer.put(bytes); + commandBuffer.flip(); + doConsume(AmqpSupport.toBuffer(commandBuffer)); + commandBuffer.clear(); + } + // clear the buffer inputBuffer.clear(); - } } catch (IOException e) { onException(e); http://git-wip-us.apache.org/repos/asf/activemq/blob/da63f3f2/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientNioTest.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientNioTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientNioTest.java index 42420e2..9004aa0 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientNioTest.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientNioTest.java @@ -18,117 +18,19 @@ package org.apache.activemq.transport.amqp; import javax.jms.JMSException; +import org.junit.Ignore; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; /** * Test the JMS client when connected to the NIO transport. */ public class JMSClientNioTest extends JMSClientTest { - - @Override - @Test - public void testProducerConsume() throws Exception { - } - - @Override - @Test - public void testTransactedConsumer() throws Exception { - } - - @Override - @Test - public void testRollbackRececeivedMessage() throws Exception { - } - - @Override - @Test - public void testTXConsumerAndLargeNumberOfMessages() throws Exception { - } - - @Override - @Test - public void testSelectors() throws Exception { - } - - @Override - @Test(timeout=30000) - public void testProducerThrowsWhenBrokerStops() throws Exception { - } - - @Override - @Test(timeout=30000) - public void testProducerCreateThrowsWhenBrokerStops() throws Exception { - } - - @Override - @Test(timeout=30000) - public void testConsumerCreateThrowsWhenBrokerStops() throws Exception { - } - - @Override - @Test(timeout=30000) - public void testConsumerReceiveNoWaitThrowsWhenBrokerStops() throws Exception { - } - - @Override - @Test(timeout=30000) - public void testConsumerReceiveTimedThrowsWhenBrokerStops() throws Exception { - } - - @Override - @Test(timeout=30000) - public void testConsumerReceiveReturnsBrokerStops() throws Exception { - } - - @Override - @Test(timeout=30000) - public void testBrokerRestartWontHangConnectionClose() throws Exception { - } - - @Override - @Test(timeout=120000) - public void testProduceAndConsumeLargeNumbersOfMessages() throws JMSException { - } - - @Override - @Test(timeout=30000) - public void testSyncSends() throws Exception { - } - - @Override - @Test(timeout=30000) - public void testDurableConsumerAsync() throws Exception { - } - - @Override - @Test(timeout=30000) - public void testDurableConsumerSync() throws Exception { - } - - @Override - @Test(timeout=30000) - public void testTopicConsumerAsync() throws Exception { - } - - @Override - @Test(timeout=45000) - public void testTopicConsumerSync() throws Exception { - } - - @Override - @Test(timeout=60000) - public void testConnectionsAreClosed() throws Exception { - } - - @Override - @Test(timeout=30000) - public void testExecptionListenerCalledOnBrokerStop() throws Exception { - } - - @Override - @Test(timeout=30000) - public void testSessionTransactedCommit() throws JMSException, InterruptedException { - } + protected static final Logger LOG = LoggerFactory.getLogger(JMSClientNioTest.class); @Override protected int getBrokerPort() { http://git-wip-us.apache.org/repos/asf/activemq/blob/da63f3f2/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java index cb18f73..27719f9 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java @@ -1,4 +1,4 @@ -/** +/** >>>>>> pumping * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. @@ -47,23 +47,36 @@ import org.apache.activemq.broker.jmx.QueueViewMBean; import org.apache.activemq.transport.amqp.joram.ActiveMQAdmin; import org.apache.activemq.util.Wait; import org.apache.qpid.amqp_1_0.jms.impl.ConnectionFactoryImpl; +import org.junit.After; import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TestName; import org.objectweb.jtests.jms.framework.TestConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class JMSClientTest extends AmqpTestSupport { - + protected static final Logger LOG = LoggerFactory.getLogger(JMSClientTest.class); @Rule public TestName name = new TestName(); + java.util.logging.Logger frameLoggger = java.util.logging.Logger.getLogger("FRM"); + @Override @Before public void setUp() throws Exception { - LOG.debug("Starting test {}", name.getMethodName()); + LOG.debug("in setUp of {}", name.getMethodName()); super.setUp(); } + @Override + @After + public void tearDown() throws Exception { + LOG.debug("in tearDown of {}", name.getMethodName()); + super.tearDown(); + Thread.sleep(500); + } + @SuppressWarnings("rawtypes") @Test(timeout=30000) public void testProducerConsume() throws Exception { @@ -169,7 +182,7 @@ public class JMSClientTest extends AmqpTestSupport { connection.close(); } - @Test(timeout=30000) + @Test(timeout=60000) public void testTXConsumerAndLargeNumberOfMessages() throws Exception { ActiveMQAdmin.enableJMSFrameTracing(); @@ -760,7 +773,9 @@ public class JMSClientTest extends AmqpTestSupport { private Connection createConnection(String clientId, boolean syncPublish) throws JMSException { - final ConnectionFactoryImpl factory = new ConnectionFactoryImpl("localhost", getBrokerPort(), "admin", "password"); + int brokerPort = getBrokerPort(); + LOG.debug("Creating connection on port {}", brokerPort); + final ConnectionFactoryImpl factory = new ConnectionFactoryImpl("localhost", brokerPort, "admin", "password"); factory.setSyncPublish(syncPublish); factory.setTopicPrefix("topic://");
