Repository: storm Updated Branches: refs/heads/1.0.x-branch 796d4d3f4 -> eeee1ee5d
Fix TimeInterval.microSeconds() in KafkaSpoutRetryExponentialBackoff (storm-kafka-client) to use TimeUnit.MICROSECONDS instead of TimeUnit.MILLISECONDS * Closes #1629 * also fix the documentation and example topology to use the simpler TimeInterval.microSeconds() factory method Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/eeee1ee5 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/eeee1ee5 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/eeee1ee5 Branch: refs/heads/1.0.x-branch Commit: eeee1ee5d784089a4dd8bd167ee5c5e72539b6b0 Parents: 796d4d3 Author: leedohyun <[email protected]> Authored: Wed Aug 17 15:51:51 2016 +0900 Committer: Jungtaek Lim <[email protected]> Committed: Tue Aug 23 09:18:51 2016 +0900 ---------------------------------------------------------------------- external/storm-kafka-client/README.md | 2 +- .../storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java | 2 +- .../kafka/spout/test/KafkaSpoutTopologyMainNamedTopics.java | 6 +----- 3 files changed, 3 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/eeee1ee5/external/storm-kafka-client/README.md ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/README.md b/external/storm-kafka-client/README.md index ed20ec7..a45ae0b 100644 --- a/external/storm-kafka-client/README.md +++ b/external/storm-kafka-client/README.md @@ -42,7 +42,7 @@ KafkaSpoutTuplesBuilder<String, String> tuplesBuilder = new KafkaSpoutTuplesBuil new TopicTest2TupleBuilder<String, String>(TOPICS[2])) .build(); -KafkaSpoutRetryService retryService = new KafkaSpoutRetryExponentialBackoff(new TimeInterval(500, TimeUnit.MICROSECONDS), +KafkaSpoutRetryService retryService = new KafkaSpoutRetryExponentialBackoff(TimeInterval.microSeconds(500), TimeInterval.milliSeconds(2), Integer.MAX_VALUE, TimeInterval.seconds(10)); ``` 
http://git-wip-us.apache.org/repos/asf/storm/blob/eeee1ee5/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java index 6fe997c..f59367d 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java @@ -117,7 +117,7 @@ public class KafkaSpoutRetryExponentialBackoff implements KafkaSpoutRetryService } public static TimeInterval microSeconds(long length) { - return new TimeInterval(length, TimeUnit.MILLISECONDS); + return new TimeInterval(length, TimeUnit.MICROSECONDS); } public long lengthNanos() { http://git-wip-us.apache.org/repos/asf/storm/blob/eeee1ee5/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainNamedTopics.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainNamedTopics.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainNamedTopics.java index 952c5d3..5a78137 100644 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainNamedTopics.java +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainNamedTopics.java @@ -104,14 +104,10 @@ public class KafkaSpoutTopologyMainNamedTopics { } protected KafkaSpoutRetryService getRetryService() { - return new KafkaSpoutRetryExponentialBackoff(getTimeInterval(500, 
TimeUnit.MICROSECONDS), + return new KafkaSpoutRetryExponentialBackoff(TimeInterval.microSeconds(500), TimeInterval.milliSeconds(2), Integer.MAX_VALUE, TimeInterval.seconds(10)); } - protected TimeInterval getTimeInterval(long delay, TimeUnit timeUnit) { - return new TimeInterval(delay, timeUnit); - } - protected Map<String,Object> getKafkaConsumerProps() { Map<String, Object> props = new HashMap<>(); // props.put(KafkaSpoutConfig.Consumer.ENABLE_AUTO_COMMIT, "true");
