GitHub user srdo reopened a pull request: https://github.com/apache/storm/pull/2454
STORM-2847: Ensure spout can handle being activated and deactivated

See https://issues.apache.org/jira/browse/STORM-2847.

The cause of the bug is that we shut down the KafkaConsumer when the spout is deactivated, but don't reset the spout fields to a "clean" state. When the spout is reactivated and we call subscribe on the ManualPartitionSubscription, it skips the assignment because the new assignment is the same as the one it cached before the deactivation. The fix is to stop caching the assignment in ManualPartitionSubscription and instead ask the consumer what its current assignment is.

There are some other related changes in this PR as well:

* Removed the initialized flag in the spout. Initialization is always synchronous and happens in the call to KafkaSpout.activate, which runs before Storm starts calling nextTuple, so there's no need for a flag.
* Fixed an overly strict check of whether the consumer position was behind the committed offset. The committed offset is the offset the spout will start at, so the position is allowed to equal the committed offset.
* Refactored tests that use a stubbed consumer to use a mocked Subscription instead of the real classes. The real classes make stubbing more complicated than it should be, and the subscription is already covered by the integration tests with the embedded Kafka instance and by the ManualPartitionSubscription unit test.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/srdo/storm STORM-2847

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/storm/pull/2454.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #2454
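To make the failure mode concrete, here is a minimal, self-contained sketch of the activate/deactivate/activate sequence. It does not use the real Storm or Kafka classes: `FakeConsumer`, `CachingSubscription`, `ConsumerBackedSubscription`, `refreshAssignment`, and `positionIsBehindCommitted` are all hypothetical stand-ins, and partitions are modeled as plain strings. It contrasts a subscription that caches the last assignment (skipping `assign` on the fresh consumer after reactivation) with one that compares against the consumer's own assignment, and it also sketches the relaxed position check described in the second bullet.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

public class ReactivationSketch {

    /** Hypothetical consumer stand-in; only models assign() and assignment(). */
    static class FakeConsumer {
        private Set<String> assignment = new HashSet<>();

        void assign(Collection<String> partitions) {
            assignment = new HashSet<>(partitions);
        }

        Set<String> assignment() {
            return Collections.unmodifiableSet(assignment);
        }
    }

    /** Buggy variant: the cache outlives the consumer it was computed for. */
    static class CachingSubscription {
        private Set<String> cached = new HashSet<>();

        void refreshAssignment(FakeConsumer consumer, Set<String> wanted) {
            if (!wanted.equals(cached)) {
                consumer.assign(wanted);
                cached = new HashSet<>(wanted);
            }
        }
    }

    /** Fixed variant: compares against what the current consumer actually has. */
    static class ConsumerBackedSubscription {
        void refreshAssignment(FakeConsumer consumer, Set<String> wanted) {
            if (!wanted.equals(consumer.assignment())) {
                consumer.assign(wanted);
            }
        }
    }

    /** Simulates activate -> deactivate -> activate; returns the new consumer's assignment. */
    static Set<String> assignmentAfterReactivation(boolean cacheAssignment) {
        Set<String> wanted = new HashSet<>(Arrays.asList("topic-0", "topic-1"));
        CachingSubscription caching = new CachingSubscription();
        ConsumerBackedSubscription fixed = new ConsumerBackedSubscription();

        // First activate: a consumer is created and partitions are assigned.
        FakeConsumer first = new FakeConsumer();
        if (cacheAssignment) {
            caching.refreshAssignment(first, wanted);
        } else {
            fixed.refreshAssignment(first, wanted);
        }

        // Deactivate closes the consumer, but the subscription object survives.
        // Reactivate: a fresh consumer with no assignment yet.
        FakeConsumer second = new FakeConsumer();
        if (cacheAssignment) {
            caching.refreshAssignment(second, wanted); // skipped: wanted equals the stale cache
        } else {
            fixed.refreshAssignment(second, wanted);   // assigns, since second has nothing
        }
        return second.assignment();
    }

    /** Relaxed check: a position equal to the committed offset is not "behind". */
    static boolean positionIsBehindCommitted(long position, long committedOffset) {
        return position < committedOffset;
    }

    public static void main(String[] args) {
        System.out.println("caching: " + assignmentAfterReactivation(true));  // empty set
        System.out.println("fixed:   " + assignmentAfterReactivation(false)); // both partitions
        System.out.println(positionIsBehindCommitted(5L, 5L));                // false
    }
}
```

The design point the sketch illustrates is that the consumer is the source of truth for its own assignment: because deactivation replaces the consumer, any state cached outside it can silently go stale.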