Hi All,
I have the following setup. There is a producer which sends messages
to an embedded broker. There is a consumer that consumes messages from
another embedded broker. I have created a network of brokers with the
two embedded brokers connected to the standalone broker. But the
messages don't get passed from the producer to the consumer. I have
created an example which shows this behavior. Can someone point out to
me what I am doing wrong or if this is not possible? Following is the
code to reproduce what I have mentioned.
import java.net.URI;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.network.DiscoveryNetworkConnector;
import org.apache.activemq.network.NetworkConnector;
public class Test {
public static void main(String[] args) {
try {
startBroker1();
startBroker2();
runProducer();
runConsumer();
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
private static void runConsumer() {
new Thread(new Consumer()).start();
}
private static final class Consumer implements Runnable, MessageListener {
@Override
public void run() {
try {
ActiveMQConnectionFactory factory = new
ActiveMQConnectionFactory("tcp://localhost:61615");
Connection connection = factory.createConnection();
connection.start();
Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
Queue destination = session.createQueue("topic");
MessageConsumer consumer = session.createConsumer(destination);
consumer.setMessageListener(this);
} catch (JMSException e) {
e.printStackTrace();
}
}
@Override
public void onMessage(Message message) {
try {
TextMessage text = (TextMessage) message;
System.out.println("Message is : " + text.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
private static void runProducer() {
new Thread(new Runnable() {
@Override
public void run() {
try {
ActiveMQConnectionFactory factory = new
ActiveMQConnectionFactory("tcp://localhost:61617");
// ActiveMQConnection.DEFAULT_BROKER_URL =
// failover://tcp://localhost:61616
Connection connection;
connection = factory.createConnection();
connection.start();
Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
Queue destination = session.createQueue("topic");
MessageProducer producer =
session.createProducer(destination);
TextMessage message = session.createTextMessage();
message.setText("This is the message");
producer.send(message);
System.out.println("Sent: " + message.getText());
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}).start();
}
private static void startBroker1() throws Exception {
new Thread(new Runnable() {
@Override
public void run() {
startBroker("broker1", "tcp://localhost:61617");
}
}).start();
}
private static void startBroker2() throws Exception {
new Thread(new Runnable() {
@Override
public void run() {
startBroker("broker2", "tcp://localhost:61615");
}
}).start();
}
private static void startBroker(String name, String uri) {
try {
BrokerService broker = new BrokerService();
broker.setBrokerName(name);
broker.addConnector(uri);
NetworkConnector connector = new
DiscoveryNetworkConnector(new URI("static://" +
"tcp://localhost:61616"));
connector.setDuplex(true);
broker.addNetworkConnector(connector);
broker.start();
} catch (Exception e) {
e.printStackTrace();
}
}
}
--
Thanks,
Pubudu