Figured out the problem. I have to do
connector.setNetworkTTL(2);
On Fri, Jun 19, 2015 at 1:52 PM, pubudu gunawardena <[email protected]> wrote:
> Hi All,
>
> I have the following setup. There is a producer which sends messages
> to an embedded broker. There is a consumer that consumes messages from
> another embedded broker. I have created a network of brokers with the
> two embedded brokers connected to the standalone broker. But the
> messages don't get passed from the producer to the consumer. I have
> created an example which shows this behavior. Can someone point out to
> me what I am doing wrong or if this is not possible? Following is the
> code to reproduce what I have mentioned.
>
>
> import java.net.URI;
>
> import javax.jms.Connection;
> import javax.jms.JMSException;
> import javax.jms.Message;
> import javax.jms.MessageConsumer;
> import javax.jms.MessageListener;
> import javax.jms.MessageProducer;
> import javax.jms.Queue;
> import javax.jms.Session;
> import javax.jms.TextMessage;
>
> import org.apache.activemq.ActiveMQConnectionFactory;
> import org.apache.activemq.broker.BrokerService;
> import org.apache.activemq.network.DiscoveryNetworkConnector;
> import org.apache.activemq.network.NetworkConnector;
>
> public class Test {
>
> public static void main(String[] args) {
> try {
> startBroker1();
> startBroker2();
> runProducer();
> runConsumer();
>
> } catch (Exception e) {
> // TODO Auto-generated catch block
> e.printStackTrace();
> }
> }
>
> private static void runConsumer() {
> new Thread(new Consumer()).start();
>
> }
>
> private static final class Consumer implements Runnable, MessageListener {
> @Override
> public void run() {
> try {
> ActiveMQConnectionFactory factory = new
> ActiveMQConnectionFactory("tcp://localhost:61615");
> Connection connection = factory.createConnection();
> connection.start();
> Session session = connection.createSession(false,
> Session.AUTO_ACKNOWLEDGE);
> Queue destination = session.createQueue("topic");
> MessageConsumer consumer =
> session.createConsumer(destination);
> consumer.setMessageListener(this);
> } catch (JMSException e) {
> e.printStackTrace();
> }
> }
>
> @Override
> public void onMessage(Message message) {
> try {
> TextMessage text = (TextMessage) message;
> System.out.println("Message is : " + text.getText());
> } catch (JMSException e) {
> e.printStackTrace();
> }
> }
> }
>
> private static void runProducer() {
> new Thread(new Runnable() {
> @Override
> public void run() {
> try {
> ActiveMQConnectionFactory factory = new
> ActiveMQConnectionFactory("tcp://localhost:61617");
> // ActiveMQConnection.DEFAULT_BROKER_URL =
> // failover://tcp://localhost:61616
> Connection connection;
> connection = factory.createConnection();
> connection.start();
> Session session = connection.createSession(false,
> Session.AUTO_ACKNOWLEDGE);
> Queue destination = session.createQueue("topic");
> MessageProducer producer =
> session.createProducer(destination);
> TextMessage message = session.createTextMessage();
> message.setText("This is the message");
> producer.send(message);
> System.out.println("Sent: " + message.getText());
> } catch (JMSException e) {
> // TODO Auto-generated catch block
> e.printStackTrace();
> }
> }
> }).start();
>
> }
>
> private static void startBroker1() throws Exception {
> new Thread(new Runnable() {
> @Override
> public void run() {
> startBroker("broker1", "tcp://localhost:61617");
> }
> }).start();
>
> }
>
> private static void startBroker2() throws Exception {
> new Thread(new Runnable() {
> @Override
> public void run() {
> startBroker("broker2", "tcp://localhost:61615");
> }
> }).start();
> }
>
> private static void startBroker(String name, String uri) {
> try {
> BrokerService broker = new BrokerService();
> broker.setBrokerName(name);
> broker.addConnector(uri);
> NetworkConnector connector = new
> DiscoveryNetworkConnector(new URI("static://" +
> "tcp://localhost:61616"));
> connector.setDuplex(true);
> broker.addNetworkConnector(connector);
> broker.start();
> } catch (Exception e) {
> e.printStackTrace();
> }
>
> }
>
> }
>
>
> --
> Thanks,
> Pubudu
--
Thanks,
Pubudu