This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 4e1c8ff  KAFKA-6849: add transformValues methods to KTable. (#4959)
4e1c8ff is described below

commit 4e1c8ffd0d44ab7d1c0ca9ac9d70a98b46f35181
Author: Andy Coates <[email protected]>
AuthorDate: Fri May 18 16:06:50 2018 -0700

    KAFKA-6849: add transformValues methods to KTable. (#4959)
    
    See the KIP: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-292%3A+Add+transformValues%28%29+method+to+KTable
    
    This PR adds the transformValues method to the KTable interface. The 
semantics of the call are the same as the methods of the same name on the 
KStream interface.
    
    Fixes KAFKA-6849
    
    Reviewers: Matthias J. Sax <[email protected]>, Guozhang Wang 
<[email protected]>
---
 docs/streams/developer-guide/dsl-api.html          |   1 +
 docs/streams/upgrade-guide.html                    |   7 +
 .../org/apache/kafka/streams/kstream/KStream.java  |   8 +-
 .../org/apache/kafka/streams/kstream/KTable.java   | 152 ++++++
 .../kstream/internals/KStreamAggregate.java        |   4 +
 .../internals/KStreamKTableJoinProcessor.java      |   5 +
 .../streams/kstream/internals/KStreamReduce.java   |   4 +
 .../internals/KStreamSessionWindowAggregate.java   |   4 +
 .../kstream/internals/KStreamTransformValues.java  | 120 +----
 .../kstream/internals/KStreamWindowAggregate.java  |   4 +
 .../kstream/internals/KStreamWindowReduce.java     |   4 +
 .../streams/kstream/internals/KTableFilter.java    |   5 +
 .../streams/kstream/internals/KTableImpl.java      |  53 ++
 .../kstream/internals/KTableKTableInnerJoin.java   |  11 +
 .../kstream/internals/KTableKTableLeftJoin.java    |  10 +
 .../kstream/internals/KTableKTableOuterJoin.java   |  10 +
 .../kstream/internals/KTableKTableRightJoin.java   |   9 +
 .../streams/kstream/internals/KTableMapValues.java |   6 +
 .../KTableMaterializedValueGetterSupplier.java     |   4 +
 .../kstream/internals/KTableRepartitionMap.java    |   5 +
 .../internals/KTableSourceValueGetterSupplier.java |   3 +
 .../kstream/internals/KTableTransformValues.java   | 149 ++++++
 .../kstream/internals/KTableValueGetter.java       |   1 +
 .../ForwardingDisabledProcessorContext.java        | 149 ++++++
 .../internals/KStreamTransformValuesTest.java      | 102 +---
 .../streams/kstream/internals/KTableImplTest.java  |  21 +
 .../internals/KTableTransformValuesTest.java       | 547 +++++++++++++++++++++
 .../ForwardingDisabledProcessorContextTest.java    |  59 +++
 .../apache/kafka/test/KTableValueGetterStub.java   |   4 +
 .../kafka/test/SingletonNoOpValueTransformer.java} |  46 +-
 .../kafka/streams/scala/kstream/KStream.scala      |   4 +-
 .../kafka/streams/scala/kstream/KTable.scala       |  56 +++
 32 files changed, 1331 insertions(+), 236 deletions(-)

diff --git a/docs/streams/developer-guide/dsl-api.html 
b/docs/streams/developer-guide/dsl-api.html
index 3895522..4c76c5e 100644
--- a/docs/streams/developer-guide/dsl-api.html
+++ b/docs/streams/developer-guide/dsl-api.html
@@ -2906,6 +2906,7 @@ t=5 (blue), which lead to a merge of sessions and an 
extension of a session, res
                     <tr class="row-even"><td><p 
class="first"><strong>Transform (values only)</strong></p>
                         <ul class="last simple">
                             <li>KStream -&gt; KStream</li>
+                            <li>KTable -&gt; KTable</li>
                         </ul>
                     </td>
                         <td><p class="first">Applies a <code class="docutils 
literal"><span class="pre">ValueTransformer</span></code> to each record, while 
retaining the key of the original record.
diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html
index 7788638..0e3725e 100644
--- a/docs/streams/upgrade-guide.html
+++ b/docs/streams/upgrade-guide.html
@@ -193,6 +193,13 @@
     </p>
 
     <p>
+        New method in <code>KTable</code>
+    </p>
+    <ul>
+        <li> <code>transformValues</code> methods have been added to 
<code>KTable</code>. Similar to those on <code>KStream</code>, these methods 
allow for richer, stateful, value transformation similar to the Processor 
API.</li>
+    </ul>
+
+    <p>
        New method in <code>GlobalKTable</code>
     </p>
     <ul>
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java 
b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index 1d42f11..e75bb3a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -544,12 +544,12 @@ public interface KStream<K, V> {
 
     /**
      * Transform the value of each input record into a new value (with 
possible new type) of the output record.
-     * A {@link ValueTransformer} (provided by the given {@link 
ValueTransformerSupplier}) is applies to each input
+     * A {@link ValueTransformer} (provided by the given {@link 
ValueTransformerSupplier}) is applied to each input
      * record value and computes a new value for it.
      * Thus, an input record {@code <K,V>} can be transformed into an output 
record {@code <K:V'>}.
      * This is a stateful record-by-record operation (cf. {@link 
#mapValues(ValueMapper)}).
      * Furthermore, via {@link 
org.apache.kafka.streams.processor.Punctuator#punctuate(long)} the processing 
progress can be observed and additional
-     * periodic actions get be performed.
+     * periodic actions can be performed.
      * <p>
      * In order to assign a state, the state must be created and registered 
beforehand:
      * <pre>{@code
@@ -613,12 +613,12 @@ public interface KStream<K, V> {
 
     /**
      * Transform the value of each input record into a new value (with 
possible new type) of the output record.
-     * A {@link ValueTransformerWithKey} (provided by the given {@link 
ValueTransformerWithKeySupplier}) is applies to each input
+     * A {@link ValueTransformerWithKey} (provided by the given {@link 
ValueTransformerWithKeySupplier}) is applied to each input
      * record value and computes a new value for it.
      * Thus, an input record {@code <K,V>} can be transformed into an output 
record {@code <K:V'>}.
      * This is a stateful record-by-record operation (cf. {@link 
#mapValues(ValueMapperWithKey)}).
      * Furthermore, via {@link 
org.apache.kafka.streams.processor.Punctuator#punctuate(long)} the processing 
progress can be observed and additional
-     * periodic actions get be performed.
+     * periodic actions can be performed.
      * <p>
      * In order to assign a state, the state must be created and registered 
beforehand:
      * <pre>{@code
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java 
b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
index dbead79..da540ba 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
@@ -23,6 +23,7 @@ import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.Topology;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
@@ -383,6 +384,157 @@ public interface KTable<K, V> {
     <KR> KStream<KR, V> toStream(final KeyValueMapper<? super K, ? super V, ? 
extends KR> mapper);
 
     /**
+     * Create a new {@code KTable} by transforming the value of each record in 
this {@code KTable} into a new value
+     * (with possibly new type).
+     * A {@link ValueTransformerWithKey} (provided by the given {@link 
ValueTransformerWithKeySupplier}) is applied to each input
+     * record value and computes a new value for it.
+     * Thus, an input record {@code <K,V>} can be transformed into an output 
record {@code <K:V'>}.
+     * This is similar to {@link #mapValues(ValueMapperWithKey)}, but more 
flexible, allowing access to additional state-stores,
+     * and access to the {@link ProcessorContext}.
+     * Furthermore, via {@link 
org.apache.kafka.streams.processor.Punctuator#punctuate(long)} the processing 
progress can be observed and additional
+     * periodic actions can be performed.
+     * <p>
+     * If the downstream topology uses aggregation functions (e.g. {@link 
KGroupedTable#reduce}, {@link KGroupedTable#aggregate}, etc.),
+     * care must be taken when dealing with state (either held in 
state-stores or transformer instances) to ensure correct aggregate results.
+     * In contrast, if the resulting KTable is materialized (cf. {@link 
#transformValues(ValueTransformerWithKeySupplier, Materialized, String...)}),
+     * such concerns are handled for you.
+     * <p>
+     * In order to assign a state, the state must be created and registered 
beforehand:
+     * <pre>{@code
+     * // create store
+     * StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
+     *         
Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"),
+     *                 Serdes.String(),
+     *                 Serdes.String());
+     * // register store
+     * builder.addStateStore(keyValueStoreBuilder);
+     *
+     * KTable outputTable = inputTable.transformValues(new 
ValueTransformerWithKeySupplier() { ... }, "myValueTransformState");
+     * }</pre>
+     * <p>
+     * Within the {@link ValueTransformerWithKey}, the state is obtained via 
the
+     * {@link ProcessorContext}.
+     * To trigger periodic actions via {@link 
org.apache.kafka.streams.processor.Punctuator#punctuate(long) punctuate()},
+     * a schedule must be registered.
+     * <pre>{@code
+     * new ValueTransformerWithKeySupplier() {
+     *     ValueTransformerWithKey get() {
+     *         return new ValueTransformerWithKey() {
+     *             private KeyValueStore<String, String> state;
+     *
+     *             void init(ProcessorContext context) {
+     *                 this.state = (KeyValueStore<String, 
String>)context.getStateStore("myValueTransformState");
+     *                 context.schedule(1000, PunctuationType.WALL_CLOCK_TIME, 
new Punctuator(..)); // punctuate each 1000ms, can access this.state
+     *             }
+     *
+     *             NewValueType transform(K readOnlyKey, V value) {
+     *                 // can access this.state and use read-only key
+     *                 return new NewValueType(readOnlyKey); // or null
+     *             }
+     *
+     *             void close() {
+     *                 // can access this.state
+     *             }
+     *         }
+     *     }
+     * }
+     * }</pre>
+     * <p>
+     * Note that the key is read-only and should not be modified, as this can 
lead to corrupt partitioning.
+     * Setting a new value preserves data co-location with respect to the key.
+     *
+     * @param transformerSupplier an instance of {@link 
ValueTransformerWithKeySupplier} that generates a
+     *                            {@link ValueTransformerWithKey}.
+     *                            At least one transformer instance will be 
created per streaming task.
+     *                            Transformers do not need to be thread-safe.
+     * @param stateStoreNames     the names of the state stores used by the 
processor
+     * @param <VR>                the value type of the result table
+     * @return a {@code KTable} that contains records with unmodified key and 
new values (possibly of different type)
+     * @see #mapValues(ValueMapper)
+     * @see #mapValues(ValueMapperWithKey)
+     */
+    <VR> KTable<K, VR> transformValues(final ValueTransformerWithKeySupplier<? 
super K, ? super V, ? extends VR> transformerSupplier,
+                                       final String... stateStoreNames);
+
+    /**
+     * Create a new {@code KTable} by transforming the value of each record in 
this {@code KTable} into a new value
+     * (with possibly new type).
+     * A {@link ValueTransformerWithKey} (provided by the given {@link 
ValueTransformerWithKeySupplier}) is applied to each input
+     * record value and computes a new value for it.
+     * This is similar to {@link #mapValues(ValueMapperWithKey)}, but more 
flexible, allowing stateful, rather than stateless,
+     * record-by-record operation, access to additional state-stores, and 
access to the {@link ProcessorContext}.
+     * Furthermore, via {@link 
org.apache.kafka.streams.processor.Punctuator#punctuate(long)} the processing 
progress can be observed and additional
+     * periodic actions can be performed.
+     * The resulting {@code KTable} is materialized into another state store 
(additional to the provided state store names)
+     * as specified by the user via {@link Materialized} parameter, and is 
queryable through its given name.
+     * <p>
+     * In order to assign a state, the state must be created and registered 
beforehand:
+     * <pre>{@code
+     * // create store
+     * StoreBuilder<KeyValueStore<String,String>> keyValueStoreBuilder =
+     *         
Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("myValueTransformState"),
+     *                 Serdes.String(),
+     *                 Serdes.String());
+     * // register store
+     * builder.addStateStore(keyValueStoreBuilder);
+     *
+     * KTable outputTable = inputTable.transformValues(
+     *     new ValueTransformerWithKeySupplier() { ... },
+     *     Materialized.<String, String, KeyValueStore<Bytes, 
byte[]>>as("outputTable")
+     *                                 .withKeySerde(Serdes.String())
+     *                                 .withValueSerde(Serdes.String()),
+     *     "myValueTransformState");
+     * }</pre>
+     * <p>
+     * Within the {@link ValueTransformerWithKey}, the state is obtained via 
the
+     * {@link ProcessorContext}.
+     * To trigger periodic actions via {@link 
org.apache.kafka.streams.processor.Punctuator#punctuate(long) punctuate()},
+     * a schedule must be registered.
+     * <pre>{@code
+     * new ValueTransformerWithKeySupplier() {
+     *     ValueTransformerWithKey get() {
+     *         return new ValueTransformerWithKey() {
+     *             private KeyValueStore<String, String> state;
+     *
+     *             void init(ProcessorContext context) {
+     *                 this.state = (KeyValueStore<String, 
String>)context.getStateStore("myValueTransformState");
+     *                 context.schedule(1000, PunctuationType.WALL_CLOCK_TIME, 
new Punctuator(..)); // punctuate each 1000ms, can access this.state
+     *             }
+     *
+     *             NewValueType transform(K readOnlyKey, V value) {
+     *                 // can access this.state and use read-only key
+     *                 return new NewValueType(readOnlyKey); // or null
+     *             }
+     *
+     *             void close() {
+     *                 // can access this.state
+     *             }
+     *         }
+     *     }
+     * }
+     * }</pre>
+     * <p>
+     * Note that the key is read-only and should not be modified, as this can 
lead to corrupt partitioning.
+     * Setting a new value preserves data co-location with respect to the key.
+     *
+     * @param transformerSupplier an instance of {@link 
ValueTransformerWithKeySupplier} that generates a
+     *                            {@link ValueTransformerWithKey}.
+     *                            At least one transformer instance will be 
created per streaming task.
+     *                            Transformers do not need to be thread-safe.
+     * @param materialized        an instance of {@link Materialized} used to 
describe how the state store of the
+     *                            resulting table should be materialized.
+     *                            Cannot be {@code null}
+     * @param stateStoreNames     the names of the state stores used by the 
processor
+     * @param <VR>                the value type of the result table
+     * @return a {@code KTable} that contains records with unmodified key and 
new values (possibly of different type)
+     * @see #mapValues(ValueMapper)
+     * @see #mapValues(ValueMapperWithKey)
+     */
+    <VR> KTable<K, VR> transformValues(final ValueTransformerWithKeySupplier<? 
super K, ? super V, ? extends VR> transformerSupplier,
+                                       final Materialized<K, VR, 
KeyValueStore<Bytes, byte[]>> materialized,
+                                       final String... stateStoreNames);
+
+    /**
      * Re-groups the records of this {@code KTable} using the provided {@link 
KeyValueMapper} and default serializers
      * and deserializers.
      * Each {@link KeyValue} pair of this {@code KTable} is mapped to a new 
{@link KeyValue} pair by applying the
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
index 1170ff6..1b3a8f4 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java
@@ -126,5 +126,9 @@ public class KStreamAggregate<K, V, T> implements 
KStreamAggProcessorSupplier<K,
         public T get(final K key) {
             return store.get(key);
         }
+
+        @Override
+        public void close() {
+        }
     }
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
index 157c91b..70b9bad 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
@@ -74,4 +74,9 @@ class KStreamKTableJoinProcessor<K1, K2, V1, V2, R> extends 
AbstractProcessor<K1
             }
         }
     }
+
+    @Override
+    public void close() {
+        valueGetter.close();
+    }
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
index fff9348..9f404ea 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java
@@ -123,6 +123,10 @@ public class KStreamReduce<K, V> implements 
KStreamAggProcessorSupplier<K, K, V,
         public V get(final K key) {
             return store.get(key);
         }
+
+        @Override
+        public void close() {
+        }
     }
 }
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
index 114c685..b91431a 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java
@@ -175,6 +175,10 @@ class KStreamSessionWindowAggregate<K, V, T> implements 
KStreamAggProcessorSuppl
                 return value;
             }
         }
+
+        @Override
+        public void close() {
+        }
     }
 
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java
index d45b7cf..b645960 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java
@@ -16,24 +16,12 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
-import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.streams.StreamsMetrics;
-import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
 import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
-import org.apache.kafka.streams.processor.Cancellable;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
-import org.apache.kafka.streams.processor.PunctuationType;
-import org.apache.kafka.streams.processor.Punctuator;
-import org.apache.kafka.streams.processor.StateRestoreCallback;
-import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.TaskId;
-import org.apache.kafka.streams.processor.To;
-
-import java.io.File;
-import java.util.Map;
+import 
org.apache.kafka.streams.processor.internals.ForwardingDisabledProcessorContext;
 
 public class KStreamTransformValues<K, V, R> implements ProcessorSupplier<K, 
V> {
 
@@ -59,111 +47,7 @@ public class KStreamTransformValues<K, V, R> implements 
ProcessorSupplier<K, V>
 
         @Override
         public void init(final ProcessorContext context) {
-            valueTransformer.init(
-                new ProcessorContext() {
-                    @Override
-                    public String applicationId() {
-                        return context.applicationId();
-                    }
-
-                    @Override
-                    public TaskId taskId() {
-                        return context.taskId();
-                    }
-
-                    @Override
-                    public Serde<?> keySerde() {
-                        return context.keySerde();
-                    }
-
-                    @Override
-                    public Serde<?> valueSerde() {
-                        return context.valueSerde();
-                    }
-
-                    @Override
-                    public File stateDir() {
-                        return context.stateDir();
-                    }
-
-                    @Override
-                    public StreamsMetrics metrics() {
-                        return context.metrics();
-                    }
-
-                    @Override
-                    public void register(final StateStore store,
-                                         final StateRestoreCallback 
stateRestoreCallback) {
-                        context.register(store, stateRestoreCallback);
-                    }
-
-                    @Override
-                    public StateStore getStateStore(final String name) {
-                        return context.getStateStore(name);
-                    }
-
-                    @Override
-                    public Cancellable schedule(final long interval, final 
PunctuationType type, final Punctuator callback) {
-                        return context.schedule(interval, type, callback);
-                    }
-
-                    @Override
-                    public <K, V> void forward(final K key, final V value) {
-                        throw new StreamsException("ProcessorContext#forward() 
must not be called within TransformValues.");
-                    }
-
-                    @Override
-                    public <K, V> void forward(final K key, final V value, 
final To to) {
-                        throw new StreamsException("ProcessorContext#forward() 
must not be called within TransformValues.");
-                    }
-
-                    @SuppressWarnings("deprecation")
-                    @Override
-                    public <K, V> void forward(final K key, final V value, 
final int childIndex) {
-                        throw new StreamsException("ProcessorContext#forward() 
must not be called within TransformValues.");
-                    }
-
-                    @SuppressWarnings("deprecation")
-                    @Override
-                    public <K, V> void forward(final K key, final V value, 
final String childName) {
-                        throw new StreamsException("ProcessorContext#forward() 
must not be called within TransformValues.");
-                    }
-
-                    @Override
-                    public void commit() {
-                        context.commit();
-                    }
-
-                    @Override
-                    public String topic() {
-                        return context.topic();
-                    }
-
-                    @Override
-                    public int partition() {
-                        return context.partition();
-                    }
-
-                    @Override
-                    public long offset() {
-                        return context.offset();
-                    }
-
-                    @Override
-                    public long timestamp() {
-                        return context.timestamp();
-                    }
-
-                    @Override
-                    public Map<String, Object> appConfigs() {
-                        return context.appConfigs();
-                    }
-
-                    @Override
-                    public Map<String, Object> appConfigsWithPrefix(String 
prefix) {
-                        return context.appConfigsWithPrefix(prefix);
-                    }
-                });
+            valueTransformer.init(new 
ForwardingDisabledProcessorContext(context));
             this.context = context;
         }
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
index f4bd099..a774762 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java
@@ -145,5 +145,9 @@ public class KStreamWindowAggregate<K, V, T, W extends 
Window> implements KStrea
 
             return windowStore.fetch(key, window.start());
         }
+
+        @Override
+        public void close() {
+        }
     }
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
index 32dfba3..9db861d 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowReduce.java
@@ -141,5 +141,9 @@ public class KStreamWindowReduce<K, V, W extends Window> 
implements KStreamAggPr
 
             return windowStore.fetch(key, window.start());
         }
+
+        @Override
+        public void close() {
+        }
     }
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
index ee8982b..d020e4b 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java
@@ -131,6 +131,11 @@ class KTableFilter<K, V> implements 
KTableProcessorSupplier<K, V, V> {
         public V get(final K key) {
             return computeValue(key, parentGetter.get(key));
         }
+
+        @Override
+        public void close() {
+            parentGetter.close();
+        }
     }
 
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index c1f0f7a..bcd31bb 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -29,6 +29,7 @@ import org.apache.kafka.streams.kstream.Serialized;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.kstream.ValueMapper;
 import org.apache.kafka.streams.kstream.ValueMapperWithKey;
+import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StoreBuilder;
@@ -63,6 +64,8 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> 
implements KTable<K,
 
     private static final String TOSTREAM_NAME = "KTABLE-TOSTREAM-";
 
+    private static final String TRANSFORMVALUES_NAME = 
"KTABLE-TRANSFORMVALUES-";
+
     private final ProcessorSupplier<?, ?> processorSupplier;
 
     private final String queryableStoreName;
@@ -220,6 +223,56 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> 
implements KTable<K,
     }
 
     @Override
+    public <VR> KTable<K, VR> transformValues(final 
ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> 
transformerSupplier,
+                                              final String... stateStoreNames) 
{
+        return doTransformValues(transformerSupplier, null, stateStoreNames);
+    }
+
+    @Override
+    public <VR> KTable<K, VR> transformValues(final 
ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> 
transformerSupplier,
+                                              final Materialized<K, VR, 
KeyValueStore<Bytes, byte[]>> materialized,
+                                              final String... stateStoreNames) 
{
+        Objects.requireNonNull(materialized, "materialized can't be null");
+        return doTransformValues(transformerSupplier,
+            new MaterializedInternal<>(materialized, builder, 
TRANSFORMVALUES_NAME), stateStoreNames);
+    }
+
+    private <VR> KTable<K, VR> doTransformValues(final 
ValueTransformerWithKeySupplier<? super K, ? super V, ? extends VR> 
transformerSupplier,
+                                                 final MaterializedInternal<K, 
VR, KeyValueStore<Bytes, byte[]>> materialized,
+                                                 final String... 
stateStoreNames) {
+        Objects.requireNonNull(stateStoreNames, "stateStoreNames");
+
+        final String name = builder.newProcessorName(TRANSFORMVALUES_NAME);
+
+        final boolean shouldMaterialize = materialized != null && 
materialized.isQueryable();
+
+        final KTableProcessorSupplier<K, V, VR> processorSupplier = new 
KTableTransformValues<>(
+            this,
+            transformerSupplier,
+            shouldMaterialize ? materialized.storeName() : null);
+
+        builder.internalTopologyBuilder.addProcessor(name, processorSupplier, 
this.name);
+
+        if (stateStoreNames.length > 0) {
+            
builder.internalTopologyBuilder.connectProcessorAndStateStores(name, 
stateStoreNames);
+        }
+
+        if (shouldMaterialize) {
+            builder.internalTopologyBuilder.addStateStore(
+                new KeyValueStoreMaterializer<>(materialized).materialize(),
+                name);
+        }
+
+        return new KTableImpl<>(
+            builder,
+            name,
+            processorSupplier,
+            sourceNodes,
+            shouldMaterialize ? materialized.storeName() : 
this.queryableStoreName,
+            shouldMaterialize);
+    }
+
+    @Override
     public KStream<K, V> toStream() {
         String name = builder.newProcessorName(TOSTREAM_NAME);
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoin.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoin.java
index 5a159ff..a7bbb56 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoin.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoin.java
@@ -109,6 +109,11 @@ class KTableKTableInnerJoin<K, R, V1, V2> extends 
KTableKTableAbstractJoin<K, R,
 
             context().forward(key, new Change<>(newValue, oldValue));
         }
+
+        @Override
+        public void close() {
+            valueGetter.close(); // propagate close to the value getter this processor holds
+        }
     }
 
     private class KTableKTableInnerJoinValueGetter implements 
KTableValueGetter<K, R> {
@@ -144,5 +149,11 @@ class KTableKTableInnerJoin<K, R, V1, V2> extends 
KTableKTableAbstractJoin<K, R,
                 return null;
             }
         }
+
+        @Override
+        public void close() {
+            valueGetter1.close(); // the joined getter wraps two getters; both are closed
+            valueGetter2.close();
+        }
     }
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java
index a40ac5d..cbb63c6 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java
@@ -102,6 +102,11 @@ class KTableKTableLeftJoin<K, R, V1, V2> extends 
KTableKTableAbstractJoin<K, R,
 
             context().forward(key, new Change<>(newValue, oldValue));
         }
+
+        @Override
+        public void close() {
+            valueGetter.close(); // propagate close to the value getter this processor holds
+        }
     }
 
     private class KTableKTableLeftJoinValueGetter implements 
KTableValueGetter<K, R> {
@@ -133,6 +138,11 @@ class KTableKTableLeftJoin<K, R, V1, V2> extends 
KTableKTableAbstractJoin<K, R,
             }
         }
 
+        @Override
+        public void close() {
+            valueGetter1.close(); // the joined getter wraps two getters; both are closed
+            valueGetter2.close();
+        }
     }
 
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java
index 377e2d9..27eb698 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java
@@ -101,6 +101,11 @@ class KTableKTableOuterJoin<K, R, V1, V2> extends 
KTableKTableAbstractJoin<K, R,
 
             context().forward(key, new Change<>(newValue, oldValue));
         }
+
+        @Override
+        public void close() {
+            valueGetter.close(); // propagate close to the value getter this processor holds
+        }
     }
 
     private class KTableKTableOuterJoinValueGetter implements 
KTableValueGetter<K, R> {
@@ -133,6 +138,11 @@ class KTableKTableOuterJoin<K, R, V1, V2> extends 
KTableKTableAbstractJoin<K, R,
             return newValue;
         }
 
+        @Override
+        public void close() {
+            valueGetter1.close(); // the joined getter wraps two getters; both are closed
+            valueGetter2.close();
+        }
     }
 
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java
index d3916dd..1e634d0 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java
@@ -100,6 +100,10 @@ class KTableKTableRightJoin<K, R, V1, V2> extends 
KTableKTableAbstractJoin<K, R,
             context().forward(key, new Change<>(newValue, oldValue));
         }
 
+        @Override
+        public void close() {
+            valueGetter.close(); // propagate close to the value getter this processor holds
+        }
     }
 
     private class KTableKTableRightJoinValueGetter implements 
KTableValueGetter<K, R> {
@@ -131,6 +135,11 @@ class KTableKTableRightJoin<K, R, V1, V2> extends 
KTableKTableAbstractJoin<K, R,
             }
         }
 
+        @Override
+        public void close() {
+            valueGetter1.close(); // the joined getter wraps two getters; both are closed
+            valueGetter2.close();
+        }
     }
 
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
index 0970b42..1106a94 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMapValues.java
@@ -125,6 +125,12 @@ class KTableMapValues<K, V, V1> implements 
KTableProcessorSupplier<K, V, V1> {
         public V1 get(final K key) {
             return computeValue(key, parentGetter.get(key));
         }
+
+
+        @Override
+        public void close() {
+            parentGetter.close(); // this getter computes values from parentGetter.get(), so closing is passed on to it
+        }
     }
 
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMaterializedValueGetterSupplier.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMaterializedValueGetterSupplier.java
index 4ceccce..0c17d59 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMaterializedValueGetterSupplier.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMaterializedValueGetterSupplier.java
@@ -50,5 +50,9 @@ public class KTableMaterializedValueGetterSupplier<K, V> 
implements KTableValueG
         public V get(final K key) {
             return store.get(key);
         }
+
+        @Override
+        public void close() { // NOTE(review): empty on purpose? get() reads from a store; presumably the store is closed by its owner - confirm
+        }
     }
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
index 20d943e..e86445a 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
@@ -115,6 +115,11 @@ public class KTableRepartitionMap<K, V, K1, V1> implements 
KTableProcessorSuppli
         public KeyValue<K1, V1> get(final K key) {
             return mapper.apply(key, parentGetter.get(key));
         }
+
+        @Override
+        public void close() {
+            parentGetter.close(); // this getter maps the result of parentGetter.get(), so closing is passed on to it
+        }
     }
 
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSourceValueGetterSupplier.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSourceValueGetterSupplier.java
index 714b8c5..ebf16d1 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSourceValueGetterSupplier.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSourceValueGetterSupplier.java
@@ -49,6 +49,9 @@ public class KTableSourceValueGetterSupplier<K, V> implements 
KTableValueGetterS
             return store.get(key);
         }
 
+        @Override
+        public void close() { // NOTE(review): empty on purpose? get() reads from a store; presumably the store is closed by its owner - confirm
+        }
     }
 
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableTransformValues.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableTransformValues.java
new file mode 100644
index 0000000..b3e84d7
--- /dev/null
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableTransformValues.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
+import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import 
org.apache.kafka.streams.processor.internals.ForwardingDisabledProcessorContext;
+import org.apache.kafka.streams.state.KeyValueStore;
+
+import java.util.Objects;
+
+class KTableTransformValues<K, V, V1> implements KTableProcessorSupplier<K, V, 
V1> {
+    // Supplies both the processor and the value-getter view that apply a ValueTransformerWithKey to the values of a parent KTable.
+    private final KTableImpl<K, ?, V> parent;
+    private final ValueTransformerWithKeySupplier<? super K, ? super V, ? 
extends V1> transformerSupplier;
+    private final String queryableName; // store to materialize results into; null selects the non-materialized paths below
+    private boolean sendOldValues = false;
+
+    KTableTransformValues(final KTableImpl<K, ?, V> parent,
+                          final ValueTransformerWithKeySupplier<? super K, ? 
super V, ? extends V1> transformerSupplier,
+                          final String queryableName) {
+        this.parent = Objects.requireNonNull(parent, "parent");
+        this.transformerSupplier = Objects.requireNonNull(transformerSupplier, 
"transformerSupplier");
+        this.queryableName = queryableName; // not null-checked: null is tested for in view(), init() and process()
+    }
+    // Asks the supplier for a transformer on every call and wraps it in a new processor.
+    @Override
+    public Processor<K, Change<V>> get() {
+        return new KTableTransformValuesProcessor(transformerSupplier.get());
+    }
+    // Materialized: reads come from the named store. Otherwise each read transforms the value fetched from the parent getter.
+    @Override
+    public KTableValueGetterSupplier<K, V1> view() {
+        if (queryableName != null) {
+            return new KTableMaterializedValueGetterSupplier<>(queryableName);
+        }
+
+        return new KTableValueGetterSupplier<K, V1>() {
+            final KTableValueGetterSupplier<K, V> parentValueGetterSupplier = 
parent.valueGetterSupplier();
+            // NOTE(review): get() below looks like an interface method but carries no @Override, unlike storeNames() - confirm and align.
+            public KTableValueGetter<K, V1> get() {
+                return new KTableTransformValuesGetter(
+                    parentValueGetterSupplier.get(),
+                    transformerSupplier.get());
+            }
+
+            @Override
+            public String[] storeNames() {
+                return parentValueGetterSupplier.storeNames();
+            }
+        };
+    }
+    // Also enables old values on the parent; presumably so change.oldValue is populated when process() reads it - confirm.
+    @Override
+    public void enableSendingOldValues() {
+        parent.enableSendingOldValues();
+        sendOldValues = true;
+    }
+
+    private class KTableTransformValuesProcessor extends AbstractProcessor<K, 
Change<V>> {
+        private final ValueTransformerWithKey<? super K, ? super V, ? extends 
V1> valueTransformer;
+        private KeyValueStore<K, V1> store;
+        private TupleForwarder<K, V1> tupleForwarder;
+
+        private KTableTransformValuesProcessor(final ValueTransformerWithKey<? 
super K, ? super V, ? extends V1> valueTransformer) {
+            this.valueTransformer = Objects.requireNonNull(valueTransformer, 
"valueTransformer");
+        }
+        // The transformer gets a context whose forward() calls are rejected; store and forwarder are only set up when materialized.
+        @SuppressWarnings("unchecked")
+        @Override
+        public void init(final ProcessorContext context) {
+            super.init(context);
+
+            valueTransformer.init(new 
ForwardingDisabledProcessorContext(context));
+
+            if (queryableName != null) {
+                final ForwardingCacheFlushListener<K, V1> flushListener = new 
ForwardingCacheFlushListener<>(context, sendOldValues);
+                store = (KeyValueStore<K, V1>) 
context.getStateStore(queryableName);
+                tupleForwarder = new TupleForwarder<>(store, context, 
flushListener, sendOldValues);
+            }
+        }
+        // Non-materialized: forward directly, transforming change.oldValue again if old values are requested. Materialized: read the previous result from the store before overwriting it.
+        @Override
+        public void process(final K key, final Change<V> change) {
+            final V1 newValue = valueTransformer.transform(key, 
change.newValue);
+
+            if (queryableName == null) {
+                final V1 oldValue = sendOldValues ? 
valueTransformer.transform(key, change.oldValue) : null;
+                context().forward(key, new Change<>(newValue, oldValue));
+            } else {
+                final V1 oldValue = sendOldValues ? store.get(key) : null;
+                store.put(key, newValue);
+                tupleForwarder.maybeForward(key, newValue, oldValue);
+            }
+        }
+
+        @Override
+        public void close() {
+            valueTransformer.close();
+        }
+    }
+
+    private class KTableTransformValuesGetter implements KTableValueGetter<K, 
V1> {
+        // Value getter for the non-materialized case: transforms whatever the parent getter returns, on every get().
+        private final KTableValueGetter<K, V> parentGetter;
+        private final ValueTransformerWithKey<? super K, ? super V, ? extends 
V1> valueTransformer;
+
+        KTableTransformValuesGetter(final KTableValueGetter<K, V> parentGetter,
+                                    final ValueTransformerWithKey<? super K, ? 
super V, ? extends V1> valueTransformer) {
+            this.parentGetter = Objects.requireNonNull(parentGetter, 
"parentGetter");
+            this.valueTransformer = Objects.requireNonNull(valueTransformer, 
"valueTransformer");
+        }
+
+        @Override
+        public void init(final ProcessorContext context) {
+            parentGetter.init(context);
+
+            valueTransformer.init(new 
ForwardingDisabledProcessorContext(context));
+        }
+
+        @Override
+        public V1 get(final K key) {
+            return valueTransformer.transform(key, parentGetter.get(key));
+        }
+
+        @Override
+        public void close() {
+            parentGetter.close();
+            valueTransformer.close();
+        }
+    }
+}
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableValueGetter.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableValueGetter.java
index bb69236..edd9644 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableValueGetter.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableValueGetter.java
@@ -24,4 +24,5 @@ public interface KTableValueGetter<K, V> {
 
     V get(K key);
 
+    void close(); // lets a getter release what it holds, e.g. a wrapped parent getter or a transformer
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ForwardingDisabledProcessorContext.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ForwardingDisabledProcessorContext.java
new file mode 100644
index 0000000..35a0a7e
--- /dev/null
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ForwardingDisabledProcessorContext.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.streams.StreamsMetrics;
+import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.processor.Cancellable;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.PunctuationType;
+import org.apache.kafka.streams.processor.Punctuator;
+import org.apache.kafka.streams.processor.StateRestoreCallback;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.To;
+
+import java.io.File;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * {@code ProcessorContext} decorator that passes every call through to the wrapped context, except the {@code forward} overloads, which always throw {@link StreamsException}.
+ */
+public final class ForwardingDisabledProcessorContext implements 
ProcessorContext {
+    private final ProcessorContext delegate;
+
+    public ForwardingDisabledProcessorContext(final ProcessorContext delegate) 
{
+        this.delegate = Objects.requireNonNull(delegate, "delegate"); // a null delegate fails here rather than on first use
+    }
+
+    @Override
+    public String applicationId() {
+        return delegate.applicationId();
+    }
+
+    @Override
+    public TaskId taskId() {
+        return delegate.taskId();
+    }
+
+    @Override
+    public Serde<?> keySerde() {
+        return delegate.keySerde();
+    }
+
+    @Override
+    public Serde<?> valueSerde() {
+        return delegate.valueSerde();
+    }
+
+    @Override
+    public File stateDir() {
+        return delegate.stateDir();
+    }
+
+    @Override
+    public StreamsMetrics metrics() {
+        return delegate.metrics();
+    }
+
+    @Override
+    public void register(final StateStore store,
+                         final StateRestoreCallback stateRestoreCallback) {
+        delegate.register(store, stateRestoreCallback);
+    }
+
+    @Override
+    public StateStore getStateStore(final String name) {
+        return delegate.getStateStore(name);
+    }
+
+    @Override
+    public Cancellable schedule(final long intervalMs,
+                                final PunctuationType type,
+                                final Punctuator callback) {
+        return delegate.schedule(intervalMs, type, callback);
+    }
+    // The four forward() overloads that follow never reach the delegate: each one throws StreamsException.
+    @Override
+    public <K, V> void forward(final K key, final V value) {
+        throw new StreamsException("ProcessorContext#forward() not 
supported.");
+    }
+
+    @Override
+    public <K, V> void forward(final K key, final V value, final To to) {
+        throw new StreamsException("ProcessorContext#forward() not 
supported.");
+    }
+
+    @SuppressWarnings("deprecation")
+    @Override
+    public <K, V> void forward(final K key, final V value, final int 
childIndex) {
+        throw new StreamsException("ProcessorContext#forward() not 
supported.");
+    }
+
+    @SuppressWarnings("deprecation")
+    @Override
+    public <K, V> void forward(final K key, final V value, final String 
childName) {
+        throw new StreamsException("ProcessorContext#forward() not 
supported.");
+    }
+    // commit(), the record-metadata accessors and the config accessors below are plain pass-throughs to the delegate.
+    @Override
+    public void commit() {
+        delegate.commit();
+    }
+
+    @Override
+    public String topic() {
+        return delegate.topic();
+    }
+
+    @Override
+    public int partition() {
+        return delegate.partition();
+    }
+
+    @Override
+    public long offset() {
+        return delegate.offset();
+    }
+
+    @Override
+    public long timestamp() {
+        return delegate.timestamp();
+    }
+
+    @Override
+    public Map<String, Object> appConfigs() {
+        return delegate.appConfigs();
+    }
+
+    @Override
+    public Map<String, Object> appConfigsWithPrefix(final String prefix) {
+        return delegate.appConfigsWithPrefix(prefix);
+    }
+}
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
index fb8a7bc..eac2585 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
@@ -21,7 +21,6 @@ import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.TopologyTestDriver;
-import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.ValueTransformer;
 import org.apache.kafka.streams.kstream.ValueTransformerSupplier;
@@ -29,23 +28,32 @@ import 
org.apache.kafka.streams.kstream.ValueTransformerWithKey;
 import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.To;
+import 
org.apache.kafka.streams.processor.internals.ForwardingDisabledProcessorContext;
 import org.apache.kafka.streams.test.ConsumerRecordFactory;
 import org.apache.kafka.test.MockProcessorSupplier;
+import org.apache.kafka.test.SingletonNoOpValueTransformer;
 import org.apache.kafka.test.StreamsTestUtils;
+import org.easymock.EasyMockRunner;
+import org.easymock.Mock;
+import org.easymock.MockType;
 import org.junit.Test;
+import org.junit.runner.RunWith;
 
 import java.util.Properties;
 
+import static org.hamcrest.CoreMatchers.isA;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.fail;
 
+@RunWith(EasyMockRunner.class)
 public class KStreamTransformValuesTest {
 
     private String topicName = "topic";
     private final MockProcessorSupplier<Integer, Integer> supplier = new 
MockProcessorSupplier<>();
     private final ConsumerRecordFactory<Integer, Integer> recordFactory = new 
ConsumerRecordFactory<>(new IntegerSerializer(), new IntegerSerializer());
     private final Properties props = 
StreamsTestUtils.topologyTestConfig(Serdes.Integer(), Serdes.Integer());
+    @Mock(MockType.NICE)
+    private ProcessorContext context;
 
     @Test
     public void testTransform() {
@@ -134,90 +142,14 @@ public class KStreamTransformValuesTest {
         assertArrayEquals(expected, 
supplier.theCapturedProcessor().processed.toArray());
     }
 
-
     @Test
-    public void 
shouldNotAllowValueTransformerToCallInternalProcessorContextMethods() {
-        final BadValueTransformer badValueTransformer = new 
BadValueTransformer();
-        final KStreamTransformValues<Integer, Integer, Integer> transformValue 
= new KStreamTransformValues<>(new ValueTransformerWithKeySupplier<Integer, 
Integer, Integer>() {
-            @Override
-            public ValueTransformerWithKey<Integer, Integer, Integer> get() {
-                return new ValueTransformerWithKey<Integer, Integer, 
Integer>() {
-                    @Override
-                    public void init(final ProcessorContext context) {
-                        badValueTransformer.init(context);
-                    }
-
-                    @Override
-                    public Integer transform(final Integer readOnlyKey, final 
Integer value) {
-                        return badValueTransformer.transform(readOnlyKey, 
value);
-                    }
-
-                    @Override
-                    public void close() {
-                        badValueTransformer.close();
-                    }
-                };
-            }
-        });
-
-        final Processor transformValueProcessor = transformValue.get();
-        transformValueProcessor.init(null);
-
-        try {
-            transformValueProcessor.process(null, 0);
-            fail("should not allow call to context.forward() within 
ValueTransformer");
-        } catch (final StreamsException e) {
-            // expected
-        }
+    public void 
shouldInitializeTransformerWithForwardDisabledProcessorContext() {
+        final SingletonNoOpValueTransformer<String, String> transformer = new 
SingletonNoOpValueTransformer<>();
+        final KStreamTransformValues<String, String, String> transformValues = 
new KStreamTransformValues<>(transformer);
+        final Processor<String, String> processor = transformValues.get();
 
-        try {
-            transformValueProcessor.process(null, 1);
-            fail("should not allow call to context.forward() within 
ValueTransformer");
-        } catch (final StreamsException e) {
-            // expected
-        }
-
-        try {
-            transformValueProcessor.process(null, 2);
-            fail("should not allow call to context.forward() within 
ValueTransformer");
-        } catch (final StreamsException e) {
-            // expected
-        }
-
-        try {
-            transformValueProcessor.process(null, 3);
-            fail("should not allow call to context.forward() within 
ValueTransformer");
-        } catch (final StreamsException e) {
-            // expected
-        }
-    }
-
-    private static final class BadValueTransformer implements 
ValueTransformerWithKey<Integer, Integer, Integer> {
-        private ProcessorContext context;
-
-        @Override
-        public void init(final ProcessorContext context) {
-            this.context = context;
-        }
-
-        @Override
-        public Integer transform(final Integer key, final Integer value) {
-            if (value == 0) {
-                context.forward(null, null);
-            }
-            if (value == 1) {
-                context.forward(null, null, (String) null);
-            }
-            if (value == 2) {
-                context.forward(null, null, 0);
-            }
-            if (value == 3) {
-                context.forward(null, null, To.all());
-            }
-            throw new RuntimeException("Should never happen in this test");
-        }
+        processor.init(context);
 
-        @Override
-        public void close() { }
+        assertThat(transformer.context, isA((Class) 
ForwardingDisabledProcessorContext.class));
     }
 }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
index a265c62..399e519 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
@@ -29,6 +29,7 @@ import org.apache.kafka.streams.kstream.Produced;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.kstream.ValueMapper;
 import org.apache.kafka.streams.kstream.ValueMapperWithKey;
+import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
 import org.apache.kafka.streams.processor.internals.SinkNode;
 import org.apache.kafka.streams.processor.internals.SourceNode;
 import org.apache.kafka.streams.state.KeyValueStore;
@@ -49,6 +50,7 @@ import java.io.File;
 import java.lang.reflect.Field;
 import java.util.List;
 
+import static org.easymock.EasyMock.mock;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
@@ -472,4 +474,23 @@ public class KTableImplTest {
     public void shouldThrowNullPointerOnOuterJoinWhenMaterializedIsNull() {
         table.leftJoin(table, MockValueJoiner.TOSTRING_JOINER, (Materialized) 
null);
     }
+
+    @Test(expected = NullPointerException.class)
+    public void 
shouldThrowNullPointerOnTransformValuesWithKeyWhenTransformerSupplierIsNull() {
+        table.transformValues((ValueTransformerWithKeySupplier) null); // null supplier, no store names
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test(expected = NullPointerException.class)
+    public void 
shouldThrowNullPointerOnTransformValuesWithKeyWhenMaterializedIsNull() {
+        final ValueTransformerWithKeySupplier<String, String, ?> 
valueTransformerSupplier = mock(ValueTransformerWithKeySupplier.class);
+        table.transformValues(valueTransformerSupplier, (Materialized) null); // mocked supplier, so the null Materialized is the only null argument
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test(expected = NullPointerException.class)
+    public void 
shouldThrowNullPointerOnTransformValuesWithKeyWhenStoreNamesNull() {
+        final ValueTransformerWithKeySupplier<String, String, ?> 
valueTransformerSupplier = mock(ValueTransformerWithKeySupplier.class);
+        table.transformValues(valueTransformerSupplier, (String[]) null); // the cast passes a null varargs array itself rather than a one-element array holding null
+    }
 }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java
new file mode 100644
index 0000000..7c12dad
--- /dev/null
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java
@@ -0,0 +1,547 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Serialized;
+import org.apache.kafka.streams.kstream.ValueMapper;
+import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
+import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import 
org.apache.kafka.streams.processor.internals.ForwardingDisabledProcessorContext;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.test.ConsumerRecordFactory;
+import org.apache.kafka.test.MockProcessorSupplier;
+import org.apache.kafka.test.MockReducer;
+import org.apache.kafka.test.SingletonNoOpValueTransformer;
+import org.apache.kafka.test.TestUtils;
+import org.easymock.EasyMockRunner;
+import org.easymock.Mock;
+import org.easymock.MockType;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.verify;
+import static org.hamcrest.CoreMatchers.hasItems;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.isA;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.fail;
+
+@RunWith(EasyMockRunner.class)
+public class KTableTransformValuesTest {
+    private static final String QUERYABLE_NAME = "queryable-store";
+    private static final String INPUT_TOPIC = "inputTopic";
+    private static final String STORE_NAME = "someStore";
+    private static final String OTHER_STORE_NAME = "otherStore";
+
+    private static final Consumed<String, String> CONSUMED = 
Consumed.with(Serdes.String(), Serdes.String());
+
+    private final ConsumerRecordFactory<String, String> recordFactory
+        = new ConsumerRecordFactory<>(new StringSerializer(), new 
StringSerializer());
+
+    private TopologyTestDriver driver;
+    private MockProcessorSupplier<String, String> capture;
+    private StreamsBuilder builder;
+    @Mock(MockType.NICE)
+    private KTableImpl<String, String, String> parent;
+    @Mock(MockType.NICE)
+    private InternalProcessorContext context;
+    @Mock(MockType.NICE)
+    private KTableValueGetterSupplier<String, String> parentGetterSupplier;
+    @Mock(MockType.NICE)
+    private KTableValueGetter<String, String> parentGetter;
+    @Mock(MockType.NICE)
+    private KeyValueStore<String, String> stateStore;
+    @Mock(MockType.NICE)
+    private ValueTransformerWithKeySupplier<String, String, String> 
mockSupplier;
+    @Mock(MockType.NICE)
+    private ValueTransformerWithKey<String, String, String> transformer;
+
+    @After
+    public void cleanup() {
+        if (driver != null) {
+            driver.close();
+            driver = null;
+        }
+    }
+
+    @Before
+    public void setUp() {
+        capture = new MockProcessorSupplier<>();
+        builder = new StreamsBuilder();
+    }
+
+    @Test
+    public void shouldThrowOnGetIfSupplierReturnsNull() {
+        final KTableTransformValues<String, String, String> transformer =
+            new KTableTransformValues<>(parent, new NullSupplier(), 
QUERYABLE_NAME);
+
+        try {
+            transformer.get();
+            fail("NPE expected");
+        } catch (final NullPointerException expected) {
+            // expected
+        }
+    }
+
+    @Test
+    public void shouldThrowOnViewGetIfSupplierReturnsNull() {
+        final KTableValueGetterSupplier<String, String> view =
+            new KTableTransformValues<>(parent, new NullSupplier(), 
null).view();
+
+        try {
+            view.get();
+            fail("NPE expected");
+        } catch (final NullPointerException expected) {
+            // expected
+        }
+    }
+
+    @Test
+    public void 
shouldInitializeTransformerWithForwardDisabledProcessorContext() {
+        final SingletonNoOpValueTransformer<String, String> transformer = new 
SingletonNoOpValueTransformer<>();
+        final KTableTransformValues<String, String, String> transformValues = 
new KTableTransformValues<>(parent, transformer, null);
+        final Processor<String, Change<String>> processor = 
transformValues.get();
+
+        processor.init(context);
+
+        assertThat(transformer.context, isA((Class) 
ForwardingDisabledProcessorContext.class));
+    }
+
+    @Test
+    public void shouldNotSendOldValuesByDefault() {
+        final KTableTransformValues<String, String, String> transformValues =
+            new KTableTransformValues<>(parent, new 
ExclamationValueTransformerSupplier(), null);
+
+        final Processor<String, Change<String>> processor = 
transformValues.get();
+        processor.init(context);
+
+        context.forward("Key", new Change<>("Key->newValue!", null));
+        expectLastCall();
+        replay(context);
+
+        processor.process("Key", new Change<>("newValue", "oldValue"));
+
+        verify(context);
+    }
+
+    @Test
+    public void shouldSendOldValuesIfConfigured() {
+        final KTableTransformValues<String, String, String> transformValues =
+            new KTableTransformValues<>(parent, new 
ExclamationValueTransformerSupplier(), null);
+
+        transformValues.enableSendingOldValues();
+        final Processor<String, Change<String>> processor = 
transformValues.get();
+        processor.init(context);
+
+        context.forward("Key", new Change<>("Key->newValue!", 
"Key->oldValue!"));
+        expectLastCall();
+        replay(context);
+
+        processor.process("Key", new Change<>("newValue", "oldValue"));
+
+        verify(context);
+    }
+
+    @Test
+    public void shouldSetSendOldValuesOnParent() {
+        parent.enableSendingOldValues();
+        expectLastCall();
+        replay(parent);
+
+        new KTableTransformValues<>(parent, new 
SingletonNoOpValueTransformer<>(), QUERYABLE_NAME).enableSendingOldValues();
+
+        verify(parent);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldTransformOnGetIfNotMaterialized() {
+        final KTableTransformValues<String, String, String> transformValues =
+            new KTableTransformValues<>(parent, new 
ExclamationValueTransformerSupplier(), null);
+
+        expect(parent.valueGetterSupplier()).andReturn(parentGetterSupplier);
+        expect(parentGetterSupplier.get()).andReturn(parentGetter);
+        expect(parentGetter.get("Key")).andReturn("Value");
+        replay(parent, parentGetterSupplier, parentGetter);
+
+        final KTableValueGetter<String, String> getter = 
transformValues.view().get();
+        getter.init(context);
+
+        final String result = getter.get("Key");
+
+        assertThat(result, is("Key->Value!"));
+    }
+
+    @Test
+    public void shouldGetFromStateStoreIfMaterialized() {
+        final KTableTransformValues<String, String, String> transformValues =
+            new KTableTransformValues<>(parent, new 
ExclamationValueTransformerSupplier(), QUERYABLE_NAME);
+
+        expect(context.getStateStore(QUERYABLE_NAME)).andReturn(stateStore);
+        expect(stateStore.get("Key")).andReturn("something");
+        replay(context, stateStore);
+
+        final KTableValueGetter<String, String> getter = 
transformValues.view().get();
+        getter.init(context);
+
+        final String result = getter.get("Key");
+
+        assertThat(result, is("something"));
+    }
+
+    @Test
+    public void shouldGetStoreNamesFromParentIfNotMaterialized() {
+        final KTableTransformValues<String, String, String> transformValues =
+            new KTableTransformValues<>(parent, new 
ExclamationValueTransformerSupplier(), null);
+
+        expect(parent.valueGetterSupplier()).andReturn(parentGetterSupplier);
+        expect(parentGetterSupplier.storeNames()).andReturn(new 
String[]{"store1", "store2"});
+        replay(parent, parentGetterSupplier);
+
+        final String[] storeNames = transformValues.view().storeNames();
+
+        assertThat(storeNames, is(new String[]{"store1", "store2"}));
+    }
+
+    @Test
+    public void shouldGetQueryableStoreNameIfMaterialized() {
+        final KTableTransformValues<String, String, String> transformValues =
+            new KTableTransformValues<>(parent, new 
ExclamationValueTransformerSupplier(), QUERYABLE_NAME);
+
+        final String[] storeNames = transformValues.view().storeNames();
+
+        assertThat(storeNames, is(new String[]{QUERYABLE_NAME}));
+    }
+
+    @Test
+    public void shouldCloseTransformerOnProcessorClose() {
+        final KTableTransformValues<String, String, String> transformValues =
+            new KTableTransformValues<>(parent, mockSupplier, null);
+
+        expect(mockSupplier.get()).andReturn(transformer);
+        transformer.close();
+        expectLastCall();
+        replay(mockSupplier, transformer);
+
+        final Processor<String, Change<String>> processor = 
transformValues.get();
+        processor.close();
+
+        verify(transformer);
+    }
+
+    @Test
+    public void shouldCloseTransformerOnGetterClose() {
+        final KTableTransformValues<String, String, String> transformValues =
+            new KTableTransformValues<>(parent, mockSupplier, null);
+
+        expect(mockSupplier.get()).andReturn(transformer);
+        expect(parentGetterSupplier.get()).andReturn(parentGetter);
+        expect(parent.valueGetterSupplier()).andReturn(parentGetterSupplier);
+
+        transformer.close();
+        expectLastCall();
+
+        replay(mockSupplier, transformer, parent, parentGetterSupplier);
+
+        final KTableValueGetter<String, String> getter = 
transformValues.view().get();
+        getter.close();
+
+        verify(transformer);
+    }
+
+    @Test
+    public void shouldCloseParentGetterClose() {
+        final KTableTransformValues<String, String, String> transformValues =
+            new KTableTransformValues<>(parent, mockSupplier, null);
+
+        expect(parent.valueGetterSupplier()).andReturn(parentGetterSupplier);
+        expect(mockSupplier.get()).andReturn(transformer);
+        expect(parentGetterSupplier.get()).andReturn(parentGetter);
+
+        parentGetter.close();
+        expectLastCall();
+
+        replay(mockSupplier, parent, parentGetterSupplier, parentGetter);
+
+        final KTableValueGetter<String, String> getter = 
transformValues.view().get();
+        getter.close();
+
+        verify(parentGetter);
+    }
+
+    @Test
+    public void shouldTransformValuesWithKey() {
+        builder
+            .addStateStore(storeBuilder(STORE_NAME))
+            .addStateStore(storeBuilder(OTHER_STORE_NAME))
+            .table(INPUT_TOPIC, CONSUMED)
+            .transformValues(
+                new ExclamationValueTransformerSupplier(STORE_NAME, 
OTHER_STORE_NAME),
+                STORE_NAME, OTHER_STORE_NAME)
+            .toStream()
+            .process(capture);
+
+        driver = new TopologyTestDriver(builder.build(), props());
+
+        driver.pipeInput(recordFactory.create(INPUT_TOPIC, "A", "a", 0L));
+        driver.pipeInput(recordFactory.create(INPUT_TOPIC, "B", "b", 0L));
+        driver.pipeInput(recordFactory.create(INPUT_TOPIC, "D", null, 0L));
+
+        assertThat(output(), hasItems("A:A->a!", "B:B->b!", "D:D->null!"));
+        assertThat("Store should not be materialized", 
driver.getKeyValueStore(QUERYABLE_NAME), is(nullValue()));
+    }
+
+    @Test
+    public void shouldTransformValuesWithKeyAndMaterialize() {
+        builder
+            .addStateStore(storeBuilder(STORE_NAME))
+            .table(INPUT_TOPIC, CONSUMED)
+            .transformValues(
+                new ExclamationValueTransformerSupplier(STORE_NAME, 
QUERYABLE_NAME),
+                Materialized.<String, String, KeyValueStore<Bytes, 
byte[]>>as(QUERYABLE_NAME)
+                    .withKeySerde(Serdes.String())
+                    .withValueSerde(Serdes.String()),
+                STORE_NAME)
+            .toStream()
+            .process(capture);
+
+        driver = new TopologyTestDriver(builder.build(), props());
+
+        driver.pipeInput(recordFactory.create(INPUT_TOPIC, "A", "a", 0L));
+        driver.pipeInput(recordFactory.create(INPUT_TOPIC, "B", "b", 0L));
+        driver.pipeInput(recordFactory.create(INPUT_TOPIC, "C", null, 0L));
+
+        assertThat(output(), hasItems("A:A->a!", "B:B->b!", "C:C->null!"));
+
+        final KeyValueStore<String, String> keyValueStore = 
driver.getKeyValueStore(QUERYABLE_NAME);
+        assertThat(keyValueStore.get("A"), is("A->a!"));
+        assertThat(keyValueStore.get("B"), is("B->b!"));
+        assertThat(keyValueStore.get("C"), is("C->null!"));
+    }
+
+    @Test
+    public void shouldCalculateCorrectOldValuesIfMaterializedEvenIfStateful() {
+        builder
+            .table(INPUT_TOPIC, CONSUMED)
+            .transformValues(
+                new StatefulTransformerSupplier(),
+                Materialized.<String, Integer, KeyValueStore<Bytes, 
byte[]>>as(QUERYABLE_NAME)
+                    .withKeySerde(Serdes.String())
+                    .withValueSerde(Serdes.Integer()))
+            .groupBy(toForceSendingOfOldValues(), 
Serialized.with(Serdes.String(), Serdes.Integer()))
+            .reduce(MockReducer.INTEGER_ADDER, MockReducer.INTEGER_SUBTRACTOR)
+            .mapValues(mapBackToStrings())
+            .toStream()
+            .process(capture);
+
+        driver = new TopologyTestDriver(builder.build(), props());
+
+        driver.pipeInput(recordFactory.create(INPUT_TOPIC, "A", "ignore", 0L));
+        driver.pipeInput(recordFactory.create(INPUT_TOPIC, "A", "ignored", 
0L));
+        driver.pipeInput(recordFactory.create(INPUT_TOPIC, "A", "ignored", 
0L));
+
+        assertThat(output(), hasItems("A:1", "A:0", "A:2", "A:0", "A:3"));
+
+        final KeyValueStore<String, Integer> keyValueStore = 
driver.getKeyValueStore(QUERYABLE_NAME);
+        assertThat(keyValueStore.get("A"), is(3));
+    }
+
+    @Test
+    public void 
shouldCalculateCorrectOldValuesIfNotStatefulEvenIfNotMaterialized() {
+        builder
+            .table(INPUT_TOPIC, CONSUMED)
+            .transformValues(new StatelessTransformerSupplier())
+            .groupBy(toForceSendingOfOldValues(), 
Serialized.with(Serdes.String(), Serdes.Integer()))
+            .reduce(MockReducer.INTEGER_ADDER, MockReducer.INTEGER_SUBTRACTOR)
+            .mapValues(mapBackToStrings())
+            .toStream()
+            .process(capture);
+
+        driver = new TopologyTestDriver(builder.build(), props());
+
+        driver.pipeInput(recordFactory.create(INPUT_TOPIC, "A", "a", 0L));
+        driver.pipeInput(recordFactory.create(INPUT_TOPIC, "A", "aa", 0L));
+        driver.pipeInput(recordFactory.create(INPUT_TOPIC, "A", "aaa", 0L));
+
+        assertThat(output(), hasItems("A:1", "A:0", "A:2", "A:0", "A:3"));
+    }
+
+    private ArrayList<String> output() {
+        return capture.capturedProcessors(1).get(0).processed;
+    }
+
+    private static KeyValueMapper<String, Integer, KeyValue<String, Integer>> 
toForceSendingOfOldValues() {
+        return new KeyValueMapper<String, Integer, KeyValue<String, 
Integer>>() {
+            @Override
+            public KeyValue<String, Integer> apply(String key, Integer value) {
+                return new KeyValue<>(key, value);
+            }
+        };
+    }
+
+    private static ValueMapper<Integer, String> mapBackToStrings() {
+        return new ValueMapper<Integer, String>() {
+            @Override
+            public String apply(Integer value) {
+                return value.toString();
+            }
+        };
+    }
+
+    private static StoreBuilder<KeyValueStore<Long, Long>> storeBuilder(final 
String storeName) {
+        return 
Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(storeName), 
Serdes.Long(), Serdes.Long());
+    }
+
+    public static Properties props() {
+        final Properties props = new Properties();
+        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, 
"kstream-transform-values-test");
+        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
"localhost:9091");
+        props.setProperty(StreamsConfig.STATE_DIR_CONFIG, 
TestUtils.tempDirectory().getAbsolutePath());
+        props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
Serdes.Integer().getClass().getName());
+        props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
Serdes.Integer().getClass().getName());
+        return props;
+    }
+
+    private static void throwIfStoresNotAvailable(final ProcessorContext 
context,
+                                                  final List<String> 
expectedStoredNames) {
+        final List<String> missing = new ArrayList<>();
+
+        for (final String storedName : expectedStoredNames) {
+            if (context.getStateStore(storedName) == null) {
+                missing.add(storedName);
+            }
+        }
+
+        if (!missing.isEmpty()) {
+            throw new AssertionError("State stores are not accessible: " + 
missing);
+        }
+    }
+
+    public static class ExclamationValueTransformerSupplier implements 
ValueTransformerWithKeySupplier<Object, String, String> {
+        private final List<String> expectedStoredNames;
+
+        ExclamationValueTransformerSupplier(final String... 
expectedStoreNames) {
+            this.expectedStoredNames = Arrays.asList(expectedStoreNames);
+        }
+
+        @Override
+        public ExclamationValueTransformer get() {
+            return new ExclamationValueTransformer(expectedStoredNames);
+        }
+    }
+
+    public static class ExclamationValueTransformer implements 
ValueTransformerWithKey<Object, String, String> {
+        private final List<String> expectedStoredNames;
+
+        ExclamationValueTransformer(final List<String> expectedStoredNames) {
+            this.expectedStoredNames = expectedStoredNames;
+        }
+
+        @Override
+        public void init(final ProcessorContext context) {
+            throwIfStoresNotAvailable(context, expectedStoredNames);
+        }
+
+        @Override
+        public String transform(final Object readOnlyKey, final String value) {
+            return readOnlyKey.toString() + "->" + value + "!";
+        }
+
+        @Override
+        public void close() {
+        }
+    }
+
+    private static class NullSupplier implements 
ValueTransformerWithKeySupplier<String, String, String> {
+        @Override
+        public ValueTransformerWithKey<String, String, String> get() {
+            return null;
+        }
+    }
+
+    private static class StatefulTransformerSupplier implements 
ValueTransformerWithKeySupplier<String, String, Integer> {
+        @Override
+        public ValueTransformerWithKey<String, String, Integer> get() {
+            return new StatefulTransformer();
+        }
+    }
+
+    private static class StatefulTransformer implements 
ValueTransformerWithKey<String, String, Integer> {
+        private int counter;
+
+        @Override
+        public void init(final ProcessorContext context) {
+        }
+
+        @Override
+        public Integer transform(final String readOnlyKey, final String value) 
{
+            return ++counter;
+        }
+
+        @Override
+        public void close() {
+        }
+    }
+
+    private static class StatelessTransformerSupplier implements 
ValueTransformerWithKeySupplier<String, String, Integer> {
+        @Override
+        public ValueTransformerWithKey<String, String, Integer> get() {
+            return new StatelessTransformer();
+        }
+    }
+
+    private static class StatelessTransformer implements 
ValueTransformerWithKey<String, String, Integer> {
+        @Override
+        public void init(final ProcessorContext context) {
+        }
+
+        @Override
+        public Integer transform(final String readOnlyKey, final String value) 
{
+            return value == null ? null : value.length();
+        }
+
+        @Override
+        public void close() {
+        }
+    }
+}
\ No newline at end of file
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ForwardingDisabledProcessorContextTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ForwardingDisabledProcessorContextTest.java
new file mode 100644
index 0000000..03e79b7
--- /dev/null
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ForwardingDisabledProcessorContextTest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.To;
+import org.easymock.EasyMockRunner;
+import org.easymock.Mock;
+import org.easymock.MockType;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+@RunWith(EasyMockRunner.class)
+public class ForwardingDisabledProcessorContextTest {
+    @Mock(MockType.NICE)
+    private ProcessorContext delegate;
+    private ForwardingDisabledProcessorContext context;
+
+    @Before
+    public void setUp() {
+        context = new ForwardingDisabledProcessorContext(delegate);
+    }
+
+    @Test(expected = StreamsException.class)
+    public void shouldThrowOnForward() {
+        context.forward("key", "value");
+    }
+
+    @Test(expected = StreamsException.class)
+    public void shouldThrowOnForwardWithTo() {
+        context.forward("key", "value", To.all());
+    }
+
+    @Test(expected = StreamsException.class)
+    public void shouldThrowOnForwardWithChildIndex() {
+        context.forward("key", "value", 1);
+    }
+
+    @Test(expected = StreamsException.class)
+    public void shouldThrowOnForwardWithChildName() {
+        context.forward("key", "value", "child1");
+    }
+}
\ No newline at end of file
diff --git 
a/streams/src/test/java/org/apache/kafka/test/KTableValueGetterStub.java 
b/streams/src/test/java/org/apache/kafka/test/KTableValueGetterStub.java
index 0efb8d9..f739755 100644
--- a/streams/src/test/java/org/apache/kafka/test/KTableValueGetterStub.java
+++ b/streams/src/test/java/org/apache/kafka/test/KTableValueGetterStub.java
@@ -43,4 +43,8 @@ public class KTableValueGetterStub<K, V> implements 
KTableValueGetter<K, V> {
     public void remove(final K key) {
         data.remove(key);
     }
+
+    @Override
+    public void close() {
+    }
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMaterializedValueGetterSupplier.java
 
b/streams/src/test/java/org/apache/kafka/test/SingletonNoOpValueTransformer.java
similarity index 54%
copy from 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMaterializedValueGetterSupplier.java
copy to 
streams/src/test/java/org/apache/kafka/test/SingletonNoOpValueTransformer.java
index 4ceccce..1a99e00 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableMaterializedValueGetterSupplier.java
+++ 
b/streams/src/test/java/org/apache/kafka/test/SingletonNoOpValueTransformer.java
@@ -14,41 +14,33 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.kafka.test;
 
-package org.apache.kafka.streams.kstream.internals;
-
+import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
+import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
 import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.state.KeyValueStore;
-
-public class KTableMaterializedValueGetterSupplier<K, V> implements 
KTableValueGetterSupplier<K, V> {
-
-    private final String storeName;
-
-    KTableMaterializedValueGetterSupplier(final String storeName) {
-        this.storeName = storeName;
-    }
 
-    public KTableValueGetter<K, V> get() {
-        return new KTableMaterializedValueGetter();
-    }
+public class SingletonNoOpValueTransformer<K, V> implements 
ValueTransformerWithKeySupplier<K, V, V> {
+    public ProcessorContext context;
+    private final ValueTransformerWithKey<K, V, V> transformer = new 
ValueTransformerWithKey<K, V, V>() {
 
-    @Override
-    public String[] storeNames() {
-        return new String[]{storeName};
-    }
-
-    private class KTableMaterializedValueGetter implements 
KTableValueGetter<K, V> {
-        private KeyValueStore<K, V> store;
-
-        @SuppressWarnings("unchecked")
         @Override
         public void init(final ProcessorContext context) {
-            store = (KeyValueStore<K, V>) context.getStateStore(storeName);
+            SingletonNoOpValueTransformer.this.context = context;
         }
 
         @Override
-        public V get(final K key) {
-            return store.get(key);
+        public V transform(final K readOnlyKey, final V value) {
+            return value;
         }
+
+        @Override
+        public void close() {
+        }
+    };
+
+    @Override
+    public ValueTransformerWithKey<K, V, V> get() {
+        return transformer;
     }
-}
+}
\ No newline at end of file
diff --git 
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
 
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
index e3e8470..4b0dc2b 100644
--- 
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
+++ 
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
@@ -320,9 +320,9 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
    * @return a [[KStream]] that contains records with unmodified key and new 
values (possibly of different type)
    * @see `org.apache.kafka.streams.kstream.KStream#transformValues`
    */ 
-  def transformValues[VR](valueTransformerWithKeySupplier: 
ValueTransformerWithKeySupplier[K, V, VR],
+  def transformValues[VR](valueTransformerSupplier: 
ValueTransformerWithKeySupplier[K, V, VR],
                           stateStoreNames: String*): KStream[K, VR] = {
-    inner.transformValues[VR](valueTransformerWithKeySupplier, 
stateStoreNames: _*)
+    inner.transformValues[VR](valueTransformerSupplier, stateStoreNames: _*)
   }
 
   /**
diff --git 
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala
 
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala
index 218063e..65cf895 100644
--- 
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala
+++ 
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KTable.scala
@@ -20,9 +20,11 @@
 package org.apache.kafka.streams.scala
 package kstream
 
+import org.apache.kafka.common.utils.Bytes
 import org.apache.kafka.streams.kstream.{KTable => KTableJ, _}
 import org.apache.kafka.streams.scala.ImplicitConversions._
 import org.apache.kafka.streams.scala.FunctionConversions._
+import org.apache.kafka.streams.state.KeyValueStore
 
 /**
  * Wraps the Java class [[org.apache.kafka.streams.kstream.KTable]] and 
delegates method calls to the underlying Java object.
@@ -163,6 +165,59 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
     inner.toStream[KR](mapper.asKeyValueMapper)
 
   /**
+    * Create a new `KTable` by transforming the value of each record in this 
`KTable` into a new value, (with possibly new type).
+    * Transform the value of each input record into a new value (with possibly 
new type) of the output record.
+    * A `ValueTransformerWithKey` (provided by the given 
`ValueTransformerWithKeySupplier`) is applied to each input
+    * record value and computes a new value for it.
+    * This is similar to `#mapValues(ValueMapperWithKey)`, but more flexible, 
allowing access to additional state-stores,
+    * and to the `ProcessorContext`.
+    * If the downstream topology uses aggregation functions, (e.g. 
`KGroupedTable#reduce`, `KGroupedTable#aggregate`, etc),
+    * care must be taken when dealing with state, (either held in state-stores 
or transformer instances), to ensure correct
+    * aggregate results.
+    * In contrast, if the resulting KTable is materialized, (cf. 
`#transformValues(ValueTransformerWithKeySupplier, Materialized, String...)`),
+    * such concerns are handled for you.
+    * In order to assign a state store, the store must be created and registered
+    * (via `addStateStore` or `addGlobalStore`) before it can be connected to 
the transformer.
+    *
+    * @param valueTransformerWithKeySupplier an instance of 
`ValueTransformerWithKeySupplier` that generates a `ValueTransformerWithKey`.
+    *                                 At least one transformer instance will 
be created per streaming task.
+    *                                 Transformer implementations do not need 
to be thread-safe.
+    * @param stateStoreNames          the names of the state stores used by 
the processor
+    * @return a [[KTable]] that contains records with unmodified key and new 
values (possibly of different type)
+    * @see `org.apache.kafka.streams.kstream.KTable#transformValues`
+    */
+  def transformValues[VR](valueTransformerWithKeySupplier: 
ValueTransformerWithKeySupplier[K, V, VR],
+                          stateStoreNames: String*): KTable[K, VR] = {
+    inner.transformValues[VR](valueTransformerWithKeySupplier, 
stateStoreNames: _*)
+  }
+
+  /**
+    * Create a new `KTable` by transforming the value of each record in this 
`KTable` into a new value, (with possibly new type).
+    * A `ValueTransformerWithKey` (provided by the given 
`ValueTransformerWithKeySupplier`) is applied to each input
+    * record value and computes a new value for it.
+    * This is similar to `#mapValues(ValueMapperWithKey)`, but more flexible, 
allowing stateful, rather than stateless,
+    * record-by-record operation, access to additional state-stores, and 
access to the `ProcessorContext`.
+    * In order to assign a state store, the store must be created and registered
+    * (via `addStateStore` or `addGlobalStore`) before it can be connected to 
the transformer.
+    * The resulting `KTable` is materialized into another state store 
(additional to the provided state store names)
+    * as specified by the user via `Materialized` parameter, and is queryable 
through its given name.
+    *
+    * @param valueTransformerWithKeySupplier an instance of 
`ValueTransformerWithKeySupplier` that generates a `ValueTransformerWithKey`.
+    *                                 At least one transformer instance will 
be created per streaming task.
+    *                                 Transformer implementations do not need 
to be thread-safe.
+    * @param materialized             an instance of `Materialized` used to 
describe how the state store of the
+    *                                 resulting table should be materialized.
+    * @param stateStoreNames          the names of the state stores used by 
the processor
+    * @return a [[KTable]] that contains records with unmodified key and new 
values (possibly of different type)
+    * @see `org.apache.kafka.streams.kstream.KTable#transformValues`
+    */
+  def transformValues[VR](valueTransformerWithKeySupplier: 
ValueTransformerWithKeySupplier[K, V, VR],
+                          materialized: Materialized[K, VR, 
KeyValueStore[Bytes, Array[Byte]]],
+                          stateStoreNames: String*): KTable[K, VR] = {
+    inner.transformValues[VR](valueTransformerWithKeySupplier, materialized, 
stateStoreNames: _*)
+  }
+
+  /**
    * Re-groups the records of this [[KTable]] using the provided key/value 
mapper
    * and `Serde`s as specified by `Serialized`.
    *
@@ -268,3 +323,4 @@ class KTable[K, V](val inner: KTableJ[K, V]) {
    */
   def queryableStoreName: String = inner.queryableStoreName
 }
+

-- 
To stop receiving notification emails like this one, please contact
[email protected].

Reply via email to