GitHub user srdo opened a pull request:

    https://github.com/apache/storm/pull/1832

    STORM-2250: Refactor new Kafka spout so it is easier to read, and easier to test without breaking encapsulation.

    These changes are based on the discussions in https://github.com/apache/storm/pull/1826 and https://github.com/apache/storm/pull/1825 about the spout getting a bit large.
    
    This PR makes the following changes:
    * OffsetEntry was moved into a new class and renamed OffsetManager
    * OffsetManager.commit now returns numCommittedOffsets, so it doesn't have to refer to a KafkaSpout internal variable.
    * KafkaSpout.commitOffsetsForAckedTuples no longer iterates over acked twice. The second iteration was used to commit offsets to the OffsetEntries, but it can just as well iterate over nextCommitOffsets, since those are the offsets that were committed to Kafka. This also saves us a null check in OffsetManager.commit.
    * OffsetManager.commit no longer checks whether its parameter is null. The parameter can no longer be null unless there's a bug, in which case we want an NPE to be thrown so we can fix it.
    * Timer was moved into a separate class and renamed PeriodicallyExpiringTimer.
    * PeriodicallyExpiringTimer supports Storm's time simulation. This also means that it only supports time units down to milliseconds. If we need nanosecond precision, we need to update Storm's Utils.Time.
    * Cleaned up a few redundant version declarations in the pom. Also replaced hamcrest-all with hamcrest-core + hamcrest-library, since tests were throwing NoSuchMethodError when an assertThat failed.
    * Updated the tests to use time simulation, and to check whether the spout calls KafkaConsumer.commitSync instead of reading the spout's internal state.
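
To illustrate the OffsetManager.commit change described in the list above, here is a rough sketch. It is not the code in this PR: the class name OffsetManagerSketch, the addAcked method, and the bookkeeping with a TreeSet are all assumptions made for illustration. It only shows the idea of commit returning the number of committed offsets to the caller, and of letting a null argument fail with an NPE rather than checking for it.

```java
import java.util.TreeSet;

/** Hypothetical, simplified stand-in for the OffsetManager described above. */
final class OffsetManagerSketch {
    // Offsets that have been acked but not yet committed, kept sorted.
    private final TreeSet<Long> ackedOffsets = new TreeSet<>();
    // The last offset that has already been committed.
    private long committedOffset;

    OffsetManagerSketch(long initialCommittedOffset) {
        this.committedOffset = initialCommittedOffset;
    }

    void addAcked(long offset) {
        ackedOffsets.add(offset);
    }

    /** Returns the highest acked offset reachable without a gap, or null if there is none. */
    Long findNextCommitOffset() {
        long expected = committedOffset + 1;
        Long next = null;
        for (long offset : ackedOffsets) {
            if (offset != expected) {
                break; // gap found, later offsets cannot be committed yet
            }
            next = offset;
            expected++;
        }
        return next;
    }

    /**
     * Marks everything up to and including offsetToCommit as committed and
     * returns how many offsets that covered. There is deliberately no null
     * check: unboxing a null argument throws a NullPointerException.
     */
    long commit(Long offsetToCommit) {
        long target = offsetToCommit;
        long numCommittedOffsets = target - committedOffset;
        ackedOffsets.headSet(target, true).clear();
        committedOffset = target;
        return numCommittedOffsets;
    }
}
```

With this shape, the caller can add the returned count to its own bookkeeping, so the offset tracking class never needs to touch a field of the spout.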
    
    I think it would be nice if RetryService also supported time simulation. Let me know what you think, @hmcl.
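
For reference, the time simulation idea could look roughly like the sketch below. It is an assumption-laden illustration, not the PeriodicallyExpiringTimer from this PR: the class name ExpiringTimerSketch and the injected LongSupplier clock are hypothetical stand-ins for reading the time through Storm's time utility, which a test can switch to simulated time. Because the clock only reports milliseconds, the timer cannot resolve anything finer.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

/** Hypothetical timer that expires periodically, driven by a millisecond clock. */
final class ExpiringTimerSketch {
    private final LongSupplier clockMs; // stand-in for a simulatable time source
    private final long periodMs;
    private long startMs;

    ExpiringTimerSketch(long delay, long period, TimeUnit unit, LongSupplier clockMs) {
        this.clockMs = clockMs;
        this.periodMs = unit.toMillis(period); // sub-millisecond periods truncate to 0
        this.startMs = clockMs.getAsLong() + unit.toMillis(delay);
    }

    /** Returns true once a full period has elapsed, and restarts the period when it does. */
    boolean isExpiredResetOnTrue() {
        long nowMs = clockMs.getAsLong();
        boolean expired = nowMs - startMs >= periodMs;
        if (expired) {
            startMs = nowMs;
        }
        return expired;
    }
}
```

A test can pass a clock backed by a variable it controls and advance it by hand, so no real sleeping is needed; production code would pass the real time source instead.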

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/srdo/storm STORM-2250

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/storm/pull/1832.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1832
    
----
commit 119e35611ad841e703db6ce4b77cf483b9859df7
Author: Stig Rohde Døssing <s...@it-minds.dk>
Date:   2016-12-17T13:48:44Z

    STORM-2250: Put Timer and OffsetEntry into internal package classes, slightly adjust KafkaSpout.commitOffsetsForAckedTuples to iterate over nextCommitOffsets instead of acked for second loop

commit 7e59df243f4d633c801f250da4ceddb6b583fc64
Author: Stig Rohde Døssing <s...@it-minds.dk>
Date:   2016-12-17T16:33:48Z

    STORM-2250: Rewrite single topic kafka tests to use simulated time + a spy to check if offsets are committed, rather than using a package private field. Get rid of Timer factories again, since we can just use Storm's time simulation in tests. Remove some redundant version declarations in poms

----


