Hi All
I'm experimenting with transactions. They work fine for QueueSessions, but
whenever I roll back a transaction on a TopicSession, I get the following
exception:
ERROR Service - Async error occurred:
javax.jms.JMSException: Invalid acknowledgment: MessageAck {commandId = 22,
response
75-0:0:1:1:5, destination = topic://neil.MyTopic, transactionId = null,
messageCount = 1}
javax.jms.JMSException: Invalid acknowledgment: MessageAck {commandId = 22,
responseRequired = false, ackType = 3, consumerId = ID:briltp0073-113
Id = null, messageCount = 1}
at
org.apache.activemq.broker.region.TopicSubscription.acknowledge(TopicSubscription.java:217)
at
org.apache.activemq.broker.region.AbstractRegion.acknowledge(AbstractRegion.java:340)
at
org.apache.activemq.broker.region.RegionBroker.acknowledge(RegionBroker.java:427)
at
org.apache.activemq.broker.TransactionBroker.acknowledge(TransactionBroker.java:194)
at
org.apache.activemq.broker.BrokerFilter.acknowledge(BrokerFilter.java:73)
at
org.apache.activemq.broker.BrokerFilter.acknowledge(BrokerFilter.java:73)
at
org.apache.activemq.broker.MutableBrokerFilter.acknowledge(MutableBrokerFilter.java:87)
at
org.apache.activemq.broker.TransportConnection.processMessageAck(TransportConnection.java:440)
at org.apache.activemq.command.MessageAck.visit(MessageAck.java:196)
at
org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:281)
at
org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:178)
at
org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:67)
at
org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:134)
at
org.apache.activemq.transport.InactivityMonitor.onCommand(InactivityMonitor.java:138)
at
org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:83)
at
org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:185)
at
org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:172)
at java.lang.Thread.run(Thread.java:619)
Has anyone experienced this?
Here's the code that results in the exception. It is as simple as I can
make it. I've stared and stared at it and I cannot work out what is causing
the exception.
package neil.transactions;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
public class JMSTransactedTopicListen {
public static class MessageHandler implements MessageListener{
Session session;
int count=0;
public MessageHandler(Session aSession){
session = aSession;
}
@Override
public void onMessage(Message msg) {
try {
if (++count % 3 == 0){
System.out.println("ROLLBACK message ID
"+msg.getJMSMessageID());
session.rollback();
return;
}
} catch (JMSException e){
e.printStackTrace();
return;
}
try{
System.out.println("COMMIT message ID
"+msg.getJMSMessageID());
session.commit();
} catch (JMSException e){
e.printStackTrace();
}
}
}
/**
* @param args
*/
public static void main(String[] args) {
System.out.println("JMSTransactedTopicListen::main()");
try {
Context context = new InitialContext();
java.util.Hashtable<?,?> myEnvironment =
context.getEnvironment();
if
(myEnvironment.containsKey("java.naming.provider.url"))
System.out.println("Connecting to url:
"+myEnvironment.get("java.naming.provider.url"));
TopicConnectionFactory tcFactory =
(TopicConnectionFactory)
context.lookup("ConnectionFactory");
TopicConnection topicConn =
tcFactory.createTopicConnection();
TopicSession tcSession =
topicConn.createTopicSession(true, 0);
System.out.println("Consuming topic: " +
context.lookup("NeilTopic"));
Topic myTopic = (Topic)context.lookup("NeilTopic");
TopicSubscriber subscriber =
tcSession.createSubscriber(myTopic);
subscriber.setMessageListener(new
MessageHandler(tcSession));
System.out.println("Starting topicConn");
topicConn.start();
System.out.println("JMSTransactedTopicListen::main()
started OK -
sleeping....");
Thread.sleep(60 * 60 * 100);
System.out.println("JMSTransactedTopicListen::main()
... finished
sleeping...");
tcSession.close();
topicConn.close();
} catch (NamingException e){
e.printStackTrace();
} catch (JMSException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("JMSTransactedTopicListen::main() Done");
}
}
--
View this message in context:
http://www.nabble.com/javax.jms.JMSException%3A-Invalid-acknowledgement-after-rollback-of-TopicSession-tp15805155s2354p15805155.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.