This is an automated email from the ASF dual-hosted git repository.

vvcephei pushed a commit to branch kip-478-part-3
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit f58e275aaabdc6aeee531cb2102d62798746fab8
Author: John Roesler <[email protected]>
AuthorDate: Tue Aug 25 17:40:29 2020 -0500

    roll back statestore change
---
 .../examples/wordcount/WordCountProcessorDemo.java |  21 +-
 .../examples/wordcount/WordCountProcessorTest.java |  10 +-
 .../apache/kafka/streams/processor/StateStore.java |  32 +-
 .../InternalProcessorContextReverseAdapter.java    | 248 ----------
 .../processor/internals/ProcessorAdapter.java      |   2 +-
 .../internals/ProcessorContextAdapter.java         |   4 +-
 .../internals/ProcessorContextReverseAdapter.java  | 129 +++--
 .../processor/api/MockProcessorContext.java        | 546 ---------------------
 .../kafka/streams/MockApiProcessorContextTest.java | 405 ---------------
 9 files changed, 119 insertions(+), 1278 deletions(-)

diff --git 
a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
 
b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
index a6a9b8a..c3f47da 100644
--- 
a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
+++ 
b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
@@ -22,9 +22,9 @@ import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.Topology;
-import org.apache.kafka.streams.processor.api.Processor;
-import org.apache.kafka.streams.processor.api.ProcessorContext;
-import org.apache.kafka.streams.processor.api.ProcessorSupplier;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
 import org.apache.kafka.streams.processor.PunctuationType;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
@@ -54,17 +54,17 @@ import java.util.concurrent.CountDownLatch;
  */
 public final class WordCountProcessorDemo {
 
-    static class MyProcessorSupplier implements ProcessorSupplier<String, 
String, String, String> {
+    static class MyProcessorSupplier implements ProcessorSupplier<String, 
String> {
 
         @Override
-        public Processor<String, String, String, String> get() {
-            return new Processor<String, String, String, String>() {
-                private ProcessorContext<String, String> context;
+        public Processor<String, String> get() {
+            return new Processor<String, String>() {
+                private ProcessorContext context;
                 private KeyValueStore<String, Integer> kvStore;
 
                 @Override
                 @SuppressWarnings("unchecked")
-                public void init(final ProcessorContext<String, String> 
context) {
+                public void init(final ProcessorContext context) {
                     this.context = context;
                     this.context.schedule(Duration.ofSeconds(1), 
PunctuationType.STREAM_TIME, timestamp -> {
                         try (final KeyValueIterator<String, Integer> iter = 
kvStore.all()) {
@@ -79,7 +79,7 @@ public final class WordCountProcessorDemo {
                             }
                         }
                     });
-                    this.kvStore = context.getStateStore("Counts");
+                    this.kvStore = (KeyValueStore<String, Integer>) 
context.getStateStore("Counts");
                 }
 
                 @Override
@@ -96,6 +96,9 @@ public final class WordCountProcessorDemo {
                         }
                     }
                 }
+
+                @Override
+                public void close() {}
             };
         }
     }
diff --git 
a/streams/examples/src/test/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorTest.java
 
b/streams/examples/src/test/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorTest.java
index 5ddda08..bec77e6 100644
--- 
a/streams/examples/src/test/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorTest.java
+++ 
b/streams/examples/src/test/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorTest.java
@@ -18,8 +18,8 @@ package org.apache.kafka.streams.examples.wordcount;
 
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.processor.api.MockProcessorContext;
-import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.MockProcessorContext;
+import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.Stores;
 import org.junit.Test;
@@ -36,7 +36,7 @@ import static org.junit.Assert.assertTrue;
 public class WordCountProcessorTest {
     @Test
     public void test() {
-        final MockProcessorContext<String, String> context = new 
MockProcessorContext<>();
+        final MockProcessorContext context = new MockProcessorContext();
 
         // Create, initialize, and register the state store.
         final KeyValueStore<String, Integer> store =
@@ -48,7 +48,7 @@ public class WordCountProcessorTest {
         context.register(store, null);
 
         // Create and initialize the processor under test
-        final Processor<String, String, String, String> processor = new 
WordCountProcessorDemo.MyProcessorSupplier().get();
+        final Processor<String, String> processor = new 
WordCountProcessorDemo.MyProcessorSupplier().get();
         processor.init(context);
 
         // send a record to the processor
@@ -61,7 +61,7 @@ public class WordCountProcessorTest {
         context.scheduledPunctuators().get(0).getPunctuator().punctuate(0L);
 
         // finally, we can verify the output.
-        final Iterator<MockProcessorContext.CapturedForward<String, String>> 
capturedForwards = context.forwarded().iterator();
+        final Iterator<MockProcessorContext.CapturedForward> capturedForwards 
= context.forwarded().iterator();
         assertEquals(new KeyValue<>("alpha", "2"), 
capturedForwards.next().keyValue());
         assertEquals(new KeyValue<>("beta", "1"), 
capturedForwards.next().keyValue());
         assertEquals(new KeyValue<>("gamma", "1"), 
capturedForwards.next().keyValue());
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java 
b/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java
index d143f69..df53ee2 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java
@@ -17,8 +17,6 @@
 package org.apache.kafka.streams.processor;
 
 import org.apache.kafka.streams.errors.StreamsException;
-import 
org.apache.kafka.streams.processor.internals.ProcessorContextReverseAdapter;
-import org.apache.kafka.streams.processor.api.ProcessorContext;
 
 /**
  * A storage engine for managing state maintained by a stream processor.
@@ -51,27 +49,6 @@ public interface StateStore {
      * Initializes this state store.
      * <p>
      * The implementation of this function must register the root store in the 
context via the
-     * {@link 
org.apache.kafka.streams.processor.ProcessorContext#register(StateStore, 
StateRestoreCallback)} function,
-     * where the first {@link StateStore} parameter should always be the 
passed-in {@code root} object, and
-     * the second parameter should be an object of user's implementation
-     * of the {@link StateRestoreCallback} interface used for restoring the 
state store from the changelog.
-     * <p>
-     * Note that if the state store engine itself supports bulk writes, users 
can implement another
-     * interface {@link BatchingStateRestoreCallback} which extends {@link 
StateRestoreCallback} to
-     * let users implement bulk-load restoration logic instead of restoring 
one record at a time.
-     * <p>
-     * This method is not called if {@link StateStore#init(ProcessorContext, 
org.apache.kafka.streams.processor.StateStore)}
-     * is implemented.
-     *
-     * @throws IllegalStateException If store gets registered after 
initialized is already finished
-     * @throws StreamsException if the store's change log does not contain the 
partition
-     */
-    void init(org.apache.kafka.streams.processor.ProcessorContext context, 
StateStore root);
-
-    /**
-     * Initializes this state store.
-     * <p>
-     * The implementation of this function must register the root store in the 
context via the
      * {@link ProcessorContext#register(StateStore, StateRestoreCallback)} 
function, where the
      * first {@link StateStore} parameter should always be the passed-in 
{@code root} object, and
      * the second parameter should be an object of user's implementation
@@ -84,14 +61,7 @@ public interface StateStore {
      * @throws IllegalStateException If store gets registered after 
initialized is already finished
      * @throws StreamsException if the store's change log does not contain the 
partition
      */
-    default void init(final ProcessorContext<?, ?> context, final StateStore 
root) {
-        final org.apache.kafka.streams.processor.ProcessorContext adapted =
-            ProcessorContextReverseAdapter.adapt(
-                context,
-                new 
ProcessorContextReverseAdapter.UnsupportedDeprecatedForwarder()
-            );
-        init(adapted, root);
-    }
+    void init(ProcessorContext context, StateStore root);
 
     /**
      * Flush any cached data
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContextReverseAdapter.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContextReverseAdapter.java
deleted file mode 100644
index 11fc1f9..0000000
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContextReverseAdapter.java
+++ /dev/null
@@ -1,248 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.processor.internals;
-
-import org.apache.kafka.common.header.Headers;
-import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.processor.Cancellable;
-import org.apache.kafka.streams.processor.PunctuationType;
-import org.apache.kafka.streams.processor.Punctuator;
-import org.apache.kafka.streams.processor.StateRestoreCallback;
-import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.TaskId;
-import org.apache.kafka.streams.processor.To;
-import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
-import org.apache.kafka.streams.state.StoreBuilder;
-import org.apache.kafka.streams.state.internals.ThreadCache;
-
-import java.io.File;
-import java.time.Duration;
-import java.util.Map;
-
-public final class InternalProcessorContextReverseAdapter implements 
InternalProcessorContext {
-    private final InternalApiProcessorContext<Object, Object> delegate;
-
-    static InternalProcessorContext adapt(final 
InternalApiProcessorContext<Object, Object> delegate) {
-        if (delegate instanceof ProcessorContextAdapter) {
-            return ((ProcessorContextAdapter<Object, Object>) 
delegate).delegate();
-        } else {
-            return new InternalProcessorContextReverseAdapter(delegate);
-        }
-    }
-
-    private InternalProcessorContextReverseAdapter(final 
InternalApiProcessorContext<Object, Object> delegate) {
-        this.delegate = delegate;
-    }
-
-    @Override
-    public String applicationId() {
-        return delegate.applicationId();
-    }
-
-    @Override
-    public TaskId taskId() {
-        return delegate.taskId();
-    }
-
-    @Override
-    public Serde<?> keySerde() {
-        return delegate.keySerde();
-    }
-
-    @Override
-    public Serde<?> valueSerde() {
-        return delegate.valueSerde();
-    }
-
-    @Override
-    public File stateDir() {
-        return delegate.stateDir();
-    }
-
-    @Override
-    public StreamsMetricsImpl metrics() {
-        return delegate.metrics();
-    }
-
-    @Override
-    public void setSystemTimeMs(final long timeMs) {
-        delegate.setSystemTimeMs(timeMs);
-    }
-
-    @Override
-    public long currentSystemTimeMs() {
-        return delegate.currentSystemTimeMs();
-    }
-
-    @Override
-    public ProcessorRecordContext recordContext() {
-        return delegate.recordContext();
-    }
-
-    @Override
-    public void setRecordContext(final ProcessorRecordContext recordContext) {
-        delegate.setRecordContext(recordContext);
-    }
-
-    @Override
-    public void setCurrentNode(final ProcessorNode<?, ?, ?, ?> currentNode) {
-        delegate.setCurrentNode(currentNode);
-    }
-
-    @Override
-    public ProcessorNode<?, ?, ?, ?> currentNode() {
-        return delegate.currentNode();
-    }
-
-    @Override
-    public ThreadCache cache() {
-        return delegate.cache();
-    }
-
-    @Override
-    public void initialize() {
-        delegate.initialize();
-    }
-
-    @Override
-    public void uninitialize() {
-        delegate.uninitialize();
-    }
-
-    @Override
-    public Task.TaskType taskType() {
-        return delegate.taskType();
-    }
-
-    @Override
-    public void transitionToActive(final StreamTask streamTask, final 
RecordCollector recordCollector, final ThreadCache newCache) {
-        delegate.transitionToActive(streamTask, recordCollector, newCache);
-    }
-
-    @Override
-    public void transitionToStandby(final ThreadCache newCache) {
-        delegate.transitionToStandby(newCache);
-    }
-
-    @Override
-    public void registerCacheFlushListener(final String namespace, final 
ThreadCache.DirtyEntryFlushListener listener) {
-        delegate.registerCacheFlushListener(namespace, listener);
-    }
-
-    @Override
-    public <T extends StateStore> T getStateStore(final StoreBuilder<T> 
builder) {
-        return delegate.getStateStore(builder);
-    }
-
-    @Override
-    public void logChange(final String storeName, final Bytes key, final 
byte[] value, final long timestamp) {
-        delegate.logChange(storeName, key, value, timestamp);
-    }
-
-    @Override
-    public String changelogFor(final String storeName) {
-        return delegate.changelogFor(storeName);
-    }
-
-    @Override
-    public void register(final StateStore store, final StateRestoreCallback 
stateRestoreCallback) {
-        delegate.register(store, stateRestoreCallback);
-    }
-
-    @Override
-    public StateStore getStateStore(final String name) {
-        return delegate.getStateStore(name);
-    }
-
-    @Deprecated
-    @Override
-    public Cancellable schedule(final long intervalMs, final PunctuationType 
type, final Punctuator callback) {
-        return delegate.schedule(Duration.ofMillis(intervalMs), type, 
callback);
-    }
-
-    @Override
-    public Cancellable schedule(final Duration interval, final PunctuationType 
type, final Punctuator callback) throws IllegalArgumentException {
-        return delegate.schedule(interval, type, callback);
-    }
-
-    @Override
-    public <K, V> void forward(final K key, final V value) {
-        delegate.forward(key, value);
-    }
-
-    @Override
-    public <K, V> void forward(final K key, final V value, final To to) {
-        delegate.forward(key, value, to);
-    }
-
-    @Deprecated
-    @Override
-    public <K, V> void forward(final K key, final V value, final int 
childIndex) {
-        delegate.forward(key, value, 
To.child((currentNode().children()).get(childIndex).name()));
-    }
-
-    @Deprecated
-    @Override
-    public <K, V> void forward(final K key, final V value, final String 
childName) {
-        delegate.forward(key, value, To.child(childName));
-    }
-
-    @Override
-    public void commit() {
-        delegate.commit();
-    }
-
-    @Override
-    public String topic() {
-        return delegate.topic();
-    }
-
-    @Override
-    public int partition() {
-        return delegate.partition();
-    }
-
-    @Override
-    public long offset() {
-        return delegate.offset();
-    }
-
-    @Override
-    public Headers headers() {
-        return delegate.headers();
-    }
-
-    @Override
-    public long timestamp() {
-        return delegate.timestamp();
-    }
-
-    @Override
-    public Map<String, Object> appConfigs() {
-        return delegate.appConfigs();
-    }
-
-    @Override
-    public Map<String, Object> appConfigsWithPrefix(final String prefix) {
-        return delegate.appConfigsWithPrefix(prefix);
-    }
-
-    InternalApiProcessorContext<Object, Object> delegate() {
-        return delegate;
-    }
-}
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorAdapter.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorAdapter.java
index 5a9d881..948109e 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorAdapter.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorAdapter.java
@@ -47,7 +47,7 @@ public final class ProcessorAdapter<KIn, VIn, KOut, VOut> 
implements Processor<K
     @SuppressWarnings("unchecked")
     @Override
     public void init(final ProcessorContext<KOut, VOut> context) {
-        
delegate.init(InternalProcessorContextReverseAdapter.adapt((InternalApiProcessorContext<Object,
 Object>) context));
+        
delegate.init(ProcessorContextReverseAdapter.adapt((InternalApiProcessorContext<Object,
 Object>) context));
     }
 
     @Override
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextAdapter.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextAdapter.java
index 1704ac4..410adcf 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextAdapter.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextAdapter.java
@@ -42,8 +42,8 @@ public final class ProcessorContextAdapter<KForward, VForward>
 
     @SuppressWarnings("unchecked")
     public static <KForward, VForward> InternalApiProcessorContext<KForward, 
VForward> adapt(final InternalProcessorContext delegate) {
-        if (delegate instanceof InternalProcessorContextReverseAdapter) {
-            return (InternalApiProcessorContext<KForward, VForward>) 
((InternalProcessorContextReverseAdapter) delegate).delegate();
+        if (delegate instanceof ProcessorContextReverseAdapter) {
+            return (InternalApiProcessorContext<KForward, VForward>) 
((ProcessorContextReverseAdapter) delegate).delegate();
         } else {
             return new ProcessorContextAdapter<>(delegate);
         }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextReverseAdapter.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextReverseAdapter.java
index 131b776..6e82a5e 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextReverseAdapter.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextReverseAdapter.java
@@ -18,52 +18,35 @@ package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.streams.StreamsMetrics;
+import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.processor.Cancellable;
-import org.apache.kafka.streams.processor.api.ProcessorContext;
 import org.apache.kafka.streams.processor.PunctuationType;
 import org.apache.kafka.streams.processor.Punctuator;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.To;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.internals.ThreadCache;
 
 import java.io.File;
 import java.time.Duration;
 import java.util.Map;
 
-public final class ProcessorContextReverseAdapter implements 
org.apache.kafka.streams.processor.ProcessorContext {
-    private final ProcessorContext<Object, Object> delegate;
-    private final DeprecatedForwarder deprecatedForwarder;
+public final class ProcessorContextReverseAdapter implements 
InternalProcessorContext {
+    private final InternalApiProcessorContext<Object, Object> delegate;
 
-    public interface DeprecatedForwarder {
-        <K, V> void forward(final K key, final V value, final int childIndex);
-    }
-
-    public static final class UnsupportedDeprecatedForwarder implements 
DeprecatedForwarder {
-        @Override
-        public <K, V> void forward(final K key, final V value, final int 
childIndex) {
-            throw new UnsupportedOperationException("Forwarding by index was 
deprecated in 2.0 and is not supported by this ProcessorContext.");
-        }
-    }
-
-    @SuppressWarnings("unchecked")
-    public static org.apache.kafka.streams.processor.ProcessorContext 
adapt(final ProcessorContext<?, ?> delegate,
-                                                                            
final DeprecatedForwarder deprecatedForwarder) {
+    static InternalProcessorContext adapt(final 
InternalApiProcessorContext<Object, Object> delegate) {
         if (delegate instanceof ProcessorContextAdapter) {
-            return ((ProcessorContextAdapter<?, ?>) delegate).delegate();
-        } else if (delegate instanceof InternalApiProcessorContext) {
-            return 
InternalProcessorContextReverseAdapter.adapt((InternalApiProcessorContext<Object,
 Object>) delegate);
+            return ((ProcessorContextAdapter<Object, Object>) 
delegate).delegate();
         } else {
-            return new ProcessorContextReverseAdapter(delegate, 
deprecatedForwarder);
+            return new ProcessorContextReverseAdapter(delegate);
         }
     }
 
-    @SuppressWarnings("unchecked")
-    private ProcessorContextReverseAdapter(final ProcessorContext<?, ?> 
delegate,
-                                           final DeprecatedForwarder 
deprecatedForwarder) {
-        this.delegate = (ProcessorContext<Object, Object>) delegate;
-        this.deprecatedForwarder = deprecatedForwarder;
+    private ProcessorContextReverseAdapter(final 
InternalApiProcessorContext<Object, Object> delegate) {
+        this.delegate = delegate;
     }
 
     @Override
@@ -92,11 +75,91 @@ public final class ProcessorContextReverseAdapter 
implements org.apache.kafka.st
     }
 
     @Override
-    public StreamsMetrics metrics() {
+    public StreamsMetricsImpl metrics() {
         return delegate.metrics();
     }
 
     @Override
+    public void setSystemTimeMs(final long timeMs) {
+        delegate.setSystemTimeMs(timeMs);
+    }
+
+    @Override
+    public long currentSystemTimeMs() {
+        return delegate.currentSystemTimeMs();
+    }
+
+    @Override
+    public ProcessorRecordContext recordContext() {
+        return delegate.recordContext();
+    }
+
+    @Override
+    public void setRecordContext(final ProcessorRecordContext recordContext) {
+        delegate.setRecordContext(recordContext);
+    }
+
+    @Override
+    public void setCurrentNode(final ProcessorNode<?, ?, ?, ?> currentNode) {
+        delegate.setCurrentNode(currentNode);
+    }
+
+    @Override
+    public ProcessorNode<?, ?, ?, ?> currentNode() {
+        return delegate.currentNode();
+    }
+
+    @Override
+    public ThreadCache cache() {
+        return delegate.cache();
+    }
+
+    @Override
+    public void initialize() {
+        delegate.initialize();
+    }
+
+    @Override
+    public void uninitialize() {
+        delegate.uninitialize();
+    }
+
+    @Override
+    public Task.TaskType taskType() {
+        return delegate.taskType();
+    }
+
+    @Override
+    public void transitionToActive(final StreamTask streamTask, final 
RecordCollector recordCollector, final ThreadCache newCache) {
+        delegate.transitionToActive(streamTask, recordCollector, newCache);
+    }
+
+    @Override
+    public void transitionToStandby(final ThreadCache newCache) {
+        delegate.transitionToStandby(newCache);
+    }
+
+    @Override
+    public void registerCacheFlushListener(final String namespace, final 
ThreadCache.DirtyEntryFlushListener listener) {
+        delegate.registerCacheFlushListener(namespace, listener);
+    }
+
+    @Override
+    public <T extends StateStore> T getStateStore(final StoreBuilder<T> 
builder) {
+        return delegate.getStateStore(builder);
+    }
+
+    @Override
+    public void logChange(final String storeName, final Bytes key, final 
byte[] value, final long timestamp) {
+        delegate.logChange(storeName, key, value, timestamp);
+    }
+
+    @Override
+    public String changelogFor(final String storeName) {
+        return delegate.changelogFor(storeName);
+    }
+
+    @Override
     public void register(final StateStore store, final StateRestoreCallback 
stateRestoreCallback) {
         delegate.register(store, stateRestoreCallback);
     }
@@ -113,7 +176,7 @@ public final class ProcessorContextReverseAdapter 
implements org.apache.kafka.st
     }
 
     @Override
-    public Cancellable schedule(final Duration interval, final PunctuationType 
type, final Punctuator callback) {
+    public Cancellable schedule(final Duration interval, final PunctuationType 
type, final Punctuator callback) throws IllegalArgumentException {
         return delegate.schedule(interval, type, callback);
     }
 
@@ -130,7 +193,7 @@ public final class ProcessorContextReverseAdapter 
implements org.apache.kafka.st
     @Deprecated
     @Override
     public <K, V> void forward(final K key, final V value, final int 
childIndex) {
-        deprecatedForwarder.forward(key, value, childIndex);
+        delegate.forward(key, value, 
To.child((currentNode().children()).get(childIndex).name()));
     }
 
     @Deprecated
@@ -178,4 +241,8 @@ public final class ProcessorContextReverseAdapter 
implements org.apache.kafka.st
     public Map<String, Object> appConfigsWithPrefix(final String prefix) {
         return delegate.appConfigsWithPrefix(prefix);
     }
+
+    InternalApiProcessorContext<Object, Object> delegate() {
+        return delegate;
+    }
 }
diff --git 
a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/api/MockProcessorContext.java
 
b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/api/MockProcessorContext.java
deleted file mode 100644
index bbb024d..0000000
--- 
a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/api/MockProcessorContext.java
+++ /dev/null
@@ -1,546 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.processor.api;
-
-import org.apache.kafka.common.header.Headers;
-import org.apache.kafka.common.metrics.MetricConfig;
-import org.apache.kafka.common.metrics.Metrics;
-import org.apache.kafka.common.metrics.Sensor;
-import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.StreamsMetrics;
-import org.apache.kafka.streams.Topology;
-import org.apache.kafka.streams.TopologyTestDriver;
-import org.apache.kafka.streams.internals.ApiUtils;
-import org.apache.kafka.streams.kstream.Transformer;
-import org.apache.kafka.streams.kstream.ValueTransformer;
-import org.apache.kafka.streams.processor.Cancellable;
-import org.apache.kafka.streams.processor.PunctuationType;
-import org.apache.kafka.streams.processor.Punctuator;
-import org.apache.kafka.streams.processor.StateRestoreCallback;
-import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.TaskId;
-import org.apache.kafka.streams.processor.To;
-import org.apache.kafka.streams.processor.internals.ClientUtils;
-import org.apache.kafka.streams.processor.internals.RecordCollector;
-import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
-import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
-import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore;
-
-import java.io.File;
-import java.lang.reflect.Field;
-import java.time.Duration;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-import static org.apache.kafka.common.utils.Utils.mkEntry;
-import static org.apache.kafka.common.utils.Utils.mkMap;
-import static org.apache.kafka.common.utils.Utils.mkProperties;
-
-/**
- * {@link MockProcessorContext} is a mock of {@link ProcessorContext} for 
users to test their {@link Processor},
- * {@link Transformer}, and {@link ValueTransformer} implementations.
- * <p>
- * The tests for this class 
(org.apache.kafka.streams.MockProcessorContextTest) include several behavioral
- * tests that serve as example usage.
- * <p>
- * Note that this class does not take any automated actions (such as firing 
scheduled punctuators).
- * It simply captures any data it witnesses.
- * If you require more automated tests, we recommend wrapping your {@link 
Processor} in a minimal source-processor-sink
- * {@link Topology} and using the {@link TopologyTestDriver}.
- */
-public class MockProcessorContext<KForward, VForward> implements 
ProcessorContext<KForward, VForward>, RecordCollector.Supplier {
-    // Immutable fields ================================================
-    private final StreamsMetricsImpl metrics;
-    private final TaskId taskId;
-    private final StreamsConfig config;
-    private final File stateDir;
-
-    // settable record metadata 
================================================
-    private String topic;
-    private Integer partition;
-    private Long offset;
-    private Headers headers;
-    private Long timestamp;
-
-    // mocks ================================================
-    private final Map<String, StateStore> stateStores = new HashMap<>();
-    private final List<CapturedPunctuator> punctuators = new LinkedList<>();
-    private final List<CapturedForward<KForward, VForward>> capturedForwards = 
new LinkedList<>();
-    private boolean committed = false;
-
-
-    /**
-     * {@link CapturedPunctuator} holds captured punctuators, along with their 
scheduling information.
-     */
-    public static final class CapturedPunctuator {
-        private final long intervalMs;
-        private final PunctuationType type;
-        private final Punctuator punctuator;
-        private boolean cancelled = false;
-
-        private CapturedPunctuator(final long intervalMs, final 
PunctuationType type, final Punctuator punctuator) {
-            this.intervalMs = intervalMs;
-            this.type = type;
-            this.punctuator = punctuator;
-        }
-
-        @SuppressWarnings("unused")
-        public long getIntervalMs() {
-            return intervalMs;
-        }
-
-        @SuppressWarnings("unused")
-        public PunctuationType getType() {
-            return type;
-        }
-
-        @SuppressWarnings("unused")
-        public Punctuator getPunctuator() {
-            return punctuator;
-        }
-
-        @SuppressWarnings({"WeakerAccess", "unused"})
-        public void cancel() {
-            cancelled = true;
-        }
-
-        @SuppressWarnings("unused")
-        public boolean cancelled() {
-            return cancelled;
-        }
-    }
-
-    public static final class CapturedForward<KForward, VForward> {
-        private final String childName;
-        private final long timestamp;
-        private final KeyValue<KForward, VForward> keyValue;
-
-        private CapturedForward(final To to, final KeyValue<KForward, 
VForward> keyValue) {
-            if (keyValue == null) {
-                throw new IllegalArgumentException("keyValue can't be null");
-            }
-
-            try {
-                final Field field = To.class.getDeclaredField("childName");
-                field.setAccessible(true);
-                childName = (String) field.get(to);
-            } catch (final IllegalAccessException | NoSuchFieldException e) {
-                throw new RuntimeException(e);
-            }
-            timestamp = getTimestamp(to);
-
-            this.keyValue = keyValue;
-        }
-
-        /**
-         * The child this data was forwarded to.
-         *
-         * @return The child name, or {@code null} if it was broadcast.
-         */
-        @SuppressWarnings({"WeakerAccess", "unused"})
-        public String childName() {
-            return childName;
-        }
-
-        /**
-         * The timestamp attached to the forwarded record.
-         *
-         * @return A timestamp, or {@code -1} if none was forwarded.
-         */
-        @SuppressWarnings("unused")
-        public long timestamp() {
-            return timestamp;
-        }
-
-        /**
-         * The data forwarded.
-         *
-         * @return A key/value pair. Not null.
-         */
-        @SuppressWarnings("unused")
-        public KeyValue<KForward, VForward> keyValue() {
-            return keyValue;
-        }
-
-        @Override
-        public String toString() {
-            return "CapturedForward{" +
-                "childName='" + childName + '\'' +
-                ", timestamp=" + timestamp +
-                ", keyValue=" + keyValue +
-                '}';
-        }
-    }
-
-    // constructors ================================================
-
-    /**
-     * Create a {@link MockProcessorContext} with dummy {@code config} and 
{@code taskId} and {@code null} {@code stateDir}.
-     * Most unit tests using this mock won't need to know the taskId,
-     * and most unit tests should be able to get by with the
-     * {@link InMemoryKeyValueStore}, so the stateDir won't matter.
-     */
-    @SuppressWarnings("unused")
-    public MockProcessorContext() {
-        this(
-            mkProperties(mkMap(
-                mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, ""),
-                mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "")
-            )),
-            new TaskId(0, 0),
-            null
-        );
-    }
-
-    /**
-     * Create a {@link MockProcessorContext} with dummy {@code taskId} and 
{@code null} {@code stateDir}.
-     * Most unit tests using this mock won't need to know the taskId,
-     * and most unit tests should be able to get by with the
-     * {@link InMemoryKeyValueStore}, so the stateDir won't matter.
-     *
-     * @param config a Properties object, used to configure the context and 
the processor.
-     */
-    @SuppressWarnings("unused")
-    public MockProcessorContext(final Properties config) {
-        this(config, new TaskId(0, 0), null);
-    }
-
-    /**
-     * Create a {@link MockProcessorContext} with a specified taskId and null 
stateDir.
-     *
-     * @param config   a {@link Properties} object, used to configure the 
context and the processor.
-     * @param taskId   a {@link TaskId}, which the context makes available via 
{@link MockProcessorContext#taskId()}.
-     * @param stateDir a {@link File}, which the context makes available viw 
{@link MockProcessorContext#stateDir()}.
-     */
-    @SuppressWarnings("unused")
-    public MockProcessorContext(final Properties config, final TaskId taskId, 
final File stateDir) {
-        final StreamsConfig streamsConfig = new 
ClientUtils.QuietStreamsConfig(config);
-        this.taskId = taskId;
-        this.config = streamsConfig;
-        this.stateDir = stateDir;
-        final MetricConfig metricConfig = new MetricConfig();
-        metricConfig.recordLevel(Sensor.RecordingLevel.DEBUG);
-        final String threadId = Thread.currentThread().getName();
-        metrics = new StreamsMetricsImpl(
-            new Metrics(metricConfig),
-            threadId,
-            
streamsConfig.getString(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG),
-            Time.SYSTEM
-        );
-        TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor(threadId, 
taskId.toString(), metrics);
-    }
-
-    @Override
-    public String applicationId() {
-        return config.getString(StreamsConfig.APPLICATION_ID_CONFIG);
-    }
-
-    @Override
-    public TaskId taskId() {
-        return taskId;
-    }
-
-    @Override
-    public Map<String, Object> appConfigs() {
-        final Map<String, Object> combined = new HashMap<>();
-        combined.putAll(config.originals());
-        combined.putAll(config.values());
-        return combined;
-    }
-
-    @Override
-    public Map<String, Object> appConfigsWithPrefix(final String prefix) {
-        return config.originalsWithPrefix(prefix);
-    }
-
-    @Override
-    public Serde<?> keySerde() {
-        return config.defaultKeySerde();
-    }
-
-    @Override
-    public Serde<?> valueSerde() {
-        return config.defaultValueSerde();
-    }
-
-    @Override
-    public File stateDir() {
-        return stateDir;
-    }
-
-    @Override
-    public StreamsMetrics metrics() {
-        return metrics;
-    }
-
-    // settable record metadata 
================================================
-
-    /**
-     * The context exposes these metadata for use in the processor. Normally, 
they are set by the Kafka Streams framework,
-     * but for the purpose of driving unit tests, you can set them directly.
-     *
-     * @param topic     A topic name
-     * @param partition A partition number
-     * @param offset    A record offset
-     * @param timestamp A record timestamp
-     */
-    @SuppressWarnings("unused")
-    public void setRecordMetadata(final String topic,
-                                  final int partition,
-                                  final long offset,
-                                  final Headers headers,
-                                  final long timestamp) {
-        this.topic = topic;
-        this.partition = partition;
-        this.offset = offset;
-        this.headers = headers;
-        this.timestamp = timestamp;
-    }
-
-    /**
-     * The context exposes this metadata for use in the processor. Normally, 
they are set by the Kafka Streams framework,
-     * but for the purpose of driving unit tests, you can set it directly. 
Setting this attribute doesn't affect the others.
-     *
-     * @param topic A topic name
-     */
-    @SuppressWarnings("unused")
-    public void setTopic(final String topic) {
-        this.topic = topic;
-    }
-
-    /**
-     * The context exposes this metadata for use in the processor. Normally, 
they are set by the Kafka Streams framework,
-     * but for the purpose of driving unit tests, you can set it directly. 
Setting this attribute doesn't affect the others.
-     *
-     * @param partition A partition number
-     */
-    @SuppressWarnings("unused")
-    public void setPartition(final int partition) {
-        this.partition = partition;
-    }
-
-    /**
-     * The context exposes this metadata for use in the processor. Normally, 
they are set by the Kafka Streams framework,
-     * but for the purpose of driving unit tests, you can set it directly. 
Setting this attribute doesn't affect the others.
-     *
-     * @param offset A record offset
-     */
-    @SuppressWarnings("unused")
-    public void setOffset(final long offset) {
-        this.offset = offset;
-    }
-
-    /**
-     * The context exposes this metadata for use in the processor. Normally, 
they are set by the Kafka Streams framework,
-     * but for the purpose of driving unit tests, you can set it directly. 
Setting this attribute doesn't affect the others.
-     *
-     * @param headers Record headers
-     */
-    @SuppressWarnings("unused")
-    public void setHeaders(final Headers headers) {
-        this.headers = headers;
-    }
-
-    /**
-     * The context exposes this metadata for use in the processor. Normally, 
they are set by the Kafka Streams framework,
-     * but for the purpose of driving unit tests, you can set it directly. 
Setting this attribute doesn't affect the others.
-     *
-     * @param timestamp A record timestamp
-     */
-    @SuppressWarnings("unused")
-    public void setTimestamp(final long timestamp) {
-        this.timestamp = timestamp;
-    }
-
-    @Override
-    public String topic() {
-        if (topic == null) {
-            throw new IllegalStateException("Topic must be set before use via 
setRecordMetadata() or setTopic().");
-        }
-        return topic;
-    }
-
-    @Override
-    public int partition() {
-        if (partition == null) {
-            throw new IllegalStateException("Partition must be set before use 
via setRecordMetadata() or setPartition().");
-        }
-        return partition;
-    }
-
-    @Override
-    public long offset() {
-        if (offset == null) {
-            throw new IllegalStateException("Offset must be set before use via 
setRecordMetadata() or setOffset().");
-        }
-        return offset;
-    }
-
-    @Override
-    public Headers headers() {
-        return headers;
-    }
-
-    @Override
-    public long timestamp() {
-        if (timestamp == null) {
-            throw new IllegalStateException("Timestamp must be set before use 
via setRecordMetadata() or setTimestamp().");
-        }
-        return timestamp;
-    }
-
-    // mocks ================================================
-
-    @Override
-    public void register(final StateStore store,
-                         final StateRestoreCallback 
stateRestoreCallbackIsIgnoredInMock) {
-        stateStores.put(store.name(), store);
-    }
-
-    @SuppressWarnings("unchecked")
-    @Override
-    public <S> S getStateStore(final String name) {
-        return (S) stateStores.get(name);
-    }
-
-    @Override
-    public Cancellable schedule(final Duration interval,
-                                final PunctuationType type,
-                                final Punctuator callback) {
-        final CapturedPunctuator capturedPunctuator =
-            new 
CapturedPunctuator(ApiUtils.validateMillisecondDuration(interval, "interval"), 
type, callback);
-
-        punctuators.add(capturedPunctuator);
-
-        return capturedPunctuator::cancel;
-    }
-
-    /**
-     * Get the punctuators scheduled so far. The returned list is not affected 
by subsequent calls to {@code schedule(...)}.
-     *
-     * @return A list of captured punctuators.
-     */
-    @SuppressWarnings("unused")
-    public List<CapturedPunctuator> scheduledPunctuators() {
-        return new LinkedList<>(punctuators);
-    }
-
-    @Override
-    public <K extends KForward, V extends VForward> void forward(final K key, 
final V value){
-        forward(key, value, To.all());
-    }
-
-    @Override
-    public <K extends KForward, V extends VForward> void forward(final K key, 
final V value, final To to){
-        capturedForwards.add(
-            new CapturedForward<>(
-                (getTimestamp(to)) == -1 ? to.withTimestamp(timestamp == null 
? -1 : timestamp) : to,
-                new KeyValue<>(key, value)
-            )
-        );
-    }
-
-    /**
-     * Get all the forwarded data this context has observed. The returned list 
will not be
-     * affected by subsequent interactions with the context. The data in the 
list is in the same order as the calls to
-     * {@code forward(...)}.
-     *
-     * @return A list of key/value pairs that were previously passed to the 
context.
-     */
-    public List<CapturedForward<KForward, VForward>> forwarded() {
-        return new LinkedList<>(capturedForwards);
-    }
-
-    /**
-     * Get all the forwarded data this context has observed for a specific 
child by name.
-     * The returned list will not be affected by subsequent interactions with 
the context.
-     * The data in the list is in the same order as the calls to {@code 
forward(...)}.
-     *
-     * @param childName The child name to retrieve forwards for
-     * @return A list of key/value pairs that were previously passed to the 
context.
-     */
-    @SuppressWarnings("unused")
-    public List<CapturedForward<KForward, VForward>> forwarded(final String 
childName) {
-        final LinkedList<CapturedForward<KForward, VForward>> result = new 
LinkedList<>();
-        for (final CapturedForward<KForward, VForward> capture : 
capturedForwards) {
-            if (capture.childName() == null || 
capture.childName().equals(childName)) {
-                result.add(capture);
-            }
-        }
-        return result;
-    }
-
-    /**
-     * Clear the captured forwarded data.
-     */
-    @SuppressWarnings("unused")
-    public void resetForwards() {
-        capturedForwards.clear();
-    }
-
-    @Override
-    public void commit() {
-        committed = true;
-    }
-
-    /**
-     * Whether {@link ProcessorContext#commit()} has been called in this 
context.
-     *
-     * @return {@code true} iff {@link ProcessorContext#commit()} has been 
called in this context since construction or reset.
-     */
-    public boolean committed() {
-        return committed;
-    }
-
-    /**
-     * Reset the commit capture to {@code false} (whether or not it was 
previously {@code true}).
-     */
-    @SuppressWarnings("unused")
-    public void resetCommit() {
-        committed = false;
-    }
-
-    @Override
-    public RecordCollector recordCollector() {
-        // This interface is assumed by state stores that add change-logging.
-        // Rather than risk a mysterious ClassCastException during unit tests, 
throw an explanatory exception.
-
-        throw new UnsupportedOperationException(
-            "MockProcessorContext does not provide record collection. " +
-                "For processor unit tests, use an in-memory state store with 
change-logging disabled. " +
-                "Alternatively, use the TopologyTestDriver for testing 
processor/store/topology integration."
-        );
-    }
-
-    private static long getTimestamp(final To to) {
-        final long timestamp;
-        try {
-            final Field field = To.class.getDeclaredField("timestamp");
-            field.setAccessible(true);
-            timestamp = (long) field.get(to);
-        } catch (final IllegalAccessException | NoSuchFieldException e) {
-            throw new RuntimeException(e);
-        }
-        return timestamp;
-    }
-}
diff --git 
a/streams/test-utils/src/test/java/org/apache/kafka/streams/MockApiProcessorContextTest.java
 
b/streams/test-utils/src/test/java/org/apache/kafka/streams/MockApiProcessorContextTest.java
deleted file mode 100644
index 6e9c5a6..0000000
--- 
a/streams/test-utils/src/test/java/org/apache/kafka/streams/MockApiProcessorContextTest.java
+++ /dev/null
@@ -1,405 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams;
-
-import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.streams.processor.api.MockProcessorContext;
-import 
org.apache.kafka.streams.processor.api.MockProcessorContext.CapturedForward;
-import org.apache.kafka.streams.processor.PunctuationType;
-import org.apache.kafka.streams.processor.Punctuator;
-import org.apache.kafka.streams.processor.TaskId;
-import org.apache.kafka.streams.processor.To;
-import org.apache.kafka.streams.processor.api.Processor;
-import org.apache.kafka.streams.processor.api.ProcessorContext;
-import 
org.apache.kafka.streams.processor.internals.ProcessorContextReverseAdapter;
-import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.streams.state.StoreBuilder;
-import org.apache.kafka.streams.state.Stores;
-import org.junit.Test;
-
-import java.io.File;
-import java.time.Duration;
-import java.util.Iterator;
-import java.util.Properties;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-public class MockApiProcessorContextTest {
-    @Test
-    public void shouldCaptureOutputRecords() {
-        final Processor<String, Long, String, Long> processor = new 
Processor<String, Long, String, Long>() {
-            private ProcessorContext<String, Long> context;
-
-            @Override
-            public void init(final ProcessorContext<String, Long> context) {
-                this.context = context;
-            }
-
-            @Override
-            public void process(final String key, final Long value) {
-                context.forward(key + value, key.length() + value);
-            }
-        };
-
-        final MockProcessorContext<String, Long> context = new 
MockProcessorContext<>();
-        processor.init(context);
-
-        processor.process("foo", 5L);
-        processor.process("barbaz", 50L);
-
-        final Iterator<CapturedForward<String, Long>> forwarded = 
context.forwarded().iterator();
-        assertEquals(new KeyValue<>("foo5", 8L), forwarded.next().keyValue());
-        assertEquals(new KeyValue<>("barbaz50", 56L), 
forwarded.next().keyValue());
-        assertFalse(forwarded.hasNext());
-
-        context.resetForwards();
-
-        assertEquals(0, context.forwarded().size());
-    }
-
-    @Test
-    public void shouldCaptureOutputRecordsUsingTo() {
-        final Processor<String, Long, String, Long> processor = new 
Processor<String, Long, String, Long>() {
-            private ProcessorContext<String, Long> context;
-
-            @Override
-            public void init(final ProcessorContext<String, Long> context) {
-                this.context = context;
-            }
-
-            @Override
-            public void process(final String key, final Long value) {
-                context.forward(key + value, key.length() + value, To.all());
-            }
-        };
-
-        final MockProcessorContext<String, Long> context = new 
MockProcessorContext<>();
-
-        processor.init(context);
-
-        processor.process("foo", 5L);
-        processor.process("barbaz", 50L);
-
-        final Iterator<CapturedForward<String, Long>> forwarded = 
context.forwarded().iterator();
-        assertEquals(new KeyValue<>("foo5", 8L), forwarded.next().keyValue());
-        assertEquals(new KeyValue<>("barbaz50", 56L), 
forwarded.next().keyValue());
-        assertFalse(forwarded.hasNext());
-
-        context.resetForwards();
-
-        assertEquals(0, context.forwarded().size());
-    }
-
-    @Test
-    public void shouldCaptureRecordsOutputToChildByName() {
-        final Processor<String, Long, String, Long> processor = new 
Processor<String, Long, String, Long>() {
-            private int count = 0;
-            private ProcessorContext<String, Long> context;
-
-            @Override
-            public void init(final ProcessorContext<String, Long> context) {
-                this.context = context;
-            }
-
-            @Override
-            public void process(final String key, final Long value) {
-                if (count == 0) {
-                    context.forward("start", -1L, To.all()); // broadcast
-                }
-                final To toChild = count % 2 == 0 ? To.child("george") : 
To.child("pete");
-                context.forward(key + value, key.length() + value, toChild);
-                count++;
-            }
-        };
-
-        final MockProcessorContext<String, Long> context = new 
MockProcessorContext<>();
-
-        processor.init(context);
-
-        processor.process("foo", 5L);
-        processor.process("barbaz", 50L);
-
-        {
-            final Iterator<CapturedForward<String, Long>> forwarded = 
context.forwarded().iterator();
-
-            final CapturedForward forward1 = forwarded.next();
-            assertEquals(new KeyValue<>("start", -1L), forward1.keyValue());
-            assertNull(forward1.childName());
-
-            final CapturedForward forward2 = forwarded.next();
-            assertEquals(new KeyValue<>("foo5", 8L), forward2.keyValue());
-            assertEquals("george", forward2.childName());
-
-            final CapturedForward forward3 = forwarded.next();
-            assertEquals(new KeyValue<>("barbaz50", 56L), forward3.keyValue());
-            assertEquals("pete", forward3.childName());
-
-            assertFalse(forwarded.hasNext());
-        }
-
-        {
-            final Iterator<CapturedForward<String, Long>> forwarded = 
context.forwarded("george").iterator();
-            assertEquals(new KeyValue<>("start", -1L), 
forwarded.next().keyValue());
-            assertEquals(new KeyValue<>("foo5", 8L), 
forwarded.next().keyValue());
-            assertFalse(forwarded.hasNext());
-        }
-
-        {
-            final Iterator<CapturedForward<String, Long>> forwarded = 
context.forwarded("pete").iterator();
-            assertEquals(new KeyValue<>("start", -1L), 
forwarded.next().keyValue());
-            assertEquals(new KeyValue<>("barbaz50", 56L), 
forwarded.next().keyValue());
-            assertFalse(forwarded.hasNext());
-        }
-
-        {
-            final Iterator<CapturedForward<String, Long>> forwarded = 
context.forwarded("steve").iterator();
-            assertEquals(new KeyValue<>("start", -1L), 
forwarded.next().keyValue());
-            assertFalse(forwarded.hasNext());
-        }
-    }
-
-    @Test
-    public void shouldCaptureCommitsAndAllowReset() {
-        final Processor<String, Long, Void, Void> processor = new 
Processor<String, Long, Void, Void>() {
-            private int count = 0;
-            private ProcessorContext<Void, Void> context;
-
-            @Override
-            public void init(final ProcessorContext<Void, Void> context) {
-                this.context = context;
-            }
-
-            @Override
-            public void process(final String key, final Long value) {
-                if (++count > 2) {
-                    context.commit();
-                }
-            }
-        };
-
-        final MockProcessorContext<Void, Void> context = new 
MockProcessorContext<>();
-
-        processor.init(context);
-
-        processor.process("foo", 5L);
-        processor.process("barbaz", 50L);
-
-        assertFalse(context.committed());
-
-        processor.process("foobar", 500L);
-
-        assertTrue(context.committed());
-
-        context.resetCommit();
-
-        assertFalse(context.committed());
-    }
-
-    @SuppressWarnings("unchecked")
-    @Test
-    public void shouldStoreAndReturnStateStores() {
-        final Processor<String, Long, Void, Void> processor = new 
Processor<String, Long, Void, Void>() {
-            private ProcessorContext<Void, Void> context;
-
-            @Override
-            public void init(final ProcessorContext<Void, Void> context) {
-                this.context = context;
-            }
-
-            @Override
-            public void process(final String key, final Long value) {
-                final KeyValueStore<String, Long> stateStore = 
context.getStateStore("my-state");
-                stateStore.put(key, (stateStore.get(key) == null ? 0 : 
stateStore.get(key)) + value);
-                stateStore.put("all", (stateStore.get("all") == null ? 0 : 
stateStore.get("all")) + value);
-            }
-        };
-
-        final MockProcessorContext<Void, Void> context = new 
MockProcessorContext<>();
-
-        final StoreBuilder<KeyValueStore<String, Long>> storeBuilder = 
Stores.keyValueStoreBuilder(
-                Stores.inMemoryKeyValueStore("my-state"),
-                Serdes.String(),
-                Serdes.Long()).withLoggingDisabled();
-
-        final KeyValueStore<String, Long> store = storeBuilder.build();
-
-        store.init(ProcessorContextReverseAdapter.adapt(context, new 
ProcessorContextReverseAdapter.DeprecatedForwarder() {
-            @Override
-            public <K, V> void forward(final K key, final V value, final int 
childIndex) {
-                throw new UnsupportedOperationException();
-            }
-        }), store);
-
-        processor.init(context);
-
-        processor.process("foo", 5L);
-        processor.process("bar", 50L);
-
-        assertEquals(5L, (long) store.get("foo"));
-        assertEquals(50L, (long) store.get("bar"));
-        assertEquals(55L, (long) store.get("all"));
-    }
-
-    @Test
-    public void shouldCaptureApplicationAndRecordMetadata() {
-        final Properties config = new Properties();
-        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "testMetadata");
-        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "");
-
-        final Processor<String, Object, String, Object> processor = new 
Processor<String, Object, String, Object>() {
-            private ProcessorContext<String, Object> context;
-
-            @Override
-            public void init(final ProcessorContext<String, Object> context) {
-                this.context = context;
-            }
-
-            @Override
-            public void process(final String key, final Object value) {
-                context.forward("appId", context.applicationId());
-                context.forward("taskId", context.taskId());
-
-                context.forward("topic", context.topic());
-                context.forward("partition", context.partition());
-                context.forward("offset", context.offset());
-                context.forward("timestamp", context.timestamp());
-
-                context.forward("key", key);
-                context.forward("value", value);
-            }
-        };
-
-        final MockProcessorContext<String, Object> context = new 
MockProcessorContext<>(config);
-        processor.init(context);
-
-        try {
-            processor.process("foo", 5L);
-            fail("Should have thrown an exception.");
-        } catch (final IllegalStateException expected) {
-            // expected, since the record metadata isn't initialized
-        }
-
-        context.resetForwards();
-        context.setRecordMetadata("t1", 0, 0L, null, 0L);
-
-        {
-            processor.process("foo", 5L);
-            final Iterator<CapturedForward<String, Object>> forwarded = 
context.forwarded().iterator();
-            assertEquals(new KeyValue<>("appId", "testMetadata"), 
forwarded.next().keyValue());
-            assertEquals(new KeyValue<>("taskId", new TaskId(0, 0)), 
forwarded.next().keyValue());
-            assertEquals(new KeyValue<>("topic", "t1"), 
forwarded.next().keyValue());
-            assertEquals(new KeyValue<>("partition", 0), 
forwarded.next().keyValue());
-            assertEquals(new KeyValue<>("offset", 0L), 
forwarded.next().keyValue());
-            assertEquals(new KeyValue<>("timestamp", 0L), 
forwarded.next().keyValue());
-            assertEquals(new KeyValue<>("key", "foo"), 
forwarded.next().keyValue());
-            assertEquals(new KeyValue<>("value", 5L), 
forwarded.next().keyValue());
-        }
-
-        context.resetForwards();
-
-        // record metadata should be "sticky"
-        context.setOffset(1L);
-        context.setTimestamp(10L);
-
-        {
-            processor.process("bar", 50L);
-            final Iterator<CapturedForward<String, Object>> forwarded = 
context.forwarded().iterator();
-            assertEquals(new KeyValue<>("appId", "testMetadata"), 
forwarded.next().keyValue());
-            assertEquals(new KeyValue<>("taskId", new TaskId(0, 0)), 
forwarded.next().keyValue());
-            assertEquals(new KeyValue<>("topic", "t1"), 
forwarded.next().keyValue());
-            assertEquals(new KeyValue<>("partition", 0), 
forwarded.next().keyValue());
-            assertEquals(new KeyValue<>("offset", 1L), 
forwarded.next().keyValue());
-            assertEquals(new KeyValue<>("timestamp", 10L), 
forwarded.next().keyValue());
-            assertEquals(new KeyValue<>("key", "bar"), 
forwarded.next().keyValue());
-            assertEquals(new KeyValue<>("value", 50L), 
forwarded.next().keyValue());
-        }
-
-        context.resetForwards();
-        // record metadata should be "sticky"
-        context.setTopic("t2");
-        context.setPartition(30);
-
-        {
-            processor.process("baz", 500L);
-            final Iterator<CapturedForward<String, Object>> forwarded = 
context.forwarded().iterator();
-            assertEquals(new KeyValue<>("appId", "testMetadata"), 
forwarded.next().keyValue());
-            assertEquals(new KeyValue<>("taskId", new TaskId(0, 0)), 
forwarded.next().keyValue());
-            assertEquals(new KeyValue<>("topic", "t2"), 
forwarded.next().keyValue());
-            assertEquals(new KeyValue<>("partition", 30), 
forwarded.next().keyValue());
-            assertEquals(new KeyValue<>("offset", 1L), 
forwarded.next().keyValue());
-            assertEquals(new KeyValue<>("timestamp", 10L), 
forwarded.next().keyValue());
-            assertEquals(new KeyValue<>("key", "baz"), 
forwarded.next().keyValue());
-            assertEquals(new KeyValue<>("value", 500L), 
forwarded.next().keyValue());
-        }
-    }
-
-    @Test
-    public void shouldCapturePunctuator() {
-        final Processor<String, Long, Void, Void> processor = new 
Processor<String, Long, Void, Void>() {
-            @Override
-            public void init(final ProcessorContext<Void, Void> context) {
-                context.schedule(
-                    Duration.ofSeconds(1L),
-                    PunctuationType.WALL_CLOCK_TIME,
-                    timestamp -> context.commit()
-                );
-            }
-
-            @Override
-            public void process(final String key, final Long value) {
-            }
-        };
-
-        final MockProcessorContext<Void, Void> context = new 
MockProcessorContext<>();
-
-        processor.init(context);
-
-        final MockProcessorContext.CapturedPunctuator capturedPunctuator = 
context.scheduledPunctuators().get(0);
-        assertEquals(1000L, capturedPunctuator.getIntervalMs());
-        assertEquals(PunctuationType.WALL_CLOCK_TIME, 
capturedPunctuator.getType());
-        assertFalse(capturedPunctuator.cancelled());
-
-        final Punctuator punctuator = capturedPunctuator.getPunctuator();
-        assertFalse(context.committed());
-        punctuator.punctuate(1234L);
-        assertTrue(context.committed());
-    }
-
-    @Test
-    public void fullConstructorShouldSetAllExpectedAttributes() {
-        final Properties config = new Properties();
-        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "testFullConstructor");
-        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "");
-        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
Serdes.String().getClass());
-        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
Serdes.Long().getClass());
-
-        final File dummyFile = new File("");
-        final MockProcessorContext<Void, Void> context = new 
MockProcessorContext<>(config, new TaskId(1, 1), dummyFile);
-
-        assertEquals("testFullConstructor", context.applicationId());
-        assertEquals(new TaskId(1, 1), context.taskId());
-        assertEquals("testFullConstructor", 
context.appConfigs().get(StreamsConfig.APPLICATION_ID_CONFIG));
-        assertEquals("testFullConstructor", 
context.appConfigsWithPrefix("application.").get("id"));
-        assertEquals(Serdes.String().getClass(), 
context.keySerde().getClass());
-        assertEquals(Serdes.Long().getClass(), 
context.valueSerde().getClass());
-        assertEquals(dummyFile, context.stateDir());
-    }
-}

Reply via email to