Repository: storm Updated Branches: refs/heads/1.0.x-branch a81cb42f4 -> 60c07813f
sync storm-kafka-client to pre-kafka 0.10.0 state Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/f423dbc5 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/f423dbc5 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/f423dbc5 Branch: refs/heads/1.0.x-branch Commit: f423dbc524400654c4692b6263f12a04d084e567 Parents: a81cb42 Author: P. Taylor Goetz <ptgo...@gmail.com> Authored: Thu Jul 21 16:37:19 2016 -0400 Committer: P. Taylor Goetz <ptgo...@gmail.com> Committed: Thu Jul 21 16:37:19 2016 -0400 ---------------------------------------------------------------------- external/storm-kafka-client/pom.xml | 2 +- .../apache/storm/kafka/spout/KafkaSpout.java | 43 +++++- .../storm/kafka/spout/KafkaSpoutConfig.java | 19 ++- .../storm/kafka/spout/KafkaSpoutStream.java | 61 +++++++- .../storm/kafka/spout/KafkaSpoutStreams.java | 133 +--------------- .../spout/KafkaSpoutStreamsNamedTopics.java | 154 +++++++++++++++++++ .../spout/KafkaSpoutStreamsWildcardTopics.java | 61 ++++++++ .../kafka/spout/KafkaSpoutTuplesBuilder.java | 54 +------ .../KafkaSpoutTuplesBuilderNamedTopics.java | 78 ++++++++++ .../KafkaSpoutTuplesBuilderWildcardTopics.java | 36 +++++ .../spout/test/KafkaSpoutTopologyMain.java | 133 ---------------- .../test/KafkaSpoutTopologyMainNamedTopics.java | 140 +++++++++++++++++ .../KafkaSpoutTopologyMainWildcardTopics.java | 62 ++++++++ 13 files changed, 654 insertions(+), 322 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/f423dbc5/external/storm-kafka-client/pom.xml ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/pom.xml b/external/storm-kafka-client/pom.xml index 149be44..c91ddb8 100644 --- a/external/storm-kafka-client/pom.xml +++ b/external/storm-kafka-client/pom.xml @@ -22,7 +22,7 @@ <parent> <artifactId>storm</artifactId> <groupId>org.apache.storm</groupId> - <version>1.0.3-SNAPSHOT</version> + <version>1.1.0-SNAPSHOT</version> <relativePath>../../pom.xml</relativePath> </parent> http://git-wip-us.apache.org/repos/asf/storm/blob/f423dbc5/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java index d211ae9..5a701d5 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java @@ -45,6 +45,7 @@ import java.util.NavigableSet; import java.util.Set; import java.util.TreeSet; import java.util.concurrent.TimeUnit; +import java.util.regex.Pattern; import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST; import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.LATEST; @@ -343,7 +344,16 @@ public class KafkaSpout<K, V> extends BaseRichSpout { private void subscribeKafkaConsumer() { kafkaConsumer = new KafkaConsumer<>(kafkaSpoutConfig.getKafkaProps(), kafkaSpoutConfig.getKeyDeserializer(), kafkaSpoutConfig.getValueDeserializer()); - kafkaConsumer.subscribe(kafkaSpoutConfig.getSubscribedTopics(), new KafkaSpoutConsumerRebalanceListener()); + + if (kafkaSpoutStreams instanceof KafkaSpoutStreamsNamedTopics) { + final List<String> topics = ((KafkaSpoutStreamsNamedTopics) kafkaSpoutStreams).getTopics(); + kafkaConsumer.subscribe(topics, new KafkaSpoutConsumerRebalanceListener()); + LOG.info("Kafka consumer subscribed topics {}", topics); + } else if (kafkaSpoutStreams instanceof KafkaSpoutStreamsWildcardTopics) { + final Pattern pattern = ((KafkaSpoutStreamsWildcardTopics) kafkaSpoutStreams).getTopicWildcardPattern(); + kafkaConsumer.subscribe(pattern, new KafkaSpoutConsumerRebalanceListener()); + LOG.info("Kafka consumer subscribed topics matching wildcard pattern [{}]", pattern); + } // Initial poll to get the consumer registration process going. // KafkaSpoutConsumerRebalanceListener will be called following this poll, upon partition registration kafkaConsumer.poll(0); @@ -381,6 +391,37 @@ public class KafkaSpout<K, V> extends BaseRichSpout { return "{acked=" + acked + "} "; } + @Override + public Map<String, Object> getComponentConfiguration () { + Map<String, Object> configuration = super.getComponentConfiguration(); + if (configuration == null) { + configuration = new HashMap<>(); + } + String configKeyPrefix = "config."; + + if (kafkaSpoutStreams instanceof KafkaSpoutStreamsNamedTopics) { + configuration.put(configKeyPrefix + "topics", getNamedTopics()); + } else if (kafkaSpoutStreams instanceof KafkaSpoutStreamsWildcardTopics) { + configuration.put(configKeyPrefix + "topics", getWildCardTopics()); + } + + configuration.put(configKeyPrefix + "groupid", kafkaSpoutConfig.getConsumerGroupId()); + configuration.put(configKeyPrefix + "bootstrap.servers", kafkaSpoutConfig.getKafkaProps().get("bootstrap.servers")); + return configuration; + } + + private String getNamedTopics() { + StringBuilder topics = new StringBuilder(); + for (String topic: kafkaSpoutConfig.getSubscribedTopics()) { + topics.append(topic).append(","); + } + return topics.toString(); + } + + private String getWildCardTopics() { + return kafkaSpoutConfig.getTopicWildcardPattern().toString(); + } + // ======= Offsets Commit Management ========== private static class OffsetComparator implements Comparator<KafkaSpoutMessageId> { http://git-wip-us.apache.org/repos/asf/storm/blob/f423dbc5/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java index 29cedb2..315e3e9 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java @@ -26,6 +26,7 @@ import java.io.Serializable; import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.regex.Pattern; /** * KafkaSpoutConfig defines the required configuration to connect a consumer to a consumer group, as well as the subscribing topics @@ -263,10 +264,23 @@ public class KafkaSpoutConfig<K, V> implements Serializable { } /** - * @return list of topics subscribed and emitting tuples to a stream as configured by {@link KafkaSpoutStream} + * @return list of topics subscribed and emitting tuples to a stream as configured by {@link KafkaSpoutStream}, + * or null if this stream is associated with a wildcard pattern topic */ public List<String> getSubscribedTopics() { - return new ArrayList<>(kafkaSpoutStreams.getTopics()); + return kafkaSpoutStreams instanceof KafkaSpoutStreamsNamedTopics ? + new ArrayList<>(((KafkaSpoutStreamsNamedTopics) kafkaSpoutStreams).getTopics()) : + null; + } + + /** + * @return the wildcard pattern topic associated with this {@link KafkaSpoutStream}, or null + * if this stream is associated with a specific named topic + */ + public Pattern getTopicWildcardPattern() { + return kafkaSpoutStreams instanceof KafkaSpoutStreamsWildcardTopics ? + ((KafkaSpoutStreamsWildcardTopics)kafkaSpoutStreams).getTopicWildcardPattern() : + null; } public int getMaxTupleRetries() { @@ -300,6 +314,7 @@ public class KafkaSpoutConfig<K, V> implements Serializable { ", keyDeserializer=" + keyDeserializer + ", valueDeserializer=" + valueDeserializer + ", topics=" + getSubscribedTopics() + + ", topicWildcardPattern=" + getTopicWildcardPattern() + ", firstPollOffsetStrategy=" + firstPollOffsetStrategy + ", pollTimeoutMs=" + pollTimeoutMs + ", offsetCommitPeriodMs=" + offsetCommitPeriodMs + http://git-wip-us.apache.org/repos/asf/storm/blob/f423dbc5/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStream.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStream.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStream.java index 064a8bb..5375f6c 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStream.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStream.java @@ -18,26 +18,35 @@ package org.apache.storm.kafka.spout; +import org.apache.storm.spout.SpoutOutputCollector; +import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.tuple.Fields; import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.Serializable; +import java.util.List; +import java.util.regex.Pattern; /** * Represents the stream and output fields used by a topic */ public class KafkaSpoutStream implements Serializable { + private static final Logger LOG = LoggerFactory.getLogger(KafkaSpoutStream.class); + private final Fields outputFields; private final String streamId; private final String topic; + private Pattern topicWildcardPattern; /** Represents the specified outputFields and topic with the default stream */ - KafkaSpoutStream(Fields outputFields, String topic) { + public KafkaSpoutStream(Fields outputFields, String topic) { this(outputFields, Utils.DEFAULT_STREAM_ID, topic); } /** Represents the specified outputFields and topic with the specified stream */ - KafkaSpoutStream(Fields outputFields, String streamId, String topic) { + public KafkaSpoutStream(Fields outputFields, String streamId, String topic) { if (outputFields == null || streamId == null || topic == null) { throw new IllegalArgumentException(String.format("Constructor parameters cannot be null. " + "[outputFields=%s, streamId=%s, topic=%s]", outputFields, streamId, topic)); @@ -45,26 +54,68 @@ public class KafkaSpoutStream implements Serializable { this.outputFields = outputFields; this.streamId = streamId; this.topic = topic; + this.topicWildcardPattern = null; + } + + /** Represents the specified outputFields and topic wild card with the default stream */ + KafkaSpoutStream(Fields outputFields, Pattern topicWildcardPattern) { + this(outputFields, Utils.DEFAULT_STREAM_ID, topicWildcardPattern); } - Fields getOutputFields() { + /** Represents the specified outputFields and topic wild card with the specified stream */ + public KafkaSpoutStream(Fields outputFields, String streamId, Pattern topicWildcardPattern) { + + if (outputFields == null || streamId == null || topicWildcardPattern == null) { + throw new IllegalArgumentException(String.format("Constructor parameters cannot be null. " + + "[outputFields=%s, streamId=%s, topicWildcardPattern=%s]", outputFields, streamId, topicWildcardPattern)); + } + this.outputFields = outputFields; + this.streamId = streamId; + this.topic = null; + this.topicWildcardPattern = topicWildcardPattern; + } + + public void emit(SpoutOutputCollector collector, List<Object> tuple, KafkaSpoutMessageId messageId) { + collector.emit(streamId, tuple, messageId); + } + + public void declareOutputFields(OutputFieldsDeclarer declarer) { + LOG.info("Declared [streamId = {}], [outputFields = {}] for [topic = {}]", streamId, outputFields, topic); + declarer.declareStream(streamId, outputFields); + } + + + public Fields getOutputFields() { return outputFields; } - String getStreamId() { + public String getStreamId() { return streamId; } - String getTopic() { + /** + * @return the topic associated with this {@link KafkaSpoutStream}, or null + * if this stream is associated with a wildcard pattern topic + */ + public String getTopic() { return topic; } + /** + * @return the wildcard pattern topic associated with this {@link KafkaSpoutStream}, or null + * if this stream is associated with a specific named topic + */ + public Pattern getTopicWildcardPattern() { + return topicWildcardPattern; + } + @Override public String toString() { return "KafkaSpoutStream{" + "outputFields=" + outputFields + ", streamId='" + streamId + '\'' + ", topic='" + topic + '\'' + + ", topicWildcardPattern=" + topicWildcardPattern + '}'; } } http://git-wip-us.apache.org/repos/asf/storm/blob/f423dbc5/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStreams.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStreams.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStreams.java index dc5892e..6910d3c 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStreams.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStreams.java @@ -20,139 +20,16 @@ package org.apache.storm.kafka.spout; import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.topology.OutputFieldsDeclarer; -import org.apache.storm.topology.OutputFieldsGetter; -import org.apache.storm.tuple.Fields; -import org.apache.storm.utils.Utils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.io.Serializable; -import java.util.ArrayList; -import java.util.HashMap; import java.util.List; -import java.util.Map; /** - * Represents the {@link KafkaSpoutStream} associated with each topic, and provides a public API to - * declare output streams and emmit tuples, on the appropriate stream, for all the topics specified. + * Represents the {@link KafkaSpoutStream} associated with each topic or topic pattern (wildcard), and provides + * a public API to declare output streams and emmit tuples, on the appropriate stream, for all the topics specified. */ -public class KafkaSpoutStreams implements Serializable { - protected static final Logger LOG = LoggerFactory.getLogger(KafkaSpoutStreams.class); +public interface KafkaSpoutStreams extends Serializable { + void declareOutputFields(OutputFieldsDeclarer declarer); - private final Map<String, KafkaSpoutStream> topicToStream; - - private KafkaSpoutStreams(Builder builder) { - this.topicToStream = builder.topicToStream; - LOG.debug("Built {}", this); - } - - /** - * @param topic the topic for which to get output fields - * @return the declared output fields - */ - public Fields getOutputFields(String topic) { - if (topicToStream.containsKey(topic)) { - final Fields outputFields = topicToStream.get(topic).getOutputFields(); - LOG.trace("Topic [{}] has output fields [{}]", topic, outputFields); - return outputFields; - } - throw new IllegalStateException(this.getClass().getName() + " not configured for topic: " + topic); - } - - /** - * @param topic the topic to for which to get the stream id - * @return the id of the stream to where the tuples are emitted - */ - public String getStreamId(String topic) { - if (topicToStream.containsKey(topic)) { - final String streamId = topicToStream.get(topic).getStreamId(); - LOG.trace("Topic [{}] emitting in stream [{}]", topic, streamId); - return streamId; - } - throw new IllegalStateException(this.getClass().getName() + " not configured for topic: " + topic); - } - - /** - * @return list of topics subscribed and emitting tuples to a stream as configured by {@link KafkaSpoutStream} - */ - public List<String> getTopics() { - return new ArrayList<>(topicToStream.keySet()); - } - - public void declareOutputFields(OutputFieldsDeclarer declarer) { - for (KafkaSpoutStream stream : topicToStream.values()) { - if (!((OutputFieldsGetter)declarer).getFieldsDeclaration().containsKey(stream.getStreamId())) { - declarer.declareStream(stream.getStreamId(), stream.getOutputFields()); - LOG.debug("Declared " + stream); - } - } - } - - public void emit(SpoutOutputCollector collector, List<Object> tuple, KafkaSpoutMessageId messageId) { - collector.emit(getStreamId(messageId.topic()), tuple, messageId); - } - - @Override - public String toString() { - return "KafkaSpoutStreams{" + - "topicToStream=" + topicToStream + - '}'; - } - - public static class Builder { - private final Map<String, KafkaSpoutStream> topicToStream = new HashMap<>();; - - /** - * Creates a {@link KafkaSpoutStream} with the given output Fields for each topic specified. - * All topics will have the same stream id and output fields. - */ - public Builder(Fields outputFields, String... topics) { - addStream(outputFields, topics); - } - - /** - * Creates a {@link KafkaSpoutStream} with this particular stream for each topic specified. - * All the topics will have the same stream id and output fields. - */ - public Builder (Fields outputFields, String streamId, String... topics) { - addStream(outputFields, streamId, topics); - } - - /** - * Adds this stream to the state representing the streams associated with each topic - */ - public Builder(KafkaSpoutStream stream) { - addStream(stream); - } - - /** - * Adds this stream to the state representing the streams associated with each topic - */ - public Builder addStream(KafkaSpoutStream stream) { - topicToStream.put(stream.getTopic(), stream); - return this; - } - - /** - * Please refer to javadoc in {@link #Builder(Fields, String...)} - */ - public Builder addStream(Fields outputFields, String... topics) { - addStream(outputFields, Utils.DEFAULT_STREAM_ID, topics); - return this; - } - - /** - * Please refer to javadoc in {@link #Builder(Fields, String, String...)} - */ - public Builder addStream(Fields outputFields, String streamId, String... topics) { - for (String topic : topics) { - topicToStream.put(topic, new KafkaSpoutStream(outputFields, streamId, topic)); - } - return this; - } - - public KafkaSpoutStreams build() { - return new KafkaSpoutStreams(this); - } - } + void emit(SpoutOutputCollector collector, List<Object> tuple, KafkaSpoutMessageId messageId); } http://git-wip-us.apache.org/repos/asf/storm/blob/f423dbc5/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStreamsNamedTopics.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStreamsNamedTopics.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStreamsNamedTopics.java new file mode 100644 index 0000000..c230f09 --- /dev/null +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStreamsNamedTopics.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.kafka.spout; + +import org.apache.storm.spout.SpoutOutputCollector; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.OutputFieldsGetter; +import org.apache.storm.tuple.Fields; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Represents the {@link KafkaSpoutStream} associated with each topic, and provides a public API to + * declare output streams and emmit tuples, on the appropriate stream, for all the topics specified. + */ +public class KafkaSpoutStreamsNamedTopics implements KafkaSpoutStreams { + private static final Logger LOG = LoggerFactory.getLogger(KafkaSpoutStreamsNamedTopics.class); + + private final Map<String, KafkaSpoutStream> topicToStream; + + private KafkaSpoutStreamsNamedTopics(Builder builder) { + this.topicToStream = builder.topicToStream; + LOG.debug("Built {}", this); + } + + /** + * @param topic the topic for which to get output fields + * @return the declared output fields + */ + public Fields getOutputFields(String topic) { + if (topicToStream.containsKey(topic)) { + final Fields outputFields = topicToStream.get(topic).getOutputFields(); + LOG.trace("Topic [{}] has output fields [{}]", topic, outputFields); + return outputFields; + } + throw new IllegalStateException(this.getClass().getName() + " not configured for topic: " + topic); + } + + /** + * @param topic the topic to for which to get the stream id + * @return the id of the stream to where the tuples are emitted + */ + public KafkaSpoutStream getStream(String topic) { + if (topicToStream.containsKey(topic)) { + return topicToStream.get(topic); + } + throw new IllegalStateException(this.getClass().getName() + " not configured for topic: " + topic); + } + + /** + * @return list of topics subscribed and emitting tuples to a stream as configured by {@link KafkaSpoutStream} + */ + public List<String> getTopics() { + return new ArrayList<>(topicToStream.keySet()); + } + + public void declareOutputFields(OutputFieldsDeclarer declarer) { + for (KafkaSpoutStream stream : topicToStream.values()) { + if (!((OutputFieldsGetter)declarer).getFieldsDeclaration().containsKey(stream.getStreamId())) { + stream.declareOutputFields(declarer); + } + } + } + + public void emit(SpoutOutputCollector collector, List<Object> tuple, KafkaSpoutMessageId messageId) { + getStream(messageId.topic()).emit(collector, tuple, messageId); + } + + @Override + public String toString() { + return "KafkaSpoutStreamsNamedTopics{" + + "topicToStream=" + topicToStream + + '}'; + } + + public static class Builder { + private final Map<String, KafkaSpoutStream> topicToStream = new HashMap<>();; + + /** + * Creates a {@link KafkaSpoutStream} with the given output Fields for each topic specified. + * All topics will have the default stream id and the same output fields. + */ + public Builder(Fields outputFields, String... topics) { + addStream(outputFields, topics); + } + + /** + * Creates a {@link KafkaSpoutStream} with this particular stream for each topic specified. + * All the topics will have the specified stream id and the same output fields. + */ + public Builder (Fields outputFields, String streamId, String... topics) { + addStream(outputFields, streamId, topics); + } + + /** + * Adds this stream to the state representing the streams associated with each topic + */ + public Builder(KafkaSpoutStream stream) { + addStream(stream); + } + + /** + * Adds this stream to the state representing the streams associated with each topic + */ + public Builder addStream(KafkaSpoutStream stream) { + topicToStream.put(stream.getTopic(), stream); + return this; + } + + /** + * Please refer to javadoc in {@link #Builder(Fields, String...)} + */ + public Builder addStream(Fields outputFields, String... topics) { + addStream(outputFields, Utils.DEFAULT_STREAM_ID, topics); + return this; + } + + /** + * Please refer to javadoc in {@link #Builder(Fields, String, String...)} + */ + public Builder addStream(Fields outputFields, String streamId, String... topics) { + for (String topic : topics) { + topicToStream.put(topic, new KafkaSpoutStream(outputFields, streamId, topic)); + } + return this; + } + + public KafkaSpoutStreamsNamedTopics build() { + return new KafkaSpoutStreamsNamedTopics(this); + } + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/f423dbc5/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStreamsWildcardTopics.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStreamsWildcardTopics.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStreamsWildcardTopics.java new file mode 100644 index 0000000..5c3bd47 --- /dev/null +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStreamsWildcardTopics.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.kafka.spout; + +import org.apache.storm.spout.SpoutOutputCollector; +import org.apache.storm.topology.OutputFieldsDeclarer; + +import java.util.List; +import java.util.regex.Pattern; + +public class KafkaSpoutStreamsWildcardTopics implements KafkaSpoutStreams { + private KafkaSpoutStream kafkaSpoutStream; + + public KafkaSpoutStreamsWildcardTopics(KafkaSpoutStream kafkaSpoutStream) { + this.kafkaSpoutStream = kafkaSpoutStream; + if (kafkaSpoutStream.getTopicWildcardPattern() == null) { + throw new IllegalStateException("KafkaSpoutStream must be configured for wildcard topic"); + } + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + kafkaSpoutStream.declareOutputFields(declarer); + } + + @Override + public void emit(SpoutOutputCollector collector, List<Object> tuple, KafkaSpoutMessageId messageId) { + kafkaSpoutStream.emit(collector, tuple, messageId); + } + + public KafkaSpoutStream getStream() { + return kafkaSpoutStream; + } + + public Pattern getTopicWildcardPattern() { + return kafkaSpoutStream.getTopicWildcardPattern(); + } + + @Override + public String toString() { + return "KafkaSpoutStreamsWildcardTopics{" + + "kafkaSpoutStream=" + kafkaSpoutStream + + '}'; + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/f423dbc5/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTuplesBuilder.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTuplesBuilder.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTuplesBuilder.java index d67c69d..2ba0a79 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTuplesBuilder.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTuplesBuilder.java @@ -19,64 +19,14 @@ package org.apache.storm.kafka.spout; import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.io.Serializable; -import java.util.Arrays; -import java.util.HashMap; import java.util.List; -import java.util.Map; /** * {@link KafkaSpoutTuplesBuilder} wraps all the logic that builds tuples from {@link ConsumerRecord}s. * The logic is provided by the user by implementing the appropriate number of {@link KafkaSpoutTupleBuilder} instances */ -public class KafkaSpoutTuplesBuilder<K,V> implements Serializable { - private static final Logger LOG = LoggerFactory.getLogger(KafkaSpoutTuplesBuilder.class); - - private Map<String, KafkaSpoutTupleBuilder<K, V>> topicToTupleBuilders; - - private KafkaSpoutTuplesBuilder(Builder<K,V> builder) { - this.topicToTupleBuilders = builder.topicToTupleBuilders; - LOG.debug("Instantiated {}", this); - } - - public static class Builder<K,V> { - private List<KafkaSpoutTupleBuilder<K, V>> tupleBuilders; - private Map<String, KafkaSpoutTupleBuilder<K, V>> topicToTupleBuilders; - - @SafeVarargs - public Builder(KafkaSpoutTupleBuilder<K,V>... tupleBuilders) { - if (tupleBuilders == null || tupleBuilders.length == 0) { - throw new IllegalArgumentException("Must specify at last one tuple builder per topic declared in KafkaSpoutStreams"); - } - - this.tupleBuilders = Arrays.asList(tupleBuilders); - topicToTupleBuilders = new HashMap<>(); - } - - public KafkaSpoutTuplesBuilder<K,V> build() { - for (KafkaSpoutTupleBuilder<K, V> tupleBuilder : tupleBuilders) { - for (String topic : tupleBuilder.getTopics()) { - if (!topicToTupleBuilders.containsKey(topic)) { - topicToTupleBuilders.put(topic, tupleBuilder); - } - } - } - return new KafkaSpoutTuplesBuilder<>(this); - } - } - - public List<Object>buildTuple(ConsumerRecord<K,V> consumerRecord) { - final String topic = consumerRecord.topic(); - return topicToTupleBuilders.get(topic).buildTuple(consumerRecord); - } - - @Override - public String toString() { - return "KafkaSpoutTuplesBuilder{" + - "topicToTupleBuilders=" + topicToTupleBuilders + - '}'; - } +public interface KafkaSpoutTuplesBuilder<K,V> extends Serializable { + List<Object> buildTuple(ConsumerRecord<K, V> consumerRecord); } http://git-wip-us.apache.org/repos/asf/storm/blob/f423dbc5/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTuplesBuilderNamedTopics.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTuplesBuilderNamedTopics.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTuplesBuilderNamedTopics.java new file mode 100644 index 0000000..80fe543 --- /dev/null +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTuplesBuilderNamedTopics.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.kafka.spout; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class KafkaSpoutTuplesBuilderNamedTopics<K,V> implements KafkaSpoutTuplesBuilder<K,V> { + private static final Logger LOG = LoggerFactory.getLogger(KafkaSpoutTuplesBuilderNamedTopics.class); + + private Map<String, KafkaSpoutTupleBuilder<K, V>> topicToTupleBuilders; + + private KafkaSpoutTuplesBuilderNamedTopics(Builder<K,V> builder) { + this.topicToTupleBuilders = builder.topicToTupleBuilders; + LOG.debug("Instantiated {}", this); + } + + public static class Builder<K,V> { + private List<KafkaSpoutTupleBuilder<K, V>> tupleBuilders; + private Map<String, KafkaSpoutTupleBuilder<K, V>> topicToTupleBuilders; + + @SafeVarargs + public Builder(KafkaSpoutTupleBuilder<K,V>... tupleBuilders) { + if (tupleBuilders == null || tupleBuilders.length == 0) { + throw new IllegalArgumentException("Must specify at last one tuple builder per topic declared in KafkaSpoutStreams"); + } + + this.tupleBuilders = Arrays.asList(tupleBuilders); + topicToTupleBuilders = new HashMap<>(); + } + + public KafkaSpoutTuplesBuilderNamedTopics<K,V> build() { + for (KafkaSpoutTupleBuilder<K, V> tupleBuilder : tupleBuilders) { + for (String topic : tupleBuilder.getTopics()) { + if (!topicToTupleBuilders.containsKey(topic)) { + topicToTupleBuilders.put(topic, tupleBuilder); + } + } + } + return new KafkaSpoutTuplesBuilderNamedTopics<>(this); + } + } + + public List<Object>buildTuple(ConsumerRecord<K,V> consumerRecord) { + final String topic = consumerRecord.topic(); + return topicToTupleBuilders.get(topic).buildTuple(consumerRecord); + } + + @Override + public String toString() { + return "KafkaSpoutTuplesBuilderNamedTopics {" + + "topicToTupleBuilders=" + topicToTupleBuilders + + '}'; + } + +} http://git-wip-us.apache.org/repos/asf/storm/blob/f423dbc5/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTuplesBuilderWildcardTopics.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTuplesBuilderWildcardTopics.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTuplesBuilderWildcardTopics.java new file mode 100644 index 0000000..85d4809 --- /dev/null +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTuplesBuilderWildcardTopics.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.kafka.spout; + +import org.apache.kafka.clients.consumer.ConsumerRecord; + +import java.util.List; + +public class KafkaSpoutTuplesBuilderWildcardTopics<K,V> implements KafkaSpoutTuplesBuilder<K,V> { + private KafkaSpoutTupleBuilder<K, V> tupleBuilder; + + public KafkaSpoutTuplesBuilderWildcardTopics(KafkaSpoutTupleBuilder<K, V> tupleBuilder) { + this.tupleBuilder = tupleBuilder; + } + + @Override + public List<Object> buildTuple(ConsumerRecord<K, V> consumerRecord) { + return tupleBuilder.buildTuple(consumerRecord); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/f423dbc5/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMain.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMain.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMain.java deleted file mode 100644 index 0691dd3..0000000 --- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMain.java +++ /dev/null @@ -1,133 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.storm.kafka.spout.test; - -import org.apache.storm.Config; -import org.apache.storm.LocalCluster; -import org.apache.storm.StormSubmitter; -import org.apache.storm.generated.StormTopology; -import org.apache.storm.kafka.spout.KafkaSpout; -import org.apache.storm.kafka.spout.KafkaSpoutConfig; -import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff; -import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff.TimeInterval; -import org.apache.storm.kafka.spout.KafkaSpoutRetryService; -import org.apache.storm.kafka.spout.KafkaSpoutStreams; -import org.apache.storm.kafka.spout.KafkaSpoutTuplesBuilder; -import org.apache.storm.topology.TopologyBuilder; -import org.apache.storm.tuple.Fields; - -import java.io.BufferedReader; -import java.io.IOException; -import java.io.InputStreamReader; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.TimeUnit; - -import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST; - -public class KafkaSpoutTopologyMain { - private static final String[] STREAMS = new String[]{"test_stream","test1_stream","test2_stream"}; - private static final String[] TOPICS = new String[]{"test","test1","test2"}; - - - public static void main(String[] args) throws Exception { - if (args.length == 0) { - submitTopologyLocalCluster(getTopolgyKafkaSpout(), getConfig()); - } else { - submitTopologyRemoteCluster(args[0], getTopolgyKafkaSpout(), getConfig()); - } - } - - protected static void submitTopologyLocalCluster(StormTopology topology, Config config) throws InterruptedException { - LocalCluster cluster = new LocalCluster(); - cluster.submitTopology("test", config, topology); - stopWaitingForInput(); - } - - protected static void submitTopologyRemoteCluster(String arg, StormTopology topology, Config config) throws Exception { - StormSubmitter.submitTopology(arg, config, topology); - } - - private static void stopWaitingForInput() { - try { - System.out.println("PRESS ENTER TO STOP"); - new BufferedReader(new InputStreamReader(System.in)).readLine(); - System.exit(0); - } catch (IOException e) { - e.printStackTrace(); - } - } - - protected static Config getConfig() { - Config config = new Config(); - config.setDebug(true); - return config; - } - - public static StormTopology getTopolgyKafkaSpout() { - final TopologyBuilder tp = new TopologyBuilder(); - tp.setSpout("kafka_spout", new KafkaSpout<>(getKafkaSpoutConfig(getKafkaSpoutStreams())), 1); - tp.setBolt("kafka_bolt", new KafkaSpoutTestBolt()).shuffleGrouping("kafka_spout", STREAMS[0]); - tp.setBolt("kafka_bolt_1", new KafkaSpoutTestBolt()).shuffleGrouping("kafka_spout", STREAMS[2]); - return tp.createTopology(); - } - - public static KafkaSpoutConfig<String,String> getKafkaSpoutConfig(KafkaSpoutStreams kafkaSpoutStreams) { - return new KafkaSpoutConfig.Builder<String, String>(getKafkaConsumerProps(), kafkaSpoutStreams, getTuplesBuilder(), getRetryService()) - .setOffsetCommitPeriodMs(10_000) - .setFirstPollOffsetStrategy(EARLIEST) - .setMaxUncommittedOffsets(250) - .build(); - } - - private static KafkaSpoutRetryService getRetryService() { - return new KafkaSpoutRetryExponentialBackoff(getTimeInterval(500, TimeUnit.MICROSECONDS), - TimeInterval.milliSeconds(2), Integer.MAX_VALUE, TimeInterval.seconds(10)); - } - - private static TimeInterval getTimeInterval(long delay, TimeUnit timeUnit) { - return new TimeInterval(delay, timeUnit); - } - - public static Map<String,Object> getKafkaConsumerProps() { - Map<String, Object> props = new HashMap<>(); -// props.put(KafkaSpoutConfig.Consumer.ENABLE_AUTO_COMMIT, "true"); - props.put(KafkaSpoutConfig.Consumer.BOOTSTRAP_SERVERS, "127.0.0.1:9092"); - props.put(KafkaSpoutConfig.Consumer.GROUP_ID, "kafkaSpoutTestGroup"); - props.put(KafkaSpoutConfig.Consumer.KEY_DESERIALIZER, "org.apache.kafka.common.serialization.StringDeserializer"); - props.put(KafkaSpoutConfig.Consumer.VALUE_DESERIALIZER, "org.apache.kafka.common.serialization.StringDeserializer"); - return props; - } - - public static KafkaSpoutTuplesBuilder<String, String> getTuplesBuilder() { - return new KafkaSpoutTuplesBuilder.Builder<>( - new TopicsTest0Test1TupleBuilder<String, String>(TOPICS[0], TOPICS[1]), - new TopicTest2TupleBuilder<String, String>(TOPICS[2])) - .build(); - } - - public static KafkaSpoutStreams getKafkaSpoutStreams() { - final Fields outputFields = new Fields("topic", "partition", "offset", "key", "value"); - final Fields outputFields1 = new Fields("topic", "partition", "offset"); - return new KafkaSpoutStreams.Builder(outputFields, STREAMS[0], new String[]{TOPICS[0], TOPICS[1]}) // contents of topics test, test1, sent to test_stream - .addStream(outputFields, STREAMS[0], new String[]{TOPICS[2]}) // contents of topic test2 sent to test_stream - .addStream(outputFields1, STREAMS[2], new String[]{TOPICS[2]}) // contents of topic test2 sent to test2_stream - .build(); - } -} http://git-wip-us.apache.org/repos/asf/storm/blob/f423dbc5/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainNamedTopics.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainNamedTopics.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainNamedTopics.java new file mode 100644 index 0000000..952c5d3 --- /dev/null +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainNamedTopics.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.kafka.spout.test; + +import org.apache.storm.Config; +import org.apache.storm.LocalCluster; +import org.apache.storm.StormSubmitter; +import org.apache.storm.generated.StormTopology; +import org.apache.storm.kafka.spout.KafkaSpout; +import org.apache.storm.kafka.spout.KafkaSpoutConfig; +import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff; +import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff.TimeInterval; +import org.apache.storm.kafka.spout.KafkaSpoutRetryService; +import org.apache.storm.kafka.spout.KafkaSpoutStreams; +import org.apache.storm.kafka.spout.KafkaSpoutStreamsNamedTopics; +import org.apache.storm.kafka.spout.KafkaSpoutTuplesBuilder; +import org.apache.storm.kafka.spout.KafkaSpoutTuplesBuilderNamedTopics; +import org.apache.storm.topology.TopologyBuilder; +import org.apache.storm.tuple.Fields; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST; + +public class KafkaSpoutTopologyMainNamedTopics { + private static final String[] STREAMS = new String[]{"test_stream","test1_stream","test2_stream"}; + private static final String[] TOPICS = new String[]{"test","test1","test2"}; + + + public static void main(String[] args) throws Exception { + new KafkaSpoutTopologyMainNamedTopics().runMain(args); + } + + protected void runMain(String[] args) throws Exception { + if (args.length == 0) { + submitTopologyLocalCluster(getTopolgyKafkaSpout(), getConfig()); + } else { + submitTopologyRemoteCluster(args[0], getTopolgyKafkaSpout(), getConfig()); + } + + } + + protected void submitTopologyLocalCluster(StormTopology topology, Config config) throws InterruptedException { + LocalCluster cluster = new LocalCluster(); + cluster.submitTopology("test", config, topology); + stopWaitingForInput(); + } + + protected void submitTopologyRemoteCluster(String arg, StormTopology topology, Config config) throws Exception { + StormSubmitter.submitTopology(arg, config, topology); + } + + protected void stopWaitingForInput() { + try { + System.out.println("PRESS ENTER TO STOP"); + new BufferedReader(new InputStreamReader(System.in)).readLine(); + System.exit(0); + } catch (IOException e) { + e.printStackTrace(); + } + } + + protected Config getConfig() { + Config config = new Config(); + config.setDebug(true); + return config; + } + + protected StormTopology getTopolgyKafkaSpout() { + final TopologyBuilder tp = new TopologyBuilder(); + tp.setSpout("kafka_spout", new KafkaSpout<>(getKafkaSpoutConfig(getKafkaSpoutStreams())), 1); + tp.setBolt("kafka_bolt", new KafkaSpoutTestBolt()).shuffleGrouping("kafka_spout", STREAMS[0]); + tp.setBolt("kafka_bolt_1", new KafkaSpoutTestBolt()).shuffleGrouping("kafka_spout", STREAMS[2]); + return tp.createTopology(); + } + + protected KafkaSpoutConfig<String,String> getKafkaSpoutConfig(KafkaSpoutStreams kafkaSpoutStreams) { + return new KafkaSpoutConfig.Builder<String, String>(getKafkaConsumerProps(), kafkaSpoutStreams, getTuplesBuilder(), getRetryService()) + .setOffsetCommitPeriodMs(10_000) + .setFirstPollOffsetStrategy(EARLIEST) + .setMaxUncommittedOffsets(250) + .build(); + } + + protected KafkaSpoutRetryService getRetryService() { + return new KafkaSpoutRetryExponentialBackoff(getTimeInterval(500, TimeUnit.MICROSECONDS), + TimeInterval.milliSeconds(2), Integer.MAX_VALUE, TimeInterval.seconds(10)); + } + + protected TimeInterval getTimeInterval(long delay, TimeUnit timeUnit) { + return new TimeInterval(delay, timeUnit); + } + + protected Map<String,Object> getKafkaConsumerProps() { + Map<String, Object> props = new HashMap<>(); +// props.put(KafkaSpoutConfig.Consumer.ENABLE_AUTO_COMMIT, "true"); + props.put(KafkaSpoutConfig.Consumer.BOOTSTRAP_SERVERS, "127.0.0.1:9092"); + props.put(KafkaSpoutConfig.Consumer.GROUP_ID, "kafkaSpoutTestGroup"); + props.put(KafkaSpoutConfig.Consumer.KEY_DESERIALIZER, "org.apache.kafka.common.serialization.StringDeserializer"); + props.put(KafkaSpoutConfig.Consumer.VALUE_DESERIALIZER, "org.apache.kafka.common.serialization.StringDeserializer"); + return props; + } + + protected KafkaSpoutTuplesBuilder<String, String> getTuplesBuilder() { + return new KafkaSpoutTuplesBuilderNamedTopics.Builder<>( + new TopicsTest0Test1TupleBuilder<String, String>(TOPICS[0], TOPICS[1]), + new TopicTest2TupleBuilder<String, String>(TOPICS[2])) + .build(); + } + + protected KafkaSpoutStreams getKafkaSpoutStreams() { + final Fields outputFields = new Fields("topic", "partition", "offset", "key", "value"); + final Fields outputFields1 = new Fields("topic", "partition", "offset"); + return new KafkaSpoutStreamsNamedTopics.Builder(outputFields, STREAMS[0], new String[]{TOPICS[0], TOPICS[1]}) // contents of topics test, test1, sent to test_stream + .addStream(outputFields, STREAMS[0], new String[]{TOPICS[2]}) // contents of topic test2 sent to test_stream + .addStream(outputFields1, STREAMS[2], new String[]{TOPICS[2]}) // contents of topic test2 sent to test2_stream + .build(); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/f423dbc5/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainWildcardTopics.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainWildcardTopics.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainWildcardTopics.java new file mode 100644 index 0000000..c362a2b --- /dev/null +++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainWildcardTopics.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.kafka.spout.test; + +import org.apache.storm.generated.StormTopology; +import org.apache.storm.kafka.spout.KafkaSpout; +import org.apache.storm.kafka.spout.KafkaSpoutStream; +import org.apache.storm.kafka.spout.KafkaSpoutStreams; +import org.apache.storm.kafka.spout.KafkaSpoutStreamsWildcardTopics; +import org.apache.storm.kafka.spout.KafkaSpoutTupleBuilder; +import org.apache.storm.kafka.spout.KafkaSpoutTuplesBuilder; +import org.apache.storm.kafka.spout.KafkaSpoutTuplesBuilderWildcardTopics; +import org.apache.storm.topology.TopologyBuilder; +import org.apache.storm.tuple.Fields; + +import java.util.regex.Pattern; + +public class KafkaSpoutTopologyMainWildcardTopics extends KafkaSpoutTopologyMainNamedTopics { + private static final String STREAM = "test_wildcard_stream"; + private static final String TOPIC_WILDCARD_PATTERN = "test[1|2]"; + + public static void main(String[] args) throws Exception { + new KafkaSpoutTopologyMainWildcardTopics().runMain(args); + } + + protected StormTopology getTopolgyKafkaSpout() { + final TopologyBuilder tp = new TopologyBuilder(); + tp.setSpout("kafka_spout", new KafkaSpout<>(getKafkaSpoutConfig(getKafkaSpoutStreams())), 1); + tp.setBolt("kafka_bolt", new KafkaSpoutTestBolt()).shuffleGrouping("kafka_spout", STREAM); + return tp.createTopology(); + } + + protected KafkaSpoutTuplesBuilder<String, String> getTuplesBuilder() { + return new KafkaSpoutTuplesBuilderWildcardTopics<>(getTupleBuilder()); + } + + protected KafkaSpoutTupleBuilder<String, String> getTupleBuilder() { + return new TopicsTest0Test1TupleBuilder<>(TOPIC_WILDCARD_PATTERN); + } + + protected KafkaSpoutStreams getKafkaSpoutStreams() { + final Fields outputFields = new Fields("topic", "partition", "offset", "key", "value"); + final KafkaSpoutStream kafkaSpoutStream = new KafkaSpoutStream(outputFields, STREAM, Pattern.compile(TOPIC_WILDCARD_PATTERN)); + return new KafkaSpoutStreamsWildcardTopics(kafkaSpoutStream); + } +}