Hi again, all. I solved my own problem, but I've left this post up just in case others make the same mistake!
I should have been using a durable subscriber. As soon as I corrected the code to the following, ActiveMQ stopped generating the exceptions: package neil.transactions; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.Session; import javax.jms.Topic; import javax.jms.TopicSubscriber; import javax.naming.Context; import javax.naming.InitialContext; import javax.naming.NamingException; public class JMSTransactedTopicListen { public static class MessageHandler implements MessageListener{ Session session; int count=0; public MessageHandler(Session aSession){ session = aSession; } @Override public void onMessage(Message msg) { try { if (++count % 3 == 0){ System.out.println("ROLLBACK message ID "+msg.getJMSMessageID()); session.rollback(); return; } } catch (JMSException e){ e.printStackTrace(); return; } try{ System.out.println("COMMIT message ID "+msg.getJMSMessageID()); session.commit(); } catch (JMSException e){ e.printStackTrace(); } } } /** * @param args */ public static void main(String[] args) { String clientId = "TRANSACTED_TOPIC_CLIENT_ID"; System.out.println("JMSTransactedTopicListen::main()"); try { Context context = new InitialContext(); java.util.Hashtable<?,?> myEnvironment = context.getEnvironment(); if (myEnvironment.containsKey("java.naming.provider.url")) System.out.println("Connecting to url: "+myEnvironment.get("java.naming.provider.url")); ConnectionFactory tcFactory = (ConnectionFactory) context.lookup("ConnectionFactory"); Connection topicConn = tcFactory.createConnection(); topicConn.setClientID(clientId); Session tcSession = topicConn.createSession(true, 0); System.out.println("Consuming topic: " + context.lookup("NeilTopic")); Topic myTopic = (Topic)context.lookup("NeilTopic"); TopicSubscriber subscriber = tcSession.createDurableSubscriber(myTopic, clientId); subscriber.setMessageListener(new MessageHandler(tcSession)); 
System.out.println("Starting topicConn"); topicConn.start(); System.out.println("JMSTransactedTopicListen::main() started OK - sleeping...."); Thread.sleep(60 * 60 * 100); System.out.println("JMSTransactedTopicListen::main() ... finished sleeping..."); tcSession.close(); topicConn.close(); } catch (NamingException e){ e.printStackTrace(); } catch (JMSException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("JMSTransactedTopicListen::main() Done"); } } neilwilk wrote: > > Hi All > > I'm experimenting with transactions. They work fine for QueueSessions, > but whenever I roll back a transaction on a TopicSession, I get the > following exception: > > ERROR Service - Async error occurred: > javax.jms.JMSException: Invalid acknowledgment: MessageAck {commandId = > 13, responseRequired = false, ackType = 3, consumerId = > ID:briltp0073-2889-1204555304234-0:0:1:1, firstMessageId = null, > lastMessageId = ID:briltp0073-2891-1204555307078-0:0:1:1:3, destination = > topic://neil.MyTopic, transactionId = null, messageCount = 1} > javax.jms.JMSException: Invalid acknowledgment: MessageAck {commandId = > 13, responseRequired = false, ackType = 3, consumerId = > ID:briltp0073-2889-1204555304234-0:0:1:1, firstMessageId = null, > lastMessageId = ID:briltp0073-2891-1204555307078-0:0:1:1:3, destination = > topic://neil.MyTopic, transactionId = null, messageCount = 1} > at > org.apache.activemq.broker.region.TopicSubscription.acknowledge(TopicSubscription.java:217) > at > org.apache.activemq.broker.region.AbstractRegion.acknowledge(AbstractRegion.java:340) > at > org.apache.activemq.broker.region.RegionBroker.acknowledge(RegionBroker.java:427) > at > org.apache.activemq.broker.TransactionBroker.acknowledge(TransactionBroker.java:194) > at > org.apache.activemq.broker.BrokerFilter.acknowledge(BrokerFilter.java:73) > at > org.apache.activemq.broker.BrokerFilter.acknowledge(BrokerFilter.java:73) > at > 
org.apache.activemq.broker.MutableBrokerFilter.acknowledge(MutableBrokerFilter.java:87) > at > org.apache.activemq.broker.TransportConnection.processMessageAck(TransportConnection.java:440) > at > org.apache.activemq.command.MessageAck.visit(MessageAck.java:196) > at > org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:281) > at > org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:178) > at > org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:67) > at > org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:134) > at > org.apache.activemq.transport.InactivityMonitor.onCommand(InactivityMonitor.java:138) > at > org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:83) > at > org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:185) > at > org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:172) > at java.lang.Thread.run(Thread.java:619) > > Has anyone experienced this? > > Here's the code that results in the exception. It is as simple as I can > make it. I've stared and stared at it and I cannot work out what is > causing the exception. 
> > package neil.transactions; > > import javax.jms.JMSException; > import javax.jms.Message; > import javax.jms.MessageListener; > import javax.jms.Session; > import javax.jms.Topic; > import javax.jms.TopicConnection; > import javax.jms.TopicConnectionFactory; > import javax.jms.TopicSession; > import javax.jms.TopicSubscriber; > import javax.naming.Context; > import javax.naming.InitialContext; > import javax.naming.NamingException; > > public class JMSTransactedTopicListen { > > public static class MessageHandler implements MessageListener{ > Session session; > int count=0; > > public MessageHandler(Session aSession){ > session = aSession; > } > > @Override > public void onMessage(Message msg) { > try { > if (++count % 3 == 0){ > System.out.println("ROLLBACK message ID > "+msg.getJMSMessageID()); > session.rollback(); > return; > } > } catch (JMSException e){ > e.printStackTrace(); > return; > } > > try{ > System.out.println("COMMIT message ID > "+msg.getJMSMessageID()); > session.commit(); > } catch (JMSException e){ > e.printStackTrace(); > } > } > } > > /** > * @param args > */ > public static void main(String[] args) { > System.out.println("JMSTransactedTopicListen::main()"); > try { > Context context = new InitialContext(); > > java.util.Hashtable<?,?> myEnvironment = > context.getEnvironment(); > if > (myEnvironment.containsKey("java.naming.provider.url")) > System.out.println("Connecting to url: > "+myEnvironment.get("java.naming.provider.url")); > > TopicConnectionFactory tcFactory = > (TopicConnectionFactory) > context.lookup("ConnectionFactory"); > TopicConnection topicConn = > tcFactory.createTopicConnection(); > TopicSession tcSession = > topicConn.createTopicSession(true, 0); > > System.out.println("Consuming topic: " + > context.lookup("NeilTopic")); > Topic myTopic = (Topic)context.lookup("NeilTopic"); > TopicSubscriber subscriber = > tcSession.createSubscriber(myTopic); > subscriber.setMessageListener(new > MessageHandler(tcSession)); > > 
System.out.println("Starting topicConn"); > topicConn.start(); > System.out.println("JMSTransactedTopicListen::main() > started OK - > sleeping...."); > > Thread.sleep(60 * 60 * 100); > > System.out.println("JMSTransactedTopicListen::main() > ... finished > sleeping..."); > > tcSession.close(); > topicConn.close(); > } catch (NamingException e){ > e.printStackTrace(); > } catch (JMSException e) { > e.printStackTrace(); > } catch (InterruptedException e) { > e.printStackTrace(); > } > > System.out.println("JMSTransactedTopicListen::main() Done"); > } > } > > > > -- View this message in context: http://www.nabble.com/javax.jms.JMSException%3A-Invalid-acknowledgement-after-rollback-of-TopicSession-tp15805155s2354p15806371.html Sent from the ActiveMQ - User mailing list archive at Nabble.com.