[
https://issues.apache.org/jira/browse/SAMZA-3?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Chris Riccomini resolved SAMZA-3.
---------------------------------
Resolution: Fixed
> BrokerProxy deadlocks if messages aren't polled from all streams
> ----------------------------------------------------------------
>
> Key: SAMZA-3
> URL: https://issues.apache.org/jira/browse/SAMZA-3
> Project: Samza
> Issue Type: Bug
> Components: kafka
> Affects Versions: 0.6.0
> Reporter: Chris Riccomini
> Assignee: Chris Riccomini
> Fix For: 0.7.0
>
> Attachments: SAMZA-3.0.patch
>
>
> Suppose a KafkaSystemConsumer is created with:
> {code}
> consumer.register(sp1, 0)
> consumer.register(sp2, 0)
> consumer.start
> while (true) {
>   consumer.poll(sp2, 0)
> }
> {code}
> This code will eventually deadlock (assuming sp1 has messages) if sp1 and
> sp2 are both on the same broker.
> The current implementation of BrokerProxy/KafkaSystemConsumer puts messages
> into a BlockingEnvelopeMap. The BlockingEnvelopeMap has a
> per-SystemStreamPartition max queue size (defaults to 1000, I believe). This
> means that, if a SystemStreamPartition is registered with the
> KafkaSystemConsumer, but messages are not read from the
> SystemStreamPartition's queue for some reason, the
> BrokerProxy/KafkaSystemConsumer will eventually block on
> BlockingEnvelopeMap.add. This will prevent the BrokerProxy from fetching any
> more messages from ANY topic/partitions on the broker, so code trying to
> read messages from another SystemStreamPartition will never receive
> any new messages.
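The stall described above can be reproduced with a minimal sketch. This is a hypothetical stand-in for the BrokerProxy/BlockingEnvelopeMap interaction (names are illustrative, not Samza's real API): one bounded queue per SystemStreamPartition, a single "broker" thread feeding both, and a caller that polls only sp2. The queue capacity is deliberately tiny so the stall appears quickly.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class DeadlockSketch {
    static final int QUEUE_SIZE = 2; // stand-in for the real default of ~1000

    // Returns how many sp2 messages arrive before the "broker" stalls.
    static int runScenario() {
        try {
            Map<String, BlockingQueue<String>> envelopes = new HashMap<>();
            envelopes.put("sp1", new ArrayBlockingQueue<>(QUEUE_SIZE));
            envelopes.put("sp2", new ArrayBlockingQueue<>(QUEUE_SIZE));

            // Simulated fetch loop: one broker thread feeds BOTH partitions,
            // and put() blocks once a partition's queue is full.
            Thread broker = new Thread(() -> {
                try {
                    for (int i = 0; ; i++) {
                        envelopes.get("sp1").put("sp1-" + i); // blocks when sp1 fills
                        envelopes.get("sp2").put("sp2-" + i);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            broker.start();

            // The caller polls only sp2, mirroring the loop in the report.
            // Once sp1's queue fills, the broker thread is stuck in put(),
            // so sp2 stops receiving even though its own queue has room.
            int received = 0;
            while (envelopes.get("sp2").poll(200, TimeUnit.MILLISECONDS) != null) {
                received++;
            }
            broker.interrupt();
            broker.join();
            return received;
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("sp2 messages before stall: " + runScenario());
    }
}
```

With a capacity of 2, sp2 receives exactly two messages before the broker thread blocks forever on sp1's full queue.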
> This is not currently a problem because Samza reads messages in two ways:
> 1) During state store restore.
> 2) During the process loop (feeding messages to StreamTask.process).
> The current SamzaContainer implementation uses a new SystemConsumer for each
> SystemStreamPartition when it restores (#1), which registers ONLY one
> SystemStreamPartition, so no deadlock is possible here. The current
> DefaultChooser round-robins between streams, which means that you will always
> poll all streams that have messages available, so no
> starvation is currently possible (which means that no deadlock is possible).
> Nevertheless, this should be fixed. For one thing, if we change the
> DefaultChooser's behavior, this problem would surface.
> The simplest solution would be to stop fetching messages in the BrokerProxy
> for queues that are full. An alternative would be to stop fetching messages
> for any queue that has messages already in it (regardless of whether it's
> "full" or not).
> One nuance to the stop-fetching-on-queue-full solution is that FetchRequest
> takes a fetchSize, which is in bytes. This means that we might get more
> messages back in one FetchRequest than would fit into the BlockingEnvelopeMap
> queue. We could drop these excess messages and re-fetch them later.
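The drop-and-refetch idea can be sketched as follows. This is a hypothetical helper, not Samza code: a fetch (sized in bytes) hands back a list of messages that may exceed the bounded queue's remaining capacity, so the excess is dropped and the returned offset tells the next FetchRequest where to resume.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class DropExcessSketch {
    // Adds as many fetched messages as the queue can take; returns the
    // offset to use for the next FetchRequest (the first dropped message).
    public static long addFetched(BlockingQueue<String> queue,
                                  List<String> fetched,
                                  long firstOffset) {
        long nextOffset = firstOffset;
        for (String msg : fetched) {
            if (!queue.offer(msg)) {
                // Queue is full: drop the rest and re-fetch from here later.
                break;
            }
            nextOffset++;
        }
        return nextOffset;
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(3);
        // 5 messages came back in one fetch, but the queue holds only 3,
        // so m3 and m4 are dropped and the next fetch starts at offset 103.
        long next = addFetched(queue,
                List.of("m0", "m1", "m2", "m3", "m4"), 100L);
        System.out.println("next fetch offset: " + next); // prints 103
    }
}
```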
> I think the best solution is just:
> 1) Stop fetching messages for any queue that's not empty.
> 2) Make KafkaSystemConsumer override newBlockingQueue with an unbounded
> LinkedBlockingQueue.
> The new memory semantics for the KafkaSystemConsumer would be that the
> LinkedBlockingQueue would hold up to N elements where N is the max number of
> elements that can theoretically be returned in a single FetchRequest for a
> given TopicAndPartition. Thus, if a KafkaSystemConsumer's fetchSize were 1
> megabyte, and 1 meg could return a theoretical maximum of 1 million messages
> (1 byte per message), then the maximum number of messages you'd expect to see
> in any single unbounded LinkedBlockingQueue would be 1 million. Once this
> queue was drained to zero, a new fetch would be issued.
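The proposed fix can be sketched like this. Again the names are illustrative, not Samza's actual API: each registered SystemStreamPartition gets an unbounded LinkedBlockingQueue (so add() never blocks the BrokerProxy), and the fetch loop only requests partitions whose queue has drained to empty.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class FetchWhenEmptySketch {
    private final Map<String, BlockingQueue<String>> queues = new HashMap<>();

    public void register(String ssp) {
        // Unbounded, so add() can never block the BrokerProxy.
        queues.put(ssp, new LinkedBlockingQueue<>());
    }

    // Partitions eligible for the next FetchRequest: only drained queues.
    public Set<String> partitionsToFetch() {
        Set<String> result = new HashSet<>();
        for (Map.Entry<String, BlockingQueue<String>> e : queues.entrySet()) {
            if (e.getValue().isEmpty()) {
                result.add(e.getKey());
            }
        }
        return result;
    }

    public void add(String ssp, String msg) {
        queues.get(ssp).add(msg);
    }

    public String poll(String ssp) {
        return queues.get(ssp).poll();
    }

    public static void main(String[] args) {
        FetchWhenEmptySketch consumer = new FetchWhenEmptySketch();
        consumer.register("sp1");
        consumer.register("sp2");
        consumer.add("sp1", "m0"); // sp1 has unread messages
        // Only sp2 is fetched; sp1 is skipped until its queue drains, so a
        // non-empty sp1 can no longer stall sp2.
        System.out.println(consumer.partitionsToFetch()); // prints [sp2]
    }
}
```

Under this scheme the per-partition memory bound comes from the fetch loop itself: a queue is only refilled after it empties, so it never holds more than one fetch's worth of messages.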
--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators.
For more information on JIRA, see: http://www.atlassian.com/software/jira