[ https://issues.apache.org/jira/browse/STORM-1682?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Stig Rohde Døssing updated STORM-1682:
--------------------------------------
    Description: 
The KafkaSpout can lose partitions for a period, or hang, because getBrokerInfo (https://github.com/apache/storm/blob/master/external/storm-kafka/src/jvm/org/apache/storm/kafka/DynamicBrokersReader.java#L77) may get a NoNodeException if there is no broker info in Zookeeper corresponding to the leader id in Zookeeper. When this error occurs, the spout ignores the partition until the next time getBrokerInfo is called, which isn't until the next time the spout gets an exception on fetch. If the timing is really bad, it might ignore all the partitions and never restart.
As far as I'm aware, Kafka doesn't update the leader and broker info atomically, so it's possible to get unlucky and hit the NoNodeException when a broker has just died.
I have a few suggestions for dealing with this.
getBrokerInfo could simply retry the inner loop over partitions if it gets the NoNodeException (probably with a limit and a short sleep between attempts). If it fails repeatedly, the spout should crash.
Alternatively, the DynamicBrokersReader could look up all brokers in Zookeeper, create a consumer and send a TopicMetadataRequest on it. The response contains the leader for each partition and the host/port of the relevant brokers.

  was:
The KafkaSpout can lose partitions for a period, or hang, because getBrokerInfo (https://github.com/apache/storm/blob/master/external/storm-kafka/src/jvm/org/apache/storm/kafka/DynamicBrokersReader.java#L77) may get a NoNodeException if there is no broker info in Zookeeper corresponding to the leader id in Zookeeper. When this error occurs, the spout ignores the partition until the next time getBrokerInfo is called, which isn't until the next time the spout gets an exception on fetch. If the timing is really bad, it might ignore all the partitions and never restart.
As far as I'm aware, Kafka doesn't update the leader and broker info atomically, so it's possible to get unlucky and hit the NoNodeException when a broker has just died.
I have a few suggestions for dealing with this.
getBrokerInfo could simply retry the inner loop over partitions if it gets the NoNodeException (probably with a limit and a short sleep between attempts). If it fails repeatedly, the spout should crash.
Alternatively, the DynamicBrokersReader could look up all brokers in Zookeeper, create a consumer and send a TopicMetadataRequest on it. The response contains the leader for each partition and the host/port of the relevant broker.


> Kafka spout can lose partitions
> -------------------------------
>
>                 Key: STORM-1682
>                 URL: https://issues.apache.org/jira/browse/STORM-1682
>             Project: Apache Storm
>          Issue Type: Bug
>          Components: storm-kafka
>    Affects Versions: 0.10.0, 1.0.0, 2.0.0
>            Reporter: Stig Rohde Døssing
>            Assignee: Stig Rohde Døssing
>



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
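The first suggestion in the description can be sketched in Java as follows. On a missing broker node, it reruns the whole loop over partitions and sleeps briefly between passes. After a fixed number of attempts it throws, so the spout crashes instead of silently skipping the partition.

This is a sketch under stated assumptions, not the storm-kafka code. It does not use Curator or ZooKeeper. `BrokerRegistry`, `MissingBrokerNodeException` (a stand-in for ZooKeeper's `NoNodeException`), `partitionLeaders`, `maxAttempts` and `sleepMs` are hypothetical names. The attempt limit and sleep length are assumptions to tune.

```java
import java.util.HashMap;
import java.util.Map;

public class RetryingBrokerInfo {

    /** Hypothetical stand-in for ZooKeeper's KeeperException.NoNodeException. */
    public static class MissingBrokerNodeException extends Exception {
        public MissingBrokerNodeException(String path) {
            super("Node " + path + " does not exist");
        }
    }

    /** Hypothetical view of the two ZooKeeper reads: leader id, then broker info. */
    public interface BrokerRegistry {
        int leaderFor(int partition);

        String brokerHost(int brokerId) throws MissingBrokerNodeException;
    }

    /**
     * Resolves the leader host for every partition. If any broker node is
     * missing, the whole pass over partitions is retried after sleepMs; after
     * maxAttempts failed passes an IllegalStateException is thrown, so the
     * caller (the spout) crashes instead of silently dropping a partition.
     */
    public static Map<Integer, String> partitionLeaders(BrokerRegistry registry, int numPartitions,
                                                        int maxAttempts, long sleepMs)
            throws InterruptedException {
        MissingBrokerNodeException lastFailure = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                Map<Integer, String> leaders = new HashMap<>();
                for (int partition = 0; partition < numPartitions; partition++) {
                    int leader = registry.leaderFor(partition); // re-read on every pass
                    leaders.put(partition, registry.brokerHost(leader));
                }
                return leaders; // every partition resolved
            } catch (MissingBrokerNodeException e) {
                lastFailure = e;
                if (attempt < maxAttempts) {
                    Thread.sleep(sleepMs); // short pause before retrying the whole pass
                }
            }
        }
        throw new IllegalStateException(
                "Broker info still missing after " + maxAttempts + " attempts", lastFailure);
    }

    public static void main(String[] args) throws InterruptedException {
        // Simulated race: broker 2 leads partition 1, but its node only shows up on the third read.
        int[] readsOfBroker2 = {0};
        BrokerRegistry flaky = new BrokerRegistry() {
            @Override
            public int leaderFor(int partition) {
                return partition == 1 ? 2 : 1;
            }

            @Override
            public String brokerHost(int brokerId) throws MissingBrokerNodeException {
                if (brokerId == 2 && ++readsOfBroker2[0] < 3) {
                    throw new MissingBrokerNodeException("/brokers/ids/2");
                }
                return "broker" + brokerId + ":9092";
            }
        };
        System.out.println(partitionLeaders(flaky, 2, 5, 10));
    }
}
```

Retrying the whole pass, rather than only the failed read, also re-reads the leader id. That matters if leadership moved to another broker while the old leader's node was missing.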
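The alternative suggestion is to ask a live broker for topic metadata instead of combining two Zookeeper reads. It might look roughly like the Java sketch below.

In the old Kafka Java consumer API, the request would be a `TopicMetadataRequest` sent through a `SimpleConsumer`. Here that call is hidden behind a hypothetical `MetadataClient` interface so the example runs without Kafka. `HostPort`, `MetadataClient` and `leaders` are illustrative names, not storm-kafka API. A partition whose metadata reports no current leader would still need handling, which this sketch leaves out.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MetadataBrokerLookup {

    /** A broker address, e.g. as registered under /brokers/ids in ZooKeeper. */
    public static final class HostPort {
        public final String host;
        public final int port;

        public HostPort(String host, int port) {
            this.host = host;
            this.port = port;
        }

        @Override
        public String toString() {
            return host + ":" + port;
        }
    }

    /**
     * Hypothetical stand-in for sending a TopicMetadataRequest to one broker and
     * reading back partition id -> leader address. Throws if the broker is unreachable.
     */
    public interface MetadataClient {
        Map<Integer, HostPort> leadersFor(HostPort broker, String topic) throws Exception;
    }

    /**
     * Asks each known broker in turn and returns the first metadata answer.
     * Leader ids and broker addresses come from that single response, not from
     * two separate ZooKeeper reads.
     */
    public static Map<Integer, HostPort> leaders(List<HostPort> knownBrokers, String topic,
                                                 MetadataClient client) {
        List<Exception> failures = new ArrayList<>();
        for (HostPort broker : knownBrokers) {
            try {
                return client.leadersFor(broker, topic);
            } catch (Exception e) {
                failures.add(e); // this broker may have just died; try the next one
            }
        }
        IllegalStateException noAnswer =
                new IllegalStateException("No broker answered the metadata request for " + topic);
        failures.forEach(noAnswer::addSuppressed);
        throw noAnswer;
    }

    public static void main(String[] args) {
        List<HostPort> brokers = Arrays.asList(new HostPort("broker1", 9092), new HostPort("broker2", 9092));
        // Simulated cluster: broker1 is down, broker2 reports itself as leader of partitions 0 and 1.
        MetadataClient client = (broker, topic) -> {
            if (broker.host.equals("broker1")) {
                throw new IOException("connection refused");
            }
            Map<Integer, HostPort> result = new HashMap<>();
            result.put(0, broker);
            result.put(1, broker);
            return result;
        };
        System.out.println(leaders(brokers, "events", client));
    }
}
```

Trying every known broker in turn means a single dead broker, the situation that triggers the NoNodeException race, does not block the lookup. The method fails loudly only if no broker answers at all.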