Repository: storm
Updated Branches:
  refs/heads/1.0.x-branch a81cb42f4 -> 60c07813f


sync storm-kafka-client to pre-kafka 0.10.0 state


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/f423dbc5
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/f423dbc5
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/f423dbc5

Branch: refs/heads/1.0.x-branch
Commit: f423dbc524400654c4692b6263f12a04d084e567
Parents: a81cb42
Author: P. Taylor Goetz <ptgo...@gmail.com>
Authored: Thu Jul 21 16:37:19 2016 -0400
Committer: P. Taylor Goetz <ptgo...@gmail.com>
Committed: Thu Jul 21 16:37:19 2016 -0400

----------------------------------------------------------------------
 external/storm-kafka-client/pom.xml             |   2 +-
 .../apache/storm/kafka/spout/KafkaSpout.java    |  43 +++++-
 .../storm/kafka/spout/KafkaSpoutConfig.java     |  19 ++-
 .../storm/kafka/spout/KafkaSpoutStream.java     |  61 +++++++-
 .../storm/kafka/spout/KafkaSpoutStreams.java    | 133 +---------------
 .../spout/KafkaSpoutStreamsNamedTopics.java     | 154 +++++++++++++++++++
 .../spout/KafkaSpoutStreamsWildcardTopics.java  |  61 ++++++++
 .../kafka/spout/KafkaSpoutTuplesBuilder.java    |  54 +------
 .../KafkaSpoutTuplesBuilderNamedTopics.java     |  78 ++++++++++
 .../KafkaSpoutTuplesBuilderWildcardTopics.java  |  36 +++++
 .../spout/test/KafkaSpoutTopologyMain.java      | 133 ----------------
 .../test/KafkaSpoutTopologyMainNamedTopics.java | 140 +++++++++++++++++
 .../KafkaSpoutTopologyMainWildcardTopics.java   |  62 ++++++++
 13 files changed, 654 insertions(+), 322 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/f423dbc5/external/storm-kafka-client/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/pom.xml 
b/external/storm-kafka-client/pom.xml
index 149be44..c91ddb8 100644
--- a/external/storm-kafka-client/pom.xml
+++ b/external/storm-kafka-client/pom.xml
@@ -22,7 +22,7 @@
     <parent>
         <artifactId>storm</artifactId>
         <groupId>org.apache.storm</groupId>
-        <version>1.0.3-SNAPSHOT</version>
+        <version>1.1.0-SNAPSHOT</version>
         <relativePath>../../pom.xml</relativePath>
     </parent>
 

http://git-wip-us.apache.org/repos/asf/storm/blob/f423dbc5/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
index d211ae9..5a701d5 100644
--- 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
+++ 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
@@ -45,6 +45,7 @@ import java.util.NavigableSet;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
 
 import static 
org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST;
 import static 
org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.LATEST;
@@ -343,7 +344,16 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
     private void subscribeKafkaConsumer() {
         kafkaConsumer = new KafkaConsumer<>(kafkaSpoutConfig.getKafkaProps(),
                 kafkaSpoutConfig.getKeyDeserializer(), 
kafkaSpoutConfig.getValueDeserializer());
-        kafkaConsumer.subscribe(kafkaSpoutConfig.getSubscribedTopics(), new 
KafkaSpoutConsumerRebalanceListener());
+
+        if (kafkaSpoutStreams instanceof KafkaSpoutStreamsNamedTopics) {
+            final List<String> topics = ((KafkaSpoutStreamsNamedTopics) 
kafkaSpoutStreams).getTopics();
+            kafkaConsumer.subscribe(topics, new 
KafkaSpoutConsumerRebalanceListener());
+            LOG.info("Kafka consumer subscribed topics {}", topics);
+        } else if (kafkaSpoutStreams instanceof 
KafkaSpoutStreamsWildcardTopics) {
+            final Pattern pattern = ((KafkaSpoutStreamsWildcardTopics) 
kafkaSpoutStreams).getTopicWildcardPattern();
+            kafkaConsumer.subscribe(pattern, new 
KafkaSpoutConsumerRebalanceListener());
+            LOG.info("Kafka consumer subscribed topics matching wildcard 
pattern [{}]", pattern);
+        }
         // Initial poll to get the consumer registration process going.
         // KafkaSpoutConsumerRebalanceListener will be called following this 
poll, upon partition registration
         kafkaConsumer.poll(0);
@@ -381,6 +391,37 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
         return "{acked=" + acked + "} ";
     }
 
+    @Override
+    public Map<String, Object> getComponentConfiguration () {
+        Map<String, Object> configuration = super.getComponentConfiguration();
+        if (configuration == null) {
+            configuration = new HashMap<>();
+        }
+        String configKeyPrefix = "config.";
+
+        if (kafkaSpoutStreams instanceof KafkaSpoutStreamsNamedTopics) {
+            configuration.put(configKeyPrefix + "topics", getNamedTopics());
+        } else if (kafkaSpoutStreams instanceof 
KafkaSpoutStreamsWildcardTopics) {
+            configuration.put(configKeyPrefix + "topics", getWildCardTopics());
+        }
+
+        configuration.put(configKeyPrefix + "groupid", 
kafkaSpoutConfig.getConsumerGroupId());
+        configuration.put(configKeyPrefix + "bootstrap.servers", 
kafkaSpoutConfig.getKafkaProps().get("bootstrap.servers"));
+        return configuration;
+    }
+
+    private String getNamedTopics() {
+        StringBuilder topics = new StringBuilder();
+        for (String topic: kafkaSpoutConfig.getSubscribedTopics()) {
+            topics.append(topic).append(",");
+        }
+        return topics.toString();
+    }
+
+    private String getWildCardTopics() {
+        return kafkaSpoutConfig.getTopicWildcardPattern().toString();
+    }
+
     // ======= Offsets Commit Management ==========
 
     private static class OffsetComparator implements 
Comparator<KafkaSpoutMessageId> {

http://git-wip-us.apache.org/repos/asf/storm/blob/f423dbc5/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
index 29cedb2..315e3e9 100644
--- 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
+++ 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
@@ -26,6 +26,7 @@ import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.regex.Pattern;
 
 /**
  * KafkaSpoutConfig defines the required configuration to connect a consumer 
to a consumer group, as well as the subscribing topics
@@ -263,10 +264,23 @@ public class KafkaSpoutConfig<K, V> implements 
Serializable {
     }
 
     /**
-     * @return list of topics subscribed and emitting tuples to a stream as 
configured by {@link KafkaSpoutStream}
+     * @return list of topics subscribed and emitting tuples to a stream as 
configured by {@link KafkaSpoutStream},
+     * or null if this stream is associated with a wildcard pattern topic
      */
     public List<String> getSubscribedTopics() {
-        return new ArrayList<>(kafkaSpoutStreams.getTopics());
+        return kafkaSpoutStreams instanceof KafkaSpoutStreamsNamedTopics ?
+            new ArrayList<>(((KafkaSpoutStreamsNamedTopics) 
kafkaSpoutStreams).getTopics()) :
+            null;
+    }
+
+    /**
+     * @return the wildcard pattern topic associated with this {@link 
KafkaSpoutStream}, or null
+     * if this stream is associated with a specific named topic
+     */
+    public Pattern getTopicWildcardPattern() {
+        return kafkaSpoutStreams instanceof KafkaSpoutStreamsWildcardTopics ?
+                
((KafkaSpoutStreamsWildcardTopics)kafkaSpoutStreams).getTopicWildcardPattern() :
+                null;
     }
 
     public int getMaxTupleRetries() {
@@ -300,6 +314,7 @@ public class KafkaSpoutConfig<K, V> implements Serializable 
{
                 ", keyDeserializer=" + keyDeserializer +
                 ", valueDeserializer=" + valueDeserializer +
                 ", topics=" + getSubscribedTopics() +
+                ", topicWildcardPattern=" + getTopicWildcardPattern() +
                 ", firstPollOffsetStrategy=" + firstPollOffsetStrategy +
                 ", pollTimeoutMs=" + pollTimeoutMs +
                 ", offsetCommitPeriodMs=" + offsetCommitPeriodMs +

http://git-wip-us.apache.org/repos/asf/storm/blob/f423dbc5/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStream.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStream.java
 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStream.java
index 064a8bb..5375f6c 100644
--- 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStream.java
+++ 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStream.java
@@ -18,26 +18,35 @@
 
 package org.apache.storm.kafka.spout;
 
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.topology.OutputFieldsDeclarer;
 import org.apache.storm.tuple.Fields;
 import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.Serializable;
+import java.util.List;
+import java.util.regex.Pattern;
 
 /**
  * Represents the stream and output fields used by a topic
  */
 public class KafkaSpoutStream implements Serializable {
+    private static final Logger LOG = 
LoggerFactory.getLogger(KafkaSpoutStream.class);
+
     private final Fields outputFields;
     private final String streamId;
     private final String topic;
+    private Pattern topicWildcardPattern;
 
     /** Represents the specified outputFields and topic with the default 
stream */
-    KafkaSpoutStream(Fields outputFields, String topic) {
+    public KafkaSpoutStream(Fields outputFields, String topic) {
         this(outputFields, Utils.DEFAULT_STREAM_ID, topic);
     }
 
     /** Represents the specified outputFields and topic with the specified 
stream */
-    KafkaSpoutStream(Fields outputFields, String streamId, String topic) {
+    public KafkaSpoutStream(Fields outputFields, String streamId, String 
topic) {
         if (outputFields == null || streamId == null || topic == null) {
             throw new IllegalArgumentException(String.format("Constructor 
parameters cannot be null. " +
                     "[outputFields=%s, streamId=%s, topic=%s]", outputFields, 
streamId, topic));
@@ -45,26 +54,68 @@ public class KafkaSpoutStream implements Serializable {
         this.outputFields = outputFields;
         this.streamId = streamId;
         this.topic = topic;
+        this.topicWildcardPattern = null;
+    }
+
+    /** Represents the specified outputFields and topic wild card with the 
default stream */
+    KafkaSpoutStream(Fields outputFields, Pattern topicWildcardPattern) {
+        this(outputFields, Utils.DEFAULT_STREAM_ID, topicWildcardPattern);
     }
 
-    Fields getOutputFields() {
+    /** Represents the specified outputFields and topic wild card with the 
specified stream */
+    public KafkaSpoutStream(Fields outputFields, String streamId, Pattern 
topicWildcardPattern) {
+
+        if (outputFields == null || streamId == null || topicWildcardPattern 
== null) {
+            throw new IllegalArgumentException(String.format("Constructor 
parameters cannot be null. " +
+                    "[outputFields=%s, streamId=%s, topicWildcardPattern=%s]", 
outputFields, streamId, topicWildcardPattern));
+        }
+        this.outputFields = outputFields;
+        this.streamId = streamId;
+        this.topic = null;
+        this.topicWildcardPattern = topicWildcardPattern;
+    }
+
+    public void emit(SpoutOutputCollector collector, List<Object> tuple, 
KafkaSpoutMessageId messageId) {
+        collector.emit(streamId, tuple, messageId);
+    }
+
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        LOG.info("Declared [streamId = {}], [outputFields = {}] for [topic = 
{}]", streamId, outputFields, topic);
+        declarer.declareStream(streamId, outputFields);
+    }
+
+
+    public Fields getOutputFields() {
         return outputFields;
     }
 
-    String getStreamId() {
+    public String getStreamId() {
         return streamId;
     }
 
-    String getTopic() {
+    /**
+     * @return the topic associated with this {@link KafkaSpoutStream}, or null
+     * if this stream is associated with a wildcard pattern topic
+     */
+    public String getTopic() {
         return topic;
     }
 
+    /**
+     * @return the wildcard pattern topic associated with this {@link 
KafkaSpoutStream}, or null
+     * if this stream is associated with a specific named topic
+     */
+    public Pattern getTopicWildcardPattern() {
+        return topicWildcardPattern;
+    }
+
     @Override
     public String toString() {
         return "KafkaSpoutStream{" +
                 "outputFields=" + outputFields +
                 ", streamId='" + streamId + '\'' +
                 ", topic='" + topic + '\'' +
+                ", topicWildcardPattern=" + topicWildcardPattern +
                 '}';
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/f423dbc5/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStreams.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStreams.java
 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStreams.java
index dc5892e..6910d3c 100644
--- 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStreams.java
+++ 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStreams.java
@@ -20,139 +20,16 @@ package org.apache.storm.kafka.spout;
 
 import org.apache.storm.spout.SpoutOutputCollector;
 import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.topology.OutputFieldsGetter;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.utils.Utils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 
 /**
- * Represents the {@link KafkaSpoutStream} associated with each topic, and 
provides a public API to
- * declare output streams and emmit tuples, on the appropriate stream, for all 
the topics specified.
+ * Represents the {@link KafkaSpoutStream} associated with each topic or topic 
pattern (wildcard), and provides
+ * a public API to declare output streams and emit tuples, on the appropriate 
stream, for all the topics specified.
  */
-public class KafkaSpoutStreams implements Serializable {
-    protected static final Logger LOG = 
LoggerFactory.getLogger(KafkaSpoutStreams.class);
+public interface KafkaSpoutStreams extends Serializable {
+    void declareOutputFields(OutputFieldsDeclarer declarer);
 
-    private final Map<String, KafkaSpoutStream> topicToStream;
-
-    private KafkaSpoutStreams(Builder builder) {
-        this.topicToStream = builder.topicToStream;
-        LOG.debug("Built {}", this);
-    }
-
-    /**
-     * @param topic the topic for which to get output fields
-     * @return the declared output fields
-     */
-    public Fields getOutputFields(String topic) {
-        if (topicToStream.containsKey(topic)) {
-            final Fields outputFields = 
topicToStream.get(topic).getOutputFields();
-            LOG.trace("Topic [{}] has output fields [{}]", topic, 
outputFields);
-            return outputFields;
-        }
-        throw new IllegalStateException(this.getClass().getName() + " not 
configured for topic: " + topic);
-    }
-
-    /**
-     * @param topic the topic to for which to get the stream id
-     * @return the id of the stream to where the tuples are emitted
-     */
-    public String getStreamId(String topic) {
-        if (topicToStream.containsKey(topic)) {
-            final String streamId = topicToStream.get(topic).getStreamId();
-            LOG.trace("Topic [{}] emitting in stream [{}]", topic, streamId);
-            return streamId;
-        }
-        throw new IllegalStateException(this.getClass().getName() + " not 
configured for topic: " + topic);
-    }
-
-    /**
-     * @return list of topics subscribed and emitting tuples to a stream as 
configured by {@link KafkaSpoutStream}
-     */
-    public List<String> getTopics() {
-        return new ArrayList<>(topicToStream.keySet());
-    }
-
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        for (KafkaSpoutStream stream : topicToStream.values()) {
-            if 
(!((OutputFieldsGetter)declarer).getFieldsDeclaration().containsKey(stream.getStreamId()))
 {
-                declarer.declareStream(stream.getStreamId(), 
stream.getOutputFields());
-                LOG.debug("Declared " + stream);
-            }
-        }
-    }
-
-    public void emit(SpoutOutputCollector collector, List<Object> tuple, 
KafkaSpoutMessageId messageId) {
-        collector.emit(getStreamId(messageId.topic()), tuple, messageId);
-    }
-
-    @Override
-    public String toString() {
-        return "KafkaSpoutStreams{" +
-                "topicToStream=" + topicToStream +
-                '}';
-    }
-
-    public static class Builder {
-        private final Map<String, KafkaSpoutStream> topicToStream = new 
HashMap<>();;
-
-        /**
-         * Creates a {@link KafkaSpoutStream} with the given output Fields for 
each topic specified.
-         * All topics will have the same stream id and output fields.
-         */
-        public Builder(Fields outputFields, String... topics) {
-            addStream(outputFields, topics);
-        }
-
-        /**
-         * Creates a {@link KafkaSpoutStream} with this particular stream for 
each topic specified.
-         * All the topics will have the same stream id and output fields.
-         */
-        public Builder (Fields outputFields, String streamId, String... 
topics) {
-            addStream(outputFields, streamId, topics);
-        }
-
-        /**
-         * Adds this stream to the state representing the streams associated 
with each topic
-         */
-        public Builder(KafkaSpoutStream stream) {
-            addStream(stream);
-        }
-
-        /**
-         * Adds this stream to the state representing the streams associated 
with each topic
-         */
-        public Builder addStream(KafkaSpoutStream stream) {
-            topicToStream.put(stream.getTopic(), stream);
-            return this;
-        }
-
-        /**
-         * Please refer to javadoc in {@link #Builder(Fields, String...)}
-         */
-        public Builder addStream(Fields outputFields, String... topics) {
-            addStream(outputFields, Utils.DEFAULT_STREAM_ID, topics);
-            return this;
-        }
-
-        /**
-         * Please refer to javadoc in {@link #Builder(Fields, String, 
String...)}
-         */
-        public Builder addStream(Fields outputFields, String streamId, 
String... topics) {
-            for (String topic : topics) {
-                topicToStream.put(topic, new KafkaSpoutStream(outputFields, 
streamId, topic));
-            }
-            return this;
-        }
-
-        public KafkaSpoutStreams build() {
-            return new KafkaSpoutStreams(this);
-        }
-    }
+    void emit(SpoutOutputCollector collector, List<Object> tuple, 
KafkaSpoutMessageId messageId);
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/f423dbc5/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStreamsNamedTopics.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStreamsNamedTopics.java
 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStreamsNamedTopics.java
new file mode 100644
index 0000000..c230f09
--- /dev/null
+++ 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStreamsNamedTopics.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout;
+
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.OutputFieldsGetter;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Represents the {@link KafkaSpoutStream} associated with each topic, and 
provides a public API to
+ * declare output streams and emit tuples, on the appropriate stream, for all 
the topics specified.
+ */
+public class KafkaSpoutStreamsNamedTopics implements KafkaSpoutStreams {
+    private static final Logger LOG = 
LoggerFactory.getLogger(KafkaSpoutStreamsNamedTopics.class);
+
+    private final Map<String, KafkaSpoutStream> topicToStream;
+
+    private KafkaSpoutStreamsNamedTopics(Builder builder) {
+        this.topicToStream = builder.topicToStream;
+        LOG.debug("Built {}", this);
+    }
+
+    /**
+     * @param topic the topic for which to get output fields
+     * @return the declared output fields
+     */
+    public Fields getOutputFields(String topic) {
+        if (topicToStream.containsKey(topic)) {
+            final Fields outputFields = 
topicToStream.get(topic).getOutputFields();
+            LOG.trace("Topic [{}] has output fields [{}]", topic, 
outputFields);
+            return outputFields;
+        }
+        throw new IllegalStateException(this.getClass().getName() + " not 
configured for topic: " + topic);
+    }
+
+    /**
+     * @param topic the topic for which to get the stream
+     * @return the {@link KafkaSpoutStream} to which tuples from that topic are emitted
+     */
+    public KafkaSpoutStream getStream(String topic) {
+        if (topicToStream.containsKey(topic)) {
+            return topicToStream.get(topic);
+        }
+        throw new IllegalStateException(this.getClass().getName() + " not 
configured for topic: " + topic);
+    }
+
+    /**
+     * @return list of topics subscribed and emitting tuples to a stream as 
configured by {@link KafkaSpoutStream}
+     */
+    public List<String> getTopics() {
+        return new ArrayList<>(topicToStream.keySet());
+    }
+
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        for (KafkaSpoutStream stream : topicToStream.values()) {
+            if 
(!((OutputFieldsGetter)declarer).getFieldsDeclaration().containsKey(stream.getStreamId()))
 {
+                stream.declareOutputFields(declarer);
+            }
+        }
+    }
+
+    public void emit(SpoutOutputCollector collector, List<Object> tuple, 
KafkaSpoutMessageId messageId) {
+        getStream(messageId.topic()).emit(collector, tuple, messageId);
+    }
+
+    @Override
+    public String toString() {
+        return "KafkaSpoutStreamsNamedTopics{" +
+                "topicToStream=" + topicToStream +
+                '}';
+    }
+
+    public static class Builder {
+        private final Map<String, KafkaSpoutStream> topicToStream = new 
HashMap<>();;
+
+        /**
+         * Creates a {@link KafkaSpoutStream} with the given output Fields for 
each topic specified.
+         * All topics will have the default stream id and the same output 
fields.
+         */
+        public Builder(Fields outputFields, String... topics) {
+            addStream(outputFields, topics);
+        }
+
+        /**
+         * Creates a {@link KafkaSpoutStream} with this particular stream for 
each topic specified.
+         * All the topics will have the specified stream id and the same 
output fields.
+         */
+        public Builder (Fields outputFields, String streamId, String... 
topics) {
+            addStream(outputFields, streamId, topics);
+        }
+
+        /**
+         * Adds this stream to the state representing the streams associated 
with each topic
+         */
+        public Builder(KafkaSpoutStream stream) {
+            addStream(stream);
+        }
+
+        /**
+         * Adds this stream to the state representing the streams associated 
with each topic
+         */
+        public Builder addStream(KafkaSpoutStream stream) {
+            topicToStream.put(stream.getTopic(), stream);
+            return this;
+        }
+
+        /**
+         * Please refer to javadoc in {@link #Builder(Fields, String...)}
+         */
+        public Builder addStream(Fields outputFields, String... topics) {
+            addStream(outputFields, Utils.DEFAULT_STREAM_ID, topics);
+            return this;
+        }
+
+        /**
+         * Please refer to javadoc in {@link #Builder(Fields, String, 
String...)}
+         */
+        public Builder addStream(Fields outputFields, String streamId, 
String... topics) {
+            for (String topic : topics) {
+                topicToStream.put(topic, new KafkaSpoutStream(outputFields, 
streamId, topic));
+            }
+            return this;
+        }
+
+        public KafkaSpoutStreamsNamedTopics build() {
+            return new KafkaSpoutStreamsNamedTopics(this);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/f423dbc5/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStreamsWildcardTopics.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStreamsWildcardTopics.java
 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStreamsWildcardTopics.java
new file mode 100644
index 0000000..5c3bd47
--- /dev/null
+++ 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStreamsWildcardTopics.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout;
+
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+
+import java.util.List;
+import java.util.regex.Pattern;
+
+public class KafkaSpoutStreamsWildcardTopics implements KafkaSpoutStreams {
+    private KafkaSpoutStream kafkaSpoutStream;
+
+    public KafkaSpoutStreamsWildcardTopics(KafkaSpoutStream kafkaSpoutStream) {
+        this.kafkaSpoutStream = kafkaSpoutStream;
+        if (kafkaSpoutStream.getTopicWildcardPattern() == null) {
+            throw new IllegalStateException("KafkaSpoutStream must be 
configured for wildcard topic");
+        }
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        kafkaSpoutStream.declareOutputFields(declarer);
+    }
+
+    @Override
+    public void emit(SpoutOutputCollector collector, List<Object> tuple, 
KafkaSpoutMessageId messageId) {
+        kafkaSpoutStream.emit(collector, tuple, messageId);
+    }
+
+    public KafkaSpoutStream getStream() {
+        return kafkaSpoutStream;
+    }
+
+    public Pattern getTopicWildcardPattern() {
+        return kafkaSpoutStream.getTopicWildcardPattern();
+    }
+
+    @Override
+    public String toString() {
+        return "KafkaSpoutStreamsWildcardTopics{" +
+                "kafkaSpoutStream=" + kafkaSpoutStream +
+                '}';
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/f423dbc5/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTuplesBuilder.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTuplesBuilder.java
 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTuplesBuilder.java
index d67c69d..2ba0a79 100644
--- 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTuplesBuilder.java
+++ 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTuplesBuilder.java
@@ -19,64 +19,14 @@
 package org.apache.storm.kafka.spout;
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.Serializable;
-import java.util.Arrays;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 
 /**
  * {@link KafkaSpoutTuplesBuilder} wraps all the logic that builds tuples from 
{@link ConsumerRecord}s.
  * The logic is provided by the user by implementing the appropriate number of 
{@link KafkaSpoutTupleBuilder} instances
  */
-public class KafkaSpoutTuplesBuilder<K,V> implements Serializable {
-    private static final Logger LOG = 
LoggerFactory.getLogger(KafkaSpoutTuplesBuilder.class);
-
-    private Map<String, KafkaSpoutTupleBuilder<K, V>> topicToTupleBuilders;
-
-    private KafkaSpoutTuplesBuilder(Builder<K,V> builder) {
-        this.topicToTupleBuilders = builder.topicToTupleBuilders;
-        LOG.debug("Instantiated {}", this);
-    }
-
-    public static class Builder<K,V> {
-        private List<KafkaSpoutTupleBuilder<K, V>> tupleBuilders;
-        private Map<String, KafkaSpoutTupleBuilder<K, V>> topicToTupleBuilders;
-
-        @SafeVarargs
-        public Builder(KafkaSpoutTupleBuilder<K,V>... tupleBuilders) {
-            if (tupleBuilders == null || tupleBuilders.length == 0) {
-                throw new IllegalArgumentException("Must specify at last one 
tuple builder per topic declared in KafkaSpoutStreams");
-            }
-
-            this.tupleBuilders = Arrays.asList(tupleBuilders);
-            topicToTupleBuilders = new HashMap<>();
-        }
-
-        public KafkaSpoutTuplesBuilder<K,V> build() {
-            for (KafkaSpoutTupleBuilder<K, V> tupleBuilder : tupleBuilders) {
-                for (String topic : tupleBuilder.getTopics()) {
-                    if (!topicToTupleBuilders.containsKey(topic)) {
-                        topicToTupleBuilders.put(topic, tupleBuilder);
-                    }
-                }
-            }
-            return new KafkaSpoutTuplesBuilder<>(this);
-        }
-    }
-
-    public List<Object>buildTuple(ConsumerRecord<K,V> consumerRecord) {
-        final String topic = consumerRecord.topic();
-        return topicToTupleBuilders.get(topic).buildTuple(consumerRecord);
-    }
-
-    @Override
-    public String toString() {
-        return "KafkaSpoutTuplesBuilder{" +
-                "topicToTupleBuilders=" + topicToTupleBuilders +
-                '}';
-    }
+public interface KafkaSpoutTuplesBuilder<K,V> extends Serializable {
+    /**
+     * Builds the tuple values for one Kafka record.
+     *
+     * @param consumerRecord the record to convert
+     * @return the list of values that make up the tuple
+     */
+    List<Object> buildTuple(ConsumerRecord<K, V> consumerRecord);
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/f423dbc5/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTuplesBuilderNamedTopics.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTuplesBuilderNamedTopics.java
 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTuplesBuilderNamedTopics.java
new file mode 100644
index 0000000..80fe543
--- /dev/null
+++ 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTuplesBuilderNamedTopics.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * {@link KafkaSpoutTuplesBuilder} for named topics: every topic is mapped to the
+ * {@link KafkaSpoutTupleBuilder} that builds the tuples for records of that topic.
+ * Instances are created through {@link Builder}.
+ */
+public class KafkaSpoutTuplesBuilderNamedTopics<K,V> implements KafkaSpoutTuplesBuilder<K,V> {
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaSpoutTuplesBuilderNamedTopics.class);
+
+    private final Map<String, KafkaSpoutTupleBuilder<K, V>> topicToTupleBuilders;
+
+    private KafkaSpoutTuplesBuilderNamedTopics(Builder<K,V> builder) {
+        // Copy so later use of the builder cannot alter an already built instance.
+        this.topicToTupleBuilders = new HashMap<>(builder.topicToTupleBuilders);
+        LOG.debug("Instantiated {}", this);
+    }
+
+    /** Collects tuple builders and indexes them by the topics each one declares. */
+    public static class Builder<K,V> {
+        private final List<KafkaSpoutTupleBuilder<K, V>> tupleBuilders;
+        private final Map<String, KafkaSpoutTupleBuilder<K, V>> topicToTupleBuilders;
+
+        /**
+         * @param tupleBuilders the tuple builders to register; at least one is required
+         * @throws IllegalArgumentException if no tuple builder is given
+         */
+        @SafeVarargs
+        public Builder(KafkaSpoutTupleBuilder<K,V>... tupleBuilders) {
+            if (tupleBuilders == null || tupleBuilders.length == 0) {
+                throw new IllegalArgumentException("Must specify at least one tuple builder per topic declared in KafkaSpoutStreams");
+            }
+
+            this.tupleBuilders = Arrays.asList(tupleBuilders);
+            topicToTupleBuilders = new HashMap<>();
+        }
+
+        /**
+         * Maps every topic of every registered tuple builder to that builder. When several
+         * builders declare the same topic, the one passed first to the constructor is kept.
+         *
+         * @return the configured tuples builder
+         */
+        public KafkaSpoutTuplesBuilderNamedTopics<K,V> build() {
+            for (KafkaSpoutTupleBuilder<K, V> tupleBuilder : tupleBuilders) {
+                for (String topic : tupleBuilder.getTopics()) {
+                    if (!topicToTupleBuilders.containsKey(topic)) {
+                        topicToTupleBuilders.put(topic, tupleBuilder);
+                    }
+                }
+            }
+            return new KafkaSpoutTuplesBuilderNamedTopics<>(this);
+        }
+    }
+
+    /**
+     * Builds the tuple for a record using the tuple builder registered for the record's topic.
+     *
+     * @param consumerRecord the record to convert
+     * @return the tuple values produced by the topic's tuple builder
+     * @throws IllegalStateException if no tuple builder is registered for the record's topic
+     */
+    @Override
+    public List<Object> buildTuple(ConsumerRecord<K,V> consumerRecord) {
+        final String topic = consumerRecord.topic();
+        final KafkaSpoutTupleBuilder<K, V> tupleBuilder = topicToTupleBuilders.get(topic);
+        if (tupleBuilder == null) {
+            // Name the topic instead of failing with a bare NullPointerException.
+            throw new IllegalStateException("No tuple builder registered for topic '" + topic + "'");
+        }
+        return tupleBuilder.buildTuple(consumerRecord);
+    }
+
+    @Override
+    public String toString() {
+        return "KafkaSpoutTuplesBuilderNamedTopics {" +
+                "topicToTupleBuilders=" + topicToTupleBuilders +
+                '}';
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/f423dbc5/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTuplesBuilderWildcardTopics.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTuplesBuilderWildcardTopics.java
 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTuplesBuilderWildcardTopics.java
new file mode 100644
index 0000000..85d4809
--- /dev/null
+++ 
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTuplesBuilderWildcardTopics.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+import java.util.List;
+
+/**
+ * {@link KafkaSpoutTuplesBuilder} that hands every record to one
+ * {@link KafkaSpoutTupleBuilder}, regardless of the record's topic.
+ */
+public class KafkaSpoutTuplesBuilderWildcardTopics<K,V> implements KafkaSpoutTuplesBuilder<K,V> {
+    private final KafkaSpoutTupleBuilder<K, V> tupleBuilder;
+
+    /**
+     * @param tupleBuilder the tuple builder that receives every record
+     */
+    public KafkaSpoutTuplesBuilderWildcardTopics(KafkaSpoutTupleBuilder<K, V> tupleBuilder) {
+        this.tupleBuilder = tupleBuilder;
+    }
+
+    /**
+     * Builds the tuple for a record by delegating to the wrapped tuple builder.
+     *
+     * @param consumerRecord the record to convert
+     * @return the tuple values produced by the wrapped tuple builder
+     */
+    @Override
+    public List<Object> buildTuple(ConsumerRecord<K, V> consumerRecord) {
+        return tupleBuilder.buildTuple(consumerRecord);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/f423dbc5/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMain.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMain.java
 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMain.java
deleted file mode 100644
index 0691dd3..0000000
--- 
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMain.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- *   or more contributor license agreements.  See the NOTICE file
- *   distributed with this work for additional information
- *   regarding copyright ownership.  The ASF licenses this file
- *   to you under the Apache License, Version 2.0 (the
- *   "License"); you may not use this file except in compliance
- *   with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- *   Unless required by applicable law or agreed to in writing, software
- *   distributed under the License is distributed on an "AS IS" BASIS,
- *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *   See the License for the specific language governing permissions and
- *   limitations under the License.
- */
-
-package org.apache.storm.kafka.spout.test;
-
-import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.StormSubmitter;
-import org.apache.storm.generated.StormTopology;
-import org.apache.storm.kafka.spout.KafkaSpout;
-import org.apache.storm.kafka.spout.KafkaSpoutConfig;
-import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff;
-import 
org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff.TimeInterval;
-import org.apache.storm.kafka.spout.KafkaSpoutRetryService;
-import org.apache.storm.kafka.spout.KafkaSpoutStreams;
-import org.apache.storm.kafka.spout.KafkaSpoutTuplesBuilder;
-import org.apache.storm.topology.TopologyBuilder;
-import org.apache.storm.tuple.Fields;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-import static 
org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST;
-
-public class KafkaSpoutTopologyMain {
-    private static final String[] STREAMS = new 
String[]{"test_stream","test1_stream","test2_stream"};
-    private static final String[] TOPICS = new 
String[]{"test","test1","test2"};
-
-
-    public static void main(String[] args) throws Exception {
-        if (args.length == 0) {
-            submitTopologyLocalCluster(getTopolgyKafkaSpout(), getConfig());
-        } else {
-            submitTopologyRemoteCluster(args[0], getTopolgyKafkaSpout(), 
getConfig());
-        }
-    }
-
-    protected static void submitTopologyLocalCluster(StormTopology topology, 
Config config) throws InterruptedException {
-        LocalCluster cluster = new LocalCluster();
-        cluster.submitTopology("test", config, topology);
-        stopWaitingForInput();
-    }
-
-    protected static void submitTopologyRemoteCluster(String arg, 
StormTopology topology, Config config) throws Exception {
-        StormSubmitter.submitTopology(arg, config, topology);
-    }
-
-    private static void stopWaitingForInput() {
-        try {
-            System.out.println("PRESS ENTER TO STOP");
-            new BufferedReader(new InputStreamReader(System.in)).readLine();
-            System.exit(0);
-        } catch (IOException e) {
-            e.printStackTrace();
-        }
-    }
-
-    protected static Config getConfig() {
-        Config config = new Config();
-        config.setDebug(true);
-        return config;
-    }
-
-    public static StormTopology getTopolgyKafkaSpout() {
-        final TopologyBuilder tp = new TopologyBuilder();
-        tp.setSpout("kafka_spout", new 
KafkaSpout<>(getKafkaSpoutConfig(getKafkaSpoutStreams())), 1);
-        tp.setBolt("kafka_bolt", new 
KafkaSpoutTestBolt()).shuffleGrouping("kafka_spout", STREAMS[0]);
-        tp.setBolt("kafka_bolt_1", new 
KafkaSpoutTestBolt()).shuffleGrouping("kafka_spout", STREAMS[2]);
-        return tp.createTopology();
-    }
-
-    public static KafkaSpoutConfig<String,String> 
getKafkaSpoutConfig(KafkaSpoutStreams kafkaSpoutStreams) {
-        return new KafkaSpoutConfig.Builder<String, 
String>(getKafkaConsumerProps(), kafkaSpoutStreams, getTuplesBuilder(), 
getRetryService())
-                .setOffsetCommitPeriodMs(10_000)
-                .setFirstPollOffsetStrategy(EARLIEST)
-                .setMaxUncommittedOffsets(250)
-                .build();
-    }
-
-    private static KafkaSpoutRetryService getRetryService() {
-            return new KafkaSpoutRetryExponentialBackoff(getTimeInterval(500, 
TimeUnit.MICROSECONDS),
-                    TimeInterval.milliSeconds(2), Integer.MAX_VALUE, 
TimeInterval.seconds(10));
-    }
-
-    private static TimeInterval getTimeInterval(long delay, TimeUnit timeUnit) 
{
-        return new TimeInterval(delay, timeUnit);
-    }
-
-    public static Map<String,Object> getKafkaConsumerProps() {
-        Map<String, Object> props = new HashMap<>();
-//        props.put(KafkaSpoutConfig.Consumer.ENABLE_AUTO_COMMIT, "true");
-        props.put(KafkaSpoutConfig.Consumer.BOOTSTRAP_SERVERS, 
"127.0.0.1:9092");
-        props.put(KafkaSpoutConfig.Consumer.GROUP_ID, "kafkaSpoutTestGroup");
-        props.put(KafkaSpoutConfig.Consumer.KEY_DESERIALIZER, 
"org.apache.kafka.common.serialization.StringDeserializer");
-        props.put(KafkaSpoutConfig.Consumer.VALUE_DESERIALIZER, 
"org.apache.kafka.common.serialization.StringDeserializer");
-        return props;
-    }
-
-    public static KafkaSpoutTuplesBuilder<String, String> getTuplesBuilder() {
-        return new KafkaSpoutTuplesBuilder.Builder<>(
-                new TopicsTest0Test1TupleBuilder<String, String>(TOPICS[0], 
TOPICS[1]),
-                new TopicTest2TupleBuilder<String, String>(TOPICS[2]))
-                .build();
-    }
-
-    public static KafkaSpoutStreams getKafkaSpoutStreams() {
-        final Fields outputFields = new Fields("topic", "partition", "offset", 
"key", "value");
-        final Fields outputFields1 = new Fields("topic", "partition", 
"offset");
-        return new KafkaSpoutStreams.Builder(outputFields, STREAMS[0], new 
String[]{TOPICS[0], TOPICS[1]})  // contents of topics test, test1, sent to 
test_stream
-                .addStream(outputFields, STREAMS[0], new String[]{TOPICS[2]})  
// contents of topic test2 sent to test_stream
-                .addStream(outputFields1, STREAMS[2], new String[]{TOPICS[2]}) 
 // contents of topic test2 sent to test2_stream
-                .build();
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/f423dbc5/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainNamedTopics.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainNamedTopics.java
 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainNamedTopics.java
new file mode 100644
index 0000000..952c5d3
--- /dev/null
+++ 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainNamedTopics.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.test;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.kafka.spout.KafkaSpout;
+import org.apache.storm.kafka.spout.KafkaSpoutConfig;
+import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff;
+import 
org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff.TimeInterval;
+import org.apache.storm.kafka.spout.KafkaSpoutRetryService;
+import org.apache.storm.kafka.spout.KafkaSpoutStreams;
+import org.apache.storm.kafka.spout.KafkaSpoutStreamsNamedTopics;
+import org.apache.storm.kafka.spout.KafkaSpoutTuplesBuilder;
+import org.apache.storm.kafka.spout.KafkaSpoutTuplesBuilderNamedTopics;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.tuple.Fields;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static 
org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST;
+
+/**
+ * Manual test driver: builds a topology with one {@link KafkaSpout} and two
+ * {@code KafkaSpoutTestBolt} instances and submits it either to a {@link LocalCluster}
+ * (no arguments) or through {@link StormSubmitter} (first argument given).
+ * The protected methods are the pieces a subclass can replace.
+ */
+public class KafkaSpoutTopologyMainNamedTopics {
+    private static final String[] STREAMS = new 
String[]{"test_stream","test1_stream","test2_stream"};
+    private static final String[] TOPICS = new 
String[]{"test","test1","test2"};
+
+
+    /** Entry point; creates an instance and runs {@link #runMain(String[])}. */
+    public static void main(String[] args) throws Exception {
+        new KafkaSpoutTopologyMainNamedTopics().runMain(args);
+    }
+
+    /**
+     * Submits the topology to a local cluster when no argument is given; otherwise
+     * passes {@code args[0]} as the first argument of the remote submission.
+     */
+    protected void runMain(String[] args) throws Exception {
+        if (args.length == 0) {
+            submitTopologyLocalCluster(getTopolgyKafkaSpout(), getConfig());
+        } else {
+            submitTopologyRemoteCluster(args[0], getTopolgyKafkaSpout(), 
getConfig());
+        }
+
+    }
+
+    /** Submits the topology under the name "test" to a new local cluster, then waits for console input. */
+    protected void submitTopologyLocalCluster(StormTopology topology, Config 
config) throws InterruptedException {
+        LocalCluster cluster = new LocalCluster();
+        cluster.submitTopology("test", config, topology);
+        stopWaitingForInput();
+    }
+
+    /** Submits the topology through {@link StormSubmitter}, passing {@code arg} as its first argument. */
+    protected void submitTopologyRemoteCluster(String arg, StormTopology 
topology, Config config) throws Exception {
+        StormSubmitter.submitTopology(arg, config, topology);
+    }
+
+    /**
+     * Prints a prompt, blocks until a line is read from standard input and then exits the JVM
+     * with status 0. An {@link IOException} while reading is printed and the method returns.
+     */
+    protected void stopWaitingForInput() {
+        try {
+            System.out.println("PRESS ENTER TO STOP");
+            new BufferedReader(new InputStreamReader(System.in)).readLine();
+            System.exit(0);
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+    }
+
+    /** @return a new {@link Config} with debug enabled */
+    protected Config getConfig() {
+        Config config = new Config();
+        config.setDebug(true);
+        return config;
+    }
+
+    /**
+     * Builds the topology: spout "kafka_spout" (third argument 1) plus two test bolts,
+     * shuffle-grouped on streams {@code STREAMS[0]} and {@code STREAMS[2]} of the spout.
+     */
+    protected StormTopology getTopolgyKafkaSpout() {
+        final TopologyBuilder tp = new TopologyBuilder();
+        tp.setSpout("kafka_spout", new 
KafkaSpout<>(getKafkaSpoutConfig(getKafkaSpoutStreams())), 1);
+        tp.setBolt("kafka_bolt", new 
KafkaSpoutTestBolt()).shuffleGrouping("kafka_spout", STREAMS[0]);
+        tp.setBolt("kafka_bolt_1", new 
KafkaSpoutTestBolt()).shuffleGrouping("kafka_spout", STREAMS[2]);
+        return tp.createTopology();
+    }
+
+    /**
+     * Builds the spout configuration from the consumer properties, the given streams, the
+     * tuples builder and the retry service, with an offset commit period of 10_000 ms,
+     * the EARLIEST first-poll offset strategy and a maximum of 250 uncommitted offsets.
+     */
+    protected KafkaSpoutConfig<String,String> 
getKafkaSpoutConfig(KafkaSpoutStreams kafkaSpoutStreams) {
+        return new KafkaSpoutConfig.Builder<String, 
String>(getKafkaConsumerProps(), kafkaSpoutStreams, getTuplesBuilder(), 
getRetryService())
+                .setOffsetCommitPeriodMs(10_000)
+                .setFirstPollOffsetStrategy(EARLIEST)
+                .setMaxUncommittedOffsets(250)
+                .build();
+    }
+
+    /**
+     * Creates the exponential backoff retry service used by the spout.
+     * NOTE(review): the four arguments are presumably initial delay, delay period,
+     * maximum retries and maximum delay - confirm against KafkaSpoutRetryExponentialBackoff.
+     */
+    protected KafkaSpoutRetryService getRetryService() {
+            return new KafkaSpoutRetryExponentialBackoff(getTimeInterval(500, 
TimeUnit.MICROSECONDS),
+                    TimeInterval.milliSeconds(2), Integer.MAX_VALUE, 
TimeInterval.seconds(10));
+    }
+
+    /** @return a {@link TimeInterval} of the given delay and unit */
+    protected TimeInterval getTimeInterval(long delay, TimeUnit timeUnit) {
+        return new TimeInterval(delay, timeUnit);
+    }
+
+    /**
+     * @return consumer properties: bootstrap server 127.0.0.1:9092, group id
+     *     "kafkaSpoutTestGroup" and String deserializers for keys and values
+     */
+    protected Map<String,Object> getKafkaConsumerProps() {
+        Map<String, Object> props = new HashMap<>();
+//        props.put(KafkaSpoutConfig.Consumer.ENABLE_AUTO_COMMIT, "true");
+        props.put(KafkaSpoutConfig.Consumer.BOOTSTRAP_SERVERS, 
"127.0.0.1:9092");
+        props.put(KafkaSpoutConfig.Consumer.GROUP_ID, "kafkaSpoutTestGroup");
+        props.put(KafkaSpoutConfig.Consumer.KEY_DESERIALIZER, 
"org.apache.kafka.common.serialization.StringDeserializer");
+        props.put(KafkaSpoutConfig.Consumer.VALUE_DESERIALIZER, 
"org.apache.kafka.common.serialization.StringDeserializer");
+        return props;
+    }
+
+    /**
+     * @return a named-topics tuples builder made of one tuple builder for
+     *     {@code TOPICS[0]} and {@code TOPICS[1]} and another for {@code TOPICS[2]}
+     */
+    protected KafkaSpoutTuplesBuilder<String, String> getTuplesBuilder() {
+        return new KafkaSpoutTuplesBuilderNamedTopics.Builder<>(
+                new TopicsTest0Test1TupleBuilder<String, String>(TOPICS[0], 
TOPICS[1]),
+                new TopicTest2TupleBuilder<String, String>(TOPICS[2]))
+                .build();
+    }
+
+    /** @return the named-topics stream declarations; the inline comments give the topic-to-stream routing */
+    protected KafkaSpoutStreams getKafkaSpoutStreams() {
+        final Fields outputFields = new Fields("topic", "partition", "offset", 
"key", "value");
+        final Fields outputFields1 = new Fields("topic", "partition", 
"offset");
+        return new KafkaSpoutStreamsNamedTopics.Builder(outputFields, 
STREAMS[0], new String[]{TOPICS[0], TOPICS[1]})  // contents of topics test, 
test1, sent to test_stream
+                .addStream(outputFields, STREAMS[0], new String[]{TOPICS[2]})  
// contents of topic test2 sent to test_stream
+                .addStream(outputFields1, STREAMS[2], new String[]{TOPICS[2]}) 
 // contents of topic test2 sent to test2_stream
+                .build();
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/f423dbc5/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainWildcardTopics.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainWildcardTopics.java
 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainWildcardTopics.java
new file mode 100644
index 0000000..c362a2b
--- /dev/null
+++ 
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainWildcardTopics.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.test;
+
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.kafka.spout.KafkaSpout;
+import org.apache.storm.kafka.spout.KafkaSpoutStream;
+import org.apache.storm.kafka.spout.KafkaSpoutStreams;
+import org.apache.storm.kafka.spout.KafkaSpoutStreamsWildcardTopics;
+import org.apache.storm.kafka.spout.KafkaSpoutTupleBuilder;
+import org.apache.storm.kafka.spout.KafkaSpoutTuplesBuilder;
+import org.apache.storm.kafka.spout.KafkaSpoutTuplesBuilderWildcardTopics;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.tuple.Fields;
+
+import java.util.regex.Pattern;
+
+/**
+ * Wildcard-topic variant of {@link KafkaSpoutTopologyMainNamedTopics}: the spout's stream
+ * is configured with a topic {@link Pattern} and all records go to a single stream that
+ * one test bolt consumes.
+ */
+public class KafkaSpoutTopologyMainWildcardTopics extends KafkaSpoutTopologyMainNamedTopics {
+    private static final String STREAM = "test_wildcard_stream";
+    // Character class matching exactly "test1" and "test2". A '|' inside [...] is a
+    // literal character, not alternation, so it must not appear here.
+    private static final String TOPIC_WILDCARD_PATTERN = "test[12]";
+
+    /** Entry point; creates an instance and runs the inherited {@code runMain}. */
+    public static void main(String[] args) throws Exception {
+        new KafkaSpoutTopologyMainWildcardTopics().runMain(args);
+    }
+
+    /**
+     * Builds the topology: spout "kafka_spout" plus one test bolt shuffle-grouped on the
+     * wildcard stream of the spout.
+     */
+    @Override
+    protected StormTopology getTopolgyKafkaSpout() {
+        final TopologyBuilder tp = new TopologyBuilder();
+        tp.setSpout("kafka_spout", new KafkaSpout<>(getKafkaSpoutConfig(getKafkaSpoutStreams())), 1);
+        tp.setBolt("kafka_bolt", new KafkaSpoutTestBolt()).shuffleGrouping("kafka_spout", STREAM);
+        return tp.createTopology();
+    }
+
+    /** @return a wildcard tuples builder wrapping {@link #getTupleBuilder()} */
+    @Override
+    protected KafkaSpoutTuplesBuilder<String, String> getTuplesBuilder() {
+        return new KafkaSpoutTuplesBuilderWildcardTopics<>(getTupleBuilder());
+    }
+
+    /** @return the single tuple builder used for every record, constructed with the wildcard pattern string */
+    protected KafkaSpoutTupleBuilder<String, String> getTupleBuilder() {
+        return new TopicsTest0Test1TupleBuilder<>(TOPIC_WILDCARD_PATTERN);
+    }
+
+    /** @return a wildcard streams declaration with one stream bound to the compiled topic pattern */
+    @Override
+    protected KafkaSpoutStreams getKafkaSpoutStreams() {
+        final Fields outputFields = new Fields("topic", "partition", "offset", "key", "value");
+        final KafkaSpoutStream kafkaSpoutStream = new KafkaSpoutStream(outputFields, STREAM, Pattern.compile(TOPIC_WILDCARD_PATTERN));
+        return new KafkaSpoutStreamsWildcardTopics(kafkaSpoutStream);
+    }
+}

Reply via email to