GitHub user srdo reopened a pull request:

    https://github.com/apache/storm/pull/2454

    STORM-2847: Ensure spout can handle being activated and deactivated

    See https://issues.apache.org/jira/browse/STORM-2847.
    
    The cause of the bug is that we shut down the KafkaConsumer when the 
spout is deactivated, but don't reset the spout's fields to a "clean" state. 
When the spout is reactivated and we call subscribe in the 
ManualPartitionSubscription, it skips the assignment because the new 
assignment is the same as the assignment it cached before the deactivation.
    
    The fix is to not cache the assignment in ManualPartitionSubscription, but 
just ask the consumer what its assignment is.
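    A minimal, self-contained sketch of the failure and the fix is below. It 
does not use the Kafka API: SimpleConsumer, FakeConsumer, CachingSubscription 
and QueryingSubscription are hypothetical stand-ins, and the sketch assumes 
that reactivation creates a fresh consumer with an empty assignment. The 
caching variant compares against its remembered assignment and never assigns 
the new consumer; the querying variant compares against what the consumer 
reports and assigns it.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

// Hypothetical, simplified stand-in for a consumer (not the Kafka API).
interface SimpleConsumer {
    Set<String> assignment();
    void assign(Collection<String> partitions);
}

// A fresh instance starts with no assigned partitions.
class FakeConsumer implements SimpleConsumer {
    private final Set<String> assigned = new HashSet<>();
    public Set<String> assignment() { return new HashSet<>(assigned); }
    public void assign(Collection<String> partitions) {
        assigned.clear();
        assigned.addAll(partitions);
    }
}

// Buggy variant: remembers the last assignment across consumer instances.
class CachingSubscription {
    private Set<String> currentAssignment = new HashSet<>();
    void refreshAssignment(SimpleConsumer consumer, Set<String> newAssignment) {
        if (!newAssignment.equals(currentAssignment)) {
            consumer.assign(newAssignment);
            currentAssignment = new HashSet<>(newAssignment);
        }
    }
}

// Fixed variant: asks the consumer for its current assignment instead.
class QueryingSubscription {
    void refreshAssignment(SimpleConsumer consumer, Set<String> newAssignment) {
        if (!newAssignment.equals(consumer.assignment())) {
            consumer.assign(newAssignment);
        }
    }
}

public class SubscriptionDemo {
    // Simulates activate, deactivate, reactivate; returns the second consumer's assignment.
    static Set<String> reactivate(boolean useCache) {
        Set<String> partitions = Set.of("topic-0", "topic-1");
        CachingSubscription caching = new CachingSubscription();
        QueryingSubscription querying = new QueryingSubscription();

        SimpleConsumer first = new FakeConsumer(); // activate
        if (useCache) caching.refreshAssignment(first, partitions);
        else querying.refreshAssignment(first, partitions);

        // Deactivate closes the first consumer; reactivate creates a new one.
        SimpleConsumer second = new FakeConsumer();
        if (useCache) caching.refreshAssignment(second, partitions);
        else querying.refreshAssignment(second, partitions);
        return second.assignment();
    }

    public static void main(String[] args) {
        System.out.println("caching:  " + reactivate(true).size());  // 0
        System.out.println("querying: " + reactivate(false).size()); // 2
    }
}
```

    With the cached assignment the second consumer is left with no partitions, 
which matches the symptom described above.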
    
    There are some other related changes in this as well:
    * Removed the initialized flag in the spout. Initialization is always 
synchronous, and happens in the call to KafkaSpout.activate, which happens 
before Storm starts calling nextTuple. There's no need to have a flag.
    * Fixed an overly strict check of whether the consumer position was behind 
the committed offset. The committed offset is the offset the spout will start 
at, so the position is allowed to equal the committed offset.
    * Refactored tests that use a stubbed consumer to use a mocked Subscription 
instead of the real classes. The real classes make stubbing more complicated 
than it needs to be, and the subscription is already tested by the integration 
tests against the embedded Kafka instance and by the 
ManualPartitionSubscription unit test.
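    The offset-check change can be illustrated with a small sketch. The helper 
names below are hypothetical, and the PR text does not show the exact original 
expression; the sketch only assumes the strict version also flagged a position 
equal to the committed offset. Since the spout starts consuming at the 
committed offset, only a position strictly below it is behind.

```java
public class PositionCheck {
    // Hypothetical helper: a position is behind only if strictly below the committed offset.
    static boolean isBehindCommitted(long position, long committedOffset) {
        return position < committedOffset;
    }

    // Hypothetical overly strict variant: also flags position == committedOffset.
    static boolean isBehindCommittedTooStrict(long position, long committedOffset) {
        return position <= committedOffset;
    }

    public static void main(String[] args) {
        long committed = 100L;
        System.out.println(isBehindCommittedTooStrict(100L, committed)); // true (wrongly flagged)
        System.out.println(isBehindCommitted(100L, committed));          // false
        System.out.println(isBehindCommitted(99L, committed));           // true
    }
}
```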

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/srdo/storm STORM-2847

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/storm/pull/2454.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2454
    