Author: ceposta Date: Wed Feb 13 17:58:30 2013 New Revision: 1445772 URL: http://svn.apache.org/r1445772 Log: Added a test that shows using virtual topics in a network of brokers to resolve some of the pain that comes up with durable subscriptions across a network. This test demonstrates the use of Virtual Topics and ConditionalNetworkBridgeFilterFactory with replayWhenNoConsumers=true
Added: activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/VirtualTopicNetworkClusterReactivationTest.java Added: activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/VirtualTopicNetworkClusterReactivationTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/VirtualTopicNetworkClusterReactivationTest.java?rev=1445772&view=auto ============================================================================== --- activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/VirtualTopicNetworkClusterReactivationTest.java (added) +++ activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/VirtualTopicNetworkClusterReactivationTest.java Wed Feb 13 17:58:30 2013 @@ -0,0 +1,176 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.usecases; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.JmsMultipleBrokersTestSupport; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.network.ConditionalNetworkBridgeFilterFactory; + +import javax.jms.*; +import java.net.URI; + +/** + * @author <a href="http://www.christianposta.com/blog">Christian Posta</a> + */ +public class VirtualTopicNetworkClusterReactivationTest extends JmsMultipleBrokersTestSupport { + + private static final String BROKER_A = "brokerA"; + private static final String BROKER_B = "brokerB"; + private static final String BROKER_A_TRANSPORT_URL = "tcp://localhost:61616"; + private static final String BROKER_B_TRANSPORT_URL = "tcp://localhost:61617"; + private static final long DEFAULT_SLEEP_MS = 1000; + + private ActiveMQTopic topic = new ActiveMQTopic("VirtualTopic.FOO.TEST"); + private ActiveMQQueue queue = new ActiveMQQueue("Consumer.FOO.VirtualTopic.FOO.TEST"); + + + /** + * This test shows how to use pub/sub to mimic durable subscribers in a network of brokers. + * + * When using durable subscribers in a broker cluster, you can encounter a situation where a + * subscription gets orphaned on a broker when the client disconnects and reattaches to another + * broker in the cluster. Since the clientID/durableName only need to be unique within a single + * broker, it's possible to have a durable sub on multiple brokers in a cluster. + * + * FOR EXAMPLE: + * Broker A and B are networked together in both directions to form a full mesh. If durable + * subscriber 'foo' subscribes to failover(A,B) and ends up on B, and a producer on A, messages + * will be demand forwarded from A to B. But if the durable sub 'foo' disconnects from B, + * then reconnects to failover(A,B) but this time gets connected to A, the subscription on + * B will still be there are continue to receive messages (and possibly have missed messages + * sent there while gone) + * + * We can avoid all of this mess with virtual topics as seen below: + * + * + * @throws JMSException + */ + public void testDurableSubReconnectFromAtoB() throws JMSException { + // create consumer on broker B + ActiveMQConnectionFactory bConnFactory = new ActiveMQConnectionFactory(BROKER_B_TRANSPORT_URL+ "?jms.prefetchPolicy.queuePrefetch=0"); + Connection bConn = bConnFactory.createConnection(); + bConn.start(); + Session bSession = bConn.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer bSessionConsumer = bSession.createConsumer(queue); + + + // create producer on A + ActiveMQConnectionFactory aConnFactory = new ActiveMQConnectionFactory(BROKER_A_TRANSPORT_URL); + Connection aProducerConn = aConnFactory.createConnection(); + aProducerConn.start(); + + Session aProducerSession = aProducerConn.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = aProducerSession.createProducer(topic); + produce(producer, aProducerSession, 5); + + // sleep for a sec to let the messages get bridged over to broker B + sleep(); + + // consumer on B has not consumed any messages, and for some reason goes away: + bSessionConsumer.close(); + bSession.close(); + bConn.close(); + + // let the bridge catch up + sleep(); + + // and now consumer reattaches to A and wants the messages that were sent to B + Connection aConsumerConn = aConnFactory.createConnection(); + aConsumerConn.start(); + Session aConsumerSession = aConsumerConn.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer aSessionConsumer = aConsumerSession.createConsumer(queue); + + sleep(); + + // they should all be there + consume(aSessionConsumer, 5); + + } + + + private void consume(MessageConsumer durable, int numMessagesExpected) throws JMSException { + for (int i = 0; i < numMessagesExpected; i++) { + Message message = durable.receive(1000); + assertNotNull(message); + TextMessage textMessage = (TextMessage) message; + System.out.println("received: " + textMessage.getText()); + assertEquals("message: " +i, textMessage.getText()); + } + } + + private void produce(MessageProducer producer, Session sess, int numMessages) throws JMSException { + for (int i = 0; i < numMessages; i++) { + producer.send(sess.createTextMessage("message: " + i)); + } + } + + @Override + protected void setUp() throws Exception { + maxSetupTime = 1000; + super.setAutoFail(true); + super.setUp(); + final String options = "?persistent=true&useJmx=false&deleteAllMessagesOnStartup=true"; + + BrokerService brokerServiceA = createBroker(new URI(String.format("broker:(%s)/%s%s", BROKER_A_TRANSPORT_URL, BROKER_A, options))); + brokerServiceA.setDestinationPolicy(buildPolicyMap()); + brokerServiceA.setDestinations(new ActiveMQDestination[]{queue}); + + BrokerService brokerServiceB = createBroker(new URI(String.format("broker:(%s)/%s%s", BROKER_B_TRANSPORT_URL, BROKER_B, options))); + brokerServiceB.setDestinationPolicy(buildPolicyMap()); + brokerServiceB.setDestinations(new ActiveMQDestination[]{queue}); + + + + // bridge brokers to each other statically (static: discovery) + bridgeBrokers(BROKER_A, BROKER_B); + bridgeBrokers(BROKER_B, BROKER_A); + + startAllBrokers(); + } + + private PolicyMap buildPolicyMap() { + PolicyMap policyMap = new PolicyMap(); + PolicyEntry policyEntry = new PolicyEntry(); + policyEntry.setOptimizedDispatch(true); + ConditionalNetworkBridgeFilterFactory networkBridgeFilterFactory = new ConditionalNetworkBridgeFilterFactory(); + networkBridgeFilterFactory.setReplayWhenNoConsumers(true); + policyEntry.setNetworkBridgeFilterFactory(networkBridgeFilterFactory); + policyEntry.setEnableAudit(false); + policyMap.put(new ActiveMQQueue("Consumer.*.VirtualTopic.>"), policyEntry); + return policyMap; + } + + private void sleep() { + try { + Thread.sleep(DEFAULT_SLEEP_MS); + } catch (InterruptedException igonred) { + } + } + + private void sleep(int milliSecondTime) { + try { + Thread.sleep(milliSecondTime); + } catch (InterruptedException igonred) { + } + } +} \ No newline at end of file