METRON-1588 Migrate storm-kafka-client to 1.2.1 closes apache/incubator-metron#1039


Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/828ab713
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/828ab713
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/828ab713

Branch: refs/heads/feature/METRON-1416-upgrade-solr
Commit: 828ab71346b458b73cb14d42d68e7471ead7fa4e
Parents: 7757046
Author: Jungtaek Lim <kabh...@gmail.com>
Authored: Thu Jun 14 12:07:05 2018 -0400
Committer: cstella <ceste...@gmail.com>
Committed: Thu Jun 14 12:07:05 2018 -0400

----------------------------------------------------------------------
 NOTICE                                          |   6 +
 dependencies_with_url.csv                       |   4 +
 metron-platform/metron-api/pom.xml              |   4 -
 .../parsers/topology/ParserTopologyBuilder.java |   2 +-
 .../metron-storm-kafka-override/pom.xml         |  83 ++++-
 .../KafkaSpoutRetryExponentialBackoff.java      | 336 +++++++++++++++++++
 .../storm/kafka/spout/internal/Timer.java       |   9 +-
 metron-platform/metron-storm-kafka/pom.xml      |   5 -
 pom.xml                                         |   4 +-
 9 files changed, 435 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/metron/blob/828ab713/NOTICE
----------------------------------------------------------------------
diff --git a/NOTICE b/NOTICE
index ff6550b..5227768 100644
--- a/NOTICE
+++ b/NOTICE
@@ -10,3 +10,9 @@
    This product includes software developed by Chef Software (https://www.chef.io)
    Copyright (c) 2012-2015, Chef Software, Inc.
 
+   This includes derived works from the Apache Storm (ASLv2 licensed) project (https://github.com/apache/storm):
+   Copyright 2015 The Apache Software Foundation
+   The derived work is adapted from
+     org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java
+     org/apache/storm/kafka/spout/internal/Timer.java
+   and can be found in the org.apache.storm.kafka package

http://git-wip-us.apache.org/repos/asf/metron/blob/828ab713/dependencies_with_url.csv
----------------------------------------------------------------------
diff --git a/dependencies_with_url.csv b/dependencies_with_url.csv
index df3bcd2..438ce3e 100644
--- a/dependencies_with_url.csv
+++ b/dependencies_with_url.csv
@@ -119,15 +119,19 @@ com.fasterxml.jackson.core:jackson-annotations:jar:2.2.3:compile,ASLv2,http://wi
 com.fasterxml.jackson.core:jackson-annotations:jar:2.7.4:compile,ASLv2,http://github.com/FasterXML/jackson
 com.fasterxml.jackson.core:jackson-annotations:jar:2.8.3:compile,ASLv2,http://github.com/FasterXML/jackson
 com.fasterxml.jackson.core:jackson-annotations:jar:2.9.0:compile,ASLv2,http://github.com/FasterXML/jackson
+com.fasterxml.jackson.core:jackson-annotations:jar:2.9.4:compile,ASLv2,http://github.com/FasterXML/jackson
 com.fasterxml.jackson.core:jackson-core:jar:2.2.3:compile,ASLv2,http://wiki.fasterxml.com/JacksonHome
+com.fasterxml.jackson.core:jackson-core:jar:2.6.3:compile,ASLv2,https://github.com/FasterXML/jackson-core
 com.fasterxml.jackson.core:jackson-core:jar:2.6.6:compile,ASLv2,https://github.com/FasterXML/jackson-core
 com.fasterxml.jackson.core:jackson-core:jar:2.7.4:compile,ASLv2,https://github.com/FasterXML/jackson-core
 com.fasterxml.jackson.core:jackson-core:jar:2.8.3:compile,ASLv2,https://github.com/FasterXML/jackson-core
+com.fasterxml.jackson.core:jackson-core:jar:2.9.4:compile,ASLv2,https://github.com/FasterXML/jackson-core
 com.fasterxml.jackson.core:jackson-core:jar:2.9.5:compile,ASLv2,https://github.com/FasterXML/jackson-core
 com.fasterxml.jackson.core:jackson-databind:jar:2.2.3:compile,ASLv2,http://wiki.fasterxml.com/JacksonHome
 com.fasterxml.jackson.core:jackson-databind:jar:2.4.3:compile,ASLv2,http://github.com/FasterXML/jackson
 com.fasterxml.jackson.core:jackson-databind:jar:2.7.4:compile,ASLv2,http://github.com/FasterXML/jackson
 com.fasterxml.jackson.core:jackson-databind:jar:2.8.3:compile,ASLv2,http://github.com/FasterXML/jackson
+com.fasterxml.jackson.core:jackson-databind:jar:2.9.4:compile,ASLv2,http://github.com/FasterXML/jackson
 com.fasterxml.jackson.core:jackson-databind:jar:2.9.5:compile,ASLv2,http://github.com/FasterXML/jackson
 com.fasterxml.jackson.dataformat:jackson-dataformat-cbor:jar:2.6.6:compile,ASLv2,http://wiki.fasterxml.com/JacksonForCbor
 com.fasterxml.jackson.dataformat:jackson-dataformat-smile:jar:2.6.6:compile,ASLv2,http://wiki.fasterxml.com/JacksonForSmile

http://git-wip-us.apache.org/repos/asf/metron/blob/828ab713/metron-platform/metron-api/pom.xml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-api/pom.xml b/metron-platform/metron-api/pom.xml
index e3bf12d..59b1622 100644
--- a/metron-platform/metron-api/pom.xml
+++ b/metron-platform/metron-api/pom.xml
@@ -31,10 +31,6 @@
                <zookeeper.version>3.4.5.2.0.6.0-76</zookeeper.version>
                <logger.version>1.2.15</logger.version>
 
-        <storm-kafka-client.version>1.0.1</storm-kafka-client.version>
-        <storm-hdfs.version>0.0.7-SNAPSHOT</storm-hdfs.version>
-        <storm-hbase.version>0.0.5-SNAPSHOT</storm-hbase.version>
-
         <spring.integration.version>3.0.0.RELEASE</spring.integration.version>
         <spring.version>3.2.6.RELEASE</spring.version>
         <commons-fileupload.version>1.2.2</commons-fileupload.version>

http://git-wip-us.apache.org/repos/asf/metron/blob/828ab713/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java
index cd4ad50..5b3e0d5 100644
--- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java
+++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java
@@ -162,7 +162,7 @@ public class ParserTopologyBuilder {
     Map<String, Object> kafkaSpoutConfigOptions = kafkaConfigOptional.orElse(new HashMap<>());
     String inputTopic = parserConfig.getSensorTopic() != null ? parserConfig.getSensorTopic() : sensorType;
     kafkaSpoutConfigOptions.putIfAbsent( SpoutConfiguration.FIRST_POLL_OFFSET_STRATEGY.key
-            , KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST.toString()
+            , KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST.name()
     );
     kafkaSpoutConfigOptions.putIfAbsent( ConsumerConfig.GROUP_ID_CONFIG
             , inputTopic + "_parser"

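The switch from toString() to name() above is defensive: Enum.name() always returns the constant exactly as declared, while toString() may be overridden to return a prettier label, which can then fail to round-trip through Enum.valueOf(...) if that is how the configured value is read back. A minimal, self-contained sketch with a hypothetical enum (not the actual Storm class) illustrating the difference:

    // Hypothetical enum, used only to illustrate name() vs. toString(); not the Storm class.
    public class EnumNameVsToString {
        enum Strategy {
            UNCOMMITTED_EARLIEST {
                @Override
                public String toString() {
                    return "Uncommitted earliest";   // pretty-printed label
                }
            };
        }

        public static void main(String[] args) {
            System.out.println(Strategy.UNCOMMITTED_EARLIEST.name());      // UNCOMMITTED_EARLIEST
            System.out.println(Strategy.UNCOMMITTED_EARLIEST.toString());  // Uncommitted earliest
            // Enum.valueOf only accepts the declared constant name:
            System.out.println(Strategy.valueOf("UNCOMMITTED_EARLIEST"));  // parses fine
            // Strategy.valueOf("Uncommitted earliest") would throw IllegalArgumentException.
        }
    }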
http://git-wip-us.apache.org/repos/asf/metron/blob/828ab713/metron-platform/metron-storm-kafka-override/pom.xml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-storm-kafka-override/pom.xml b/metron-platform/metron-storm-kafka-override/pom.xml
index 9be733e..a38d3bf 100644
--- a/metron-platform/metron-storm-kafka-override/pom.xml
+++ b/metron-platform/metron-storm-kafka-override/pom.xml
@@ -92,14 +92,87 @@
                 </exclusion>
             </exclusions>
         </dependency>
-        <dependency>
-            <groupId>org.apache.metron</groupId>
-            <artifactId>metron-common</artifactId>
-            <version>${project.parent.version}</version>
-        </dependency>
     </dependencies>
 
     <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <version>${global_shade_version}</version>
+                <configuration>
+                    <createDependencyReducedPom>true</createDependencyReducedPom>
+                </configuration>
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <configuration>
+                          <filters>
+                            <filter>
+                              <artifact>*:*</artifact>
+                              <excludes>
+                                <exclude>META-INF/*.SF</exclude>
+                                <exclude>META-INF/*.DSA</exclude>
+                                <exclude>META-INF/*.RSA</exclude>
+                              </excludes>
+                            </filter>
+                          </filters>
+                          <relocations>
+                                <relocation>
+                                    <pattern>com.google.common</pattern>
+                                    <shadedPattern>org.apache.metron.storm.kafka.override.guava.common</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>com.google.thirdparty</pattern>
+                                    <shadedPattern>org.apache.metron.storm.kafka.override.guava.thirdparty</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>org.apache.commons.lang</pattern>
+                                    <shadedPattern>org.apache.metron.storm.kafka.override.commons.lang</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>com.fasterxml</pattern>
+                                    <shadedPattern>org.apache.metron.storm.kafka.override.com.fasterxml</shadedPattern>
+                                </relocation>
+                            </relocations>
+                            <artifactSet>
+                                <excludes>
+                                    <exclude>storm:storm-core:*</exclude>
+                                    <exclude>storm:storm-lib:*</exclude>
+                                    <exclude>org.slf4j.impl*</exclude>
+                                    <exclude>org.slf4j:slf4j-log4j*</exclude>
+                                </excludes>
+                            </artifactSet>
+                            <transformers>
+                                <transformer
+                                  implementation="org.apache.maven.plugins.shade.resource.DontIncludeResourceTransformer">
+                                     <resources>
+                                        <resource>.yaml</resource>
+                                        <resource>LICENSE.txt</resource>
+                                        <resource>ASL2.0</resource>
+                                        <resource>NOTICE.txt</resource>
+                                      </resources>
+                                </transformer>
+                                <!-- UNCOMMENT THIS IF YOU NEED TO REGENERATE THE BEST GUESS NOTICES FILE WHICH REQUIRES PRUNING EVERY RELEASE -->
+                                <!--transformer implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer">
+                                    <addHeader>false</addHeader>
+                                    <projectName>${project.name}</projectName>
+                                </transformer-->
+                                <transformer
+                                        implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
+                                <transformer
+                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+                                    <mainClass></mainClass>
+                                </transformer>
+                            </transformers>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
         <resources>
             <resource>
                 <directory>src/main/resources</directory>

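The relocations above shade the spout's bundled Guava, commons-lang, and Jackson classes under org.apache.metron.storm.kafka.override.* so they cannot clash with whatever versions Storm already ships on the worker classpath. One way to sanity-check that the rewrite happened is to list the shaded jar's entries with the standard java.util.jar API; a small sketch follows (the jar path is a placeholder for whatever the build actually produces):

    import java.util.jar.JarEntry;
    import java.util.jar.JarFile;

    // Sketch: print a few entries under the relocated package to confirm that
    // com.google.common et al. were rewritten to org.apache.metron.storm.kafka.override.*.
    public class CheckRelocation {
        public static void main(String[] args) throws Exception {
            // Placeholder path; pass the real shaded jar as the first argument.
            String jarPath = args.length > 0 ? args[0] : "target/metron-storm-kafka-override.jar";
            try (JarFile jar = new JarFile(jarPath)) {
                jar.stream()
                   .map(JarEntry::getName)
                   .filter(name -> name.startsWith("org/apache/metron/storm/kafka/override/"))
                   .limit(20)
                   .forEach(System.out::println);
            }
        }
    }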
http://git-wip-us.apache.org/repos/asf/metron/blob/828ab713/metron-platform/metron-storm-kafka-override/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-storm-kafka-override/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java b/metron-platform/metron-storm-kafka-override/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java
new file mode 100644
index 0000000..439188b
--- /dev/null
+++ b/metron-platform/metron-storm-kafka-override/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout;
+
+import org.apache.kafka.common.TopicPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang.Validate;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+/*
+ This file is pulled from Apache Storm, with some modifications to support lower versions of
+ Apache Storm.
+
+ - Time.nanoTime() was introduced in Storm 1.1.0, so we changed it to System.nanoTime().
+ -- Time.nanoTime() calls System.nanoTime() when it is not in time-simulation mode.
+*/
+
+/**
+ * Implementation of {@link KafkaSpoutRetryService} using the exponential backoff formula. The time of the nextRetry is set as follows:
+ * nextRetry = failCount == 1 ? currentTime + initialDelay : currentTime + delayPeriod*2^(failCount-1)    where failCount = 1, 2, 3, ...
+ * nextRetry = Min(nextRetry, currentTime + maxDelay)
+ */
+public class KafkaSpoutRetryExponentialBackoff implements KafkaSpoutRetryService {
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaSpoutRetryExponentialBackoff.class);
+    private static final RetryEntryTimeStampComparator RETRY_ENTRY_TIME_STAMP_COMPARATOR = new RetryEntryTimeStampComparator();
+
+    private final TimeInterval initialDelay;
+    private final TimeInterval delayPeriod;
+    private final TimeInterval maxDelay;
+    private final int maxRetries;
+
+    //This class assumes that there is at most one retry schedule per message id in this set at a time.
+    private final Set<RetrySchedule> retrySchedules = new TreeSet<>(RETRY_ENTRY_TIME_STAMP_COMPARATOR);
+    private final Set<KafkaSpoutMessageId> toRetryMsgs = new HashSet<>();      // Convenience data structure to speedup lookups
+
+    /**
+     * Comparator ordering by timestamp 
+     */
+    private static class RetryEntryTimeStampComparator implements Serializable, Comparator<RetrySchedule> {
+        @Override
+        public int compare(RetrySchedule entry1, RetrySchedule entry2) {
+            int result = Long.valueOf(entry1.nextRetryTimeNanos()).compareTo(entry2.nextRetryTimeNanos());
+            
+            if(result == 0) {
+                //TreeSet uses compareTo instead of equals() for the Set contract
+                //Ensure that we can save two retry schedules with the same timestamp
+                result = entry1.hashCode() - entry2.hashCode();
+            }
+            return result;
+        }
+    }
+
+    private class RetrySchedule {
+        private final KafkaSpoutMessageId msgId;
+        private long nextRetryTimeNanos;
+
+        public RetrySchedule(KafkaSpoutMessageId msgId, long nextRetryTimeNanos) {
+            this.msgId = msgId;
+            this.nextRetryTimeNanos = nextRetryTimeNanos;
+            LOG.debug("Created {}", this);
+        }
+
+        public void setNextRetryTimeNanos() {
+            nextRetryTimeNanos = nextTime(msgId);
+            LOG.debug("Updated {}", this);
+        }
+
+        public boolean retry(long currentTimeNanos) {
+            return nextRetryTimeNanos <= currentTimeNanos;
+        }
+
+        @Override
+        public String toString() {
+            return "RetrySchedule{" +
+                    "msgId=" + msgId +
+                    ", nextRetryTimeNanos=" + nextRetryTimeNanos +
+                    '}';
+        }
+
+        public KafkaSpoutMessageId msgId() {
+            return msgId;
+        }
+
+        public long nextRetryTimeNanos() {
+            return nextRetryTimeNanos;
+        }
+    }
+
+    public static class TimeInterval implements Serializable {
+        private final long lengthNanos;
+        private final TimeUnit timeUnit;
+        private final long length;
+
+        /**
+         * @param length length of the time interval in the units specified by {@link TimeUnit}
+         * @param timeUnit unit in which the time interval is specified
+         */
+        public TimeInterval(long length, TimeUnit timeUnit) {
+            this.lengthNanos = timeUnit.toNanos(length);
+            this.timeUnit = timeUnit;
+            this.length = length;
+        }
+        
+        public static TimeInterval seconds(long length) {
+            return new TimeInterval(length, TimeUnit.SECONDS);
+        }
+
+        public static TimeInterval milliSeconds(long length) {
+            return new TimeInterval(length, TimeUnit.MILLISECONDS);
+        }
+        
+        public static TimeInterval microSeconds(long length) {
+            return new TimeInterval(length, TimeUnit.MICROSECONDS);
+        }
+        
+        public long lengthNanos() {
+            return lengthNanos;
+        }
+        
+        public TimeUnit timeUnit() {
+            return timeUnit;
+        }
+
+        @Override
+        public String toString() {
+            return "TimeInterval{" +
+                    "length=" + length +
+                    ", timeUnit=" + timeUnit +
+                    '}';
+        }
+    }
+
+    /**
+     * The time stamp of the next retry is scheduled according to the exponential backoff formula (geometric progression):
+     * nextRetry = failCount == 1 ? currentTime + initialDelay : currentTime + delayPeriod*2^(failCount-1) where failCount = 1, 2, 3, ...
+     * nextRetry = Min(nextRetry, currentTime + maxDelay).
+     * 
+     * By specifying a value for maxRetries lower than Integer.MAX_VALUE, the user decides to sacrifice guarantee of delivery for the previous
+     * polled records in favor of processing more records.
+     *
+     * @param initialDelay      initial delay of the first retry
+     * @param delayPeriod       the time interval that is the ratio of the exponential backoff formula (geometric progression)
+     * @param maxRetries        maximum number of times a tuple is retried before being acked and scheduled for commit
+     * @param maxDelay          maximum amount of time waiting before retrying
+     *
+     */
+    public KafkaSpoutRetryExponentialBackoff(TimeInterval initialDelay, TimeInterval delayPeriod, int maxRetries, TimeInterval maxDelay) {
+        this.initialDelay = initialDelay;
+        this.delayPeriod = delayPeriod;
+        this.maxRetries = maxRetries;
+        this.maxDelay = maxDelay;
+        LOG.debug("Instantiated {}", this.toStringImpl());
+    }
+
+    @Override
+    public Map<TopicPartition, Long> earliestRetriableOffsets() {
+        final Map<TopicPartition, Long> tpToEarliestRetriableOffset = new HashMap<>();
+        final long currentTimeNanos = System.nanoTime();
+        for (RetrySchedule retrySchedule : retrySchedules) {
+            if (retrySchedule.retry(currentTimeNanos)) {
+                final KafkaSpoutMessageId msgId = retrySchedule.msgId;
+                final TopicPartition tpForMessage = new TopicPartition(msgId.topic(), msgId.partition());
+                final Long currentLowestOffset = tpToEarliestRetriableOffset.get(tpForMessage);
+                if(currentLowestOffset != null) {
+                    tpToEarliestRetriableOffset.put(tpForMessage, Math.min(currentLowestOffset, msgId.offset()));
+                } else {
+                    tpToEarliestRetriableOffset.put(tpForMessage, msgId.offset());
+                }
+            } else {
+                break;  // Stop searching as soon as passed current time
+            }
+        }
+        LOG.debug("Topic partitions with entries ready to be retried [{}] ", tpToEarliestRetriableOffset);
+        return tpToEarliestRetriableOffset;
+    }
+
+    @Override
+    public boolean isReady(KafkaSpoutMessageId msgId) {
+        boolean retry = false;
+        if (isScheduled(msgId)) {
+            final long currentTimeNanos = System.nanoTime();
+            for (RetrySchedule retrySchedule : retrySchedules) {
+                if (retrySchedule.retry(currentTimeNanos)) {
+                    if (retrySchedule.msgId.equals(msgId)) {
+                        retry = true;
+                        LOG.debug("Found entry to retry {}", retrySchedule);
+                        break; //Stop searching if the message is known to be ready for retry
+                    }
+                } else {
+                    LOG.debug("Entry to retry not found {}", retrySchedule);
+                    break;  // Stop searching as soon as passed current time
+                }
+            }
+        }
+        return retry;
+    }
+
+    @Override
+    public boolean isScheduled(KafkaSpoutMessageId msgId) {
+        return toRetryMsgs.contains(msgId);
+    }
+
+    @Override
+    public boolean remove(KafkaSpoutMessageId msgId) {
+        boolean removed = false;
+        if (isScheduled(msgId)) {
+            toRetryMsgs.remove(msgId);
+            for (Iterator<RetrySchedule> iterator = retrySchedules.iterator(); iterator.hasNext(); ) {
+                final RetrySchedule retrySchedule = iterator.next();
+                if (retrySchedule.msgId().equals(msgId)) {
+                    iterator.remove();
+                    removed = true;
+                    break; //There is at most one schedule per message id
+                }
+            }
+        }
+        LOG.debug(removed ? "Removed {} " : "Not removed {}", msgId);
+        LOG.trace("Current state {}", retrySchedules);
+        return removed;
+    }
+
+    @Override
+    public boolean retainAll(Collection<TopicPartition> topicPartitions) {
+        boolean result = false;
+        for (Iterator<RetrySchedule> rsIterator = retrySchedules.iterator(); rsIterator.hasNext(); ) {
+            final RetrySchedule retrySchedule = rsIterator.next();
+            final KafkaSpoutMessageId msgId = retrySchedule.msgId;
+            final TopicPartition tpRetry = new TopicPartition(msgId.topic(), msgId.partition());
+            if (!topicPartitions.contains(tpRetry)) {
+                rsIterator.remove();
+                toRetryMsgs.remove(msgId);
+                LOG.debug("Removed {}", retrySchedule);
+                LOG.trace("Current state {}", retrySchedules);
+                result = true;
+            }
+        }
+        return result;
+    }
+
+    @Override
+    public boolean schedule(KafkaSpoutMessageId msgId) {
+        if (msgId.numFails() > maxRetries) {
+            LOG.debug("Not scheduling [{}] because reached maximum number of 
retries [{}].", msgId, maxRetries);
+            return false;
+        } else {
+            //Remove existing schedule for the message id
+            remove(msgId);
+            final RetrySchedule retrySchedule = new RetrySchedule(msgId, nextTime(msgId));
+            retrySchedules.add(retrySchedule);
+            toRetryMsgs.add(msgId);
+            LOG.debug("Scheduled. {}", retrySchedule);
+            LOG.trace("Current state {}", retrySchedules);
+            return true;
+        }
+    }
+    
+    @Override
+    public int readyMessageCount() {
+        int count = 0;
+        final long currentTimeNanos = System.nanoTime();
+        for (RetrySchedule retrySchedule : retrySchedules) {
+            if (retrySchedule.retry(currentTimeNanos)) {
+                ++count;
+            } else {
+                break; //Stop counting when past current time
+            }
+        }
+        return count;
+    }
+
+    @Override
+    public KafkaSpoutMessageId getMessageId(ConsumerRecord<?, ?> record) {
+        KafkaSpoutMessageId msgId = new KafkaSpoutMessageId(record);
+        if (isScheduled(msgId)) {
+            for (KafkaSpoutMessageId originalMsgId : toRetryMsgs) {
+                if (originalMsgId.equals(msgId)) {
+                    return originalMsgId;
+                }
+            }
+        }
+        return msgId;
+    }
+
+    // if value is greater than Long.MAX_VALUE it truncates to Long.MAX_VALUE
+    private long nextTime(KafkaSpoutMessageId msgId) {
+        Validate.isTrue(msgId.numFails() > 0, "nextTime assumes the message has failed at least once");
+        final long currentTimeNanos = System.nanoTime();
+        final long nextTimeNanos = msgId.numFails() == 1                // numFails = 1, 2, 3, ...
+                ? currentTimeNanos + initialDelay.lengthNanos
+                : currentTimeNanos + delayPeriod.lengthNanos * (long)(Math.pow(2, msgId.numFails()-1));
+        return Math.min(nextTimeNanos, currentTimeNanos + maxDelay.lengthNanos);
+    }
+
+    @Override
+    public String toString() {
+        return toStringImpl();
+    }
+    
+    private String toStringImpl() {
+        //This is here to avoid an overridable call in the constructor
+        return "KafkaSpoutRetryExponentialBackoff{" +
+                "delay=" + initialDelay +
+                ", ratio=" + delayPeriod +
+                ", maxRetries=" + maxRetries +
+                ", maxRetryDelay=" + maxDelay +
+                '}';
+    }
+}

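To make the nextTime() formula above concrete, the sketch below evaluates delay = (failCount == 1 ? initialDelay : delayPeriod * 2^(failCount-1)), capped at maxDelay, for the first few failures. The 2s/4s/60s intervals are illustrative values only, not Metron or Storm defaults:

    import java.util.concurrent.TimeUnit;

    // Standalone arithmetic mirroring KafkaSpoutRetryExponentialBackoff.nextTime();
    // the chosen intervals are examples, not defaults.
    public class BackoffSketch {
        public static void main(String[] args) {
            long initialDelayNanos = TimeUnit.SECONDS.toNanos(2);
            long delayPeriodNanos  = TimeUnit.SECONDS.toNanos(4);
            long maxDelayNanos     = TimeUnit.SECONDS.toNanos(60);

            for (int failCount = 1; failCount <= 6; failCount++) {
                long delayNanos = failCount == 1
                        ? initialDelayNanos
                        : delayPeriodNanos * (long) Math.pow(2, failCount - 1);
                delayNanos = Math.min(delayNanos, maxDelayNanos);   // the maxDelay cap
                System.out.printf("failCount=%d -> retry after %ds%n",
                        failCount, TimeUnit.NANOSECONDS.toSeconds(delayNanos));
            }
        }
    }
    // Prints 2s, 8s, 16s, 32s, then 60s, 60s once the cap kicks in.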
http://git-wip-us.apache.org/repos/asf/metron/blob/828ab713/metron-platform/metron-storm-kafka-override/src/main/java/org/apache/storm/kafka/spout/internal/Timer.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-storm-kafka-override/src/main/java/org/apache/storm/kafka/spout/internal/Timer.java b/metron-platform/metron-storm-kafka-override/src/main/java/org/apache/storm/kafka/spout/internal/Timer.java
index f9782ab..0b045c0 100644
--- a/metron-platform/metron-storm-kafka-override/src/main/java/org/apache/storm/kafka/spout/internal/Timer.java
+++ b/metron-platform/metron-storm-kafka-override/src/main/java/org/apache/storm/kafka/spout/internal/Timer.java
@@ -18,7 +18,14 @@
 package org.apache.storm.kafka.spout.internal;
 
 import java.util.concurrent.TimeUnit;
-import org.apache.storm.utils.Time;
+
+/*
+ This file is pulled from Apache Storm, with some modifications to support lower versions of
+ Apache Storm.
+
+ - Time.nanoTime() was introduced in Storm 1.1.0, so we changed it to System.nanoTime().
+ -- Time.nanoTime() calls System.nanoTime() when it is not in time-simulation mode.
+*/
 
 public class Timer {
   private final long delay;

http://git-wip-us.apache.org/repos/asf/metron/blob/828ab713/metron-platform/metron-storm-kafka/pom.xml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-storm-kafka/pom.xml b/metron-platform/metron-storm-kafka/pom.xml
index 5ccc0d9..fdcde4f 100644
--- a/metron-platform/metron-storm-kafka/pom.xml
+++ b/metron-platform/metron-storm-kafka/pom.xml
@@ -36,11 +36,6 @@
             <version>${project.parent.version}</version>
         </dependency>
         <dependency>
-            <groupId>org.apache.storm</groupId>
-            <artifactId>storm-kafka-client</artifactId>
-            <version>${global_storm_kafka_version}</version>
-        </dependency>
-        <dependency>
             <groupId>org.apache.kafka</groupId>
             <artifactId>kafka-clients</artifactId>
             <version>${global_kafka_version}</version>

http://git-wip-us.apache.org/repos/asf/metron/blob/828ab713/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 75c9a31..a5689f5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -91,7 +91,7 @@
         <global_curator_version>2.7.1</global_curator_version>
         <global_classindex_version>3.3</global_classindex_version>
         <global_storm_version>1.0.3</global_storm_version>
-        <global_storm_kafka_version>1.1.0</global_storm_kafka_version>
+        <global_storm_kafka_version>1.2.2</global_storm_kafka_version>
         <global_flux_version>${base_flux_version}</global_flux_version>
         <global_pcap_version>1.7.1</global_pcap_version>
         <global_kafka_version>0.10.0.1</global_kafka_version>
@@ -129,7 +129,7 @@
             <properties>
                 <hdp_version>2.5.0.0</hdp_version>
                 <build_number>1245</build_number>
-                <global_storm_kafka_version>1.1.0</global_storm_kafka_version>
+                <global_storm_kafka_version>1.2.2</global_storm_kafka_version>
                 <global_storm_version>${base_storm_version}.${hdp_version}-${build_number}</global_storm_version>
                 <global_kafka_version>${base_kafka_version}.${hdp_version}-${build_number}</global_kafka_version>
             </properties>
