[ https://issues.apache.org/jira/browse/QPID-5912?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Keith Wall updated QPID-5912:
-----------------------------

    Assignee: Rob Godfrey  (was: Keith Wall)

> "Unable to find message with id" exception will result if message is
> delivered straight through and transport exception occurs during send to
> consumer
> ------------------------------------------------------------------------
>
>                 Key: QPID-5912
>                 URL: https://issues.apache.org/jira/browse/QPID-5912
>             Project: Qpid
>          Issue Type: Bug
>          Components: Java Broker
>            Reporter: Keith Wall
>            Assignee: Rob Godfrey
>            Priority: Critical
>             Fix For: 0.29
>
>         Attachments: qpid_after_fix.log, qpid_before_fix.log
>
>
> If a message takes the straight through delivery path through the Broker
> (that is, where the IO thread that accepts the message from the publisher is
> the same thread that delivers the message to the consumer), and the send to
> the consumer fails with an exception (for instance, a sender timeout
> exception), the connection to the consumer is rightly closed. However, when
> the consumer reconnects, the Broker sends the message to the consumer
> successfully, but will suffer an "Unable to find message with id" store
> exception when the consumer tries to ack the message or commit.
> The issue is that mishandling of the sender timeout exception means that the
> Broker fails to perform the *publishing* post-delivery tasks, so the message
> is never committed to disk.
>
> Reproduction
> # Prepare a JMS publisher application that sends one message. Make the
> message sufficiently large to overwhelm the TCP buffers (I use 100MB).
> # Prepare a JMS consumer application to receive the message.
> # Start the JMS consumer application. Once it has begun to await the message
> (MessageConsumer.receive()), either stop the process or put a breakpoint in
> IoReceiver#run (line 160).
> # Start the JMS publisher; it will send the message to the Broker.
> # Broker will choose the straight through path.
> # Broker's send to consumer will time out (stack 1 below).
> # JMS consumer will be disconnected.
> # Restart the JMS consumer.
> # Expected behaviour: message delivered to consumer successfully. Actual
> behaviour: Broker reports exception when client tries to ack the message
> (stack 2 below).
>
> stack 1 - Broker reports timeout on send to consumer
> {noformat}
> 2014-07-22 11:32:56,296 ERROR [IoReceiver - /127.0.0.1:57756] (io.IoSender) - write timed out for socket /127.0.0.1:57748: head -2146434251, tail -2146958539
> 2014-07-22 11:32:56,297 INFO [IoReceiver - /127.0.0.1:57748] (subscription.close) - [Broker] [sub:7(vh(/default)/qu(myqueue)] SUB-1002 : Close
> 2014-07-22 11:32:56,297 ERROR [IoReceiver - /127.0.0.1:57756] (v0_10.ServerSessionDelegate) - Exception processing command
> org.apache.qpid.transport.SenderException: write timed out for socket /127.0.0.1:57748: head -2146434251, tail -2146958539
>     at org.apache.qpid.transport.SenderException.rethrow(SenderException.java:49)
>     at org.apache.qpid.transport.Session.invoke(Session.java:784)
>     at org.apache.qpid.server.protocol.v0_10.ServerSession.sendMessage(ServerSession.java:263)
>     at org.apache.qpid.server.protocol.v0_10.ConsumerTarget_0_10.send(ConsumerTarget_0_10.java:304)
>     at org.apache.qpid.server.queue.QueueConsumerImpl.send(QueueConsumerImpl.java:476)
>     at org.apache.qpid.server.queue.AbstractQueue.deliverMessage(AbstractQueue.java:1117)
>     at org.apache.qpid.server.queue.AbstractQueue.deliverToConsumer(AbstractQueue.java:1041)
>     at org.apache.qpid.server.queue.AbstractQueue.access$200(AbstractQueue.java:91)
>     at org.apache.qpid.server.queue.AbstractQueue$4.run(AbstractQueue.java:985)
>     at java.security.AccessController.doPrivileged(Native Method)
>     at javax.security.auth.Subject.doAs(Subject.java:356)
>     at org.apache.qpid.server.queue.AbstractQueue.doEnqueue(AbstractQueue.java:941)
>     at org.apache.qpid.server.queue.AbstractQueue.enqueue(AbstractQueue.java:888)
>     at org.apache.qpid.server.exchange.AbstractExchange$2.postCommit(AbstractExchange.java:535)
>     at org.apache.qpid.server.protocol.v0_10.ServerSession$AsyncCommand.complete(ServerSession.java:971)
>     at org.apache.qpid.server.protocol.v0_10.ServerSession.completeAsyncCommands(ServerSession.java:916)
>     at org.apache.qpid.server.protocol.v0_10.ServerSessionDelegate.command(ServerSessionDelegate.java:103)
>     at org.apache.qpid.server.protocol.v0_10.ServerSessionDelegate.command(ServerSessionDelegate.java:77)
>     at org.apache.qpid.transport.Method.delegate(Method.java:159)
>     at org.apache.qpid.transport.Session.received(Session.java:596)
>     at org.apache.qpid.transport.Connection.dispatch(Connection.java:439)
>     at org.apache.qpid.transport.ConnectionDelegate.handle(ConnectionDelegate.java:64)
>     at org.apache.qpid.transport.ConnectionDelegate.handle(ConnectionDelegate.java:40)
>     at org.apache.qpid.transport.MethodDelegate.messageTransfer(MethodDelegate.java:113)
> {noformat}
>
> stack 2 - Broker fails to ack message after client reconnects
> {noformat}
> 2014-07-22 11:36:29,044 ERROR [IoReceiver - /127.0.0.1:57806] (util.ServerScopedRuntimeException) - Unable to find message with id 3 on queue myqueue with id 63798825-4070-47df-8187-b58328a3d942
> 2014-07-22 11:36:29,044 ERROR [IoReceiver - /127.0.0.1:57806] (v0_10.ServerSessionDelegate) - Exception processing command
> org.apache.qpid.server.store.StoreException: Unable to find message with id 3 on queue myqueue with id 63798825-4070-47df-8187-b58328a3d942
>     at org.apache.qpid.server.store.AbstractJDBCMessageStore.dequeueMessage(AbstractJDBCMessageStore.java:649)
>     at org.apache.qpid.server.store.AbstractJDBCMessageStore.access$300(AbstractJDBCMessageStore.java:49)
>     at org.apache.qpid.server.store.AbstractJDBCMessageStore$JDBCTransaction.dequeueMessage(AbstractJDBCMessageStore.java:1166)
>     at org.apache.qpid.server.txn.AsyncAutoCommitTransaction.dequeue(AsyncAutoCommitTransaction.java:104)
>     at org.apache.qpid.server.protocol.v0_10.ServerSession.acknowledge(ServerSession.java:457)
>     at org.apache.qpid.server.protocol.v0_10.ExplicitAcceptDispositionChangeListener.onAccept(ExplicitAcceptDispositionChangeListener.java:46)
>     at org.apache.qpid.server.protocol.v0_10.ServerSession$3.performAction(ServerSession.java:283)
>     at org.apache.qpid.server.protocol.v0_10.ServerSession.dispositionChange(ServerSession.java:372)
>     at org.apache.qpid.server.protocol.v0_10.ServerSession.accept(ServerSession.java:279)
>     at org.apache.qpid.server.protocol.v0_10.ServerSessionDelegate.messageAccept(ServerSessionDelegate.java:128)
>     at org.apache.qpid.server.protocol.v0_10.ServerSessionDelegate.messageAccept(ServerSessionDelegate.java:77)
>     at org.apache.qpid.transport.MessageAccept.dispatch(MessageAccept.java:87)
>     at org.apache.qpid.transport.SessionDelegate.command(SessionDelegate.java:55)
>     at org.apache.qpid.server.protocol.v0_10.ServerSessionDelegate.command(ServerSessionDelegate.java:94)
>     at org.apache.qpid.server.protocol.v0_10.ServerSessionDelegate.command(ServerSessionDelegate.java:77)
> {noformat}

--
This message was sent by Atlassian JIRA
(v6.2#6252)
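
The failure mode described in the issue above can be illustrated with a small, self-contained toy model. This is not Qpid code and not the actual fix: `ToyStore`, `ConsumerLink`, `publishFaulty` and `publishGuarded` are hypothetical names invented for this sketch, and the real Broker's store and transaction handling are considerably more involved. The sketch only assumes what the description states: when the send to the consumer throws on the publisher's IO thread, the publishing-side post-delivery work is skipped, so a later ack cannot find the message in the store.

```java
import java.util.HashSet;
import java.util.Set;

/** Toy model of the reported failure. Every name here is hypothetical, not Qpid API. */
class StraightThroughSketch {

    /** Stand-in for the durable message store. */
    static final class ToyStore {
        private final Set<Long> enqueued = new HashSet<>();

        void enqueue(long messageId) {
            enqueued.add(messageId);
        }

        void dequeue(long messageId) {
            if (!enqueued.remove(messageId)) {
                throw new IllegalStateException("Unable to find message with id " + messageId);
            }
        }
    }

    /** Stand-in for the send to the consumer; may fail, e.g. with a sender timeout. */
    interface ConsumerLink {
        void send(long messageId);
    }

    /** Faulty shape: an exception from the send skips the publishing-side store work. */
    static void publishFaulty(ToyStore store, ConsumerLink link, long messageId) {
        link.send(messageId);     // throws on timeout...
        store.enqueue(messageId); // ...so this line is never reached
    }

    /** Guarded shape: the publishing-side work runs whether or not the send fails. */
    static void publishGuarded(ToyStore store, ConsumerLink link, long messageId) {
        try {
            link.send(messageId);
        } finally {
            store.enqueue(messageId);
        }
    }

    /** Publishes through a link that always times out, then acks as a reconnected consumer would. */
    static boolean ackSucceedsAfterFailedSend(boolean guarded) {
        ToyStore store = new ToyStore();
        ConsumerLink timingOut = id -> {
            throw new RuntimeException("write timed out");
        };
        try {
            if (guarded) {
                publishGuarded(store, timingOut, 3L);
            } else {
                publishFaulty(store, timingOut, 3L);
            }
        } catch (RuntimeException e) {
            // In this model the consumer connection is closed and the message is redelivered later.
        }
        try {
            store.dequeue(3L); // the ack from the reconnected consumer
            return true;
        } catch (IllegalStateException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("faulty path, ack ok:  " + ackSucceedsAfterFailedSend(false));
        System.out.println("guarded path, ack ok: " + ackSucceedsAfterFailedSend(true));
    }
}
```

The `try`/`finally` in `publishGuarded` is just one way to keep the publisher's bookkeeping independent of a consumer-side transport failure; how the Broker itself separates the two is not stated in this message.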