METRON-1588 Migrate storm-kafka-client to 1.2.1 closes apache/incubator-metron#1039
Project: http://git-wip-us.apache.org/repos/asf/metron/repo Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/828ab713 Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/828ab713 Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/828ab713 Branch: refs/heads/feature/METRON-1416-upgrade-solr Commit: 828ab71346b458b73cb14d42d68e7471ead7fa4e Parents: 7757046 Author: Jungtaek Lim <kabh...@gmail.com> Authored: Thu Jun 14 12:07:05 2018 -0400 Committer: cstella <ceste...@gmail.com> Committed: Thu Jun 14 12:07:05 2018 -0400 ---------------------------------------------------------------------- NOTICE | 6 + dependencies_with_url.csv | 4 + metron-platform/metron-api/pom.xml | 4 - .../parsers/topology/ParserTopologyBuilder.java | 2 +- .../metron-storm-kafka-override/pom.xml | 83 ++++- .../KafkaSpoutRetryExponentialBackoff.java | 336 +++++++++++++++++++ .../storm/kafka/spout/internal/Timer.java | 9 +- metron-platform/metron-storm-kafka/pom.xml | 5 - pom.xml | 4 +- 9 files changed, 435 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/metron/blob/828ab713/NOTICE ---------------------------------------------------------------------- diff --git a/NOTICE b/NOTICE index ff6550b..5227768 100644 --- a/NOTICE +++ b/NOTICE @@ -10,3 +10,9 @@ This product includes software developed by Chef Software (https://www.chef.io) Copyright (c) 2012-2015, Chef Software, Inc. 
+ This includes derived works from the Apache Storm (ASLv2 licensed) project (https://github.com/apache/storm): + Copyright 2015 The Apache Software Foundation + The derived work is adapted from + org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java + org/apache/storm/kafka/spout/internal/Timer.java + and can be found in the org.apache.storm.kafka package http://git-wip-us.apache.org/repos/asf/metron/blob/828ab713/dependencies_with_url.csv ---------------------------------------------------------------------- diff --git a/dependencies_with_url.csv b/dependencies_with_url.csv index df3bcd2..438ce3e 100644 --- a/dependencies_with_url.csv +++ b/dependencies_with_url.csv @@ -119,15 +119,19 @@ com.fasterxml.jackson.core:jackson-annotations:jar:2.2.3:compile,ASLv2,http://wi com.fasterxml.jackson.core:jackson-annotations:jar:2.7.4:compile,ASLv2,http://github.com/FasterXML/jackson com.fasterxml.jackson.core:jackson-annotations:jar:2.8.3:compile,ASLv2,http://github.com/FasterXML/jackson com.fasterxml.jackson.core:jackson-annotations:jar:2.9.0:compile,ASLv2,http://github.com/FasterXML/jackson +com.fasterxml.jackson.core:jackson-annotations:jar:2.9.4:compile,ASLv2,http://github.com/FasterXML/jackson com.fasterxml.jackson.core:jackson-core:jar:2.2.3:compile,ASLv2,http://wiki.fasterxml.com/JacksonHome +com.fasterxml.jackson.core:jackson-core:jar:2.6.3:compile,ASLv2,https://github.com/FasterXML/jackson-core com.fasterxml.jackson.core:jackson-core:jar:2.6.6:compile,ASLv2,https://github.com/FasterXML/jackson-core com.fasterxml.jackson.core:jackson-core:jar:2.7.4:compile,ASLv2,https://github.com/FasterXML/jackson-core com.fasterxml.jackson.core:jackson-core:jar:2.8.3:compile,ASLv2,https://github.com/FasterXML/jackson-core +com.fasterxml.jackson.core:jackson-core:jar:2.9.4:compile,ASLv2,https://github.com/FasterXML/jackson-core com.fasterxml.jackson.core:jackson-core:jar:2.9.5:compile,ASLv2,https://github.com/FasterXML/jackson-core 
com.fasterxml.jackson.core:jackson-databind:jar:2.2.3:compile,ASLv2,http://wiki.fasterxml.com/JacksonHome com.fasterxml.jackson.core:jackson-databind:jar:2.4.3:compile,ASLv2,http://github.com/FasterXML/jackson com.fasterxml.jackson.core:jackson-databind:jar:2.7.4:compile,ASLv2,http://github.com/FasterXML/jackson com.fasterxml.jackson.core:jackson-databind:jar:2.8.3:compile,ASLv2,http://github.com/FasterXML/jackson +com.fasterxml.jackson.core:jackson-databind:jar:2.9.4:compile,ASLv2,http://github.com/FasterXML/jackson com.fasterxml.jackson.core:jackson-databind:jar:2.9.5:compile,ASLv2,http://github.com/FasterXML/jackson com.fasterxml.jackson.dataformat:jackson-dataformat-cbor:jar:2.6.6:compile,ASLv2,http://wiki.fasterxml.com/JacksonForCbor com.fasterxml.jackson.dataformat:jackson-dataformat-smile:jar:2.6.6:compile,ASLv2,http://wiki.fasterxml.com/JacksonForSmile http://git-wip-us.apache.org/repos/asf/metron/blob/828ab713/metron-platform/metron-api/pom.xml ---------------------------------------------------------------------- diff --git a/metron-platform/metron-api/pom.xml b/metron-platform/metron-api/pom.xml index e3bf12d..59b1622 100644 --- a/metron-platform/metron-api/pom.xml +++ b/metron-platform/metron-api/pom.xml @@ -31,10 +31,6 @@ <zookeeper.version>3.4.5.2.0.6.0-76</zookeeper.version> <logger.version>1.2.15</logger.version> - <storm-kafka-client.version>1.0.1</storm-kafka-client.version> - <storm-hdfs.version>0.0.7-SNAPSHOT</storm-hdfs.version> - <storm-hbase.version>0.0.5-SNAPSHOT</storm-hbase.version> - <spring.integration.version>3.0.0.RELEASE</spring.integration.version> <spring.version>3.2.6.RELEASE</spring.version> <commons-fileupload.version>1.2.2</commons-fileupload.version> http://git-wip-us.apache.org/repos/asf/metron/blob/828ab713/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java ---------------------------------------------------------------------- diff --git 
a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java index cd4ad50..5b3e0d5 100644 --- a/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java +++ b/metron-platform/metron-parsers/src/main/java/org/apache/metron/parsers/topology/ParserTopologyBuilder.java @@ -162,7 +162,7 @@ public class ParserTopologyBuilder { Map<String, Object> kafkaSpoutConfigOptions = kafkaConfigOptional.orElse(new HashMap<>()); String inputTopic = parserConfig.getSensorTopic() != null ? parserConfig.getSensorTopic() : sensorType; kafkaSpoutConfigOptions.putIfAbsent( SpoutConfiguration.FIRST_POLL_OFFSET_STRATEGY.key - , KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST.toString() + , KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST.name() ); kafkaSpoutConfigOptions.putIfAbsent( ConsumerConfig.GROUP_ID_CONFIG , inputTopic + "_parser" http://git-wip-us.apache.org/repos/asf/metron/blob/828ab713/metron-platform/metron-storm-kafka-override/pom.xml ---------------------------------------------------------------------- diff --git a/metron-platform/metron-storm-kafka-override/pom.xml b/metron-platform/metron-storm-kafka-override/pom.xml index 9be733e..a38d3bf 100644 --- a/metron-platform/metron-storm-kafka-override/pom.xml +++ b/metron-platform/metron-storm-kafka-override/pom.xml @@ -92,14 +92,87 @@ </exclusion> </exclusions> </dependency> - <dependency> - <groupId>org.apache.metron</groupId> - <artifactId>metron-common</artifactId> - <version>${project.parent.version}</version> - </dependency> </dependencies> <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + <version>${global_shade_version}</version> + <configuration> + <createDependencyReducedPom>true</createDependencyReducedPom> + 
</configuration> + <executions> + <execution> + <phase>package</phase> + <goals> + <goal>shade</goal> + </goals> + <configuration> + <filters> + <filter> + <artifact>*:*</artifact> + <excludes> + <exclude>META-INF/*.SF</exclude> + <exclude>META-INF/*.DSA</exclude> + <exclude>META-INF/*.RSA</exclude> + </excludes> + </filter> + </filters> + <relocations> + <relocation> + <pattern>com.google.common</pattern> + <shadedPattern>org.apache.metron.storm.kafka.override.guava.common</shadedPattern> + </relocation> + <relocation> + <pattern>com.google.thirdparty</pattern> + <shadedPattern>org.apache.metron.storm.kafka.override.guava.thirdparty</shadedPattern> + </relocation> + <relocation> + <pattern>org.apache.commons.lang</pattern> + <shadedPattern>org.apache.metron.storm.kafka.override.commons.lang</shadedPattern> + </relocation> + <relocation> + <pattern>com.fasterxml</pattern> + <shadedPattern>org.apache.metron.storm.kafka.override.com.fasterxml</shadedPattern> + </relocation> + </relocations> + <artifactSet> + <excludes> + <exclude>storm:storm-core:*</exclude> + <exclude>storm:storm-lib:*</exclude> + <exclude>org.slf4j.impl*</exclude> + <exclude>org.slf4j:slf4j-log4j*</exclude> + </excludes> + </artifactSet> + <transformers> + <transformer + implementation="org.apache.maven.plugins.shade.resource.DontIncludeResourceTransformer"> + <resources> + <resource>.yaml</resource> + <resource>LICENSE.txt</resource> + <resource>ASL2.0</resource> + <resource>NOTICE.txt</resource> + </resources> + </transformer> + <!-- UNCOMMENT THIS IF YOU NEED TO REGENERATE THE BEST GUESS NOTICES FILE WHICH REQUIRES PRUNING EVERY RELEASE --> + <!--transformer implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer"> + <addHeader>false</addHeader> + <projectName>${project.name}</projectName> + </transformer--> + <transformer + implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/> + <transformer + 
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> + <mainClass></mainClass> + </transformer> + </transformers> + </configuration> + </execution> + </executions> + </plugin> + </plugins> <resources> <resource> <directory>src/main/resources</directory> http://git-wip-us.apache.org/repos/asf/metron/blob/828ab713/metron-platform/metron-storm-kafka-override/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-storm-kafka-override/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java b/metron-platform/metron-storm-kafka-override/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java new file mode 100644 index 0000000..439188b --- /dev/null +++ b/metron-platform/metron-storm-kafka-override/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java @@ -0,0 +1,336 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.storm.kafka.spout; + +import org.apache.kafka.common.TopicPartition; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Serializable; +import java.util.Collection; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; +import java.util.TreeSet; +import java.util.concurrent.TimeUnit; +import org.apache.commons.lang.Validate; +import org.apache.kafka.clients.consumer.ConsumerRecord; + +/* + This file is pulled from Apache Storm, with some modification to support lower version of + Apache Storm. + + - Time.nanoTime() is introduced in Storm 1.1.0 so we changed to System.nanoTime() + -- Time.nanoTime() calls System.nanoTime() when it's not in time simulation mode. +*/ + +/** + * Implementation of {@link KafkaSpoutRetryService} using the exponential backoff formula. The time of the nextRetry is set as follows: + * nextRetry = failCount == 1 ? currentTime + initialDelay : currentTime + delayPeriod*2^(failCount-1) where failCount = 1, 2, 3, ... + * nextRetry = Min(nextRetry, currentTime + maxDelay) + */ +public class KafkaSpoutRetryExponentialBackoff implements KafkaSpoutRetryService { + private static final Logger LOG = LoggerFactory.getLogger(KafkaSpoutRetryExponentialBackoff.class); + private static final RetryEntryTimeStampComparator RETRY_ENTRY_TIME_STAMP_COMPARATOR = new RetryEntryTimeStampComparator(); + + private final TimeInterval initialDelay; + private final TimeInterval delayPeriod; + private final TimeInterval maxDelay; + private final int maxRetries; + + //This class assumes that there is at most one retry schedule per message id in this set at a time. 
+ private final Set<RetrySchedule> retrySchedules = new TreeSet<>(RETRY_ENTRY_TIME_STAMP_COMPARATOR); + private final Set<KafkaSpoutMessageId> toRetryMsgs = new HashSet<>(); // Convenience data structure to speedup lookups + + /** + * Comparator ordering by timestamp + */ + private static class RetryEntryTimeStampComparator implements Serializable, Comparator<RetrySchedule> { + @Override + public int compare(RetrySchedule entry1, RetrySchedule entry2) { + int result = Long.valueOf(entry1.nextRetryTimeNanos()).compareTo(entry2.nextRetryTimeNanos()); + + if(result == 0) { + //TreeSet uses compareTo instead of equals() for the Set contract + //Ensure that we can save two retry schedules with the same timestamp + result = entry1.hashCode() - entry2.hashCode(); + } + return result; + } + } + + private class RetrySchedule { + private final KafkaSpoutMessageId msgId; + private long nextRetryTimeNanos; + + public RetrySchedule(KafkaSpoutMessageId msgId, long nextRetryTimeNanos) { + this.msgId = msgId; + this.nextRetryTimeNanos = nextRetryTimeNanos; + LOG.debug("Created {}", this); + } + + public void setNextRetryTimeNanos() { + nextRetryTimeNanos = nextTime(msgId); + LOG.debug("Updated {}", this); + } + + public boolean retry(long currentTimeNanos) { + return nextRetryTimeNanos <= currentTimeNanos; + } + + @Override + public String toString() { + return "RetrySchedule{" + + "msgId=" + msgId + + ", nextRetryTimeNanos=" + nextRetryTimeNanos + + '}'; + } + + public KafkaSpoutMessageId msgId() { + return msgId; + } + + public long nextRetryTimeNanos() { + return nextRetryTimeNanos; + } + } + + public static class TimeInterval implements Serializable { + private final long lengthNanos; + private final TimeUnit timeUnit; + private final long length; + + /** + * @param length length of the time interval in the units specified by {@link TimeUnit} + * @param timeUnit unit used to specify a time interval on which to specify a time unit + */ + public TimeInterval(long length, 
TimeUnit timeUnit) { + this.lengthNanos = timeUnit.toNanos(length); + this.timeUnit = timeUnit; + this.length = length; + } + + public static TimeInterval seconds(long length) { + return new TimeInterval(length, TimeUnit.SECONDS); + } + + public static TimeInterval milliSeconds(long length) { + return new TimeInterval(length, TimeUnit.MILLISECONDS); + } + + public static TimeInterval microSeconds(long length) { + return new TimeInterval(length, TimeUnit.MICROSECONDS); + } + + public long lengthNanos() { + return lengthNanos; + } + + public TimeUnit timeUnit() { + return timeUnit; + } + + @Override + public String toString() { + return "TimeInterval{" + + "length=" + length + + ", timeUnit=" + timeUnit + + '}'; + } + } + + /** + * The time stamp of the next retry is scheduled according to the exponential backoff formula (geometric progression): + * nextRetry = failCount == 1 ? currentTime + initialDelay : currentTime + delayPeriod*2^(failCount-1) where failCount = 1, 2, 3, ... + * nextRetry = Min(nextRetry, currentTime + maxDelay). + * + * By specifying a value for maxRetries lower than Integer.MAX_VALUE, the user decides to sacrifice guarantee of delivery for the previous + * polled records in favor of processing more records. 
+ * + * @param initialDelay initial delay of the first retry + * @param delayPeriod the time interval that is the ratio of the exponential backoff formula (geometric progression) + * @param maxRetries maximum number of times a tuple is retried before being acked and scheduled for commit + * @param maxDelay maximum amount of time waiting before retrying + * + */ + public KafkaSpoutRetryExponentialBackoff(TimeInterval initialDelay, TimeInterval delayPeriod, int maxRetries, TimeInterval maxDelay) { + this.initialDelay = initialDelay; + this.delayPeriod = delayPeriod; + this.maxRetries = maxRetries; + this.maxDelay = maxDelay; + LOG.debug("Instantiated {}", this.toStringImpl()); + } + + @Override + public Map<TopicPartition, Long> earliestRetriableOffsets() { + final Map<TopicPartition, Long> tpToEarliestRetriableOffset = new HashMap<>(); + final long currentTimeNanos = System.nanoTime(); + for (RetrySchedule retrySchedule : retrySchedules) { + if (retrySchedule.retry(currentTimeNanos)) { + final KafkaSpoutMessageId msgId = retrySchedule.msgId; + final TopicPartition tpForMessage = new TopicPartition(msgId.topic(), msgId.partition()); + final Long currentLowestOffset = tpToEarliestRetriableOffset.get(tpForMessage); + if(currentLowestOffset != null) { + tpToEarliestRetriableOffset.put(tpForMessage, Math.min(currentLowestOffset, msgId.offset())); + } else { + tpToEarliestRetriableOffset.put(tpForMessage, msgId.offset()); + } + } else { + break; // Stop searching as soon as passed current time + } + } + LOG.debug("Topic partitions with entries ready to be retried [{}] ", tpToEarliestRetriableOffset); + return tpToEarliestRetriableOffset; + } + + @Override + public boolean isReady(KafkaSpoutMessageId msgId) { + boolean retry = false; + if (isScheduled(msgId)) { + final long currentTimeNanos = System.nanoTime(); + for (RetrySchedule retrySchedule : retrySchedules) { + if (retrySchedule.retry(currentTimeNanos)) { + if (retrySchedule.msgId.equals(msgId)) { + retry = true; + 
LOG.debug("Found entry to retry {}", retrySchedule); + break; //Stop searching if the message is known to be ready for retry + } + } else { + LOG.debug("Entry to retry not found {}", retrySchedule); + break; // Stop searching as soon as passed current time + } + } + } + return retry; + } + + @Override + public boolean isScheduled(KafkaSpoutMessageId msgId) { + return toRetryMsgs.contains(msgId); + } + + @Override + public boolean remove(KafkaSpoutMessageId msgId) { + boolean removed = false; + if (isScheduled(msgId)) { + toRetryMsgs.remove(msgId); + for (Iterator<RetrySchedule> iterator = retrySchedules.iterator(); iterator.hasNext(); ) { + final RetrySchedule retrySchedule = iterator.next(); + if (retrySchedule.msgId().equals(msgId)) { + iterator.remove(); + removed = true; + break; //There is at most one schedule per message id + } + } + } + LOG.debug(removed ? "Removed {} " : "Not removed {}", msgId); + LOG.trace("Current state {}", retrySchedules); + return removed; + } + + @Override + public boolean retainAll(Collection<TopicPartition> topicPartitions) { + boolean result = false; + for (Iterator<RetrySchedule> rsIterator = retrySchedules.iterator(); rsIterator.hasNext(); ) { + final RetrySchedule retrySchedule = rsIterator.next(); + final KafkaSpoutMessageId msgId = retrySchedule.msgId; + final TopicPartition tpRetry= new TopicPartition(msgId.topic(), msgId.partition()); + if (!topicPartitions.contains(tpRetry)) { + rsIterator.remove(); + toRetryMsgs.remove(msgId); + LOG.debug("Removed {}", retrySchedule); + LOG.trace("Current state {}", retrySchedules); + result = true; + } + } + return result; + } + + @Override + public boolean schedule(KafkaSpoutMessageId msgId) { + if (msgId.numFails() > maxRetries) { + LOG.debug("Not scheduling [{}] because reached maximum number of retries [{}].", msgId, maxRetries); + return false; + } else { + //Remove existing schedule for the message id + remove(msgId); + final RetrySchedule retrySchedule = new RetrySchedule(msgId, 
nextTime(msgId)); + retrySchedules.add(retrySchedule); + toRetryMsgs.add(msgId); + LOG.debug("Scheduled. {}", retrySchedule); + LOG.trace("Current state {}", retrySchedules); + return true; + } + } + + @Override + public int readyMessageCount() { + int count = 0; + final long currentTimeNanos = System.nanoTime(); + for (RetrySchedule retrySchedule : retrySchedules) { + if (retrySchedule.retry(currentTimeNanos)) { + ++count; + } else { + break; //Stop counting when past current time + } + } + return count; + } + + @Override + public KafkaSpoutMessageId getMessageId(ConsumerRecord<?, ?> record) { + KafkaSpoutMessageId msgId = new KafkaSpoutMessageId(record); + if (isScheduled(msgId)) { + for (KafkaSpoutMessageId originalMsgId : toRetryMsgs) { + if (originalMsgId.equals(msgId)) { + return originalMsgId; + } + } + } + return msgId; + } + + // if value is greater than Long.MAX_VALUE it truncates to Long.MAX_VALUE + private long nextTime(KafkaSpoutMessageId msgId) { + Validate.isTrue(msgId.numFails() > 0, "nextTime assumes the message has failed at least once"); + final long currentTimeNanos = System.nanoTime(); + final long nextTimeNanos = msgId.numFails() == 1 // numFails = 1, 2, 3, ... + ? 
currentTimeNanos + initialDelay.lengthNanos + : currentTimeNanos + delayPeriod.lengthNanos * (long)(Math.pow(2, msgId.numFails()-1)); + return Math.min(nextTimeNanos, currentTimeNanos + maxDelay.lengthNanos); + } + + @Override + public String toString() { + return toStringImpl(); + } + + private String toStringImpl() { + //This is here to avoid an overridable call in the constructor + return "KafkaSpoutRetryExponentialBackoff{" + + "delay=" + initialDelay + + ", ratio=" + delayPeriod + + ", maxRetries=" + maxRetries + + ", maxRetryDelay=" + maxDelay + + '}'; + } +} http://git-wip-us.apache.org/repos/asf/metron/blob/828ab713/metron-platform/metron-storm-kafka-override/src/main/java/org/apache/storm/kafka/spout/internal/Timer.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-storm-kafka-override/src/main/java/org/apache/storm/kafka/spout/internal/Timer.java b/metron-platform/metron-storm-kafka-override/src/main/java/org/apache/storm/kafka/spout/internal/Timer.java index f9782ab..0b045c0 100644 --- a/metron-platform/metron-storm-kafka-override/src/main/java/org/apache/storm/kafka/spout/internal/Timer.java +++ b/metron-platform/metron-storm-kafka-override/src/main/java/org/apache/storm/kafka/spout/internal/Timer.java @@ -18,7 +18,14 @@ package org.apache.storm.kafka.spout.internal; import java.util.concurrent.TimeUnit; -import org.apache.storm.utils.Time; + +/* + This file is pulled from Apache Storm, with some modification to support lower version of + Apache Storm. + + - Time.nanoTime() is introduced in Storm 1.1.0 so we changed to System.nanoTime() + -- Time.nanoTime() calls System.nanoTime() when it's not in time simulation mode. 
+*/ public class Timer { private final long delay; http://git-wip-us.apache.org/repos/asf/metron/blob/828ab713/metron-platform/metron-storm-kafka/pom.xml ---------------------------------------------------------------------- diff --git a/metron-platform/metron-storm-kafka/pom.xml b/metron-platform/metron-storm-kafka/pom.xml index 5ccc0d9..fdcde4f 100644 --- a/metron-platform/metron-storm-kafka/pom.xml +++ b/metron-platform/metron-storm-kafka/pom.xml @@ -36,11 +36,6 @@ <version>${project.parent.version}</version> </dependency> <dependency> - <groupId>org.apache.storm</groupId> - <artifactId>storm-kafka-client</artifactId> - <version>${global_storm_kafka_version}</version> - </dependency> - <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>${global_kafka_version}</version> http://git-wip-us.apache.org/repos/asf/metron/blob/828ab713/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 75c9a31..a5689f5 100644 --- a/pom.xml +++ b/pom.xml @@ -91,7 +91,7 @@ <global_curator_version>2.7.1</global_curator_version> <global_classindex_version>3.3</global_classindex_version> <global_storm_version>1.0.3</global_storm_version> - <global_storm_kafka_version>1.1.0</global_storm_kafka_version> + <global_storm_kafka_version>1.2.2</global_storm_kafka_version> <global_flux_version>${base_flux_version}</global_flux_version> <global_pcap_version>1.7.1</global_pcap_version> <global_kafka_version>0.10.0.1</global_kafka_version> @@ -129,7 +129,7 @@ <properties> <hdp_version>2.5.0.0</hdp_version> <build_number>1245</build_number> - <global_storm_kafka_version>1.1.0</global_storm_kafka_version> + <global_storm_kafka_version>1.2.2</global_storm_kafka_version> <global_storm_version>${base_storm_version}.${hdp_version}-${build_number}</global_storm_version> <global_kafka_version>${base_kafka_version}.${hdp_version}-${build_number}</global_kafka_version> </properties>