I did not see this problem with the admittedly inefficient test class
attached (which is only the consumer part).
Thanks,
Aaron
On Fri, May 16, 2008 at 6:17 PM, jydev <[EMAIL PROTECTED]> wrote:
>
> Hello,
>
> Getting the following error with VirtualTopic on 5.1, when there are more
> than one topic subscribers.
>
> ERROR Service - Async error occurred:
> java.lang.ClassCastException: org.apache.activemq.broker.region.Topic cannot
> be cast to org.apache.activemq.broker.region.Queue
> java.lang.ClassCastException: org.apache.activemq.broker.region.Topic cannot
> be cast to org.apache.activemq.broker.region.Queue
> at
> org.apache.activemq.broker.region.QueueSubscription.acknowledge(QueueSubscription.java:50)
> at
> org.apache.activemq.broker.region.PrefetchSubscription.acknowledge(PrefetchSubscription.java:224)
> at
> org.apache.activemq.broker.region.AbstractRegion.acknowledge(AbstractRegion.java:364)
> at
> org.apache.activemq.broker.region.RegionBroker.acknowledge(RegionBroker.java:470)
> at
> org.apache.activemq.broker.TransactionBroker.acknowledge(TransactionBroker.java:194)
> at
> org.apache.activemq.broker.BrokerFilter.acknowledge(BrokerFilter.java:73)
> at
> org.apache.activemq.broker.BrokerFilter.acknowledge(BrokerFilter.java:73)
> at
> org.apache.activemq.broker.MutableBrokerFilter.acknowledge(MutableBrokerFilter.java:84)
> at
> org.apache.activemq.broker.TransportConnection.processMessageAck(TransportConnection.java:443)
> at org.apache.activemq.command.MessageAck.visit(MessageAck.java:196)
> at
> org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:292)
> at
> org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:180)
> at
> org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:68)
> at
> org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:143)
> at
> org.apache.activemq.transport.InactivityMonitor.onCommand(InactivityMonitor.java:206)
> at
> org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:84)
> at
> org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:196)
> at
> org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:183)
> at java.lang.Thread.run(Thread.java:619)
>
> This seems to be already captured as an issue:
> https://issues.apache.org/activemq/browse/AMQ-1687
>
> Any idea when there will be a patch for this? Is there a work around?
>
> It seems like the messages are getting to the listeners ok even though
> broker is spitting out the errors above. But I want to make sure that there
> would be no weird side-effects due to the error.
>
> Thanks you in advance
> jydev
> --
> View this message in context:
> http://www.nabble.com/ClassCastException-with-VirtualTopic-on-5.1-tp17285256s2354p17285256.html
> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
>
>
>
package training;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Connection;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.MessageConsumer;
import javax.jms.Destination;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
* Used to start clients to read a number of messages from a topic or queue.
*/
public class GenericConsumer {
private final static String DEFAULT_ACTIVEMQ_URL = "tcp://localhost:61616";
private ConnectionFactory factory;
public void initialize() {
factory = new ActiveMQConnectionFactory(DEFAULT_ACTIVEMQ_URL);
}
public void receiveMessage(String destName, boolean isQueue, String
clientID) throws JMSException {
Connection connection = null;
Session session = null;
MessageConsumer consumer = null;
try {
connection = factory.createConnection();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination dest = isQueue ? session.createQueue(destName) :
session.createTopic(destName);
consumer = session.createConsumer(dest);
connection.start();
TextMessage message = (TextMessage) consumer.receive(5000);
if(message != null) {
System.out.println(clientID+" Received message on destination
"+destName+" with ID "+message.getJMSMessageID());
} else {
System.out.println(clientID+" Consumer timed out on destination
"+destName+"; no message received.");
}
connection.stop();
} finally {
if(consumer != null) try {consumer.close();}catch(JMSException e) {}
if(session != null) try {session.close();}catch(JMSException e) {}
if(connection != null) try {connection.close();}catch(JMSException
e) {}
}
}
/**
* Starts a new thread to process messages from the given destination.
*/
public static void launchConsumer(final String destName, final boolean
isQueue, final String clientID, final int messageCount) {
Thread t = new Thread() {
public void run() {
GenericConsumer consumer = new GenericConsumer();
consumer.initialize();
System.out.println(clientID+" consumer started.");
try {
for(int i=0; i<messageCount; i++) {
consumer.receiveMessage(destName, isQueue, clientID);
}
} catch (Exception e) {
e.printStackTrace();
}
System.out.println(clientID+" consumer FINISHED.");
}
};
t.start();
}
public static void main(String[] args) {
// Start 2 consumers to read off the queue normally
launchConsumer("Consumer.Foo.VirtualTopic.Test", true, "Client 1", 10);
launchConsumer("Consumer.Foo.VirtualTopic.Test", true, "Client 2", 20);
// Start 2 consumers to read off the queue normally
launchConsumer("Consumer.Bar.VirtualTopic.Test", true, "ClientB1", 10);
launchConsumer("Consumer.Bar.VirtualTopic.Test", true, "ClientB2", 20);
}
}