[ https://issues.apache.org/jira/browse/STORM-342?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Rick Kellogg updated STORM-342:
-------------------------------
    Component/s: storm-core

> Contention in Disruptor Queue which may cause message loss or out of order
> --------------------------------------------------------------------------
>
>                 Key: STORM-342
>                 URL: https://issues.apache.org/jira/browse/STORM-342
>             Project: Apache Storm
>          Issue Type: Bug
>          Components: storm-core
>            Reporter: Sean Zhong
>            Assignee: Sean Zhong
>            Priority: Blocker
>             Fix For: 0.9.2-incubating
>
>
> h2. STORM-342: Message loss, executor hang, or message disorder
> The Disruptor helper class contains a potential race condition between the consumer and the producer. It can cause the consumer queue to hang, messages to be lost, or messages to be delivered out of order.
> {code:title=Disruptor.java|borderStyle=solid}
> class Disruptor {
> ...
>     public void publish(Object obj, boolean block) throws InsufficientCapacityException {
>         if(consumerStartedFlag) {
>             final long id;
>             if(block) {
>                 id = _buffer.next();
>             } else {
>                 id = _buffer.tryNext(1);
>             }
>             final MutableObject m = _buffer.get(id);
>             m.setObject(obj);
>             _buffer.publish(id);
>         } else {
>             _cache.add(obj);
>             if(consumerStartedFlag) flushCache();
>         }
>     }
>     public void consumerStarted() {
>         if(!consumerStartedFlag) {
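>             consumerStartedFlag = true;
>             // Race window: producers now see the flag as true and publish straight
>             // to the RingBuffer before the cached messages have been flushed.
>             flushCache();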
>         }
>     }
> }
> {code}
> Consumer side:
> {code:title=Task Executor Thread|borderStyle=solid}
>   (disruptor/consumer-started! receive-queue)
>   (fn []
>      (disruptor/consume-batch-when-available receive-queue event-handler))
> {code}
> h3. How to reproduce: executor hang and message loss
> 1. [Consumer Thread] The consumer has not started yet.
> 2. [Producer A Thread] Publishes message "1". Because consumerStartedFlag == false, the message is added to the cache.
> 3. [Consumer Thread] consumerStarted() is called. consumerStartedFlag is set to true, but flushCache() has not been called yet.
> 4. Because consumerStartedFlag is now true, newly produced messages are published directly to the RingBuffer.
> 5. [Producer B Thread] Produces enough messages to fill the RingBuffer.
> 6. [Consumer Thread] flushCache() is called from consumerStarted().
> 7. [Consumer Thread] The FLUSH_CACHE object is published to the RingBuffer in blocking mode. Because the RingBuffer is now full, the consumer thread blocks.
> 8. [Consumer Thread] consumeBatch() is never called, so the RingBuffer stays full and the consumer thread stays blocked forever. Message "1" never leaves the cache, so it is lost.
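The hang interleaving above can be replayed deterministically on a single thread. This is a minimal sketch under stated assumptions: a `java.util.concurrent.ArrayBlockingQueue` stands in for the Disruptor RingBuffer, a plain list stands in for `_cache`, and `HangReplay` / `replay` are hypothetical names, not Storm code. Since no real threads are involved, `offer()` is used only to detect that the blocking publish of FLUSH_CACHE could never complete.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/**
 * Hypothetical single-threaded replay of the "executor hang" steps.
 * ArrayBlockingQueue is a stand-in for the Disruptor RingBuffer (assumption).
 */
class HangReplay {
    static final Object FLUSH_CACHE = new Object(); // marker that represents the cached batch

    /** Returns true if the consumer's blocking publish of FLUSH_CACHE would never return. */
    static boolean replay(int ringCapacity) {
        BlockingQueue<Object> ring = new ArrayBlockingQueue<>(ringCapacity);
        List<Object> cache = new ArrayList<>();
        boolean consumerStartedFlag = false;

        // Step 2: producer A publishes "1" while the flag is false, so it goes to the cache.
        if (!consumerStartedFlag) {
            cache.add("1");
        }

        // Step 3: the consumer sets the flag but has not called flushCache() yet.
        consumerStartedFlag = true;

        // Steps 4-5: producer B sees the flag and fills the ring directly.
        int b = 0;
        while (consumerStartedFlag && ring.remainingCapacity() > 0) {
            ring.add("B" + b++);
        }

        // Steps 6-7: flushCache() would call put(FLUSH_CACHE). offer() returning false
        // means put() would block; the only thread that could drain the ring is the
        // consumer itself, which is stuck in that call.
        boolean published = ring.offer(FLUSH_CACHE);
        return !published && cache.contains("1");
    }

    public static void main(String[] args) {
        System.out.println("consumer would block forever: " + replay(4));
    }
}
```

With any ring capacity the replay reports a permanent block while "1" is still sitting in the cache, which matches steps 7 and 8.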
> h3. How to reproduce: message disorder
> 1. [Consumer Thread] The consumer has not started yet.
> 2. [Producer A Thread] Publishes message "1". Because consumerStartedFlag == false, the message is added to the cache.
> 3. [Consumer Thread] consumerStarted() is called. consumerStartedFlag is set to true, but flushCache() has not been called yet.
> 4. Because consumerStartedFlag is now true, newly produced messages are published directly to the RingBuffer.
> 5. [Producer A Thread] Publishes a new message "2", which goes directly into the RingBuffer.
> 6. [Consumer Thread] flushCache() is called from consumerStarted().
> 7. [Consumer Thread] The FLUSH_CACHE message is published to the RingBuffer, so it lands after message "2".
> 8. [Consumer Thread] consumeBatch() is called. It first picks "2", then picks FLUSH_CACHE, which stands for the cached message "1".
> 9. Producer A produced "1" then "2", but the consumer thread receives "2" then "1".
> 10. The message order is wrong.
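The disorder interleaving can be replayed the same way. In this hypothetical sketch (`DisorderReplay` is an illustrative name, not Storm code), `ArrayDeque`s stand in for the RingBuffer and `_cache`. When the consumer reaches the FLUSH_CACHE marker, the marker is expanded into the cached messages, as described in step 8.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Hypothetical single-threaded replay of the "message disorder" steps. */
class DisorderReplay {
    static final Object FLUSH_CACHE = new Object(); // marker that represents the cached batch

    static List<Object> replay() {
        Queue<Object> ring = new ArrayDeque<>();  // stand-in for the RingBuffer
        Queue<Object> cache = new ArrayDeque<>(); // stand-in for _cache
        boolean consumerStartedFlag = false;

        // Step 2: producer A publishes "1" before the consumer starts, so it is cached.
        if (consumerStartedFlag) ring.add("1"); else cache.add("1");
        // Step 3: the consumer sets the flag; flushCache() has not run yet.
        consumerStartedFlag = true;
        // Step 5: producer A publishes "2"; the flag is true, so it goes straight to the ring.
        if (consumerStartedFlag) ring.add("2"); else cache.add("2");
        // Steps 6-7: flushCache() appends the marker behind "2".
        ring.add(FLUSH_CACHE);

        // Step 8: consumeBatch() drains the ring in order, expanding the marker.
        List<Object> received = new ArrayList<>();
        while (!ring.isEmpty()) {
            Object o = ring.poll();
            if (o == FLUSH_CACHE) {
                while (!cache.isEmpty()) {
                    received.add(cache.poll());
                }
            } else {
                received.add(o);
            }
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(replay()); // prints [2, 1]
    }
}
```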
> I found this while troubleshooting a tricky intermittent failure (about 1 in 100 runs). It usually happens when the producer and consumer are colocated in the same process, for example when the task send-queue thread, acting as producer, publishes messages to the receive queue of a local task in the same worker.
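One way to close the window is to publish FLUSH_CACHE before setting the flag, and to make that switch atomic with respect to producers still in cache mode. The sketch below is hypothetical and is not the patch merged for STORM-342. It assumes a single lock (`startLock`) and uses an `ArrayBlockingQueue` in place of the RingBuffer. `StartSafeQueue` and its methods are illustrative names, and the non-blocking `tryNext` path is omitted. Because no message can reach the ring while the flag is false, the ring is empty when the marker is published. That means `consumerStarted()` cannot block on a full buffer, and every cached message is ordered ahead of anything published directly.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;

/**
 * Hypothetical start-safe variant of the cache-then-flush scheme (not Storm's
 * actual patch). ArrayBlockingQueue stands in for the Disruptor RingBuffer.
 */
class StartSafeQueue {
    private static final Object FLUSH_CACHE = new Object(); // marker: "drain the cache here"
    private final BlockingQueue<Object> ring;                  // RingBuffer stand-in
    private final ConcurrentLinkedQueue<Object> cache = new ConcurrentLinkedQueue<>();
    private final Object startLock = new Object();
    private volatile boolean consumerStartedFlag = false;

    StartSafeQueue(int capacity) {
        ring = new ArrayBlockingQueue<>(capacity);
    }

    void publish(Object obj) throws InterruptedException {
        if (!consumerStartedFlag) {
            synchronized (startLock) {
                // Re-check under the lock so we cannot interleave with consumerStarted().
                if (!consumerStartedFlag) {
                    cache.add(obj);
                    return;
                }
            }
        }
        // The flag is true, so FLUSH_CACHE is already in the ring ahead of this message.
        ring.put(obj);
    }

    void consumerStarted() throws InterruptedException {
        synchronized (startLock) {
            if (consumerStartedFlag) {
                return;
            }
            // Nothing reaches the ring while the flag is false, so the ring is empty
            // here and this put() cannot block on a full buffer.
            ring.put(FLUSH_CACHE);
            // Flip the flag only after the marker is in place.
            consumerStartedFlag = true;
        }
    }

    /** Drains what is available now, expanding FLUSH_CACHE into the cached messages. */
    List<Object> consumeAvailable() {
        List<Object> out = new ArrayList<>();
        Object o;
        while ((o = ring.poll()) != null) {
            if (o == FLUSH_CACHE) {
                Object cached;
                while ((cached = cache.poll()) != null) {
                    out.add(cached);
                }
            } else {
                out.add(o);
            }
        }
        return out;
    }

    public static void main(String[] args) throws InterruptedException {
        StartSafeQueue q = new StartSafeQueue(4);
        q.publish("1");      // cached: the consumer has not started
        q.consumerStarted(); // marker published first, then the flag is set
        q.publish("2");      // goes directly to the ring, behind the marker
        System.out.println(q.consumeAvailable()); // prints [1, 2]
    }
}
```

In this design, producers that see the flag as true still take a lock-free path: one volatile read followed by the buffer publish. Only producers that observe the flag as false contend on the lock, and that happens just during startup.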



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
