Repository: kafka Updated Branches: refs/heads/trunk 6508e63c7 -> 23dff4b04
KAFKA-4114: Allow different offset reset strategies mjsax Here's my first pass at finer grained auto offset reset strategies. I've left TODO comments about whether we want to consider adding this to `KGroupedTable.aggregate` and `KStreamImpl` when re-partitioning a source. Author: bbejeck <bbej...@gmail.com> Reviewers: Matthias J. Sax <matth...@confluent.io>, Guozhang Wang <wangg...@gmail.com> Closes #2007 from bbejeck/KAFKA-4114_allow_different_offset_reset_strategies Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/23dff4b0 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/23dff4b0 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/23dff4b0 Branch: refs/heads/trunk Commit: 23dff4b04a4c4cd338b5f39c9bb7d384857c482c Parents: 6508e63 Author: Bill Bejeck <bbej...@gmail.com> Authored: Wed Jan 11 20:30:47 2017 -0800 Committer: Guozhang Wang <wangg...@gmail.com> Committed: Wed Jan 11 20:30:47 2017 -0800 ---------------------------------------------------------------------- .../kafka/streams/kstream/KStreamBuilder.java | 118 +++++++++- .../kstream/internals/KGroupedTableImpl.java | 1 - .../streams/processor/TopologyBuilder.java | 225 ++++++++++++++++++- .../processor/internals/StreamThread.java | 54 ++++- ...eamsFineGrainedAutoResetIntegrationTest.java | 223 ++++++++++++++++++ .../streams/processor/TopologyBuilderTest.java | 2 +- 6 files changed, 595 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/23dff4b0/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java index 33085c9..b1c1cfb 100644 --- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java @@ -55,7 +55,22 @@ public class KStreamBuilder extends TopologyBuilder { * @return a {@link KStream} for the specified topics */ public <K, V> KStream<K, V> stream(String... topics) { - return stream(null, null, topics); + return stream(null, null, null, topics); + } + + + /** + * Create a {@link KStream} instance from the specified topics. + * The default deserializers specified in the config are used. + * <p> + * If multiple topics are specified there is no ordering guarantee for records from different topics. + * + * @param offsetReset the auto offset reset policy to use for this stream if no committed offsets available; acceptable values are earliest or latest + * @param topics the topic names; must contain at least one topic name + * @return a {@link KStream} for the specified topics + */ + public <K, V> KStream<K, V> stream(AutoOffsetReset offsetReset, String... topics) { + return stream(offsetReset, null, null, topics); } @@ -70,10 +85,26 @@ public class KStreamBuilder extends TopologyBuilder { * @return a {@link KStream} for topics matching the regex pattern. */ public <K, V> KStream<K, V> stream(Pattern topicPattern) { - return stream(null, null, topicPattern); + return stream(null, null, null, topicPattern); + } + + /** + * Create a {@link KStream} instance from the specified Pattern. + * The default deserializers specified in the config are used.
+ * <p> + * If multiple topics are matched by the specified pattern, the created stream will read data from all of them, + * and there is no ordering guarantee between records from different topics. + * + * @param offsetReset the auto offset reset policy to use for this stream if no committed offsets available; acceptable values are earliest or latest + * @param topicPattern the Pattern to match for topic names + * @return a {@link KStream} for topics matching the regex pattern. + */ + public <K, V> KStream<K, V> stream(AutoOffsetReset offsetReset, Pattern topicPattern) { + return stream(offsetReset, null, null, topicPattern); } + /** * Create a {@link KStream} instance from the specified topics. * <p> @@ -87,13 +118,33 @@ public class KStreamBuilder extends TopologyBuilder { * @return a {@link KStream} for the specified topics */ public <K, V> KStream<K, V> stream(Serde<K> keySerde, Serde<V> valSerde, String... topics) { + return stream(null, keySerde, valSerde, topics); + } + + + /** + * Create a {@link KStream} instance from the specified topics. + * <p> + * If multiple topics are specified there is no ordering guarantee for records from different topics. + * + * @param offsetReset the auto offset reset policy to use for this stream if no committed offsets available; acceptable values are earliest or latest + * + * @param keySerde key serde used to read this source {@link KStream}, + * if not specified the default serde defined in the configs will be used + * @param valSerde value serde used to read this source {@link KStream}, + * if not specified the default serde defined in the configs will be used + * @param topics the topic names; must contain at least one topic name + * @return a {@link KStream} for the specified topics + */ + public <K, V> KStream<K, V> stream(AutoOffsetReset offsetReset, Serde<K> keySerde, Serde<V> valSerde, String... topics) { String name = newName(KStreamImpl.SOURCE_NAME); - addSource(name, keySerde == null ?
null : keySerde.deserializer(), valSerde == null ? null : valSerde.deserializer(), topics); + addSource(offsetReset, name, keySerde == null ? null : keySerde.deserializer(), valSerde == null ? null : valSerde.deserializer(), topics); return new KStreamImpl<>(this, name, Collections.singleton(name), false); } + /** * Create a {@link KStream} instance from the specified Pattern. * <p> @@ -108,9 +159,27 @@ public class KStreamBuilder extends TopologyBuilder { * @return a {@link KStream} for the specified topics */ public <K, V> KStream<K, V> stream(Serde<K> keySerde, Serde<V> valSerde, Pattern topicPattern) { + return stream(null, keySerde, valSerde, topicPattern); + } + + /** + * Create a {@link KStream} instance from the specified Pattern. + * <p> + * If multiple topics are matched by the specified pattern, the created stream will read data from all of them, + * and there is no ordering guarantee between records from different topics. + * + * @param offsetReset the auto offset reset policy to use for this stream if no committed offsets available; acceptable values are earliest or latest + * @param keySerde key serde used to read this source {@link KStream}, + * if not specified the default serde defined in the configs will be used + * @param valSerde value serde used to read this source {@link KStream}, + * if not specified the default serde defined in the configs will be used + * @param topicPattern the Pattern to match for topic names + * @return a {@link KStream} for the specified topics + */ + public <K, V> KStream<K, V> stream(AutoOffsetReset offsetReset, Serde<K> keySerde, Serde<V> valSerde, Pattern topicPattern) { String name = newName(KStreamImpl.SOURCE_NAME); - addSource(name, keySerde == null ? null : keySerde.deserializer(), valSerde == null ? null : valSerde.deserializer(), topicPattern); + addSource(offsetReset, name, keySerde == null ? null : keySerde.deserializer(), valSerde == null ? 
null : valSerde.deserializer(), topicPattern); return new KStreamImpl<>(this, name, Collections.singleton(name), false); } @@ -122,14 +191,31 @@ public class KStreamBuilder extends TopologyBuilder { * The resulting {@link KTable} will be materialized in a local state store with the given store name. * However, no new changelog topic is created in this case since the underlying topic acts as one. * + * @param offsetReset the auto offset reset policy to use for this stream if no committed offsets available; acceptable values are earliest or latest + * @param topic the topic name; cannot be null + * @param storeName the state store name used if this KTable is materialized, can be null if materialization not expected + * @return a {@link KTable} for the specified topics + */ + public <K, V> KTable<K, V> table(AutoOffsetReset offsetReset, String topic, final String storeName) { + return table(offsetReset, null, null, topic, storeName); + } + + /** + * Create a {@link KTable} instance for the specified topic. + * Record keys of the topic should never be null, otherwise an exception will be thrown at runtime. + * The default deserializers specified in the config are used. + * The resulting {@link KTable} will be materialized in a local state store with the given store name. + * However, no new changelog topic is created in this case since the underlying topic acts as one. + * * @param topic the topic name; cannot be null * @param storeName the state store name used if this KTable is materialized, can be null if materialization not expected * @return a {@link KTable} for the specified topics */ public <K, V> KTable<K, V> table(String topic, final String storeName) { - return table(null, null, topic, storeName); + return table(null, null, null, topic, storeName); } + /** * Create a {@link KTable} instance for the specified topic. * Record keys of the topic should never by null, otherwise an exception will be thrown at runtime.
@@ -145,11 +231,30 @@ public class KStreamBuilder extends TopologyBuilder { * @return a {@link KTable} for the specified topics */ public <K, V> KTable<K, V> table(Serde<K> keySerde, Serde<V> valSerde, String topic, final String storeName) { + return table(null, keySerde, valSerde, topic, storeName); + } + + /** + * Create a {@link KTable} instance for the specified topic. + * Record keys of the topic should never be null, otherwise an exception will be thrown at runtime. + * The resulting {@link KTable} will be materialized in a local state store with the given store name. + * However, no new changelog topic is created in this case since the underlying topic acts as one. + * + * @param offsetReset the auto offset reset policy to use for this stream if no committed offsets available; acceptable values are earliest or latest + * @param keySerde key serde used to send key-value pairs, + * if not specified the default key serde defined in the configuration will be used + * @param valSerde value serde used to send key-value pairs, + * if not specified the default value serde defined in the configuration will be used + * @param topic the topic name; cannot be null + * @param storeName the state store name used if this KTable is materialized, can be null if materialization not expected + * @return a {@link KTable} for the specified topics + */ + public <K, V> KTable<K, V> table(AutoOffsetReset offsetReset, Serde<K> keySerde, Serde<V> valSerde, String topic, final String storeName) { final String source = newName(KStreamImpl.SOURCE_NAME); final String name = newName(KTableImpl.SOURCE_NAME); final ProcessorSupplier<K, V> processorSupplier = new KTableSource<>(storeName); - addSource(source, keySerde == null ? null : keySerde.deserializer(), valSerde == null ? null : valSerde.deserializer(), topic); + addSource(offsetReset, source, keySerde == null ? null : keySerde.deserializer(), valSerde == null ?
null : valSerde.deserializer(), topic); addProcessor(name, processorSupplier, source); final KTableImpl kTable = new KTableImpl<>(this, name, processorSupplier, Collections.singleton(source), storeName); @@ -166,6 +271,7 @@ public class KStreamBuilder extends TopologyBuilder { return kTable; } + /** * Create a new instance of {@link KStream} by merging the given streams. * <p> http://git-wip-us.apache.org/repos/asf/kafka/blob/23dff4b0/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java index 4edfa89..03fbbce 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImpl.java @@ -68,7 +68,6 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K> implements KGroup return aggregate(initializer, adder, subtractor, keyValueStore(keySerde, aggValueSerde, storeName)); } - @Override public <T> KTable<K, T> aggregate(Initializer<T> initializer, Aggregator<? super K, ? 
super V, T> adder, http://git-wip-us.apache.org/repos/asf/kafka/blob/23dff4b0/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java index 211ed64..97cb4be 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java @@ -21,6 +21,8 @@ import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.errors.TopologyBuilderException; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.processor.internals.InternalTopicConfig; import org.apache.kafka.streams.processor.internals.ProcessorNode; import org.apache.kafka.streams.processor.internals.ProcessorStateManager; @@ -93,6 +95,16 @@ public class TopologyBuilder { // this is used in the extended KStreamBuilder. 
private final HashMap<String, String> sourceStoreToSourceTopic = new HashMap<>(); + private final Set<String> earliestResetTopics = new HashSet<>(); + + private final Set<String> latestResetTopics = new HashSet<>(); + + private final Set<Pattern> earliestResetPatterns = new HashSet<>(); + + private final Set<Pattern> latestResetPatterns = new HashSet<>(); + + private static final Pattern EMPTY_ZERO_LENGTH_PATTERN = Pattern.compile(""); + private final QuickUnion<String> nodeGrouper = new QuickUnion<>(); private SubscriptionUpdates subscriptionUpdates = new SubscriptionUpdates(); @@ -279,6 +291,13 @@ public class TopologyBuilder { } /** + * Enum used to define auto offset reset policy when creating {@link KStream} or {@link KTable} + */ + public enum AutoOffsetReset { + EARLIEST , LATEST + } + + /** * Create a new builder. */ public TopologyBuilder() {} @@ -311,7 +330,23 @@ public class TopologyBuilder { * @return this builder instance so methods can be chained together; never null */ public synchronized final TopologyBuilder addSource(String name, String... topics) { - return addSource(name, (Deserializer) null, (Deserializer) null, topics); + return addSource(null, name, (Deserializer) null, (Deserializer) null, topics); + } + + /** + * Add a new source that consumes the named topics and forward the records to child processor and/or sink nodes. + * The source will use the {@link org.apache.kafka.streams.StreamsConfig#KEY_SERDE_CLASS_CONFIG default key deserializer} and + * {@link org.apache.kafka.streams.StreamsConfig#VALUE_SERDE_CLASS_CONFIG default value deserializer} specified in the + * {@link org.apache.kafka.streams.StreamsConfig stream configuration}. + * + * @param offsetReset the auto offset reset policy to use for this source if no committed offsets found; acceptable values earliest or latest + * @param name the unique name of the source used to reference this node when + * {@link #addProcessor(String, ProcessorSupplier, String...) 
adding processor children}. + * @param topics the name of one or more Kafka topics that this source is to consume + * @return this builder instance so methods can be chained together; never null + */ + public synchronized final TopologyBuilder addSource(AutoOffsetReset offsetReset, String name, String... topics) { + return addSource(offsetReset, name, (Deserializer) null, (Deserializer) null, topics); } @@ -328,9 +363,27 @@ public class TopologyBuilder { * @return this builder instance so methods can be chained together; never null */ public synchronized final TopologyBuilder addSource(String name, Pattern topicPattern) { - return addSource(name, (Deserializer) null, (Deserializer) null, topicPattern); + return addSource(null, name, (Deserializer) null, (Deserializer) null, topicPattern); + } + + /** + * Add a new source that consumes from topics matching the given pattern + * and forward the records to child processor and/or sink nodes. + * The source will use the {@link org.apache.kafka.streams.StreamsConfig#KEY_SERDE_CLASS_CONFIG default key deserializer} and + * {@link org.apache.kafka.streams.StreamsConfig#VALUE_SERDE_CLASS_CONFIG default value deserializer} specified in the + * {@link org.apache.kafka.streams.StreamsConfig stream configuration}. + * + * @param offsetReset the auto offset reset policy value for this source if no committed offsets found; acceptable values earliest or latest. + * @param name the unique name of the source used to reference this node when + * {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}. 
+ * @param topicPattern regular expression pattern to match Kafka topics that this source is to consume + * @return this builder instance so methods can be chained together; never null + */ + public synchronized final TopologyBuilder addSource(AutoOffsetReset offsetReset, String name, Pattern topicPattern) { + return addSource(offsetReset, name, (Deserializer) null, (Deserializer) null, topicPattern); } + /** * Add a new source that consumes the named topics and forwards the records to child processor and/or sink nodes. * The source will use the specified key and value deserializers. @@ -348,6 +401,28 @@ public class TopologyBuilder { * @throws TopologyBuilderException if processor is already added or if topics have already been registered by another source */ public synchronized final TopologyBuilder addSource(String name, Deserializer keyDeserializer, Deserializer valDeserializer, String... topics) { + return addSource(null, name, keyDeserializer, valDeserializer, topics); + + } + + /** + * Add a new source that consumes the named topics and forwards the records to child processor and/or sink nodes. + * The source will use the specified key and value deserializers. + * + * @param offsetReset the auto offset reset policy to use for this stream if no committed offsets found; acceptable values are earliest or latest. + * @param name the unique name of the source used to reference this node when + * {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}. 
+ * @param keyDeserializer the {@link Deserializer key deserializer} used when consuming records; may be null if the source + * should use the {@link org.apache.kafka.streams.StreamsConfig#KEY_SERDE_CLASS_CONFIG default key deserializer} specified in the + * {@link org.apache.kafka.streams.StreamsConfig stream configuration} + * @param valDeserializer the {@link Deserializer value deserializer} used when consuming records; may be null if the source + * should use the {@link org.apache.kafka.streams.StreamsConfig#VALUE_SERDE_CLASS_CONFIG default value deserializer} specified in the + * {@link org.apache.kafka.streams.StreamsConfig stream configuration} + * @param topics the name of one or more Kafka topics that this source is to consume + * @return this builder instance so methods can be chained together; never null + * @throws TopologyBuilderException if processor is already added or if topics have already been registered by another source + */ + public synchronized final TopologyBuilder addSource(AutoOffsetReset offsetReset, String name, Deserializer keyDeserializer, Deserializer valDeserializer, String... topics) { if (topics.length == 0) { throw new TopologyBuilderException("You must provide at least one topic"); } @@ -366,6 +441,9 @@ public class TopologyBuilder { } } + + maybeAddToResetList(earliestResetTopics, latestResetTopics, offsetReset, topic); + sourceTopicNames.add(topic); } @@ -397,6 +475,32 @@ public class TopologyBuilder { */ public synchronized final TopologyBuilder addSource(String name, Deserializer keyDeserializer, Deserializer valDeserializer, Pattern topicPattern) { + return addSource(null, name, keyDeserializer, valDeserializer, topicPattern); + + } + + /** + * Add a new source that consumes from topics matching the given pattern + * and forwards the records to child processor and/or sink nodes. + * The source will use the specified key and value deserializers. 
The provided + * de-/serializers will be used for all matched topics, so care should be taken to specify patterns for + * topics that share the same key-value data format. + * + * @param offsetReset the auto offset reset policy to use for this stream if no committed offsets found; acceptable values are earliest or latest + * @param name the unique name of the source used to reference this node when + * {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}. + * @param keyDeserializer the {@link Deserializer key deserializer} used when consuming records; may be null if the source + * should use the {@link org.apache.kafka.streams.StreamsConfig#KEY_SERDE_CLASS_CONFIG default key deserializer} specified in the + * {@link org.apache.kafka.streams.StreamsConfig stream configuration} + * @param valDeserializer the {@link Deserializer value deserializer} used when consuming records; may be null if the source + * should use the {@link org.apache.kafka.streams.StreamsConfig#VALUE_SERDE_CLASS_CONFIG default value deserializer} specified in the + * {@link org.apache.kafka.streams.StreamsConfig stream configuration} + * @param topicPattern regular expression pattern to match Kafka topics that this source is to consume + * @return this builder instance so methods can be chained together; never null + * @throws TopologyBuilderException if processor is already added or if topics have already been registered by name + */ + + public synchronized final TopologyBuilder addSource(AutoOffsetReset offsetReset, String name, Deserializer keyDeserializer, Deserializer valDeserializer, Pattern topicPattern) { Objects.requireNonNull(topicPattern, "topicPattern can't be null"); Objects.requireNonNull(name, "name can't be null"); @@ -410,6 +514,8 @@ public class TopologyBuilder { } } + maybeAddToResetList(earliestResetPatterns, latestResetPatterns, offsetReset, topicPattern); + nodeFactories.put(name, new SourceNodeFactory(name, null, topicPattern, 
keyDeserializer, valDeserializer)); nodeToSourcePatterns.put(name, topicPattern); nodeGrouper.add(name); @@ -722,6 +828,22 @@ public class TopologyBuilder { Collections.unmodifiableSet(sourceTopics)); } + + private <T> void maybeAddToResetList(Collection<T> earliestResets, Collection<T> latestResets, AutoOffsetReset offsetReset, T item) { + if (offsetReset != null) { + switch (offsetReset) { + case EARLIEST: + earliestResets.add(item); + break; + case LATEST: + latestResets.add(item); + break; + default: + throw new TopologyBuilderException(String.format("Unrecognized reset format %s", offsetReset)); + } + } + } + /** * Returns the map of node groups keyed by the topic group id. * @@ -850,7 +972,7 @@ public class TopologyBuilder { return new ProcessorTopology(processorNodes, topicSourceMap, topicSinkMap, new ArrayList<>(stateStoreMap.values()), sourceStoreToSourceTopic, storeToProcessorNodeMap); } - + /** * Returns the map of topic groups keyed by the group id. * A topic group is a group of topics in the same task. 
@@ -954,6 +1076,84 @@ public class TopologyBuilder { } /** + * Get the Pattern to match all topics requiring to start reading from earliest available offset + * @return the Pattern for matching all topics reading from earliest offset, never null + */ + public synchronized Pattern earliestResetTopicsPattern() { + Set<String> topics = maybeDecorateInternalSourceTopics(earliestResetTopics); + + String[] sourceTopicNames = topics.toArray(new String[topics.size()]); + Pattern[] sourceTopicPatterns = earliestResetPatterns.toArray(new Pattern[earliestResetPatterns.size()]); + + Pattern earliestPattern = buildPatternForOffsetResetTopics(sourceTopicNames, sourceTopicPatterns); + + ensureNoRegexOverlap(earliestPattern, latestResetPatterns, latestResetTopics); + + return earliestPattern; + } + + /** + * Get the Pattern to match all topics requiring to start reading from latest available offset + * @return the Pattern for matching all topics reading from latest offset, never null + */ + public synchronized Pattern latestResetTopicsPattern() { + Set<String> topics = maybeDecorateInternalSourceTopics(latestResetTopics); + + String[] sourceTopicNames = topics.toArray(new String[topics.size()]); + Pattern[] sourceTopicPatterns = latestResetPatterns.toArray(new Pattern[latestResetPatterns.size()]); + + Pattern latestPattern = buildPatternForOffsetResetTopics(sourceTopicNames, sourceTopicPatterns); + + ensureNoRegexOverlap(latestPattern, earliestResetPatterns, earliestResetTopics); + + return latestPattern; + } + + private void ensureNoRegexOverlap(Pattern builtPattern, Set<Pattern> otherPatterns, Set<String> otherTopics) { + + for (Pattern otherPattern : otherPatterns) { + if (builtPattern.pattern().contains(otherPattern.pattern())) { + throw new TopologyBuilderException(String.format("Found overlapping regex [%s] against [%s] for a KStream with auto offset resets", otherPattern.pattern(), builtPattern.pattern())); + } + } + + for (String otherTopic : otherTopics) { + if 
(builtPattern.matcher(otherTopic).matches()) { + throw new TopologyBuilderException(String.format("Found overlapping regex [%s] matching topic [%s] for a KStream with auto offset resets", builtPattern.pattern(), otherTopic)); + } + } + + } + + + /** + * Builds a composite pattern out of topic names and Pattern object for matching topic names. If the provided + * arrays are empty a Pattern.compile("") instance is returned. + * + * @param sourceTopics the name of source topics to add to a composite pattern + * @param sourcePatterns Patterns for matching source topics to add to a composite pattern + * @return a Pattern that is composed of the literal source topic names and any Patterns for matching source topics + */ + private static synchronized Pattern buildPatternForOffsetResetTopics(String[] sourceTopics, Pattern[] sourcePatterns) { + StringBuilder builder = new StringBuilder(); + + for (String topic : sourceTopics) { + builder.append(topic).append("|"); + } + + for (Pattern sourcePattern : sourcePatterns) { + builder.append(sourcePattern.pattern()).append("|"); + } + + if (builder.length() > 0) { + builder.setLength(builder.length() - 1); + return Pattern.compile(builder.toString()); + } + + return EMPTY_ZERO_LENGTH_PATTERN; + } + + /** * @return a mapping from state store name to a Set of source Topics. 
*/ public Map<String, Set<String>> stateStoreNameToSourceTopics() { @@ -1013,20 +1213,21 @@ public class TopologyBuilder { public synchronized Pattern sourceTopicPattern() { if (this.topicPattern == null && !nodeToSourcePatterns.isEmpty()) { - StringBuilder builder = new StringBuilder(); - for (Pattern pattern : nodeToSourcePatterns.values()) { - builder.append(pattern.pattern()).append("|"); - } + + List<String> allNodeToSourceTopics = new ArrayList<>(); if (!nodeToSourceTopics.isEmpty()) { for (String[] topics : nodeToSourceTopics.values()) { - for (String topic : topics) { - builder.append(topic).append("|"); - } + allNodeToSourceTopics.addAll(Arrays.asList(topics)); + } } + int numPatterns = nodeToSourcePatterns.values().size(); + int numTopics = allNodeToSourceTopics.size(); - builder.setLength(builder.length() - 1); - this.topicPattern = Pattern.compile(builder.toString()); + Pattern[] patterns = nodeToSourcePatterns.values().toArray(new Pattern[numPatterns]); + String[] allTopics = allNodeToSourceTopics.toArray(new String[numTopics]); + + this.topicPattern = buildPatternForOffsetResetTopics(allTopics, patterns); } return this.topicPattern; } http://git-wip-us.apache.org/repos/asf/kafka/blob/23dff4b0/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index 90194c6..5641849 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -19,9 +19,11 @@ package org.apache.kafka.streams.processor.internals; import org.apache.kafka.clients.consumer.CommitFailedException; import org.apache.kafka.clients.consumer.Consumer; +import 
org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.NoOffsetForPartitionException; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.TopicPartition; @@ -199,6 +201,7 @@ public class StreamThread extends Thread { private final long commitTimeMs; private final StreamsMetricsThreadImpl streamsMetrics; final StateDirectory stateDirectory; + private String originalReset; private StreamPartitionAssignor partitionAssignor = null; private boolean cleanRun = false; @@ -311,7 +314,16 @@ public class StreamThread extends Thread { log.info("{} Creating producer client", logPrefix); this.producer = clientSupplier.getProducer(config.getProducerConfigs(threadClientId)); log.info("{} Creating consumer client", logPrefix); - this.consumer = clientSupplier.getConsumer(config.getConsumerConfigs(this, applicationId, threadClientId)); + + Map<String, Object> consumerConfigs = config.getConsumerConfigs(this, applicationId, threadClientId); + + if (!builder.latestResetTopicsPattern().pattern().equals("") || !builder.earliestResetTopicsPattern().pattern().equals("")) { + originalReset = (String) consumerConfigs.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG); + log.info("{} custom offset resets specified updating configs original auto offset reset {}", logPrefix, originalReset); + consumerConfigs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none"); + } + + this.consumer = clientSupplier.getConsumer(consumerConfigs); log.info("{} Creating restore consumer client", logPrefix); this.restoreConsumer = clientSupplier.getRestoreConsumer(config.getRestoreConsumerConfigs(threadClientId)); @@ -453,13 +465,8 @@ public class StreamThread extends Thread { /** -<<<<<<< HEAD * Similar to 
shutdownTasksAndState, however does not close the task managers, in the hope that * soon the tasks will be assigned again -======= - * Similar to shutdownTasksAndState, however does not close the task managers, - * in the hope that soon the tasks will be assigned again ->>>>>>> apache-kafka/trunk */ private void suspendTasksAndState() { log.debug("{} suspendTasksAndState: suspending all active tasks [{}] and standby tasks [{}]", logPrefix, @@ -574,13 +581,44 @@ public class StreamThread extends Thread { boolean longPoll = totalNumBuffered == 0; - ConsumerRecords<byte[], byte[]> records = consumer.poll(longPoll ? this.pollTimeMs : 0); + ConsumerRecords<byte[], byte[]> records = null; + + try { + records = consumer.poll(longPoll ? this.pollTimeMs : 0); + } catch (NoOffsetForPartitionException ex) { + TopicPartition partition = ex.partition(); + if (builder.earliestResetTopicsPattern().matcher(partition.topic()).matches()) { + log.info(String.format("stream-thread [%s] setting topic to consume from earliest offset %s", this.getName(), partition.topic())); + consumer.seekToBeginning(ex.partitions()); + } else if (builder.latestResetTopicsPattern().matcher(partition.topic()).matches()) { + consumer.seekToEnd(ex.partitions()); + log.info(String.format("stream-thread [%s] setting topic to consume from latest offset %s", this.getName(), partition.topic())); + } else { + + if (originalReset == null || (!originalReset.equals("earliest") && !originalReset.equals("latest"))) { + setState(State.PENDING_SHUTDOWN); + String errorMessage = "No valid committed offset found for input topic %s (partition %s) and no valid reset policy configured." + + " You need to set configuration parameter \"auto.offset.reset\" or specify a topic specific reset " + + "policy via KStreamBuilder#stream(StreamsConfig.AutoOffsetReset offsetReset, ...) 
or KStreamBuilder#table(StreamsConfig.AutoOffsetReset offsetReset, ...)"; + throw new StreamsException(String.format(errorMessage, partition.topic(), partition.partition()), ex); + } + + if (originalReset.equals("earliest")) { + consumer.seekToBeginning(ex.partitions()); + } else if (originalReset.equals("latest")) { + consumer.seekToEnd(ex.partitions()); + } + log.info(String.format("stream-thread [%s] no custom setting defined for topic %s using original config %s for offset reset", this.getName(), partition.topic(), originalReset)); + } + + } if (rebalanceException != null) throw new StreamsException(logPrefix + " Failed to rebalance", rebalanceException); - if (!records.isEmpty()) { + if (records != null && !records.isEmpty()) { int numAddedRecords = 0; + for (TopicPartition partition : records.partitions()) { StreamTask task = activeTasksByPartition.get(partition); numAddedRecords += task.addRecords(partition, records.records(partition)); http://git-wip-us.apache.org/repos/asf/kafka/blob/23dff4b0/streams/src/test/java/org/apache/kafka/streams/integration/KStreamsFineGrainedAutoResetIntegrationTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamsFineGrainedAutoResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamsFineGrainedAutoResetIntegrationTest.java new file mode 100644 index 0000000..2171de1 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamsFineGrainedAutoResetIntegrationTest.java @@ -0,0 +1,223 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. 
You may obtain a
+ * copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+
+package org.apache.kafka.streams.integration;
+
+
+import kafka.utils.MockTime;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.errors.TopologyBuilderException;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.apache.kafka.test.TestCondition;
+import org.apache.kafka.test.TestUtils;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.regex.Pattern;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Integration tests for per-source auto offset reset strategies: sources registered with
+ * {@code KStreamBuilder.AutoOffsetReset.EARLIEST} or {@code LATEST} versus sources that
+ * rely on the {@code auto.offset.reset} value of the streams configuration.
+ */
+public class KStreamsFineGrainedAutoResetIntegrationTest {
+
+    private static final int NUM_BROKERS = 1;
+    private static final String DEFAULT_OUTPUT_TOPIC = "outputTopic";
+
+    @ClassRule
+    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
+    private final MockTime mockTime = CLUSTER.time;
+
+    private static final String TOPIC_1 = "topic-1";
+    private static final String TOPIC_2 = "topic-2";
+    private static final String TOPIC_A = "topic-A";
+    private static final String TOPIC_C = "topic-C";
+    private static final String TOPIC_Y = "topic-Y";
+    private static final String TOPIC_Z = "topic-Z";
+    private static final String NOOP = "noop";
+    private final Serde<String> stringSerde = Serdes.String();
+
+    private static final String STRING_SERDE_CLASSNAME = Serdes.String().getClass().getName();
+    private Properties streamsConfiguration;
+
+
+    /** Creates every input topic and the shared output topic once for the whole class. */
+    @BeforeClass
+    public static void startKafkaCluster() throws Exception {
+        CLUSTER.createTopic(TOPIC_1);
+        CLUSTER.createTopic(TOPIC_2);
+        CLUSTER.createTopic(TOPIC_A);
+        CLUSTER.createTopic(TOPIC_C);
+        CLUSTER.createTopic(TOPIC_Y);
+        CLUSTER.createTopic(TOPIC_Z);
+        CLUSTER.createTopic(NOOP);
+        CLUSTER.createTopic(DEFAULT_OUTPUT_TOPIC);
+
+    }
+
+    /** Builds a streams configuration whose global {@code auto.offset.reset} is "earliest". */
+    @Before
+    public void setUp() throws Exception {
+
+        Properties props = new Properties();
+        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+
+        streamsConfiguration = StreamsTestUtils.getStreamsConfig(
+                "testAutoOffsetId",
+                CLUSTER.bootstrapServers(),
+                STRING_SERDE_CLASSNAME,
+                STRING_SERDE_CLASSNAME,
+                props);
+
+        // Remove any state from previous test runs
+        IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
+    }
+
+    /**
+     * Records are produced to all six input topics before the application starts. Only the
+     * records of the EARLIEST pattern source (topic-1, topic-2) and of the named topics that
+     * fall back to the configured "earliest" policy (topic-Y, topic-Z) are expected on the
+     * output topic; the LATEST pattern source (topic-A, topic-C) must skip its existing records.
+     */
+    @Test
+    public void shouldOnlyReadRecordsWhereEarliestSpecified() throws Exception {
+        final KStreamBuilder builder = new KStreamBuilder();
+
+        final KStream<String, String> pattern1Stream = builder.stream(KStreamBuilder.AutoOffsetReset.EARLIEST, Pattern.compile("topic-\\d"));
+        final KStream<String, String> pattern2Stream = builder.stream(KStreamBuilder.AutoOffsetReset.LATEST, Pattern.compile("topic-[A-D]"));
+        final KStream<String, String> namedTopicsStream = builder.stream(TOPIC_Y, TOPIC_Z);
+
+        pattern1Stream.to(stringSerde, stringSerde, DEFAULT_OUTPUT_TOPIC);
+        pattern2Stream.to(stringSerde, stringSerde, DEFAULT_OUTPUT_TOPIC);
+        namedTopicsStream.to(stringSerde, stringSerde, DEFAULT_OUTPUT_TOPIC);
+
+        final Properties producerConfig = TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class);
+
+        final String topic1TestMessage = "topic-1 test";
+        final String topic2TestMessage = "topic-2 test";
+        final String topicATestMessage = "topic-A test";
+        final String topicCTestMessage = "topic-C test";
+        final String topicYTestMessage = "topic-Y test";
+        final String topicZTestMessage = "topic-Z test";
+
+        IntegrationTestUtils.produceValuesSynchronously(TOPIC_1, Collections.singletonList(topic1TestMessage), producerConfig, mockTime);
+        IntegrationTestUtils.produceValuesSynchronously(TOPIC_2, Collections.singletonList(topic2TestMessage), producerConfig, mockTime);
+        IntegrationTestUtils.produceValuesSynchronously(TOPIC_A, Collections.singletonList(topicATestMessage), producerConfig, mockTime);
+        IntegrationTestUtils.produceValuesSynchronously(TOPIC_C, Collections.singletonList(topicCTestMessage), producerConfig, mockTime);
+        IntegrationTestUtils.produceValuesSynchronously(TOPIC_Y, Collections.singletonList(topicYTestMessage), producerConfig, mockTime);
+        IntegrationTestUtils.produceValuesSynchronously(TOPIC_Z, Collections.singletonList(topicZTestMessage), producerConfig, mockTime);
+
+        final Properties consumerConfig = TestUtils.consumerConfig(CLUSTER.bootstrapServers(), StringDeserializer.class, StringDeserializer.class);
+
+        final List<String> expectedReceivedValues = Arrays.asList(topic1TestMessage, topic2TestMessage, topicYTestMessage, topicZTestMessage);
+        final List<String> actualValues = new ArrayList<>(4);
+
+        final KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
+        streams.start();
+        try {
+            final List<KeyValue<String, String>> receivedKeyValues = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, DEFAULT_OUTPUT_TOPIC, 4);
+
+            for (final KeyValue<String, String> receivedKeyValue : receivedKeyValues) {
+                actualValues.add(receivedKeyValue.value);
+            }
+        } finally {
+            // Always stop the application, even when the wait fails, so that a failed run
+            // does not leave a live instance behind for the other tests on the shared cluster.
+            streams.close();
+        }
+
+        // Output ordering across source topics is not fixed, so compare sorted values.
+        Collections.sort(actualValues);
+        Collections.sort(expectedReceivedValues);
+        assertThat(actualValues, equalTo(expectedReceivedValues));
+
+    }
+
+
+    /** Registering the same pattern for both EARLIEST and LATEST must be rejected. */
+    @Test(expected = TopologyBuilderException.class)
+    public void shouldThrowExceptionOverlappingPattern() throws Exception {
+        final KStreamBuilder builder = new KStreamBuilder();
+        //NOTE this would realistically get caught when building topology, the test is for completeness
+        builder.stream(KStreamBuilder.AutoOffsetReset.EARLIEST, Pattern.compile("topic-[A-D]"));
+        builder.stream(KStreamBuilder.AutoOffsetReset.LATEST, Pattern.compile("topic-[A-D]"));
+        builder.stream(TOPIC_Y, TOPIC_Z);
+
+        builder.earliestResetTopicsPattern();
+
+    }
+
+    /** A named LATEST topic (topic-A) that also matches an EARLIEST pattern must be rejected. */
+    @Test(expected = TopologyBuilderException.class)
+    public void shouldThrowExceptionOverlappingTopic() throws Exception {
+        final KStreamBuilder builder = new KStreamBuilder();
+        //NOTE this would realistically get caught when building topology, the test is for completeness
+        builder.stream(KStreamBuilder.AutoOffsetReset.EARLIEST, Pattern.compile("topic-[A-D]"));
+        builder.stream(KStreamBuilder.AutoOffsetReset.LATEST, Pattern.compile("topic-\\d"));
+        builder.stream(KStreamBuilder.AutoOffsetReset.LATEST, TOPIC_A, TOPIC_Z);
+
+        builder.latestResetTopicsPattern();
+
+    }
+
+
+    /**
+     * With {@code auto.offset.reset} set to "none" and no per-source policy, the stream thread
+     * is expected to die with a StreamsException caused by a NoOffsetForPartitionException.
+     */
+    @Test
+    public void shouldThrowStreamsExceptionNoResetSpecified() throws Exception {
+        Properties props = new Properties();
+        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
+
+        Properties localConfig = StreamsTestUtils.getStreamsConfig(
+                "testAutoOffsetWithNone",
+                CLUSTER.bootstrapServers(),
+                STRING_SERDE_CLASSNAME,
+                STRING_SERDE_CLASSNAME,
+                props);
+
+        final KStreamBuilder builder = new KStreamBuilder();
+        final KStream<String, String> exceptionStream = builder.stream(NOOP);
+
+        exceptionStream.to(stringSerde, stringSerde, DEFAULT_OUTPUT_TOPIC);
+
+        KafkaStreams streams = new KafkaStreams(builder, localConfig);
+
+        final TestingUncaughtExceptionHandler uncaughtExceptionHandler = new TestingUncaughtExceptionHandler();
+
+        final TestCondition correctExceptionThrownCondition = new TestCondition() {
+            @Override
+            public boolean conditionMet() {
+                return uncaughtExceptionHandler.correctExceptionThrown;
+            }
+        };
+
+        streams.setUncaughtExceptionHandler(uncaughtExceptionHandler);
+        streams.start();
+        try {
+            TestUtils.waitForCondition(correctExceptionThrownCondition, "The expected NoOffsetForPartitionException was never thrown");
+        } finally {
+            // Close even when the condition times out so the instance is not leaked.
+            streams.close();
+        }
+    }
+
+
+    /**
+     * Records whether the uncaught exception is a StreamsException caused by a
+     * NoOffsetForPartitionException.
+     */
+    private static final class TestingUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {
+        // Written by the failing thread and polled by the test thread, hence volatile.
+        volatile boolean correctExceptionThrown = false;
+        @Override
+        public void uncaughtException(Thread t, Throwable e) {
+            // An AssertionError raised here is ignored by the JVM; a mismatch therefore
+            // surfaces as the flag staying false and the waiting test timing out.
+            assertThat(e.getClass().getSimpleName(), is("StreamsException"));
+            assertThat(e.getCause().getClass().getSimpleName(), is("NoOffsetForPartitionException"));
+            correctExceptionThrown = true;
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/23dff4b0/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
index c402c9b..b085a84 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
@@ -184,7 +184,7
@@ public class TopologyBuilderTest { @Test public void testSubscribeTopicNameAndPattern() { final TopologyBuilder builder = new TopologyBuilder(); - Pattern expectedPattern = Pattern.compile(".*-\\d|topic-foo|topic-bar"); + Pattern expectedPattern = Pattern.compile("topic-foo|topic-bar|.*-\\d"); builder.addSource("source-1", "topic-foo", "topic-bar"); builder.addSource("source-2", Pattern.compile(".*-\\d")); assertEquals(expectedPattern.pattern(), builder.sourceTopicPattern().pattern());