[ https://issues.apache.org/jira/browse/STORM-342?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Rick Kellogg updated STORM-342:
-------------------------------
    Component/s: storm-core

Contention in Disruptor Queue which may cause message loss or out of order
--------------------------------------------------------------------------

                Key: STORM-342
                URL: https://issues.apache.org/jira/browse/STORM-342
            Project: Apache Storm
         Issue Type: Bug
         Components: storm-core
           Reporter: Sean Zhong
           Assignee: Sean Zhong
           Priority: Blocker
            Fix For: 0.9.2-incubating


h2. STORM-342: Message loss, executor hang, or message disorder

The Disruptor helper class contains a race condition between the consumer and its producers. It can cause the consumer queue to hang, messages to be lost, or messages to be delivered out of order.

{code:title=Disruptor.java|borderStyle=solid}
class Disruptor {
    ...
    public void publish(Object obj, boolean block) throws InsufficientCapacityException {
        if(consumerStartedFlag) {
            final long id;
            if(block) {
                id = _buffer.next();
            } else {
                id = _buffer.tryNext(1);
            }
            final MutableObject m = _buffer.get(id);
            m.setObject(obj);
            _buffer.publish(id);
        } else {
            _cache.add(obj);
            if(consumerStartedFlag) flushCache();
        }
    }

    public void consumerStarted() {
        if(!consumerStartedFlag) {
            consumerStartedFlag = true;
            // Race window: from here until flushCache() runs, producers already see
            // the flag as true and publish straight to the RingBuffer, ahead of the
            // cached messages. flushCache() then publishes FLUSH_CACHE in blocking
            // mode, which blocks this (consumer) thread if the RingBuffer is full.
            flushCache();
        }
    }
}
{code}

Consumer:

{code:title=Task Executor Thread|borderStyle=solid}
;; Mark the consumer as started, then return the loop body that drains the queue.
(disruptor/consumer-started! receive-queue)
(fn []
  (disruptor/consume-batch-when-available receive-queue event-handler))
{code}

h3. How-to: Executor Hang, Message Loss

1. [Consumer Thread] The consumer has not started yet.
2. [Producer A Thread] Publishes message "1". Because consumerStartedFlag == false, the message is added to the cache.
3. [Consumer Thread] consumerStarted() is called. consumerStartedFlag is set to true, but flushCache() has not been called yet.
4. Because consumerStartedFlag is now true, newly produced messages are published directly to the RingBuffer.
5. [Producer B Thread] Publishes enough messages to fill the RingBuffer.
6. [Consumer Thread] flushCache() is called from consumerStarted().
7. [Consumer Thread] The FLUSH_CACHE object is published to the RingBuffer in blocking mode. Because the RingBuffer is already full, the consumer thread blocks.
8. [Consumer Thread] consumeBatch() is never called, so the RingBuffer stays full and the consumer thread stays blocked forever. Message "1" never leaves the cache, and nothing in the RingBuffer is ever consumed.

h3. How-to: Message Disorder

1. [Consumer Thread] The consumer has not started yet.
2. [Producer A Thread] Publishes message "1". Because consumerStartedFlag == false, the message is added to the cache.
3. [Consumer Thread] consumerStarted() is called. consumerStartedFlag is set to true, but flushCache() has not been called yet.
4. Because consumerStartedFlag is now true, newly produced messages are published directly to the RingBuffer.
5. [Producer A Thread] Publishes a new message "2", which goes directly into the RingBuffer.
6. [Consumer Thread] flushCache() is called from consumerStarted().
7. [Consumer Thread] The FLUSH_CACHE marker is published to the RingBuffer, so it is written after message "2".
8. [Consumer Thread] consumeBatch() is called. It picks up "2" first, then FLUSH_CACHE, which stands in for the cached "1".
9. Producer A Thread produced "1" and then "2", but the consumer thread received "2" and then "1".
10. The message order is wrong.

I found this after troubleshooting a tricky random failure (about 1 time in 100). It usually happens when the producer and consumer are colocated in the same process, for example when the task send-queue thread, acting as producer, sends messages to a local task's receive queue in the same worker.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
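Both scenarios in this issue come from flipping consumerStartedFlag and enqueuing the cached messages as two separate steps. One way to close that window, sketched here under stated assumptions, is to enqueue the FLUSH_CACHE marker once in the constructor (while the buffer is guaranteed empty, so it sits ahead of every direct publish) and to have consumerStarted() only flip the flag and then wait, through a read-write lock, for any producer still writing to the cache. The sketch is hypothetical and simplified: an ArrayBlockingQueue stands in for the LMAX RingBuffer, the class name SafeStartQueue and its consumeBatch() method are invented for illustration, and it is not presented as the patch committed for this issue. It also assumes, as in the Task Executor Thread snippet, that consumerStarted() returns before the consumer begins draining.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical, simplified model of the Disruptor helper (not Storm's real class).
// An ArrayBlockingQueue stands in for the RingBuffer.
class SafeStartQueue {
    // Sentinel meaning "deliver everything in the pre-start cache at this point".
    private static final Object FLUSH_CACHE = new Object();

    private final BlockingQueue<Object> ring;                   // stand-in for RingBuffer
    private final Queue<Object> cache = new ConcurrentLinkedQueue<>();
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private volatile boolean consumerStartedFlag = false;

    SafeStartQueue(int capacity) {
        ring = new ArrayBlockingQueue<>(capacity);
        // Enqueue the sentinel while the queue is empty, so every directly
        // published message is guaranteed to land behind it.
        ring.add(FLUSH_CACHE);
    }

    void publish(Object obj) throws InterruptedException {
        boolean publishNow = consumerStartedFlag;
        if (!publishNow) {
            lock.readLock().lock();
            try {
                // Re-check under the lock: consumerStarted() may have flipped the flag.
                publishNow = consumerStartedFlag;
                if (!publishNow) {
                    cache.add(obj);
                }
            } finally {
                lock.readLock().unlock();
            }
        }
        if (publishNow) {
            ring.put(obj); // blocks while the queue is full, like a blocking next()
        }
    }

    void consumerStarted() {
        consumerStartedFlag = true;
        // Taking the write lock waits for any cache.add() still in progress.
        // Nothing is published here, so this call cannot block on a full queue.
        lock.writeLock().lock();
        lock.writeLock().unlock();
    }

    // Drains everything currently available, expanding FLUSH_CACHE in place.
    List<Object> consumeBatch() {
        List<Object> out = new ArrayList<>();
        Object o;
        while ((o = ring.poll()) != null) {
            if (o == FLUSH_CACHE) {
                Object c;
                while ((c = cache.poll()) != null) {
                    out.add(c);
                }
            } else {
                out.add(o);
            }
        }
        return out;
    }

    public static void main(String[] args) throws InterruptedException {
        SafeStartQueue q = new SafeStartQueue(4);
        q.publish("1");        // consumer not started yet: goes to the cache
        q.consumerStarted();   // returns immediately, publishes nothing
        q.publish("2");        // goes straight to the queue, behind FLUSH_CACHE
        q.publish("3");
        q.publish("4");        // queue is now full: FLUSH_CACHE, 2, 3, 4
        System.out.println(q.consumeBatch()); // prints [1, 2, 3, 4]
    }
}
```

Because consumerStarted() never publishes, it cannot block on a full buffer (the hang scenario), and because FLUSH_CACHE sits ahead of every directly published message, the cached "1" is always delivered before "2" (the disorder scenario). The cost is a read-lock acquisition on the publish path, paid only while the flag still reads false.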