[ https://issues.apache.org/jira/browse/KAFKA-4936?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16495558#comment-16495558 ]

ASF GitHub Bot commented on KAFKA-4936:
---------------------------------------

mjsax closed pull request #5018: KAFKA-4936: Add dynamic routing in Streams
URL: https://github.com/apache/kafka/pull/5018

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/docs/streams/developer-guide/dsl-api.html 
b/docs/streams/developer-guide/dsl-api.html
index 4c76c5e84b6..bdfae055f58 100644
--- a/docs/streams/developer-guide/dsl-api.html
+++ b/docs/streams/developer-guide/dsl-api.html
@@ -3023,7 +3023,7 @@ <h2><a class="toc-backref" href="#id7">Overview</a><a 
class="headerlink" href="#
                         <li>KStream -&gt; void</li>
                     </ul>
                 </td>
-                    <td><p class="first"><strong>Terminal operation.</strong>  
Write the records to a Kafka topic.
+                    <td><p class="first"><strong>Terminal operation.</strong>  
Write the records to Kafka topic(s).
                         (<a class="reference external" 
href="../../../javadoc/org/apache/kafka/streams/kstream/KStream.html#to(java.lang.String)">KStream
 details</a>)</p>
                         <p>When to provide serdes explicitly:</p>
                         <ul class="simple">
@@ -3037,6 +3037,8 @@ <h2><a class="toc-backref" href="#id7">Overview</a><a 
class="headerlink" href="#
                         <p>A variant of <code class="docutils literal"><span 
class="pre">to</span></code> exists that enables you to specify how the data is 
produced by using a <code class="docutils literal"><span 
class="pre">Produced</span></code>
                             instance to specify, for example, a <code 
class="docutils literal"><span class="pre">StreamPartitioner</span></code> that 
gives you control over
                             how output records are distributed across the 
partitions of the output topic.</p>
+                        <p>Another variant of <code class="docutils 
literal"><span class="pre">to</span></code> exists that enables you to 
dynamically choose which topic to send to for each record via a <code 
class="docutils literal"><span class="pre">TopicNameExtractor</span></code>
+                            instance.</p>
                         <div class="highlight-java"><div 
class="highlight"><pre><span></span><span class="n">KStream</span><span 
class="o">&lt;</span><span class="n">String</span><span class="o">,</span> 
<span class="n">Long</span><span class="o">&gt;</span> <span 
class="n">stream</span> <span class="o">=</span> <span class="o">...;</span>
 <span class="n">KTable</span><span class="o">&lt;</span><span 
class="n">String</span><span class="o">,</span> <span 
class="n">Long</span><span class="o">&gt;</span> <span class="n">table</span> 
<span class="o">=</span> <span class="o">...;</span>
 
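
As a usage illustration of the dynamic `to` variant described in the DSL docs above, here is a minimal sketch. The topic names ("orders", "orders-eu", "orders-us") and the key-prefix routing rule are hypothetical, and all target topics must be pre-created:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;

public class DynamicRoutingDslExample {
    public static void main(final String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();
        final KStream<String, String> orders =
            builder.stream("orders", Consumed.with(Serdes.String(), Serdes.String()));

        // Route each record to a region-specific topic derived from its key;
        // Kafka Streams will not auto-create these topics.
        orders.to((key, value, recordContext) ->
            key.startsWith("eu-") ? "orders-eu" : "orders-us");

        // builder.build() is then passed to new KafkaStreams(...) as usual.
    }
}
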
diff --git a/docs/streams/developer-guide/processor-api.html 
b/docs/streams/developer-guide/processor-api.html
index ef8937258ad..8236ded5c82 100644
--- a/docs/streams/developer-guide/processor-api.html
+++ b/docs/streams/developer-guide/processor-api.html
@@ -384,7 +384,8 @@ <h2><a class="toc-backref" href="#id8">Connecting 
Processors and State Stores</a
                 <li>A predefined persistent key-value state store is created 
and associated with the <code class="docutils literal"><span 
class="pre">&quot;Process&quot;</span></code> node, using
                     <code class="docutils literal"><span 
class="pre">countStoreBuilder</span></code>.</li>
                 <li>A sink processor node is then added to complete the 
topology using the <code class="docutils literal"><span 
class="pre">addSink</span></code> method, taking the <code class="docutils 
literal"><span class="pre">&quot;Process&quot;</span></code> node
-                    as its upstream processor and writing to a separate <code 
class="docutils literal"><span class="pre">&quot;sink-topic&quot;</span></code> 
Kafka topic.</li>
+                    as its upstream processor and writing to a separate <code 
class="docutils literal"><span class="pre">&quot;sink-topic&quot;</span></code> 
Kafka topic (note that users can also use another overloaded variant of <code 
class="docutils literal"><span class="pre">addSink</span></code>
+                    to dynamically determine the Kafka topic to write to for 
each received record from the upstream processor).</li>
             </ul>
             <p>In this topology, the <code class="docutils literal"><span 
class="pre">&quot;Process&quot;</span></code> stream processor node is 
considered a downstream processor of the <code class="docutils literal"><span 
class="pre">&quot;Source&quot;</span></code> node, and an
                 upstream processor of the <code class="docutils literal"><span 
class="pre">&quot;Sink&quot;</span></code> node.  As a result, whenever the 
<code class="docutils literal"><span 
class="pre">&quot;Source&quot;</span></code> node forwards a newly fetched 
record from
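
A minimal sketch of the overloaded addSink variant mentioned above; the node names, topic names, and routing rule are hypothetical, and every topic the extractor may return must already exist:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.Topology;

public class DynamicSinkExample {
    public static void main(final String[] args) {
        final Topology topology = new Topology();

        topology.addSource("Source", "source-topic");

        // The sink picks its output topic per record instead of using a fixed topic name.
        topology.addSink(
            "Sink",
            (key, value, recordContext) -> value == null ? "deletes-topic" : "sink-topic",
            Serdes.String().serializer(),
            Serdes.String().serializer(),
            "Source");
    }
}
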
diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html
index 0e3725ebe32..45d1f7df8a3 100644
--- a/docs/streams/upgrade-guide.html
+++ b/docs/streams/upgrade-guide.html
@@ -136,6 +136,13 @@ <h3><a id="streams_api_changes_200" 
href="#streams_api_changes_200">Streams API
         The new class <code>To</code> allows you to send records to all or 
specific downstream processors by name and to set the timestamp for the output 
record.
         Forwarding based on child index is not supported in the new API any 
longer.
     </p>
+    <p>
+        We have added support for routing records dynamically to Kafka topics. More specifically, in both the lower-level <code>Topology#addSink</code> and higher-level <code>KStream#to</code> APIs, we have added variants that
+        take a <code>TopicNameExtractor</code> instance instead of a specific <code>String</code> typed topic name, such that for each record received from the upstream processor, the library will dynamically determine which Kafka topic to write to
+        based on the record's key and value, as well as the record context. Note that all the Kafka topics that may possibly be used are still considered user topics and hence are required to be pre-created. In addition, we have modified the
+        <code>StreamPartitioner</code> interface to add a topic name parameter, since the topic name may not be known beforehand; users who have customized implementations of this interface need to update their code
+        when upgrading their application to use Kafka Streams 2.0.0.
+    </p>
     <p>
         <a href="https://cwiki.apache.org/confluence/x/DVyHB";>KIP-284</a> 
changed the retention time for repartition topics by setting its default value 
to <code>Long.MAX_VALUE</code>.
         Instead of relying on data retention Kafka Streams uses the new purge 
data API to delete consumed data from those topics and to keep used storage 
small now.
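
Since the upgrade note above asks users with customized partitioners to update their code, here is a sketch of what the new signature might look like in practice; the class name and the partitioning rule are hypothetical:

import org.apache.kafka.streams.processor.StreamPartitioner;

public class RegionPartitioner implements StreamPartitioner<String, Long> {

    // The topic name is now passed in, since with dynamic routing it may only
    // be known per record at runtime.
    @Override
    public Integer partition(final String topic, final String key, final Long value, final int numPartitions) {
        if (key == null) {
            return null; // fall back to the default partitioning logic
        }
        // Example rule only: pin "eu-" keys to partition 0, hash everything else.
        return key.startsWith("eu-") ? 0 : (key.hashCode() & 0x7fffffff) % numPartitions;
    }
}
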
diff --git a/streams/src/main/java/org/apache/kafka/streams/Topology.java 
b/streams/src/main/java/org/apache/kafka/streams/Topology.java
index c137a305fb1..22f6ea8362b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/Topology.java
+++ b/streams/src/main/java/org/apache/kafka/streams/Topology.java
@@ -26,6 +26,7 @@
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.processor.TimestampExtractor;
+import org.apache.kafka.streams.processor.TopicNameExtractor;
 import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
 import org.apache.kafka.streams.processor.internals.ProcessorNode;
 import org.apache.kafka.streams.processor.internals.ProcessorTopology;
@@ -411,7 +412,8 @@ public synchronized Topology addSource(final 
AutoOffsetReset offsetReset,
      * @param parentNames the name of one or more source or processor nodes 
whose output records this sink should consume
      * and write to its topic
      * @return itself
-     * @throws TopologyException itself
+     * @throws TopologyException if parent processor is not added yet, or if this processor's name is equal to the parent's name
      * @see #addSink(String, String, StreamPartitioner, String...)
      * @see #addSink(String, String, Serializer, Serializer, String...)
      * @see #addSink(String, String, Serializer, Serializer, 
StreamPartitioner, String...)
@@ -443,7 +445,8 @@ public synchronized Topology addSink(final String name,
      * @param parentNames the name of one or more source or processor nodes 
whose output records this sink should consume
      * and write to its topic
      * @return itself
-     * @throws TopologyException if parent processor is not added yet, or if 
this processor's name is equal to the parent's name
+     * @throws TopologyException if parent processor is not added yet, or if 
this processor's name is equal to the parent's name,
+     *                           or if this processor's name is equal to the 
parent's name
      * @see #addSink(String, String, String...)
      * @see #addSink(String, String, Serializer, Serializer, String...)
      * @see #addSink(String, String, Serializer, Serializer, 
StreamPartitioner, String...)
@@ -471,7 +474,8 @@ public synchronized Topology addSink(final String name,
      * @param parentNames the name of one or more source or processor nodes 
whose output records this sink should consume
      * and write to its topic
      * @return itself
-     * @throws TopologyException if parent processor is not added yet, or if 
this processor's name is equal to the parent's name
+     * @throws TopologyException if parent processor is not added yet, or if 
this processor's name is equal to the parent's name,
+     *                           or if this processor's name is equal to the 
parent's name
      * @see #addSink(String, String, String...)
      * @see #addSink(String, String, StreamPartitioner, String...)
      * @see #addSink(String, String, Serializer, Serializer, 
StreamPartitioner, String...)
@@ -501,7 +505,8 @@ public synchronized Topology addSink(final String name,
      * @param parentNames the name of one or more source or processor nodes 
whose output records this sink should consume
      * and write to its topic
      * @return itself
-     * @throws TopologyException if parent processor is not added yet, or if 
this processor's name is equal to the parent's name
+     * @throws TopologyException if parent processor is not added yet, or if 
this processor's name is equal to the parent's name,
+     *                           or if this processor's name is equal to the 
parent's name
      * @see #addSink(String, String, String...)
      * @see #addSink(String, String, StreamPartitioner, String...)
      * @see #addSink(String, String, Serializer, Serializer, String...)
@@ -516,6 +521,130 @@ public synchronized Topology addSink(final String name,
         return this;
     }
 
+    /**
+     * Add a new sink that forwards records from upstream parent processor 
and/or source nodes to Kafka topics based on {@code topicExtractor}.
+     * The topics that it may ever send to should be pre-created.
+     * The sink will use the {@link 
StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key serializer} and
+     * {@link StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value 
serializer} specified in the
+     * {@link StreamsConfig stream configuration}.
+     *
+     * @param name              the unique name of the sink
+     * @param topicExtractor    the extractor to determine the name of the 
Kafka topic to which this sink should write for each record
+     * @param parentNames       the name of one or more source or processor 
nodes whose output records this sink should consume
+     *                          and dynamically write to topics
+     * @return                  itself
+     * @throws TopologyException if parent processor is not added yet, or if this processor's name is equal to the parent's name
+     * @see #addSink(String, String, StreamPartitioner, String...)
+     * @see #addSink(String, String, Serializer, Serializer, String...)
+     * @see #addSink(String, String, Serializer, Serializer, 
StreamPartitioner, String...)
+     */
+    public synchronized <K, V> Topology addSink(final String name,
+                                                final TopicNameExtractor<K, V> 
topicExtractor,
+                                                final String... parentNames) {
+        internalTopologyBuilder.addSink(name, topicExtractor, null, null, 
null, parentNames);
+        return this;
+    }
+
+    /**
+     * Add a new sink that forwards records from upstream parent processor 
and/or source nodes to Kafka topics based on {@code topicExtractor},
+     * using the supplied partitioner.
+     * The topics that it may ever send to should be pre-created.
+     * The sink will use the {@link 
StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key serializer} and
+     * {@link StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value 
serializer} specified in the
+     * {@link StreamsConfig stream configuration}.
+     * <p>
+     * The sink will also use the specified {@link StreamPartitioner} to 
determine how records are distributed among
+     * the named Kafka topic's partitions.
+     * Such control is often useful with topologies that use {@link 
#addStateStore(StoreBuilder, String...) state
+     * stores} in its processors.
+     * In most other cases, however, a partitioner need not be specified and Kafka will automatically distribute
+     * records among partitions using Kafka's default partitioning logic.
+     *
+     * @param name              the unique name of the sink
+     * @param topicExtractor    the extractor to determine the name of the 
Kafka topic to which this sink should write for each record
+     * @param partitioner       the function that should be used to determine 
the partition for each record processed by the sink
+     * @param parentNames       the name of one or more source or processor 
nodes whose output records this sink should consume
+     *                          and dynamically write to topics
+     * @return                  itself
+     * @throws TopologyException if parent processor is not added yet, or if this processor's name is equal to the parent's name
+     * @see #addSink(String, String, String...)
+     * @see #addSink(String, String, Serializer, Serializer, String...)
+     * @see #addSink(String, String, Serializer, Serializer, 
StreamPartitioner, String...)
+     */
+    public synchronized <K, V> Topology addSink(final String name,
+                                                final TopicNameExtractor<K, V> 
topicExtractor,
+                                                final StreamPartitioner<? 
super K, ? super V> partitioner,
+                                                final String... parentNames) {
+        internalTopologyBuilder.addSink(name, topicExtractor, null, null, 
partitioner, parentNames);
+        return this;
+    }
+
+    /**
+     * Add a new sink that forwards records from upstream parent processor 
and/or source nodes to Kafka topics based on {@code topicExtractor}.
+     * The topics that it may ever send to should be pre-created.
+     * The sink will use the specified key and value serializers.
+     *
+     * @param name              the unique name of the sink
+     * @param topicExtractor    the extractor to determine the name of the 
Kafka topic to which this sink should write for each record
+     * @param keySerializer     the {@link Serializer key serializer} used 
when consuming records; may be null if the sink
+     *                          should use the {@link 
StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key serializer} specified 
in the
+     *                          {@link StreamsConfig stream configuration}
+     * @param valueSerializer   the {@link Serializer value serializer} used 
when consuming records; may be null if the sink
+     *                          should use the {@link 
StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value serializer} 
specified in the
+     *                          {@link StreamsConfig stream configuration}
+     * @param parentNames       the name of one or more source or processor 
nodes whose output records this sink should consume
+     *                          and dynamically write to topics
+     * @return                  itself
+     * @throws TopologyException if parent processor is not added yet, or if this processor's name is equal to the parent's name
+     * @see #addSink(String, String, String...)
+     * @see #addSink(String, String, StreamPartitioner, String...)
+     * @see #addSink(String, String, Serializer, Serializer, 
StreamPartitioner, String...)
+     */
+    public synchronized <K, V> Topology addSink(final String name,
+                                                final TopicNameExtractor<K, V> 
topicExtractor,
+                                                final Serializer<K> 
keySerializer,
+                                                final Serializer<V> 
valueSerializer,
+                                                final String... parentNames) {
+        internalTopologyBuilder.addSink(name, topicExtractor, keySerializer, 
valueSerializer, null, parentNames);
+        return this;
+    }
+
+    /**
+     * Add a new sink that forwards records from upstream parent processor 
and/or source nodes to Kafka topics based on {@code topicExtractor}.
+     * The topics that it may ever send to should be pre-created.
+     * The sink will use the specified key and value serializers, and the 
supplied partitioner.
+     *
+     * @param name              the unique name of the sink
+     * @param topicExtractor    the extractor to determine the name of the 
Kafka topic to which this sink should write for each record
+     * @param keySerializer     the {@link Serializer key serializer} used 
when consuming records; may be null if the sink
+     *                          should use the {@link 
StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG default key serializer} specified 
in the
+     *                          {@link StreamsConfig stream configuration}
+     * @param valueSerializer   the {@link Serializer value serializer} used 
when consuming records; may be null if the sink
+     *                          should use the {@link 
StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG default value serializer} 
specified in the
+     *                          {@link StreamsConfig stream configuration}
+     * @param partitioner       the function that should be used to determine 
the partition for each record processed by the sink
+     * @param parentNames       the name of one or more source or processor 
nodes whose output records this sink should consume
+     *                          and dynamically write to topics
+     * @return                  itself
+     * @throws TopologyException if parent processor is not added yet, or if this processor's name is equal to the parent's name
+     * @see #addSink(String, String, String...)
+     * @see #addSink(String, String, StreamPartitioner, String...)
+     * @see #addSink(String, String, Serializer, Serializer, String...)
+     */
+    public synchronized <K, V> Topology addSink(final String name,
+                                                final TopicNameExtractor<K, V> 
topicExtractor,
+                                                final Serializer<K> 
keySerializer,
+                                                final Serializer<V> 
valueSerializer,
+                                                final StreamPartitioner<? 
super K, ? super V> partitioner,
+                                                final String... parentNames) {
+        internalTopologyBuilder.addSink(name, topicExtractor, keySerializer, 
valueSerializer, partitioner, parentNames);
+        return this;
+    }
+
     /**
      * Add a new processor node that receives and processes records output by 
one or more parent source or processor
      * node.
@@ -526,7 +655,8 @@ public synchronized Topology addSink(final String name,
      * @param parentNames the name of one or more source or processor nodes 
whose output records this processor should receive
      * and process
      * @return itself
-     * @throws TopologyException if parent processor is not added yet, or if 
this processor's name is equal to the parent's name
+     * @throws TopologyException if parent processor is not added yet, or if 
this processor's name is equal to the parent's name,
+     *                           or if this processor's name is equal to the 
parent's name
      */
     public synchronized Topology addProcessor(final String name,
                                               final ProcessorSupplier supplier,
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/TopologyDescription.java 
b/streams/src/main/java/org/apache/kafka/streams/TopologyDescription.java
index 131e8d31cbe..04a292f9a97 100644
--- a/streams/src/main/java/org/apache/kafka/streams/TopologyDescription.java
+++ b/streams/src/main/java/org/apache/kafka/streams/TopologyDescription.java
@@ -134,6 +134,7 @@
     interface Sink extends Node {
         /**
          * The topic name this sink node is writing to.
+         * Could be {@code null} if the topic name can only be dynamically determined via a {@code TopicNameExtractor}.
          * @return a topic name
          */
         String topic();
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java 
b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index e75bb3aa227..da86a75b9e3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -27,6 +27,7 @@
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
 import org.apache.kafka.streams.processor.StreamPartitioner;
+import org.apache.kafka.streams.processor.TopicNameExtractor;
 
 /**
  * {@code KStream} is an abstraction of a <i>record stream</i> of {@link 
KeyValue} pairs, i.e., each record is an
@@ -461,12 +462,31 @@
      * The specified topic should be manually created before it is used (i.e., 
before the Kafka Streams application is
      * started).
      *
-     * @param produced    the options to use when producing to the topic
      * @param topic       the topic name
+     * @param produced    the options to use when producing to the topic
      */
     void to(final String topic,
             final Produced<K, V> produced);
 
+    /**
+     * Dynamically materialize this stream to topics using default serializers 
specified in the config and producer's
+     * {@link DefaultPartitioner}.
+     * The topic name to send each record to is dynamically determined based on the {@link TopicNameExtractor}.
+     *
+     * @param topicExtractor    the extractor to determine the name of the 
Kafka topic to write to for each record
+     */
+    void to(final TopicNameExtractor<K, V> topicExtractor);
+
+    /**
+     * Dynamically materialize this stream to topics using the provided {@link 
Produced} instance.
+     * The topic name to send each record to is dynamically determined based on the {@link TopicNameExtractor}.
+     *
+     * @param topicExtractor    the extractor to determine the name of the 
Kafka topic to write to for each record
+     * @param produced          the options to use when producing to the topic
+     */
+    void to(final TopicNameExtractor<K, V> topicExtractor,
+            final Produced<K, V> produced);
+
     /**
      * Transform each record of the input stream into zero or more records in 
the output stream (both key and value type
      * can be altered arbitrarily).
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 5331a958ac0..7356aff153f 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -41,6 +41,8 @@
 import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
 import org.apache.kafka.streams.processor.StreamPartitioner;
+import org.apache.kafka.streams.processor.TopicNameExtractor;
+import org.apache.kafka.streams.processor.internals.StaticTopicNameExtractor;
 import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.streams.state.WindowStore;
@@ -304,27 +306,37 @@ public void to(final String topic) {
         to(topic, Produced.<K, V>with(null, null, null));
     }
 
-    @SuppressWarnings("unchecked")
     @Override
     public void to(final String topic, final Produced<K, V> produced) {
         Objects.requireNonNull(topic, "topic can't be null");
         Objects.requireNonNull(produced, "Produced can't be null");
-        to(topic, new ProducedInternal<>(produced));
+        to(new StaticTopicNameExtractor<K, V>(topic), new 
ProducedInternal<>(produced));
+    }
 
+    @Override
+    public void to(final TopicNameExtractor<K, V> topicExtractor) {
+        to(topicExtractor, Produced.<K, V>with(null, null, null));
+    }
+
+    @Override
+    public void to(final TopicNameExtractor<K, V> topicExtractor, final 
Produced<K, V> produced) {
+        Objects.requireNonNull(topicExtractor, "topic extractor can't be 
null");
+        Objects.requireNonNull(produced, "Produced can't be null");
+        to(topicExtractor, new ProducedInternal<>(produced));
     }
 
     @SuppressWarnings("unchecked")
-    private void to(final String topic, final ProducedInternal<K, V> produced) 
{
+    private void to(final TopicNameExtractor<K, V> topicExtractor, final 
ProducedInternal<K, V> produced) {
         final String name = builder.newProcessorName(SINK_NAME);
         final Serializer<K> keySerializer = produced.keySerde() == null ? null 
: produced.keySerde().serializer();
         final Serializer<V> valSerializer = produced.valueSerde() == null ? 
null : produced.valueSerde().serializer();
         final StreamPartitioner<? super K, ? super V> partitioner = 
produced.streamPartitioner();
 
         if (partitioner == null && keySerializer instanceof 
WindowedSerializer) {
-            final StreamPartitioner<K, V> windowedPartitioner = 
(StreamPartitioner<K, V>) new WindowedStreamPartitioner<Object, V>(topic, 
(WindowedSerializer) keySerializer);
-            builder.internalTopologyBuilder.addSink(name, topic, 
keySerializer, valSerializer, windowedPartitioner, this.name);
+            final StreamPartitioner<K, V> windowedPartitioner = 
(StreamPartitioner<K, V>) new WindowedStreamPartitioner<Object, 
V>((WindowedSerializer) keySerializer);
+            builder.internalTopologyBuilder.addSink(name, topicExtractor, 
keySerializer, valSerializer, windowedPartitioner, this.name);
         } else {
-            builder.internalTopologyBuilder.addSink(name, topic, 
keySerializer, valSerializer, partitioner, this.name);
+            builder.internalTopologyBuilder.addSink(name, topicExtractor, 
keySerializer, valSerializer, partitioner, this.name);
         }
     }
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java
index 7b04f6a3ff6..04a9ab2e9b5 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java
@@ -24,11 +24,9 @@
 
 public class WindowedStreamPartitioner<K, V> implements 
StreamPartitioner<Windowed<K>, V> {
 
-    private final String topic;
     private final WindowedSerializer<K> serializer;
 
-    WindowedStreamPartitioner(final String topic, final WindowedSerializer<K> 
serializer) {
-        this.topic = topic;
+    WindowedStreamPartitioner(final WindowedSerializer<K> serializer) {
         this.serializer = serializer;
     }
 
@@ -37,12 +35,14 @@
      * and the current number of partitions. The partition number id 
determined by the original key of the windowed key
      * using the same logic as DefaultPartitioner so that the topic is 
partitioned by the original key.
      *
+     * @param topic the topic name this record is sent to
      * @param windowedKey the key of the record
      * @param value the value of the record
      * @param numPartitions the total number of partitions
      * @return an integer between 0 and {@code numPartitions-1}, or {@code 
null} if the default partitioning logic should be used
      */
-    public Integer partition(final Windowed<K> windowedKey, final V value, 
final int numPartitions) {
+    @Override
+    public Integer partition(final String topic, final Windowed<K> 
windowedKey, final V value, final int numPartitions) {
         final byte[] keyBytes = serializer.serializeBaseKey(topic, 
windowedKey);
 
         // hash the keyBytes to choose a partition
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
index f2a9f64359b..d21667fc6dd 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
@@ -199,7 +199,7 @@ Cancellable schedule(final long intervalMs,
     long offset();
 
     /**
-     * Returns the headers of the current input record
+     * Returns the headers of the current input record; could be {@code null} if they are not available
      * @return the headers
      */
     Headers headers();
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordContext.java
 b/streams/src/main/java/org/apache/kafka/streams/processor/RecordContext.java
similarity index 65%
rename from 
streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordContext.java
rename to 
streams/src/main/java/org/apache/kafka/streams/processor/RecordContext.java
index 15add71d89a..5819a4603f5 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordContext.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/RecordContext.java
@@ -14,10 +14,9 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.kafka.streams.processor.internals;
+package org.apache.kafka.streams.processor;
 
 import org.apache.kafka.common.header.Headers;
-import org.apache.kafka.streams.processor.Processor;
 
 /**
  * The context associated with the current record being processed by
@@ -25,32 +24,32 @@
  */
 public interface RecordContext {
     /**
-     * @return The offset of the original record received from Kafka
+     * @return  The offset of the original record received from Kafka;
+     *          could be -1 if it is not available
      */
     long offset();
 
     /**
-     * @return The timestamp extracted from the record received from Kafka
+     * @return  The timestamp extracted from the record received from Kafka;
+     *          could be -1 if it is not available
      */
     long timestamp();
 
     /**
-     * Sets a new timestamp for the output record.
-     */
-    void setTimestamp(final long timestamp);
-
-    /**
-     * @return The topic the record was received on
+     * @return  The topic the record was received on;
+     *          could be null if it is not available
      */
     String topic();
 
     /**
-     * @return The partition the record was received on
+     * @return  The partition the record was received on;
+     *          could be -1 if it is not available
      */
     int partition();
 
     /**
-     * @return The headers from the record received from Kafka
+     * @return  The headers from the record received from Kafka;
+     *          could be null if it is not available
      */
     Headers headers();
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/StreamPartitioner.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/StreamPartitioner.java
index 1fa5e3d8d6b..a435cafb89b 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/StreamPartitioner.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/StreamPartitioner.java
@@ -52,11 +52,12 @@
 
     /**
      * Determine the partition number for a record with the given key and 
value and the current number of partitions.
-     * 
+     *
+     * @param topic the topic name this record is sent to
      * @param key the key of the record
      * @param value the value of the record
      * @param numPartitions the total number of partitions
      * @return an integer between 0 and {@code numPartitions-1}, or {@code 
null} if the default partitioning logic should be used
      */
-    Integer partition(K key, V value, int numPartitions);
+    Integer partition(String topic, K key, V value, int numPartitions);
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/TopicNameExtractor.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/TopicNameExtractor.java
new file mode 100644
index 00000000000..5d79751384e
--- /dev/null
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/TopicNameExtractor.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+/**
+ * An interface for dynamically determining the name of the Kafka topic to which a record is sent at a sink node of the topology.
+ */
+@InterfaceStability.Evolving
+public interface TopicNameExtractor<K, V> {
+
+    /**
+     * Extracts the topic name to send to. The topic name must already exist, 
since the Kafka Streams library will not
+     * try to automatically create the topic with the extracted name.
+     *
+     * @param key           the record key
+     * @param value         the record value
+     * @param recordContext current context metadata of the record
+     * @return              the topic name this record should be sent to
+     */
+    String extract(final K key, final V value, final RecordContext 
recordContext);
+}
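
For illustration, a hypothetical implementation of this interface that derives the output topic from the record's source topic recorded in the RecordContext; the "-archive" naming convention is an assumption, and the resulting topics must be pre-created:

import org.apache.kafka.streams.processor.RecordContext;
import org.apache.kafka.streams.processor.TopicNameExtractor;

public class ArchiveTopicNameExtractor<K, V> implements TopicNameExtractor<K, V> {

    @Override
    public String extract(final K key, final V value, final RecordContext recordContext) {
        // Route records read from topic "foo" to "foo-archive"; the context's
        // topic() could be null when it is not available, so guard for that.
        final String sourceTopic = recordContext.topic();
        return (sourceTopic == null ? "unknown" : sourceTopic) + "-archive";
    }
}
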
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
index 33386696eea..0c3fcf20146 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
@@ -42,7 +42,7 @@
     private final ThreadCache cache;
     private final Serde valueSerde;
     private boolean initialized;
-    protected RecordContext recordContext;
+    protected ProcessorRecordContext recordContext;
     protected ProcessorNode currentNode;
     final StateManager stateManager;
 
@@ -178,12 +178,12 @@ public long timestamp() {
     }
 
     @Override
-    public void setRecordContext(final RecordContext recordContext) {
+    public void setRecordContext(final ProcessorRecordContext recordContext) {
         this.recordContext = recordContext;
     }
 
     @Override
-    public RecordContext recordContext() {
+    public ProcessorRecordContext recordContext() {
         return recordContext;
     }
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStreamPartitioner.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStreamPartitioner.java
index da48e8f1fef..cb64c3aed5c 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStreamPartitioner.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStreamPartitioner.java
@@ -23,20 +23,18 @@
 
 public class DefaultStreamPartitioner<K, V> implements StreamPartitioner<K, V> 
{
 
-    private final Serializer<K> keySerializer;
     private final Cluster cluster;
-    private final String topic;
+    private final Serializer<K> keySerializer;
     private final DefaultPartitioner defaultPartitioner;
 
-    public DefaultStreamPartitioner(final Serializer<K> keySerializer, final 
Cluster cluster, final String topic) {
-        this.keySerializer = keySerializer;
+    public DefaultStreamPartitioner(final Serializer<K> keySerializer, final 
Cluster cluster) {
         this.cluster = cluster;
-        this.topic = topic;
+        this.keySerializer = keySerializer;
         this.defaultPartitioner = new DefaultPartitioner();
     }
 
     @Override
-    public Integer partition(final K key, final V value, final int 
numPartitions) {
+    public Integer partition(final String topic, final K key, final V value, 
final int numPartitions) {
         final byte[] keyBytes = keySerializer.serialize(topic, key);
         return defaultPartitioner.partition(topic, key, keyBytes, value, null, 
cluster);
     }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java
index 0ebaf60a99f..9439abaab2d 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.RecordContext;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.internals.ThreadCache;
 
@@ -34,12 +35,12 @@
      * Returns the current {@link RecordContext}
      * @return the current {@link RecordContext}
      */
-    RecordContext recordContext();
+    ProcessorRecordContext recordContext();
 
     /**
-     * @param recordContext the {@link RecordContext} for the record about to 
be processes
+     * @param recordContext the {@link ProcessorRecordContext} for the record about to be processed
      */
-    void setRecordContext(RecordContext recordContext);
+    void setRecordContext(ProcessorRecordContext recordContext);
 
     /**
      * @param currentNode the current {@link ProcessorNode}
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
index 1651bbd90b3..7d09031d713 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
@@ -25,6 +25,7 @@
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.processor.TimestampExtractor;
+import org.apache.kafka.streams.processor.TopicNameExtractor;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.internals.WindowStoreBuilder;
@@ -319,19 +320,19 @@ Source describe() {
     }
 
     private class SinkNodeFactory<K, V> extends NodeFactory {
-        private final String topic;
         private final Serializer<K> keySerializer;
         private final Serializer<V> valSerializer;
         private final StreamPartitioner<? super K, ? super V> partitioner;
+        private final TopicNameExtractor<K, V> topicExtractor;
 
         private SinkNodeFactory(final String name,
                                 final String[] predecessors,
-                                final String topic,
+                                final TopicNameExtractor<K, V> topicExtractor,
                                 final Serializer<K> keySerializer,
                                 final Serializer<V> valSerializer,
                                 final StreamPartitioner<? super K, ? super V> 
partitioner) {
             super(name, predecessors.clone());
-            this.topic = topic;
+            this.topicExtractor = topicExtractor;
             this.keySerializer = keySerializer;
             this.valSerializer = valSerializer;
             this.partitioner = partitioner;
@@ -339,17 +340,22 @@ private SinkNodeFactory(final String name,
 
         @Override
         public ProcessorNode build() {
-            if (internalTopicNames.contains(topic)) {
-                // prefix the internal topic name with the application id
-                return new SinkNode<>(name, decorateTopic(topic), 
keySerializer, valSerializer, partitioner);
+            if (topicExtractor instanceof StaticTopicNameExtractor) {
+                final String topic = ((StaticTopicNameExtractor) 
topicExtractor).topicName;
+                if (internalTopicNames.contains(topic)) {
+                    // prefix the internal topic name with the application id
+                    return new SinkNode<>(name, new 
StaticTopicNameExtractor<K, V>(decorateTopic(topic)), keySerializer, 
valSerializer, partitioner);
+                } else {
+                    return new SinkNode<>(name, topicExtractor, keySerializer, 
valSerializer, partitioner);
+                }
             } else {
-                return new SinkNode<>(name, topic, keySerializer, 
valSerializer, partitioner);
+                return new SinkNode<>(name, topicExtractor, keySerializer, 
valSerializer, partitioner);
             }
         }
 
         @Override
         Sink describe() {
-            return new Sink(name, topic);
+            return new Sink(name, topicExtractor);
         }
     }
 
@@ -432,6 +438,18 @@ public final void addSource(final Topology.AutoOffsetReset 
offsetReset,
                                      final String... predecessorNames) {
         Objects.requireNonNull(name, "name must not be null");
         Objects.requireNonNull(topic, "topic must not be null");
+        addSink(name, new StaticTopicNameExtractor<K, V>(topic), 
keySerializer, valSerializer, partitioner, predecessorNames);
+        nodeToSinkTopic.put(name, topic);
+    }
+
+    public final <K, V> void addSink(final String name,
+                                     final TopicNameExtractor<K, V> 
topicExtractor,
+                                     final Serializer<K> keySerializer,
+                                     final Serializer<V> valSerializer,
+                                     final StreamPartitioner<? super K, ? 
super V> partitioner,
+                                     final String... predecessorNames) {
+        Objects.requireNonNull(name, "name must not be null");
+        Objects.requireNonNull(topicExtractor, "topic extractor must not be 
null");
         if (nodeFactories.containsKey(name)) {
             throw new TopologyException("Processor " + name + " is already 
added.");
         }
@@ -449,8 +467,7 @@ public final void addSource(final Topology.AutoOffsetReset 
offsetReset,
             }
         }
 
-        nodeFactories.put(name, new SinkNodeFactory<>(name, predecessorNames, 
topic, keySerializer, valSerializer, partitioner));
-        nodeToSinkTopic.put(name, topic);
+        nodeFactories.put(name, new SinkNodeFactory<>(name, predecessorNames, 
topicExtractor, keySerializer, valSerializer, partitioner));
         nodeGrouper.add(name);
         nodeGrouper.unite(name, predecessorNames);
     }
@@ -888,13 +905,18 @@ private void buildSinkNode(final Map<String, 
ProcessorNode> processorMap,
 
         for (final String predecessor : sinkNodeFactory.predecessors) {
             processorMap.get(predecessor).addChild(node);
-            if (internalTopicNames.contains(sinkNodeFactory.topic)) {
-                // prefix the internal topic name with the application id
-                final String decoratedTopic = 
decorateTopic(sinkNodeFactory.topic);
-                topicSinkMap.put(decoratedTopic, node);
-                repartitionTopics.add(decoratedTopic);
-            } else {
-                topicSinkMap.put(sinkNodeFactory.topic, node);
+            if (sinkNodeFactory.topicExtractor instanceof 
StaticTopicNameExtractor) {
+                final String topic = ((StaticTopicNameExtractor) 
sinkNodeFactory.topicExtractor).topicName;
+
+                if (internalTopicNames.contains(topic)) {
+                    // prefix the internal topic name with the application id
+                    final String decoratedTopic = decorateTopic(topic);
+                    topicSinkMap.put(decoratedTopic, node);
+                    repartitionTopics.add(decoratedTopic);
+                } else {
+                    topicSinkMap.put(topic, node);
+                }
+
             }
         }
     }
@@ -1489,17 +1511,26 @@ public int hashCode() {
     }
 
     public final static class Sink extends AbstractNode implements 
TopologyDescription.Sink {
-        private final String topic;
+        private final TopicNameExtractor topicNameExtractor;
+
+        public Sink(final String name,
+                    final TopicNameExtractor topicNameExtractor) {
+            super(name);
+            this.topicNameExtractor = topicNameExtractor;
+        }
 
         public Sink(final String name,
                     final String topic) {
             super(name);
-            this.topic = topic;
+            this.topicNameExtractor = new StaticTopicNameExtractor(topic);
         }
 
         @Override
         public String topic() {
-            return topic;
+            if (topicNameExtractor instanceof StaticTopicNameExtractor)
+                return ((StaticTopicNameExtractor) 
topicNameExtractor).topicName;
+            else
+                return null;
         }
 
         @Override
@@ -1509,7 +1540,7 @@ public void addSuccessor(final TopologyDescription.Node 
successor) {
 
         @Override
         public String toString() {
-            return "Sink: " + name + " (topic: " + topic + ")\n      <-- " + 
nodeNames(predecessors);
+            return "Sink: " + name + " (topic: " + topic() + ")\n      <-- " + 
nodeNames(predecessors);
         }
 
         @Override
@@ -1523,14 +1554,14 @@ public boolean equals(final Object o) {
 
             final Sink sink = (Sink) o;
             return name.equals(sink.name)
-                && topic.equals(sink.topic)
+                && topicNameExtractor.equals(sink.topicNameExtractor)
                 && predecessors.equals(sink.predecessors);
         }
 
         @Override
         public int hashCode() {
             // omit predecessors as it might change and alter the hash code
-            return Objects.hash(name, topic);
+            return Objects.hash(name, topicNameExtractor);
         }
     }
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java
index c0715259fcf..dd572649765 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java
@@ -17,16 +17,17 @@
 package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.streams.processor.RecordContext;
 
 import java.util.Objects;
 
 public class ProcessorRecordContext implements RecordContext {
 
-    private long timestamp;
-    private final long offset;
-    private final String topic;
-    private final int partition;
-    private final Headers headers;
+    long timestamp;
+    final long offset;
+    final String topic;
+    final int partition;
+    final Headers headers;
 
     public ProcessorRecordContext(final long timestamp,
                                   final long offset,
@@ -41,18 +42,27 @@ public ProcessorRecordContext(final long timestamp,
         this.headers = headers;
     }
 
+    public ProcessorRecordContext(final long timestamp,
+                                  final long offset,
+                                  final int partition,
+                                  final String topic) {
+        this(timestamp, offset, partition, topic, null);
+    }
+
+    public void setTimestamp(final long timestamp) {
+        this.timestamp = timestamp;
+    }
+
+    @Override
     public long offset() {
         return offset;
     }
 
+    @Override
     public long timestamp() {
         return timestamp;
     }
 
-    public void setTimestamp(final long timestamp) {
-        this.timestamp = timestamp;
-    }
-
     @Override
     public String topic() {
         return topic;
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
index 1c8b0a09f48..d753648eede 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
@@ -49,11 +49,11 @@
 
 public class RecordCollectorImpl implements RecordCollector {
     private final Logger log;
+    private final String logPrefix;
+    private final Sensor skippedRecordsSensor;
     private final Producer<byte[], byte[]> producer;
     private final Map<TopicPartition, Long> offsets;
-    private final String logPrefix;
     private final ProductionExceptionHandler productionExceptionHandler;
-    private final Sensor skippedRecordsSensor;
 
     private final static String LOG_MESSAGE = "Error sending record (key {} 
value {} timestamp {}) to topic {} due to {}; " +
         "No more records will be sent and no more offsets will be recorded for 
this task.";
@@ -88,7 +88,7 @@ public RecordCollectorImpl(final Producer<byte[], byte[]> 
producer,
         if (partitioner != null) {
             final List<PartitionInfo> partitions = 
producer.partitionsFor(topic);
             if (partitions.size() > 0) {
-                partition = partitioner.partition(key, value, 
partitions.size());
+                partition = partitioner.partition(topic, key, value, 
partitions.size());
             } else {
                 throw new StreamsException("Could not get partition 
information for topic '" + topic + "'." +
                     " This can happen if the topic does not exist.");
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
index 7711905423e..73bffc80ed3 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
@@ -19,26 +19,26 @@
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.kstream.internals.ChangedSerializer;
-import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StreamPartitioner;
+import org.apache.kafka.streams.processor.TopicNameExtractor;
 
 public class SinkNode<K, V> extends ProcessorNode<K, V> {
 
-    private final String topic;
     private Serializer<K> keySerializer;
     private Serializer<V> valSerializer;
+    private final TopicNameExtractor<K, V> topicExtractor;
     private final StreamPartitioner<? super K, ? super V> partitioner;
 
-    private ProcessorContext context;
+    private InternalProcessorContext context;
 
-    public SinkNode(final String name,
-                    final String topic,
-                    final Serializer<K> keySerializer,
-                    final Serializer<V> valSerializer,
-                    final StreamPartitioner<? super K, ? super V> partitioner) 
{
+    SinkNode(final String name,
+             final TopicNameExtractor<K, V> topicExtractor,
+             final Serializer<K> keySerializer,
+             final Serializer<V> valSerializer,
+             final StreamPartitioner<? super K, ? super V> partitioner) {
         super(name);
 
-        this.topic = topic;
+        this.topicExtractor = topicExtractor;
         this.keySerializer = keySerializer;
         this.valSerializer = valSerializer;
         this.partitioner = partitioner;
@@ -83,6 +83,8 @@ public void process(final K key, final V value) {
             throw new StreamsException("Invalid (negative) timestamp of " + 
timestamp + " for output record <" + key + ":" + value + ">.");
         }
 
+        final String topic = topicExtractor.extract(key, value, 
this.context.recordContext());
+
         try {
             collector.send(topic, key, value, context.headers(), timestamp, 
keySerializer, valSerializer, partitioner);
         } catch (final ClassCastException e) {
@@ -115,7 +117,7 @@ public String toString() {
     public String toString(final String indent) {
         final StringBuilder sb = new StringBuilder(super.toString(indent));
         sb.append(indent).append("\ttopic:\t\t");
-        sb.append(topic);
+        sb.append(topicExtractor);
         sb.append("\n");
         return sb.toString();
     }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
index 14f986c1c9b..5c278c9c754 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
@@ -191,7 +191,7 @@ public Cancellable schedule(long interval, PunctuationType 
type, Punctuator call
      * @throws UnsupportedOperationException on every invocation
      */
     @Override
-    public RecordContext recordContext() {
+    public ProcessorRecordContext recordContext() {
         throw new UnsupportedOperationException("this should not happen: 
recordContext not supported in standby tasks.");
     }
 
@@ -199,7 +199,7 @@ public RecordContext recordContext() {
      * @throws UnsupportedOperationException on every invocation
      */
     @Override
-    public void setRecordContext(final RecordContext recordContext) {
+    public void setRecordContext(final ProcessorRecordContext recordContext) {
         throw new UnsupportedOperationException("this should not happen: 
setRecordContext not supported in standby tasks.");
     }
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StaticTopicNameExtractor.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StaticTopicNameExtractor.java
new file mode 100644
index 00000000000..c525112cfdc
--- /dev/null
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StaticTopicNameExtractor.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.streams.processor.RecordContext;
+import org.apache.kafka.streams.processor.TopicNameExtractor;
+
+/**
+ * Static topic name extractor
+ */
+public class StaticTopicNameExtractor<K, V> implements TopicNameExtractor<K, 
V> {
+
+    public final String topicName;
+
+    public StaticTopicNameExtractor(final String topicName) {
+        this.topicName = topicName;
+    }
+
+    public String extract(final K key, final V value, final RecordContext 
recordContext) {
+        return topicName;
+    }
+
+    @Override
+    public String toString() {
+        return "StaticTopicNameExtractor(" + topicName + ")";
+    }
+}
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
index 7fb0352753b..ea306fbc3c2 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java
@@ -154,9 +154,7 @@ public String toString(final String indent) {
 
         return getStreamsMetadataForKey(storeName,
                                         key,
-                                        new 
DefaultStreamPartitioner<>(keySerializer,
-                                                                       
clusterMetadata,
-                                                                       
sourceTopicsInfo.topicWithMostPartitions),
+                                        new 
DefaultStreamPartitioner<>(keySerializer, clusterMetadata),
                                         sourceTopicsInfo);
     }
 
@@ -254,7 +252,7 @@ private void rebuildMetadata(final Map<HostInfo, 
Set<TopicPartition>> currentSta
                                                          final 
StreamPartitioner<? super K, ?> partitioner,
                                                          final 
SourceTopicsInfo sourceTopicsInfo) {
 
-        final Integer partition = partitioner.partition(key, null, 
sourceTopicsInfo.maxPartitions);
+        final Integer partition = 
partitioner.partition(sourceTopicsInfo.topicWithMostPartitions, key, null, 
sourceTopicsInfo.maxPartitions);
         final Set<TopicPartition> matchingPartitions = new HashSet<>();
         for (String sourceTopic : sourceTopicsInfo.sourceTopics) {
             matchingPartitions.add(new TopicPartition(sourceTopic, partition));
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractMergedSortedCacheStoreIterator.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractMergedSortedCacheStoreIterator.java
index 6eb9a0a2426..1a3b0752f6b 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractMergedSortedCacheStoreIterator.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractMergedSortedCacheStoreIterator.java
@@ -49,7 +49,7 @@
     abstract V deserializeCacheValue(final LRUCacheEntry cacheEntry);
 
     private boolean isDeletedCacheEntry(final KeyValue<Bytes, LRUCacheEntry> 
nextFromCache) {
-        return nextFromCache.value.value == null;
+        return nextFromCache.value.value() == null;
     }
 
     @Override
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
index 285bde5aedf..da308a1f5c7 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
@@ -23,8 +23,8 @@
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
-import org.apache.kafka.streams.processor.internals.RecordContext;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StateSerdes;
@@ -87,7 +87,7 @@ public void apply(final List<ThreadCache.DirtyEntry> entries) 
{
     }
 
     private void putAndMaybeForward(final ThreadCache.DirtyEntry entry, final 
InternalProcessorContext context) {
-        final RecordContext current = context.recordContext();
+        final ProcessorRecordContext current = context.recordContext();
         try {
             context.setRecordContext(entry.recordContext());
             if (flushListener != null) {
@@ -179,7 +179,7 @@ public boolean isOpen() {
             }
             return rawValue;
         } else {
-            return entry.value;
+            return entry.value();
         }
     }
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
index c099fafc269..69506932e53 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
@@ -24,8 +24,8 @@
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
-import org.apache.kafka.streams.processor.internals.RecordContext;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.SessionStore;
 import org.apache.kafka.streams.state.StateSerdes;
@@ -168,7 +168,7 @@ public void put(final Windowed<Bytes> key, byte[] value) {
 
     private void putAndMaybeForward(final ThreadCache.DirtyEntry entry, final 
InternalProcessorContext context) {
         final Bytes binaryKey = cacheFunction.key(entry.key());
-        final RecordContext current = context.recordContext();
+        final ProcessorRecordContext current = context.recordContext();
         context.setRecordContext(entry.recordContext());
         try {
             final Windowed<K> key = SessionKeySchema.from(binaryKey.get(), 
serdes.keyDeserializer(), topic);
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
index ca24ffd4e4d..1f08f5157b9 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
@@ -24,8 +24,8 @@
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
-import org.apache.kafka.streams.processor.internals.RecordContext;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.StateSerdes;
 import org.apache.kafka.streams.state.WindowStore;
@@ -107,7 +107,7 @@ private void maybeForward(final ThreadCache.DirtyEntry 
entry,
                               final Windowed<K> windowedKey,
                               final InternalProcessorContext context) {
         if (flushListener != null) {
-            final RecordContext current = context.recordContext();
+            final ProcessorRecordContext current = context.recordContext();
             context.setRecordContext(entry.recordContext());
             try {
                 final V oldValue = sendOldValues ? fetchPrevious(key, 
windowedKey.window().start()) : null;
@@ -174,7 +174,7 @@ public synchronized void put(final Bytes key, final byte[] 
value, final long tim
         if (entry == null) {
             return underlying.fetch(key, timestamp);
         } else {
-            return entry.value;
+            return entry.value();
         }
     }
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/LRUCacheEntry.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/LRUCacheEntry.java
index 78c0331a5af..0ac0b77dd37 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/LRUCacheEntry.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/LRUCacheEntry.java
@@ -17,21 +17,18 @@
 package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.header.Headers;
-import org.apache.kafka.streams.processor.internals.RecordContext;
+import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
+
+import java.util.Arrays;
+import java.util.Objects;
 
 /**
  * A cache entry
  */
-class LRUCacheEntry implements RecordContext {
-
-    public final byte[] value;
-    private final Headers headers;
-    private final long offset;
-    private final String topic;
-    private final int partition;
-    private long timestamp;
+class LRUCacheEntry extends ProcessorRecordContext {
 
-    private long sizeBytes;
+    private final byte[] value;
+    private final long sizeBytes;
     private boolean isDirty;
 
     LRUCacheEntry(final byte[] value) {
@@ -45,13 +42,9 @@
                   final long timestamp,
                   final int partition,
                   final String topic) {
+        super(timestamp, offset, partition, topic, headers);
         this.value = value;
-        this.headers = headers;
-        this.partition = partition;
-        this.topic = topic;
-        this.offset = offset;
         this.isDirty = isDirty;
-        this.timestamp = timestamp;
         this.sizeBytes = (value == null ? 0 : value.length) +
                 1 + // isDirty
                 8 + // timestamp
@@ -60,47 +53,38 @@
                 (topic == null ? 0 : topic.length());
     }
 
-    @Override
-    public long offset() {
-        return offset;
+    void markClean() {
+        isDirty = false;
     }
 
-    @Override
-    public long timestamp() {
-        return timestamp;
+    boolean isDirty() {
+        return isDirty;
     }
 
-    @Override
-    public void setTimestamp(final long timestamp) {
-        throw new UnsupportedOperationException();
+    long size() {
+        return sizeBytes;
     }
 
-    @Override
-    public String topic() {
-        return topic;
+    byte[] value() {
+        return value;
     }
 
     @Override
-    public int partition() {
-        return partition;
+    public boolean equals(final Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        final LRUCacheEntry that = (LRUCacheEntry) o;
+        return timestamp() == that.timestamp() &&
+                offset() == that.offset() &&
+                partition() == that.partition() &&
+                Objects.equals(topic(), that.topic()) &&
+                Objects.equals(headers(), that.headers()) &&
+                Arrays.equals(this.value, that.value()) &&
+                this.isDirty == that.isDirty();
     }
 
     @Override
-    public Headers headers() {
-        return headers;
-    }
-
-    void markClean() {
-        isDirty = false;
-    }
-
-    boolean isDirty() {
-        return isDirty;
-    }
-
-    public long size() {
-        return sizeBytes;
+    public int hashCode() {
+        return Objects.hash(timestamp(), offset(), topic(), partition(), 
headers(), value, isDirty);
     }
-
-
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueBytesStoreIterator.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueBytesStoreIterator.java
index 8f9d152fafd..7a545e4da3f 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueBytesStoreIterator.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheKeyValueBytesStoreIterator.java
@@ -44,7 +44,7 @@ Bytes deserializeCacheKey(final Bytes cacheKey) {
 
     @Override
     byte[] deserializeCacheValue(final LRUCacheEntry cacheEntry) {
-        return cacheEntry.value;
+        return cacheEntry.value();
     }
 
     @Override
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheSessionStoreIterator.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheSessionStoreIterator.java
index 01b577cf95f..dd2b1932993 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheSessionStoreIterator.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheSessionStoreIterator.java
@@ -53,7 +53,7 @@
 
     @Override
     byte[] deserializeCacheValue(final LRUCacheEntry cacheEntry) {
-        return cacheEntry.value;
+        return cacheEntry.value();
     }
 
     @Override
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWindowStoreIterator.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWindowStoreIterator.java
index b08cf851e1a..d5bb42121d0 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWindowStoreIterator.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWindowStoreIterator.java
@@ -48,7 +48,7 @@ Long deserializeCacheKey(final Bytes cacheKey) {
 
     @Override
     byte[] deserializeCacheValue(final LRUCacheEntry cacheEntry) {
-        return cacheEntry.value;
+        return cacheEntry.value();
     }
 
     @Override
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWindowStoreKeyValueIterator.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWindowStoreKeyValueIterator.java
index ef0a44e3128..a48c81a384e 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWindowStoreKeyValueIterator.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MergedSortedCacheWindowStoreKeyValueIterator.java
@@ -61,7 +61,7 @@
 
     @Override
     byte[] deserializeCacheValue(final LRUCacheEntry cacheEntry) {
-        return cacheEntry.value;
+        return cacheEntry.value();
     }
 
     @Override
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java
index d058c9cee4f..92e8b9b043c 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java
@@ -121,7 +121,7 @@ private void flush(final LRUNode evicted) {
         // evicted already been removed from the cache so add it to the list of
         // flushed entries and remove from dirtyKeys.
         if (evicted != null) {
-            entries.add(new ThreadCache.DirtyEntry(evicted.key, 
evicted.entry.value, evicted.entry));
+            entries.add(new ThreadCache.DirtyEntry(evicted.key, 
evicted.entry.value(), evicted.entry));
             dirtyKeys.remove(evicted.key);
         }
 
@@ -130,9 +130,9 @@ private void flush(final LRUNode evicted) {
             if (node == null) {
                 throw new IllegalStateException("Key = " + key + " found in 
dirty key set, but entry is null");
             }
-            entries.add(new ThreadCache.DirtyEntry(key, node.entry.value, 
node.entry));
+            entries.add(new ThreadCache.DirtyEntry(key, node.entry.value(), 
node.entry));
             node.entry.markClean();
-            if (node.entry.value == null) {
+            if (node.entry.value() == null) {
                 deleted.add(node.key);
             }
         }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
index 8c3716b3542..7ce03a18c01 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
@@ -19,7 +19,7 @@
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.processor.internals.RecordContext;
+import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.slf4j.Logger;
 
@@ -332,9 +332,9 @@ public void close() {
     static class DirtyEntry {
         private final Bytes key;
         private final byte[] newValue;
-        private final RecordContext recordContext;
+        private final ProcessorRecordContext recordContext;
 
-        DirtyEntry(final Bytes key, final byte[] newValue, final RecordContext 
recordContext) {
+        DirtyEntry(final Bytes key, final byte[] newValue, final 
ProcessorRecordContext recordContext) {
             this.key = key;
             this.newValue = newValue;
             this.recordContext = recordContext;
@@ -348,7 +348,7 @@ public Bytes key() {
             return newValue;
         }
 
-        public RecordContext recordContext() {
+        public ProcessorRecordContext recordContext() {
             return recordContext;
         }
     }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java 
b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index 904ebe2a611..297b2434c06 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -406,7 +406,7 @@ public void 
shouldNotGetTaskWithKeyAndSerializerWhenNotRunning() {
     public void shouldNotGetTaskWithKeyAndPartitionerWhenNotRunning() {
         streams.metadataForKey("store", "key", new StreamPartitioner<String, 
Object>() {
             @Override
-            public Integer partition(final String key, final Object value, 
final int numPartitions) {
+            public Integer partition(final String topic, final String key, 
final Object value, final int numPartitions) {
                 return 0;
             }
         });
diff --git a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java 
b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
index 0c34723ae3c..a845be3569f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
@@ -21,6 +21,7 @@
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.apache.kafka.streams.processor.TopicNameExtractor;
 import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
 import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.internals.KeyValueStoreBuilder;
@@ -95,7 +96,12 @@ public void shouldNotAllowNullNameWhenAddingSink() {
 
     @Test(expected = NullPointerException.class)
     public void shouldNotAllowNullTopicWhenAddingSink() {
-        topology.addSink("name", null);
+        topology.addSink("name", (String) null);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotAllowNullTopicChooserWhenAddingSink() {
+        topology.addSink("name", (TopicNameExtractor) null);
     }
 
     @Test(expected = NullPointerException.class)
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
index 6da148a9c97..7aed8e1788f 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
@@ -38,10 +38,12 @@
 import org.apache.kafka.streams.kstream.ValueTransformerSupplier;
 import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
 import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
+import org.apache.kafka.streams.processor.TopicNameExtractor;
 import org.apache.kafka.streams.processor.internals.ProcessorTopology;
 import org.apache.kafka.streams.processor.internals.SourceNode;
 import org.apache.kafka.streams.test.ConsumerRecordFactory;
 import org.apache.kafka.test.MockMapper;
+import org.apache.kafka.test.MockProcessor;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockValueJoiner;
 import org.apache.kafka.test.StreamsTestUtils;
@@ -50,6 +52,7 @@
 
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 import java.util.regex.Pattern;
@@ -220,6 +223,26 @@ public void shouldSendDataToTopicUsingProduced() {
         assertThat(processorSupplier.theCapturedProcessor().processed, 
equalTo(Collections.singletonList("e:f")));
     }
 
+    @Test
+    public void shouldSendDataToDynamicTopics() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final String input = "topic";
+        final KStream<String, String> stream = builder.stream(input, 
stringConsumed);
+        stream.to((key, value, context) -> context.topic() + "-" + key + "-" + 
value.substring(0, 1),
+                  Produced.with(Serdes.String(), Serdes.String()));
+        builder.stream(input + "-a-v", 
stringConsumed).process(processorSupplier);
+        builder.stream(input + "-b-v", 
stringConsumed).process(processorSupplier);
+
+        try (final TopologyTestDriver driver = new 
TopologyTestDriver(builder.build(), props)) {
+            driver.pipeInput(recordFactory.create(input, "a", "v1"));
+            driver.pipeInput(recordFactory.create(input, "a", "v2"));
+            driver.pipeInput(recordFactory.create(input, "b", "v1"));
+        }
+        List<MockProcessor<String, String>> mockProcessors = 
processorSupplier.capturedProcessors(2);
+        assertThat(mockProcessors.get(0).processed, 
equalTo(Utils.mkList("a:v1", "a:v2")));
+        assertThat(mockProcessors.get(1).processed, 
equalTo(Collections.singletonList("b:v1")));
+    }
+
     @Test
     public void 
shouldUseRecordMetadataTimestampExtractorWhenInternalRepartitioningTopicCreated()
 {
         final StreamsBuilder builder = new StreamsBuilder();
@@ -300,12 +323,12 @@ public void shouldNotAllowNullMapperOnFlatMap() {
 
     @Test(expected = NullPointerException.class)
     public void shouldNotAllowNullMapperOnFlatMapValues() {
-        testStream.flatMapValues((ValueMapper) null);
+        testStream.flatMapValues((ValueMapper<? super String, ? extends 
Iterable<? extends String>>) null);
     }
 
     @Test(expected = NullPointerException.class)
     public void shouldNotAllowNullMapperOnFlatMapValuesWithKey() {
-        testStream.flatMapValues((ValueMapperWithKey) null);
+        testStream.flatMapValues((ValueMapperWithKey<? super String, ? super 
String, ? extends Iterable<? extends String>>) null);
     }
 
     @Test(expected = IllegalArgumentException.class)
@@ -325,7 +348,12 @@ public void shouldNotAllowNullTopicOnThrough() {
 
     @Test(expected = NullPointerException.class)
     public void shouldNotAllowNullTopicOnTo() {
-        testStream.to(null);
+        testStream.to((String) null);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotAllowNullTopicChooserOnTo() {
+        testStream.to((TopicNameExtractor<String, String>) null);
     }
 
     @Test(expected = NullPointerException.class)
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java
index 3aafa33e2e4..c41ae15f564 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java
@@ -57,7 +57,7 @@ public void testCopartitioning() {
         final Random rand = new Random();
         final DefaultPartitioner defaultPartitioner = new DefaultPartitioner();
         final WindowedSerializer<Integer> timeWindowedSerializer = new 
TimeWindowedSerializer<>(intSerializer);
-        final WindowedStreamPartitioner<Integer, String> streamPartitioner = 
new WindowedStreamPartitioner<>(topicName, timeWindowedSerializer);
+        final WindowedStreamPartitioner<Integer, String> streamPartitioner = 
new WindowedStreamPartitioner<>(timeWindowedSerializer);
 
         for (int k = 0; k < 10; k++) {
             Integer key = rand.nextInt();
@@ -72,7 +72,7 @@ public void testCopartitioning() {
                 TimeWindow window = new TimeWindow(10 * w, 20 * w);
 
                 Windowed<Integer> windowedKey = new Windowed<>(key, window);
-                Integer actual = streamPartitioner.partition(windowedKey, 
value, infos.size());
+                Integer actual = streamPartitioner.partition(topicName, 
windowedKey, value, infos.size());
 
                 assertEquals(expected, actual);
             }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
index 9aaa8a6147a..d3f8dda8aee 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
@@ -49,7 +49,7 @@
     private final AbstractProcessorContext context = new 
TestProcessorContext(metrics);
     private final MockStateStore stateStore = new MockStateStore("store", 
false);
     private final Headers headers = new RecordHeaders(new Header[]{new 
RecordHeader("key", "value".getBytes())});
-    private final RecordContext recordContext = new RecordContextStub(10, 
System.currentTimeMillis(), 1, "foo", headers);
+    private final ProcessorRecordContext recordContext = new 
ProcessorRecordContext(10, System.currentTimeMillis(), 1, "foo", headers);
 
     @Before
     public void before() {
@@ -95,7 +95,7 @@ public void shouldReturnTopicFromRecordContext() {
 
     @Test
     public void shouldReturnNullIfTopicEqualsNonExistTopic() {
-        context.setRecordContext(new RecordContextStub(0, 0, 0, 
AbstractProcessorContext.NONEXIST_TOPIC));
+        context.setRecordContext(new ProcessorRecordContext(0, 0, 0, 
AbstractProcessorContext.NONEXIST_TOPIC));
         assertThat(context.topic(), nullValue());
     }
 
@@ -153,7 +153,7 @@ public void shouldReturnHeadersFromRecordContext() {
 
     @Test
     public void shouldReturnNullIfHeadersAreNotSet() {
-        context.setRecordContext(new RecordContextStub(0, 0, 0, 
AbstractProcessorContext.NONEXIST_TOPIC));
+        context.setRecordContext(new ProcessorRecordContext(0, 0, 0, 
AbstractProcessorContext.NONEXIST_TOPIC));
         assertThat(context.headers(), nullValue());
     }
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
index d8e66b87be8..1da04255b11 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
@@ -25,6 +25,7 @@
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
 import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.TopicNameExtractor;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.Stores;
@@ -411,7 +412,12 @@ public void shouldNotAllowNullNameWhenAddingSink() {
 
     @Test(expected = NullPointerException.class)
     public void shouldNotAllowNullTopicWhenAddingSink() {
-        builder.addSink("name", null, null, null, null);
+        builder.addSink("name", (String) null, null, null, null);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotAllowNullTopicChooserWhenAddingSink() {
+        builder.addSink("name", (TopicNameExtractor) null, null, null, null);
     }
 
     @Test(expected = NullPointerException.class)
@@ -456,7 +462,7 @@ public void shouldNotSetApplicationIdToNull() {
 
     @Test(expected = NullPointerException.class)
     public void shouldNotAddNullStateStoreSupplier() {
-        builder.addStateStore((StoreBuilder) null);
+        builder.addStateStore(null);
     }
 
     private Set<String> nodeNames(final Collection<ProcessorNode> nodes) {
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
index 033c0e13825..d88d3b5694a 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
@@ -420,7 +420,7 @@ private void assertNoOutputRecord(final String topic) {
     private StreamPartitioner<Object, Object> constantPartitioner(final 
Integer partition) {
         return new StreamPartitioner<Object, Object>() {
             @Override
-            public Integer partition(final Object key, final Object value, 
final int numPartitions) {
+            public Integer partition(final String topic, final Object key, 
final Object value, final int numPartitions) {
                 return partition;
             }
         };
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
index e439372d130..6954eda529f 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
@@ -73,7 +73,7 @@
 
     private final StreamPartitioner<String, Object> streamPartitioner = new 
StreamPartitioner<String, Object>() {
         @Override
-        public Integer partition(final String key, final Object value, final 
int numPartitions) {
+        public Integer partition(final String topic, final String key, final 
Object value, final int numPartitions) {
             return Integer.parseInt(key) % numPartitions;
         }
     };
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordContextStub.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordContextStub.java
deleted file mode 100644
index 7afd51eb06d..00000000000
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordContextStub.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.processor.internals;
-
-import org.apache.kafka.common.header.Headers;
-
-public class RecordContextStub implements RecordContext {
-
-    private final long offset;
-    private long timestamp;
-    private final int partition;
-    private final String topic;
-    private final Headers headers;
-
-    public RecordContextStub() {
-        this(-1, -1, -1, "", null);
-    }
-
-    public RecordContextStub(final long offset,
-                             final long timestamp,
-                             final int partition,
-                             final String topic,
-                             final Headers headers) {
-        this.offset = offset;
-        this.timestamp = timestamp;
-        this.partition = partition;
-        this.topic = topic;
-        this.headers = headers;
-    }
-
-    public RecordContextStub(final long offset,
-                             final long timestamp,
-                             final int partition,
-                             final String topic) {
-        this(offset, timestamp, partition, topic, null);
-    }
-
-    @Override
-    public long offset() {
-        return offset;
-    }
-
-    @Override
-    public long timestamp() {
-        return timestamp;
-    }
-
-    @Override
-    public void setTimestamp(final long timestamp) {
-        this.timestamp = timestamp;
-    }
-
-    @Override
-    public String topic() {
-        return topic;
-    }
-
-    @Override
-    public int partition() {
-        return partition;
-    }
-
-    @Override
-    public Headers headers() {
-        return headers;
-    }
-}
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java
index 753d26bd86e..dacc17e86e7 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SinkNodeTest.java
@@ -47,7 +47,7 @@
             new Metrics().sensor("skipped-records")
         )
     );
-    private final SinkNode sink = new SinkNode<>("anyNodeName", 
"any-output-topic", anySerializer, anySerializer, null);
+    private final SinkNode sink = new SinkNode<>("anyNodeName", new 
StaticTopicNameExtractor("any-output-topic"), anySerializer, anySerializer, 
null);
 
     @Before
     public void before() {
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
index 5ab2f17aa7a..fce5342b0f5 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
@@ -65,7 +65,6 @@
     private TopicPartition topic1P1;
     private TopicPartition topic2P1;
     private TopicPartition topic4P0;
-    private List<PartitionInfo> partitionInfos;
     private Cluster cluster;
     private final String globalTable = "global-table";
     private StreamPartitioner<String, Object> partitioner;
@@ -113,7 +112,7 @@ public Object apply(final Object value) {
         hostToPartitions.put(hostTwo, Utils.mkSet(topic2P0, topic1P1));
         hostToPartitions.put(hostThree, Collections.singleton(topic3P0));
 
-        partitionInfos = Arrays.asList(
+        final List<PartitionInfo> partitionInfos = Arrays.asList(
                 new PartitionInfo("topic-one", 0, null, null, null),
                 new PartitionInfo("topic-one", 1, null, null, null),
                 new PartitionInfo("topic-two", 0, null, null, null),
@@ -126,7 +125,7 @@ public Object apply(final Object value) {
         discovery.onChange(hostToPartitions, cluster);
         partitioner = new StreamPartitioner<String, Object>() {
             @Override
-            public Integer partition(final String key, final Object value, 
final int numPartitions) {
+            public Integer partition(final String topic, final String key, 
final Object value, final int numPartitions) {
                 return 1;
             }
         };
@@ -246,7 +245,7 @@ public void shouldGetInstanceWithKeyWithMergedStreams() {
 
         final StreamsMetadata actual = 
discovery.getMetadataWithKey("merged-table", "123", new 
StreamPartitioner<String, Object>() {
             @Override
-            public Integer partition(final String key, final Object value, 
final int numPartitions) {
+            public Integer partition(final String topic, final String key, 
final Object value, final int numPartitions) {
                 return 2;
             }
         });
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java
index 92653ce69b2..3f78be6dd4e 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/NamedCacheTest.java
@@ -72,8 +72,8 @@ public void 
shouldKeepTrackOfMostRecentlyAndLeastRecentlyUsed() throws IOExcepti
             cache.put(Bytes.wrap(key), new LRUCacheEntry(value, null, true, 1, 
1, 1, ""));
             LRUCacheEntry head = cache.first();
             LRUCacheEntry tail = cache.last();
-            assertEquals(new String(head.value), toInsert.get(i).value);
-            assertEquals(new String(tail.value), toInsert.get(0).value);
+            assertEquals(new String(head.value()), toInsert.get(i).value);
+            assertEquals(new String(tail.value()), toInsert.get(0).value);
             assertEquals(cache.flushes(), 0);
             assertEquals(cache.hits(), 0);
             assertEquals(cache.misses(), 0);
@@ -116,9 +116,9 @@ public void shouldPutGet() {
         cache.put(Bytes.wrap(new byte[]{1}), new LRUCacheEntry(new 
byte[]{11}));
         cache.put(Bytes.wrap(new byte[]{2}), new LRUCacheEntry(new 
byte[]{12}));
 
-        assertArrayEquals(new byte[] {10}, cache.get(Bytes.wrap(new byte[] 
{0})).value);
-        assertArrayEquals(new byte[] {11}, cache.get(Bytes.wrap(new byte[] 
{1})).value);
-        assertArrayEquals(new byte[] {12}, cache.get(Bytes.wrap(new byte[] 
{2})).value);
+        assertArrayEquals(new byte[] {10}, cache.get(Bytes.wrap(new byte[] 
{0})).value());
+        assertArrayEquals(new byte[] {11}, cache.get(Bytes.wrap(new byte[] 
{1})).value());
+        assertArrayEquals(new byte[] {12}, cache.get(Bytes.wrap(new byte[] 
{2})).value());
         assertEquals(cache.hits(), 3);
     }
 
@@ -128,15 +128,15 @@ public void shouldPutIfAbsent() {
         cache.putIfAbsent(Bytes.wrap(new byte[]{0}), new LRUCacheEntry(new 
byte[]{20}));
         cache.putIfAbsent(Bytes.wrap(new byte[]{1}), new LRUCacheEntry(new 
byte[]{30}));
 
-        assertArrayEquals(new byte[] {10}, cache.get(Bytes.wrap(new byte[] 
{0})).value);
-        assertArrayEquals(new byte[] {30}, cache.get(Bytes.wrap(new byte[] 
{1})).value);
+        assertArrayEquals(new byte[] {10}, cache.get(Bytes.wrap(new byte[] 
{0})).value());
+        assertArrayEquals(new byte[] {30}, cache.get(Bytes.wrap(new byte[] 
{1})).value());
     }
 
     @Test
     public void shouldDeleteAndUpdateSize() {
         cache.put(Bytes.wrap(new byte[]{0}), new LRUCacheEntry(new 
byte[]{10}));
         final LRUCacheEntry deleted = cache.delete(Bytes.wrap(new byte[]{0}));
-        assertArrayEquals(new byte[] {10}, deleted.value);
+        assertArrayEquals(new byte[] {10}, deleted.value());
         assertEquals(0, cache.sizeInBytes());
     }
 
@@ -146,9 +146,9 @@ public void shouldPutAll() {
                                    KeyValue.pair(new byte[] {1}, new 
LRUCacheEntry(new byte[]{1})),
                                    KeyValue.pair(new byte[] {2}, new 
LRUCacheEntry(new byte[]{2}))));
 
-        assertArrayEquals(new byte[]{0}, cache.get(Bytes.wrap(new 
byte[]{0})).value);
-        assertArrayEquals(new byte[]{1}, cache.get(Bytes.wrap(new 
byte[]{1})).value);
-        assertArrayEquals(new byte[]{2}, cache.get(Bytes.wrap(new 
byte[]{2})).value);
+        assertArrayEquals(new byte[]{0}, cache.get(Bytes.wrap(new 
byte[]{0})).value());
+        assertArrayEquals(new byte[]{1}, cache.get(Bytes.wrap(new 
byte[]{1})).value());
+        assertArrayEquals(new byte[]{2}, cache.get(Bytes.wrap(new 
byte[]{2})).value());
     }
 
     @Test
@@ -157,7 +157,7 @@ public void shouldOverwriteAll() {
             KeyValue.pair(new byte[] {0}, new LRUCacheEntry(new byte[]{1})),
             KeyValue.pair(new byte[] {0}, new LRUCacheEntry(new byte[]{2}))));
 
-        assertArrayEquals(new byte[]{2}, cache.get(Bytes.wrap(new 
byte[]{0})).value);
+        assertArrayEquals(new byte[]{2}, cache.get(Bytes.wrap(new 
byte[]{0})).value());
         assertEquals(cache.overwrites(), 2);
     }
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java
index d100ae5372b..3fa93676dd9 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ThreadCacheTest.java
@@ -66,7 +66,7 @@ public void basicPutGet() throws IOException {
             Bytes key = Bytes.wrap(kvToInsert.key.getBytes());
             LRUCacheEntry entry = cache.get(namespace, key);
             assertEquals(entry.isDirty(), true);
-            assertEquals(new String(entry.value), kvToInsert.value);
+            assertEquals(new String(entry.value()), kvToInsert.value);
         }
         assertEquals(cache.gets(), 5);
         assertEquals(cache.puts(), 5);
@@ -188,7 +188,7 @@ public void shouldDelete() {
         final Bytes key = Bytes.wrap(new byte[]{0});
 
         cache.put(namespace, key, dirtyEntry(key.get()));
-        assertEquals(key.get(), cache.delete(namespace, key).value);
+        assertEquals(key.get(), cache.delete(namespace, key).value());
         assertNull(cache.get(namespace, key));
     }
 
@@ -204,7 +204,7 @@ public void apply(final List<ThreadCache.DirtyEntry> dirty) 
{
             }
         });
         cache.put(namespace, key, dirtyEntry(key.get()));
-        assertEquals(key.get(), cache.delete(namespace, key).value);
+        assertEquals(key.get(), cache.delete(namespace, key).value());
 
         // flushing should have no further effect
         cache.flush(namespace);
@@ -235,8 +235,8 @@ public void shouldNotClashWithOverlappingNames() {
         cache.put(namespace1, nameByte, dirtyEntry(nameByte.get()));
         cache.put(namespace2, nameByte, dirtyEntry(name1Byte.get()));
 
-        assertArrayEquals(nameByte.get(), cache.get(namespace1, 
nameByte).value);
-        assertArrayEquals(name1Byte.get(), cache.get(namespace2, 
nameByte).value);
+        assertArrayEquals(nameByte.get(), cache.get(namespace1, 
nameByte).value());
+        assertArrayEquals(name1Byte.get(), cache.get(namespace2, 
nameByte).value());
     }
 
     @Test
@@ -413,8 +413,8 @@ public void shouldPutAll() {
         cache.putAll(namespace, Arrays.asList(KeyValue.pair(Bytes.wrap(new 
byte[]{0}), dirtyEntry(new byte[]{5})),
                                            KeyValue.pair(Bytes.wrap(new 
byte[]{1}), dirtyEntry(new byte[]{6}))));
 
-        assertArrayEquals(new byte[]{5}, cache.get(namespace, Bytes.wrap(new 
byte[]{0})).value);
-        assertArrayEquals(new byte[]{6}, cache.get(namespace, Bytes.wrap(new 
byte[]{1})).value);
+        assertArrayEquals(new byte[]{5}, cache.get(namespace, Bytes.wrap(new 
byte[]{0})).value());
+        assertArrayEquals(new byte[]{6}, cache.get(namespace, Bytes.wrap(new 
byte[]{1})).value());
     }
 
     @Test
@@ -436,8 +436,8 @@ public void shouldPutIfAbsent() {
         final Bytes key = Bytes.wrap(new byte[]{10});
         final byte[] value = {30};
         assertNull(cache.putIfAbsent(namespace, key, dirtyEntry(value)));
-        assertArrayEquals(value, cache.putIfAbsent(namespace, key, 
dirtyEntry(new byte[]{8})).value);
-        assertArrayEquals(value, cache.get(namespace, key).value);
+        assertArrayEquals(value, cache.putIfAbsent(namespace, key, 
dirtyEntry(new byte[]{8})).value());
+        assertArrayEquals(value, cache.get(namespace, key).value());
     }
 
     @Test
diff --git 
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
 
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
index 49d9fe4b064..0f1fc82a489 100644
--- 
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
+++ 
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
@@ -22,7 +22,7 @@ package kstream
 
 import org.apache.kafka.streams.KeyValue
 import org.apache.kafka.streams.kstream.{KStream => KStreamJ, _}
-import org.apache.kafka.streams.processor.{Processor, ProcessorContext, 
ProcessorSupplier}
+import org.apache.kafka.streams.processor.{Processor, ProcessorContext, 
ProcessorSupplier, TopicNameExtractor}
 import org.apache.kafka.streams.scala.ImplicitConversions._
 import org.apache.kafka.streams.scala.FunctionConversions._
 
@@ -249,6 +249,38 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
   def to(topic: String)(implicit produced: Produced[K, V]): Unit =
     inner.to(topic, produced)
 
+  /**
+    * Dynamically materialize this stream to topics using the `Produced` 
instance for
+    * configuration of the `Serde key serde`, `Serde value serde`, and 
`StreamPartitioner`.
+    * The topic name to send each record to is dynamically determined based on the given extractor.
+    * <p>
+    * The user can either supply the `Produced` instance as an implicit in scope or provide implicit
+    * key and value serdes that will be converted to a `Produced` instance 
implicitly.
+    * <p>
+    * {{{
+    * Example:
+    *
+    * // brings implicit serdes in scope
+    * import Serdes._
+    *
+    * //..
+    * val clicksPerRegion: KTable[String, Long] = //..
+    *
+    * // Implicit serdes in scope will generate an implicit Produced instance, 
which
+    * // will be passed automatically to the call of `to` below
+    * clicksPerRegion.to(topicChooser)
+    *
+    * // Similarly you can create an implicit Produced and it will be passed 
implicitly
+    * // to the `to` call
+    * }}}
+    *
+    * @param extractor the extractor to determine the name of the Kafka topic to write to for each record
+    * @param (implicit) produced the instance of Produced that gives the 
serdes and `StreamPartitioner`
+    * @see `org.apache.kafka.streams.kstream.KStream#to`
+    */
+  def to(extractor: TopicNameExtractor[K, V])(implicit produced: Produced[K, 
V]): Unit =
+    inner.to(extractor, produced)
+
   /**
    * Transform each record of the input stream into zero or more records in 
the output stream (both key and value type
    * can be altered arbitrarily).
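
For readers following the provenance diff above: a minimal, hypothetical sketch of a
user-supplied extractor, written against the TopicNameExtractor and RecordContext
types introduced by this PR. The per-key "orders-" naming scheme is an illustrative
assumption only, and the derived topics are assumed to exist already.

import org.apache.kafka.streams.processor.RecordContext;
import org.apache.kafka.streams.processor.TopicNameExtractor;

public class KeyBasedTopicNameExtractor implements TopicNameExtractor<String, String> {

    @Override
    public String extract(final String key, final String value, final RecordContext recordContext) {
        // Route each record to a topic derived from its key; fall back to the source
        // topic (available from the record context) when the key is null.
        return key != null ? "orders-" + key : recordContext.topic();
    }

    @Override
    public String toString() {
        return "KeyBasedTopicNameExtractor(orders-<key>)";
    }
}

Such an extractor can be passed to the new KStream#to and Topology#addSink overloads
added in this PR (or, in the Scala DSL, to the new to(extractor) method shown above).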


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Allow dynamic routing of output records
> ---------------------------------------
>
>                 Key: KAFKA-4936
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4936
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Matthias J. Sax
>            Assignee: Guozhang Wang
>            Priority: Major
>              Labels: kip
>             Fix For: 2.0.0
>
>
> Currently, all used output topics must be known beforehand, and thus, it's not 
> possible to send output records to topics in a dynamic fashion.
> There have been a couple of requests for this feature and we should consider 
> adding it. There are many open questions, however, with regard to topic 
> creation and configuration (replication factor, number of partitions), etc.
> KIP: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-303%3A+Add+Dynamic+Routing+in+Streams+Sink
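
For context on what the merged KIP-303 change enables, a minimal DSL sketch follows.
The input topic "events", the per-key output topics "events-<key>", the application id,
and the bootstrap server are illustrative assumptions, not part of this issue; the
derived topics are assumed to be created ahead of time, since topic creation is left
to the user.

import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

public class DynamicRoutingExample {

    public static void main(final String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();
        final KStream<String, String> events = builder.stream("events");

        // The lambda implements TopicNameExtractor<String, String>: the output topic is
        // chosen per record from its key. The "events-<key>" topics are assumed to exist.
        events.to((key, value, recordContext) -> "events-" + key,
                  Produced.with(Serdes.String(), Serdes.String()));

        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "dynamic-routing-example");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        new KafkaStreams(builder.build(), props).start();
    }
}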



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
