[ https://issues.apache.org/jira/browse/KAFKA-4936?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16495558#comment-16495558 ]
ASF GitHub Bot commented on KAFKA-4936:
---------------------------------------

mjsax closed pull request #5018: KAFKA-4936: Add dynamic routing in Streams
URL: https://github.com/apache/kafka/pull/5018

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

diff --git a/docs/streams/developer-guide/dsl-api.html b/docs/streams/developer-guide/dsl-api.html index 4c76c5e84b6..bdfae055f58 100644 --- a/docs/streams/developer-guide/dsl-api.html +++ b/docs/streams/developer-guide/dsl-api.html @@ -3023,7 +3023,7 @@ <h2><a class="toc-backref" href="#id7">Overview</a><a class="headerlink" href="# <li>KStream -> void</li> </ul> </td> - <td><p class="first"><strong>Terminal operation.</strong> Write the records to a Kafka topic. + <td><p class="first"><strong>Terminal operation.</strong> Write the records to Kafka topic(s). 
(<a class="reference external" href="../../../javadoc/org/apache/kafka/streams/kstream/KStream.html#to(java.lang.String)">KStream details</a>)</p> <p>When to provide serdes explicitly:</p> <ul class="simple"> @@ -3037,6 +3037,8 @@ <h2><a class="toc-backref" href="#id7">Overview</a><a class="headerlink" href="# <p>A variant of <code class="docutils literal"><span class="pre">to</span></code> exists that enables you to specify how the data is produced by using a <code class="docutils literal"><span class="pre">Produced</span></code> instance to specify, for example, a <code class="docutils literal"><span class="pre">StreamPartitioner</span></code> that gives you control over how output records are distributed across the partitions of the output topic.</p> + <p>Another variant of <code class="docutils literal"><span class="pre">to</span></code> exists that enables you to dynamically choose which topic to send to for each record via a <code class="docutils literal"><span class="pre">TopicNameExtractor</span></code> + instance.</p> <div class="highlight-java"><div class="highlight"><pre><span></span><span class="n">KStream</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">></span> <span class="n">stream</span> <span class="o">=</span> <span class="o">...;</span> <span class="n">KTable</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">></span> <span class="n">table</span> <span class="o">=</span> <span class="o">...;</span> diff --git a/docs/streams/developer-guide/processor-api.html b/docs/streams/developer-guide/processor-api.html index ef8937258ad..8236ded5c82 100644 --- a/docs/streams/developer-guide/processor-api.html +++ b/docs/streams/developer-guide/processor-api.html @@ -384,7 +384,8 @@ <h2><a class="toc-backref" href="#id8">Connecting Processors and State Stores</a <li>A predefined persistent key-value state 
store is created and associated with the <code class="docutils literal"><span class="pre">"Process"</span></code> node, using <code class="docutils literal"><span class="pre">countStoreBuilder</span></code>.</li> <li>A sink processor node is then added to complete the topology using the <code class="docutils literal"><span class="pre">addSink</span></code> method, taking the <code class="docutils literal"><span class="pre">"Process"</span></code> node - as its upstream processor and writing to a separate <code class="docutils literal"><span class="pre">"sink-topic"</span></code> Kafka topic.</li> + as its upstream processor and writing to a separate <code class="docutils literal"><span class="pre">"sink-topic"</span></code> Kafka topic (note that users can also use another overloaded variant of <code class="docutils literal"><span class="pre">addSink</span></code> + to dynamically determine the Kafka topic to write to for each received record from the upstream processor).</li> </ul> <p>In this topology, the <code class="docutils literal"><span class="pre">"Process"</span></code> stream processor node is considered a downstream processor of the <code class="docutils literal"><span class="pre">"Source"</span></code> node, and an upstream processor of the <code class="docutils literal"><span class="pre">"Sink"</span></code> node. As a result, whenever the <code class="docutils literal"><span class="pre">"Source"</span></code> node forwards a newly fetched record from diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html index 0e3725ebe32..45d1f7df8a3 100644 --- a/docs/streams/upgrade-guide.html +++ b/docs/streams/upgrade-guide.html @@ -136,6 +136,13 @@ <h3><a id="streams_api_changes_200" href="#streams_api_changes_200">Streams API The new class <code>To</code> allows you to send records to all or specific downstream processors by name and to set the timestamp for the output record. 
Forwarding based on child index is not supported in the new API any longer. </p> + <p> + We have added support to allow routing records dynamically to Kafka topics. More specifically, in both the lower-level <code>Topology#addSink</code> and higher-level <code>KStream#to</code> APIs, we have added variants that + take a <code>TopicNameExtractor</code> instance instead of a specific <code>String</code> typed topic name, such that for each received record from the upstream processor, the library will dynamically determine which Kafka topic to write to + based on the record's key and value, as well as record context. Note that all the Kafka topics that that may possibly be used are still considered as user topics and hence required to be pre-created. In addition to that, we have modified the + <code>StreamPartitioner</code> interface to add the topic name parameter since the topic name now may not be known beforehand; users who have customized implementations of this interface would need to update their code while upgrading their application + to use Kafka Streams 2.0.0. + </p> <p> <a href="https://cwiki.apache.org/confluence/x/DVyHB">KIP-284</a> changed the retention time for repartition topics by setting its default value to <code>Long.MAX_VALUE</code>. Instead of relying on data retention Kafka Streams uses the new purge data API to delete consumed data from those topics and to keep used storage small now. 
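
Editorial note, not part of the pull request: the upgrade-guide text above describes routing each record to a topic chosen at runtime and a StreamPartitioner that now receives the topic name. The following is a minimal sketch of what such an extractor and partitioner could look like. So that it compiles without Kafka on the classpath, RecordContext, TopicNameExtractor, and StreamPartitioner are declared here as local stand-ins that copy only method signatures shown in this diff; the class name DynamicRoutingSketch, the topic names, and the routing threshold are hypothetical.

```java
import java.util.Objects;

// Local stand-in for org.apache.kafka.streams.processor.RecordContext as it
// appears in this diff; headers() is omitted because it needs a Kafka type.
interface RecordContext {
    long offset();
    long timestamp();
    String topic();
    int partition();
}

// Local stand-in copying the extract(...) signature added by this diff.
interface TopicNameExtractor<K, V> {
    String extract(K key, V value, RecordContext recordContext);
}

// Local stand-in copying the updated partition(...) signature, which now
// takes the topic name as its first parameter.
interface StreamPartitioner<K, V> {
    Integer partition(String topic, K key, V value, int numPartitions);
}

final class DynamicRoutingSketch {

    // Hypothetical topic names. Per the upgrade note, every topic an
    // extractor may return has to be created before the application starts.
    static final String LARGE_ORDERS = "orders-large";
    static final String SMALL_ORDERS = "orders-small";

    // Chooses the output topic from the record value; the record context is
    // available but unused in this sketch.
    static final TopicNameExtractor<String, Long> BY_AMOUNT =
        (key, value, recordContext) ->
            (value != null && value >= 1000L) ? LARGE_ORDERS : SMALL_ORDERS;

    // A custom partitioner can branch on the topic because the topic name is
    // no longer fixed when the partitioner is constructed.
    static final StreamPartitioner<String, Long> PARTITIONER =
        (topic, key, value, numPartitions) ->
            LARGE_ORDERS.equals(topic)
                ? 0
                : Math.floorMod(Objects.hashCode(key), numPartitions);

    public static void main(String[] args) {
        String topic = BY_AMOUNT.extract("order-1", 2500L, null);
        Integer partition = PARTITIONER.partition(topic, "order-1", 2500L, 4);
        System.out.println(topic + " / partition " + partition);
    }
}
```

Against the real library, an extractor of this shape would be passed to one of the overloads added in this diff, KStream#to(TopicNameExtractor) or Topology#addSink(String, TopicNameExtractor, String...), and both target topics would have to exist beforehand.
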
diff --git a/streams/src/main/java/org/apache/kafka/streams/Topology.java b/streams/src/main/java/org/apache/kafka/streams/Topology.java index c137a305fb1..22f6ea8362b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/Topology.java +++ b/streams/src/main/java/org/apache/kafka/streams/Topology.java @@ -26,6 +26,7 @@ import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.StreamPartitioner; import org.apache.kafka.streams.processor.TimestampExtractor; +import org.apache.kafka.streams.processor.TopicNameExtractor; import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder; import org.apache.kafka.streams.processor.internals.ProcessorNode; import org.apache.kafka.streams.processor.internals.ProcessorTopology; @@ -411,7 +412,8 @@ public synchronized Topology addSource(final AutoOffsetReset offsetReset, * @param parentNames the name of one or more source or processor nodes whose output records this sink should consume * and write to its topic * @return itself - * @throws TopologyException itself + * @throws TopologyException if parent processor is not added yet, or if this processor's name is equal to the parent's name, + * or if this processor's name is equal to the parent's name * @see #addSink(String, String, StreamPartitioner, String...) * @see #addSink(String, String, Serializer, Serializer, String...) * @see #addSink(String, String, Serializer, Serializer, StreamPartitioner, String...) 
@@ -443,7 +445,8 @@ public synchronized Topology addSink(final String name, * @param parentNames the name of one or more source or processor nodes whose output records this sink should consume * and write to its topic * @return itself - * @throws TopologyException if parent processor is not added yet, or if this processor's name is equal to the parent's name + * @throws TopologyException if parent processor is not added yet, or if this processor's name is equal to the parent's name, + * or if this processor's name is equal to the parent's name * @see #addSink(String, String, String...) * @see #addSink(String, String, Serializer, Serializer, String...) * @see #addSink(String, String, Serializer, Serializer, StreamPartitioner, String...) @@ -471,7 +474,8 @@ public synchronized Topology addSink(final String name, * @param parentNames the name of one or more source or processor nodes whose output records this sink should consume * and write to its topic * @return itself - * @throws TopologyException if parent processor is not added yet, or if this processor's name is equal to the parent's name + * @throws TopologyException if parent processor is not added yet, or if this processor's name is equal to the parent's name, + * or if this processor's name is equal to the parent's name * @see #addSink(String, String, String...) * @see #addSink(String, String, StreamPartitioner, String...) * @see #addSink(String, String, Serializer, Serializer, StreamPartitioner, String...) 
@@ -501,7 +505,8 @@ public synchronized Topology addSink(final String name, * @param parentNames the name of one or more source or processor nodes whose output records this sink should consume * and write to its topic * @return itself - * @throws TopologyException if parent processor is not added yet, or if this processor's name is equal to the parent's name + * @throws TopologyException if parent processor is not added yet, or if this processor's name is equal to the parent's name, + * or if this processor's name is equal to the parent's name * @see #addSink(String, String, String...) * @see #addSink(String, String, StreamPartitioner, String...) * @see #addSink(String, String, Serializer, Serializer, String...) @@ -516,6 +521,130 @@ public synchronized Topology addSink(final String name, return this; } + /** + * Add a new sink that forwards records from upstream parent processor and/or source nodes to Kafka topics based on {@code topicExtractor}. + * The topics that it may ever send to should be pre-created. + * The sink will use the {@link StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key serializer} and + * {@link StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value serializer} specified in the + * {@link StreamsConfig stream configuration}. + * + * @param name the unique name of the sink + * @param topicExtractor the extractor to determine the name of the Kafka topic to which this sink should write for each record + * @param parentNames the name of one or more source or processor nodes whose output records this sink should consume + * and dynamically write to topics + * @return itself + * @throws TopologyException if parent processor is not added yet, or if this processor's name is equal to the parent's name, + * or if this processor's name is equal to the parent's name + * @see #addSink(String, String, StreamPartitioner, String...) + * @see #addSink(String, String, Serializer, Serializer, String...) 
+ * @see #addSink(String, String, Serializer, Serializer, StreamPartitioner, String...) + */ + public synchronized <K, V> Topology addSink(final String name, + final TopicNameExtractor<K, V> topicExtractor, + final String... parentNames) { + internalTopologyBuilder.addSink(name, topicExtractor, null, null, null, parentNames); + return this; + } + + /** + * Add a new sink that forwards records from upstream parent processor and/or source nodes to Kafka topics based on {@code topicExtractor}, + * using the supplied partitioner. + * The topics that it may ever send to should be pre-created. + * The sink will use the {@link StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key serializer} and + * {@link StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value serializer} specified in the + * {@link StreamsConfig stream configuration}. + * <p> + * The sink will also use the specified {@link StreamPartitioner} to determine how records are distributed among + * the named Kafka topic's partitions. + * Such control is often useful with topologies that use {@link #addStateStore(StoreBuilder, String...) state + * stores} in its processors. + * In most other cases, however, a partitioner needs not be specified and Kafka will automatically distribute + * records among partitions using Kafka's default partitioning logic. 
+ * + * @param name the unique name of the sink + * @param topicExtractor the extractor to determine the name of the Kafka topic to which this sink should write for each record + * @param partitioner the function that should be used to determine the partition for each record processed by the sink + * @param parentNames the name of one or more source or processor nodes whose output records this sink should consume + * and dynamically write to topics + * @return itself + * @throws TopologyException if parent processor is not added yet, or if this processor's name is equal to the parent's name, + * or if this processor's name is equal to the parent's name + * @see #addSink(String, String, String...) + * @see #addSink(String, String, Serializer, Serializer, String...) + * @see #addSink(String, String, Serializer, Serializer, StreamPartitioner, String...) + */ + public synchronized <K, V> Topology addSink(final String name, + final TopicNameExtractor<K, V> topicExtractor, + final StreamPartitioner<? super K, ? super V> partitioner, + final String... parentNames) { + internalTopologyBuilder.addSink(name, topicExtractor, null, null, partitioner, parentNames); + return this; + } + + /** + * Add a new sink that forwards records from upstream parent processor and/or source nodes to Kafka topics based on {@code topicExtractor}. + * The topics that it may ever send to should be pre-created. + * The sink will use the specified key and value serializers. 
+ * + * @param name the unique name of the sink + * @param topicExtractor the extractor to determine the name of the Kafka topic to which this sink should write for each record + * @param keySerializer the {@link Serializer key serializer} used when consuming records; may be null if the sink + * should use the {@link StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key serializer} specified in the + * {@link StreamsConfig stream configuration} + * @param valueSerializer the {@link Serializer value serializer} used when consuming records; may be null if the sink + * should use the {@link StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value serializer} specified in the + * {@link StreamsConfig stream configuration} + * @param parentNames the name of one or more source or processor nodes whose output records this sink should consume + * and dynamically write to topics + * @return itself + * @throws TopologyException if parent processor is not added yet, or if this processor's name is equal to the parent's name, + * or if this processor's name is equal to the parent's name + * @see #addSink(String, String, String...) + * @see #addSink(String, String, StreamPartitioner, String...) + * @see #addSink(String, String, Serializer, Serializer, StreamPartitioner, String...) + */ + public synchronized <K, V> Topology addSink(final String name, + final TopicNameExtractor<K, V> topicExtractor, + final Serializer<K> keySerializer, + final Serializer<V> valueSerializer, + final String... parentNames) { + internalTopologyBuilder.addSink(name, topicExtractor, keySerializer, valueSerializer, null, parentNames); + return this; + } + + /** + * Add a new sink that forwards records from upstream parent processor and/or source nodes to Kafka topics based on {@code topicExtractor}. + * The topics that it may ever send to should be pre-created. + * The sink will use the specified key and value serializers, and the supplied partitioner. 
+ * + * @param name the unique name of the sink + * @param topicExtractor the extractor to determine the name of the Kafka topic to which this sink should write for each record + * @param keySerializer the {@link Serializer key serializer} used when consuming records; may be null if the sink + * should use the {@link StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key serializer} specified in the + * {@link StreamsConfig stream configuration} + * @param valueSerializer the {@link Serializer value serializer} used when consuming records; may be null if the sink + * should use the {@link StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value serializer} specified in the + * {@link StreamsConfig stream configuration} + * @param partitioner the function that should be used to determine the partition for each record processed by the sink + * @param parentNames the name of one or more source or processor nodes whose output records this sink should consume + * and dynamically write to topics + * @return itself + * @throws TopologyException if parent processor is not added yet, or if this processor's name is equal to the parent's name, + * or if this processor's name is equal to the parent's name + * @see #addSink(String, String, String...) + * @see #addSink(String, String, StreamPartitioner, String...) + * @see #addSink(String, String, Serializer, Serializer, String...) + */ + public synchronized <K, V> Topology addSink(final String name, + final TopicNameExtractor<K, V> topicExtractor, + final Serializer<K> keySerializer, + final Serializer<V> valueSerializer, + final StreamPartitioner<? super K, ? super V> partitioner, + final String... parentNames) { + internalTopologyBuilder.addSink(name, topicExtractor, keySerializer, valueSerializer, partitioner, parentNames); + return this; + } + /** * Add a new processor node that receives and processes records output by one or more parent source or processor * node. 
@@ -526,7 +655,8 @@ public synchronized Topology addSink(final String name, * @param parentNames the name of one or more source or processor nodes whose output records this processor should receive * and process * @return itself - * @throws TopologyException if parent processor is not added yet, or if this processor's name is equal to the parent's name + * @throws TopologyException if parent processor is not added yet, or if this processor's name is equal to the parent's name, + * or if this processor's name is equal to the parent's name */ public synchronized Topology addProcessor(final String name, final ProcessorSupplier supplier, diff --git a/streams/src/main/java/org/apache/kafka/streams/TopologyDescription.java b/streams/src/main/java/org/apache/kafka/streams/TopologyDescription.java index 131e8d31cbe..04a292f9a97 100644 --- a/streams/src/main/java/org/apache/kafka/streams/TopologyDescription.java +++ b/streams/src/main/java/org/apache/kafka/streams/TopologyDescription.java @@ -134,6 +134,7 @@ interface Sink extends Node { /** * The topic name this sink node is writing to. 
+ * Could be null if the topic name can only be dynamically determined based on {@code TopicNameExtractor} * @return a topic name */ String topic(); diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java index e75bb3aa227..da86a75b9e3 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java @@ -27,6 +27,7 @@ import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.ProcessorSupplier; import org.apache.kafka.streams.processor.StreamPartitioner; +import org.apache.kafka.streams.processor.TopicNameExtractor; /** * {@code KStream} is an abstraction of a <i>record stream</i> of {@link KeyValue} pairs, i.e., each record is an @@ -461,12 +462,31 @@ * The specified topic should be manually created before it is used (i.e., before the Kafka Streams application is * started). * - * @param produced the options to use when producing to the topic * @param topic the topic name + * @param produced the options to use when producing to the topic */ void to(final String topic, final Produced<K, V> produced); + /** + * Dynamically materialize this stream to topics using default serializers specified in the config and producer's + * {@link DefaultPartitioner}. + * The topic names for each record to send to is dynamically determined based on the {@link TopicNameExtractor}. + * + * @param topicExtractor the extractor to determine the name of the Kafka topic to write to for each record + */ + void to(final TopicNameExtractor<K, V> topicExtractor); + + /** + * Dynamically materialize this stream to topics using the provided {@link Produced} instance. + * The topic names for each record to send to is dynamically determined based on the {@link TopicNameExtractor}. 
+ * + * @param topicExtractor the extractor to determine the name of the Kafka topic to write to for each record + * @param produced the options to use when producing to the topic + */ + void to(final TopicNameExtractor<K, V> topicExtractor, + final Produced<K, V> produced); + /** * Transform each record of the input stream into zero or more records in the output stream (both key and value type * can be altered arbitrarily). diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java index 5331a958ac0..7356aff153f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java @@ -41,6 +41,8 @@ import org.apache.kafka.streams.processor.FailOnInvalidTimestamp; import org.apache.kafka.streams.processor.ProcessorSupplier; import org.apache.kafka.streams.processor.StreamPartitioner; +import org.apache.kafka.streams.processor.TopicNameExtractor; +import org.apache.kafka.streams.processor.internals.StaticTopicNameExtractor; import org.apache.kafka.streams.state.StoreBuilder; import org.apache.kafka.streams.state.Stores; import org.apache.kafka.streams.state.WindowStore; @@ -304,27 +306,37 @@ public void to(final String topic) { to(topic, Produced.<K, V>with(null, null, null)); } - @SuppressWarnings("unchecked") @Override public void to(final String topic, final Produced<K, V> produced) { Objects.requireNonNull(topic, "topic can't be null"); Objects.requireNonNull(produced, "Produced can't be null"); - to(topic, new ProducedInternal<>(produced)); + to(new StaticTopicNameExtractor<K, V>(topic), new ProducedInternal<>(produced)); + } + @Override + public void to(final TopicNameExtractor<K, V> topicExtractor) { + to(topicExtractor, Produced.<K, V>with(null, null, null)); + } + + @Override + public void to(final TopicNameExtractor<K, V> 
topicExtractor, final Produced<K, V> produced) { + Objects.requireNonNull(topicExtractor, "topic extractor can't be null"); + Objects.requireNonNull(produced, "Produced can't be null"); + to(topicExtractor, new ProducedInternal<>(produced)); } @SuppressWarnings("unchecked") - private void to(final String topic, final ProducedInternal<K, V> produced) { + private void to(final TopicNameExtractor<K, V> topicExtractor, final ProducedInternal<K, V> produced) { final String name = builder.newProcessorName(SINK_NAME); final Serializer<K> keySerializer = produced.keySerde() == null ? null : produced.keySerde().serializer(); final Serializer<V> valSerializer = produced.valueSerde() == null ? null : produced.valueSerde().serializer(); final StreamPartitioner<? super K, ? super V> partitioner = produced.streamPartitioner(); if (partitioner == null && keySerializer instanceof WindowedSerializer) { - final StreamPartitioner<K, V> windowedPartitioner = (StreamPartitioner<K, V>) new WindowedStreamPartitioner<Object, V>(topic, (WindowedSerializer) keySerializer); - builder.internalTopologyBuilder.addSink(name, topic, keySerializer, valSerializer, windowedPartitioner, this.name); + final StreamPartitioner<K, V> windowedPartitioner = (StreamPartitioner<K, V>) new WindowedStreamPartitioner<Object, V>((WindowedSerializer) keySerializer); + builder.internalTopologyBuilder.addSink(name, topicExtractor, keySerializer, valSerializer, windowedPartitioner, this.name); } else { - builder.internalTopologyBuilder.addSink(name, topic, keySerializer, valSerializer, partitioner, this.name); + builder.internalTopologyBuilder.addSink(name, topicExtractor, keySerializer, valSerializer, partitioner, this.name); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java index 7b04f6a3ff6..04a9ab2e9b5 100644 --- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java @@ -24,11 +24,9 @@ public class WindowedStreamPartitioner<K, V> implements StreamPartitioner<Windowed<K>, V> { - private final String topic; private final WindowedSerializer<K> serializer; - WindowedStreamPartitioner(final String topic, final WindowedSerializer<K> serializer) { - this.topic = topic; + WindowedStreamPartitioner(final WindowedSerializer<K> serializer) { this.serializer = serializer; } @@ -37,12 +35,14 @@ * and the current number of partitions. The partition number id determined by the original key of the windowed key * using the same logic as DefaultPartitioner so that the topic is partitioned by the original key. * + * @param topic the topic name this record is sent to * @param windowedKey the key of the record * @param value the value of the record * @param numPartitions the total number of partitions * @return an integer between 0 and {@code numPartitions-1}, or {@code null} if the default partitioning logic should be used */ - public Integer partition(final Windowed<K> windowedKey, final V value, final int numPartitions) { + @Override + public Integer partition(final String topic, final Windowed<K> windowedKey, final V value, final int numPartitions) { final byte[] keyBytes = serializer.serializeBaseKey(topic, windowedKey); // hash the keyBytes to choose a partition diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java index f2a9f64359b..d21667fc6dd 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java @@ -199,7 +199,7 @@ Cancellable schedule(final long intervalMs, long offset(); /** - * Returns the headers of 
the current input record + * Returns the headers of the current input record; could be null if it is not available * @return the headers */ Headers headers(); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/RecordContext.java similarity index 65% rename from streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordContext.java rename to streams/src/main/java/org/apache/kafka/streams/processor/RecordContext.java index 15add71d89a..5819a4603f5 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordContext.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/RecordContext.java @@ -14,10 +14,9 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.kafka.streams.processor.internals; +package org.apache.kafka.streams.processor; import org.apache.kafka.common.header.Headers; -import org.apache.kafka.streams.processor.Processor; /** * The context associated with the current record being processed by @@ -25,32 +24,32 @@ */ public interface RecordContext { /** - * @return The offset of the original record received from Kafka + * @return The offset of the original record received from Kafka; + * could be -1 if it is not available */ long offset(); /** - * @return The timestamp extracted from the record received from Kafka + * @return The timestamp extracted from the record received from Kafka; + * could be -1 if it is not available */ long timestamp(); /** - * Sets a new timestamp for the output record. 
- */ - void setTimestamp(final long timestamp); - - /** - * @return The topic the record was received on + * @return The topic the record was received on; + * could be null if it is not available */ String topic(); /** - * @return The partition the record was received on + * @return The partition the record was received on; + * could be -1 if it is not available */ int partition(); /** - * @return The headers from the record received from Kafka + * @return The headers from the record received from Kafka; + * could be null if it is not available */ Headers headers(); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/StreamPartitioner.java b/streams/src/main/java/org/apache/kafka/streams/processor/StreamPartitioner.java index 1fa5e3d8d6b..a435cafb89b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/StreamPartitioner.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/StreamPartitioner.java @@ -52,11 +52,12 @@ /** * Determine the partition number for a record with the given key and value and the current number of partitions. - * + * + * @param topic the topic name this record is sent to * @param key the key of the record * @param value the value of the record * @param numPartitions the total number of partitions * @return an integer between 0 and {@code numPartitions-1}, or {@code null} if the default partitioning logic should be used */ - Integer partition(K key, V value, int numPartitions); + Integer partition(String topic, K key, V value, int numPartitions); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopicNameExtractor.java b/streams/src/main/java/org/apache/kafka/streams/processor/TopicNameExtractor.java new file mode 100644 index 00000000000..5d79751384e --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/processor/TopicNameExtractor.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor; + +import org.apache.kafka.common.annotation.InterfaceStability; + +/** + * An interface that allows to dynamically determine the name of the Kafka topic to send at the sink node of the topology. + */ +@InterfaceStability.Evolving +public interface TopicNameExtractor<K, V> { + + /** + * Extracts the topic name to send to. The topic name must already exist, since the Kafka Streams library will not + * try to automatically create the topic with the extracted name. 
+ * + * @param key the record key + * @param value the record value + * @param recordContext current context metadata of the record + * @return the topic name this record should be sent to + */ + String extract(final K key, final V value, final RecordContext recordContext); +} diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java index 33386696eea..0c3fcf20146 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java @@ -42,7 +42,7 @@ private final ThreadCache cache; private final Serde valueSerde; private boolean initialized; - protected RecordContext recordContext; + protected ProcessorRecordContext recordContext; protected ProcessorNode currentNode; final StateManager stateManager; @@ -178,12 +178,12 @@ public long timestamp() { } @Override - public void setRecordContext(final RecordContext recordContext) { + public void setRecordContext(final ProcessorRecordContext recordContext) { this.recordContext = recordContext; } @Override - public RecordContext recordContext() { + public ProcessorRecordContext recordContext() { return recordContext; } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStreamPartitioner.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStreamPartitioner.java index da48e8f1fef..cb64c3aed5c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStreamPartitioner.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStreamPartitioner.java @@ -23,20 +23,18 @@ public class DefaultStreamPartitioner<K, V> implements StreamPartitioner<K, V> { - private final Serializer<K> keySerializer; private final Cluster 
cluster; - private final String topic; + private final Serializer<K> keySerializer; private final DefaultPartitioner defaultPartitioner; - public DefaultStreamPartitioner(final Serializer<K> keySerializer, final Cluster cluster, final String topic) { - this.keySerializer = keySerializer; + public DefaultStreamPartitioner(final Serializer<K> keySerializer, final Cluster cluster) { this.cluster = cluster; - this.topic = topic; + this.keySerializer = keySerializer; this.defaultPartitioner = new DefaultPartitioner(); } @Override - public Integer partition(final K key, final V value, final int numPartitions) { + public Integer partition(final String topic, final K key, final V value, final int numPartitions) { final byte[] keyBytes = keySerializer.serialize(topic, key); return defaultPartitioner.partition(topic, key, keyBytes, value, null, cluster); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java index 0ebaf60a99f..9439abaab2d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java @@ -17,6 +17,7 @@ package org.apache.kafka.streams.processor.internals; import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.RecordContext; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import org.apache.kafka.streams.state.internals.ThreadCache; @@ -34,12 +35,12 @@ * Returns the current {@link RecordContext} * @return the current {@link RecordContext} */ - RecordContext recordContext(); + ProcessorRecordContext recordContext(); /** - * @param recordContext the {@link RecordContext} for the record about to be processes + * @param recordContext the {@link ProcessorRecordContext} for the record 
about to be processes */ - void setRecordContext(RecordContext recordContext); + void setRecordContext(ProcessorRecordContext recordContext); /** * @param currentNode the current {@link ProcessorNode} diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java index 1651bbd90b3..7d09031d713 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java @@ -25,6 +25,7 @@ import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.StreamPartitioner; import org.apache.kafka.streams.processor.TimestampExtractor; +import org.apache.kafka.streams.processor.TopicNameExtractor; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.StoreBuilder; import org.apache.kafka.streams.state.internals.WindowStoreBuilder; @@ -319,19 +320,19 @@ Source describe() { } private class SinkNodeFactory<K, V> extends NodeFactory { - private final String topic; private final Serializer<K> keySerializer; private final Serializer<V> valSerializer; private final StreamPartitioner<? super K, ? super V> partitioner; + private final TopicNameExtractor<K, V> topicExtractor; private SinkNodeFactory(final String name, final String[] predecessors, - final String topic, + final TopicNameExtractor<K, V> topicExtractor, final Serializer<K> keySerializer, final Serializer<V> valSerializer, final StreamPartitioner<? super K, ? 
super V> partitioner) { super(name, predecessors.clone()); - this.topic = topic; + this.topicExtractor = topicExtractor; this.keySerializer = keySerializer; this.valSerializer = valSerializer; this.partitioner = partitioner; @@ -339,17 +340,22 @@ private SinkNodeFactory(final String name, @Override public ProcessorNode build() { - if (internalTopicNames.contains(topic)) { - // prefix the internal topic name with the application id - return new SinkNode<>(name, decorateTopic(topic), keySerializer, valSerializer, partitioner); + if (topicExtractor instanceof StaticTopicNameExtractor) { + final String topic = ((StaticTopicNameExtractor) topicExtractor).topicName; + if (internalTopicNames.contains(topic)) { + // prefix the internal topic name with the application id + return new SinkNode<>(name, new StaticTopicNameExtractor<K, V>(decorateTopic(topic)), keySerializer, valSerializer, partitioner); + } else { + return new SinkNode<>(name, topicExtractor, keySerializer, valSerializer, partitioner); + } } else { - return new SinkNode<>(name, topic, keySerializer, valSerializer, partitioner); + return new SinkNode<>(name, topicExtractor, keySerializer, valSerializer, partitioner); } } @Override Sink describe() { - return new Sink(name, topic); + return new Sink(name, topicExtractor); } } @@ -432,6 +438,18 @@ public final void addSource(final Topology.AutoOffsetReset offsetReset, final String... predecessorNames) { Objects.requireNonNull(name, "name must not be null"); Objects.requireNonNull(topic, "topic must not be null"); + addSink(name, new StaticTopicNameExtractor<K, V>(topic), keySerializer, valSerializer, partitioner, predecessorNames); + nodeToSinkTopic.put(name, topic); + } + + public final <K, V> void addSink(final String name, + final TopicNameExtractor<K, V> topicExtractor, + final Serializer<K> keySerializer, + final Serializer<V> valSerializer, + final StreamPartitioner<? super K, ? super V> partitioner, + final String... 
predecessorNames) { + Objects.requireNonNull(name, "name must not be null"); + Objects.requireNonNull(topicExtractor, "topic extractor must not be null"); if (nodeFactories.containsKey(name)) { throw new TopologyException("Processor " + name + " is already added."); } @@ -449,8 +467,7 @@ public final void addSource(final Topology.AutoOffsetReset offsetReset, } } - nodeFactories.put(name, new SinkNodeFactory<>(name, predecessorNames, topic, keySerializer, valSerializer, partitioner)); - nodeToSinkTopic.put(name, topic); + nodeFactories.put(name, new SinkNodeFactory<>(name, predecessorNames, topicExtractor, keySerializer, valSerializer, partitioner)); nodeGrouper.add(name); nodeGrouper.unite(name, predecessorNames); } @@ -888,13 +905,18 @@ private void buildSinkNode(final Map<String, ProcessorNode> processorMap, for (final String predecessor : sinkNodeFactory.predecessors) { processorMap.get(predecessor).addChild(node); - if (internalTopicNames.contains(sinkNodeFactory.topic)) { - // prefix the internal topic name with the application id - final String decoratedTopic = decorateTopic(sinkNodeFactory.topic); - topicSinkMap.put(decoratedTopic, node); - repartitionTopics.add(decoratedTopic); - } else { - topicSinkMap.put(sinkNodeFactory.topic, node); + if (sinkNodeFactory.topicExtractor instanceof StaticTopicNameExtractor) { + final String topic = ((StaticTopicNameExtractor) sinkNodeFactory.topicExtractor).topicName; + + if (internalTopicNames.contains(topic)) { + // prefix the internal topic name with the application id + final String decoratedTopic = decorateTopic(topic); + topicSinkMap.put(decoratedTopic, node); + repartitionTopics.add(decoratedTopic); + } else { + topicSinkMap.put(topic, node); + } + } } } @@ -1489,17 +1511,26 @@ public int hashCode() { } public final static class Sink extends AbstractNode implements TopologyDescription.Sink { - private final String topic; + private final TopicNameExtractor topicNameExtractor; + + public Sink(final String name, + 
final TopicNameExtractor topicNameExtractor) { + super(name); + this.topicNameExtractor = topicNameExtractor; + } public Sink(final String name, final String topic) { super(name); - this.topic = topic; + this.topicNameExtractor = new StaticTopicNameExtractor(topic); } @Override public String topic() { - return topic; + if (topicNameExtractor instanceof StaticTopicNameExtractor) + return ((StaticTopicNameExtractor) topicNameExtractor).topicName; + else + return null; } @Override @@ -1509,7 +1540,7 @@ public void addSuccessor(final TopologyDescription.Node successor) { @Override public String toString() { - return "Sink: " + name + " (topic: " + topic + ")\n <-- " + nodeNames(predecessors); + return "Sink: " + name + " (topic: " + topic() + ")\n <-- " + nodeNames(predecessors); } @Override @@ -1523,14 +1554,14 @@ public boolean equals(final Object o) { final Sink sink = (Sink) o; return name.equals(sink.name) - && topic.equals(sink.topic) + && topicNameExtractor.equals(sink.topicNameExtractor) && predecessors.equals(sink.predecessors); } @Override public int hashCode() { // omit predecessors as it might change and alter the hash code - return Objects.hash(name, topic); + return Objects.hash(name, topicNameExtractor); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java index c0715259fcf..dd572649765 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java @@ -17,16 +17,17 @@ package org.apache.kafka.streams.processor.internals; import org.apache.kafka.common.header.Headers; +import org.apache.kafka.streams.processor.RecordContext; import java.util.Objects; public class ProcessorRecordContext implements RecordContext { - private long timestamp; - private final long 
offset; - private final String topic; - private final int partition; - private final Headers headers; + long timestamp; + final long offset; + final String topic; + final int partition; + final Headers headers; public ProcessorRecordContext(final long timestamp, final long offset, @@ -41,18 +42,27 @@ public ProcessorRecordContext(final long timestamp, this.headers = headers; } + public ProcessorRecordContext(final long timestamp, + final long offset, + final int partition, + final String topic) { + this(timestamp, offset, partition, topic, null); + } + + public void setTimestamp(final long timestamp) { + this.timestamp = timestamp; + } + + @Override public long offset() { return offset; } + @Override public long timestamp() { return timestamp; } - public void setTimestamp(final long timestamp) { - this.timestamp = timestamp; - } - @Override public String topic() { return topic; diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java index 1c8b0a09f48..d753648eede 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java @@ -49,11 +49,11 @@ public class RecordCollectorImpl implements RecordCollector { private final Logger log; + private final String logPrefix; + private final Sensor skippedRecordsSensor; private final Producer<byte[], byte[]> producer; private final Map<TopicPartition, Long> offsets; - private final String logPrefix; private final ProductionExceptionHandler productionExceptionHandler; - private final Sensor skippedRecordsSensor; private final static String LOG_MESSAGE = "Error sending record (key {} value {} timestamp {}) to topic {} due to {}; " + "No more records will be sent and no more offsets will be recorded for this task."; @@ -88,7 +88,7 @@ public 
RecordCollectorImpl(final Producer<byte[], byte[]> producer, if (partitioner != null) { final List<PartitionInfo> partitions = producer.partitionsFor(topic); if (partitions.size() > 0) { - partition = partitioner.partition(key, value, partitions.size()); + partition = partitioner.partition(topic, key, value, partitions.size()); } else { throw new StreamsException("Could not get partition information for topic '" + topic + "'." + " This can happen if the topic does not exist."); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java index 7711905423e..73bffc80ed3 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java @@ -19,26 +19,26 @@ import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.kstream.internals.ChangedSerializer; -import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StreamPartitioner; +import org.apache.kafka.streams.processor.TopicNameExtractor; public class SinkNode<K, V> extends ProcessorNode<K, V> { - private final String topic; private Serializer<K> keySerializer; private Serializer<V> valSerializer; + private final TopicNameExtractor<K, V> topicExtractor; private final StreamPartitioner<? super K, ? super V> partitioner; - private ProcessorContext context; + private InternalProcessorContext context; - public SinkNode(final String name, - final String topic, - final Serializer<K> keySerializer, - final Serializer<V> valSerializer, - final StreamPartitioner<? super K, ? 
super V> partitioner) { + SinkNode(final String name, + final TopicNameExtractor<K, V> topicExtractor, + final Serializer<K> keySerializer, + final Serializer<V> valSerializer, + final StreamPartitioner<? super K, ? super V> partitioner) { super(name); - this.topic = topic; + this.topicExtractor = topicExtractor; this.keySerializer = keySerializer; this.valSerializer = valSerializer; this.partitioner = partitioner; @@ -83,6 +83,8 @@ public void process(final K key, final V value) { throw new StreamsException("Invalid (negative) timestamp of " + timestamp + " for output record <" + key + ":" + value + ">."); } + final String topic = topicExtractor.extract(key, value, this.context.recordContext()); + try { collector.send(topic, key, value, context.headers(), timestamp, keySerializer, valSerializer, partitioner); } catch (final ClassCastException e) { @@ -115,7 +117,7 @@ public String toString() { public String toString(final String indent) { final StringBuilder sb = new StringBuilder(super.toString(indent)); sb.append(indent).append("\ttopic:\t\t"); - sb.append(topic); + sb.append(topicExtractor); sb.append("\n"); return sb.toString(); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java index 14f986c1c9b..5c278c9c754 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java @@ -191,7 +191,7 @@ public Cancellable schedule(long interval, PunctuationType type, Punctuator call * @throws UnsupportedOperationException on every invocation */ @Override - public RecordContext recordContext() { + public ProcessorRecordContext recordContext() { throw new UnsupportedOperationException("this should not happen: recordContext not supported in standby tasks."); } @@ -199,7 +199,7 @@ public 
RecordContext recordContext() { * @throws UnsupportedOperationException on every invocation */ @Override - public void setRecordContext(final RecordContext recordContext) { + public void setRecordContext(final ProcessorRecordContext recordContext) { throw new UnsupportedOperationException("this should not happen: setRecordContext not supported in standby tasks."); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StaticTopicNameExtractor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StaticTopicNameExtractor.java new file mode 100644 index 00000000000..c525112cfdc --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StaticTopicNameExtractor.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.processor.internals; + +import org.apache.kafka.streams.processor.RecordContext; +import org.apache.kafka.streams.processor.TopicNameExtractor; + +/** + * Static topic name extractor + */ +public class StaticTopicNameExtractor<K, V> implements TopicNameExtractor<K, V> { + + public final String topicName; + + public StaticTopicNameExtractor(final String topicName) { + this.topicName = topicName; + } + + public String extract(final K key, final V value, final RecordContext recordContext) { + return topicName; + } + + @Override + public String toString() { + return "StaticTopicNameExtractor(" + topicName + ")"; + } +} diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java index 7fb0352753b..ea306fbc3c2 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java @@ -154,9 +154,7 @@ public String toString(final String indent) { return getStreamsMetadataForKey(storeName, key, - new DefaultStreamPartitioner<>(keySerializer, - clusterMetadata, - sourceTopicsInfo.topicWithMostPartitions), + new DefaultStreamPartitioner<>(keySerializer, clusterMetadata), sourceTopicsInfo); } @@ -254,7 +252,7 @@ private void rebuildMetadata(final Map<HostInfo, Set<TopicPartition>> currentSta final StreamPartitioner<? 
super K, ?> partitioner, final SourceTopicsInfo sourceTopicsInfo) { - final Integer partition = partitioner.partition(key, null, sourceTopicsInfo.maxPartitions); + final Integer partition = partitioner.partition(sourceTopicsInfo.topicWithMostPartitions, key, null, sourceTopicsInfo.maxPartitions); final Set<TopicPartition> matchingPartitions = new HashSet<>(); for (String sourceTopic : sourceTopicsInfo.sourceTopics) { matchingPartitions.add(new TopicPartition(sourceTopic, partition)); diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractMergedSortedCacheStoreIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractMergedSortedCacheStoreIterator.java index 6eb9a0a2426..1a3b0752f6b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractMergedSortedCacheStoreIterator.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractMergedSortedCacheStoreIterator.java @@ -49,7 +49,7 @@ abstract V deserializeCacheValue(final LRUCacheEntry cacheEntry); private boolean isDeletedCacheEntry(final KeyValue<Bytes, LRUCacheEntry> nextFromCache) { - return nextFromCache.value.value == null; + return nextFromCache.value.value() == null; } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java index 285bde5aedf..da308a1f5c7 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java @@ -23,8 +23,8 @@ import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.internals.InternalProcessorContext; +import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; import 
org.apache.kafka.streams.processor.internals.ProcessorStateManager; -import org.apache.kafka.streams.processor.internals.RecordContext; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.StateSerdes; @@ -87,7 +87,7 @@ public void apply(final List<ThreadCache.DirtyEntry> entries) { } private void putAndMaybeForward(final ThreadCache.DirtyEntry entry, final InternalProcessorContext context) { - final RecordContext current = context.recordContext(); + final ProcessorRecordContext current = context.recordContext(); try { context.setRecordContext(entry.recordContext()); if (flushListener != null) { @@ -179,7 +179,7 @@ public boolean isOpen() { } return rawValue; } else { - return entry.value; + return entry.value(); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java index c099fafc269..69506932e53 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java @@ -24,8 +24,8 @@ import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.internals.InternalProcessorContext; +import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; import org.apache.kafka.streams.processor.internals.ProcessorStateManager; -import org.apache.kafka.streams.processor.internals.RecordContext; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.SessionStore; import org.apache.kafka.streams.state.StateSerdes; @@ -168,7 +168,7 @@ public void put(final Windowed<Bytes> key, byte[] value) { private void putAndMaybeForward(final ThreadCache.DirtyEntry entry, final 
InternalProcessorContext context) { final Bytes binaryKey = cacheFunction.key(entry.key()); - final RecordContext current = context.recordContext(); + final ProcessorRecordContext current = context.recordContext(); context.setRecordContext(entry.recordContext()); try { final Windowed<K> key = SessionKeySchema.from(binaryKey.get(), serdes.keyDeserializer(), topic); diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java index ca24ffd4e4d..1f08f5157b9 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java @@ -24,8 +24,8 @@ import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.internals.InternalProcessorContext; +import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; import org.apache.kafka.streams.processor.internals.ProcessorStateManager; -import org.apache.kafka.streams.processor.internals.RecordContext; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.StateSerdes; import org.apache.kafka.streams.state.WindowStore; @@ -107,7 +107,7 @@ private void maybeForward(final ThreadCache.DirtyEntry entry, final Windowed<K> windowedKey, final InternalProcessorContext context) { if (flushListener != null) { - final RecordContext current = context.recordContext(); + final ProcessorRecordContext current = context.recordContext(); context.setRecordContext(entry.recordContext()); try { final V oldValue = sendOldValues ? 
fetchPrevious(key, windowedKey.window().start()) : null; @@ -174,7 +174,7 @@ public synchronized void put(final Bytes key, final byte[] value, final long tim if (entry == null) { return underlying.fetch(key, timestamp); } else { - return entry.value; + return entry.value(); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/LRUCacheEntry.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/LRUCacheEntry.java index 78c0331a5af..0ac0b77dd37 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/LRUCacheEntry.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/LRUCacheEntry.java @@ -17,21 +17,18 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.common.header.Headers; -import org.apache.kafka.streams.processor.internals.RecordContext; +import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; + +import java.util.Arrays; +import java.util.Objects; /** * A cache entry */ -class LRUCacheEntry implements RecordContext { - - public final byte[] value; - private final Headers headers; - private final long offset; - private final String topic; - private final int partition; - private long timestamp; +class LRUCacheEntry extends ProcessorRecordContext { - private long sizeBytes; + private final byte[] value; + private final long sizeBytes; private boolean isDirty; LRUCacheEntry(final byte[] value) { @@ -45,13 +42,9 @@ final long timestamp, final int partition, final String topic) { + super(timestamp, offset, partition, topic, headers); this.value = value; - this.headers = headers; - this.partition = partition; - this.topic = topic; - this.offset = offset; this.isDirty = isDirty; - this.timestamp = timestamp; this.sizeBytes = (value == null ? 0 : value.length) + 1 + // isDirty 8 + // timestamp @@ -60,47 +53,38 @@ (topic == null ? 
0 : topic.length()); } - @Override - public long offset() { - return offset; + void markClean() { + isDirty = false; } - @Override - public long timestamp() { - return timestamp; + boolean isDirty() { + return isDirty; } - @Override - public void setTimestamp(final long timestamp) { - throw new UnsupportedOperationException(); + long size() { + return sizeBytes; } - @Override - public String topic() { - return topic; + byte[] value() { + return value; } @Override - public int partition() { - return partition; + public boolean equals(final Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + final LRUCacheEntry that = (LRUCacheEntry) o; + return timestamp() == that.timestamp() && + offset() == that.offset() && + partition() == that.partition() && + Objects.equals(topic(), that.topic()) && + Objects.equals(headers(), that.headers()) && + Arrays.equals(this.value, that.value()) && + this.isDirty == that.isDirty(); } @Override - public Headers headers() { - return headers; - } - - void markClean() { - isDirty = false; - } - - boolean isDirty() { - return isDirty; - } - - public long size() { - return sizeBytes; + public int hashCode() { + return Objects.hash(timestamp(), offset(), topic(), partition(), headers(), value, isDirty); } - - } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueBytesStoreIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueBytesStoreIterator.java index 8f9d152fafd..7a545e4da3f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueBytesStoreIterator.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueBytesStoreIterator.java @@ -44,7 +44,7 @@ Bytes deserializeCacheKey(final Bytes cacheKey) { @Override byte[] deserializeCacheValue(final LRUCacheEntry cacheEntry) { - return cacheEntry.value; + return 
cacheEntry.value(); } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheSessionStoreIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheSessionStoreIterator.java index 01b577cf95f..dd2b1932993 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheSessionStoreIterator.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheSessionStoreIterator.java @@ -53,7 +53,7 @@ @Override byte[] deserializeCacheValue(final LRUCacheEntry cacheEntry) { - return cacheEntry.value; + return cacheEntry.value(); } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWindowStoreIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWindowStoreIterator.java index b08cf851e1a..d5bb42121d0 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWindowStoreIterator.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWindowStoreIterator.java @@ -48,7 +48,7 @@ Long deserializeCacheKey(final Bytes cacheKey) { @Override byte[] deserializeCacheValue(final LRUCacheEntry cacheEntry) { - return cacheEntry.value; + return cacheEntry.value(); } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWindowStoreKeyValueIterator.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWindowStoreKeyValueIterator.java index ef0a44e3128..a48c81a384e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWindowStoreKeyValueIterator.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWindowStoreKeyValueIterator.java @@ -61,7 +61,7 @@ @Override byte[] deserializeCacheValue(final LRUCacheEntry cacheEntry) { - return 
cacheEntry.value; + return cacheEntry.value(); } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java index d058c9cee4f..92e8b9b043c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java @@ -121,7 +121,7 @@ private void flush(final LRUNode evicted) { // evicted already been removed from the cache so add it to the list of // flushed entries and remove from dirtyKeys. if (evicted != null) { - entries.add(new ThreadCache.DirtyEntry(evicted.key, evicted.entry.value, evicted.entry)); + entries.add(new ThreadCache.DirtyEntry(evicted.key, evicted.entry.value(), evicted.entry)); dirtyKeys.remove(evicted.key); } @@ -130,9 +130,9 @@ private void flush(final LRUNode evicted) { if (node == null) { throw new IllegalStateException("Key = " + key + " found in dirty key set, but entry is null"); } - entries.add(new ThreadCache.DirtyEntry(key, node.entry.value, node.entry)); + entries.add(new ThreadCache.DirtyEntry(key, node.entry.value(), node.entry)); node.entry.markClean(); - if (node.entry.value == null) { + if (node.entry.value() == null) { deleted.add(node.key); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java index 8c3716b3542..7ce03a18c01 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java @@ -19,7 +19,7 @@ import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.streams.KeyValue; -import org.apache.kafka.streams.processor.internals.RecordContext; +import 
org.apache.kafka.streams.processor.internals.ProcessorRecordContext; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import org.slf4j.Logger; @@ -332,9 +332,9 @@ public void close() { static class DirtyEntry { private final Bytes key; private final byte[] newValue; - private final RecordContext recordContext; + private final ProcessorRecordContext recordContext; - DirtyEntry(final Bytes key, final byte[] newValue, final RecordContext recordContext) { + DirtyEntry(final Bytes key, final byte[] newValue, final ProcessorRecordContext recordContext) { this.key = key; this.newValue = newValue; this.recordContext = recordContext; @@ -348,7 +348,7 @@ public Bytes key() { return newValue; } - public RecordContext recordContext() { + public ProcessorRecordContext recordContext() { return recordContext; } } diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java index 904ebe2a611..297b2434c06 100644 --- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java @@ -406,7 +406,7 @@ public void shouldNotGetTaskWithKeyAndSerializerWhenNotRunning() { public void shouldNotGetTaskWithKeyAndPartitionerWhenNotRunning() { streams.metadataForKey("store", "key", new StreamPartitioner<String, Object>() { @Override - public Integer partition(final String key, final Object value, final int numPartitions) { + public Integer partition(final String topic, final String key, final Object value, final int numPartitions) { return 0; } }); diff --git a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java index 0c34723ae3c..a845be3569f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java @@ -21,6 +21,7 @@ import 
org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.ProcessorSupplier; +import org.apache.kafka.streams.processor.TopicNameExtractor; import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder; import org.apache.kafka.streams.state.StoreBuilder; import org.apache.kafka.streams.state.internals.KeyValueStoreBuilder; @@ -95,7 +96,12 @@ public void shouldNotAllowNullNameWhenAddingSink() { @Test(expected = NullPointerException.class) public void shouldNotAllowNullTopicWhenAddingSink() { - topology.addSink("name", null); + topology.addSink("name", (String) null); + } + + @Test(expected = NullPointerException.class) + public void shouldNotAllowNullTopicChooserWhenAddingSink() { + topology.addSink("name", (TopicNameExtractor) null); } @Test(expected = NullPointerException.class) diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java index 6da148a9c97..7aed8e1788f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java @@ -38,10 +38,12 @@ import org.apache.kafka.streams.kstream.ValueTransformerSupplier; import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier; import org.apache.kafka.streams.processor.FailOnInvalidTimestamp; +import org.apache.kafka.streams.processor.TopicNameExtractor; import org.apache.kafka.streams.processor.internals.ProcessorTopology; import org.apache.kafka.streams.processor.internals.SourceNode; import org.apache.kafka.streams.test.ConsumerRecordFactory; import org.apache.kafka.test.MockMapper; +import org.apache.kafka.test.MockProcessor; import org.apache.kafka.test.MockProcessorSupplier; import org.apache.kafka.test.MockValueJoiner; import 
org.apache.kafka.test.StreamsTestUtils; @@ -50,6 +52,7 @@ import java.util.Arrays; import java.util.Collections; +import java.util.List; import java.util.Properties; import java.util.concurrent.TimeUnit; import java.util.regex.Pattern; @@ -220,6 +223,26 @@ public void shouldSendDataToTopicUsingProduced() { assertThat(processorSupplier.theCapturedProcessor().processed, equalTo(Collections.singletonList("e:f"))); } + @Test + public void shouldSendDataToDynamicTopics() { + final StreamsBuilder builder = new StreamsBuilder(); + final String input = "topic"; + final KStream<String, String> stream = builder.stream(input, stringConsumed); + stream.to((key, value, context) -> context.topic() + "-" + key + "-" + value.substring(0, 1), + Produced.with(Serdes.String(), Serdes.String())); + builder.stream(input + "-a-v", stringConsumed).process(processorSupplier); + builder.stream(input + "-b-v", stringConsumed).process(processorSupplier); + + try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) { + driver.pipeInput(recordFactory.create(input, "a", "v1")); + driver.pipeInput(recordFactory.create(input, "a", "v2")); + driver.pipeInput(recordFactory.create(input, "b", "v1")); + } + List<MockProcessor<String, String>> mockProcessors = processorSupplier.capturedProcessors(2); + assertThat(mockProcessors.get(0).processed, equalTo(Utils.mkList("a:v1", "a:v2"))); + assertThat(mockProcessors.get(1).processed, equalTo(Collections.singletonList("b:v1"))); + } + @Test public void shouldUseRecordMetadataTimestampExtractorWhenInternalRepartitioningTopicCreated() { final StreamsBuilder builder = new StreamsBuilder(); @@ -300,12 +323,12 @@ public void shouldNotAllowNullMapperOnFlatMap() { @Test(expected = NullPointerException.class) public void shouldNotAllowNullMapperOnFlatMapValues() { - testStream.flatMapValues((ValueMapper) null); + testStream.flatMapValues((ValueMapper<? super String, ? extends Iterable<? 
extends String>>) null); } @Test(expected = NullPointerException.class) public void shouldNotAllowNullMapperOnFlatMapValuesWithKey() { - testStream.flatMapValues((ValueMapperWithKey) null); + testStream.flatMapValues((ValueMapperWithKey<? super String, ? super String, ? extends Iterable<? extends String>>) null); } @Test(expected = IllegalArgumentException.class) @@ -325,7 +348,12 @@ public void shouldNotAllowNullTopicOnThrough() { @Test(expected = NullPointerException.class) public void shouldNotAllowNullTopicOnTo() { - testStream.to(null); + testStream.to((String) null); + } + + @Test(expected = NullPointerException.class) + public void shouldNotAllowNullTopicChooserOnTo() { + testStream.to((TopicNameExtractor<String, String>) null); } @Test(expected = NullPointerException.class) diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java index 3aafa33e2e4..c41ae15f564 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java @@ -57,7 +57,7 @@ public void testCopartitioning() { final Random rand = new Random(); final DefaultPartitioner defaultPartitioner = new DefaultPartitioner(); final WindowedSerializer<Integer> timeWindowedSerializer = new TimeWindowedSerializer<>(intSerializer); - final WindowedStreamPartitioner<Integer, String> streamPartitioner = new WindowedStreamPartitioner<>(topicName, timeWindowedSerializer); + final WindowedStreamPartitioner<Integer, String> streamPartitioner = new WindowedStreamPartitioner<>(timeWindowedSerializer); for (int k = 0; k < 10; k++) { Integer key = rand.nextInt(); @@ -72,7 +72,7 @@ public void testCopartitioning() { TimeWindow window = new TimeWindow(10 * w, 20 * w); Windowed<Integer> windowedKey = new 
Windowed<>(key, window); - Integer actual = streamPartitioner.partition(windowedKey, value, infos.size()); + Integer actual = streamPartitioner.partition(topicName, windowedKey, value, infos.size()); assertEquals(expected, actual); } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java index 9aaa8a6147a..d3f8dda8aee 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java @@ -49,7 +49,7 @@ private final AbstractProcessorContext context = new TestProcessorContext(metrics); private final MockStateStore stateStore = new MockStateStore("store", false); private final Headers headers = new RecordHeaders(new Header[]{new RecordHeader("key", "value".getBytes())}); - private final RecordContext recordContext = new RecordContextStub(10, System.currentTimeMillis(), 1, "foo", headers); + private final ProcessorRecordContext recordContext = new ProcessorRecordContext(10, System.currentTimeMillis(), 1, "foo", headers); @Before public void before() { @@ -95,7 +95,7 @@ public void shouldReturnTopicFromRecordContext() { @Test public void shouldReturnNullIfTopicEqualsNonExistTopic() { - context.setRecordContext(new RecordContextStub(0, 0, 0, AbstractProcessorContext.NONEXIST_TOPIC)); + context.setRecordContext(new ProcessorRecordContext(0, 0, 0, AbstractProcessorContext.NONEXIST_TOPIC)); assertThat(context.topic(), nullValue()); } @@ -153,7 +153,7 @@ public void shouldReturnHeadersFromRecordContext() { @Test public void shouldReturnNullIfHeadersAreNotSet() { - context.setRecordContext(new RecordContextStub(0, 0, 0, AbstractProcessorContext.NONEXIST_TOPIC)); + context.setRecordContext(new ProcessorRecordContext(0, 0, 0, AbstractProcessorContext.NONEXIST_TOPIC)); 
assertThat(context.headers(), nullValue()); } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java index d8e66b87be8..1da04255b11 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java @@ -25,6 +25,7 @@ import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.processor.ProcessorSupplier; import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.TopicNameExtractor; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.StoreBuilder; import org.apache.kafka.streams.state.Stores; @@ -411,7 +412,12 @@ public void shouldNotAllowNullNameWhenAddingSink() { @Test(expected = NullPointerException.class) public void shouldNotAllowNullTopicWhenAddingSink() { - builder.addSink("name", null, null, null, null); + builder.addSink("name", (String) null, null, null, null); + } + + @Test(expected = NullPointerException.class) + public void shouldNotAllowNullTopicChooserWhenAddingSink() { + builder.addSink("name", (TopicNameExtractor) null, null, null, null); } @Test(expected = NullPointerException.class) @@ -456,7 +462,7 @@ public void shouldNotSetApplicationIdToNull() { @Test(expected = NullPointerException.class) public void shouldNotAddNullStateStoreSupplier() { - builder.addStateStore((StoreBuilder) null); + builder.addStateStore(null); } private Set<String> nodeNames(final Collection<ProcessorNode> nodes) { diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java index 033c0e13825..d88d3b5694a 100644 --- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java @@ -420,7 +420,7 @@ private void assertNoOutputRecord(final String topic) { private StreamPartitioner<Object, Object> constantPartitioner(final Integer partition) { return new StreamPartitioner<Object, Object>() { @Override - public Integer partition(final Object key, final Object value, final int numPartitions) { + public Integer partition(final String topic, final Object key, final Object value, final int numPartitions) { return partition; } }; diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java index e439372d130..6954eda529f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java @@ -73,7 +73,7 @@ private final StreamPartitioner<String, Object> streamPartitioner = new StreamPartitioner<String, Object>() { @Override - public Integer partition(final String key, final Object value, final int numPartitions) { + public Integer partition(final String topic, final String key, final Object value, final int numPartitions) { return Integer.parseInt(key) % numPartitions; } }; diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordContextStub.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordContextStub.java deleted file mode 100644 index 7afd51eb06d..00000000000 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordContextStub.java +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.streams.processor.internals; - -import org.apache.kafka.common.header.Headers; - -public class RecordContextStub implements RecordContext { - - private final long offset; - private long timestamp; - private final int partition; - private final String topic; - private final Headers headers; - - public RecordContextStub() { - this(-1, -1, -1, "", null); - } - - public RecordContextStub(final long offset, - final long timestamp, - final int partition, - final String topic, - final Headers headers) { - this.offset = offset; - this.timestamp = timestamp; - this.partition = partition; - this.topic = topic; - this.headers = headers; - } - - public RecordContextStub(final long offset, - final long timestamp, - final int partition, - final String topic) { - this(offset, timestamp, partition, topic, null); - } - - @Override - public long offset() { - return offset; - } - - @Override - public long timestamp() { - return timestamp; - } - - @Override - public void setTimestamp(final long timestamp) { - this.timestamp = timestamp; - } - - @Override - public String topic() { - return topic; - } - - @Override - public int partition() { - return partition; - } - - @Override - public Headers headers() { - return headers; - } -} diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java index 753d26bd86e..dacc17e86e7 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java @@ -47,7 +47,7 @@ new Metrics().sensor("skipped-records") ) ); - private final SinkNode sink = new SinkNode<>("anyNodeName", "any-output-topic", anySerializer, anySerializer, null); + private final SinkNode sink = new SinkNode<>("anyNodeName", new StaticTopicNameExtractor("any-output-topic"), anySerializer, anySerializer, null); @Before public void before() { diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java index 5ab2f17aa7a..fce5342b0f5 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java @@ -65,7 +65,6 @@ private TopicPartition topic1P1; private TopicPartition topic2P1; private TopicPartition topic4P0; - private List<PartitionInfo> partitionInfos; private Cluster cluster; private final String globalTable = "global-table"; private StreamPartitioner<String, Object> partitioner; @@ -113,7 +112,7 @@ public Object apply(final Object value) { hostToPartitions.put(hostTwo, Utils.mkSet(topic2P0, topic1P1)); hostToPartitions.put(hostThree, Collections.singleton(topic3P0)); - partitionInfos = Arrays.asList( + final List<PartitionInfo> partitionInfos = Arrays.asList( new PartitionInfo("topic-one", 0, null, null, null), new PartitionInfo("topic-one", 1, null, null, null), new PartitionInfo("topic-two", 0, null, null, null), @@ -126,7 +125,7 @@ public Object apply(final 
Object value) { discovery.onChange(hostToPartitions, cluster); partitioner = new StreamPartitioner<String, Object>() { @Override - public Integer partition(final String key, final Object value, final int numPartitions) { + public Integer partition(final String topic, final String key, final Object value, final int numPartitions) { return 1; } }; @@ -246,7 +245,7 @@ public void shouldGetInstanceWithKeyWithMergedStreams() { final StreamsMetadata actual = discovery.getMetadataWithKey("merged-table", "123", new StreamPartitioner<String, Object>() { @Override - public Integer partition(final String key, final Object value, final int numPartitions) { + public Integer partition(final String topic, final String key, final Object value, final int numPartitions) { return 2; } }); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java index 92653ce69b2..3f78be6dd4e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java @@ -72,8 +72,8 @@ public void shouldKeepTrackOfMostRecentlyAndLeastRecentlyUsed() throws IOExcepti cache.put(Bytes.wrap(key), new LRUCacheEntry(value, null, true, 1, 1, 1, "")); LRUCacheEntry head = cache.first(); LRUCacheEntry tail = cache.last(); - assertEquals(new String(head.value), toInsert.get(i).value); - assertEquals(new String(tail.value), toInsert.get(0).value); + assertEquals(new String(head.value()), toInsert.get(i).value); + assertEquals(new String(tail.value()), toInsert.get(0).value); assertEquals(cache.flushes(), 0); assertEquals(cache.hits(), 0); assertEquals(cache.misses(), 0); @@ -116,9 +116,9 @@ public void shouldPutGet() { cache.put(Bytes.wrap(new byte[]{1}), new LRUCacheEntry(new byte[]{11})); cache.put(Bytes.wrap(new byte[]{2}), new LRUCacheEntry(new byte[]{12})); - assertArrayEquals(new 
byte[] {10}, cache.get(Bytes.wrap(new byte[] {0})).value); - assertArrayEquals(new byte[] {11}, cache.get(Bytes.wrap(new byte[] {1})).value); - assertArrayEquals(new byte[] {12}, cache.get(Bytes.wrap(new byte[] {2})).value); + assertArrayEquals(new byte[] {10}, cache.get(Bytes.wrap(new byte[] {0})).value()); + assertArrayEquals(new byte[] {11}, cache.get(Bytes.wrap(new byte[] {1})).value()); + assertArrayEquals(new byte[] {12}, cache.get(Bytes.wrap(new byte[] {2})).value()); assertEquals(cache.hits(), 3); } @@ -128,15 +128,15 @@ public void shouldPutIfAbsent() { cache.putIfAbsent(Bytes.wrap(new byte[]{0}), new LRUCacheEntry(new byte[]{20})); cache.putIfAbsent(Bytes.wrap(new byte[]{1}), new LRUCacheEntry(new byte[]{30})); - assertArrayEquals(new byte[] {10}, cache.get(Bytes.wrap(new byte[] {0})).value); - assertArrayEquals(new byte[] {30}, cache.get(Bytes.wrap(new byte[] {1})).value); + assertArrayEquals(new byte[] {10}, cache.get(Bytes.wrap(new byte[] {0})).value()); + assertArrayEquals(new byte[] {30}, cache.get(Bytes.wrap(new byte[] {1})).value()); } @Test public void shouldDeleteAndUpdateSize() { cache.put(Bytes.wrap(new byte[]{0}), new LRUCacheEntry(new byte[]{10})); final LRUCacheEntry deleted = cache.delete(Bytes.wrap(new byte[]{0})); - assertArrayEquals(new byte[] {10}, deleted.value); + assertArrayEquals(new byte[] {10}, deleted.value()); assertEquals(0, cache.sizeInBytes()); } @@ -146,9 +146,9 @@ public void shouldPutAll() { KeyValue.pair(new byte[] {1}, new LRUCacheEntry(new byte[]{1})), KeyValue.pair(new byte[] {2}, new LRUCacheEntry(new byte[]{2})))); - assertArrayEquals(new byte[]{0}, cache.get(Bytes.wrap(new byte[]{0})).value); - assertArrayEquals(new byte[]{1}, cache.get(Bytes.wrap(new byte[]{1})).value); - assertArrayEquals(new byte[]{2}, cache.get(Bytes.wrap(new byte[]{2})).value); + assertArrayEquals(new byte[]{0}, cache.get(Bytes.wrap(new byte[]{0})).value()); + assertArrayEquals(new byte[]{1}, cache.get(Bytes.wrap(new byte[]{1})).value()); + 
assertArrayEquals(new byte[]{2}, cache.get(Bytes.wrap(new byte[]{2})).value()); } @Test @@ -157,7 +157,7 @@ public void shouldOverwriteAll() { KeyValue.pair(new byte[] {0}, new LRUCacheEntry(new byte[]{1})), KeyValue.pair(new byte[] {0}, new LRUCacheEntry(new byte[]{2})))); - assertArrayEquals(new byte[]{2}, cache.get(Bytes.wrap(new byte[]{0})).value); + assertArrayEquals(new byte[]{2}, cache.get(Bytes.wrap(new byte[]{0})).value()); assertEquals(cache.overwrites(), 2); } diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java index d100ae5372b..3fa93676dd9 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java @@ -66,7 +66,7 @@ public void basicPutGet() throws IOException { Bytes key = Bytes.wrap(kvToInsert.key.getBytes()); LRUCacheEntry entry = cache.get(namespace, key); assertEquals(entry.isDirty(), true); - assertEquals(new String(entry.value), kvToInsert.value); + assertEquals(new String(entry.value()), kvToInsert.value); } assertEquals(cache.gets(), 5); assertEquals(cache.puts(), 5); @@ -188,7 +188,7 @@ public void shouldDelete() { final Bytes key = Bytes.wrap(new byte[]{0}); cache.put(namespace, key, dirtyEntry(key.get())); - assertEquals(key.get(), cache.delete(namespace, key).value); + assertEquals(key.get(), cache.delete(namespace, key).value()); assertNull(cache.get(namespace, key)); } @@ -204,7 +204,7 @@ public void apply(final List<ThreadCache.DirtyEntry> dirty) { } }); cache.put(namespace, key, dirtyEntry(key.get())); - assertEquals(key.get(), cache.delete(namespace, key).value); + assertEquals(key.get(), cache.delete(namespace, key).value()); // flushing should have no further effect cache.flush(namespace); @@ -235,8 +235,8 @@ public void shouldNotClashWithOverlappingNames() { 
cache.put(namespace1, nameByte, dirtyEntry(nameByte.get())); cache.put(namespace2, nameByte, dirtyEntry(name1Byte.get())); - assertArrayEquals(nameByte.get(), cache.get(namespace1, nameByte).value); - assertArrayEquals(name1Byte.get(), cache.get(namespace2, nameByte).value); + assertArrayEquals(nameByte.get(), cache.get(namespace1, nameByte).value()); + assertArrayEquals(name1Byte.get(), cache.get(namespace2, nameByte).value()); } @Test @@ -413,8 +413,8 @@ public void shouldPutAll() { cache.putAll(namespace, Arrays.asList(KeyValue.pair(Bytes.wrap(new byte[]{0}), dirtyEntry(new byte[]{5})), KeyValue.pair(Bytes.wrap(new byte[]{1}), dirtyEntry(new byte[]{6})))); - assertArrayEquals(new byte[]{5}, cache.get(namespace, Bytes.wrap(new byte[]{0})).value); - assertArrayEquals(new byte[]{6}, cache.get(namespace, Bytes.wrap(new byte[]{1})).value); + assertArrayEquals(new byte[]{5}, cache.get(namespace, Bytes.wrap(new byte[]{0})).value()); + assertArrayEquals(new byte[]{6}, cache.get(namespace, Bytes.wrap(new byte[]{1})).value()); } @Test @@ -436,8 +436,8 @@ public void shouldPutIfAbsent() { final Bytes key = Bytes.wrap(new byte[]{10}); final byte[] value = {30}; assertNull(cache.putIfAbsent(namespace, key, dirtyEntry(value))); - assertArrayEquals(value, cache.putIfAbsent(namespace, key, dirtyEntry(new byte[]{8})).value); - assertArrayEquals(value, cache.get(namespace, key).value); + assertArrayEquals(value, cache.putIfAbsent(namespace, key, dirtyEntry(new byte[]{8})).value()); + assertArrayEquals(value, cache.get(namespace, key).value()); } @Test diff --git a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala index 49d9fe4b064..0f1fc82a489 100644 --- a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala +++ b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala @@ -22,7 
+22,7 @@ package kstream import org.apache.kafka.streams.KeyValue import org.apache.kafka.streams.kstream.{KStream => KStreamJ, _} -import org.apache.kafka.streams.processor.{Processor, ProcessorContext, ProcessorSupplier} +import org.apache.kafka.streams.processor.{Processor, ProcessorContext, ProcessorSupplier, TopicNameExtractor} import org.apache.kafka.streams.scala.ImplicitConversions._ import org.apache.kafka.streams.scala.FunctionConversions._ @@ -249,6 +249,38 @@ class KStream[K, V](val inner: KStreamJ[K, V]) { def to(topic: String)(implicit produced: Produced[K, V]): Unit = inner.to(topic, produced) + /** + * Dynamically materialize this stream to topics using the `Produced` instance for + * configuration of the `Serde key serde`, `Serde value serde`, and `StreamPartitioner`. + * The topic names for each record to send to is dynamically determined based on the given mapper. + * <p> + * The user can either supply the `Produced` instance as an implicit in scope or she can also provide implicit + * key and value serdes that will be converted to a `Produced` instance implicitly. + * <p> + * {{{ + * Example: + * + * // brings implicit serdes in scope + * import Serdes._ + * + * //.. + * val clicksPerRegion: KTable[String, Long] = //.. 
+ * // Implicit serdes in scope will generate an implicit Produced instance, which + * // will be passed automatically to the call of through below + * clicksPerRegion.to(topicChooser) + * + * // Similarly you can create an implicit Produced and it will be passed implicitly + * // to the through call + * }}} + * + * @param extractor the extractor to determine the name of the Kafka topic to write to for reach record + * @param (implicit) produced the instance of Produced that gives the serdes and `StreamPartitioner` + * @see `org.apache.kafka.streams.kstream.KStream#to` + */ + def to(extractor: TopicNameExtractor[K, V])(implicit produced: Produced[K, V]): Unit = + inner.to(extractor, produced) + /** * Transform each record of the input stream into zero or more records in the output stream (both key and value type * can be altered arbitrarily). ---------------------------------------------------------------- > Allow dynamic routing of output records > --------------------------------------- > > Key: KAFKA-4936 > URL: https://issues.apache.org/jira/browse/KAFKA-4936 > Project: Kafka > Issue Type: Improvement > Components: streams > Reporter: Matthias J. Sax > Assignee: Guozhang Wang > Priority: Major > Labels: kip > Fix For: 2.0.0 > > > Currently, all used output topics must be known beforehand, and thus, it's not > possible to send output records to topics in a dynamic fashion. > There have been a couple of requests for this feature and we should consider > adding it. There are many open questions, however, with regard to topic > creation and configuration (replication factor, number of partitions), etc.
> KIP: > https://cwiki.apache.org/confluence/display/KAFKA/KIP-303%3A+Add+Dynamic+Routing+in+Streams+Sink
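
For readers skimming the diff, the routing rule exercised by shouldSendDataToDynamicTopics can be sketched without a Kafka dependency. This is only an illustration under stated assumptions: `RecordContext` and `TopicNameExtractor` below are local stand-ins that copy the three-argument lambda shape seen in the test, not the Kafka Streams types, and `route` and `demo` are hypothetical helpers that group records in memory instead of producing them to a broker.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class DynamicRoutingSketch {

    /** Local stand-in for the per-record metadata; NOT the Kafka Streams class. */
    interface RecordContext {
        String topic();
    }

    /** Local stand-in with the same (key, value, context) lambda shape as in the test. */
    @FunctionalInterface
    interface TopicNameExtractor<K, V> {
        String extract(K key, V value, RecordContext context);
    }

    /** Hypothetical helper: groups "key:value" strings by the topic name the extractor returns. */
    static Map<String, List<String>> route(final String inputTopic,
                                           final String[][] records,
                                           final TopicNameExtractor<String, String> extractor) {
        // Every record in this sketch comes from the same input topic.
        final RecordContext context = () -> inputTopic;
        final Map<String, List<String>> byTopic = new LinkedHashMap<>();
        for (final String[] record : records) {
            final String target = extractor.extract(record[0], record[1], context);
            byTopic.computeIfAbsent(target, t -> new ArrayList<>())
                   .add(record[0] + ":" + record[1]);
        }
        return byTopic;
    }

    /** Replays the three records piped in by the test, using the same extractor lambda. */
    static Map<String, List<String>> demo() {
        final String[][] records = {{"a", "v1"}, {"a", "v2"}, {"b", "v1"}};
        return route("topic", records,
            (key, value, context) -> context.topic() + "-" + key + "-" + value.substring(0, 1));
    }

    public static void main(final String[] args) {
        // prints {topic-a-v=[a:v1, a:v2], topic-b-v=[b:v1]}
        System.out.println(demo());
    }
}
```

The grouping matches the assertions in the test: both records with key "a" land in "topic-a-v" and the record with key "b" lands in "topic-b-v". In the real API the target topic is resolved per record inside the sink node, so the sketch says nothing about topic creation or partitioning, which the issue description lists as open questions.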