Looks like the attachment didn't come through... inlining test case here.  Any 
thoughts?

///////////////////////////////////////////////////////////////////////////////////
package org.activemq;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.network.NetworkConnector;
import org.junit.Test;
import org.springframework.jms.core.JmsOperations;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;

import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.management.MBeanServer;
import java.lang.management.ManagementFactory;
import java.net.URI;

import static org.junit.Assert.assertNotNull;

public class NetworkOfBrokersTest {

    private static final MBeanServer PLATFORM_MBEAN_SERVER = 
ManagementFactory.getPlatformMBeanServer();
    private static final String QUEUE_NAME = "testQueue";

    @Test
    public void testNetworkOfBrokersHandlesRestartOfNetworkConnector() throws 
Exception {
        // create consumer broker
        BrokerService consumerBrokerService = 
createBrokerService("consumerBrokerService");
        consumerBrokerService.addConnector("tcp://localhost:61616");
        consumerBrokerService.start();

        JmsOperations consumerJmsOperations =
                
createJmsOperations(createConnectionFactory(consumerBrokerService.getVmConnectorURI()));

        // create producer broker in "network-of-brokers" configuration
        BrokerService producerBrokerService = 
createBrokerService("producerBrokerService");
        NetworkConnector networkConnector = 
producerBrokerService.addNetworkConnector("static:(tcp://localhost:61616)");
        producerBrokerService.start();

        JmsOperations producerJmsOperations =
                
createJmsOperations(createConnectionFactory(producerBrokerService.getVmConnectorURI()));

        try {
            // assert "network-of-brokers" configuration is working
            
assertMessageSentByProducerIsReceivedByConsumer(consumerJmsOperations, 
producerJmsOperations);

            // restart network connector via JMX
            PLATFORM_MBEAN_SERVER.invoke(networkConnector.getObjectName(), 
"stop", new Object[0], new String[0]);
            PLATFORM_MBEAN_SERVER.invoke(networkConnector.getObjectName(), 
"start", new Object[0], new String[0]);

            // assert "network-of-brokers" configuration is working
            
assertMessageSentByProducerIsReceivedByConsumer(consumerJmsOperations, 
producerJmsOperations);
        }
        finally {
            consumerBrokerService.stop();
            producerBrokerService.stop();
        }
    }

    private static BrokerService createBrokerService(String brokerName) {
        BrokerService brokerService = new BrokerService();
        brokerService.getManagementContext().setCreateConnector(false);
        brokerService.setBrokerName(brokerName);
        brokerService.setPersistent(false);
        brokerService.setTempDataStore(null);
        brokerService.setUseJmx(true);
        brokerService.setUseShutdownHook(false);

        return brokerService;
    }

    private static ConnectionFactory createConnectionFactory(URI brokerUrl) {
        ActiveMQConnectionFactory connectionFactory = new 
ActiveMQConnectionFactory();
        connectionFactory.getPrefetchPolicy().setAll(1);
        connectionFactory.setBrokerURL(brokerUrl.toString());
        connectionFactory.setDispatchAsync(false);
        connectionFactory.setUseAsyncSend(false);

        return connectionFactory;
    }

    private static JmsOperations createJmsOperations(ConnectionFactory 
connectionFactory) {
        JmsTemplate jmsTemplate = new JmsTemplate();
        jmsTemplate.setConnectionFactory(connectionFactory);
        jmsTemplate.setDefaultDestinationName(QUEUE_NAME);
        jmsTemplate.setDeliveryPersistent(false);
        jmsTemplate.setReceiveTimeout(1000);

        return jmsTemplate;
    }

    private static void assertMessageSentByProducerIsReceivedByConsumer(
            JmsOperations consumerJmsOperations, JmsOperations 
producerJmsOperations) {

        producerJmsOperations.send(
                new MessageCreator() {
                    @Override
                    public Message createMessage(Session session) throws 
JMSException {
                        return session.createTextMessage();
                    }
                });
        assertNotNull(consumerJmsOperations.receive());
    }
}

On May 5, 2010, at 12:42 PM, Geoffrey Arnold wrote:

> Issued opened with failing test:
> 
>       https://issues.apache.org/activemq/browse/AMQ-2722
> 
> Attaching JUnit test here too:
> 
> <NetworkOfBrokersTest.java>
> 
> On Apr 6, 2010, at 5:10 AM, Dejan Bosanac wrote:
> 
>> Hi,
>> 
>> can you raise a Jira for this (ideally with a test case that demonstrates
>> the problem).
>> 
>> Cheers
>> --
>> Dejan Bosanac - http://twitter.com/dejanb
>> 
>> Open Source Integration - http://fusesource.com/
>> ActiveMQ in Action - http://www.manning.com/snyder/
>> Blog - http://www.nighttale.net
>> 
>> 
>> On Fri, Apr 2, 2010 at 11:16 PM, Geoffrey Arnold <
>> geoff...@geoffreyarnold.com> wrote:
>> 
>>> Hi All!
>>> 
>>> During upgrades we would like to be able to stop and restart the network
>>> connector between embedded brokers running in separate VMs.
>>> 
>>> In the embedded broker of the producer VM we have a network connector
>>> pointing to the transport connector of an embedded broker in the consumer
>>> VM.  I have attempted to use JMX to stop and restart the network connector
>>> in the producer VM, however the connection does not appear to be
>>> reestablished.
>>> 
>>> What is the correct way to stop and restart this connection?
>>> 
>>> Thanks,
>>> Geoff.
> 

Reply via email to