Many thanks! I've added your patch to svn trunk and have got the test
case working (by tweaking the wildcard slightly and adding a pause
between sending the messages and asserting they are consumed). Yay!
For now I've made the code add a VirtualTopicBroker by default -
unless it's disabled. We may want to get more clever going forward by
allowing virtual topic queues to be configured via XML in the
broker.xml file.
See if SVN trunk is working for you now - from my initial testing it
looks OK to me.
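A fixed pause between sending and asserting works, but it can be slow or flaky. As an alternative, here is a minimal sketch of waiting on a latch until the expected number of messages has arrived. It uses only the JDK and simulates delivery with an executor; CountingListener, AwaitDeliverySketch and sendAndAwait are hypothetical names for illustration, not ActiveMQ classes and not the code committed to trunk.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the test's MessageCountListener (not ActiveMQ code).
class CountingListener {
    private final AtomicInteger count = new AtomicInteger();
    private final CountDownLatch latch;

    CountingListener(CountDownLatch latch) {
        this.latch = latch;
    }

    // Called on a delivery thread, in the same role as MessageListener.onMessage.
    void onMessage(String body) {
        count.incrementAndGet();
        latch.countDown();
    }

    int getCount() {
        return count.get();
    }
}

class AwaitDeliverySketch {
    // Delivers `total` messages asynchronously, alternating between two listeners,
    // then waits until all have arrived or the timeout expires.
    // Returns the number of messages actually delivered.
    static int sendAndAwait(int total, long timeoutMillis) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(total);
        CountingListener first = new CountingListener(latch);
        CountingListener second = new CountingListener(latch);
        // Simulated dispatcher thread; a real test would rely on the JMS session instead.
        ExecutorService dispatcher = Executors.newSingleThreadExecutor();
        try {
            for (int i = 0; i < total; i++) {
                CountingListener target = (i % 2 == 0) ? first : second;
                dispatcher.submit(() -> target.onMessage("xx"));
            }
            // Blocks only as long as needed, up to the timeout.
            latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
            return first.getCount() + second.getCount();
        } finally {
            dispatcher.shutdownNow();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("delivered=" + sendAndAwait(10, 5000));
    }
}
```

In a real JMS test the same idea would mean sharing one latch between the two listeners and awaiting it before the assertTrue call. The atomic counter also makes the count safe to read from the test thread.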
On 7/21/06, bmadigan [EMAIL PROTECTED] wrote:
Thanks James, that test case works for me too. I wrote a use case that (I
think) covers the base Virtual Topic functionality. There is a problem
somewhere that causes this test to fail. Running it in debug I can see that
the Message is dispatched, but not delivered for some reason. Most of the
internals for virtual topics seem to be working fine though, so that's good
news. If you run the test case below, you can see that the MessageListeners
on the queue don't get any messages. There is some additional code to add
the VirtualTopicBroker to the interceptor chain in BrokerService (or it can
be added as a plugin).
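To illustrate what adding a filter to the interceptor chain means, here is a self-contained sketch of the pattern in plain Java. Every type in it (SimpleBroker, SimpleFilter, CopyingFilter, SimplePlugin, FilterChainSketch) is a hypothetical, heavily simplified stand-in. The real Broker and BrokerFilter interfaces have many more methods, and the broker in this thread looks destinations up by wildcard rather than using a fixed prefix.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for the Broker interface.
interface SimpleBroker {
    void send(String destination, String body);
}

// Terminal broker: records everything that reaches the end of the chain.
class RecordingBroker implements SimpleBroker {
    final List<String> log = new ArrayList<>();

    public void send(String destination, String body) {
        log.add(destination + ":" + body);
    }
}

// A filter passes every call on to the next broker unless a subclass overrides it.
class SimpleFilter implements SimpleBroker {
    protected final SimpleBroker next;

    SimpleFilter(SimpleBroker next) {
        this.next = next;
    }

    public void send(String destination, String body) {
        next.send(destination, body);
    }
}

// Intercepts send: forwards a copy to a derived destination, then the original.
class CopyingFilter extends SimpleFilter {
    private final String prefix;

    CopyingFilter(SimpleBroker next, String prefix) {
        super(next);
        this.prefix = prefix;
    }

    @Override
    public void send(String destination, String body) {
        next.send(prefix + destination, body);
        next.send(destination, body);
    }
}

// Hypothetical stand-in for a plugin: wraps the broker it is given.
interface SimplePlugin {
    SimpleBroker installPlugin(SimpleBroker broker);
}

class FilterChainSketch {
    // Wraps the core broker with each plugin in turn; the last plugin is outermost.
    static SimpleBroker build(SimpleBroker core, List<SimplePlugin> plugins) {
        SimpleBroker broker = core;
        for (SimplePlugin plugin : plugins) {
            broker = plugin.installPlugin(broker);
        }
        return broker;
    }

    public static void main(String[] args) {
        RecordingBroker core = new RecordingBroker();
        List<SimplePlugin> plugins = new ArrayList<>();
        plugins.add(b -> new CopyingFilter(b, "ActiveMQ.Virtual.A."));
        SimpleBroker chain = build(core, plugins);
        chain.send("TEST", "xx");
        System.out.println(core.log);
    }
}
```

Hard-wiring the filter in and installing it as a plugin produce the same chain in this model. The plugin route only moves the wrapping step out of the broker service's own code.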
Test case:
package org.apache.activemq.usecases;

import org.apache.log4j.Logger;
import org.apache.activemq.EmbeddedBrokerTestSupport;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import javax.jms.Session;
import javax.jms.Connection;
import javax.jms.MessageProducer;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Message;

public class VirtualTopicPubSubTest extends EmbeddedBrokerTestSupport {

    private Connection connection;

    public void testVirtualTopicCreation() throws Exception {
        if (connection == null) {
            connection = createConnection();
        }
        String queueAName = "ActiveMQ.Virtual.A.TEST";
        // create consumer 'cluster': two consumers on the same virtual queue
        ActiveMQQueue queue1 = new ActiveMQQueue(queueAName);
        ActiveMQQueue queue2 = new ActiveMQQueue(queueAName);
        Session session = connection.createSession(false,
                Session.AUTO_ACKNOWLEDGE);
        MessageConsumer c1 = session.createConsumer(queue1);
        MessageConsumer c2 = session.createConsumer(queue2);
        MessageCountListener exclusive1 = new MessageCountListener();
        c1.setMessageListener(exclusive1);
        MessageCountListener exclusive2 = new MessageCountListener();
        c2.setMessageListener(exclusive2);
        // create topic producer
        MessageProducer producer =
                session.createProducer(new ActiveMQTopic("TEST"));
        assertNotNull(producer);
        int total = 10;
        for (int i = 0; i < total; i++) {
            producer.send(session.createTextMessage("xx"));
        }
        // each message should reach exactly one of the two consumers
        int delivered = exclusive1.getCount() + exclusive2.getCount();
        assertTrue("Expected " + total + " delivered, found " + delivered,
                delivered == total);
    }

    class MessageCountListener implements MessageListener {
        private int count = 0;

        public void onMessage(Message m) {
            System.out.println("Got one! " + count);
            count++;
        }

        public int getCount() {
            return count;
        }
    }

    protected void tearDown() throws Exception {
        if (connection != null) {
            connection.close();
        }
        super.tearDown();
    }
}
The broker:
package org.apache.activemq.broker;

import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.Message;
import java.util.Iterator;
import java.util.Set;

public class VirtualTopicBroker
        extends BrokerFilter
        implements BrokerPlugin {

    public static final String VIRTUAL_WILDCARD = "ActiveMQ.Virtual.*";

    public VirtualTopicBroker(Broker next) {
        super(next);
    }

    public VirtualTopicBroker() {
        super(null);
    }

    public void send(ConnectionContext ctx,
                     Message message) throws Exception {
        String name = message.getDestination().getPhysicalName();
        String virtualName = VIRTUAL_WILDCARD + name;
        // look up every queue matching the virtual wildcard for this topic
        Set destinations = getDestinations(
                new ActiveMQQueue(virtualName));
        for (Iterator iter = destinations.iterator();
             iter.hasNext();) {
            Destination dest = (Destination) iter.next();
            dest.send(ctx, message);
        }
        // then pass the original message on down the chain
        next.send(ctx, message);
    }

    public Broker installPlugin(Broker broker) throws Exception {
        return new VirtualTopicBroker(broker);
    }
}
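The lookup in send depends on how the wildcard name is built and matched. The following is a rough, self-contained sketch of segment-based matching, assuming names are split on "." and "*" matches exactly one segment, which is how ActiveMQ documents its destination wildcards. WildcardSketch and matches are hypothetical names, this is not ActiveMQ's own matching code, and the ">" wildcard is left out.

```java
// Hypothetical helper for illustration only; not ActiveMQ's matching code.
class WildcardSketch {
    // Returns true if `name` matches `pattern`, comparing "."-separated segments.
    // A "*" segment in the pattern matches any single segment of the name.
    static boolean matches(String pattern, String name) {
        String[] patternParts = pattern.split("\\.");
        String[] nameParts = name.split("\\.");
        if (patternParts.length != nameParts.length) {
            return false;
        }
        for (int i = 0; i < patternParts.length; i++) {
            boolean wildcard = patternParts[i].equals("*");
            if (!wildcard && !patternParts[i].equals(nameParts[i])) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        String queue = "ActiveMQ.Virtual.A.TEST";
        // Pattern with a separator between the wildcard and the topic name.
        System.out.println(matches("ActiveMQ.Virtual.*.TEST", queue));
        // Pattern produced by concatenating "ActiveMQ.Virtual.*" and "TEST".
        System.out.println(matches("ActiveMQ.Virtual.*" + "TEST", queue));
    }
}
```

Under this model, concatenating "ActiveMQ.Virtual.*" directly with the topic name gives "ActiveMQ.Virtual.*TEST", which has no separator before the topic name and so would not match the queue used in the test case. Whether that is what the wildcard tweak addressed is not stated in the thread.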
--
View this message in context:
http://www.nabble.com/Re%3A-Virtual-Topics-%28was-Re%3A-Failover-topic-subscribers%29-tf1942508.html#a5439965
Sent from the ActiveMQ - Dev forum at Nabble.com.
--
James
---
http://radio.weblogs.com/0112098/