2009/9/21 Rafael Schloming <[email protected]>:
> Martin Ritchie wrote:
>>
>> Hi,
>>
>> Following on from looking at QPID-1871. I believe that there is quite
>> a significant change required to ensure that the message order or
>> rollback is maintained.
>>
>> I propose that we extract the Dispatcher from AMQSession, which will
>> simplify our biggest class (3100+ lines!) and show clear
>> responsibility for incoming message processing. This will simplify
>> rollback as the Dispatcher thread can be given full responsibility for
>> clearing up the state that it knows best. Rather than the current
>> situation where the calling thread does some work on AMQSession whilst
>> the Dispatcher is running/stopping, then calls the Dispatcher code
>> directly to clean up the remainder. All this while the Dispatcher may be
>> processing a message.
>>
>> Change design posted here:
>>
>> http://cwiki.apache.org/confluence/display/qpid/0.6+Java+Client+Dispatcher+Changes
>>
>> Comments on the investigation, implications and design welcome.
>> I'll capture the details on the wiki so we don't lose track of comments
>
> Hey Martin,
>
> Sorry I didn't pick up on this earlier. We hit this issue a while back in
> the 0-10 code path. That's why we added RollbackOrderTest, and that's why it
> doesn't fail for 0-10 brokers. You should probably check out
> AMQSession.syncDispatchQueue, this method pretty much solves the problem
> you're describing. It will block until the dispatch queue is empty... or
> more precisely it will block until everything that is currently in the
> dispatch queue has been processed by the dispatcher thread, which if done
> after stopping incoming message flow means it will block until the dispatch
> queue is empty.
>
> This method is used in a few places in the 0-10 codepath where it is
> necessary to clean out the dispatch queue prior to proceeding (e.g. during
> failover), however the key place here is AMQSession_0_10.releaseForRollback.
> If you look at this you'll notice that it is called before the release is
> actually done. If AMQSession_0_8.releaseForRollback were to do the same, or
> preferably if we were to move the syncDispatchQueue call up to
> AMQSession.java then I suspect this problem would go away without the need
> for a large refactor.
>
> --Rafael

Hi Rafi,

I saw the syncDispatchQueue method, but IMO waiting for the _queue to
clear is not the right solution for rollback, even on 0-10. When
rollback is called you don't want the dispatcher to process any more
messages. Your client may have MessageListeners set up that take a
long time to process, so the Dispatcher should stop processing
messages and perform the rollback.

I've attached a new test for RollbackOrderTest that blocks because
syncDispatchQueue waits for the Dispatcher to empty the _queue. When
rollback is called from MessageListener.onMessage(), the caller is the
Dispatcher thread itself, so it ends up waiting on itself and never
returns (see the thread dump below).

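To make the self-wait concrete, here is a minimal sketch of the pattern. It is not the Qpid code: SelfSyncSketch, its queue and the two helper methods are hypothetical stand-ins, and the sync is reduced to "enqueue a marker, wait on a latch". A timeout is used so the sketch terminates; with an untimed await, as in the attached thread dump, the listener case would hang.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class SelfSyncSketch {
    // Hypothetical stand-in for the session's dispatch queue (_queue).
    private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();

    // Marker-and-latch sync: true once the dispatcher has run the marker,
    // false if it has not done so within the timeout.
    boolean syncDispatchQueue(long timeoutMs) throws InterruptedException {
        CountDownLatch reached = new CountDownLatch(1);
        queue.put(reached::countDown);
        return reached.await(timeoutMs, TimeUnit.MILLISECONDS);
    }

    // Single dispatcher thread: runs queued entries in order until interrupted.
    Thread startDispatcher() {
        Thread t = new Thread(() -> {
            try {
                while (true) {
                    queue.take().run();
                }
            } catch (InterruptedException stopped) {
                // interrupted: leave the loop and let the thread end
            }
        }, "Dispatcher-Sketch");
        t.setDaemon(true);
        t.start();
        return t;
    }

    // Sync requested by an application thread: the dispatcher is free to reach the marker.
    static boolean syncFromCallerThread(long timeoutMs) {
        SelfSyncSketch s = new SelfSyncSketch();
        Thread dispatcher = s.startDispatcher();
        try {
            return s.syncDispatchQueue(timeoutMs);
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        } finally {
            dispatcher.interrupt();
        }
    }

    // Sync requested from inside a queued "listener", i.e. on the dispatcher thread itself.
    static boolean syncFromListener(long timeoutMs) {
        SelfSyncSketch s = new SelfSyncSketch();
        boolean[] result = new boolean[1];
        CountDownLatch done = new CountDownLatch(1);
        s.queue.add(() -> {
            try {
                result[0] = s.syncDispatchQueue(timeoutMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                done.countDown();
            }
        });
        Thread dispatcher = s.startDispatcher();
        try {
            done.await();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        } finally {
            dispatcher.interrupt();
        }
        return result[0];
    }

    public static void main(String[] args) {
        System.out.println("from caller thread: " + syncFromCallerThread(2000));
        System.out.println("from listener:      " + syncFromListener(200));
    }
}
```

The first call should report true, since the dispatcher reaches the marker. The second should report false: the only thread that could run the marker is the one waiting for it.
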
I can understand that you may wish to block for 0-10 failover, as
there may still be useful data in the _queue. Also, with failover you
are guaranteed that it is not going to be the Dispatcher thread that
is calling syncDispatchQueue.

I think extracting the Dispatcher will make it clearer that the
message processing varies in each protocol, and will allow the Session
classes to focus on the creation and control of Consumers/Producers.
This will allow the Dispatchers for each protocol to be cleaner and
highlight the protocol differences at failover: a 0-8 Dispatcher that
simply drops all pending messages, compared to a 0-10 Dispatcher that
attempts to process all the messages it has received.

It is a significant change, but I think it is worth it as it will
improve the readability of the Client code as well as its testability.
Currently AMQSession is not exactly easy to unit test, so splitting it
into more discrete components and unit testing them will be
beneficial.

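As a rough illustration of the split, the failover difference could look something like the sketch below. Only the names Dispatcher_0_8 and Dispatcher_0_10 come from the proposal; the base class, onFailover(), deliverAll() and the String messages are assumptions made up for this example, not a design.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

public class ProtocolDispatcherSketch {
    /** Shared queue handling; subclasses decide what failover does with pending messages. */
    abstract static class Dispatcher {
        protected final Queue<String> pending = new ArrayDeque<>();
        protected final List<String> delivered = new ArrayList<>();

        void enqueue(String message) {
            pending.add(message);
        }

        // Deliver everything currently pending, in order.
        void deliverAll() {
            String message;
            while ((message = pending.poll()) != null) {
                delivered.add(message);
            }
        }

        abstract void onFailover();
    }

    /** 0-8 flavour (assumed behaviour): pending messages are simply dropped. */
    static class Dispatcher_0_8 extends Dispatcher {
        @Override
        void onFailover() {
            pending.clear();
        }
    }

    /** 0-10 flavour (assumed behaviour): pending messages are processed first. */
    static class Dispatcher_0_10 extends Dispatcher {
        @Override
        void onFailover() {
            deliverAll();
        }
    }

    // Queue two messages, trigger failover, and report what was delivered.
    static List<String> deliveredAfterFailover(Dispatcher dispatcher) {
        dispatcher.enqueue("m1");
        dispatcher.enqueue("m2");
        dispatcher.onFailover();
        return dispatcher.delivered;
    }

    public static void main(String[] args) {
        System.out.println("0-8:  " + deliveredAfterFailover(new Dispatcher_0_8()));
        System.out.println("0-10: " + deliveredAfterFailover(new Dispatcher_0_10()));
    }
}
```

The point is only that each protocol's failover policy sits in one small class instead of being spread through AMQSession.
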
The change boils down to:
- Extract Dispatcher logic to Dispatcher_0_8 and Dispatcher_0_10
- Update AMQSession to use the new Dispatcher
- Update Dispatcher to be able to stop processing the _queue of
  messages and perform rollback

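The last item could be sketched roughly as follows. This is an assumption-laden illustration, not the proposed implementation: RollbackDispatcherSketch, requestRollback(), the Listener interface and the "released" list are all hypothetical, and what releasing a message means on the wire for each protocol is left out. The idea shown is that rollback only sets a flag, so it is safe to call from onMessage, and the Dispatcher thread itself stops delivering and clears up its own state.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

public class RollbackDispatcherSketch implements Runnable {
    /** Hypothetical listener type; it receives the dispatcher so it can request a rollback. */
    public interface Listener {
        void onMessage(String message, RollbackDispatcherSketch dispatcher);
    }

    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    private final AtomicBoolean rollbackRequested = new AtomicBoolean(false);
    // The two lists below are only touched by the dispatcher thread.
    private final List<String> uncommitted = new ArrayList<>();
    private final List<String> released = new ArrayList<>();
    private final Listener listener;

    public RollbackDispatcherSketch(Listener listener) {
        this.listener = listener;
    }

    public void enqueue(String message) {
        queue.add(message);
    }

    /** Callable from any thread, including from onMessage: sets a flag and never blocks. */
    public void requestRollback() {
        rollbackRequested.set(true);
    }

    public List<String> released() {
        return released;
    }

    /** For brevity the loop ends when the queue is empty; a real dispatcher would keep waiting. */
    @Override
    public void run() {
        String message;
        while ((message = queue.poll()) != null) {
            uncommitted.add(message);
            if (!rollbackRequested.get()) {
                listener.onMessage(message, this); // may call requestRollback()
            }
            if (rollbackRequested.compareAndSet(true, false)) {
                rollbackOnDispatcherThread();
            }
        }
    }

    // Release what was handed out since the last rollback, then everything still pending, in order.
    private void rollbackOnDispatcherThread() {
        released.addAll(uncommitted);
        uncommitted.clear();
        queue.drainTo(released);
    }

    static String runDemo() {
        List<String> delivered = new ArrayList<>();
        RollbackDispatcherSketch dispatcher = new RollbackDispatcherSketch((message, self) -> {
            delivered.add(message);
            if (message.equals("m2")) {
                self.requestRollback(); // rollback requested from inside the listener
            }
        });
        for (String m : new String[] {"m1", "m2", "m3", "m4"}) {
            dispatcher.enqueue(m);
        }
        Thread thread = new Thread(dispatcher, "Dispatcher-Sketch");
        thread.start();
        try {
            thread.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return "delivered=" + delivered + " released=" + dispatcher.released();
    }

    public static void main(String[] args) {
        System.out.println(runDemo());
    }
}
```

In this run the listener sees m1 and m2 only, and all four messages end up released in their original order, without the Dispatcher thread ever waiting on itself.
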
Cheers
Martin
--
Martin Ritchie

"Dispatcher-Channel-1" daemon prio=10 tid=0x083dc800 nid=0x2e65 waiting on condition [0x8e952000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for <0xb2000e78> (a java.util.concurrent.CountDownLatch$Sync)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:747)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:905)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1217)
    at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:207)
    at org.apache.qpid.client.AMQSession.syncDispatchQueue(AMQSession.java:1869)
    at org.apache.qpid.client.AMQSession_0_10.releaseForRollback(AMQSession_0_10.java:418)
    at org.apache.qpid.client.AMQSession.rollback(AMQSession.java:1562)
    - locked <0xb1d43a68> (a java.lang.Object)
    at org.apache.qpid.test.client.RollbackOrderTest$1.onMessage(RollbackOrderTest.java:140)
    at org.apache.qpid.client.BasicMessageConsumer.notifyMessage(BasicMessageConsumer.java:737)
    at org.apache.qpid.client.BasicMessageConsumer_0_10.notifyMessage(BasicMessageConsumer_0_10.java:156)
    at org.apache.qpid.client.BasicMessageConsumer.notifyMessage(BasicMessageConsumer.java:703)
    at org.apache.qpid.client.BasicMessageConsumer_0_10.notifyMessage(BasicMessageConsumer_0_10.java:178)
    at org.apache.qpid.client.BasicMessageConsumer_0_10.notifyMessage(BasicMessageConsumer_0_10.java:42)
    at org.apache.qpid.client.AMQSession$Dispatcher.notifyConsumer(AMQSession.java:2984)
    at org.apache.qpid.client.AMQSession$Dispatcher.dispatchMessage(AMQSession.java:2928)
    - locked <0xb1d43a80> (a java.lang.Object)
    - locked <0xb1f2a808> (a java.lang.Object)
    at org.apache.qpid.client.AMQSession$Dispatcher.access$1000(AMQSession.java:2740)
    at org.apache.qpid.client.AMQSession.dispatch(AMQSession.java:2733)
    at org.apache.qpid.client.message.UnprocessedMessage.dispatch(UnprocessedMessage.java:55)
    at org.apache.qpid.client.AMQSession$Dispatcher.run(AMQSession.java:2861)
    at java.lang.Thread.run(Thread.java:619)
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:[email protected]