This is almost working, there are a few things I need to fix:
- Need to figure out how to add the new Broker to the factory without using
the plugin loader
- It may not be a problem, but I'm synchronizing on next when I create the
queues for the virtual groups in addConsumer(). This could be finer grained
I think.
- I'm calling ConsumerInfo.setDestination(virtualQueue) to point the
consumer to a virtual queue. This is probably incorrect, not sure if there
is a better way.
- The virtual queues can't provide subscription recovery. Not sure how to
handle that.
I created a BrokerFilter subclass which overrides addConsumer() and send():
public Subscription addConsumer(ConnectionContext cc,
ConsumerInfo ci) throws Exception {
synchronized(next){
String name = ci.getDestination().getPhysicalName();
if(name.startsWith(VIRTUAL)){
Set destinations = getDestinations(
new ActiveMQQueue(name));
if(destinations.size()==0){//create a new virtual queue
ActiveMQQueue queue = new ActiveMQQueue(
name+"?consumer.exclusive=true");
next.addDestination(cc,queue);
ci.setDestination(queue);
}else{ //queue exists, add the consumer
ActiveMQQueue queue = (ActiveMQQueue)
destinations.iterator().next();
ci.setDestination(queue);
}
}
}
return next.addConsumer(cc, ci);
}
public void send(ConnectionContext ctx,
Message message) throws Exception {
String topic = message.getDestination().getPhysicalName();
Iterator destinations = getDestinations(
new ActiveMQQueue(VIRTUAL + ".*." + topic)).iterator();
while(destinations.hasNext()){
Destination dest = (Destination) destinations.next();
dest.send(ctx, message);
}
next.send(ctx, message);
}
Except for the subscription recovery part, this seems to work.
--
View this message in context:
http://www.nabble.com/Failover-topic-subscribers-tf1896829.html#a5317425
Sent from the ActiveMQ - User forum at Nabble.com.