Repository: kafka
Updated Branches:
  refs/heads/trunk 6508e63c7 -> 23dff4b04


KAFKA-4114: Allow different offset reset strategies

mjsax

Here's my first pass at finer-grained auto offset reset strategies.

I've left TODO comments about whether we want to consider adding this to 
`KGroupedTable.aggregate` and `KStreamImpl` when re-partitioning a source.
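
For anyone skimming the diff, here is a minimal usage sketch of the new
overloads added in this patch (topic and store names below are hypothetical,
not part of the patch):

    import java.util.regex.Pattern;

    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.KStreamBuilder;
    import org.apache.kafka.streams.kstream.KTable;

    public class OffsetResetExample {
        public static void main(String[] args) {
            KStreamBuilder builder = new KStreamBuilder();

            // Per-source reset: start from the earliest offset when no
            // committed offset exists for this source.
            KStream<String, String> earliestStream =
                    builder.stream(KStreamBuilder.AutoOffsetReset.EARLIEST, "input-topic");

            // Pattern-subscribed sources can use a different policy.
            KStream<String, String> latestStream =
                    builder.stream(KStreamBuilder.AutoOffsetReset.LATEST, Pattern.compile("metrics-.*"));

            // Tables get the same per-source control.
            KTable<String, String> table =
                    builder.table(KStreamBuilder.AutoOffsetReset.EARLIEST, "table-topic", "table-store");
        }
    }

If no per-source policy is given, behavior falls back to the application-wide
"auto.offset.reset" config, as before.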

Author: bbejeck <bbej...@gmail.com>

Reviewers: Matthias J. Sax <matth...@confluent.io>, Guozhang Wang 
<wangg...@gmail.com>

Closes #2007 from bbejeck/KAFKA-4114_allow_different_offset_reset_strategies


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/23dff4b0
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/23dff4b0
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/23dff4b0

Branch: refs/heads/trunk
Commit: 23dff4b04a4c4cd338b5f39c9bb7d384857c482c
Parents: 6508e63
Author: Bill Bejeck <bbej...@gmail.com>
Authored: Wed Jan 11 20:30:47 2017 -0800
Committer: Guozhang Wang <wangg...@gmail.com>
Committed: Wed Jan 11 20:30:47 2017 -0800

----------------------------------------------------------------------
 .../kafka/streams/kstream/KStreamBuilder.java   | 118 +++++++++-
 .../kstream/internals/KGroupedTableImpl.java    |   1 -
 .../streams/processor/TopologyBuilder.java      | 225 ++++++++++++++++++-
 .../processor/internals/StreamThread.java       |  54 ++++-
 ...eamsFineGrainedAutoResetIntegrationTest.java | 223 ++++++++++++++++++
 .../streams/processor/TopologyBuilderTest.java  |   2 +-
 6 files changed, 595 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/23dff4b0/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java 
b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
index 33085c9..b1c1cfb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java
@@ -55,7 +55,22 @@ public class KStreamBuilder extends TopologyBuilder {
      * @return a {@link KStream} for the specified topics
      */
     public <K, V> KStream<K, V> stream(String... topics) {
-        return stream(null, null, topics);
+        return stream(null, null, null, topics);
+    }
+
+
+    /**
+     * Create a {@link KStream} instance from the specified topics.
+     * The default deserializers specified in the config are used.
+     * <p>
+     * If multiple topics are specified there is no ordering guarantee for records from different topics.
+     *
+     * @param offsetReset the auto offset reset policy to use for this stream if no committed offsets are available; acceptable values are earliest or latest
+     * @param topics    the topic names; must contain at least one topic name
+     * @return a {@link KStream} for the specified topics
+     */
+    public <K, V> KStream<K, V> stream(AutoOffsetReset offsetReset, String... 
topics) {
+        return stream(offsetReset, null, null, topics);
     }
 
 
@@ -70,10 +85,26 @@ public class KStreamBuilder extends TopologyBuilder {
      * @return a {@link KStream} for topics matching the regex pattern.
      */
     public <K, V> KStream<K, V> stream(Pattern topicPattern) {
-        return stream(null, null, topicPattern);
+        return stream(null, null, null, topicPattern);
+    }
+
+    /**
+     * Create a {@link KStream} instance from the specified Pattern.
+     * The default deserializers specified in the config are used.
+     * <p>
+     * If multiple topics are matched by the specified pattern, the created stream will read data from all of them,
+     * and there is no ordering guarantee between records from different topics.
+     *
+     * @param offsetReset the auto offset reset policy to use for this stream if no committed offsets are available; acceptable values are earliest or latest
+     * @param topicPattern    the Pattern to match for topic names
+     * @return a {@link KStream} for topics matching the regex pattern.
+     */
+    public <K, V> KStream<K, V> stream(AutoOffsetReset offsetReset, Pattern 
topicPattern) {
+        return stream(offsetReset, null, null, topicPattern);
     }
 
 
+
     /**
      * Create a {@link KStream} instance from the specified topics.
      * <p>
@@ -87,13 +118,33 @@ public class KStreamBuilder extends TopologyBuilder {
      * @return a {@link KStream} for the specified topics
      */
     public <K, V> KStream<K, V> stream(Serde<K> keySerde, Serde<V> valSerde, 
String... topics) {
+        return stream(null, keySerde, valSerde, topics);
+    }
+
+
+    /**
+     * Create a {@link KStream} instance from the specified topics.
+     * <p>
+     * If multiple topics are specified there is no ordering guarantee for records from different topics.
+     *
+     * @param offsetReset the auto offset reset policy to use for this stream if no committed offsets are available; acceptable values are earliest or latest
+     *
+     * @param keySerde  key serde used to read this source {@link KStream},
+     *                  if not specified the default serde defined in the 
configs will be used
+     * @param valSerde  value serde used to read this source {@link KStream},
+     *                  if not specified the default serde defined in the 
configs will be used
+     * @param topics    the topic names; must contain at least one topic name
+     * @return a {@link KStream} for the specified topics
+     */
+    public <K, V> KStream<K, V> stream(AutoOffsetReset offsetReset, Serde<K> 
keySerde, Serde<V> valSerde, String... topics) {
         String name = newName(KStreamImpl.SOURCE_NAME);
 
-        addSource(name, keySerde == null ? null : keySerde.deserializer(), 
valSerde == null ? null : valSerde.deserializer(), topics);
+        addSource(offsetReset, name, keySerde == null ? null : keySerde.deserializer(), valSerde == null ? null : valSerde.deserializer(), topics);
 
         return new KStreamImpl<>(this, name, Collections.singleton(name), 
false);
     }
 
+
     /**
      * Create a {@link KStream} instance from the specified Pattern.
      * <p>
@@ -108,9 +159,27 @@ public class KStreamBuilder extends TopologyBuilder {
      * @return a {@link KStream} for the specified topics
      */
     public <K, V> KStream<K, V> stream(Serde<K> keySerde, Serde<V> valSerde, 
Pattern topicPattern) {
+        return stream(null, keySerde, valSerde, topicPattern);
+    }
+
+    /**
+     * Create a {@link KStream} instance from the specified Pattern.
+     * <p>
+     * If multiple topics are matched by the specified pattern, the created 
stream will read data from all of them,
+     * and there is no ordering guarantee between records from different 
topics.
+     *
+     * @param offsetReset the auto offset reset policy to use for this stream if no committed offsets are available; acceptable values are earliest or latest
+     * @param keySerde  key serde used to read this source {@link KStream},
+     *                  if not specified the default serde defined in the 
configs will be used
+     * @param valSerde  value serde used to read this source {@link KStream},
+     *                  if not specified the default serde defined in the 
configs will be used
+     * @param topicPattern    the Pattern to match for topic names
+     * @return a {@link KStream} for the specified topics
+     */
+    public <K, V> KStream<K, V> stream(AutoOffsetReset offsetReset, Serde<K> 
keySerde, Serde<V> valSerde, Pattern topicPattern) {
         String name = newName(KStreamImpl.SOURCE_NAME);
 
-        addSource(name, keySerde == null ? null : keySerde.deserializer(), 
valSerde == null ? null : valSerde.deserializer(), topicPattern);
+        addSource(offsetReset, name, keySerde == null ? null : 
keySerde.deserializer(), valSerde == null ? null : valSerde.deserializer(), 
topicPattern);
 
         return new KStreamImpl<>(this, name, Collections.singleton(name), 
false);
     }
@@ -122,14 +191,31 @@ public class KStreamBuilder extends TopologyBuilder {
      * The resulting {@link KTable} will be materialized in a local state 
store with the given store name.
      * However, no new changelog topic is created in this case since the 
underlying topic acts as one.
      *
+     * @param offsetReset the auto offset reset policy to use for this stream if no committed offsets are available; acceptable values are earliest or latest
+     * @param topic     the topic name; cannot be null
+     * @param storeName the state store name used if this KTable is 
materialized, can be null if materialization not expected
+     * @return a {@link KTable} for the specified topics
+     */
+    public <K, V> KTable<K, V> table(AutoOffsetReset offsetReset, String 
topic, final String storeName) {
+        return table(offsetReset, null, null, topic, storeName);
+    }
+
+    /**
+     * Create a {@link KTable} instance for the specified topic.
+     * Record keys of the topic should never be null, otherwise an exception will be thrown at runtime.
+     * The default deserializers specified in the config are used.
+     * The resulting {@link KTable} will be materialized in a local state 
store with the given store name.
+     * However, no new changelog topic is created in this case since the 
underlying topic acts as one.
+     *
      * @param topic     the topic name; cannot be null
      * @param storeName the state store name used if this KTable is 
materialized, can be null if materialization not expected
      * @return a {@link KTable} for the specified topics
      */
     public <K, V> KTable<K, V> table(String topic, final String storeName) {
-        return table(null, null, topic, storeName);
+        return table(null, null, null, topic, storeName);
     }
 
+
     /**
      * Create a {@link KTable} instance for the specified topic.
      * Record keys of the topic should never be null, otherwise an exception will be thrown at runtime.
@@ -145,11 +231,30 @@ public class KStreamBuilder extends TopologyBuilder {
      * @return a {@link KTable} for the specified topics
      */
     public <K, V> KTable<K, V> table(Serde<K> keySerde, Serde<V> valSerde, 
String topic, final String storeName) {
+        return table(null, keySerde, valSerde, topic, storeName);
+    }
+
+    /**
+     * Create a {@link KTable} instance for the specified topic.
+     * Record keys of the topic should never be null, otherwise an exception will be thrown at runtime.
+     * The resulting {@link KTable} will be materialized in a local state 
store with the given store name.
+     * However, no new changelog topic is created in this case since the 
underlying topic acts as one.
+     *
+     * @param offsetReset the auto offset reset policy to use for this stream if no committed offsets are available; acceptable values are earliest or latest
+     * @param keySerde   key serde used to send key-value pairs,
+     *                   if not specified the default key serde defined in the 
configuration will be used
+     * @param valSerde   value serde used to send key-value pairs,
+     *                   if not specified the default value serde defined in 
the configuration will be used
+     * @param topic      the topic name; cannot be null
+     * @param storeName  the state store name used if this KTable is 
materialized, can be null if materialization not expected
+     * @return a {@link KTable} for the specified topics
+     */
+    public <K, V> KTable<K, V> table(AutoOffsetReset offsetReset, Serde<K> 
keySerde, Serde<V> valSerde, String topic, final String storeName) {
         final String source = newName(KStreamImpl.SOURCE_NAME);
         final String name = newName(KTableImpl.SOURCE_NAME);
         final ProcessorSupplier<K, V> processorSupplier = new 
KTableSource<>(storeName);
 
-        addSource(source, keySerde == null ? null : keySerde.deserializer(), 
valSerde == null ? null : valSerde.deserializer(), topic);
+        addSource(offsetReset, source, keySerde == null ? null : 
keySerde.deserializer(), valSerde == null ? null : valSerde.deserializer(), 
topic);
         addProcessor(name, processorSupplier, source);
 
         final KTableImpl kTable = new KTableImpl<>(this, name, 
processorSupplier, Collections.singleton(source), storeName);
@@ -166,6 +271,7 @@ public class KStreamBuilder extends TopologyBuilder {
         return kTable;
     }
 
+
     /**
      * Create a new instance of {@link KStream} by merging the given streams.
      * <p>

http://git-wip-us.apache.org/repos/asf/kafka/blob/23dff4b0/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
index 4edfa89..03fbbce 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java
@@ -68,7 +68,6 @@ public class KGroupedTableImpl<K, V> extends 
AbstractStream<K> implements KGroup
         return aggregate(initializer, adder, subtractor, 
keyValueStore(keySerde, aggValueSerde, storeName));
     }
 
-
     @Override
     public <T> KTable<K, T> aggregate(Initializer<T> initializer,
                                       Aggregator<? super K, ? super V, T> 
adder,

http://git-wip-us.apache.org/repos/asf/kafka/blob/23dff4b0/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java 
b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
index 211ed64..97cb4be 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
@@ -21,6 +21,8 @@ import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.errors.TopologyBuilderException;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.processor.internals.InternalTopicConfig;
 import org.apache.kafka.streams.processor.internals.ProcessorNode;
 import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
@@ -93,6 +95,16 @@ public class TopologyBuilder {
     // this is used in the extended KStreamBuilder.
     private final HashMap<String, String> sourceStoreToSourceTopic = new 
HashMap<>();
 
+    private final Set<String> earliestResetTopics = new HashSet<>();
+
+    private final Set<String> latestResetTopics = new HashSet<>();
+
+    private final Set<Pattern> earliestResetPatterns = new HashSet<>();
+
+    private final Set<Pattern> latestResetPatterns = new HashSet<>();
+
+    private static final Pattern EMPTY_ZERO_LENGTH_PATTERN = 
Pattern.compile("");
+
     private final QuickUnion<String> nodeGrouper = new QuickUnion<>();
 
     private SubscriptionUpdates subscriptionUpdates = new 
SubscriptionUpdates();
@@ -279,6 +291,13 @@ public class TopologyBuilder {
     }
 
     /**
+     * Enum used to define auto offset reset policy when creating {@link 
KStream} or {@link KTable}
+     */
+    public enum AutoOffsetReset {
+        EARLIEST, LATEST
+    }
+
+    /**
      * Create a new builder.
      */
     public TopologyBuilder() {}
@@ -311,7 +330,23 @@ public class TopologyBuilder {
      * @return this builder instance so methods can be chained together; never 
null
      */
     public synchronized final TopologyBuilder addSource(String name, String... 
topics) {
-        return addSource(name, (Deserializer) null, (Deserializer) null, 
topics);
+        return addSource(null, name, (Deserializer) null, (Deserializer) null, 
topics);
+    }
+
+    /**
+     * Add a new source that consumes the named topics and forward the records 
to child processor and/or sink nodes.
+     * The source will use the {@link 
org.apache.kafka.streams.StreamsConfig#KEY_SERDE_CLASS_CONFIG default key 
deserializer} and
+     * {@link org.apache.kafka.streams.StreamsConfig#VALUE_SERDE_CLASS_CONFIG 
default value deserializer} specified in the
+     * {@link org.apache.kafka.streams.StreamsConfig stream configuration}.
+     *
+     * @param offsetReset the auto offset reset policy to use for this source if no committed offsets are found; acceptable values are earliest or latest
+     * @param name the unique name of the source used to reference this node 
when
+     * {@link #addProcessor(String, ProcessorSupplier, String...) adding 
processor children}.
+     * @param topics the name of one or more Kafka topics that this source is 
to consume
+     * @return this builder instance so methods can be chained together; never 
null
+     */
+    public synchronized final TopologyBuilder addSource(AutoOffsetReset offsetReset, String name, String... topics) {
+        return addSource(offsetReset, name, (Deserializer) null, 
(Deserializer) null, topics);
     }
 
 
@@ -328,9 +363,27 @@ public class TopologyBuilder {
      * @return this builder instance so methods can be chained together; never 
null
      */
     public synchronized final TopologyBuilder addSource(String name, Pattern 
topicPattern) {
-        return addSource(name, (Deserializer) null, (Deserializer) null, 
topicPattern);
+        return addSource(null, name, (Deserializer) null, (Deserializer) null, 
topicPattern);
+    }
+
+    /**
+     * Add a new source that consumes from topics matching the given pattern
+     * and forward the records to child processor and/or sink nodes.
+     * The source will use the {@link 
org.apache.kafka.streams.StreamsConfig#KEY_SERDE_CLASS_CONFIG default key 
deserializer} and
+     * {@link org.apache.kafka.streams.StreamsConfig#VALUE_SERDE_CLASS_CONFIG 
default value deserializer} specified in the
+     * {@link org.apache.kafka.streams.StreamsConfig stream configuration}.
+     *
+     * @param offsetReset the auto offset reset policy to use for this source if no committed offsets are found; acceptable values are earliest or latest.
+     * @param name the unique name of the source used to reference this node 
when
+     * {@link #addProcessor(String, ProcessorSupplier, String...) adding 
processor children}.
+     * @param topicPattern regular expression pattern to match Kafka topics 
that this source is to consume
+     * @return this builder instance so methods can be chained together; never 
null
+     */
+    public synchronized final TopologyBuilder addSource(AutoOffsetReset offsetReset, String name, Pattern topicPattern) {
+        return addSource(offsetReset, name, (Deserializer) null, 
(Deserializer) null, topicPattern);
     }
 
+
     /**
      * Add a new source that consumes the named topics and forwards the 
records to child processor and/or sink nodes.
      * The source will use the specified key and value deserializers.
@@ -348,6 +401,28 @@ public class TopologyBuilder {
      * @throws TopologyBuilderException if processor is already added or if 
topics have already been registered by another source
      */
     public synchronized final TopologyBuilder addSource(String name, 
Deserializer keyDeserializer, Deserializer valDeserializer, String... topics) {
+        return addSource(null, name, keyDeserializer, valDeserializer, topics);
+
+    }
+
+    /**
+     * Add a new source that consumes the named topics and forwards the 
records to child processor and/or sink nodes.
+     * The source will use the specified key and value deserializers.
+     *
+     * @param offsetReset the auto offset reset policy to use for this stream if no committed offsets are found; acceptable values are earliest or latest.
+     * @param name the unique name of the source used to reference this node 
when
+     * {@link #addProcessor(String, ProcessorSupplier, String...) adding 
processor children}.
+     * @param keyDeserializer the {@link Deserializer key deserializer} used 
when consuming records; may be null if the source
+     * should use the {@link 
org.apache.kafka.streams.StreamsConfig#KEY_SERDE_CLASS_CONFIG default key 
deserializer} specified in the
+     * {@link org.apache.kafka.streams.StreamsConfig stream configuration}
+     * @param valDeserializer the {@link Deserializer value deserializer} used 
when consuming records; may be null if the source
+     * should use the {@link 
org.apache.kafka.streams.StreamsConfig#VALUE_SERDE_CLASS_CONFIG default value 
deserializer} specified in the
+     * {@link org.apache.kafka.streams.StreamsConfig stream configuration}
+     * @param topics the name of one or more Kafka topics that this source is 
to consume
+     * @return this builder instance so methods can be chained together; never 
null
+     * @throws TopologyBuilderException if processor is already added or if 
topics have already been registered by another source
+     */
+    public synchronized final TopologyBuilder addSource(AutoOffsetReset 
offsetReset, String name, Deserializer keyDeserializer, Deserializer 
valDeserializer, String... topics) {
         if (topics.length == 0) {
             throw new TopologyBuilderException("You must provide at least one 
topic");
         }
@@ -366,6 +441,9 @@ public class TopologyBuilder {
                 }
             }
 
+
+            maybeAddToResetList(earliestResetTopics, latestResetTopics, 
offsetReset, topic);
+
             sourceTopicNames.add(topic);
         }
 
@@ -397,6 +475,32 @@ public class TopologyBuilder {
      */
 
     public synchronized final TopologyBuilder addSource(String name, 
Deserializer keyDeserializer, Deserializer valDeserializer, Pattern 
topicPattern) {
+        return addSource(null, name, keyDeserializer, valDeserializer, topicPattern);
+
+    }
+
+    /**
+     * Add a new source that consumes from topics matching the given pattern
+     * and forwards the records to child processor and/or sink nodes.
+     * The source will use the specified key and value deserializers. The 
provided
+     * de-/serializers will be used for all matched topics, so care should be 
taken to specify patterns for
+     * topics that share the same key-value data format.
+     *
+     * @param offsetReset the auto offset reset policy to use for this stream if no committed offsets are found; acceptable values are earliest or latest
+     * @param name the unique name of the source used to reference this node 
when
+     * {@link #addProcessor(String, ProcessorSupplier, String...) adding 
processor children}.
+     * @param keyDeserializer the {@link Deserializer key deserializer} used 
when consuming records; may be null if the source
+     * should use the {@link 
org.apache.kafka.streams.StreamsConfig#KEY_SERDE_CLASS_CONFIG default key 
deserializer} specified in the
+     * {@link org.apache.kafka.streams.StreamsConfig stream configuration}
+     * @param valDeserializer the {@link Deserializer value deserializer} used 
when consuming records; may be null if the source
+     * should use the {@link 
org.apache.kafka.streams.StreamsConfig#VALUE_SERDE_CLASS_CONFIG default value 
deserializer} specified in the
+     * {@link org.apache.kafka.streams.StreamsConfig stream configuration}
+     * @param topicPattern regular expression pattern to match Kafka topics 
that this source is to consume
+     * @return this builder instance so methods can be chained together; never 
null
+     * @throws TopologyBuilderException if processor is already added or if 
topics have already been registered by name
+     */
+
+    public synchronized final TopologyBuilder addSource(AutoOffsetReset offsetReset, String name, Deserializer keyDeserializer, Deserializer valDeserializer, Pattern topicPattern) {
         Objects.requireNonNull(topicPattern, "topicPattern can't be null");
         Objects.requireNonNull(name, "name can't be null");
 
@@ -410,6 +514,8 @@ public class TopologyBuilder {
             }
         }
 
+        maybeAddToResetList(earliestResetPatterns, latestResetPatterns, 
offsetReset, topicPattern);
+
         nodeFactories.put(name, new SourceNodeFactory(name, null, 
topicPattern, keyDeserializer, valDeserializer));
         nodeToSourcePatterns.put(name, topicPattern);
         nodeGrouper.add(name);
@@ -722,6 +828,22 @@ public class TopologyBuilder {
                 Collections.unmodifiableSet(sourceTopics));
     }
 
+
+    private <T> void maybeAddToResetList(Collection<T> earliestResets, 
Collection<T> latestResets, AutoOffsetReset offsetReset, T item) {
+        if (offsetReset != null) {
+            switch (offsetReset) {
+                case EARLIEST:
+                    earliestResets.add(item);
+                    break;
+                case LATEST:
+                    latestResets.add(item);
+                    break;
+                default:
+                    throw new TopologyBuilderException(String.format("Unrecognized reset policy %s", offsetReset));
+            }
+        }
+    }
+
     /**
      * Returns the map of node groups keyed by the topic group id.
      *
@@ -850,7 +972,7 @@ public class TopologyBuilder {
 
         return new ProcessorTopology(processorNodes, topicSourceMap, 
topicSinkMap, new ArrayList<>(stateStoreMap.values()), 
sourceStoreToSourceTopic, storeToProcessorNodeMap);
     }
-    
+
     /**
      * Returns the map of topic groups keyed by the group id.
      * A topic group is a group of topics in the same task.
@@ -954,6 +1076,84 @@ public class TopologyBuilder {
     }
 
     /**
+     * Get the Pattern to match all topics that need to start reading from the earliest available offset
+     * @return the Pattern for matching all topics reading from earliest 
offset, never null
+     */
+    public synchronized Pattern earliestResetTopicsPattern() {
+        Set<String> topics = 
maybeDecorateInternalSourceTopics(earliestResetTopics);
+
+        String[] sourceTopicNames = topics.toArray(new String[topics.size()]);
+        Pattern[] sourceTopicPatterns = earliestResetPatterns.toArray(new 
Pattern[earliestResetPatterns.size()]);
+
+        Pattern earliestPattern = buildPatternForOffsetResetTopics(sourceTopicNames, sourceTopicPatterns);
+
+        ensureNoRegexOverlap(earliestPattern, latestResetPatterns, 
latestResetTopics);
+
+        return earliestPattern;
+    }
+
+    /**
+     * Get the Pattern to match all topics that need to start reading from the latest available offset
+     * @return the Pattern for matching all topics reading from latest offset, 
never null
+     */
+    public synchronized Pattern latestResetTopicsPattern() {
+        Set<String> topics = 
maybeDecorateInternalSourceTopics(latestResetTopics);
+
+        String[] sourceTopicNames = topics.toArray(new String[topics.size()]);
+        Pattern[] sourceTopicPatterns = latestResetPatterns.toArray(new 
Pattern[latestResetPatterns.size()]);
+
+        Pattern latestPattern = 
buildPatternForOffsetResetTopics(sourceTopicNames, sourceTopicPatterns);
+
+        ensureNoRegexOverlap(latestPattern, earliestResetPatterns, 
earliestResetTopics);
+
+        return latestPattern;
+    }
+
+    private void ensureNoRegexOverlap(Pattern builtPattern, Set<Pattern> 
otherPatterns, Set<String> otherTopics) {
+
+        for (Pattern otherPattern : otherPatterns) {
+            if (builtPattern.pattern().contains(otherPattern.pattern())) {
+                throw new TopologyBuilderException(String.format("Found 
overlapping regex [%s] against [%s] for a KStream with auto offset resets", 
otherPattern.pattern(), builtPattern.pattern()));
+            }
+        }
+
+        for (String otherTopic : otherTopics) {
+            if (builtPattern.matcher(otherTopic).matches()) {
+                throw new TopologyBuilderException(String.format("Found 
overlapping regex [%s] matching topic [%s] for a KStream with auto offset 
resets", builtPattern.pattern(), otherTopic));
+            }
+        }
+
+    }
+
+
+    /**
+     * Builds a composite pattern out of topic names and Pattern objects for matching topic names. If the provided
+     * arrays are empty, a Pattern.compile("") instance is returned.
+     *
+     * @param sourceTopics  the name of source topics to add to a composite 
pattern
+     * @param sourcePatterns Patterns for matching source topics to add to a 
composite pattern
+     * @return a Pattern that is composed of the literal source topic names 
and any Patterns for matching source topics
+     */
+    private static synchronized Pattern 
buildPatternForOffsetResetTopics(String[] sourceTopics, Pattern[] 
sourcePatterns) {
+        StringBuilder builder = new StringBuilder();
+
+        for (String topic : sourceTopics) {
+            builder.append(topic).append("|");
+        }
+
+        for (Pattern sourcePattern : sourcePatterns) {
+            builder.append(sourcePattern.pattern()).append("|");
+        }
+
+        if (builder.length() > 0) {
+            builder.setLength(builder.length() - 1);
+            return Pattern.compile(builder.toString());
+        }
+
+        return EMPTY_ZERO_LENGTH_PATTERN;
+    }
+
+    /**
      * @return a mapping from state store name to a Set of source Topics.
      */
     public Map<String, Set<String>> stateStoreNameToSourceTopics() {
@@ -1013,20 +1213,21 @@ public class TopologyBuilder {
 
     public synchronized Pattern sourceTopicPattern() {
         if (this.topicPattern == null && !nodeToSourcePatterns.isEmpty()) {
-            StringBuilder builder = new StringBuilder();
-            for (Pattern pattern : nodeToSourcePatterns.values()) {
-                builder.append(pattern.pattern()).append("|");
-            }
+
+            List<String> allNodeToSourceTopics = new ArrayList<>();
             if (!nodeToSourceTopics.isEmpty()) {
                 for (String[] topics : nodeToSourceTopics.values()) {
-                    for (String topic : topics) {
-                        builder.append(topic).append("|");
-                    }
+                    allNodeToSourceTopics.addAll(Arrays.asList(topics));
+
                 }
             }
+            int numPatterns = nodeToSourcePatterns.values().size();
+            int numTopics = allNodeToSourceTopics.size();
 
-            builder.setLength(builder.length() - 1);
-            this.topicPattern = Pattern.compile(builder.toString());
+            Pattern[] patterns = nodeToSourcePatterns.values().toArray(new 
Pattern[numPatterns]);
+            String[] allTopics = allNodeToSourceTopics.toArray(new 
String[numTopics]);
+
+            this.topicPattern = buildPatternForOffsetResetTopics(allTopics, 
patterns);
         }
         return this.topicPattern;
     }

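Side note: buildPatternForOffsetResetTopics simply OR-s literal topic names
and pattern strings into a single alternation, which is also why the expected
pattern in TopologyBuilderTest changes order below. A small standalone
illustration, with hypothetical topic names:

    import java.util.regex.Pattern;

    public class CompositePatternIllustration {
        public static void main(String[] args) {
            // Topic names first, then pattern strings, joined with '|' --
            // mirroring buildPatternForOffsetResetTopics(
            //     new String[]{"topic-foo", "topic-bar"},
            //     new Pattern[]{Pattern.compile(".*-\\d")}).
            Pattern composite = Pattern.compile("topic-foo|topic-bar|.*-\\d");

            System.out.println(composite.matcher("topic-foo").matches()); // true, literal name
            System.out.println(composite.matcher("other-7").matches());   // true, via .*-\d
            System.out.println(composite.matcher("plain").matches());     // false
        }
    }
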
http://git-wip-us.apache.org/repos/asf/kafka/blob/23dff4b0/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 90194c6..5641849 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -19,9 +19,11 @@ package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.clients.consumer.CommitFailedException;
 import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
@@ -199,6 +201,7 @@ public class StreamThread extends Thread {
     private final long commitTimeMs;
     private final StreamsMetricsThreadImpl streamsMetrics;
     final StateDirectory stateDirectory;
+    private String originalReset;
 
     private StreamPartitionAssignor partitionAssignor = null;
     private boolean cleanRun = false;
@@ -311,7 +314,16 @@ public class StreamThread extends Thread {
         log.info("{} Creating producer client", logPrefix);
         this.producer = 
clientSupplier.getProducer(config.getProducerConfigs(threadClientId));
         log.info("{} Creating consumer client", logPrefix);
-        this.consumer = 
clientSupplier.getConsumer(config.getConsumerConfigs(this, applicationId, 
threadClientId));
+
+        Map<String, Object> consumerConfigs = config.getConsumerConfigs(this, 
applicationId, threadClientId);
+
+        if (!builder.latestResetTopicsPattern().pattern().equals("") || 
!builder.earliestResetTopicsPattern().pattern().equals("")) {
+            originalReset = (String) 
consumerConfigs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
+            log.info("{} custom offset resets specified; updating configs, original auto offset reset: {}", logPrefix, originalReset);
+            consumerConfigs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
"none");
+        }
+
+        this.consumer = clientSupplier.getConsumer(consumerConfigs);
         log.info("{} Creating restore consumer client", logPrefix);
         this.restoreConsumer = 
clientSupplier.getRestoreConsumer(config.getRestoreConsumerConfigs(threadClientId));
 
@@ -453,13 +465,8 @@ public class StreamThread extends Thread {
 
 
     /**
-<<<<<<< HEAD
      * Similar to shutdownTasksAndState, however does not close the task 
managers, in the hope that
      * soon the tasks will be assigned again
-=======
-     * Similar to shutdownTasksAndState, however does not close the task 
managers,
-     * in the hope that soon the tasks will be assigned again
->>>>>>> apache-kafka/trunk
      */
     private void suspendTasksAndState()  {
         log.debug("{} suspendTasksAndState: suspending all active tasks [{}] 
and standby tasks [{}]", logPrefix,
@@ -574,13 +581,44 @@ public class StreamThread extends Thread {
 
                 boolean longPoll = totalNumBuffered == 0;
 
-                ConsumerRecords<byte[], byte[]> records = 
consumer.poll(longPoll ? this.pollTimeMs : 0);
+                ConsumerRecords<byte[], byte[]> records = null;
+
+                try {
+                    records = consumer.poll(longPoll ? this.pollTimeMs : 0);
+                } catch (NoOffsetForPartitionException ex) {
+                    TopicPartition partition = ex.partition();
+                    if 
(builder.earliestResetTopicsPattern().matcher(partition.topic()).matches()) {
+                        log.info(String.format("stream-thread [%s] setting topic %s to consume from earliest offset", this.getName(), partition.topic()));
+                        consumer.seekToBeginning(ex.partitions());
+                    } else if 
(builder.latestResetTopicsPattern().matcher(partition.topic()).matches()) {
+                        consumer.seekToEnd(ex.partitions());
+                        log.info(String.format("stream-thread [%s] setting topic %s to consume from latest offset", this.getName(), partition.topic()));
+                    } else {
+
+                        if (originalReset == null || 
(!originalReset.equals("earliest") && !originalReset.equals("latest"))) {
+                            setState(State.PENDING_SHUTDOWN);
+                            String errorMessage = "No valid committed offset found for input topic %s (partition %s) and no valid reset policy configured." +
+                                    " You need to set configuration parameter \"auto.offset.reset\" or specify a topic specific reset " +
+                                    "policy via KStreamBuilder#stream(TopologyBuilder.AutoOffsetReset offsetReset, ...) or KStreamBuilder#table(TopologyBuilder.AutoOffsetReset offsetReset, ...)";
+                            throw new 
StreamsException(String.format(errorMessage, partition.topic(), 
partition.partition()), ex);
+                        }
+
+                        if (originalReset.equals("earliest")) {
+                            consumer.seekToBeginning(ex.partitions());
+                        } else if (originalReset.equals("latest")) {
+                            consumer.seekToEnd(ex.partitions());
+                        }
+                        log.info(String.format("stream-thread [%s] no custom setting defined for topic %s, using original config %s for offset reset", this.getName(), partition.topic(), originalReset));
+                    }
+
+                }
 
                 if (rebalanceException != null)
                     throw new StreamsException(logPrefix + " Failed to 
rebalance", rebalanceException);
 
-                if (!records.isEmpty()) {
+                if (records != null && !records.isEmpty()) {
                     int numAddedRecords = 0;
+
                     for (TopicPartition partition : records.partitions()) {
                         StreamTask task = 
activeTasksByPartition.get(partition);
                         numAddedRecords += task.addRecords(partition, 
records.records(partition));

http://git-wip-us.apache.org/repos/asf/kafka/blob/23dff4b0/streams/src/test/java/org/apache/kafka/streams/integration/KStreamsFineGrainedAutoResetIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamsFineGrainedAutoResetIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamsFineGrainedAutoResetIntegrationTest.java
new file mode 100644
index 0000000..2171de1
--- /dev/null
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamsFineGrainedAutoResetIntegrationTest.java
@@ -0,0 +1,223 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements.  See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License.  You may obtain a
+ * copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+
+
+package org.apache.kafka.streams.integration;
+
+
+import kafka.utils.MockTime;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.errors.TopologyBuilderException;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.apache.kafka.test.TestCondition;
+import org.apache.kafka.test.TestUtils;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.regex.Pattern;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+
+public class KStreamsFineGrainedAutoResetIntegrationTest {
+
+    private static final int NUM_BROKERS = 1;
+    private static final String DEFAULT_OUTPUT_TOPIC = "outputTopic";
+
+    @ClassRule
+    public static final EmbeddedKafkaCluster CLUSTER = new 
EmbeddedKafkaCluster(NUM_BROKERS);
+    private final MockTime mockTime = CLUSTER.time;
+
+    private static final String TOPIC_1 = "topic-1";
+    private static final String TOPIC_2 = "topic-2";
+    private static final String TOPIC_A = "topic-A";
+    private static final String TOPIC_C = "topic-C";
+    private static final String TOPIC_Y = "topic-Y";
+    private static final String TOPIC_Z = "topic-Z";
+    private static final String NOOP = "noop";
+    private final Serde<String> stringSerde = Serdes.String();
+
+    private static final String STRING_SERDE_CLASSNAME = 
Serdes.String().getClass().getName();
+    private Properties streamsConfiguration;
+
+
+    @BeforeClass
+    public static void startKafkaCluster() throws Exception {
+        CLUSTER.createTopic(TOPIC_1);
+        CLUSTER.createTopic(TOPIC_2);
+        CLUSTER.createTopic(TOPIC_A);
+        CLUSTER.createTopic(TOPIC_C);
+        CLUSTER.createTopic(TOPIC_Y);
+        CLUSTER.createTopic(TOPIC_Z);
+        CLUSTER.createTopic(NOOP);
+        CLUSTER.createTopic(DEFAULT_OUTPUT_TOPIC);
+
+    }
+
+    @Before
+    public void setUp() throws Exception {
+
+        Properties props = new Properties();
+        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+
+        streamsConfiguration = StreamsTestUtils.getStreamsConfig(
+                "testAutoOffsetId",
+                CLUSTER.bootstrapServers(),
+                STRING_SERDE_CLASSNAME,
+                STRING_SERDE_CLASSNAME,
+                props);
+
+        // Remove any state from previous test runs
+        IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
+    }
+
+    @Test
+    public void shouldOnlyReadRecordsWhereEarliestSpecified() throws Exception {
+        final KStreamBuilder builder = new KStreamBuilder();
+
+        final KStream<String, String> pattern1Stream = 
builder.stream(KStreamBuilder.AutoOffsetReset.EARLIEST, 
Pattern.compile("topic-\\d"));
+        final KStream<String, String> pattern2Stream = 
builder.stream(KStreamBuilder.AutoOffsetReset.LATEST, 
Pattern.compile("topic-[A-D]"));
+        final KStream<String, String> namedTopicsStream = 
builder.stream(TOPIC_Y, TOPIC_Z);
+
+        pattern1Stream.to(stringSerde, stringSerde, DEFAULT_OUTPUT_TOPIC);
+        pattern2Stream.to(stringSerde, stringSerde, DEFAULT_OUTPUT_TOPIC);
+        namedTopicsStream.to(stringSerde, stringSerde, DEFAULT_OUTPUT_TOPIC);
+
+        final Properties producerConfig = 
TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, 
StringSerializer.class);
+
+        final String topic1TestMessage = "topic-1 test";
+        final String topic2TestMessage = "topic-2 test";
+        final String topicATestMessage = "topic-A test";
+        final String topicCTestMessage = "topic-C test";
+        final String topicYTestMessage = "topic-Y test";
+        final String topicZTestMessage = "topic-Z test";
+
+        IntegrationTestUtils.produceValuesSynchronously(TOPIC_1, 
Collections.singletonList(topic1TestMessage), producerConfig, mockTime);
+        IntegrationTestUtils.produceValuesSynchronously(TOPIC_2, 
Collections.singletonList(topic2TestMessage), producerConfig, mockTime);
+        IntegrationTestUtils.produceValuesSynchronously(TOPIC_A, 
Collections.singletonList(topicATestMessage), producerConfig, mockTime);
+        IntegrationTestUtils.produceValuesSynchronously(TOPIC_C, 
Collections.singletonList(topicCTestMessage), producerConfig, mockTime);
+        IntegrationTestUtils.produceValuesSynchronously(TOPIC_Y, 
Collections.singletonList(topicYTestMessage), producerConfig, mockTime);
+        IntegrationTestUtils.produceValuesSynchronously(TOPIC_Z, 
Collections.singletonList(topicZTestMessage), producerConfig, mockTime);
+
+        final Properties consumerConfig = 
TestUtils.consumerConfig(CLUSTER.bootstrapServers(), StringDeserializer.class, 
StringDeserializer.class);
+
+        final KafkaStreams streams = new KafkaStreams(builder, 
streamsConfiguration);
+        streams.start();
+
+        final List<String> expectedReceivedValues = 
Arrays.asList(topic1TestMessage, topic2TestMessage, topicYTestMessage, 
topicZTestMessage);
+        final List<KeyValue<String, String>> receivedKeyValues = 
IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, 
DEFAULT_OUTPUT_TOPIC, 4);
+        final List<String> actualValues = new ArrayList<>(4);
+
+        for (final KeyValue<String, String> receivedKeyValue : 
receivedKeyValues) {
+            actualValues.add(receivedKeyValue.value);
+        }
+
+        streams.close();
+        Collections.sort(actualValues);
+        Collections.sort(expectedReceivedValues);
+        assertThat(actualValues, equalTo(expectedReceivedValues));
+
+    }
+
+
+    @Test(expected = TopologyBuilderException.class)
+    public void shouldThrowExceptionOverlappingPattern() throws Exception {
+        final KStreamBuilder builder = new KStreamBuilder();
+        // NOTE: this would realistically get caught when building the topology; the test is for completeness
+        final KStream<String, String> pattern1Stream = 
builder.stream(KStreamBuilder.AutoOffsetReset.EARLIEST, 
Pattern.compile("topic-[A-D]"));
+        final KStream<String, String> pattern2Stream = 
builder.stream(KStreamBuilder.AutoOffsetReset.LATEST, 
Pattern.compile("topic-[A-D]"));
+        final KStream<String, String> namedTopicsStream = 
builder.stream(TOPIC_Y, TOPIC_Z);
+
+        builder.earliestResetTopicsPattern();
+
+    }
+
+    @Test(expected = TopologyBuilderException.class)
+    public void shouldThrowExceptionOverlappingTopic() throws Exception {
+        final KStreamBuilder builder = new KStreamBuilder();
+        // NOTE: this would realistically get caught when building the topology; the test is for completeness
+        final KStream<String, String> pattern1Stream = 
builder.stream(KStreamBuilder.AutoOffsetReset.EARLIEST, 
Pattern.compile("topic-[A-D]"));
+        final KStream<String, String> pattern2Stream = 
builder.stream(KStreamBuilder.AutoOffsetReset.LATEST, 
Pattern.compile("topic-\\d"));
+        final KStream<String, String> namedTopicsStream = 
builder.stream(KStreamBuilder.AutoOffsetReset.LATEST, TOPIC_A, TOPIC_Z);
+
+        builder.latestResetTopicsPattern();
+
+    }
+
+
+    @Test
+    public void shouldThrowStreamsExceptionNoResetSpecified() throws Exception 
{
+        Properties props = new Properties();
+        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
+
+        Properties localConfig = StreamsTestUtils.getStreamsConfig(
+                "testAutoOffsetWithNone",
+                CLUSTER.bootstrapServers(),
+                STRING_SERDE_CLASSNAME,
+                STRING_SERDE_CLASSNAME,
+                props);
+
+        final KStreamBuilder builder = new KStreamBuilder();
+        final KStream<String, String> exceptionStream = builder.stream(NOOP);
+
+        exceptionStream.to(stringSerde, stringSerde, DEFAULT_OUTPUT_TOPIC);
+
+        KafkaStreams streams = new KafkaStreams(builder, localConfig);
+
+        final TestingUncaughtExceptionHandler uncaughtExceptionHandler = new 
TestingUncaughtExceptionHandler();
+
+        final TestCondition correctExceptionThrownCondition = new 
TestCondition() {
+            @Override
+            public boolean conditionMet() {
+                return uncaughtExceptionHandler.correctExceptionThrown;
+            }
+        };
+
+        streams.setUncaughtExceptionHandler(uncaughtExceptionHandler);
+        streams.start();
+        TestUtils.waitForCondition(correctExceptionThrownCondition, "The 
expected NoOffsetForPartitionException was never thrown");
+        streams.close();
+    }
+
+
+    private static final class TestingUncaughtExceptionHandler implements 
Thread.UncaughtExceptionHandler {
+        boolean correctExceptionThrown = false;
+        @Override
+        public void uncaughtException(Thread t, Throwable e) {
+            assertThat(e.getClass().getSimpleName(), is("StreamsException"));
+            assertThat(e.getCause().getClass().getSimpleName(), 
is("NoOffsetForPartitionException"));
+            correctExceptionThrown = true;
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/23dff4b0/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
index c402c9b..b085a84 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
@@ -184,7 +184,7 @@ public class TopologyBuilderTest {
     @Test
     public void testSubscribeTopicNameAndPattern() {
         final TopologyBuilder builder = new TopologyBuilder();
-        Pattern expectedPattern = 
Pattern.compile(".*-\\d|topic-foo|topic-bar");
+        Pattern expectedPattern = 
Pattern.compile("topic-foo|topic-bar|.*-\\d");
         builder.addSource("source-1", "topic-foo", "topic-bar");
         builder.addSource("source-2", Pattern.compile(".*-\\d"));
         assertEquals(expectedPattern.pattern(), 
builder.sourceTopicPattern().pattern());
