[ https://issues.apache.org/jira/browse/SAMZA-3?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chris Riccomini resolved SAMZA-3.
---------------------------------

    Resolution: Fixed
    
> BrokerProxy deadlocks if messages aren't polled from all streams
> ----------------------------------------------------------------
>
>                 Key: SAMZA-3
>                 URL: https://issues.apache.org/jira/browse/SAMZA-3
>             Project: Samza
>          Issue Type: Bug
>          Components: kafka
>    Affects Versions: 0.6.0
>            Reporter: Chris Riccomini
>            Assignee: Chris Riccomini
>             Fix For: 0.7.0
>
>         Attachments: SAMZA-3.0.patch
>
>
> Suppose a KafkaSystemConsumer is created with: 
> {code}
> consumer.register(sp1, 0)
> consumer.register(sp2, 0)
> consumer.start
> while(true) {
>   // sp1 is registered but never polled, so its queue is never drained
>   consumer.poll(sp2, 0)
> }
> {code}
> This code will eventually deadlock (assuming sp1 has messages) if sp1 and 
> sp2 are both on the same broker. 
> The current implementation of BrokerProxy/KafkaSystemConsumer puts messages 
> into BlockingEnvelopeMap. The BlockingEnvelopeMap has a 
> per-SystemStreamPartition max queueSize (defaults to 1000, I believe). This 
> means that, if a SystemStreamPartition is registered with the 
> KafkaSystemConsumer, but messages are not read off of the 
> SystemStreamPartition's queue for some reason, the 
> BrokerProxy/KafkaSystemConsumer will eventually block on 
> BlockingEnvelopeMap.add. This prevents the BrokerProxy from fetching any 
> more messages from ANY topic/partition on the broker. If code is trying to 
> read messages from another SystemStreamPartition, it will never receive 
> any new messages. 
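> To make the failure mode concrete, here is a minimal standalone sketch (plain 
> java.util.concurrent in Scala, not Samza code) of a single fetcher thread 
> feeding two bounded per-partition queues while the caller only drains one: 
> {code}
> import java.util.concurrent.ArrayBlockingQueue
>
> object QueueStarvationSketch extends App {
>   // Stand-ins for the per-SystemStreamPartition queues in BlockingEnvelopeMap.
>   val sp1Queue = new ArrayBlockingQueue[String](1000)
>   val sp2Queue = new ArrayBlockingQueue[String](1000)
>
>   // One thread stands in for the BrokerProxy fetcher for a single broker.
>   val fetcher = new Thread(new Runnable {
>     def run(): Unit = {
>       var i = 0
>       while (true) {
>         sp1Queue.put("sp1-" + i) // blocks forever once sp1Queue fills up
>         sp2Queue.put("sp2-" + i)
>         i += 1
>       }
>     }
>   })
>   fetcher.start()
>
>   // The caller only drains sp2, mirroring the poll(sp2, 0) loop above; it
>   // stops receiving messages as soon as the fetcher blocks on sp1Queue.
>   while (true) println(sp2Queue.take())
> }
> {code}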
> This is not currently a problem because Samza reads messages in two ways: 
> 1) During state store restore. 
> 2) During the process loop (feeding messages to StreamTask.process). 
> The current SamzaContainer implementation uses a new SystemConsumer for each 
> SystemStreamPartition when it restores (#1), which registers ONLY one 
> SystemStreamPartition, so no deadlock is possible there. The current 
> DefaultChooser round-robins between streams (see the sketch below), so every 
> stream with available messages gets polled, and no starvation (and hence no 
> deadlock) is currently possible. Nevertheless, this should be fixed. For one 
> thing, if we change the DefaultChooser's behavior, this problem would 
> surface. 
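> (Illustrative only, not the actual DefaultChooser code: a chooser that simply 
> cycles through the registered partitions keeps every registered queue getting 
> drained, which is why the deadlock doesn't show up today.) 
> {code}
> // Hypothetical round-robin chooser sketch; names are made up.
> class RoundRobinSketch[T](partitions: IndexedSeq[T]) {
>   private var next = 0
>   def choose(): T = {
>     val p = partitions(next)
>     next = (next + 1) % partitions.size
>     p
>   }
> }
> {code}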
> The simplest solution would be to stop fetching messages in the BrokerProxy 
> for queues that are full. An alternative would be to stop fetching messages 
> for any queue that has messages already in it (regardless of whether it's 
> "full" or not). 
> One nuance to the stop-fetching-on-queue-full solution is that FetchRequest 
> takes a fetchSize, which is in bytes. This means that we might get more 
> messages back in one FetchRequest than would fit into the BlockingEnvelopeMap 
> queue. We could drop these excess messages and re-fetch them later. 
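> Sketched roughly (with a made-up helper, not actual BrokerProxy code), that 
> just means only advancing the fetch offset past the messages that actually 
> fit in the queue: 
> {code}
> import java.util.concurrent.BlockingQueue
>
> case class Message(offset: Long, payload: Array[Byte]) // hypothetical stand-in
>
> // Returns the offset the next FetchRequest should start from. If the queue
> // fills up mid-batch, the remaining messages are dropped; because the offset
> // stops advancing, they get re-fetched on a later FetchRequest.
> def enqueueWhatFits(queue: BlockingQueue[Message],
>                     batch: Seq[Message],
>                     startOffset: Long): Long = {
>   var next = startOffset
>   val it = batch.iterator
>   var full = false
>   while (it.hasNext && !full) {
>     val m = it.next()
>     if (queue.offer(m)) next = m.offset + 1 // accepted; advance the offset
>     else full = true                        // queue full; drop the rest
>   }
>   next
> }
> {code}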
> I think the best solution is just: 
> 1) Stop fetching messages for any queue that's not empty. 
> 2) Make KafkaSystemConsumer override newBlockingQueue with an unbounded 
> LinkedBlockingQueue. 
> The new memory semantics for the KafkaSystemConsumer would be that the 
> LinkedBlockingQueue holds up to N elements, where N is the maximum number of 
> messages that can theoretically be returned by a single FetchRequest for a 
> given TopicAndPartition. Thus, if a KafkaSystemConsumer's fetchSize were 1 
> megabyte, and 1 MB could return a theoretical maximum of 1 million messages 
> (1 byte per message), then the maximum number of messages you'd expect to see 
> in any single unbounded LinkedBlockingQueue would be 1 million. Once the 
> queue was drained to zero, a new fetch would be issued.
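> Roughly sketched (the hooks and names here are approximations, not taken from 
> the attached patch): 
> {code}
> import java.util.concurrent.{BlockingQueue, LinkedBlockingQueue}
>
> // 1) Unbounded per-partition queue, so BlockingEnvelopeMap.add never blocks.
> def newBlockingQueue[T](): BlockingQueue[T] = new LinkedBlockingQueue[T]()
>
> // 2) Only include a partition in the next FetchRequest once its queue has
> //    been drained to zero; this caps each queue at roughly one fetch's worth
> //    of messages (the fetchSize / min-message-size bound described above).
> def shouldFetch(queue: BlockingQueue[_]): Boolean = queue.isEmpty
> {code}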

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira
