[ 
https://issues.apache.org/jira/browse/QPID-2693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13012593#comment-13012593
 ] 

Emmanuel Bourg commented on QPID-2693:
--------------------------------------

I ran some tests with session.processed(xfr) and it works better: the error
no longer shows up after 20 minutes of running.
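
To make the effect of session.processed(xfr) concrete, here is a minimal,
self-contained sketch. It is a toy model, not Qpid code: ToySession, invoke(),
deliver(), the window of 4 and the 20 ms timeout are all hypothetical names
and values chosen for illustration, and it assumes the sender stops issuing
commands once too many of them are outstanding. The receiver calling
processed() plays the role of the consumer calling session.processed(xfr).

```java
import java.util.concurrent.TimeUnit;

final class CompletionSketch {

    /** Toy stand-in for one direction of a session; NOT the Qpid Session class. */
    static final class ToySession {
        private final int window;   // hypothetical limit on outstanding commands
        private long sent = 0;      // commands issued so far
        private long complete = 0;  // highest command the receiver marked processed

        ToySession(int window) {
            this.window = window;
        }

        /** Issues one command. Returns its id, or -1 if the window stayed full until the timeout. */
        synchronized long invoke(long timeoutMillis) {
            long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
            while (sent - complete >= window) {
                long remaining = deadline - System.nanoTime();
                if (remaining <= 0) {
                    return -1; // analogue of "timed out waiting for completion"
                }
                try {
                    TimeUnit.NANOSECONDS.timedWait(this, remaining);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return -1;
                }
            }
            return ++sent;
        }

        /** Receiver side: marks every command up to and including id as processed. */
        synchronized void processed(long id) {
            if (id > complete) {
                complete = id;
            }
            notifyAll();
        }
    }

    /** Delivers up to n messages; returns how many were sent before the sender stalled. */
    static int deliver(int n, boolean consumerMarksProcessed) {
        ToySession toConsumer = new ToySession(4);
        int delivered = 0;
        for (int i = 0; i < n; i++) {
            long id = toConsumer.invoke(20);
            if (id < 0) {
                break; // sender gives up: the receiver never freed the window
            }
            delivered++;
            if (consumerMarksProcessed) {
                toConsumer.processed(id); // analogue of session.processed(xfr)
            }
        }
        return delivered;
    }

    public static void main(String[] args) {
        System.out.println("with processed():    " + deliver(10, true));
        System.out.println("without processed(): " + deliver(10, false));
    }
}
```

In this model all 10 messages go through when the receiver marks them
processed, while delivery stalls after 4 when it never does.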

However, this is a bit troubling: why does a programming error at the consumer
level trigger an error at the producer level? The producer shouldn't be
affected; otherwise a malicious consumer could exploit this to crash the
producer.

Also worth noting: with the current code on the trunk (0.11), the broker
remains functional after the error occurs. Previously it had to be restarted.


> Broker instability with the topic exchange
> ------------------------------------------
>
>                 Key: QPID-2693
>                 URL: https://issues.apache.org/jira/browse/QPID-2693
>             Project: Qpid
>          Issue Type: Bug
>          Components: Java Broker
>    Affects Versions: 0.7, 0.8, 0.9, 0.10
>         Environment: java version "1.6.0_12"
> Java(TM) SE Runtime Environment (build 1.6.0_12-b04)
> Java HotSpot(TM) 64-Bit Server VM (build 11.2-b01, mixed mode)
> Linux 2.6.24-11-pve #1 SMP PREEMPT Fri May 14 09:28:08 CEST 2010 x86_64 GNU/Linux
>            Reporter: Emmanuel Bourg
>            Assignee: Robbie Gemmell
>            Priority: Critical
>             Fix For: 0.11
>
>         Attachments: qpid-dump.txt, qpid-log.txt
>
>
> I've noticed an instability of the Java broker when sending a high volume of 
> messages to the topic exchange. The messages are non acked, non durable. 
> After about 15 minutes the messages can no longer be dispatched and the 
> client gets this exception:
> org.apache.qpid.transport.SessionException: timed out waiting for sync: complete = 77824, point = 77825
>         at org.apache.qpid.transport.Session.sync(Session.java:743)
>         at org.apache.qpid.transport.Session.sync(Session.java:712)
>         at org.apache.qpid.transport.Session.invoke(Session.java:672)
>         at org.apache.qpid.transport.Session.invoke(Session.java:518)
>         at org.apache.qpid.transport.SessionInvoker.messageTransfer(SessionInvoker.java:96)
>         at org.apache.qpid.transport.Session.messageTransfer(Session.java:880)
> And in the server log I get these exceptions:
> 2010-06-25 02:48:48,005 [ERROR] Exception thrown and no ProtocolEngine to handle it
> org.apache.qpid.transport.SessionException: timed out waiting for completion
>         at org.apache.qpid.transport.Session.invoke(Session.java:635)
>         at org.apache.qpid.server.transport.ServerSession.sendMessage(ServerSession.java:180)
>         at org.apache.qpid.server.subscription.Subscription_0_10.send(Subscription_0_10.java:573)
>         at org.apache.qpid.server.queue.SimpleAMQQueue.deliverMessage(SimpleAMQQueue.java:715)
>         at org.apache.qpid.server.queue.SimpleAMQQueue.deliverToSubscription(SimpleAMQQueue.java:658)
>         at org.apache.qpid.server.queue.SimpleAMQQueue.enqueue(SimpleAMQQueue.java:611)
>         at org.apache.qpid.server.queue.SimpleAMQQueue.enqueue(SimpleAMQQueue.java:536)
>         at org.apache.qpid.server.transport.ServerSession$1.postCommit(ServerSession.java:157)
>         at org.apache.qpid.server.txn.AutoCommitTransaction.enqueue(AutoCommitTransaction.java:151)
>         at org.apache.qpid.server.transport.ServerSession.enqueue(ServerSession.java:146)
>         at org.apache.qpid.server.transport.ServerSessionDelegate.messageTransfer(ServerSessionDelegate.java:287)
>         at org.apache.qpid.server.transport.ServerSessionDelegate.messageTransfer(ServerSessionDelegate.java:96)
>         at org.apache.qpid.transport.MessageTransfer.dispatch(MessageTransfer.java:103)
>         at org.apache.qpid.transport.SessionDelegate.command(SessionDelegate.java:46)
>         at org.apache.qpid.server.transport.ServerSessionDelegate.command(ServerSessionDelegate.java:110)
>         at org.apache.qpid.server.transport.ServerSessionDelegate.command(ServerSessionDelegate.java:96)
>         at org.apache.qpid.transport.Method.delegate(Method.java:159)
>         at org.apache.qpid.transport.Session.received(Session.java:487)
>         at org.apache.qpid.transport.Connection.dispatch(Connection.java:377)
>         at org.apache.qpid.transport.ConnectionDelegate.handle(ConnectionDelegate.java:64)
>         at org.apache.qpid.transport.ConnectionDelegate.handle(ConnectionDelegate.java:40)
>         at org.apache.qpid.transport.MethodDelegate.messageTransfer(MethodDelegate.java:113)
>         at org.apache.qpid.transport.MessageTransfer.dispatch(MessageTransfer.java:103)
>         at org.apache.qpid.transport.ConnectionDelegate.command(ConnectionDelegate.java:54)
>         at org.apache.qpid.transport.ConnectionDelegate.command(ConnectionDelegate.java:40)
>         at org.apache.qpid.transport.Method.delegate(Method.java:159)
>         at org.apache.qpid.transport.Connection.received(Connection.java:342)
>         at org.apache.qpid.transport.Connection.received(Connection.java:55)
>         at org.apache.qpid.transport.network.Assembler.emit(Assembler.java:98)
>         at org.apache.qpid.transport.network.Assembler.assemble(Assembler.java:220)
>         at org.apache.qpid.transport.network.Assembler.frame(Assembler.java:132)
>         at org.apache.qpid.transport.network.Frame.delegate(Frame.java:133)
>         at org.apache.qpid.transport.network.Assembler.received(Assembler.java:103)
>         at org.apache.qpid.transport.network.Assembler.received(Assembler.java:48)
>         at org.apache.qpid.transport.network.InputHandler.next(InputHandler.java:187)
>         at org.apache.qpid.transport.network.InputHandler.received(InputHandler.java:103)
>         at org.apache.qpid.transport.network.InputHandler.received(InputHandler.java:42)
>         at org.apache.qpid.server.protocol.MultiVersionProtocolEngine.received(MultiVersionProtocolEngine.java:101)
>         at org.apache.qpid.server.protocol.MultiVersionProtocolEngine.received(MultiVersionProtocolEngine.java:36)
>         at org.apache.qpid.transport.network.mina.MINANetworkDriver.messageReceived(MINANetworkDriver.java:337)
>         at org.apache.mina.common.support.AbstractIoFilterChain$TailFilter.messageReceived(AbstractIoFilterChain.java:703)
>         at org.apache.mina.common.support.AbstractIoFilterChain.callNextMessageReceived(AbstractIoFilterChain.java:362)
>         at org.apache.mina.common.support.AbstractIoFilterChain.access$1200(AbstractIoFilterChain.java:54)
>         at org.apache.mina.common.support.AbstractIoFilterChain$EntryImpl$1.messageReceived(AbstractIoFilterChain.java:800)
>         at org.apache.mina.filter.executor.ExecutorFilter.processEvent(ExecutorFilter.java:243)
>         at org.apache.mina.filter.executor.ExecutorFilter$ProcessEventsRunnable.run(ExecutorFilter.java:305)
>         at edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:665)
>         at edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:690)
>         at java.lang.Thread.run(Thread.java:619)
> 2010-06-25 02:48:48,005 [ERROR] Exception thrown and no ProtocolEngine to handle it
> java.lang.NullPointerException
>         at org.apache.qpid.transport.network.Assembler.assemble(Assembler.java:218)
>         at org.apache.qpid.transport.network.Assembler.frame(Assembler.java:132)
>         at org.apache.qpid.transport.network.Frame.delegate(Frame.java:133)
>         at org.apache.qpid.transport.network.Assembler.received(Assembler.java:103)
>         at org.apache.qpid.transport.network.Assembler.received(Assembler.java:48)
>         at org.apache.qpid.transport.network.InputHandler.next(InputHandler.java:187)
>         at org.apache.qpid.transport.network.InputHandler.received(InputHandler.java:113)
>         at org.apache.qpid.transport.network.InputHandler.received(InputHandler.java:42)
>         at org.apache.qpid.server.protocol.MultiVersionProtocolEngine.received(MultiVersionProtocolEngine.java:101)
>         at org.apache.qpid.server.protocol.MultiVersionProtocolEngine.received(MultiVersionProtocolEngine.java:36)
>         at org.apache.qpid.transport.network.mina.MINANetworkDriver.messageReceived(MINANetworkDriver.java:337)
>         at org.apache.mina.common.support.AbstractIoFilterChain$TailFilter.messageReceived(AbstractIoFilterChain.java:703)
>         at org.apache.mina.common.support.AbstractIoFilterChain.callNextMessageReceived(AbstractIoFilterChain.java:362)
>         at org.apache.mina.common.support.AbstractIoFilterChain.access$1200(AbstractIoFilterChain.java:54)
>         at org.apache.mina.common.support.AbstractIoFilterChain$EntryImpl$1.messageReceived(AbstractIoFilterChain.java:800)
>         at org.apache.mina.filter.executor.ExecutorFilter.processEvent(ExecutorFilter.java:243)
>         at org.apache.mina.filter.executor.ExecutorFilter$ProcessEventsRunnable.run(ExecutorFilter.java:305)
>         at edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:665)
>         at edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:690)
>         at java.lang.Thread.run(Thread.java:619)
> At this point the client can reconnect to the broker, but the messages are no 
> longer dispatched. The broker has to be restarted.

--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira

---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:dev-subscr...@qpid.apache.org
