This is an automated email from the ASF dual-hosted git repository. vvcephei pushed a commit to branch kip-478-part-4 in repository https://gitbox.apache.org/repos/asf/kafka.git
commit eb55d70adadc0c692d29a4c23d039131ae2f1f80 Author: John Roesler <[email protected]> AuthorDate: Tue Aug 25 19:29:00 2020 -0500 Convert test-utils and StateStore --- .../examples/wordcount/WordCountProcessorDemo.java | 21 +- .../examples/wordcount/WordCountProcessorTest.java | 10 +- .../apache/kafka/streams/processor/StateStore.java | 32 +- ...=> InternalProcessorContextReverseAdapter.java} | 6 +- .../processor/internals/ProcessorAdapter.java | 2 +- .../internals/ProcessorContextAdapter.java | 4 +- .../internals/ProcessorContextReverseAdapter.java | 129 ++--- .../internals/GlobalProcessorContextImplTest.java | 11 +- .../internals/ProcessorContextImplTest.java | 2 +- .../processor/api/MockProcessorContext.java | 546 +++++++++++++++++++++ .../kafka/streams/MockApiProcessorContextTest.java | 405 +++++++++++++++ 11 files changed, 1040 insertions(+), 128 deletions(-) diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java index c3f47da..a6a9b8a 100644 --- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java +++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java @@ -22,9 +22,9 @@ import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.Topology; -import org.apache.kafka.streams.processor.Processor; -import org.apache.kafka.streams.processor.ProcessorContext; -import org.apache.kafka.streams.processor.ProcessorSupplier; +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.processor.api.ProcessorSupplier; import org.apache.kafka.streams.processor.PunctuationType; import 
org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; @@ -54,17 +54,17 @@ import java.util.concurrent.CountDownLatch; */ public final class WordCountProcessorDemo { - static class MyProcessorSupplier implements ProcessorSupplier<String, String> { + static class MyProcessorSupplier implements ProcessorSupplier<String, String, String, String> { @Override - public Processor<String, String> get() { - return new Processor<String, String>() { - private ProcessorContext context; + public Processor<String, String, String, String> get() { + return new Processor<String, String, String, String>() { + private ProcessorContext<String, String> context; private KeyValueStore<String, Integer> kvStore; @Override @SuppressWarnings("unchecked") - public void init(final ProcessorContext context) { + public void init(final ProcessorContext<String, String> context) { this.context = context; this.context.schedule(Duration.ofSeconds(1), PunctuationType.STREAM_TIME, timestamp -> { try (final KeyValueIterator<String, Integer> iter = kvStore.all()) { @@ -79,7 +79,7 @@ public final class WordCountProcessorDemo { } } }); - this.kvStore = (KeyValueStore<String, Integer>) context.getStateStore("Counts"); + this.kvStore = context.getStateStore("Counts"); } @Override @@ -96,9 +96,6 @@ public final class WordCountProcessorDemo { } } } - - @Override - public void close() {} }; } } diff --git a/streams/examples/src/test/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorTest.java b/streams/examples/src/test/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorTest.java index bec77e6..5ddda08 100644 --- a/streams/examples/src/test/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorTest.java +++ b/streams/examples/src/test/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorTest.java @@ -18,8 +18,8 @@ package org.apache.kafka.streams.examples.wordcount; import 
org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KeyValue; -import org.apache.kafka.streams.processor.MockProcessorContext; -import org.apache.kafka.streams.processor.Processor; +import org.apache.kafka.streams.processor.api.MockProcessorContext; +import org.apache.kafka.streams.processor.api.Processor; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.Stores; import org.junit.Test; @@ -36,7 +36,7 @@ import static org.junit.Assert.assertTrue; public class WordCountProcessorTest { @Test public void test() { - final MockProcessorContext context = new MockProcessorContext(); + final MockProcessorContext<String, String> context = new MockProcessorContext<>(); // Create, initialize, and register the state store. final KeyValueStore<String, Integer> store = @@ -48,7 +48,7 @@ public class WordCountProcessorTest { context.register(store, null); // Create and initialize the processor under test - final Processor<String, String> processor = new WordCountProcessorDemo.MyProcessorSupplier().get(); + final Processor<String, String, String, String> processor = new WordCountProcessorDemo.MyProcessorSupplier().get(); processor.init(context); // send a record to the processor @@ -61,7 +61,7 @@ public class WordCountProcessorTest { context.scheduledPunctuators().get(0).getPunctuator().punctuate(0L); // finally, we can verify the output. 
- final Iterator<MockProcessorContext.CapturedForward> capturedForwards = context.forwarded().iterator(); + final Iterator<MockProcessorContext.CapturedForward<String, String>> capturedForwards = context.forwarded().iterator(); assertEquals(new KeyValue<>("alpha", "2"), capturedForwards.next().keyValue()); assertEquals(new KeyValue<>("beta", "1"), capturedForwards.next().keyValue()); assertEquals(new KeyValue<>("gamma", "1"), capturedForwards.next().keyValue()); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java b/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java index df53ee2..d143f69 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java @@ -17,6 +17,8 @@ package org.apache.kafka.streams.processor; import org.apache.kafka.streams.errors.StreamsException; +import org.apache.kafka.streams.processor.internals.ProcessorContextReverseAdapter; +import org.apache.kafka.streams.processor.api.ProcessorContext; /** * A storage engine for managing state maintained by a stream processor. @@ -49,6 +51,27 @@ public interface StateStore { * Initializes this state store. * <p> * The implementation of this function must register the root store in the context via the + * {@link org.apache.kafka.streams.processor.ProcessorContext#register(StateStore, StateRestoreCallback)} function, + * where the first {@link StateStore} parameter should always be the passed-in {@code root} object, and + * the second parameter should be an object of user's implementation + * of the {@link StateRestoreCallback} interface used for restoring the state store from the changelog. 
+ * <p> + * Note that if the state store engine itself supports bulk writes, users can implement another + * interface {@link BatchingStateRestoreCallback} which extends {@link StateRestoreCallback} to + * let users implement bulk-load restoration logic instead of restoring one record at a time. + * <p> + * This method is not called if {@link StateStore#init(ProcessorContext, org.apache.kafka.streams.processor.StateStore)} + * is implemented. + * + * @throws IllegalStateException If store gets registered after initialized is already finished + * @throws StreamsException if the store's change log does not contain the partition + */ + void init(org.apache.kafka.streams.processor.ProcessorContext context, StateStore root); + + /** + * Initializes this state store. + * <p> + * The implementation of this function must register the root store in the context via the * {@link ProcessorContext#register(StateStore, StateRestoreCallback)} function, where the * first {@link StateStore} parameter should always be the passed-in {@code root} object, and * the second parameter should be an object of user's implementation @@ -61,7 +84,14 @@ public interface StateStore { * @throws IllegalStateException If store gets registered after initialized is already finished * @throws StreamsException if the store's change log does not contain the partition */ - void init(ProcessorContext context, StateStore root); + default void init(final ProcessorContext<?, ?> context, final StateStore root) { + final org.apache.kafka.streams.processor.ProcessorContext adapted = + ProcessorContextReverseAdapter.adapt( + context, + new ProcessorContextReverseAdapter.UnsupportedDeprecatedForwarder() + ); + init(adapted, root); + } /** * Flush any cached data diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextReverseAdapter.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContextReverseAdapter.java similarity index 96% copy 
from streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextReverseAdapter.java copy to streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContextReverseAdapter.java index 6e82a5e..11fc1f9 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextReverseAdapter.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContextReverseAdapter.java @@ -34,18 +34,18 @@ import java.io.File; import java.time.Duration; import java.util.Map; -public final class ProcessorContextReverseAdapter implements InternalProcessorContext { +public final class InternalProcessorContextReverseAdapter implements InternalProcessorContext { private final InternalApiProcessorContext<Object, Object> delegate; static InternalProcessorContext adapt(final InternalApiProcessorContext<Object, Object> delegate) { if (delegate instanceof ProcessorContextAdapter) { return ((ProcessorContextAdapter<Object, Object>) delegate).delegate(); } else { - return new ProcessorContextReverseAdapter(delegate); + return new InternalProcessorContextReverseAdapter(delegate); } } - private ProcessorContextReverseAdapter(final InternalApiProcessorContext<Object, Object> delegate) { + private InternalProcessorContextReverseAdapter(final InternalApiProcessorContext<Object, Object> delegate) { this.delegate = delegate; } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorAdapter.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorAdapter.java index d8e4af4..3bbbe78 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorAdapter.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorAdapter.java @@ -47,7 +47,7 @@ public final class ProcessorAdapter<KIn, VIn, KOut, VOut> implements Processor<K @SuppressWarnings("unchecked") @Override public void init(final 
ProcessorContext<KOut, VOut> context) { - delegate.init(ProcessorContextReverseAdapter.adapt((InternalApiProcessorContext<Object, Object>) context)); + delegate.init(InternalProcessorContextReverseAdapter.adapt((InternalApiProcessorContext<Object, Object>) context)); } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextAdapter.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextAdapter.java index 85dbd52..8d18ec7 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextAdapter.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextAdapter.java @@ -42,8 +42,8 @@ public final class ProcessorContextAdapter<KForward, VForward> @SuppressWarnings("unchecked") public static <KForward, VForward> InternalApiProcessorContext<KForward, VForward> adapt(final InternalProcessorContext delegate) { - if (delegate instanceof ProcessorContextReverseAdapter) { - return (InternalApiProcessorContext<KForward, VForward>) ((ProcessorContextReverseAdapter) delegate).delegate(); + if (delegate instanceof InternalProcessorContextReverseAdapter) { + return (InternalApiProcessorContext<KForward, VForward>) ((InternalProcessorContextReverseAdapter) delegate).delegate(); } else { return new ProcessorContextAdapter<>(delegate); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextReverseAdapter.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextReverseAdapter.java index 6e82a5e..131b776 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextReverseAdapter.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextReverseAdapter.java @@ -18,35 +18,52 @@ package org.apache.kafka.streams.processor.internals; import org.apache.kafka.common.header.Headers; import 
org.apache.kafka.common.serialization.Serde; -import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.StreamsMetrics; import org.apache.kafka.streams.processor.Cancellable; +import org.apache.kafka.streams.processor.api.ProcessorContext; import org.apache.kafka.streams.processor.PunctuationType; import org.apache.kafka.streams.processor.Punctuator; import org.apache.kafka.streams.processor.StateRestoreCallback; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.To; -import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; -import org.apache.kafka.streams.state.StoreBuilder; -import org.apache.kafka.streams.state.internals.ThreadCache; import java.io.File; import java.time.Duration; import java.util.Map; -public final class ProcessorContextReverseAdapter implements InternalProcessorContext { - private final InternalApiProcessorContext<Object, Object> delegate; +public final class ProcessorContextReverseAdapter implements org.apache.kafka.streams.processor.ProcessorContext { + private final ProcessorContext<Object, Object> delegate; + private final DeprecatedForwarder deprecatedForwarder; - static InternalProcessorContext adapt(final InternalApiProcessorContext<Object, Object> delegate) { + public interface DeprecatedForwarder { + <K, V> void forward(final K key, final V value, final int childIndex); + } + + public static final class UnsupportedDeprecatedForwarder implements DeprecatedForwarder { + @Override + public <K, V> void forward(final K key, final V value, final int childIndex) { + throw new UnsupportedOperationException("Forwarding by index was deprecated in 2.0 and is not supported by this ProcessorContext."); + } + } + + @SuppressWarnings("unchecked") + public static org.apache.kafka.streams.processor.ProcessorContext adapt(final ProcessorContext<?, ?> delegate, + final DeprecatedForwarder deprecatedForwarder) { if 
(delegate instanceof ProcessorContextAdapter) { - return ((ProcessorContextAdapter<Object, Object>) delegate).delegate(); + return ((ProcessorContextAdapter<?, ?>) delegate).delegate(); + } else if (delegate instanceof InternalApiProcessorContext) { + return InternalProcessorContextReverseAdapter.adapt((InternalApiProcessorContext<Object, Object>) delegate); } else { - return new ProcessorContextReverseAdapter(delegate); + return new ProcessorContextReverseAdapter(delegate, deprecatedForwarder); } } - private ProcessorContextReverseAdapter(final InternalApiProcessorContext<Object, Object> delegate) { - this.delegate = delegate; + @SuppressWarnings("unchecked") + private ProcessorContextReverseAdapter(final ProcessorContext<?, ?> delegate, + final DeprecatedForwarder deprecatedForwarder) { + this.delegate = (ProcessorContext<Object, Object>) delegate; + this.deprecatedForwarder = deprecatedForwarder; } @Override @@ -75,91 +92,11 @@ public final class ProcessorContextReverseAdapter implements InternalProcessorCo } @Override - public StreamsMetricsImpl metrics() { + public StreamsMetrics metrics() { return delegate.metrics(); } @Override - public void setSystemTimeMs(final long timeMs) { - delegate.setSystemTimeMs(timeMs); - } - - @Override - public long currentSystemTimeMs() { - return delegate.currentSystemTimeMs(); - } - - @Override - public ProcessorRecordContext recordContext() { - return delegate.recordContext(); - } - - @Override - public void setRecordContext(final ProcessorRecordContext recordContext) { - delegate.setRecordContext(recordContext); - } - - @Override - public void setCurrentNode(final ProcessorNode<?, ?, ?, ?> currentNode) { - delegate.setCurrentNode(currentNode); - } - - @Override - public ProcessorNode<?, ?, ?, ?> currentNode() { - return delegate.currentNode(); - } - - @Override - public ThreadCache cache() { - return delegate.cache(); - } - - @Override - public void initialize() { - delegate.initialize(); - } - - @Override - public void 
uninitialize() { - delegate.uninitialize(); - } - - @Override - public Task.TaskType taskType() { - return delegate.taskType(); - } - - @Override - public void transitionToActive(final StreamTask streamTask, final RecordCollector recordCollector, final ThreadCache newCache) { - delegate.transitionToActive(streamTask, recordCollector, newCache); - } - - @Override - public void transitionToStandby(final ThreadCache newCache) { - delegate.transitionToStandby(newCache); - } - - @Override - public void registerCacheFlushListener(final String namespace, final ThreadCache.DirtyEntryFlushListener listener) { - delegate.registerCacheFlushListener(namespace, listener); - } - - @Override - public <T extends StateStore> T getStateStore(final StoreBuilder<T> builder) { - return delegate.getStateStore(builder); - } - - @Override - public void logChange(final String storeName, final Bytes key, final byte[] value, final long timestamp) { - delegate.logChange(storeName, key, value, timestamp); - } - - @Override - public String changelogFor(final String storeName) { - return delegate.changelogFor(storeName); - } - - @Override public void register(final StateStore store, final StateRestoreCallback stateRestoreCallback) { delegate.register(store, stateRestoreCallback); } @@ -176,7 +113,7 @@ public final class ProcessorContextReverseAdapter implements InternalProcessorCo } @Override - public Cancellable schedule(final Duration interval, final PunctuationType type, final Punctuator callback) throws IllegalArgumentException { + public Cancellable schedule(final Duration interval, final PunctuationType type, final Punctuator callback) { return delegate.schedule(interval, type, callback); } @@ -193,7 +130,7 @@ public final class ProcessorContextReverseAdapter implements InternalProcessorCo @Deprecated @Override public <K, V> void forward(final K key, final V value, final int childIndex) { - delegate.forward(key, value, To.child((currentNode().children()).get(childIndex).name())); + 
deprecatedForwarder.forward(key, value, childIndex); } @Deprecated @@ -241,8 +178,4 @@ public final class ProcessorContextReverseAdapter implements InternalProcessorCo public Map<String, Object> appConfigsWithPrefix(final String prefix) { return delegate.appConfigsWithPrefix(prefix); } - - InternalApiProcessorContext<Object, Object> delegate() { - return delegate; - } } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java index ad8cd0a..e180010 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java @@ -18,6 +18,7 @@ package org.apache.kafka.streams.processor.internals; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.To; import org.apache.kafka.streams.processor.internals.Task.TaskType; @@ -142,7 +143,7 @@ public class GlobalProcessorContextImplTest { public void shouldNotAllowInitForKeyValueStore() { final StateStore store = globalContext.getStateStore(GLOBAL_KEY_VALUE_STORE_NAME); try { - store.init(null, null); + store.init((ProcessorContext) null, null); fail("Should have thrown UnsupportedOperationException."); } catch (final UnsupportedOperationException expected) { } } @@ -151,7 +152,7 @@ public class GlobalProcessorContextImplTest { public void shouldNotAllowInitForTimestampedKeyValueStore() { final StateStore store = globalContext.getStateStore(GLOBAL_TIMESTAMPED_KEY_VALUE_STORE_NAME); try { - store.init(null, null); + store.init((ProcessorContext) null, null); fail("Should have thrown UnsupportedOperationException."); } 
catch (final UnsupportedOperationException expected) { } } @@ -160,7 +161,7 @@ public class GlobalProcessorContextImplTest { public void shouldNotAllowInitForWindowStore() { final StateStore store = globalContext.getStateStore(GLOBAL_WINDOW_STORE_NAME); try { - store.init(null, null); + store.init((ProcessorContext) null, null); fail("Should have thrown UnsupportedOperationException."); } catch (final UnsupportedOperationException expected) { } } @@ -169,7 +170,7 @@ public class GlobalProcessorContextImplTest { public void shouldNotAllowInitForTimestampedWindowStore() { final StateStore store = globalContext.getStateStore(GLOBAL_TIMESTAMPED_WINDOW_STORE_NAME); try { - store.init(null, null); + store.init((ProcessorContext) null, null); fail("Should have thrown UnsupportedOperationException."); } catch (final UnsupportedOperationException expected) { } } @@ -178,7 +179,7 @@ public class GlobalProcessorContextImplTest { public void shouldNotAllowInitForSessionStore() { final StateStore store = globalContext.getStateStore(GLOBAL_SESSION_STORE_NAME); try { - store.init(null, null); + store.init((ProcessorContext) null, null); fail("Should have thrown UnsupportedOperationException."); } catch (final UnsupportedOperationException expected) { } } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java index f4b62c3d..ab88efa 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java @@ -767,7 +767,7 @@ public class ProcessorContextImplTest { assertTrue(store.persistent()); assertTrue(store.isOpen()); - checkThrowsUnsupportedOperation(() -> store.init(null, null), "init()"); + checkThrowsUnsupportedOperation(() -> store.init((ProcessorContext) null, null), "init()"); 
checkThrowsUnsupportedOperation(store::close, "close()"); } diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/api/MockProcessorContext.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/api/MockProcessorContext.java new file mode 100644 index 0000000..acad94d --- /dev/null +++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/api/MockProcessorContext.java @@ -0,0 +1,546 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.processor.api; + +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.metrics.MetricConfig; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.StreamsMetrics; +import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.TopologyTestDriver; +import org.apache.kafka.streams.internals.ApiUtils; +import org.apache.kafka.streams.kstream.Transformer; +import org.apache.kafka.streams.kstream.ValueTransformer; +import org.apache.kafka.streams.processor.Cancellable; +import org.apache.kafka.streams.processor.PunctuationType; +import org.apache.kafka.streams.processor.Punctuator; +import org.apache.kafka.streams.processor.StateRestoreCallback; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.To; +import org.apache.kafka.streams.processor.internals.ClientUtils; +import org.apache.kafka.streams.processor.internals.RecordCollector; +import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics; +import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore; + +import java.io.File; +import java.lang.reflect.Field; +import java.time.Duration; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; +import static org.apache.kafka.common.utils.Utils.mkProperties; + +/** + * {@link MockProcessorContext} is a mock of {@link ProcessorContext} for users 
to test their {@link Processor}, + * {@link Transformer}, and {@link ValueTransformer} implementations. + * <p> + * The tests for this class (org.apache.kafka.streams.MockProcessorContextTest) include several behavioral + * tests that serve as example usage. + * <p> + * Note that this class does not take any automated actions (such as firing scheduled punctuators). + * It simply captures any data it witnesses. + * If you require more automated tests, we recommend wrapping your {@link Processor} in a minimal source-processor-sink + * {@link Topology} and using the {@link TopologyTestDriver}. + */ +public class MockProcessorContext<KForward, VForward> implements ProcessorContext<KForward, VForward>, RecordCollector.Supplier { + // Immutable fields ================================================ + private final StreamsMetricsImpl metrics; + private final TaskId taskId; + private final StreamsConfig config; + private final File stateDir; + + // settable record metadata ================================================ + private String topic; + private Integer partition; + private Long offset; + private Headers headers; + private Long timestamp; + + // mocks ================================================ + private final Map<String, StateStore> stateStores = new HashMap<>(); + private final List<CapturedPunctuator> punctuators = new LinkedList<>(); + private final List<CapturedForward<KForward, VForward>> capturedForwards = new LinkedList<>(); + private boolean committed = false; + + + /** + * {@link CapturedPunctuator} holds captured punctuators, along with their scheduling information. 
+ */ + public static final class CapturedPunctuator { + private final long intervalMs; + private final PunctuationType type; + private final Punctuator punctuator; + private boolean cancelled = false; + + private CapturedPunctuator(final long intervalMs, final PunctuationType type, final Punctuator punctuator) { + this.intervalMs = intervalMs; + this.type = type; + this.punctuator = punctuator; + } + + @SuppressWarnings("unused") + public long getIntervalMs() { + return intervalMs; + } + + @SuppressWarnings("unused") + public PunctuationType getType() { + return type; + } + + @SuppressWarnings("unused") + public Punctuator getPunctuator() { + return punctuator; + } + + @SuppressWarnings({"WeakerAccess", "unused"}) + public void cancel() { + cancelled = true; + } + + @SuppressWarnings("unused") + public boolean cancelled() { + return cancelled; + } + } + + public static final class CapturedForward<KForward, VForward> { + private final String childName; + private final long timestamp; + private final KeyValue<KForward, VForward> keyValue; + + private CapturedForward(final To to, final KeyValue<KForward, VForward> keyValue) { + if (keyValue == null) { + throw new IllegalArgumentException("keyValue can't be null"); + } + + try { + final Field field = To.class.getDeclaredField("childName"); + field.setAccessible(true); + childName = (String) field.get(to); + } catch (final IllegalAccessException | NoSuchFieldException e) { + throw new RuntimeException(e); + } + timestamp = getTimestamp(to); + + this.keyValue = keyValue; + } + + /** + * The child this data was forwarded to. + * + * @return The child name, or {@code null} if it was broadcast. + */ + @SuppressWarnings("unused") + public String childName() { + return childName; + } + + /** + * The timestamp attached to the forwarded record. + * + * @return A timestamp, or {@code -1} if none was forwarded. + */ + @SuppressWarnings("unused") + public long timestamp() { + return timestamp; + } + + /** + * The data forwarded. 
+ * + * @return A key/value pair. Not null. + */ + @SuppressWarnings("unused") + public KeyValue<KForward, VForward> keyValue() { + return keyValue; + } + + @Override + public String toString() { + return "CapturedForward{" + + "childName='" + childName + '\'' + + ", timestamp=" + timestamp + + ", keyValue=" + keyValue + + '}'; + } + } + + // constructors ================================================ + + /** + * Create a {@link MockProcessorContext} with dummy {@code config} and {@code taskId} and {@code null} {@code stateDir}. + * Most unit tests using this mock won't need to know the taskId, + * and most unit tests should be able to get by with the + * {@link InMemoryKeyValueStore}, so the stateDir won't matter. + */ + @SuppressWarnings("unused") + public MockProcessorContext() { + this( + mkProperties(mkMap( + mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, ""), + mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "") + )), + new TaskId(0, 0), + null + ); + } + + /** + * Create a {@link MockProcessorContext} with dummy {@code taskId} and {@code null} {@code stateDir}. + * Most unit tests using this mock won't need to know the taskId, + * and most unit tests should be able to get by with the + * {@link InMemoryKeyValueStore}, so the stateDir won't matter. + * + * @param config a Properties object, used to configure the context and the processor. + */ + @SuppressWarnings("unused") + public MockProcessorContext(final Properties config) { + this(config, new TaskId(0, 0), null); + } + + /** + * Create a {@link MockProcessorContext} with a specified taskId and stateDir. + * + * @param config a {@link Properties} object, used to configure the context and the processor. + * @param taskId a {@link TaskId}, which the context makes available via {@link MockProcessorContext#taskId()}. + * @param stateDir a {@link File}, which the context makes available via {@link MockProcessorContext#stateDir()}. 
+ */ + @SuppressWarnings("unused") + public MockProcessorContext(final Properties config, final TaskId taskId, final File stateDir) { + final StreamsConfig streamsConfig = new ClientUtils.QuietStreamsConfig(config); + this.taskId = taskId; + this.config = streamsConfig; + this.stateDir = stateDir; + final MetricConfig metricConfig = new MetricConfig(); + metricConfig.recordLevel(Sensor.RecordingLevel.DEBUG); + final String threadId = Thread.currentThread().getName(); + metrics = new StreamsMetricsImpl( + new Metrics(metricConfig), + threadId, + streamsConfig.getString(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG), + Time.SYSTEM + ); + TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor(threadId, taskId.toString(), metrics); + } + + @Override + public String applicationId() { + return config.getString(StreamsConfig.APPLICATION_ID_CONFIG); + } + + @Override + public TaskId taskId() { + return taskId; + } + + @Override + public Map<String, Object> appConfigs() { + final Map<String, Object> combined = new HashMap<>(); + combined.putAll(config.originals()); + combined.putAll(config.values()); + return combined; + } + + @Override + public Map<String, Object> appConfigsWithPrefix(final String prefix) { + return config.originalsWithPrefix(prefix); + } + + @Override + public Serde<?> keySerde() { + return config.defaultKeySerde(); + } + + @Override + public Serde<?> valueSerde() { + return config.defaultValueSerde(); + } + + @Override + public File stateDir() { + return stateDir; + } + + @Override + public StreamsMetrics metrics() { + return metrics; + } + + // settable record metadata ================================================ + + /** + * The context exposes these metadata for use in the processor. Normally, they are set by the Kafka Streams framework, + * but for the purpose of driving unit tests, you can set them directly. 
+ * + * @param topic A topic name + * @param partition A partition number + * @param offset A record offset + * @param headers Record headers + * @param timestamp A record timestamp + */ + @SuppressWarnings("unused") + public void setRecordMetadata(final String topic, + final int partition, + final long offset, + final Headers headers, + final long timestamp) { + this.topic = topic; + this.partition = partition; + this.offset = offset; + this.headers = headers; + this.timestamp = timestamp; + } + + /** + * The context exposes this metadata for use in the processor. Normally, it is set by the Kafka Streams framework, + * but for the purpose of driving unit tests, you can set it directly. Setting this attribute doesn't affect the others. + * + * @param topic A topic name + */ + @SuppressWarnings("unused") + public void setTopic(final String topic) { + this.topic = topic; + } + + /** + * The context exposes this metadata for use in the processor. Normally, it is set by the Kafka Streams framework, + * but for the purpose of driving unit tests, you can set it directly. Setting this attribute doesn't affect the others. + * + * @param partition A partition number + */ + @SuppressWarnings("unused") + public void setPartition(final int partition) { + this.partition = partition; + } + + /** + * The context exposes this metadata for use in the processor. Normally, it is set by the Kafka Streams framework, + * but for the purpose of driving unit tests, you can set it directly. Setting this attribute doesn't affect the others. + * + * @param offset A record offset + */ + @SuppressWarnings("unused") + public void setOffset(final long offset) { + this.offset = offset; + } + + /** + * The context exposes this metadata for use in the processor. Normally, it is set by the Kafka Streams framework, + * but for the purpose of driving unit tests, you can set it directly. Setting this attribute doesn't affect the others. 
+ * + * @param headers Record headers + */ + @SuppressWarnings("unused") + public void setHeaders(final Headers headers) { + this.headers = headers; + } + + /** + * The context exposes this metadata for use in the processor. Normally, it is set by the Kafka Streams framework, + * but for the purpose of driving unit tests, you can set it directly. Setting this attribute doesn't affect the others. + * + * @param timestamp A record timestamp + */ + @SuppressWarnings("unused") + public void setTimestamp(final long timestamp) { + this.timestamp = timestamp; + } + + @Override + public String topic() { + if (topic == null) { + throw new IllegalStateException("Topic must be set before use via setRecordMetadata() or setTopic()."); + } + return topic; + } + + @Override + public int partition() { + if (partition == null) { + throw new IllegalStateException("Partition must be set before use via setRecordMetadata() or setPartition()."); + } + return partition; + } + + @Override + public long offset() { + if (offset == null) { + throw new IllegalStateException("Offset must be set before use via setRecordMetadata() or setOffset()."); + } + return offset; + } + + @Override + public Headers headers() { + return headers; + } + + @Override + public long timestamp() { + if (timestamp == null) { + throw new IllegalStateException("Timestamp must be set before use via setRecordMetadata() or setTimestamp()."); + } + return timestamp; + } + + // mocks ================================================ + + @Override + public void register(final StateStore store, + final StateRestoreCallback stateRestoreCallbackIsIgnoredInMock) { + stateStores.put(store.name(), store); + } + + @SuppressWarnings("unchecked") + @Override + public <S extends StateStore> S getStateStore(final String name) { + return (S) stateStores.get(name); + } + + @Override + public Cancellable schedule(final Duration interval, + final PunctuationType type, + final Punctuator callback) { + final CapturedPunctuator 
capturedPunctuator = + new CapturedPunctuator(ApiUtils.validateMillisecondDuration(interval, "interval"), type, callback); + + punctuators.add(capturedPunctuator); + + return capturedPunctuator::cancel; + } + + /** + * Get the punctuators scheduled so far. The returned list is not affected by subsequent calls to {@code schedule(...)}. + * + * @return A list of captured punctuators. + */ + @SuppressWarnings("unused") + public List<CapturedPunctuator> scheduledPunctuators() { + return new LinkedList<>(punctuators); + } + + @Override + public <K extends KForward, V extends VForward> void forward(final K key, final V value) { + forward(key, value, To.all()); + } + + @Override + public <K extends KForward, V extends VForward> void forward(final K key, final V value, final To to) { + capturedForwards.add( + new CapturedForward<>( + (getTimestamp(to)) == -1 ? to.withTimestamp(timestamp == null ? -1 : timestamp) : to, + new KeyValue<>(key, value) + ) + ); + } + + /** + * Get all the forwarded data this context has observed. The returned list will not be + * affected by subsequent interactions with the context. The data in the list is in the same order as the calls to + * {@code forward(...)}. + * + * @return A list of key/value pairs that were previously passed to the context. + */ + public List<CapturedForward<KForward, VForward>> forwarded() { + return new LinkedList<>(capturedForwards); + } + + /** + * Get all the forwarded data this context has observed for a specific child by name. + * The returned list will not be affected by subsequent interactions with the context. + * The data in the list is in the same order as the calls to {@code forward(...)}. + * + * @param childName The child name to retrieve forwards for + * @return A list of key/value pairs that were previously passed to the context. 
+ */ + @SuppressWarnings("unused") + public List<CapturedForward<KForward, VForward>> forwarded(final String childName) { + final LinkedList<CapturedForward<KForward, VForward>> result = new LinkedList<>(); + for (final CapturedForward<KForward, VForward> capture : capturedForwards) { + if (capture.childName() == null || capture.childName().equals(childName)) { + result.add(capture); + } + } + return result; + } + + /** + * Clear the captured forwarded data. + */ + @SuppressWarnings("unused") + public void resetForwards() { + capturedForwards.clear(); + } + + @Override + public void commit() { + committed = true; + } + + /** + * Whether {@link ProcessorContext#commit()} has been called in this context. + * + * @return {@code true} iff {@link ProcessorContext#commit()} has been called in this context since construction or reset. + */ + public boolean committed() { + return committed; + } + + /** + * Reset the commit capture to {@code false} (whether or not it was previously {@code true}). + */ + @SuppressWarnings("unused") + public void resetCommit() { + committed = false; + } + + @Override + public RecordCollector recordCollector() { + // This interface is assumed by state stores that add change-logging. + // Rather than risk a mysterious ClassCastException during unit tests, throw an explanatory exception. + + throw new UnsupportedOperationException( + "MockProcessorContext does not provide record collection. " + + "For processor unit tests, use an in-memory state store with change-logging disabled. " + + "Alternatively, use the TopologyTestDriver for testing processor/store/topology integration." 
+ ); + } + + private static long getTimestamp(final To to) { + final long timestamp; + try { + final Field field = To.class.getDeclaredField("timestamp"); + field.setAccessible(true); + timestamp = (long) field.get(to); + } catch (final IllegalAccessException | NoSuchFieldException e) { + throw new RuntimeException(e); + } + return timestamp; + } +} diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/MockApiProcessorContextTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/MockApiProcessorContextTest.java new file mode 100644 index 0000000..6e9c5a6 --- /dev/null +++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/MockApiProcessorContextTest.java @@ -0,0 +1,405 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams; + +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.processor.api.MockProcessorContext; +import org.apache.kafka.streams.processor.api.MockProcessorContext.CapturedForward; +import org.apache.kafka.streams.processor.PunctuationType; +import org.apache.kafka.streams.processor.Punctuator; +import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.To; +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.processor.internals.ProcessorContextReverseAdapter; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.StoreBuilder; +import org.apache.kafka.streams.state.Stores; +import org.junit.Test; + +import java.io.File; +import java.time.Duration; +import java.util.Iterator; +import java.util.Properties; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +public class MockApiProcessorContextTest { + @Test + public void shouldCaptureOutputRecords() { + final Processor<String, Long, String, Long> processor = new Processor<String, Long, String, Long>() { + private ProcessorContext<String, Long> context; + + @Override + public void init(final ProcessorContext<String, Long> context) { + this.context = context; + } + + @Override + public void process(final String key, final Long value) { + context.forward(key + value, key.length() + value); + } + }; + + final MockProcessorContext<String, Long> context = new MockProcessorContext<>(); + processor.init(context); + + processor.process("foo", 5L); + processor.process("barbaz", 50L); + + final Iterator<CapturedForward<String, Long>> forwarded = context.forwarded().iterator(); + assertEquals(new KeyValue<>("foo5", 8L), 
forwarded.next().keyValue()); + assertEquals(new KeyValue<>("barbaz50", 56L), forwarded.next().keyValue()); + assertFalse(forwarded.hasNext()); + + context.resetForwards(); + + assertEquals(0, context.forwarded().size()); + } + + @Test + public void shouldCaptureOutputRecordsUsingTo() { + final Processor<String, Long, String, Long> processor = new Processor<String, Long, String, Long>() { + private ProcessorContext<String, Long> context; + + @Override + public void init(final ProcessorContext<String, Long> context) { + this.context = context; + } + + @Override + public void process(final String key, final Long value) { + context.forward(key + value, key.length() + value, To.all()); + } + }; + + final MockProcessorContext<String, Long> context = new MockProcessorContext<>(); + + processor.init(context); + + processor.process("foo", 5L); + processor.process("barbaz", 50L); + + final Iterator<CapturedForward<String, Long>> forwarded = context.forwarded().iterator(); + assertEquals(new KeyValue<>("foo5", 8L), forwarded.next().keyValue()); + assertEquals(new KeyValue<>("barbaz50", 56L), forwarded.next().keyValue()); + assertFalse(forwarded.hasNext()); + + context.resetForwards(); + + assertEquals(0, context.forwarded().size()); + } + + @Test + public void shouldCaptureRecordsOutputToChildByName() { + final Processor<String, Long, String, Long> processor = new Processor<String, Long, String, Long>() { + private int count = 0; + private ProcessorContext<String, Long> context; + + @Override + public void init(final ProcessorContext<String, Long> context) { + this.context = context; + } + + @Override + public void process(final String key, final Long value) { + if (count == 0) { + context.forward("start", -1L, To.all()); // broadcast + } + final To toChild = count % 2 == 0 ? 
To.child("george") : To.child("pete"); + context.forward(key + value, key.length() + value, toChild); + count++; + } + }; + + final MockProcessorContext<String, Long> context = new MockProcessorContext<>(); + + processor.init(context); + + processor.process("foo", 5L); + processor.process("barbaz", 50L); + + { + final Iterator<CapturedForward<String, Long>> forwarded = context.forwarded().iterator(); + + final CapturedForward forward1 = forwarded.next(); + assertEquals(new KeyValue<>("start", -1L), forward1.keyValue()); + assertNull(forward1.childName()); + + final CapturedForward forward2 = forwarded.next(); + assertEquals(new KeyValue<>("foo5", 8L), forward2.keyValue()); + assertEquals("george", forward2.childName()); + + final CapturedForward forward3 = forwarded.next(); + assertEquals(new KeyValue<>("barbaz50", 56L), forward3.keyValue()); + assertEquals("pete", forward3.childName()); + + assertFalse(forwarded.hasNext()); + } + + { + final Iterator<CapturedForward<String, Long>> forwarded = context.forwarded("george").iterator(); + assertEquals(new KeyValue<>("start", -1L), forwarded.next().keyValue()); + assertEquals(new KeyValue<>("foo5", 8L), forwarded.next().keyValue()); + assertFalse(forwarded.hasNext()); + } + + { + final Iterator<CapturedForward<String, Long>> forwarded = context.forwarded("pete").iterator(); + assertEquals(new KeyValue<>("start", -1L), forwarded.next().keyValue()); + assertEquals(new KeyValue<>("barbaz50", 56L), forwarded.next().keyValue()); + assertFalse(forwarded.hasNext()); + } + + { + final Iterator<CapturedForward<String, Long>> forwarded = context.forwarded("steve").iterator(); + assertEquals(new KeyValue<>("start", -1L), forwarded.next().keyValue()); + assertFalse(forwarded.hasNext()); + } + } + + @Test + public void shouldCaptureCommitsAndAllowReset() { + final Processor<String, Long, Void, Void> processor = new Processor<String, Long, Void, Void>() { + private int count = 0; + private ProcessorContext<Void, Void> context; + + 
@Override + public void init(final ProcessorContext<Void, Void> context) { + this.context = context; + } + + @Override + public void process(final String key, final Long value) { + if (++count > 2) { + context.commit(); + } + } + }; + + final MockProcessorContext<Void, Void> context = new MockProcessorContext<>(); + + processor.init(context); + + processor.process("foo", 5L); + processor.process("barbaz", 50L); + + assertFalse(context.committed()); + + processor.process("foobar", 500L); + + assertTrue(context.committed()); + + context.resetCommit(); + + assertFalse(context.committed()); + } + + @SuppressWarnings("unchecked") + @Test + public void shouldStoreAndReturnStateStores() { + final Processor<String, Long, Void, Void> processor = new Processor<String, Long, Void, Void>() { + private ProcessorContext<Void, Void> context; + + @Override + public void init(final ProcessorContext<Void, Void> context) { + this.context = context; + } + + @Override + public void process(final String key, final Long value) { + final KeyValueStore<String, Long> stateStore = context.getStateStore("my-state"); + stateStore.put(key, (stateStore.get(key) == null ? 0 : stateStore.get(key)) + value); + stateStore.put("all", (stateStore.get("all") == null ? 
0 : stateStore.get("all")) + value); + } + }; + + final MockProcessorContext<Void, Void> context = new MockProcessorContext<>(); + + final StoreBuilder<KeyValueStore<String, Long>> storeBuilder = Stores.keyValueStoreBuilder( + Stores.inMemoryKeyValueStore("my-state"), + Serdes.String(), + Serdes.Long()).withLoggingDisabled(); + + final KeyValueStore<String, Long> store = storeBuilder.build(); + + store.init(ProcessorContextReverseAdapter.adapt(context, new ProcessorContextReverseAdapter.DeprecatedForwarder() { + @Override + public <K, V> void forward(final K key, final V value, final int childIndex) { + throw new UnsupportedOperationException(); + } + }), store); + + processor.init(context); + + processor.process("foo", 5L); + processor.process("bar", 50L); + + assertEquals(5L, (long) store.get("foo")); + assertEquals(50L, (long) store.get("bar")); + assertEquals(55L, (long) store.get("all")); + } + + @Test + public void shouldCaptureApplicationAndRecordMetadata() { + final Properties config = new Properties(); + config.put(StreamsConfig.APPLICATION_ID_CONFIG, "testMetadata"); + config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, ""); + + final Processor<String, Object, String, Object> processor = new Processor<String, Object, String, Object>() { + private ProcessorContext<String, Object> context; + + @Override + public void init(final ProcessorContext<String, Object> context) { + this.context = context; + } + + @Override + public void process(final String key, final Object value) { + context.forward("appId", context.applicationId()); + context.forward("taskId", context.taskId()); + + context.forward("topic", context.topic()); + context.forward("partition", context.partition()); + context.forward("offset", context.offset()); + context.forward("timestamp", context.timestamp()); + + context.forward("key", key); + context.forward("value", value); + } + }; + + final MockProcessorContext<String, Object> context = new MockProcessorContext<>(config); + 
processor.init(context); + + try { + processor.process("foo", 5L); + fail("Should have thrown an exception."); + } catch (final IllegalStateException expected) { + // expected, since the record metadata isn't initialized + } + + context.resetForwards(); + context.setRecordMetadata("t1", 0, 0L, null, 0L); + + { + processor.process("foo", 5L); + final Iterator<CapturedForward<String, Object>> forwarded = context.forwarded().iterator(); + assertEquals(new KeyValue<>("appId", "testMetadata"), forwarded.next().keyValue()); + assertEquals(new KeyValue<>("taskId", new TaskId(0, 0)), forwarded.next().keyValue()); + assertEquals(new KeyValue<>("topic", "t1"), forwarded.next().keyValue()); + assertEquals(new KeyValue<>("partition", 0), forwarded.next().keyValue()); + assertEquals(new KeyValue<>("offset", 0L), forwarded.next().keyValue()); + assertEquals(new KeyValue<>("timestamp", 0L), forwarded.next().keyValue()); + assertEquals(new KeyValue<>("key", "foo"), forwarded.next().keyValue()); + assertEquals(new KeyValue<>("value", 5L), forwarded.next().keyValue()); + } + + context.resetForwards(); + + // record metadata should be "sticky" + context.setOffset(1L); + context.setTimestamp(10L); + + { + processor.process("bar", 50L); + final Iterator<CapturedForward<String, Object>> forwarded = context.forwarded().iterator(); + assertEquals(new KeyValue<>("appId", "testMetadata"), forwarded.next().keyValue()); + assertEquals(new KeyValue<>("taskId", new TaskId(0, 0)), forwarded.next().keyValue()); + assertEquals(new KeyValue<>("topic", "t1"), forwarded.next().keyValue()); + assertEquals(new KeyValue<>("partition", 0), forwarded.next().keyValue()); + assertEquals(new KeyValue<>("offset", 1L), forwarded.next().keyValue()); + assertEquals(new KeyValue<>("timestamp", 10L), forwarded.next().keyValue()); + assertEquals(new KeyValue<>("key", "bar"), forwarded.next().keyValue()); + assertEquals(new KeyValue<>("value", 50L), forwarded.next().keyValue()); + } + + context.resetForwards(); + // 
record metadata should be "sticky" + context.setTopic("t2"); + context.setPartition(30); + + { + processor.process("baz", 500L); + final Iterator<CapturedForward<String, Object>> forwarded = context.forwarded().iterator(); + assertEquals(new KeyValue<>("appId", "testMetadata"), forwarded.next().keyValue()); + assertEquals(new KeyValue<>("taskId", new TaskId(0, 0)), forwarded.next().keyValue()); + assertEquals(new KeyValue<>("topic", "t2"), forwarded.next().keyValue()); + assertEquals(new KeyValue<>("partition", 30), forwarded.next().keyValue()); + assertEquals(new KeyValue<>("offset", 1L), forwarded.next().keyValue()); + assertEquals(new KeyValue<>("timestamp", 10L), forwarded.next().keyValue()); + assertEquals(new KeyValue<>("key", "baz"), forwarded.next().keyValue()); + assertEquals(new KeyValue<>("value", 500L), forwarded.next().keyValue()); + } + } + + @Test + public void shouldCapturePunctuator() { + final Processor<String, Long, Void, Void> processor = new Processor<String, Long, Void, Void>() { + @Override + public void init(final ProcessorContext<Void, Void> context) { + context.schedule( + Duration.ofSeconds(1L), + PunctuationType.WALL_CLOCK_TIME, + timestamp -> context.commit() + ); + } + + @Override + public void process(final String key, final Long value) { + } + }; + + final MockProcessorContext<Void, Void> context = new MockProcessorContext<>(); + + processor.init(context); + + final MockProcessorContext.CapturedPunctuator capturedPunctuator = context.scheduledPunctuators().get(0); + assertEquals(1000L, capturedPunctuator.getIntervalMs()); + assertEquals(PunctuationType.WALL_CLOCK_TIME, capturedPunctuator.getType()); + assertFalse(capturedPunctuator.cancelled()); + + final Punctuator punctuator = capturedPunctuator.getPunctuator(); + assertFalse(context.committed()); + punctuator.punctuate(1234L); + assertTrue(context.committed()); + } + + @Test + public void fullConstructorShouldSetAllExpectedAttributes() { + final Properties config = new 
Properties(); + config.put(StreamsConfig.APPLICATION_ID_CONFIG, "testFullConstructor"); + config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, ""); + config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); + config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass()); + + final File dummyFile = new File(""); + final MockProcessorContext<Void, Void> context = new MockProcessorContext<>(config, new TaskId(1, 1), dummyFile); + + assertEquals("testFullConstructor", context.applicationId()); + assertEquals(new TaskId(1, 1), context.taskId()); + assertEquals("testFullConstructor", context.appConfigs().get(StreamsConfig.APPLICATION_ID_CONFIG)); + assertEquals("testFullConstructor", context.appConfigsWithPrefix("application.").get("id")); + assertEquals(Serdes.String().getClass(), context.keySerde().getClass()); + assertEquals(Serdes.Long().getClass(), context.valueSerde().getClass()); + assertEquals(dummyFile, context.stateDir()); + } +}
