GitHub user srdo commented on a diff in the pull request:
https://github.com/apache/storm/pull/2454#discussion_r156224911
--- Diff:
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
---
@@ -223,29 +220,24 @@ private long doSeek(TopicPartition tp,
OffsetAndMetadata committedOffset) {
@Override
public void nextTuple() {
try {
- if (initialized) {
--- End diff --
I'm not sure the initialized check was ever necessary.
When we were using the subscribe API, the consumer rebalance code ran as part
of a call to KafkaConsumer.poll, first made from e.g. the
NamedSubscription.subscribe method called by KafkaSpout.activate. That call to
poll blocks until the initial rebalance is complete.
Since switching to the assign API, the rebalance code runs as part of the call
to Subscription.subscribe/refreshPartitions, which is still first called by
KafkaSpout.activate.
The rebalance listener is always called synchronously as part of the
Subscription.subscribe call in KafkaSpout.activate, so it shouldn't be possible
to reach KafkaSpout.nextTuple while initialized is still false: it is already
true by the time KafkaSpout.activate returns.
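
A minimal sketch of that ordering argument, in case it helps. The Sketch*
classes below are hypothetical stand-ins, not the real storm-kafka-client or
Kafka classes, and the sketch assumes the listener callback is what sets the
flag:

```java
import java.util.Collection;
import java.util.Collections;

// Hypothetical stand-in for a rebalance listener (not Kafka's ConsumerRebalanceListener).
interface SketchRebalanceListener {
    void onPartitionsAssigned(Collection<String> partitions);
}

// Hypothetical stand-in for a Subscription: it invokes the listener
// synchronously, before subscribe() returns.
class SketchSubscription {
    void subscribe(SketchRebalanceListener listener) {
        Collection<String> assigned = Collections.singletonList("topic-0");
        listener.onPartitionsAssigned(assigned);
    }
}

// Hypothetical stand-in for the spout, reduced to the lifecycle calls discussed here.
class SketchSpout {
    private final SketchSubscription subscription;
    private boolean initialized = false;

    SketchSpout(SketchSubscription subscription) {
        this.subscription = subscription;
    }

    // The listener runs inside subscribe(), so the flag is set before activate() returns.
    void activate() {
        subscription.subscribe(partitions -> initialized = true);
    }

    boolean isInitialized() {
        return initialized;
    }

    // No guard needed if activate() always runs first; the check only documents the invariant.
    boolean nextTuple() {
        if (!initialized) {
            throw new IllegalStateException("nextTuple called before activate");
        }
        return true;
    }
}
```

As long as activate is always called before nextTuple, the exception branch in
the sketch is unreachable, which is the same reason the guard in the diff looks
redundant.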
---