Awesome. Thanks for looking into that... you're right. I changed the transacted argument of createSession() to false in the original test and it worked... (I also spotted a bug in the original test: it was creating both sessions from the same connection). In case anyone is interested, the updated test is attached...
*sigh* it's pretty obvious in retrospect that a consumer isn't going to see a message where the transaction hasn't been committed... Thanks again Mike, cheers, j. ------- package org.example.activemq; import java.util.Enumeration; import javax.jms.Connection; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.QueueBrowser; import javax.jms.Session; import javax.jms.TextMessage; import junit.framework.TestCase; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.network.NetworkConnector; import org.apache.activemq.store.memory.MemoryPersistenceAdapter; public class NetworkTest extends TestCase { public void testNetworkOfBrokers() throws Exception { BrokerService brokerService1 = null; BrokerService brokerService2 = null; try { { brokerService1 = new BrokerService(); brokerService1.setBrokerName("one"); brokerService1.setUseJmx(false); brokerService1.setPersistenceAdapter(new MemoryPersistenceAdapter()); brokerService1.addConnector("tcp://0.0.0.0:61616"); NetworkConnector network1 = brokerService1.addNetworkConnector("static:(tcp://localhost:51515)"); // NetworkConnector network1 = brokerService1.addNetworkConnector("multicast://default"); network1.setName("network1"); network1.setDynamicOnly(true); network1.setNetworkTTL(3); network1.setPrefetchSize(1); brokerService1.start(); } { brokerService2 = new BrokerService(); brokerService2.setBrokerName("two"); brokerService2.setUseJmx(false); brokerService2.setPersistenceAdapter(new MemoryPersistenceAdapter()); brokerService2.addConnector("tcp://0.0.0.0:51515"); NetworkConnector network2 = brokerService2.addNetworkConnector("static:(tcp://localhost:61616)"); // NetworkConnector network2 = brokerService2.addNetworkConnector("multicast://default"); network2.setName("network2"); network2.setDynamicOnly(true); network2.setNetworkTTL(3); 
network2.setPrefetchSize(1); brokerService2.start(); } ActiveMQConnectionFactory connectionFactory1 = new ActiveMQConnectionFactory("failover:(tcp://localhost:61616,tcp://localhost:51515)?randomize=false"); ActiveMQConnectionFactory connectionFactory2 = new ActiveMQConnectionFactory("failover:(tcp://localhost:51515,tcp://localhost:61616)?randomize=false"); Connection connection1 = connectionFactory1.createConnection(); connection1.start(); Connection connection2 = connectionFactory2.createConnection(); connection2.start(); try { Session session1 = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE); Session session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE); MessageConsumer consumer1 = session1.createConsumer(new ActiveMQQueue("testingqueue")); MessageProducer producer2 = session2.createProducer(new ActiveMQQueue("testingqueue")); TextMessage message2 = session2.createTextMessage(); message2.setText("Hello World!"); producer2.send(message2); Message message1 = consumer1.receive(1000); assertNotNull(message1); System.out.println(message1); QueueBrowser browser = session2.createBrowser(new ActiveMQQueue("testingqueue")); Enumeration<?> enumeration = browser.getEnumeration(); assertFalse(enumeration.hasMoreElements()); } finally { connection1.stop(); connection2.stop(); } } finally { try { if(brokerService1 != null) { brokerService1.stop(); }} catch(Throwable t) { t.printStackTrace(); } try { if(brokerService2 != null) { brokerService2.stop(); }} catch(Throwable t) { t.printStackTrace(); } } } } On Sun, Apr 18, 2010 at 3:10 PM, patzerbud <patzer...@hotmail.com> wrote: > > > > dkfn wrote: >> >> :) It's the mailing list software conspiring, I tell you... adding it >> directly into the mail instead: >> > > OK, my first reply runs fine (i.e. without error) but didn't actually work. 
> I noodled around with it a little more and offer the following: > > package org.apache.activemq.example; > > import java.util.Enumeration; > > import javax.jms.Connection; > import javax.jms.Message; > import javax.jms.MessageConsumer; > import javax.jms.MessageProducer; > import javax.jms.QueueBrowser; > import javax.jms.Session; > import javax.jms.TextMessage; > > import junit.framework.TestCase; > > import org.apache.activemq.ActiveMQConnectionFactory; > import org.apache.activemq.broker.BrokerService; > import org.apache.activemq.command.ActiveMQQueue; > import org.apache.activemq.network.DiscoveryNetworkConnector; > import org.apache.activemq.network.NetworkConnector; > import org.apache.activemq.store.memory.MemoryPersistenceAdapter; > > > public class QueueTest extends TestCase { > > private static final String TEST_QUEUE = "testQueue"; > private static final String LOCAL_MQ1 = "tcp://localhost:61616"; > private static final String LOCAL_MQ2 = "tcp://localhost:51515"; > > > public void testNetworkOfBrokers() throws Exception { > > Broker broker1 = createBroker("one", 61616, 51515); > Broker broker2 = createBroker("two", 51515, 61616); > pause(10, "sleeping to allow brokers to startup & connect to > each > other..."); > > System.out.println("creating consumer"); > Consumer consumer = createConsumer(LOCAL_MQ2); > pause(5, "sleeping to allow consumer to startup & connect to > MQ..."); > > > System.out.println("producing messages"); > Connection connection = null; > > try { > ActiveMQConnectionFactory connectionFactory = new > ActiveMQConnectionFactory(LOCAL_MQ1); > connection = connectionFactory.createConnection(); > > Session session = connection.createSession(false, > Session.AUTO_ACKNOWLEDGE); > MessageProducer producer = session.createProducer(new > ActiveMQQueue(TEST_QUEUE)); > connection.start(); > > for (int i=0; i<10; i++) { > TextMessage message = > session.createTextMessage(); > message.setText("Hello World!"); > producer.send(message); > } > > 
pause(5, "sleeping to allow consumer to consume all > messages..."); > > QueueBrowser browser = session.createBrowser(new > ActiveMQQueue(TEST_QUEUE)); > Enumeration<?> enumeration = browser.getEnumeration(); > assertFalse(enumeration.hasMoreElements()); > } > catch (Exception e) { > e.printStackTrace(); > } > finally { > try { > if (connection != null) { > connection.stop(); > } > } catch (Throwable t) { > //t.printStackTrace(); > } > > try { > if (broker2 != null) { > broker2.stop(); > } > } catch (Throwable t) { > //t.printStackTrace(); > } > > try { > if (broker1 != null) { > broker1.stop(); > } > } catch (Throwable t) { > //t.printStackTrace(); > } > > } > > pause(2); > System.out.println("All done!"); > } > > private void pause(int seconds) { > pause(seconds, null); > } > > private void pause(int seconds, String msg) { > if (msg != null) System.out.println(msg); > try { > Thread.currentThread().sleep(seconds * 1000); > } catch (InterruptedException e) { > ; // ignore > } > } > > private Broker createBroker(String name, int listenerPort, int > networkConnectorPort) { > System.out.println("creating broker "+name); > Thread brokerThread = null; > try { > Broker broker = new Broker(name, listenerPort, > networkConnectorPort); > brokerThread = new Thread(broker); > brokerThread.start(); > return broker; > } catch (Exception ignoreMe) { > ignoreMe.printStackTrace(); > } > return null; > } > > private Consumer createConsumer(String url) { > Thread thread = null; > try { > Consumer consumer = new Consumer(url); > thread = new Thread(consumer); > thread.start(); > return consumer; > } catch (Exception ignoreMe) { > ignoreMe.printStackTrace(); > } > return null; > } > > private class Consumer implements Runnable { > private final String url; // "tcp://localhost:51515" > Consumer(String url) { > this.url = url; > } > > public void run() { > > Connection connection1 = null; > > try { > ActiveMQConnectionFactory connectionFactory1 = > new > ActiveMQConnectionFactory(url); 
> connection1 = > connectionFactory1.createConnection(); > connection1.start(); > > Session session1 = > connection1.createSession(true, > Session.AUTO_ACKNOWLEDGE); > MessageConsumer consumer1 = > session1.createConsumer(new > ActiveMQQueue(TEST_QUEUE)); > > //for (int i=0; i<1; i++) { > for (;;) { > Message message1 = consumer1.receive(); > assertNotNull(message1); > System.out.println(message1); > } > } > catch (Exception e) { > } > finally { > try { > if (connection1 != null) { > connection1.stop(); > } > } catch (Throwable t) { > t.printStackTrace(); > } > } > } > } > > private static class Broker implements Runnable { > > private String name; > private int listenPort; > private int connectorPort; > private BrokerService brokerService = null; > > Broker(String name, int listenerPort, int networkPort) { > this.name = name; > listenPort = listenerPort; > connectorPort = networkPort; > } > > public void run() { > try { > brokerService = new BrokerService(); > brokerService.setBrokerName(name); > brokerService.setUseJmx(false); > brokerService.setPersistenceAdapter(new > MemoryPersistenceAdapter()); > > NetworkConnector network2 = new > DiscoveryNetworkConnector(new > java.net.URI("static:(tcp://localhost:" + connectorPort + ")")); > network2.setName("network-" + name); > network2.setDynamicOnly(false); > network2.setNetworkTTL(2); > network2.setPrefetchSize(1); > > brokerService.addNetworkConnector(network2); > > brokerService.addConnector("tcp://0.0.0.0:" + > listenPort); > brokerService.start(); > > } > catch (Exception e) { > e.printStackTrace(); > } > } > > public void stop() { > try { > if (brokerService != null) { > brokerService.stop(); > } > } catch (Throwable t) { > t.printStackTrace(); > } > } > } > > } > > > I changed the order around a little bit for the producer. 
However, I think > the main difference was this: > > Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); > > This code will not work if you specify true for the first arg. I'm not sure, > but I think it's because this example is using the in memory persistence > adapter... > > HTH, > > Mike L (aka patzerbud) > > -- > View this message in context: > http://old.nabble.com/Network-of-Brokers-tp28269405p28282467.html > Sent from the ActiveMQ - User mailing list archive at Nabble.com. > >