Re: Virtual Topics (was Re: Failover topic subscribers)

2006-07-24 Thread James Strachan

Many thanks! I've added your patch to svn trunk and have got the test
case working (by tweaking the wildcard slightly and adding a pause
between sending the messages and asserting they are consumed). Yay!

For now I've made the code add a VirtualTopicBroker by default -
unless its disabled. We may  want to get more clever going forward by
allowing virtual topics  queues to be configured via XML in the
broker.xml file.

See if SVN trunk is working for you now - from my initial testing it
looks OK to me

On 7/21/06, bmadigan [EMAIL PROTECTED] wrote:


Thanks James, that test case works for me too. I wrote a use case that (I
think) covers the base Virtual Topic functionality. There is a problem
somewhere that causes this test to fail. Running it in debug I can see that
the Message is dispatched, but not delivered for some reason. Most of the
internals for virtual topics seem to be working fine though, so thats good
news. If you run the test case below, you can see that the MessageListeners
on the queue don't get any messages.  There is some additional code to add
the VirtualTopicBroker to the interceptor chain in BrokerService (or it can
be added as a plugin).

Test case:

package org.apache.activemq.usecases;

import org.apache.log4j.Logger;
import org.apache.activemq.EmbeddedBrokerTestSupport;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;

import javax.jms.Session;
import javax.jms.Connection;
import javax.jms.MessageProducer;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Message;

public class VirtualTopicPubSubTest extends EmbeddedBrokerTestSupport {

private Connection connection;

public void testVirtualTopicCreation( )throws Exception{
if(connection == null){
connection = createConnection();
}

String queueAName  = ActiveMQ.Virtual.A.TEST;
//create consumer 'cluster'
ActiveMQQueue queue1 = new ActiveMQQueue(queueAName);
ActiveMQQueue queue2 = new ActiveMQQueue(queueAName);

Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
MessageConsumer c1 = session.createConsumer(queue1);
MessageConsumer c2 =  session.createConsumer(queue2);

MessageCountListener exclusive1 = new MessageCountListener();
c1.setMessageListener(exclusive1);

MessageCountListener exclusive2 = new MessageCountListener();
c2.setMessageListener(exclusive2);

//create topic producer
MessageProducer producer =
session.createProducer(new ActiveMQTopic(TEST));
assertNotNull(producer);

int total = 10;
for(int i = 0; i  total; i++){
producer.send(session.createTextMessage(xx));
}

int delivered = exclusive1.getCount( )  exclusive2.getCount();
assertTrue(Expected +total+ delivered, found +delivered,
delivered == total);

}

class MessageCountListener implements MessageListener{

private int count = 0;

public void onMessage(Message m){
System.out.println(Got one! +count);
count++;
}

public int getCount(){
return count;
}
}

protected void tearDown() throws Exception {
if (connection != null) {
connection.close();
}
super.tearDown();
}


the Broker:

package org.apache.activemq.broker;

import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.Message;

import java.util.Iterator;
import java.util.Set;


public class VirtualTopicBroker
extends BrokerFilter
implements BrokerPlugin {

public static final String VIRTUAL_WILDCARD = ActiveMQ.Virtual.*;

public VirtualTopicBroker(Broker next) {
super(next);
}

public VirtualTopicBroker() {
super(null);
}

public void send(ConnectionContext ctx,
 Message message) throws Exception {

String name = message.getDestination().getPhysicalName();

String virtualName = VIRTUAL_WILDCARD+ name;

Set destinations = getDestinations(
new ActiveMQQueue(virtualName));

for (Iterator iter = destinations.iterator();
 iter.hasNext();) {
Destination dest = (Destination) iter.next();
dest.send(ctx, message);
}
next.send(ctx, message);
}

public Broker installPlugin(Broker broker) throws Exception {
return new VirtualTopicBroker(broker);
}
}


--
View this message in context: 
http://www.nabble.com/Re%3A-Virtual-Topics-%28was-Re%3A-Failover-topic-subscribers%29-tf1942508.html#a5439965
Sent from the ActiveMQ - Dev forum at Nabble.com.





--

James
---
http://radio.weblogs.com/0112098/


Re: Virtual Topics (was Re: Failover topic subscribers)

2006-07-19 Thread James Strachan

OK I've just added a test case that seems to show that the
destinations are being created lazily when a consumer is added...

http://svn.apache.org/repos/asf/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/NewConsumerCreatesDestinationTest.java

I've tried to simulate the kind of code you'll be calling the virtual
topics stuff - namely using the
broker.getDestination(topicSubscriberWildcard) to find all the virtual
topic subscriber queues etc and it seems to work for me using SVN
trunk.


On 7/20/06, James Strachan [EMAIL PROTECTED] wrote:

On 7/19/06, bmadigan [EMAIL PROTECTED] wrote:

 Is this the change to call lookup(...) in addConsumer( )?

Yes

 Looks like its not
 being called.

Damn - sorry about that. I knew I should have taken the time to write
a test case :). Lemme see if I get chance to write a little test case
today

 I'd like to run the broker in debug, I'm having trouble
 figuring out where exactly the lazy creation of destinations is happening.

It should be the lookup() which calls addDestination() I think. More
in a little while...

--

James
---
http://radio.weblogs.com/0112098/




--

James
---
http://radio.weblogs.com/0112098/


Re: Virtual Topics (was Re: Failover topic subscribers)

2006-07-18 Thread bmadigan


James.Strachan wrote:
 
 Incidentally in the example you give, was the consumer adding a
 consumer to a temporary queue?
 

Hmm, not intentionally, I'm not sure. I change the subscriber from a Topic
type to a Queue using the new Virtual prefix. When it was a Topic, I could
see the Virtual consumer being added in addConsumer(..), but after changing
it to a Queue type, addConsumer() was only being called for the advisory
topic. A little confusing. I'm not going to worry about it..
-- 
View this message in context: 
http://www.nabble.com/Re%3A-Virtual-Topics-%28was-Re%3A-Failover-topic-subscribers%29-tf1942508.html#a5382589
Sent from the ActiveMQ - Dev forum at Nabble.com.