Repository: activemq
Updated Branches:
  refs/heads/activemq-5.12.x 451344486 -> a38c3d4b6


https://issues.apache.org/jira/browse/AMQ-5898

Removing assertion in VirtualDestinationInterceptor to allow
multiple composite destinations to forward to a physical destination

(cherry picked from commit 35b7ac250b5fa0b8c8dbf728881cc9dbf6edce19)


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/a38c3d4b
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/a38c3d4b
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/a38c3d4b

Branch: refs/heads/activemq-5.12.x
Commit: a38c3d4b6b9a4e828c80e88dd5a33c86fae99a06
Parents: 4513444
Author: Christopher L. Shannon (cshannon) <christopher.l.shan...@gmail.com>
Authored: Wed Nov 25 13:33:32 2015 +0000
Committer: Christopher L. Shannon (cshannon) <christopher.l.shan...@gmail.com>
Committed: Wed Nov 25 13:46:52 2015 +0000

----------------------------------------------------------------------
 .../virtual/VirtualDestinationInterceptor.java  |   1 -
 .../MultipleCompositeToPhysicalQueueTest.java   | 139 +++++++++++++++++++
 2 files changed, 139 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/a38c3d4b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualDestinationInterceptor.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualDestinationInterceptor.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualDestinationInterceptor.java
index 70be686..d3c5cee 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualDestinationInterceptor.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualDestinationInterceptor.java
@@ -67,7 +67,6 @@ public class VirtualDestinationInterceptor implements 
DestinationInterceptor {
         }
         // check if the destination instead matches any mapped destinations
         Set mappedDestinations = mappedDestinationMap.get(activeMQDestination);
-        assert mappedDestinations.size() < 2;
         if (!mappedDestinations.isEmpty()) {
             // create a mapped destination interceptor
             VirtualDestination virtualDestination = (VirtualDestination)

http://git-wip-us.apache.org/repos/asf/activemq/blob/a38c3d4b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/MultipleCompositeToPhysicalQueueTest.java
----------------------------------------------------------------------
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/MultipleCompositeToPhysicalQueueTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/MultipleCompositeToPhysicalQueueTest.java
new file mode 100644
index 0000000..6c72a11
--- /dev/null
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/MultipleCompositeToPhysicalQueueTest.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.virtual;
+
+import static org.junit.Assert.assertEquals;
+
+import java.net.URI;
+import java.util.Collections;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.broker.region.DestinationInterceptor;
+import org.apache.activemq.broker.region.virtual.CompositeQueue;
+import org.apache.activemq.broker.region.virtual.VirtualDestination;
+import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ *  https://issues.apache.org/jira/browse/AMQ-5898
+ */
+public class MultipleCompositeToPhysicalQueueTest {
+
+    private final ActiveMQQueue SUB1 = new ActiveMQQueue("SUB1");
+    private final CompositeQueue PUB_BROADCAST = newCompositeQueue("PUB.ALL", 
SUB1);
+    private final CompositeQueue PUB_INDIVIDUAL = 
newCompositeQueue("PUB.SUB1", SUB1);
+    private String url;;
+
+    private BrokerService broker;
+
+    @Before
+    public void before() throws Exception {
+        broker = createBroker(false);
+        VirtualDestinationInterceptor virtualDestinationInterceptor = new 
VirtualDestinationInterceptor();
+        virtualDestinationInterceptor.setVirtualDestinations(
+                new VirtualDestination[]{
+                        PUB_BROADCAST,
+                        PUB_INDIVIDUAL
+                }
+        );
+        broker.setDestinationInterceptors(new 
DestinationInterceptor[]{virtualDestinationInterceptor});
+        broker.start();
+        broker.waitUntilStarted();
+        url = broker.getConnectorByName("tcp").getConnectUri().toString();
+    }
+
+    @After
+    public void after() throws Exception {
+        broker.stop();
+        broker.waitUntilStopped();
+    }
+
+    @Test(timeout = 60000)
+    public void testManyToOne() throws Exception {
+
+        Session consumerSession = buildSession("Consumer", url);
+
+        MessageConsumer consumer = createSubscriber(consumerSession, SUB1, 
null);
+
+        // Producer
+        Session publisherSession = buildSession("Producer", url);
+
+        createPublisher(publisherSession, 
PUB_BROADCAST.getVirtualDestination()).send(publisherSession.createTextMessage("BROADCAST"));
+        assertEquals("BROADCAST", ((TextMessage) 
consumer.receive()).getText());
+
+        createPublisher(publisherSession, 
PUB_INDIVIDUAL.getVirtualDestination()).send(publisherSession.createTextMessage("INDIVIDUAL"));
+        assertEquals("INDIVIDUAL", ((TextMessage) 
consumer.receive()).getText());
+    }
+
+    private BrokerService createBroker(boolean persistent) throws Exception {
+        BrokerService broker = new BrokerService();
+        broker.setBrokerName("TestBroker");
+        broker.setPersistent(persistent);
+        TransportConnector connector = new TransportConnector();
+        connector.setUri(new URI("tcp://localhost:0"));
+        connector.setName("tcp");;
+        broker.addConnector(connector);
+
+        return broker;
+    }
+
+    private MessageConsumer createSubscriber(Session session, Destination 
destination, MessageListener messageListener) throws JMSException {
+        MessageConsumer consumer = session.createConsumer(destination);
+        consumer.setMessageListener(messageListener);
+        return consumer;
+    }
+
+    private MessageProducer createPublisher(Session session, Destination 
destination) throws JMSException {
+        MessageProducer producer = session.createProducer(destination);
+        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+        return producer;
+    }
+
+    private Session buildSession(String clientId, String url) throws 
JMSException {
+        ActiveMQConnectionFactory connectionFactory = new 
ActiveMQConnectionFactory(url);
+
+        connectionFactory.setClientIDPrefix(clientId);
+        Connection connection = connectionFactory.createConnection();
+        Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+        connection.start();
+
+        return session;
+    }
+
+    private CompositeQueue newCompositeQueue(String name, ActiveMQDestination 
forwardTo) {
+        CompositeQueue queue = new CompositeQueue();
+        queue.setName(name);
+        queue.setForwardTo(Collections.singleton(forwardTo));
+        return queue;
+    }
+}
\ No newline at end of file

Reply via email to