GitHub user srdo opened a pull request: https://github.com/apache/storm/pull/1832
STORM-2250: Refactor new Kafka spout so it is easier to read, and easier to test without breaking encapsulation.

Making these changes based on a discussion here https://github.com/apache/storm/pull/1826 and https://github.com/apache/storm/pull/1825 about the spout getting a bit large.

This PR makes the following changes:
* OffsetEntry was moved into a new class and renamed OffsetManager
* OffsetManager.commit now returns numCommittedOffsets, so it doesn't have to refer to a KafkaSpout internal variable.
* KafkaSpout.commitOffsetsForAckedTuples no longer iterates over acked twice. The second iteration was used to commit offsets to OffsetEntries, but we may as well use nextCommitOffsets for that iteration, since those were the offsets committed to Kafka. This also saves us a null check in OffsetManager.commit.
* OffsetManager.commit no longer checks if the parameter is null. It should no longer be possible that it is null, unless there's a bug, in which case we want to throw an NPE so we can fix it.
* Timer was moved into a separate class and renamed PeriodicallyExpiringTimer.
* PeriodicallyExpiringTimer supports Storm's time simulation. This also means that it only supports time units down to milliseconds. If we need nanosecond precision, we need to update Storm's Utils.Time.
* Cleaned up a few redundant version declarations in the pom. Also switched out hamcrest-all for hamcrest-core + hamcrest-library, since tests were throwing NoSuchMethodError when an assertThat failed.
* Updated the tests to use time simulation, and to check if the spout calls KafkaConsumer.commitSync instead of reading the spout's internal state

I think it would be nice if RetryService also supported time simulation, let me know what you think @hmcl

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/srdo/storm STORM-2250

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/storm/pull/1832.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #1832

----
commit 119e35611ad841e703db6ce4b77cf483b9859df7
Author: Stig Rohde Døssing <s...@it-minds.dk>
Date:   2016-12-17T13:48:44Z

    STORM-2250: Put Timer and OffsetEntry into internal package classes, slightly adjust KafkaSpout.commitOffsetsForAckedTuples to iterate over nextCommitOffsets instead of acked for second loop

commit 7e59df243f4d633c801f250da4ceddb6b583fc64
Author: Stig Rohde Døssing <s...@it-minds.dk>
Date:   2016-12-17T16:33:48Z

    STORM-2250: Rewrite single topic kafka tests to use simulated time + a spy to check if offsets are committed, rather than using a package private field. Get rid of Timer factories again, since we can just use Storm's time simulation in tests. Remove some redundant version declarations in poms

----
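
For readers who want a concrete picture of the PeriodicallyExpiringTimer item in the change list, here is a minimal sketch of a periodic timer that reads a simulatable clock. It is not the code from this PR: `SimClock` is a hypothetical stand-in for Storm's time utility so that the example compiles on its own, and `PeriodicTimerSketch` with its method name is illustrative only. The sketch also shows why precision stops at milliseconds: the period is converted with `TimeUnit.toMillis`, which truncates anything smaller.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for a simulatable clock (assumption: not Storm's actual class). */
final class SimClock {
    // null means "use the real wall clock"
    private static Long simulatedMs = null;

    static synchronized void startSimulating() { simulatedMs = 0L; }

    static synchronized void stopSimulating() { simulatedMs = null; }

    /** Moves simulated time forward; only valid while simulating. */
    static synchronized void advance(long ms) {
        if (simulatedMs == null) {
            throw new IllegalStateException("time simulation is not active");
        }
        simulatedMs += ms;
    }

    static synchronized long currentTimeMillis() {
        return simulatedMs != null ? simulatedMs : System.currentTimeMillis();
    }
}

/** Illustrative periodic timer; the class and method names are assumptions. */
final class PeriodicTimerSketch {
    private final long periodMs;
    private long startMs;

    PeriodicTimerSketch(long period, TimeUnit unit) {
        // Sub-millisecond units truncate here, e.g. 500 ns becomes 0 ms.
        this.periodMs = unit.toMillis(period);
        this.startMs = SimClock.currentTimeMillis();
    }

    /** Returns true once the period has elapsed, and restarts the period when it does. */
    boolean isExpiredResetOnTrue() {
        long now = SimClock.currentTimeMillis();
        if (now - startMs >= periodMs) {
            startMs = now;
            return true;
        }
        return false;
    }
}
```

Because the timer never calls `System.nanoTime()` or the wall clock directly, a test can call `SimClock.startSimulating()`, advance the clock past the commit period, and then check for the externally visible effect (for example, a commit call on a spied consumer) without sleeping or reading the spout's internal fields.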