dkfn wrote:
>
> :) It's the mailing list software conspiring, I tell you... adding it
> directly into the mail instead:
>
OK, my first reply ran fine (i.e. without error) but didn't actually work.
I noodled around with it a little more and offer the following:
package org.apache.activemq.example;
import java.util.Enumeration;
import javax.jms.Connection;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.QueueBrowser;
import javax.jms.Session;
import javax.jms.TextMessage;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.network.DiscoveryNetworkConnector;
import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
public class QueueTest extends TestCase {
private static final String TEST_QUEUE = "testQueue";
private static final String LOCAL_MQ1 = "tcp://localhost:61616";
private static final String LOCAL_MQ2 = "tcp://localhost:51515";
public void testNetworkOfBrokers() throws Exception {
Broker broker1 = createBroker("one", 61616, 51515);
Broker broker2 = createBroker("two", 51515, 61616);
pause(10, "sleeping to allow brokers to startup & connect to
each
other...");
System.out.println("creating consumer");
Consumer consumer = createConsumer(LOCAL_MQ2);
pause(5, "sleeping to allow consumer to startup & connect to
MQ...");
System.out.println("producing messages");
Connection connection = null;
try {
ActiveMQConnectionFactory connectionFactory = new
ActiveMQConnectionFactory(LOCAL_MQ1);
connection = connectionFactory.createConnection();
Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(new
ActiveMQQueue(TEST_QUEUE));
connection.start();
for (int i=0; i<10; i++) {
TextMessage message =
session.createTextMessage();
message.setText("Hello World!");
producer.send(message);
}
pause(5, "sleeping to allow consumer to consume all
messages...");
QueueBrowser browser = session.createBrowser(new
ActiveMQQueue(TEST_QUEUE));
Enumeration<?> enumeration = browser.getEnumeration();
assertFalse(enumeration.hasMoreElements());
}
catch (Exception e) {
e.printStackTrace();
}
finally {
try {
if (connection != null) {
connection.stop();
}
} catch (Throwable t) {
//t.printStackTrace();
}
try {
if (broker2 != null) {
broker2.stop();
}
} catch (Throwable t) {
//t.printStackTrace();
}
try {
if (broker1 != null) {
broker1.stop();
}
} catch (Throwable t) {
//t.printStackTrace();
}
}
pause(2);
System.out.println("All done!");
}
private void pause(int seconds) {
pause(seconds, null);
}
private void pause(int seconds, String msg) {
if (msg != null) System.out.println(msg);
try {
Thread.currentThread().sleep(seconds * 1000);
} catch (InterruptedException e) {
; // ignore
}
}
private Broker createBroker(String name, int listenerPort, int
networkConnectorPort) {
System.out.println("creating broker "+name);
Thread brokerThread = null;
try {
Broker broker = new Broker(name, listenerPort,
networkConnectorPort);
brokerThread = new Thread(broker);
brokerThread.start();
return broker;
} catch (Exception ignoreMe) {
ignoreMe.printStackTrace();
}
return null;
}
private Consumer createConsumer(String url) {
Thread thread = null;
try {
Consumer consumer = new Consumer(url);
thread = new Thread(consumer);
thread.start();
return consumer;
} catch (Exception ignoreMe) {
ignoreMe.printStackTrace();
}
return null;
}
private class Consumer implements Runnable {
private final String url; // "tcp://localhost:51515"
Consumer(String url) {
this.url = url;
}
public void run() {
Connection connection1 = null;
try {
ActiveMQConnectionFactory connectionFactory1 =
new
ActiveMQConnectionFactory(url);
connection1 =
connectionFactory1.createConnection();
connection1.start();
Session session1 =
connection1.createSession(true,
Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer1 =
session1.createConsumer(new
ActiveMQQueue(TEST_QUEUE));
//for (int i=0; i<1; i++) {
for (;;) {
Message message1 = consumer1.receive();
assertNotNull(message1);
System.out.println(message1);
}
}
catch (Exception e) {
}
finally {
try {
if (connection1 != null) {
connection1.stop();
}
} catch (Throwable t) {
t.printStackTrace();
}
}
}
}
private static class Broker implements Runnable {
private String name;
private int listenPort;
private int connectorPort;
private BrokerService brokerService = null;
Broker(String name, int listenerPort, int networkPort) {
this.name = name;
listenPort = listenerPort;
connectorPort = networkPort;
}
public void run() {
try {
brokerService = new BrokerService();
brokerService.setBrokerName(name);
brokerService.setUseJmx(false);
brokerService.setPersistenceAdapter(new
MemoryPersistenceAdapter());
NetworkConnector network2 = new
DiscoveryNetworkConnector(new
java.net.URI("static:(tcp://localhost:" + connectorPort + ")"));
network2.setName("network-" + name);
network2.setDynamicOnly(false);
network2.setNetworkTTL(2);
network2.setPrefetchSize(1);
brokerService.addNetworkConnector(network2);
brokerService.addConnector("tcp://0.0.0.0:" +
listenPort);
brokerService.start();
}
catch (Exception e) {
e.printStackTrace();
}
}
public void stop() {
try {
if (brokerService != null) {
brokerService.stop();
}
} catch (Throwable t) {
t.printStackTrace();
}
}
}
}
I changed the order around a little bit for the producer. However, I think
the main difference was this:
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
This code will not work if you specify true for the first arg (the
"transacted" flag). I'm not sure, but I think it's because this example is
using the in-memory persistence adapter...
HTH,
Mike L (aka patzerbud)
--
View this message in context:
http://old.nabble.com/Network-of-Brokers-tp28269405p28282467.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.