[ https://issues.apache.org/jira/browse/SAMZA-579?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Chris Riccomini updated SAMZA-579:
----------------------------------
    Attachment: SAMZA-579-2.patch

Attaching updated patch based on [~closeuris]'s feedback.

# Merged nextOffsets and droppedTopicAndPartitions into one map.
# Eliminated params from refreshBrokers. Every time refreshBrokers is called, it tries to refresh any pairs in topicPartitionsAndOffsets. Whenever a broker is assigned, the TopicAndPartition is removed from topicPartitionsAndOffsets.
# Updated abdicate and refreshDropped to manipulate topicPartitionsAndOffsets as required.
# Added a check in refreshBrokers (inside the critical section) to prevent double-listening to a TopicAndPartition.

For (3), the only places (outside the critical section) where topicPartitionsAndOffsets is manipulated are start() and abdicate(). Manipulating it in start() is safe, and so is manipulating it in abdicate(). When a BrokerProxy (BP) abdicates, it adds the TopicAndPartition (TP) back into topicPartitionsAndOffsets and calls refreshBrokers. The only point where a TP is removed is inside the critical section. A TP should never be re-added to topicPartitionsAndOffsets until the one and only BP that owned it has abdicated.

> KafkaSystemConsumer drops SSPs on failure
> -----------------------------------------
>
>                 Key: SAMZA-579
>                 URL: https://issues.apache.org/jira/browse/SAMZA-579
>             Project: Samza
>          Issue Type: Bug
>          Components: kafka
>    Affects Versions: 0.9.0
>            Reporter: Chris Riccomini
>            Assignee: Chris Riccomini
>             Fix For: 0.9.0
>
>         Attachments: SAMZA-579-0.patch, SAMZA-579-1.patch, SAMZA-579-2.patch
>
>
> While running SAMZA-394, I discovered a bug in KafkaSystemConsumer that
> causes it to stop consuming under failure scenarios. This does not cause data
> loss, but can wedge a container until it's restarted.
> The trigger appears to be when a BrokerProxy fetches from a broker that's
> still coming up and hasn't yet claimed ownership of a TopicAndPartition.
> When the fetch fails, the BrokerProxy abdicate()s the TopicAndPartition, and
> KafkaSystemConsumer tries to refresh to get the leader. If there is no
> leader, the KafkaSystemConsumer drops the SSP. This happens in
> KafkaSystemConsumer.refreshBrokers.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
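The bookkeeping described in the comment can be modeled roughly as follows. This is a hypothetical Java sketch, not the SAMZA-579-2.patch and not Samza's actual KafkaSystemConsumer code. It keeps only the names topicPartitionsAndOffsets, refreshBrokers, abdicate and start from the comment. The owners table, the leaderLookup function, String offsets, the simplified TopicAndPartition class, and the use of synchronized methods as the "critical section" are all assumptions for illustration. The point it shows: a pair with no leader stays in topicPartitionsAndOffsets and is retried on the next refresh instead of being dropped, and refreshBrokers is the only place a pair leaves the map.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.Function;

public class RefreshBrokersSketch {

    /** Hypothetical, simplified stand-in for Kafka's TopicAndPartition. */
    static final class TopicAndPartition {
        final String topic;
        final int partition;

        TopicAndPartition(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }

        @Override public boolean equals(Object o) {
            if (!(o instanceof TopicAndPartition)) return false;
            TopicAndPartition other = (TopicAndPartition) o;
            return topic.equals(other.topic) && partition == other.partition;
        }

        @Override public int hashCode() { return Objects.hash(topic, partition); }

        @Override public String toString() { return "[" + topic + "," + partition + "]"; }
    }

    // Pairs waiting for a broker, mapped to the next offset to fetch.
    private final Map<TopicAndPartition, String> topicPartitionsAndOffsets = new HashMap<>();
    // Hypothetical ownership table: pair -> broker whose proxy is consuming it.
    private final Map<TopicAndPartition, String> owners = new HashMap<>();
    // Hypothetical metadata lookup; returns null while a pair has no leader.
    private final Function<TopicAndPartition, String> leaderLookup;

    RefreshBrokersSketch(Function<TopicAndPartition, String> leaderLookup) {
        this.leaderLookup = leaderLookup;
    }

    synchronized void start(Map<TopicAndPartition, String> initialOffsets) {
        topicPartitionsAndOffsets.putAll(initialOffsets);
        refreshBrokers();
    }

    /** Critical section: every call retries every unassigned pair. */
    synchronized void refreshBrokers() {
        Iterator<Map.Entry<TopicAndPartition, String>> it =
            topicPartitionsAndOffsets.entrySet().iterator();
        while (it.hasNext()) {
            TopicAndPartition tp = it.next().getKey();
            if (owners.containsKey(tp)) {
                // Double-listening guard (assumed handling): keep the current owner,
                // discard the duplicate request.
                it.remove();
                continue;
            }
            String leader = leaderLookup.apply(tp);
            if (leader == null) {
                // No leader yet: keep the pair and its offset for the next refresh
                // rather than dropping it.
                continue;
            }
            owners.put(tp, leader);
            it.remove(); // the only place a pair leaves topicPartitionsAndOffsets
        }
    }

    /** A proxy gives up a pair (e.g. after a failed fetch) and hands back its offset. */
    synchronized void abdicate(TopicAndPartition tp, String nextOffset) {
        owners.remove(tp);
        topicPartitionsAndOffsets.put(tp, nextOffset);
        refreshBrokers();
    }

    synchronized Set<TopicAndPartition> pending() {
        return new HashSet<>(topicPartitionsAndOffsets.keySet());
    }

    synchronized String ownerOf(TopicAndPartition tp) {
        return owners.get(tp);
    }

    public static void main(String[] args) {
        Map<TopicAndPartition, String> leaders = new HashMap<>();
        RefreshBrokersSketch consumer = new RefreshBrokersSketch(leaders::get);
        TopicAndPartition tp = new TopicAndPartition("events", 0);
        leaders.put(tp, "broker-1:9092");

        Map<TopicAndPartition, String> initial = new HashMap<>();
        initial.put(tp, "42");
        consumer.start(initial);          // assigned to broker-1
        leaders.remove(tp);               // broker restarts; no leader for a moment
        consumer.abdicate(tp, "57");      // fetch failed; pair goes back and stays pending
        leaders.put(tp, "broker-2:9092"); // a new leader appears
        consumer.refreshBrokers();        // the next refresh picks the pair up again
        System.out.println(consumer.ownerOf(tp) + " " + consumer.pending()); // broker-2:9092 []
    }
}
```

The main method replays the failure from the issue: the leader disappears, the proxy abdicates, the pair waits in topicPartitionsAndOffsets instead of being dropped, and the next refresh reassigns it. Because abdicate() is the only path that re-adds an owned pair, and it clears the owner first, the double-listening guard should never fire while that invariant holds.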