ARTEMIS-1711 - Fix openwire exlusive divert

Fixing the failure on send from an OpenWire producer when an exclusive
divert exists


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/8e9ee808
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/8e9ee808
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/8e9ee808

Branch: refs/heads/master
Commit: 8e9ee808921f8c9d6a9cae7bfc8c4ddf73020be4
Parents: d61f058
Author: Christopher L. Shannon (cshannon) <christopher.l.shan...@gmail.com>
Authored: Wed Feb 28 09:30:36 2018 -0500
Committer: Clebert Suconic <clebertsuco...@apache.org>
Committed: Wed Feb 28 17:35:56 2018 -0500

----------------------------------------------------------------------
 .../core/protocol/openwire/amq/AMQSession.java  | 22 ++----
 .../openwire/OpenWireDivertExclusiveTest.java   | 70 ++++++++++++++++++--
 .../OpenWireDivertNonExclusiveTest.java         | 65 ++++++++++++++++++
 3 files changed, 136 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8e9ee808/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
----------------------------------------------------------------------
diff --git 
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
 
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
index aae1a53..fecb5a1 100644
--- 
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
+++ 
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java
@@ -16,14 +16,17 @@
  */
 package org.apache.activemq.artemis.core.protocol.openwire.amq;
 
-import javax.jms.InvalidDestinationException;
-import javax.jms.ResourceAllocationException;
+import static 
org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUtil.OPENWIRE_WILDCARD;
+
 import java.io.IOException;
 import java.util.List;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import javax.jms.InvalidDestinationException;
+import javax.jms.ResourceAllocationException;
+
 import org.apache.activemq.advisory.AdvisorySupport;
 import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
 import org.apache.activemq.artemis.api.core.RoutingType;
@@ -31,7 +34,6 @@ import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.io.IOCallback;
 import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools;
 import org.apache.activemq.artemis.core.paging.PagingStore;
-import org.apache.activemq.artemis.core.postoffice.RoutingStatus;
 import org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection;
 import 
org.apache.activemq.artemis.core.protocol.openwire.OpenWireMessageConverter;
 import 
org.apache.activemq.artemis.core.protocol.openwire.OpenWireProtocolManager;
@@ -62,8 +64,6 @@ import org.apache.activemq.command.SessionInfo;
 import org.apache.activemq.openwire.OpenWireFormat;
 import org.jboss.logging.Logger;
 
-import static 
org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUtil.OPENWIRE_WILDCARD;
-
 public class AMQSession implements SessionCallback {
    private final Logger logger = Logger.getLogger(AMQSession.class);
 
@@ -422,10 +422,7 @@ public class AMQSession implements SessionCallback {
                throw new ResourceAllocationException("Queue is full " + 
address);
             }
 
-            final RoutingStatus result = getCoreSession().send(coreMsg, false, 
dest.isTemporary());
-            if (result == RoutingStatus.NO_BINDINGS && dest.isQueue()) {
-               throw new InvalidDestinationException("Cannot publish to a 
non-existent Destination: " + dest);
-            }
+            getCoreSession().send(coreMsg, false, dest.isTemporary());
 
             if (count == null || count.decrementAndGet() == 0) {
                if (sendProducerAck) {
@@ -449,13 +446,8 @@ public class AMQSession implements SessionCallback {
          Exception exceptionToSend = null;
 
          try {
-            RoutingStatus result = getCoreSession().send(coreMsg, false, 
dest.isTemporary());
-
-            if (result == RoutingStatus.NO_BINDINGS && dest.isQueue()) {
-               throw new InvalidDestinationException("Cannot publish to a 
non-existent Destination: " + dest);
-            }
+            getCoreSession().send(coreMsg, false, dest.isTemporary());
          } catch (Exception e) {
-
             logger.warn(e.getMessage(), e);
             exceptionToSend = e;
          }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8e9ee808/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireDivertExclusiveTest.java
----------------------------------------------------------------------
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireDivertExclusiveTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireDivertExclusiveTest.java
index f8a03fa..2e56a4a 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireDivertExclusiveTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireDivertExclusiveTest.java
@@ -16,6 +16,13 @@
  */
 package org.apache.activemq.artemis.tests.integration.openwire;
 
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
@@ -28,12 +35,6 @@ import org.apache.activemq.artemis.utils.CompositeAddress;
 import org.junit.Assert;
 import org.junit.Test;
 
-import javax.jms.Connection;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.Queue;
-import javax.jms.Session;
-
 public class OpenWireDivertExclusiveTest extends OpenWireDivertTestBase {
 
    @Override
@@ -112,4 +113,61 @@ public class OpenWireDivertExclusiveTest extends 
OpenWireDivertTestBase {
          }
       }
    }
+
+   @Test
+   public void testSingleExclusiveDivertOpenWirePublisher() throws Exception {
+      ServerLocator locator = createInVMNonHALocator();
+      ClientSessionFactory sf = createSessionFactory(locator);
+
+      ClientSession coreSession = sf.createSession(false, true, true);
+
+      final SimpleString queueName1 = new SimpleString("queue1");
+      final SimpleString queueName2 = new SimpleString("queue2");
+
+      coreSession.createQueue(new SimpleString(forwardAddress), 
RoutingType.ANYCAST, queueName1, null, false);
+      coreSession.createQueue(new SimpleString(testAddress), 
RoutingType.ANYCAST, queueName2, null, false);
+      coreSession.close();
+
+      factory = new ActiveMQConnectionFactory(urlString);
+      Connection openwireConnection = factory.createConnection();
+
+      try {
+         openwireConnection.start();
+         Session session = openwireConnection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+         MessageProducer producer = 
session.createProducer(session.createQueue(testAddress));
+
+         final int numMessages = 10;
+
+         final String propKey = "testkey";
+
+         for (int i = 0; i < numMessages; i++) {
+            Message message = session.createMessage();
+            message.setIntProperty(propKey, i);
+            producer.send(message);
+         }
+
+         Queue q1 = 
session.createQueue(CompositeAddress.toFullQN(forwardAddress, "queue1"));
+         Queue q2 = session.createQueue(CompositeAddress.toFullQN(testAddress, 
"queue2"));
+
+         MessageConsumer consumer1 = session.createConsumer(q1);
+         MessageConsumer consumer2 = session.createConsumer(q2);
+
+         System.out.println("receiving ...");
+         for (int i = 0; i < numMessages; i++) {
+            Message message = consumer1.receive(TIMEOUT);
+
+            Assert.assertNotNull(message);
+
+            Assert.assertEquals(i, 
message.getObjectProperty(propKey.toString()));
+
+            message.acknowledge();
+         }
+         Assert.assertNull(consumer1.receive(50));
+         Assert.assertNull(consumer2.receive(50));
+      } finally {
+         if (openwireConnection != null) {
+            openwireConnection.close();
+         }
+      }
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8e9ee808/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireDivertNonExclusiveTest.java
----------------------------------------------------------------------
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireDivertNonExclusiveTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireDivertNonExclusiveTest.java
index 64e8e4f..7dacaf2 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireDivertNonExclusiveTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/OpenWireDivertNonExclusiveTest.java
@@ -31,6 +31,7 @@ import org.junit.Test;
 import javax.jms.Connection;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
 import javax.jms.Queue;
 import javax.jms.Session;
 
@@ -119,4 +120,68 @@ public class OpenWireDivertNonExclusiveTest extends 
OpenWireDivertTestBase {
       }
    }
 
+   @Test
+   //openwire sending, openwire receiving
+   public void testSingleNonExclusiveDivertOpenWirePublisher() throws 
Exception {
+      ServerLocator locator = createInVMNonHALocator();
+      ClientSessionFactory sf = createSessionFactory(locator);
+      ClientSession coreSession = sf.createSession(false, true, true);
+
+      final SimpleString queueName1 = new SimpleString("queue1");
+      final SimpleString queueName2 = new SimpleString("queue2");
+
+      coreSession.createQueue(new SimpleString(forwardAddress), 
RoutingType.ANYCAST, queueName1, null, false);
+      coreSession.createQueue(new SimpleString(testAddress), 
RoutingType.ANYCAST, queueName2, null, false);
+      coreSession.close();
+
+      //use openwire to receive
+      factory = new ActiveMQConnectionFactory(urlString);
+      Connection openwireConnection = factory.createConnection();
+
+      try {
+         Session session = openwireConnection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+         openwireConnection.start();
+
+         MessageProducer producer = 
session.createProducer(session.createQueue(testAddress));
+
+         final int numMessages = 10;
+         final String propKey = "testkey";
+
+         for (int i = 0; i < numMessages; i++) {
+            Message message = session.createMessage();
+            message.setIntProperty(propKey, i);
+            producer.send(message);
+         }
+
+         Queue q1 = session.createQueue(CompositeAddress.toFullQN(testAddress, 
"queue1"));
+         Queue q2 = 
session.createQueue(CompositeAddress.toFullQN(forwardAddress, "queue2"));
+
+         MessageConsumer consumer1 = session.createConsumer(q1);
+         MessageConsumer consumer2 = session.createConsumer(q2);
+
+         System.out.println("receiving ...");
+         for (int i = 0; i < numMessages; i++) {
+            Message message = consumer1.receive(TIMEOUT);
+            Assert.assertNotNull(message);
+            Assert.assertEquals(i, 
message.getObjectProperty(propKey.toString()));
+            message.acknowledge();
+         }
+
+         Assert.assertNull(consumer1.receive(50));
+
+         for (int i = 0; i < numMessages; i++) {
+            Message message = consumer2.receive(TIMEOUT);
+            Assert.assertNotNull(message);
+            Assert.assertEquals(i, 
message.getObjectProperty(propKey.toString()));
+            message.acknowledge();
+         }
+
+         Assert.assertNull(consumer2.receive(50));
+      } finally {
+         if (openwireConnection != null) {
+            openwireConnection.close();
+         }
+      }
+   }
+
 }

Reply via email to