Hi all,
I'm just trying to evaluate the JMSXGroupID functionality in ActiveMQ 4.0.2.
But it does not seem to be working as described on the ActiveMQ page (
http://activemq.apache.org/message-groups.html).
This page says :
When a message is being dispatched to a consumer, the JMSXGroupID is
checked. If one is present then the broker checks to see if a consumer owns
that message group. (Since there could be a massive number of individual
message groups we use hash buckets rather than the actual JMSXGroupID
string).
If no consumer is associated with a message group a consumer is chosen.
That JMS MessageConsumer will receive all further messages with the same
JMSXGroupID value until
- the consumer closes (or the client which created the consumer dies
etc)
- someone closes the message group by sending a message with a
JMSXGroupSeq value of zero (see below for more details)
So I assume (*correct me if I'm wrong*):
- if I have two consumers on the same topic, only one consumer should
receive messages published on that topic at any time
- This does not seem to work. In the following example, when 2
consumers are started, both receive all messages on the topic.
- I don't need to define a selector on my consumers to make each one focus on
a particular group.
- This seems to be broken too. If I don't define a selector on the
consumers, they all receive all messages from the topic.
*Any help appreciated.*
TIA
Harry,
*package* com.test;
*import* javax.jms.Connection;
*import* javax.jms.DeliveryMode;
*import* javax.jms.Destination;
*import* javax.jms.JMSException;
*import* javax.jms.MessageProducer;
*import* javax.jms.Session;
*import* javax.jms.Topic;
*import* org.apache.activemq.ActiveMQConnectionFactory;
*import* org.apache.activemq.command.ActiveMQTextMessage;
*public* *class* GreetingCardFromHolidays {
*private* String where;
*private* *int* holidaysDelay;
Connection connection;
*private* Session session;
*private* Topic destination;
*private* MessageProducer producer;
*private* *static* *class* CardSender *implements* Runnable {
*private* GreetingCardFromHolidays from;
CardSender(GreetingCardFromHolidays from) {
*this*.from = from;
}
*public* *void* run() {
*for* (; *this*.from.holidaysDelay > 0; *this*.from.
holidaysDelay--) {
*this*.from.send();
*try* {
Thread.*sleep*(2000);
} *catch* (InterruptedException e) {
e.printStackTrace();
} *finally* {
// Thread.currentThread().notifyAll();
}
}
}
}
GreetingCardFromHolidays(String where, *int* holidaysDelay) {
*this*.where = where;
*this*.holidaysDelay = holidaysDelay;
*this*.init();
}
*public* *static* *void* main(String[] args) {
*final* GreetingCardFromHolidays tanganika =
*new*GreetingCardFromHolidays(
"Tanganika", 2 * 7);
*final* GreetingCardFromHolidays victoria =
*new*GreetingCardFromHolidays(
"Victoria", 4 * 7);
*new* Thread(*new* CardSender(tanganika), tanganika.where
).start();
*new* Thread(*new* CardSender(victoria), victoria.where
).start();
}
*private* *void* send() {
*try* {
// Create a Session
Session session = connection.createSession(*false*,
Session.*AUTO_ACKNOWLEDGE*);
// Create the destination (Topic or Queue)
Destination destination = session.createTopic("
GONE.FISHING");
// Create a MessageProducer from the Session to the
Topic or Queue
MessageProducer producer = session.createProducer
(destination);
// producer.setTimeToLive(10000);
producer.setDeliveryMode(DeliveryMode.*PERSISTENT*);
// Create a message
String text = "Hello my friend! From [" + where
+ "]. I've gone fishing since [" + *this*.
holidaysDelay
+ "] days.";
ActiveMQTextMessage message = (ActiveMQTextMessage) *
this*.session
.createTextMessage(text);
message.setStringProperty("JMSXGroupID", *this*.where);
// message.setStringProperty("where",where);
// Tell the producer to send the message
*this*.producer.send(message);
System.*out*.println(*this* + "Sending card :[" + text +
"]. GroupID ["
+ message.getGroupID() + "]");
} *catch* (Exception e) {
System.*out*.println("Caught: " + e);
e.printStackTrace();
}
}
*private* *void* init() {
// Create a ConnectionFactory
ActiveMQConnectionFactory connectionFactory =
*new*ActiveMQConnectionFactory(
"tcp://localhost:61616");
// Create a Connection
*try* {
*this*.connection = connectionFactory.createConnection(*
this*.where,
*this*.where);
connection.setClientID(*this*.where);
connection.start();
// Create a Session
*this*.session = connection.createSession(*false*,
Session.*AUTO_ACKNOWLEDGE*);
// Create the destination (Topic or Queue)
*this*.destination = session.createTopic("GONE.FISHING"
);
// Create a MessageProducer from the Session to the
Topic or Queue
*this*.producer = session.createProducer(destination);
// producer.setTimeToLive(10000);
*this*.producer.setDeliveryMode(DeliveryMode.*PERSISTENT
*);
} *catch* (JMSException e) {
e.printStackTrace();
}
}
*protected* *void* finalize() *throws* Throwable {
System.*out*.println("Finalizing [" + *this* + "]");
// Clean up
*if* (*this*.session != *null*)
session.close();
*if* (*this*.connection != *null*)
*this*.connection.close();
*super*.finalize();
}
*public* String toString() {
*return* "On holiday :: " + *this*.where;
}
}
*package* com.test;
*import* javax.jms.Connection;
*import* javax.jms.Destination;
*import* javax.jms.JMSException;
*import* javax.jms.Message;
*import* javax.jms.MessageConsumer;
*import* javax.jms.MessageListener;
*import* javax.jms.Session;
*import* javax.jms.TextMessage;
*import* org.apache.activemq.ActiveMQConnectionFactory;
*public* *class* GreetingsCardWaiter {
*private* String where;
Connection connection;
Session session;
*private* *static* *class* CardReceiver *implements* Runnable{
*private* GreetingsCardWaiter from;
CardReceiver(GreetingsCardWaiter from){
*this*.from=from;
}
*public* *void* run(){
*this*.from.waitForAcard();
}
}
GreetingsCardWaiter(String where){
*this*.where = where;
*this*.init();
}
*public* *static* *void* main(String[] args) {
*new* Thread(*new*
CardReceiver(*new*GreetingsCardWaiter(args[0])), args[0]).start();
}
*private* *void* waitForAcard(){
*try* {
// Create the destination (Topic or Queue)
Destination destination = session.createTopic("
GONE.FISHING");
// Create a MessageProducer from the Session to the
Topic or Queue
MessageConsumer consumer = session
.createConsumer(destination);
*final* String receiver = *this*.toString();
consumer.setMessageListener(*new* MessageListener(){
*public* *void* onMessage(Message arg0) {
*try* {
System.*out*.println(receiver + " >>
COOL!!! I've received a card from:["+arg0.getStringProperty("JMSXGroupID")+
"]."+System.*getProperty*("line.separator")+" Card message is "
+((TextMessage)arg0).getText());
} *catch* (JMSException e) {
// *TODO* Auto-generated catch block
e.printStackTrace();
}
}
});
*this*.connection.start();
System.*out*.println(receiver + " starts waiting for new
Cards. Selector is ["+consumer.getMessageSelector()+"]");
// Clean up
} *catch* (Exception e) {
System.*out*.println("Caught: " + e);
e.printStackTrace();
}
}
*private* *void* init() {
// Create a ConnectionFactory
ActiveMQConnectionFactory connectionFactory =
*new*ActiveMQConnectionFactory(
"tcp://dmc17525:61616");
// Create a Connection
*try* {
*this*.connection = connectionFactory.createConnection(*
this*.where, *this*.where);
connection.setClientID("Waiting on -" + *this*.where + "
"+ System.*currentTimeMillis*());
// Create a Session
session = connection.createSession(*false*,
Session.*AUTO_ACKNOWLEDGE*);
} *catch* (JMSException e) {
e.printStackTrace();
}
}
*protected* *void* finalize() *throws* Throwable {
System.*out*.println("Finalizing [" + *this* +"]");
*if*(*this*.session!=*null*)session.close();
*if*(*this*.connection!=*null*) *this*.connection
.close();
*super*.finalize();
}
*public* String toString(){
*return* "Card receiver :: " + *this*.where;
}
}