http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1dae9974/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java index e7582f2..ada2a70 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java @@ -40,9 +40,23 @@ import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQNonExistentQueueException; import org.apache.activemq.artemis.api.core.ActiveMQSecurityException; import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQBrokerStoppedException; import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQCompositeConsumerBrokerExchange; +import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConnectionContext; import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConsumer; +import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConsumerBrokerExchange; +import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQMessageAuthorizationPolicy; +import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQProducerBrokerExchange; +import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQSession; import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQSingleConsumerBrokerExchange; +import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQTransaction; +import org.apache.activemq.artemis.core.remoting.CloseListener; +import org.apache.activemq.artemis.core.remoting.FailureListener; +import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; +import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; +import org.apache.activemq.artemis.spi.core.remoting.Acceptor; +import org.apache.activemq.artemis.spi.core.remoting.Connection; +import org.apache.activemq.artemis.utils.ConcurrentHashSet; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQMessage; import org.apache.activemq.command.BrokerInfo; @@ -77,20 +91,7 @@ import org.apache.activemq.command.ShutdownInfo; import org.apache.activemq.command.TransactionId; import org.apache.activemq.command.TransactionInfo; import org.apache.activemq.command.WireFormatInfo; -import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQBrokerStoppedException; -import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConnectionContext; -import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConsumerBrokerExchange; -import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQMessageAuthorizationPolicy; -import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQProducerBrokerExchange; -import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQSession; -import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQTransaction; -import org.apache.activemq.artemis.core.remoting.CloseListener; -import org.apache.activemq.artemis.core.remoting.FailureListener; -import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.openwire.OpenWireFormat; -import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; -import org.apache.activemq.artemis.spi.core.remoting.Acceptor; -import org.apache.activemq.artemis.spi.core.remoting.Connection; import org.apache.activemq.state.CommandVisitor; import org.apache.activemq.state.ConnectionState; import org.apache.activemq.state.ConsumerState; @@ -100,7 +101,6 @@ import org.apache.activemq.thread.TaskRunner; import org.apache.activemq.thread.TaskRunnerFactory; import org.apache.activemq.transport.TransmitCallback; import org.apache.activemq.util.ByteSequence; -import org.apache.activemq.artemis.utils.ConcurrentHashSet; import org.apache.activemq.wireformat.WireFormat; /** @@ -176,8 +176,6 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor { private final Set<String> tempQueues = new ConcurrentHashSet<String>(); - private DataInputWrapper dataInput = new DataInputWrapper(); - private Map<TransactionId, TransactionInfo> txMap = new ConcurrentHashMap<TransactionId, TransactionInfo>(); private volatile AMQSession advisorySession; @@ -196,96 +194,78 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor { @Override public void bufferReceived(Object connectionID, ActiveMQBuffer buffer) { try { - dataInput.receiveData(buffer); - } - catch (Throwable t) { - ActiveMQServerLogger.LOGGER.error("decoding error", t); - return; - } - - // this.setDataReceived(); - while (dataInput.readable()) { - try { - Object object = null; - try { - object = wireFormat.unmarshal(dataInput); - dataInput.mark(); - } - catch (NotEnoughBytesException e) { - //meaning the dataInput hasn't enough bytes for a command. - //in that case we just return and waiting for the next - //call of bufferReceived() - return; + Object object = wireFormat.unmarshal(buffer); + + Command command = (Command) object; + boolean responseRequired = command.isResponseRequired(); + int commandId = command.getCommandId(); + // the connection handles pings, negotiations directly. + // and delegate all other commands to manager. + if (command.getClass() == KeepAliveInfo.class) { + KeepAliveInfo info = (KeepAliveInfo) command; + if (info.isResponseRequired()) { + info.setResponseRequired(false); + protocolManager.sendReply(this, info); } + } + else if (command.getClass() == WireFormatInfo.class) { + // amq here starts a read/write monitor thread (detect ttl?) + negotiate((WireFormatInfo) command); + } + else if (command.getClass() == ConnectionInfo.class || command.getClass() == ConsumerInfo.class || command.getClass() == RemoveInfo.class || + command.getClass() == SessionInfo.class || command.getClass() == ProducerInfo.class || ActiveMQMessage.class.isAssignableFrom(command.getClass()) || + command.getClass() == MessageAck.class || command.getClass() == TransactionInfo.class || command.getClass() == DestinationInfo.class || + command.getClass() == ShutdownInfo.class || command.getClass() == RemoveSubscriptionInfo.class) { + Response response = null; - Command command = (Command) object; - boolean responseRequired = command.isResponseRequired(); - int commandId = command.getCommandId(); - // the connection handles pings, negotiations directly. - // and delegate all other commands to manager. - if (command.getClass() == KeepAliveInfo.class) { - KeepAliveInfo info = (KeepAliveInfo) command; - if (info.isResponseRequired()) { - info.setResponseRequired(false); - protocolManager.sendReply(this, info); - } - } - else if (command.getClass() == WireFormatInfo.class) { - // amq here starts a read/write monitor thread (detect ttl?) - negotiate((WireFormatInfo) command); + if (pendingStop) { + response = new ExceptionResponse(this.stopError); } - else if (command.getClass() == ConnectionInfo.class || command.getClass() == ConsumerInfo.class || command.getClass() == RemoveInfo.class || command.getClass() == SessionInfo.class || command.getClass() == ProducerInfo.class || ActiveMQMessage.class.isAssignableFrom(command.getClass()) || command.getClass() == MessageAck.class || command.getClass() == TransactionInfo.class || command.getClass() == DestinationInfo.class || command.getClass() == ShutdownInfo.class || command.getClass() == RemoveSubscriptionInfo.class) { - Response response = null; - - if (pendingStop) { - response = new ExceptionResponse(this.stopError); - } - else { - response = ((Command) command).visit(this); - - if (response instanceof ExceptionResponse) { - if (!responseRequired) { - Throwable cause = ((ExceptionResponse) response).getException(); - serviceException(cause); - response = null; - } - } - } - - if (responseRequired) { - if (response == null) { - response = new Response(); - } - } + else { + response = ((Command) command).visit(this); - // The context may have been flagged so that the response is not - // sent. - if (context != null) { - if (context.isDontSendReponse()) { - context.setDontSendReponse(false); + if (response instanceof ExceptionResponse) { + if (!responseRequired) { + Throwable cause = ((ExceptionResponse) response).getException(); + serviceException(cause); response = null; } } + } - if (response != null && !protocolManager.isStopping()) { - response.setCorrelationId(commandId); - dispatchSync(response); + if (responseRequired) { + if (response == null) { + response = new Response(); } + } + // The context may have been flagged so that the response is not + // sent. + if (context != null) { + if (context.isDontSendReponse()) { + context.setDontSendReponse(false); + response = null; + } } - else { - // note!!! wait for negotiation (e.g. use a countdown latch) - // before handling any other commands - this.protocolManager.handleCommand(this, command); + + if (response != null && !protocolManager.isStopping()) { + response.setCorrelationId(commandId); + dispatchSync(response); } + } - catch (IOException e) { - ActiveMQServerLogger.LOGGER.error("error decoding", e); - } - catch (Throwable t) { - ActiveMQServerLogger.LOGGER.error("error decoding", t); + else { + // note!!! wait for negotiation (e.g. use a countdown latch) + // before handling any other commands + this.protocolManager.handleCommand(this, command); } } + catch (IOException e) { + ActiveMQServerLogger.LOGGER.error("error decoding", e); + } + catch (Throwable t) { + ActiveMQServerLogger.LOGGER.error("error decoding", t); + } } private void negotiate(WireFormatInfo command) throws IOException { @@ -624,6 +604,7 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor { public void serviceExceptionAsync(final IOException e) { if (asyncException.compareAndSet(false, true)) { + // Why this is not through an executor? new Thread("Async Exception Handler") { @Override public void run() {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1dae9974/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java index 260ee02..90518ec 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.artemis.core.protocol.openwire; +import javax.jms.JMSException; import java.io.ByteArrayOutputStream; import java.io.DataInputStream; import java.io.DataOutputStream; @@ -23,13 +24,19 @@ import java.io.IOException; import java.io.InputStream; import java.util.Iterator; import java.util.Map; -import java.util.Set; import java.util.Map.Entry; - -import javax.jms.JMSException; +import java.util.Set; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConsumer; +import org.apache.activemq.artemis.core.server.ServerMessage; +import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl; +import org.apache.activemq.artemis.spi.core.protocol.MessageConverter; +import org.apache.activemq.artemis.utils.DataConstants; +import org.apache.activemq.artemis.utils.TypedProperties; +import org.apache.activemq.artemis.utils.UUIDGenerator; import org.apache.activemq.command.ActiveMQBytesMessage; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQMapMessage; @@ -51,14 +58,6 @@ import org.apache.activemq.util.ByteSequence; import org.apache.activemq.util.MarshallingSupport; import org.apache.activemq.wireformat.WireFormat; import org.fusesource.hawtbuf.UTF8Buffer; -import org.apache.activemq.artemis.api.core.SimpleString; -import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConsumer; -import org.apache.activemq.artemis.core.server.ServerMessage; -import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl; -import org.apache.activemq.artemis.spi.core.protocol.MessageConverter; -import org.apache.activemq.artemis.utils.DataConstants; -import org.apache.activemq.artemis.utils.TypedProperties; -import org.apache.activemq.artemis.utils.UUIDGenerator; public class OpenWireMessageConverter implements MessageConverter { @@ -429,7 +428,7 @@ public class OpenWireMessageConverter implements MessageConverter { } amqMsg.setBrokerInTime(brokerInTime); - ActiveMQBuffer buffer = coreMessage.getBodyBuffer(); + ActiveMQBuffer buffer = coreMessage.getBodyBufferCopy(); if (buffer != null) { buffer.resetReaderIndex(); byte[] bytes = null; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1dae9974/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java index 10d67a1..8c20c46 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java @@ -31,18 +31,38 @@ import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ScheduledExecutorService; import io.netty.channel.ChannelPipeline; +import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import org.apache.activemq.advisory.AdvisorySupport; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.BaseInterceptor; import org.apache.activemq.artemis.api.core.Interceptor; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.management.CoreNotificationType; -import org.apache.activemq.artemis.core.io.IOCallback; import org.apache.activemq.artemis.api.core.management.ManagementHelper; +import org.apache.activemq.artemis.core.io.IOCallback; +import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConnectionContext; import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConsumer; +import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQPersistenceAdapter; +import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQProducerBrokerExchange; +import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQServerSession; +import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQSession; +import org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnection; +import org.apache.activemq.artemis.core.security.CheckType; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; +import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl; import org.apache.activemq.artemis.core.server.management.ManagementService; import org.apache.activemq.artemis.core.server.management.Notification; import org.apache.activemq.artemis.core.server.management.NotificationListener; +import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry; +import org.apache.activemq.artemis.spi.core.protocol.MessageConverter; +import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager; +import org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory; +import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; +import org.apache.activemq.artemis.spi.core.remoting.Acceptor; +import org.apache.activemq.artemis.spi.core.remoting.Connection; +import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager; +import org.apache.activemq.artemis.utils.DataConstants; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQMessage; import org.apache.activemq.command.ActiveMQTopic; @@ -66,26 +86,8 @@ import org.apache.activemq.command.TransactionId; import org.apache.activemq.command.TransactionInfo; import org.apache.activemq.command.WireFormatInfo; import org.apache.activemq.command.XATransactionId; -import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConnectionContext; -import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQPersistenceAdapter; -import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQProducerBrokerExchange; -import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQServerSession; -import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQSession; -import org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnection; -import org.apache.activemq.artemis.core.security.CheckType; -import org.apache.activemq.artemis.core.server.ActiveMQServer; -import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; -import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl; import org.apache.activemq.openwire.OpenWireFormat; import org.apache.activemq.openwire.OpenWireFormatFactory; -import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry; -import org.apache.activemq.artemis.spi.core.protocol.MessageConverter; -import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager; -import org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory; -import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; -import org.apache.activemq.artemis.spi.core.remoting.Acceptor; -import org.apache.activemq.artemis.spi.core.remoting.Connection; -import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager; import org.apache.activemq.state.ConnectionState; import org.apache.activemq.state.ProducerState; import org.apache.activemq.state.SessionState; @@ -183,8 +185,8 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No @Override public void addChannelHandlers(ChannelPipeline pipeline) { - // TODO Auto-generated method stub - + // each read will have a full packet with this + pipeline.addLast("packet-decipher", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, DataConstants.SIZE_INT)); } @Override http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1dae9974/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java index 741c32b..12ddb94 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java @@ -275,12 +275,6 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { return HandleStatus.BUSY; } - // TODO - https://jira.jboss.org/browse/HORNETQ-533 - // if (!writeReady.get()) - // { - // return HandleStatus.BUSY; - // } - synchronized (lock) { // If the consumer is stopped then we don't accept the message, it // should go back into the http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1dae9974/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java index 83c94ee..faa947e 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java @@ -27,18 +27,18 @@ import javax.jms.Session; import javax.jms.TemporaryQueue; import javax.jms.TemporaryTopic; import javax.jms.TextMessage; +import java.util.concurrent.TimeUnit; import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTopic; -import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; -import java.util.concurrent.TimeUnit; - public class SimpleOpenWireTest extends BasicOpenWireTest { @Rule @@ -300,6 +300,42 @@ public class SimpleOpenWireTest extends BasicOpenWireTest { } } + @Test + public void testFailoverTransportReconnect() throws Exception { + Connection exConn = null; + + try { + String urlString = "failover:(tcp://" + OWHOST + ":" + OWPORT + ")"; + ActiveMQConnectionFactory exFact = new ActiveMQConnectionFactory(urlString); + + Queue queue = new ActiveMQQueue(durableQueueName); + + exConn = exFact.createConnection(); + exConn.start(); + + Session session = exConn.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer messageProducer = session.createProducer(queue); + messageProducer.send(session.createTextMessage("Test")); + + MessageConsumer consumer = session.createConsumer(queue); + assertNotNull(consumer.receive(5000)); + + server.stop(); + Thread.sleep(3000); + + server.start(); + server.waitForActivation(10, TimeUnit.SECONDS); + + messageProducer.send(session.createTextMessage("Test2")); + assertNotNull(consumer.receive(5000)); + } + finally { + if (exConn != null) { + exConn.close(); + } + } + } + /** * This is the example shipped with the distribution * @@ -309,41 +345,30 @@ public class SimpleOpenWireTest extends BasicOpenWireTest { public void testOpenWireExample() throws Exception { Connection exConn = null; + SimpleString durableQueue = new SimpleString("jms.queue.exampleQueue"); + this.server.createQueue(durableQueue, durableQueue, null, true, false); + try { - String urlString = "tcp://" + OWHOST + ":" + OWPORT + "?wireFormat.cacheEnabled=true"; - ActiveMQConnectionFactory exFact = new ActiveMQConnectionFactory(urlString); + ActiveMQConnectionFactory exFact = new ActiveMQConnectionFactory(); - // Step 2. Perfom a lookup on the queue Queue queue = new ActiveMQQueue(durableQueueName); - // Step 4.Create a JMS Connection exConn = exFact.createConnection(); - // Step 10. Start the Connection exConn.start(); - // Step 5. Create a JMS Session Session session = exConn.createSession(false, Session.AUTO_ACKNOWLEDGE); - // Step 6. Create a JMS Message Producer MessageProducer producer = session.createProducer(queue); - // Step 7. Create a Text Message TextMessage message = session.createTextMessage("This is a text message"); - //System.out.println("Sent message: " + message.getText()); - - // Step 8. Send the Message producer.send(message); - // Step 9. Create a JMS Message Consumer MessageConsumer messageConsumer = session.createConsumer(queue); - // Step 11. Receive the message TextMessage messageReceived = (TextMessage) messageConsumer.receive(5000); - System.out.println("Received message: " + messageReceived); - assertEquals("This is a text message", messageReceived.getText()); } finally { @@ -354,39 +379,88 @@ public class SimpleOpenWireTest extends BasicOpenWireTest { } + + /** + * This is the example shipped with the distribution + * + * @throws Exception + */ @Test - public void testFailoverTransportReconnect() throws Exception { + public void testMultipleConsumers() throws Exception { Connection exConn = null; + SimpleString durableQueue = new SimpleString("jms.queue.exampleQueue"); + this.server.createQueue(durableQueue, durableQueue, null, true, false); + try { - String urlString = "failover:(tcp://" + OWHOST + ":" + OWPORT + ")"; - ActiveMQConnectionFactory exFact = new ActiveMQConnectionFactory(urlString); + ActiveMQConnectionFactory exFact = new ActiveMQConnectionFactory(); Queue queue = new ActiveMQQueue(durableQueueName); exConn = exFact.createConnection(); + exConn.start(); Session session = exConn.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageProducer messageProducer = session.createProducer(queue); - messageProducer.send(session.createTextMessage("Test")); - MessageConsumer consumer = session.createConsumer(queue); - assertNotNull(consumer.receive(5000)); + MessageProducer producer = session.createProducer(queue); - server.stop(); - Thread.sleep(3000); + TextMessage message = session.createTextMessage("This is a text message"); - server.start(); - server.waitForActivation(10, TimeUnit.SECONDS); + producer.send(message); - messageProducer.send(session.createTextMessage("Test2")); - assertNotNull(consumer.receive(5000)); + MessageConsumer messageConsumer = session.createConsumer(queue); + + TextMessage messageReceived = (TextMessage) messageConsumer.receive(5000); + + assertEquals("This is a text message", messageReceived.getText()); } finally { if (exConn != null) { exConn.close(); } } + } + + @Test + public void testMixedOpenWireExample() throws Exception { + Connection openConn = null; + + SimpleString durableQueue = new SimpleString("jms.queue.exampleQueue"); + this.server.createQueue(durableQueue, durableQueue, null, true, false); + + ActiveMQConnectionFactory openCF = new ActiveMQConnectionFactory(); + + Queue queue = new ActiveMQQueue("exampleQueue"); + + openConn = openCF.createConnection(); + + openConn.start(); + + Session openSession = openConn.createSession(false, Session.AUTO_ACKNOWLEDGE); + + MessageProducer producer = openSession.createProducer(queue); + + TextMessage message = openSession.createTextMessage("This is a text message"); + + producer.send(message); + + org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory artemisCF = new org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory(); + + Connection artemisConn = artemisCF.createConnection(); + Session artemisSession = artemisConn.createSession(false, Session.AUTO_ACKNOWLEDGE); + artemisConn.start(); + MessageConsumer messageConsumer = artemisSession.createConsumer(artemisSession.createQueue("exampleQueue")); + + TextMessage messageReceived = (TextMessage) messageConsumer.receive(5000); + + assertEquals("This is a text message", messageReceived.getText()); + + openConn.close(); + artemisConn.close(); + + } + + } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1dae9974/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/VerySimpleOenwireTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/VerySimpleOenwireTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/VerySimpleOenwireTest.java new file mode 100644 index 0000000..8d315d3 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/VerySimpleOenwireTest.java @@ -0,0 +1,118 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.tests.integration.openwire; + +import javax.jms.Connection; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.command.ActiveMQQueue; +import org.junit.Test; + +/** This is useful to debug connection ordering. There's only one connection being made from these tests */ +public class VerySimpleOenwireTest extends OpenWireTestBase { + + /** + * This is the example shipped with the distribution + * + * @throws Exception + */ + @Test + public void testOpenWireExample() throws Exception { + Connection exConn = null; + + SimpleString durableQueue = new SimpleString("jms.queue.exampleQueue"); + this.server.createQueue(durableQueue, durableQueue, null, true, false); + + try { + ActiveMQConnectionFactory exFact = new ActiveMQConnectionFactory(); + + Queue queue = new ActiveMQQueue("exampleQueue"); + + exConn = exFact.createConnection(); + + exConn.start(); + + Session session = exConn.createSession(false, Session.AUTO_ACKNOWLEDGE); + + MessageProducer producer = session.createProducer(queue); + + TextMessage message = session.createTextMessage("This is a text message"); + + producer.send(message); + + MessageConsumer messageConsumer = session.createConsumer(queue); + + TextMessage messageReceived = (TextMessage) messageConsumer.receive(5000); + + assertEquals("This is a text message", messageReceived.getText()); + } + finally { + if (exConn != null) { + exConn.close(); + } + } + + } + + @Test + public void testMixedOpenWireExample() throws Exception { + Connection openConn = null; + + SimpleString durableQueue = new SimpleString("jms.queue.exampleQueue"); + this.server.createQueue(durableQueue, durableQueue, null, true, false); + + ActiveMQConnectionFactory openCF = new ActiveMQConnectionFactory(); + + Queue queue = new ActiveMQQueue("exampleQueue"); + + openConn = openCF.createConnection(); + + openConn.start(); + + Session openSession = openConn.createSession(false, Session.AUTO_ACKNOWLEDGE); + + MessageProducer producer = openSession.createProducer(queue); + + TextMessage message = openSession.createTextMessage("This is a text message"); + + producer.send(message); + + org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory artemisCF = new org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory(); + + Connection artemisConn = artemisCF.createConnection(); + Session artemisSession = artemisConn.createSession(false, Session.AUTO_ACKNOWLEDGE); + artemisConn.start(); + MessageConsumer messageConsumer = artemisSession.createConsumer(artemisSession.createQueue("exampleQueue")); + + TextMessage messageReceived = (TextMessage) messageConsumer.receive(5000); + + assertEquals("This is a text message", messageReceived.getText()); + + openConn.close(); + artemisConn.close(); + + } + + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1dae9974/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/transports/netty/ActiveMQFrameDecoder2Test.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/transports/netty/ActiveMQFrameDecoder2Test.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/transports/netty/ActiveMQFrameDecoder2Test.java index a770183..321fda7 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/transports/netty/ActiveMQFrameDecoder2Test.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/transports/netty/ActiveMQFrameDecoder2Test.java @@ -16,6 +16,10 @@ */ package org.apache.activemq.artemis.tests.integration.transports.netty; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; + import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.embedded.EmbeddedChannel; @@ -24,10 +28,6 @@ import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.junit.Assert; import org.junit.Test; -import java.util.ArrayList; -import java.util.List; -import java.util.Random; - public class ActiveMQFrameDecoder2Test extends ActiveMQTestBase { private static final int MSG_CNT = 10000;