GitHub user srdo opened a pull request:

    https://github.com/apache/storm/pull/2277

    STORM-2666: Fix storm-kafka-client spout sometimes emitting messages that
    were already committed. Expand tests, add some runtime validation, and do
    some minor refactoring to increase code readability. Ensure OffsetManager
    commits as many offsets as possible when an offset void (deleted offsets)
    occurs, rather than just up to the gap.
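As a rough illustration of the gap handling summarized above, here is a minimal, simplified model. It is not the storm-kafka-client OffsetManager: the class `GapAwareOffsetTracker` and its methods are hypothetical, partitions are omitted, and a plain `IllegalStateException` stands in for the `Validate` calls the PR adds. It shows the idea that only the lowest emitted-but-unacked offset blocks a commit; offsets that were never emitted (for example because they were deleted from Kafka) do not.

```java
import java.util.NavigableSet;
import java.util.OptionalLong;
import java.util.TreeSet;

// Hypothetical, simplified single-partition model of gap-aware commit logic.
// This is NOT the real storm-kafka-client OffsetManager.
class GapAwareOffsetTracker {
    private final NavigableSet<Long> emitted = new TreeSet<>(); // emitted, not yet committed
    private final NavigableSet<Long> acked = new TreeSet<>();   // acked, not yet committed
    private long committedOffset; // Kafka convention: the next offset to consume

    GapAwareOffsetTracker(long committedOffset) {
        this.committedOffset = committedOffset;
    }

    long getCommittedOffset() {
        return committedOffset;
    }

    void addToEmitted(long offset) {
        // Runtime check in the spirit of the PR's Validate calls:
        // never emit an offset that is already committed.
        if (offset < committedOffset) {
            throw new IllegalStateException(
                "Offset " + offset + " is below committed offset " + committedOffset);
        }
        emitted.add(offset);
    }

    void addToAcked(long offset) {
        if (!emitted.contains(offset)) {
            throw new IllegalStateException("Offset " + offset + " was acked but never emitted");
        }
        acked.add(offset);
    }

    // Returns the offset to commit (the next offset to consume), or empty if nothing can be committed.
    OptionalLong findNextCommitOffset() {
        // The first emitted offset that is not acked blocks everything above it.
        Long blocker = null;
        for (long offset : emitted) {
            if (!acked.contains(offset)) {
                blocker = offset;
                break;
            }
        }
        // Never-emitted offsets (gaps) are absent from both sets, so they do not block the commit.
        NavigableSet<Long> committable = blocker == null ? acked : acked.headSet(blocker, false);
        return committable.isEmpty()
            ? OptionalLong.empty()
            : OptionalLong.of(committable.last() + 1);
    }

    void commit(long nextOffset) {
        // Clearing the headSet views removes the committed offsets from the backing sets.
        emitted.headSet(nextOffset, false).clear();
        acked.headSet(nextOffset, false).clear();
        committedOffset = nextOffset;
    }
}
```

In this model, if offsets 12 and 13 were deleted and 10, 11, 14 and 15 are emitted and acked, a single commit advances to 16 rather than stopping at the gap first.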
    
    See https://issues.apache.org/jira/browse/STORM-2666. I suspect this issue
    was also the root cause of the double acks we saw in
    https://github.com/apache/storm/pull/1679.
    
    The following changes are made here:
    * Make OffsetManager commit as many offsets as possible when there is a gap
      in emitted offsets due to offsets being deleted from Kafka. We used to
      need two commits to get past a gap: findNextCommitOffset stopped at the
      last offset before the gap, the spout committed up to the gap, and only
      the next round committed the acked tuples past it. The OffsetManager
      should now pick the highest acked offset to commit immediately, as long
      as the unacked offsets before it were never emitted.
    * Fix a case where the KafkaConsumer position could fall behind the
      committed offset. This can happen when there are many acked but
      uncommitted tuples and an older tuple that needs to be retried is
      blocking the commit. Once the failed tuple is retried, all the acked
      tuples are committed, but the consumer position is not necessarily
      caught up.
    * Don't seek the consumer on partition reassignment for partitions that
      were previously assigned. Since the spout keeps the emitted/acked state
      for those partitions, we shouldn't be moving the consumer offset.
    * Minor changes to the retry service, mainly stopping iterations once it
      has found what it was looking for.
    * Add some Validate calls to ensure that the spout state is valid, e.g.
      check that the spout doesn't try to emit tuples that are already
      committed.
    * Add more tests.
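The two seek-related fixes in the list above can be sketched in the same simplified way. This is a hypothetical model, not the spout's code: `PositionedConsumer` is an invented interface mirroring the `position`/`seek` pair that `KafkaConsumer` offers (keyed by a `String` here instead of a `TopicPartition`), and `RebalanceSketch` and `InMemoryConsumer` are illustrative names. It shows (1) seeking only partitions that were not assigned before the rebalance, and (2) moving the consumer forward when its position has fallen behind the committed offset.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for the two KafkaConsumer calls involved (position and seek).
interface PositionedConsumer {
    long position(String partition);
    void seek(String partition, long offset);
}

// Trivial in-memory implementation, only for demonstration.
class InMemoryConsumer implements PositionedConsumer {
    private final Map<String, Long> positions = new HashMap<>();

    public long position(String partition) {
        return positions.getOrDefault(partition, 0L);
    }

    public void seek(String partition, long offset) {
        positions.put(partition, offset);
    }
}

class RebalanceSketch {
    private Set<String> previousAssignment = new HashSet<>();

    // Returns only the partitions that were not assigned before this rebalance.
    // Partitions kept across the rebalance retain their emitted/acked state, so they are not seeked.
    Set<String> partitionsToSeek(Set<String> newAssignment) {
        Set<String> newlyAssigned = new HashSet<>(newAssignment);
        newlyAssigned.removeAll(previousAssignment);
        previousAssignment = new HashSet<>(newAssignment);
        return newlyAssigned;
    }

    // After a commit, move the consumer forward if its position is below the committed offset,
    // so it does not re-emit offsets that are already committed. Returns true if it seeked.
    static boolean catchUpIfBehind(PositionedConsumer consumer, String partition, long committedOffset) {
        if (consumer.position(partition) < committedOffset) {
            consumer.seek(partition, committedOffset);
            return true;
        }
        return false;
    }
}
```

Deciding where the newly assigned partitions should start (for example at their committed offset) is a separate step and is left out of this sketch.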

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/srdo/storm STORM-2666-clean

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/storm/pull/2277.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2277
    
----
commit 52b14d31038c04d81cb31a9668f4a77e5584aa7b
Author: Stig Rohde Døssing <[email protected]>
Date:   2017-08-12T14:56:45Z

    STORM-2666: Fix storm-kafka-client spout sometimes emitting messages that
    were already committed. Expand tests, add some runtime validation, minor
    refactoring to increase code readability. Ensure OffsetManager commits as
    many offsets as possible when an offset void (deleted offsets) occurs,
    rather than just up to the gap.

----

