GitHub user srdo reopened a pull request:
https://github.com/apache/storm/pull/2454
STORM-2847: Ensure spout can handle being activated and deactivated
See https://issues.apache.org/jira/browse/STORM-2847.
The cause of the bug is that we shut down the KafkaConsumer when the spout
is deactivated, but don't return the spout's fields to a "clean" state. When
the spout is reactivated and we call subscribe on the
ManualPartitionSubscription, it skips the assignment because the new
assignment is the same as the one it cached before the deactivation. The
newly created consumer is therefore never assigned any partitions.
The fix is to stop caching the assignment in ManualPartitionSubscription and
instead ask the consumer for its current assignment.
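To illustrate the difference, here is a minimal, self-contained Java sketch.
FakeConsumer, CachingSubscription and QueryingSubscription are hypothetical
stand-ins invented for this example. They are not the real KafkaConsumer or
ManualPartitionSubscription APIs, and partitions are modeled as plain strings.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for a consumer; NOT the real KafkaConsumer API.
// Partitions are modeled as strings such as "topic-0".
class FakeConsumer {
    private final Set<String> assignment = new HashSet<>();

    void assign(Collection<String> partitions) {
        assignment.clear();
        assignment.addAll(partitions);
    }

    Set<String> assignment() {
        return Collections.unmodifiableSet(assignment);
    }
}

// Hypothetical subscription that remembers the last assignment it made (the buggy pattern).
class CachingSubscription {
    private Set<String> cached = Collections.emptySet();

    void refreshAssignment(FakeConsumer consumer, Set<String> wanted) {
        // Compares against the cache, so a fresh consumer after reactivation is never assigned.
        if (!wanted.equals(cached)) {
            consumer.assign(wanted);
            cached = new HashSet<>(wanted);
        }
    }
}

// Hypothetical subscription that asks the consumer what it currently has (the fixed pattern).
class QueryingSubscription {
    void refreshAssignment(FakeConsumer consumer, Set<String> wanted) {
        // A fresh consumer reports an empty assignment, so assign() is called again.
        if (!wanted.equals(consumer.assignment())) {
            consumer.assign(wanted);
        }
    }
}
```

Simulating a deactivate/reactivate cycle by handing either subscription a
fresh FakeConsumer shows that only QueryingSubscription leaves the new
consumer with the expected partitions assigned.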
This PR also contains some related changes:
* Removed the initialized flag in the spout. Initialization is always
synchronous and happens in KafkaSpout.activate, which Storm calls before it
starts calling nextTuple, so the flag is unnecessary.
* Fixed an overly strict check of whether the consumer position was behind
the committed offset. The committed offset is the offset the spout will start
at, so the position is allowed to equal the committed offset.
* Refactored tests that use a stubbed consumer to use a mocked Subscription
instead of the real classes. The real classes make stubbing more complicated
than it needs to be, and the subscription is already tested by the
integration tests with the embedded Kafka instance, as well as by the
ManualPartitionSubscription unit test.
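The relaxed position check can be sketched as follows, assuming it boils down
to comparing the consumer's position with the committed offset. PositionCheck,
isBehindStrict and isBehind are hypothetical names used only for this
illustration, not code from the Storm repository.

```java
// Hypothetical helpers illustrating the check; names are not from the Storm codebase.
final class PositionCheck {
    private PositionCheck() {}

    // Too strict: treats a consumer sitting exactly at the committed offset as "behind".
    static boolean isBehindStrict(long position, long committedOffset) {
        return position <= committedOffset;
    }

    // Relaxed: the committed offset is where the spout resumes, so position == committed is fine.
    static boolean isBehind(long position, long committedOffset) {
        return position < committedOffset;
    }
}
```

With a committed offset of 10, a consumer at position 10 is about to read
exactly the record the spout should resume from, so only positions below 10
count as behind.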
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/srdo/storm STORM-2847
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/storm/pull/2454.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #2454