[ https://issues.apache.org/jira/browse/SAMZA-584?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Chris Riccomini updated SAMZA-584:
----------------------------------
    Attachment: SAMZA-584-0.patch

Attaching patch. RB at:

https://reviews.apache.org/r/31649/

Discovered this issue on one of our large jobs.

# Moved nextOffset.get inside the if() statement.
# Set last dropped timer to be clock(), not 0.

(2) starts refreshes 10s after starting, rather than immediately refreshing after a refresh occurs. This just saves a bit of wasted effort.

> KafkaSystemConsumer race condition
> ----------------------------------
>
>                 Key: SAMZA-584
>                 URL: https://issues.apache.org/jira/browse/SAMZA-584
>             Project: Samza
>          Issue Type: Bug
>          Components: kafka
>    Affects Versions: 0.9.0
>            Reporter: Chris Riccomini
>            Assignee: Chris Riccomini
>             Fix For: 0.9.0
>
>         Attachments: SAMZA-584-0.patch
>
>
> After SAMZA-579, a race condition was introduced that leads to an NPE
> (effectively). The error is that this log line repeats over and over:
> {noformat}
> 2015-03-02 23:59:36 KafkaSystemConsumer [WARN] While refreshing brokers for [topic1,5]: java.util.NoSuchElementException: None.get. Retrying.
> {noformat}
> This is caused by:
> {code}
> val nextOffset = topicPartitionsAndOffsets.get(head).get
> {code}
> Which is called *outside* this if statement:
> {code}
> if (topicPartitionsAndOffsets.contains(head)) {
> {code}
> If multiple threads call KafkaSystemConsumer.refreshBrokers, there is a race
> condition where one might get a TopicAndPartition that's already been removed
> from topicPartitionsAndOffsets. This would cause the .get call to be
> None.get, which will fail.
> This error is seen immediately when consuming from a large number of brokers,
> as every BrokerProxy thread immediately refreshes on start. There are so many
> threads that one is bound to catch this error.
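
For anyone following along, here is a minimal sketch of the failure mode described in the ticket. It is not the contents of SAMZA-584-0.patch. The TopicAndPartition case class is a hypothetical stand-in for the Kafka class, and the TrieMap-backed map, the RefreshSketch object and both method names are assumptions made purely for illustration.

```scala
import scala.collection.concurrent.TrieMap

// Hypothetical stand-in for Kafka's TopicAndPartition; not the real class.
final case class TopicAndPartition(topic: String, partition: Int)

object RefreshSketch {
  // Assumed shape: a thread-safe map from partition to its next offset.
  val topicPartitionsAndOffsets = TrieMap.empty[TopicAndPartition, String]

  // Racy shape from the ticket: the lookup is unconditional, so .get on the
  // Option throws NoSuchElementException ("None.get") once the key is gone.
  def nextOffsetUnsafe(head: TopicAndPartition): String =
    topicPartitionsAndOffsets.get(head).get

  // Safer shape: a single lookup whose Option is never unwrapped blindly.
  // Callers handle None, e.g. because another thread already took the key.
  def nextOffsetSafe(head: TopicAndPartition): Option[String] =
    topicPartitionsAndOffsets.get(head)
}
```

Used roughly like this, where the remove call plays the part of a second thread that got to the partition first:

```scala
val tp = TopicAndPartition("topic1", 5)
RefreshSketch.topicPartitionsAndOffsets.put(tp, "42")
RefreshSketch.topicPartitionsAndOffsets.remove(tp)

RefreshSketch.nextOffsetSafe(tp) match {
  case Some(offset) => println(s"refresh $tp from offset $offset")
  case None         => println(s"$tp already handled, skipping")
}
```

In general, a contains() check followed by a separate get() on a shared map is still two operations, so pattern matching on the result of one lookup is a common way to avoid depending on the key surviving between them.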