ARTEMIS-1711 - Fix OpenWire exclusive divert. Fixing the failure on send from an OpenWire producer when an exclusive divert exists.
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/8e9ee808 Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/8e9ee808 Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/8e9ee808 Branch: refs/heads/master Commit: 8e9ee808921f8c9d6a9cae7bfc8c4ddf73020be4 Parents: d61f058 Author: Christopher L. Shannon (cshannon) <christopher.l.shan...@gmail.com> Authored: Wed Feb 28 09:30:36 2018 -0500 Committer: Clebert Suconic <clebertsuco...@apache.org> Committed: Wed Feb 28 17:35:56 2018 -0500 ---------------------------------------------------------------------- .../core/protocol/openwire/amq/AMQSession.java | 22 ++---- .../openwire/OpenWireDivertExclusiveTest.java | 70 ++++++++++++++++++-- .../OpenWireDivertNonExclusiveTest.java | 65 ++++++++++++++++++ 3 files changed, 136 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8e9ee808/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java index aae1a53..fecb5a1 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java @@ -16,14 +16,17 @@ */ package org.apache.activemq.artemis.core.protocol.openwire.amq; -import javax.jms.InvalidDestinationException; -import 
javax.jms.ResourceAllocationException; +import static org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUtil.OPENWIRE_WILDCARD; + import java.io.IOException; import java.util.List; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import javax.jms.InvalidDestinationException; +import javax.jms.ResourceAllocationException; + import org.apache.activemq.advisory.AdvisorySupport; import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException; import org.apache.activemq.artemis.api.core.RoutingType; @@ -31,7 +34,6 @@ import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.io.IOCallback; import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools; import org.apache.activemq.artemis.core.paging.PagingStore; -import org.apache.activemq.artemis.core.postoffice.RoutingStatus; import org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection; import org.apache.activemq.artemis.core.protocol.openwire.OpenWireMessageConverter; import org.apache.activemq.artemis.core.protocol.openwire.OpenWireProtocolManager; @@ -62,8 +64,6 @@ import org.apache.activemq.command.SessionInfo; import org.apache.activemq.openwire.OpenWireFormat; import org.jboss.logging.Logger; -import static org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUtil.OPENWIRE_WILDCARD; - public class AMQSession implements SessionCallback { private final Logger logger = Logger.getLogger(AMQSession.class); @@ -422,10 +422,7 @@ public class AMQSession implements SessionCallback { throw new ResourceAllocationException("Queue is full " + address); } - final RoutingStatus result = getCoreSession().send(coreMsg, false, dest.isTemporary()); - if (result == RoutingStatus.NO_BINDINGS && dest.isQueue()) { - throw new InvalidDestinationException("Cannot publish to a non-existent Destination: " + dest); - } + 
getCoreSession().send(coreMsg, false, dest.isTemporary()); if (count == null || count.decrementAndGet() == 0) { if (sendProducerAck) { @@ -449,13 +446,8 @@ public class AMQSession implements SessionCallback { Exception exceptionToSend = null; try { - RoutingStatus result = getCoreSession().send(coreMsg, false, dest.isTemporary()); - - if (result == RoutingStatus.NO_BINDINGS && dest.isQueue()) { - throw new InvalidDestinationException("Cannot publish to a non-existent Destination: " + dest); - } + getCoreSession().send(coreMsg, false, dest.isTemporary()); } catch (Exception e) { - logger.warn(e.getMessage(), e); exceptionToSend = e; } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8e9ee808/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireDivertExclusiveTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireDivertExclusiveTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireDivertExclusiveTest.java index f8a03fa..2e56a4a 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireDivertExclusiveTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireDivertExclusiveTest.java @@ -16,6 +16,13 @@ */ package org.apache.activemq.artemis.tests.integration.openwire; +import javax.jms.Connection; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; + import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; @@ -28,12 +35,6 @@ import org.apache.activemq.artemis.utils.CompositeAddress; import 
org.junit.Assert; import org.junit.Test; -import javax.jms.Connection; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.Queue; -import javax.jms.Session; - public class OpenWireDivertExclusiveTest extends OpenWireDivertTestBase { @Override @@ -112,4 +113,61 @@ public class OpenWireDivertExclusiveTest extends OpenWireDivertTestBase { } } } + + @Test + public void testSingleExclusiveDivertOpenWirePublisher() throws Exception { + ServerLocator locator = createInVMNonHALocator(); + ClientSessionFactory sf = createSessionFactory(locator); + + ClientSession coreSession = sf.createSession(false, true, true); + + final SimpleString queueName1 = new SimpleString("queue1"); + final SimpleString queueName2 = new SimpleString("queue2"); + + coreSession.createQueue(new SimpleString(forwardAddress), RoutingType.ANYCAST, queueName1, null, false); + coreSession.createQueue(new SimpleString(testAddress), RoutingType.ANYCAST, queueName2, null, false); + coreSession.close(); + + factory = new ActiveMQConnectionFactory(urlString); + Connection openwireConnection = factory.createConnection(); + + try { + openwireConnection.start(); + Session session = openwireConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(session.createQueue(testAddress)); + + final int numMessages = 10; + + final String propKey = "testkey"; + + for (int i = 0; i < numMessages; i++) { + Message message = session.createMessage(); + message.setIntProperty(propKey, i); + producer.send(message); + } + + Queue q1 = session.createQueue(CompositeAddress.toFullQN(forwardAddress, "queue1")); + Queue q2 = session.createQueue(CompositeAddress.toFullQN(testAddress, "queue2")); + + MessageConsumer consumer1 = session.createConsumer(q1); + MessageConsumer consumer2 = session.createConsumer(q2); + + System.out.println("receiving ..."); + for (int i = 0; i < numMessages; i++) { + Message message = consumer1.receive(TIMEOUT); + + 
Assert.assertNotNull(message); + + Assert.assertEquals(i, message.getObjectProperty(propKey.toString())); + + message.acknowledge(); + } + Assert.assertNull(consumer1.receive(50)); + Assert.assertNull(consumer2.receive(50)); + } finally { + if (openwireConnection != null) { + openwireConnection.close(); + } + } + } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8e9ee808/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireDivertNonExclusiveTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireDivertNonExclusiveTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireDivertNonExclusiveTest.java index 64e8e4f..7dacaf2 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireDivertNonExclusiveTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireDivertNonExclusiveTest.java @@ -31,6 +31,7 @@ import org.junit.Test; import javax.jms.Connection; import javax.jms.Message; import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; @@ -119,4 +120,68 @@ public class OpenWireDivertNonExclusiveTest extends OpenWireDivertTestBase { } } + @Test + //openwire sending, openwire receiving + public void testSingleNonExclusiveDivertOpenWirePublisher() throws Exception { + ServerLocator locator = createInVMNonHALocator(); + ClientSessionFactory sf = createSessionFactory(locator); + ClientSession coreSession = sf.createSession(false, true, true); + + final SimpleString queueName1 = new SimpleString("queue1"); + final SimpleString queueName2 = new SimpleString("queue2"); + + coreSession.createQueue(new SimpleString(forwardAddress), RoutingType.ANYCAST, 
queueName1, null, false); + coreSession.createQueue(new SimpleString(testAddress), RoutingType.ANYCAST, queueName2, null, false); + coreSession.close(); + + //use openwire to receive + factory = new ActiveMQConnectionFactory(urlString); + Connection openwireConnection = factory.createConnection(); + + try { + Session session = openwireConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + openwireConnection.start(); + + MessageProducer producer = session.createProducer(session.createQueue(testAddress)); + + final int numMessages = 10; + final String propKey = "testkey"; + + for (int i = 0; i < numMessages; i++) { + Message message = session.createMessage(); + message.setIntProperty(propKey, i); + producer.send(message); + } + + Queue q1 = session.createQueue(CompositeAddress.toFullQN(testAddress, "queue1")); + Queue q2 = session.createQueue(CompositeAddress.toFullQN(forwardAddress, "queue2")); + + MessageConsumer consumer1 = session.createConsumer(q1); + MessageConsumer consumer2 = session.createConsumer(q2); + + System.out.println("receiving ..."); + for (int i = 0; i < numMessages; i++) { + Message message = consumer1.receive(TIMEOUT); + Assert.assertNotNull(message); + Assert.assertEquals(i, message.getObjectProperty(propKey.toString())); + message.acknowledge(); + } + + Assert.assertNull(consumer1.receive(50)); + + for (int i = 0; i < numMessages; i++) { + Message message = consumer2.receive(TIMEOUT); + Assert.assertNotNull(message); + Assert.assertEquals(i, message.getObjectProperty(propKey.toString())); + message.acknowledge(); + } + + Assert.assertNull(consumer2.receive(50)); + } finally { + if (openwireConnection != null) { + openwireConnection.close(); + } + } + } + }