[ 
https://issues.apache.org/jira/browse/SAMZA-579?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Riccomini updated SAMZA-579:
----------------------------------
    Attachment: SAMZA-579-2.patch

Attaching updated patch based on [~closeuris]'s feedback.

# Merged nextOffsets and droppedTopicAndPartitions into one map.
# Eliminated params from refreshBrokers. Every time refreshBrokers is called, 
it tries to refresh any pairs in topicPartitionsAndOffsets. Whenever a broker 
is assigned, the TopicAndPartition is removed from topicPartitionsAndOffsets.
# Updated abdicate and refreshDropped to manipulate topicPartitionsAndOffsets 
as required.
# Added a check in refreshBrokers (inside the critical section) to prevent 
double-listening to a TopicAndPartition.

For (3), the only places outside the critical section where 
topicPartitionsAndOffsets is manipulated are start() and abdicate(). 
Manipulating it in start() is safe, and so is abdicate(): when a BrokerProxy 
(BP) abdicates, it adds the TopicAndPartition (TP) back into 
topicPartitionsAndOffsets and calls refreshBrokers. The only point where a TP 
is removed is inside the critical section, and a TP should never be re-added 
to topicPartitionsAndOffsets until the one and only BP that owned it has 
abdicated.
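
The bookkeeping described above can be sketched roughly as follows. This is 
an illustrative Python model, not the Scala code in the patch: the class 
ConsumerSketch, the leaders dict (a stand-in for the leader lookup) and the 
proxies dict (a stand-in for the BrokerProxy instances) are all hypothetical 
names. The exact form of the check inside the critical section is also an 
assumption.

```python
import threading


class ConsumerSketch:
    """Hypothetical model of the merged-map bookkeeping; not Samza's code."""

    def __init__(self, leaders):
        # Assumed stand-in for leader lookup: TP -> broker; absent = no leader.
        self.leaders = leaders
        self.lock = threading.Lock()
        # Single merged map: TPs that currently have no broker -> next offset.
        self.topic_partitions_and_offsets = {}
        # Assumed stand-in for BrokerProxy state: broker -> {TP: offset}.
        self.proxies = {}

    def start(self, initial_offsets):
        # Runs outside the critical section, as described for start().
        self.topic_partitions_and_offsets.update(initial_offsets)
        self.refresh_brokers()

    def refresh_brokers(self):
        # No parameters: try every TP that is still waiting for a broker.
        for tp in list(self.topic_partitions_and_offsets):
            broker = self.leaders.get(tp)
            if broker is None:
                continue  # no leader yet: keep the TP so a later call retries
            with self.lock:  # critical section
                if tp not in self.topic_partitions_and_offsets:
                    continue  # already assigned elsewhere: avoid double-listening
                offset = self.topic_partitions_and_offsets.pop(tp)
                self.proxies.setdefault(broker, {})[tp] = offset

    def abdicate(self, broker, tp, next_offset):
        # The owning proxy gives up the TP, re-adds it to the map, and refreshes.
        self.proxies[broker].pop(tp)
        self.topic_partitions_and_offsets[tp] = next_offset
        self.refresh_brokers()
```

In this model, a TP with no leader simply stays in 
topic_partitions_and_offsets instead of being dropped, so the next 
refresh_brokers() call picks it up again once a leader appears.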

> KafkaSystemConsumer drops SSPs on failure
> -----------------------------------------
>
>                 Key: SAMZA-579
>                 URL: https://issues.apache.org/jira/browse/SAMZA-579
>             Project: Samza
>          Issue Type: Bug
>          Components: kafka
>    Affects Versions: 0.9.0
>            Reporter: Chris Riccomini
>            Assignee: Chris Riccomini
>             Fix For: 0.9.0
>
>         Attachments: SAMZA-579-0.patch, SAMZA-579-1.patch, SAMZA-579-2.patch
>
>
> While running SAMZA-394, I discovered a bug in KafkaSystemConsumer that 
> causes it to stop consuming under failure scenarios. This does not cause data 
> loss, but can wedge a container until it's restarted.
> The trigger appears to be when a BrokerProxy fetches from a broker that's 
> still coming up, and hasn't yet claimed ownership for a TopicAndPartition. 
> When the fetch fails, the BrokerProxy abdicate()s the TopicAndPartition, and 
> KafkaSystemConsumer tries to refresh to get the leader. If there is no 
> leader, the KafkaSystemConsumer drops the SSP. This happens in 
> KafkaSystemConsumer.refreshBrokers.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
