dkfn wrote:
> 
> :) It's the mailing list software conspiring, I tell you... adding it
> directly into the mail instead:
> 

OK, my first reply runs fine (i.e. without error) but didn't actually work.
I noodled around with it a little more and offer the following:

package org.apache.activemq.example;

import java.util.Enumeration;

import javax.jms.Connection;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.QueueBrowser;
import javax.jms.Session;
import javax.jms.TextMessage;

import junit.framework.TestCase;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.network.DiscoveryNetworkConnector;
import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.store.memory.MemoryPersistenceAdapter;


public class QueueTest extends TestCase {

        private static final String TEST_QUEUE = "testQueue";
        private static final String LOCAL_MQ1 = "tcp://localhost:61616";
        private static final String LOCAL_MQ2 = "tcp://localhost:51515";


    public void testNetworkOfBrokers() throws Exception {

                Broker broker1 = createBroker("one", 61616, 51515);
                Broker broker2 = createBroker("two", 51515, 61616);
                pause(10, "sleeping to allow brokers to startup & connect to 
each
other...");

                System.out.println("creating consumer");
                Consumer consumer = createConsumer(LOCAL_MQ2);
                pause(5, "sleeping to allow consumer to startup & connect to 
MQ...");


                System.out.println("producing messages");
                Connection connection = null;

                try {
                        ActiveMQConnectionFactory connectionFactory = new
ActiveMQConnectionFactory(LOCAL_MQ1);
                        connection = connectionFactory.createConnection();

                        Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
                        MessageProducer producer = session.createProducer(new
ActiveMQQueue(TEST_QUEUE));
                        connection.start();

                        for (int i=0; i<10; i++) {
                                TextMessage message = 
session.createTextMessage();
                                message.setText("Hello World!");
                                producer.send(message);
                        }

                        pause(5, "sleeping to allow consumer to consume all 
messages...");

                        QueueBrowser browser = session.createBrowser(new
ActiveMQQueue(TEST_QUEUE));
                        Enumeration<?> enumeration = browser.getEnumeration();
                        assertFalse(enumeration.hasMoreElements());
                }
                catch (Exception e) {
                        e.printStackTrace();
                }
                finally {
                        try {
                                if (connection != null) {
                                        connection.stop();
                                }
                        } catch (Throwable t) {
                                //t.printStackTrace();
                        }

                        try {
                                if (broker2 != null) {
                                        broker2.stop();
                                }
                        } catch (Throwable t) {
                                //t.printStackTrace();
                        }

                        try {
                                if (broker1 != null) {
                                        broker1.stop();
                                }
                        } catch (Throwable t) {
                                //t.printStackTrace();
                        }

                }

                pause(2);
                System.out.println("All done!");
    }

        private void pause(int seconds) {
                pause(seconds, null);
        }

        private void pause(int seconds, String msg) {
                if (msg != null) System.out.println(msg);
                try {
                        Thread.currentThread().sleep(seconds * 1000);
                } catch (InterruptedException e) {
                        ; // ignore
                }
        }

        private Broker createBroker(String name, int listenerPort, int
networkConnectorPort) {
                System.out.println("creating broker "+name);
                Thread brokerThread = null;
                try {
                        Broker broker = new Broker(name, listenerPort, 
networkConnectorPort);
                        brokerThread = new Thread(broker);
                        brokerThread.start();
                        return broker;
                } catch (Exception ignoreMe) {
                        ignoreMe.printStackTrace();
                }
                return null;
        }

        private Consumer createConsumer(String url) {
                Thread thread = null;
                try {
                        Consumer consumer = new Consumer(url);
                        thread = new Thread(consumer);
                        thread.start();
                        return consumer;
                } catch (Exception ignoreMe) {
                        ignoreMe.printStackTrace();
                }
                return null;
        }

        private class Consumer implements Runnable {
                private final String url; // "tcp://localhost:51515"
                Consumer(String url) {
                        this.url = url;
                }

                public void run() {

                        Connection connection1 = null;

                        try {
                                ActiveMQConnectionFactory connectionFactory1 = 
new
ActiveMQConnectionFactory(url);
                                connection1 = 
connectionFactory1.createConnection();
                                connection1.start();

                                Session session1 = 
connection1.createSession(true,
Session.AUTO_ACKNOWLEDGE);
                                MessageConsumer consumer1 = 
session1.createConsumer(new
ActiveMQQueue(TEST_QUEUE));

                                //for (int i=0; i<1; i++) {
                                for (;;) {
                                        Message message1 = consumer1.receive();
                                        assertNotNull(message1);
                                        System.out.println(message1);
                                }
                        }
                        catch (Exception e) {
                        }
                        finally {
                                try {
                                        if (connection1 != null) {
                                                connection1.stop();
                                        }
                                } catch (Throwable t) {
                                        t.printStackTrace();
                                }
                        }
                }
        }

        private static class Broker implements Runnable {

                private String name;
                private int listenPort;
                private int connectorPort;
                private BrokerService brokerService = null;

                Broker(String name, int listenerPort, int networkPort) {
                        this.name = name;
                        listenPort = listenerPort;
                        connectorPort = networkPort;
                }

                public void run() {
                        try {
                                brokerService = new BrokerService();
                                brokerService.setBrokerName(name);
                                brokerService.setUseJmx(false);
                                brokerService.setPersistenceAdapter(new
                                MemoryPersistenceAdapter());

                                NetworkConnector network2 = new 
DiscoveryNetworkConnector(new
java.net.URI("static:(tcp://localhost:" + connectorPort + ")"));
                                network2.setName("network-" + name);
                                network2.setDynamicOnly(false);
                                network2.setNetworkTTL(2);
                                network2.setPrefetchSize(1);

                                brokerService.addNetworkConnector(network2);

                                brokerService.addConnector("tcp://0.0.0.0:" + 
listenPort);
                                brokerService.start();

                        }
                        catch (Exception e) {
                                e.printStackTrace();
                        }
                }

                public void stop() {
                        try {
                                if (brokerService != null) {
                                        brokerService.stop();
                                }
                        } catch (Throwable t) {
                                t.printStackTrace();
                        }
                }
        }

}


I changed the order around a little bit for the producer. However, I think
the main difference was this:

Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

This code will not work if you specify true for the first arg. I'm not sure,
but I think it's because this example is using the in memory persistence
adapter...

HTH,

Mike L (aka patzerbud)

-- 
View this message in context: 
http://old.nabble.com/Network-of-Brokers-tp28269405p28282467.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.

Reply via email to