This is an automated email from the ASF dual-hosted git repository. vvcephei pushed a commit to branch kip-478-part-3 in repository https://gitbox.apache.org/repos/asf/kafka.git
commit f58e275aaabdc6aeee531cb2102d62798746fab8 Author: John Roesler <[email protected]> AuthorDate: Tue Aug 25 17:40:29 2020 -0500 roll back statestore change --- .../examples/wordcount/WordCountProcessorDemo.java | 21 +- .../examples/wordcount/WordCountProcessorTest.java | 10 +- .../apache/kafka/streams/processor/StateStore.java | 32 +- .../InternalProcessorContextReverseAdapter.java | 248 ---------- .../processor/internals/ProcessorAdapter.java | 2 +- .../internals/ProcessorContextAdapter.java | 4 +- .../internals/ProcessorContextReverseAdapter.java | 129 +++-- .../processor/api/MockProcessorContext.java | 546 --------------------- .../kafka/streams/MockApiProcessorContextTest.java | 405 --------------- 9 files changed, 119 insertions(+), 1278 deletions(-) diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java index a6a9b8a..c3f47da 100644 --- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java +++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java @@ -22,9 +22,9 @@ import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.Topology; -import org.apache.kafka.streams.processor.api.Processor; -import org.apache.kafka.streams.processor.api.ProcessorContext; -import org.apache.kafka.streams.processor.api.ProcessorSupplier; +import org.apache.kafka.streams.processor.Processor; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.ProcessorSupplier; import org.apache.kafka.streams.processor.PunctuationType; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; @@ -54,17 +54,17 @@ import java.util.concurrent.CountDownLatch; */ public final class WordCountProcessorDemo { - static class MyProcessorSupplier implements ProcessorSupplier<String, String, String, String> { + static class MyProcessorSupplier implements ProcessorSupplier<String, String> { @Override - public Processor<String, String, String, String> get() { - return new Processor<String, String, String, String>() { - private ProcessorContext<String, String> context; + public Processor<String, String> get() { + return new Processor<String, String>() { + private ProcessorContext context; private KeyValueStore<String, Integer> kvStore; @Override @SuppressWarnings("unchecked") - public void init(final ProcessorContext<String, String> context) { + public void init(final ProcessorContext context) { this.context = context; this.context.schedule(Duration.ofSeconds(1), PunctuationType.STREAM_TIME, timestamp -> { try (final KeyValueIterator<String, Integer> iter = kvStore.all()) { @@ -79,7 +79,7 @@ public final class WordCountProcessorDemo { } } }); - this.kvStore = context.getStateStore("Counts"); + this.kvStore = (KeyValueStore<String, Integer>) context.getStateStore("Counts"); } @Override @@ -96,6 +96,9 @@ public final class WordCountProcessorDemo { } } } + + @Override + public void close() {} }; } } diff --git a/streams/examples/src/test/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorTest.java b/streams/examples/src/test/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorTest.java index 5ddda08..bec77e6 100644 --- a/streams/examples/src/test/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorTest.java +++ b/streams/examples/src/test/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorTest.java @@ -18,8 +18,8 @@ package org.apache.kafka.streams.examples.wordcount; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KeyValue; -import org.apache.kafka.streams.processor.api.MockProcessorContext; -import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.MockProcessorContext; +import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.Stores; import org.junit.Test; @@ -36,7 +36,7 @@ import static org.junit.Assert.assertTrue; public class WordCountProcessorTest { @Test public void test() { - final MockProcessorContext<String, String> context = new MockProcessorContext<>(); + final MockProcessorContext context = new MockProcessorContext(); // Create, initialize, and register the state store. final KeyValueStore<String, Integer> store = @@ -48,7 +48,7 @@ public class WordCountProcessorTest { context.register(store, null); // Create and initialize the processor under test - final Processor<String, String, String, String> processor = new WordCountProcessorDemo.MyProcessorSupplier().get(); + final Processor<String, String> processor = new WordCountProcessorDemo.MyProcessorSupplier().get(); processor.init(context); // send a record to the processor @@ -61,7 +61,7 @@ public class WordCountProcessorTest { context.scheduledPunctuators().get(0).getPunctuator().punctuate(0L); // finally, we can verify the output. - final Iterator<MockProcessorContext.CapturedForward<String, String>> capturedForwards = context.forwarded().iterator(); + final Iterator<MockProcessorContext.CapturedForward> capturedForwards = context.forwarded().iterator(); assertEquals(new KeyValue<>("alpha", "2"), capturedForwards.next().keyValue()); assertEquals(new KeyValue<>("beta", "1"), capturedForwards.next().keyValue()); assertEquals(new KeyValue<>("gamma", "1"), capturedForwards.next().keyValue()); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java b/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java index d143f69..df53ee2 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java @@ -17,8 +17,6 @@ package org.apache.kafka.streams.processor; import org.apache.kafka.streams.errors.StreamsException; -import org.apache.kafka.streams.processor.internals.ProcessorContextReverseAdapter; -import org.apache.kafka.streams.processor.api.ProcessorContext; /** * A storage engine for managing state maintained by a stream processor. @@ -51,27 +49,6 @@ public interface StateStore { * Initializes this state store. * <p> * The implementation of this function must register the root store in the context via the - * {@link org.apache.kafka.streams.processor.ProcessorContext#register(StateStore, StateRestoreCallback)} function, - * where the first {@link StateStore} parameter should always be the passed-in {@code root} object, and - * the second parameter should be an object of user's implementation - * of the {@link StateRestoreCallback} interface used for restoring the state store from the changelog. - * <p> - * Note that if the state store engine itself supports bulk writes, users can implement another - * interface {@link BatchingStateRestoreCallback} which extends {@link StateRestoreCallback} to - * let users implement bulk-load restoration logic instead of restoring one record at a time. - * <p> - * This method is not called if {@link StateStore#init(ProcessorContext, org.apache.kafka.streams.processor.StateStore)} - * is implemented. - * - * @throws IllegalStateException If store gets registered after initialized is already finished - * @throws StreamsException if the store's change log does not contain the partition - */ - void init(org.apache.kafka.streams.processor.ProcessorContext context, StateStore root); - - /** - * Initializes this state store. - * <p> - * The implementation of this function must register the root store in the context via the * {@link ProcessorContext#register(StateStore, StateRestoreCallback)} function, where the * first {@link StateStore} parameter should always be the passed-in {@code root} object, and * the second parameter should be an object of user's implementation @@ -84,14 +61,7 @@ public interface StateStore { * @throws IllegalStateException If store gets registered after initialized is already finished * @throws StreamsException if the store's change log does not contain the partition */ - default void init(final ProcessorContext<?, ?> context, final StateStore root) { - final org.apache.kafka.streams.processor.ProcessorContext adapted = - ProcessorContextReverseAdapter.adapt( - context, - new ProcessorContextReverseAdapter.UnsupportedDeprecatedForwarder() - ); - init(adapted, root); - } + void init(ProcessorContext context, StateStore root); /** * Flush any cached data diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContextReverseAdapter.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContextReverseAdapter.java deleted file mode 100644 index 11fc1f9..0000000 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContextReverseAdapter.java +++ /dev/null @@ -1,248 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.streams.processor.internals; - -import org.apache.kafka.common.header.Headers; -import org.apache.kafka.common.serialization.Serde; -import org.apache.kafka.common.utils.Bytes; -import org.apache.kafka.streams.processor.Cancellable; -import org.apache.kafka.streams.processor.PunctuationType; -import org.apache.kafka.streams.processor.Punctuator; -import org.apache.kafka.streams.processor.StateRestoreCallback; -import org.apache.kafka.streams.processor.StateStore; -import org.apache.kafka.streams.processor.TaskId; -import org.apache.kafka.streams.processor.To; -import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; -import org.apache.kafka.streams.state.StoreBuilder; -import org.apache.kafka.streams.state.internals.ThreadCache; - -import java.io.File; -import java.time.Duration; -import java.util.Map; - -public final class InternalProcessorContextReverseAdapter implements InternalProcessorContext { - private final InternalApiProcessorContext<Object, Object> delegate; - - static InternalProcessorContext adapt(final InternalApiProcessorContext<Object, Object> delegate) { - if (delegate instanceof ProcessorContextAdapter) { - return ((ProcessorContextAdapter<Object, Object>) delegate).delegate(); - } else { - return new InternalProcessorContextReverseAdapter(delegate); - } - } - - private InternalProcessorContextReverseAdapter(final InternalApiProcessorContext<Object, Object> delegate) { - this.delegate = delegate; - } - - @Override - public String applicationId() { - return delegate.applicationId(); - } - - @Override - public TaskId taskId() { - return delegate.taskId(); - } - - @Override - public Serde<?> keySerde() { - return delegate.keySerde(); - } - - @Override - public Serde<?> valueSerde() { - return delegate.valueSerde(); - } - - @Override - public File stateDir() { - return delegate.stateDir(); - } - - @Override - public StreamsMetricsImpl metrics() { - return delegate.metrics(); - } - - @Override - public void setSystemTimeMs(final long timeMs) { - delegate.setSystemTimeMs(timeMs); - } - - @Override - public long currentSystemTimeMs() { - return delegate.currentSystemTimeMs(); - } - - @Override - public ProcessorRecordContext recordContext() { - return delegate.recordContext(); - } - - @Override - public void setRecordContext(final ProcessorRecordContext recordContext) { - delegate.setRecordContext(recordContext); - } - - @Override - public void setCurrentNode(final ProcessorNode<?, ?, ?, ?> currentNode) { - delegate.setCurrentNode(currentNode); - } - - @Override - public ProcessorNode<?, ?, ?, ?> currentNode() { - return delegate.currentNode(); - } - - @Override - public ThreadCache cache() { - return delegate.cache(); - } - - @Override - public void initialize() { - delegate.initialize(); - } - - @Override - public void uninitialize() { - delegate.uninitialize(); - } - - @Override - public Task.TaskType taskType() { - return delegate.taskType(); - } - - @Override - public void transitionToActive(final StreamTask streamTask, final RecordCollector recordCollector, final ThreadCache newCache) { - delegate.transitionToActive(streamTask, recordCollector, newCache); - } - - @Override - public void transitionToStandby(final ThreadCache newCache) { - delegate.transitionToStandby(newCache); - } - - @Override - public void registerCacheFlushListener(final String namespace, final ThreadCache.DirtyEntryFlushListener listener) { - delegate.registerCacheFlushListener(namespace, listener); - } - - @Override - public <T extends StateStore> T getStateStore(final StoreBuilder<T> builder) { - return delegate.getStateStore(builder); - } - - @Override - public void logChange(final String storeName, final Bytes key, final byte[] value, final long timestamp) { - delegate.logChange(storeName, key, value, timestamp); - } - - @Override - public String changelogFor(final String storeName) { - return delegate.changelogFor(storeName); - } - - @Override - public void register(final StateStore store, final StateRestoreCallback stateRestoreCallback) { - delegate.register(store, stateRestoreCallback); - } - - @Override - public StateStore getStateStore(final String name) { - return delegate.getStateStore(name); - } - - @Deprecated - @Override - public Cancellable schedule(final long intervalMs, final PunctuationType type, final Punctuator callback) { - return delegate.schedule(Duration.ofMillis(intervalMs), type, callback); - } - - @Override - public Cancellable schedule(final Duration interval, final PunctuationType type, final Punctuator callback) throws IllegalArgumentException { - return delegate.schedule(interval, type, callback); - } - - @Override - public <K, V> void forward(final K key, final V value) { - delegate.forward(key, value); - } - - @Override - public <K, V> void forward(final K key, final V value, final To to) { - delegate.forward(key, value, to); - } - - @Deprecated - @Override - public <K, V> void forward(final K key, final V value, final int childIndex) { - delegate.forward(key, value, To.child((currentNode().children()).get(childIndex).name())); - } - - @Deprecated - @Override - public <K, V> void forward(final K key, final V value, final String childName) { - delegate.forward(key, value, To.child(childName)); - } - - @Override - public void commit() { - delegate.commit(); - } - - @Override - public String topic() { - return delegate.topic(); - } - - @Override - public int partition() { - return delegate.partition(); - } - - @Override - public long offset() { - return delegate.offset(); - } - - @Override - public Headers headers() { - return delegate.headers(); - } - - @Override - public long timestamp() { - return delegate.timestamp(); - } - - @Override - public Map<String, Object> appConfigs() { - return delegate.appConfigs(); - } - - @Override - public Map<String, Object> appConfigsWithPrefix(final String prefix) { - return delegate.appConfigsWithPrefix(prefix); - } - - InternalApiProcessorContext<Object, Object> delegate() { - return delegate; - } -} diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorAdapter.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorAdapter.java index 5a9d881..948109e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorAdapter.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorAdapter.java @@ -47,7 +47,7 @@ public final class ProcessorAdapter<KIn, VIn, KOut, VOut> implements Processor<K @SuppressWarnings("unchecked") @Override public void init(final ProcessorContext<KOut, VOut> context) { - delegate.init(InternalProcessorContextReverseAdapter.adapt((InternalApiProcessorContext<Object, Object>) context)); + delegate.init(ProcessorContextReverseAdapter.adapt((InternalApiProcessorContext<Object, Object>) context)); } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextAdapter.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextAdapter.java index 1704ac4..410adcf 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextAdapter.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextAdapter.java @@ -42,8 +42,8 @@ public final class ProcessorContextAdapter<KForward, VForward> @SuppressWarnings("unchecked") public static <KForward, VForward> InternalApiProcessorContext<KForward, VForward> adapt(final InternalProcessorContext delegate) { - if (delegate instanceof InternalProcessorContextReverseAdapter) { - return (InternalApiProcessorContext<KForward, VForward>) ((InternalProcessorContextReverseAdapter) delegate).delegate(); + if (delegate instanceof ProcessorContextReverseAdapter) { + return (InternalApiProcessorContext<KForward, VForward>) ((ProcessorContextReverseAdapter) delegate).delegate(); } else { return new ProcessorContextAdapter<>(delegate); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextReverseAdapter.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextReverseAdapter.java index 131b776..6e82a5e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextReverseAdapter.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextReverseAdapter.java @@ -18,52 +18,35 @@ package org.apache.kafka.streams.processor.internals; import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.serialization.Serde; -import org.apache.kafka.streams.StreamsMetrics; +import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.processor.Cancellable; -import org.apache.kafka.streams.processor.api.ProcessorContext; import org.apache.kafka.streams.processor.PunctuationType; import org.apache.kafka.streams.processor.Punctuator; import org.apache.kafka.streams.processor.StateRestoreCallback; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.To; +import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.apache.kafka.streams.state.StoreBuilder; +import org.apache.kafka.streams.state.internals.ThreadCache; import java.io.File; import java.time.Duration; import java.util.Map; -public final class ProcessorContextReverseAdapter implements org.apache.kafka.streams.processor.ProcessorContext { - private final ProcessorContext<Object, Object> delegate; - private final DeprecatedForwarder deprecatedForwarder; +public final class ProcessorContextReverseAdapter implements InternalProcessorContext { + private final InternalApiProcessorContext<Object, Object> delegate; - public interface DeprecatedForwarder { - <K, V> void forward(final K key, final V value, final int childIndex); - } - - public static final class UnsupportedDeprecatedForwarder implements DeprecatedForwarder { - @Override - public <K, V> void forward(final K key, final V value, final int childIndex) { - throw new UnsupportedOperationException("Forwarding by index was deprecated in 2.0 and is not supported by this ProcessorContext."); - } - } - - @SuppressWarnings("unchecked") - public static org.apache.kafka.streams.processor.ProcessorContext adapt(final ProcessorContext<?, ?> delegate, - final DeprecatedForwarder deprecatedForwarder) { + static InternalProcessorContext adapt(final InternalApiProcessorContext<Object, Object> delegate) { if (delegate instanceof ProcessorContextAdapter) { - return ((ProcessorContextAdapter<?, ?>) delegate).delegate(); - } else if (delegate instanceof InternalApiProcessorContext) { - return InternalProcessorContextReverseAdapter.adapt((InternalApiProcessorContext<Object, Object>) delegate); + return ((ProcessorContextAdapter<Object, Object>) delegate).delegate(); } else { - return new ProcessorContextReverseAdapter(delegate, deprecatedForwarder); + return new ProcessorContextReverseAdapter(delegate); } } - @SuppressWarnings("unchecked") - private ProcessorContextReverseAdapter(final ProcessorContext<?, ?> delegate, - final DeprecatedForwarder deprecatedForwarder) { - this.delegate = (ProcessorContext<Object, Object>) delegate; - this.deprecatedForwarder = deprecatedForwarder; + private ProcessorContextReverseAdapter(final InternalApiProcessorContext<Object, Object> delegate) { + this.delegate = delegate; } @Override @@ -92,11 +75,91 @@ public final class ProcessorContextReverseAdapter implements org.apache.kafka.st } @Override - public StreamsMetrics metrics() { + public StreamsMetricsImpl metrics() { return delegate.metrics(); } @Override + public void setSystemTimeMs(final long timeMs) { + delegate.setSystemTimeMs(timeMs); + } + + @Override + public long currentSystemTimeMs() { + return delegate.currentSystemTimeMs(); + } + + @Override + public ProcessorRecordContext recordContext() { + return delegate.recordContext(); + } + + @Override + public void setRecordContext(final ProcessorRecordContext recordContext) { + delegate.setRecordContext(recordContext); + } + + @Override + public void setCurrentNode(final ProcessorNode<?, ?, ?, ?> currentNode) { + delegate.setCurrentNode(currentNode); + } + + @Override + public ProcessorNode<?, ?, ?, ?> currentNode() { + return delegate.currentNode(); + } + + @Override + public ThreadCache cache() { + return delegate.cache(); + } + + @Override + public void initialize() { + delegate.initialize(); + } + + @Override + public void uninitialize() { + delegate.uninitialize(); + } + + @Override + public Task.TaskType taskType() { + return delegate.taskType(); + } + + @Override + public void transitionToActive(final StreamTask streamTask, final RecordCollector recordCollector, final ThreadCache newCache) { + delegate.transitionToActive(streamTask, recordCollector, newCache); + } + + @Override + public void transitionToStandby(final ThreadCache newCache) { + delegate.transitionToStandby(newCache); + } + + @Override + public void registerCacheFlushListener(final String namespace, final ThreadCache.DirtyEntryFlushListener listener) { + delegate.registerCacheFlushListener(namespace, listener); + } + + @Override + public <T extends StateStore> T getStateStore(final StoreBuilder<T> builder) { + return delegate.getStateStore(builder); + } + + @Override + public void logChange(final String storeName, final Bytes key, final byte[] value, final long timestamp) { + delegate.logChange(storeName, key, value, timestamp); + } + + @Override + public String changelogFor(final String storeName) { + return delegate.changelogFor(storeName); + } + + @Override public void register(final StateStore store, final StateRestoreCallback stateRestoreCallback) { delegate.register(store, stateRestoreCallback); } @@ -113,7 +176,7 @@ public final class ProcessorContextReverseAdapter implements org.apache.kafka.st } @Override - public Cancellable schedule(final Duration interval, final PunctuationType type, final Punctuator callback) { + public Cancellable schedule(final Duration interval, final PunctuationType type, final Punctuator callback) throws IllegalArgumentException { return delegate.schedule(interval, type, callback); } @@ -130,7 +193,7 @@ public final class ProcessorContextReverseAdapter implements org.apache.kafka.st @Deprecated @Override public <K, V> void forward(final K key, final V value, final int childIndex) { - deprecatedForwarder.forward(key, value, childIndex); + delegate.forward(key, value, To.child((currentNode().children()).get(childIndex).name())); } @Deprecated @@ -178,4 +241,8 @@ public final class ProcessorContextReverseAdapter implements org.apache.kafka.st public Map<String, Object> appConfigsWithPrefix(final String prefix) { return delegate.appConfigsWithPrefix(prefix); } + + InternalApiProcessorContext<Object, Object> delegate() { + return delegate; + } } diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/api/MockProcessorContext.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/api/MockProcessorContext.java deleted file mode 100644 index bbb024d..0000000 --- a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/api/MockProcessorContext.java +++ /dev/null @@ -1,546 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.streams.processor.api; - -import org.apache.kafka.common.header.Headers; -import org.apache.kafka.common.metrics.MetricConfig; -import org.apache.kafka.common.metrics.Metrics; -import org.apache.kafka.common.metrics.Sensor; -import org.apache.kafka.common.serialization.Serde; -import org.apache.kafka.common.utils.Time; -import org.apache.kafka.streams.KeyValue; -import org.apache.kafka.streams.StreamsConfig; -import org.apache.kafka.streams.StreamsMetrics; -import org.apache.kafka.streams.Topology; -import org.apache.kafka.streams.TopologyTestDriver; -import org.apache.kafka.streams.internals.ApiUtils; -import org.apache.kafka.streams.kstream.Transformer; -import org.apache.kafka.streams.kstream.ValueTransformer; -import org.apache.kafka.streams.processor.Cancellable; -import org.apache.kafka.streams.processor.PunctuationType; -import org.apache.kafka.streams.processor.Punctuator; -import org.apache.kafka.streams.processor.StateRestoreCallback; -import org.apache.kafka.streams.processor.StateStore; -import org.apache.kafka.streams.processor.TaskId; -import org.apache.kafka.streams.processor.To; -import org.apache.kafka.streams.processor.internals.ClientUtils; -import org.apache.kafka.streams.processor.internals.RecordCollector; -import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; -import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics; -import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore; - -import java.io.File; -import java.lang.reflect.Field; -import java.time.Duration; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Properties; - -import static org.apache.kafka.common.utils.Utils.mkEntry; -import static org.apache.kafka.common.utils.Utils.mkMap; -import static org.apache.kafka.common.utils.Utils.mkProperties; - -/** - * {@link MockProcessorContext} is a mock of {@link ProcessorContext} for users to test their {@link Processor}, - * {@link Transformer}, and {@link ValueTransformer} implementations. - * <p> - * The tests for this class (org.apache.kafka.streams.MockProcessorContextTest) include several behavioral - * tests that serve as example usage. - * <p> - * Note that this class does not take any automated actions (such as firing scheduled punctuators). - * It simply captures any data it witnesses. - * If you require more automated tests, we recommend wrapping your {@link Processor} in a minimal source-processor-sink - * {@link Topology} and using the {@link TopologyTestDriver}. - */ -public class MockProcessorContext<KForward, VForward> implements ProcessorContext<KForward, VForward>, RecordCollector.Supplier { - // Immutable fields ================================================ - private final StreamsMetricsImpl metrics; - private final TaskId taskId; - private final StreamsConfig config; - private final File stateDir; - - // settable record metadata ================================================ - private String topic; - private Integer partition; - private Long offset; - private Headers headers; - private Long timestamp; - - // mocks ================================================ - private final Map<String, StateStore> stateStores = new HashMap<>(); - private final List<CapturedPunctuator> punctuators = new LinkedList<>(); - private final List<CapturedForward<KForward, VForward>> capturedForwards = new LinkedList<>(); - private boolean committed = false; - - - /** - * {@link CapturedPunctuator} holds captured punctuators, along with their scheduling information. - */ - public static final class CapturedPunctuator { - private final long intervalMs; - private final PunctuationType type; - private final Punctuator punctuator; - private boolean cancelled = false; - - private CapturedPunctuator(final long intervalMs, final PunctuationType type, final Punctuator punctuator) { - this.intervalMs = intervalMs; - this.type = type; - this.punctuator = punctuator; - } - - @SuppressWarnings("unused") - public long getIntervalMs() { - return intervalMs; - } - - @SuppressWarnings("unused") - public PunctuationType getType() { - return type; - } - - @SuppressWarnings("unused") - public Punctuator getPunctuator() { - return punctuator; - } - - @SuppressWarnings({"WeakerAccess", "unused"}) - public void cancel() { - cancelled = true; - } - - @SuppressWarnings("unused") - public boolean cancelled() { - return cancelled; - } - } - - public static final class CapturedForward<KForward, VForward> { - private final String childName; - private final long timestamp; - private final KeyValue<KForward, VForward> keyValue; - - private CapturedForward(final To to, final KeyValue<KForward, VForward> keyValue) { - if (keyValue == null) { - throw new IllegalArgumentException("keyValue can't be null"); - } - - try { - final Field field = To.class.getDeclaredField("childName"); - field.setAccessible(true); - childName = (String) field.get(to); - } catch (final IllegalAccessException | NoSuchFieldException e) { - throw new RuntimeException(e); - } - timestamp = getTimestamp(to); - - this.keyValue = keyValue; - } - - /** - * The child this data was forwarded to. - * - * @return The child name, or {@code null} if it was broadcast. - */ - @SuppressWarnings({"WeakerAccess", "unused"}) - public String childName() { - return childName; - } - - /** - * The timestamp attached to the forwarded record. - * - * @return A timestamp, or {@code -1} if none was forwarded. - */ - @SuppressWarnings("unused") - public long timestamp() { - return timestamp; - } - - /** - * The data forwarded. - * - * @return A key/value pair. Not null. - */ - @SuppressWarnings("unused") - public KeyValue<KForward, VForward> keyValue() { - return keyValue; - } - - @Override - public String toString() { - return "CapturedForward{" + - "childName='" + childName + '\'' + - ", timestamp=" + timestamp + - ", keyValue=" + keyValue + - '}'; - } - } - - // constructors ================================================ - - /** - * Create a {@link MockProcessorContext} with dummy {@code config} and {@code taskId} and {@code null} {@code stateDir}. - * Most unit tests using this mock won't need to know the taskId, - * and most unit tests should be able to get by with the - * {@link InMemoryKeyValueStore}, so the stateDir won't matter. - */ - @SuppressWarnings("unused") - public MockProcessorContext() { - this( - mkProperties(mkMap( - mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, ""), - mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "") - )), - new TaskId(0, 0), - null - ); - } - - /** - * Create a {@link MockProcessorContext} with dummy {@code taskId} and {@code null} {@code stateDir}. - * Most unit tests using this mock won't need to know the taskId, - * and most unit tests should be able to get by with the - * {@link InMemoryKeyValueStore}, so the stateDir won't matter. - * - * @param config a Properties object, used to configure the context and the processor. - */ - @SuppressWarnings("unused") - public MockProcessorContext(final Properties config) { - this(config, new TaskId(0, 0), null); - } - - /** - * Create a {@link MockProcessorContext} with a specified taskId and null stateDir. - * - * @param config a {@link Properties} object, used to configure the context and the processor. - * @param taskId a {@link TaskId}, which the context makes available via {@link MockProcessorContext#taskId()}. - * @param stateDir a {@link File}, which the context makes available viw {@link MockProcessorContext#stateDir()}. - */ - @SuppressWarnings("unused") - public MockProcessorContext(final Properties config, final TaskId taskId, final File stateDir) { - final StreamsConfig streamsConfig = new ClientUtils.QuietStreamsConfig(config); - this.taskId = taskId; - this.config = streamsConfig; - this.stateDir = stateDir; - final MetricConfig metricConfig = new MetricConfig(); - metricConfig.recordLevel(Sensor.RecordingLevel.DEBUG); - final String threadId = Thread.currentThread().getName(); - metrics = new StreamsMetricsImpl( - new Metrics(metricConfig), - threadId, - streamsConfig.getString(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG), - Time.SYSTEM - ); - TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor(threadId, taskId.toString(), metrics); - } - - @Override - public String applicationId() { - return config.getString(StreamsConfig.APPLICATION_ID_CONFIG); - } - - @Override - public TaskId taskId() { - return taskId; - } - - @Override - public Map<String, Object> appConfigs() { - final Map<String, Object> combined = new HashMap<>(); - combined.putAll(config.originals()); - combined.putAll(config.values()); - return combined; - } - - @Override - public Map<String, Object> appConfigsWithPrefix(final String prefix) { - return config.originalsWithPrefix(prefix); - } - - @Override - public Serde<?> keySerde() { - return config.defaultKeySerde(); - } - - @Override - public Serde<?> valueSerde() { - return config.defaultValueSerde(); - } - - @Override - public File stateDir() { - return stateDir; - } - - @Override - public StreamsMetrics metrics() { - return metrics; - } - - // settable record metadata ================================================ - - /** - * The context exposes these metadata for use in the processor. Normally, they are set by the Kafka Streams framework, - * but for the purpose of driving unit tests, you can set them directly. - * - * @param topic A topic name - * @param partition A partition number - * @param offset A record offset - * @param timestamp A record timestamp - */ - @SuppressWarnings("unused") - public void setRecordMetadata(final String topic, - final int partition, - final long offset, - final Headers headers, - final long timestamp) { - this.topic = topic; - this.partition = partition; - this.offset = offset; - this.headers = headers; - this.timestamp = timestamp; - } - - /** - * The context exposes this metadata for use in the processor. Normally, they are set by the Kafka Streams framework, - * but for the purpose of driving unit tests, you can set it directly. Setting this attribute doesn't affect the others. - * - * @param topic A topic name - */ - @SuppressWarnings("unused") - public void setTopic(final String topic) { - this.topic = topic; - } - - /** - * The context exposes this metadata for use in the processor. Normally, they are set by the Kafka Streams framework, - * but for the purpose of driving unit tests, you can set it directly. Setting this attribute doesn't affect the others. - * - * @param partition A partition number - */ - @SuppressWarnings("unused") - public void setPartition(final int partition) { - this.partition = partition; - } - - /** - * The context exposes this metadata for use in the processor. Normally, they are set by the Kafka Streams framework, - * but for the purpose of driving unit tests, you can set it directly. Setting this attribute doesn't affect the others. - * - * @param offset A record offset - */ - @SuppressWarnings("unused") - public void setOffset(final long offset) { - this.offset = offset; - } - - /** - * The context exposes this metadata for use in the processor. Normally, they are set by the Kafka Streams framework, - * but for the purpose of driving unit tests, you can set it directly. Setting this attribute doesn't affect the others. - * - * @param headers Record headers - */ - @SuppressWarnings("unused") - public void setHeaders(final Headers headers) { - this.headers = headers; - } - - /** - * The context exposes this metadata for use in the processor. Normally, they are set by the Kafka Streams framework, - * but for the purpose of driving unit tests, you can set it directly. Setting this attribute doesn't affect the others. - * - * @param timestamp A record timestamp - */ - @SuppressWarnings("unused") - public void setTimestamp(final long timestamp) { - this.timestamp = timestamp; - } - - @Override - public String topic() { - if (topic == null) { - throw new IllegalStateException("Topic must be set before use via setRecordMetadata() or setTopic()."); - } - return topic; - } - - @Override - public int partition() { - if (partition == null) { - throw new IllegalStateException("Partition must be set before use via setRecordMetadata() or setPartition()."); - } - return partition; - } - - @Override - public long offset() { - if (offset == null) { - throw new IllegalStateException("Offset must be set before use via setRecordMetadata() or setOffset()."); - } - return offset; - } - - @Override - public Headers headers() { - return headers; - } - - @Override - public long timestamp() { - if (timestamp == null) { - throw new IllegalStateException("Timestamp must be set before use via setRecordMetadata() or setTimestamp()."); - } - return timestamp; - } - - // mocks ================================================ - - @Override - public void register(final StateStore store, - final StateRestoreCallback stateRestoreCallbackIsIgnoredInMock) { - stateStores.put(store.name(), store); - } - - @SuppressWarnings("unchecked") - @Override - public <S> S getStateStore(final String name) { - return (S) stateStores.get(name); - } - - @Override - public Cancellable schedule(final Duration interval, - final PunctuationType type, - final Punctuator callback) { - final CapturedPunctuator capturedPunctuator = - new CapturedPunctuator(ApiUtils.validateMillisecondDuration(interval, "interval"), type, callback); - - punctuators.add(capturedPunctuator); - - return capturedPunctuator::cancel; - } - - /** - * Get the punctuators scheduled so far. The returned list is not affected by subsequent calls to {@code schedule(...)}. - * - * @return A list of captured punctuators. - */ - @SuppressWarnings("unused") - public List<CapturedPunctuator> scheduledPunctuators() { - return new LinkedList<>(punctuators); - } - - @Override - public <K extends KForward, V extends VForward> void forward(final K key, final V value){ - forward(key, value, To.all()); - } - - @Override - public <K extends KForward, V extends VForward> void forward(final K key, final V value, final To to){ - capturedForwards.add( - new CapturedForward<>( - (getTimestamp(to)) == -1 ? to.withTimestamp(timestamp == null ? -1 : timestamp) : to, - new KeyValue<>(key, value) - ) - ); - } - - /** - * Get all the forwarded data this context has observed. The returned list will not be - * affected by subsequent interactions with the context. The data in the list is in the same order as the calls to - * {@code forward(...)}. - * - * @return A list of key/value pairs that were previously passed to the context. - */ - public List<CapturedForward<KForward, VForward>> forwarded() { - return new LinkedList<>(capturedForwards); - } - - /** - * Get all the forwarded data this context has observed for a specific child by name. - * The returned list will not be affected by subsequent interactions with the context. - * The data in the list is in the same order as the calls to {@code forward(...)}. - * - * @param childName The child name to retrieve forwards for - * @return A list of key/value pairs that were previously passed to the context. - */ - @SuppressWarnings("unused") - public List<CapturedForward<KForward, VForward>> forwarded(final String childName) { - final LinkedList<CapturedForward<KForward, VForward>> result = new LinkedList<>(); - for (final CapturedForward<KForward, VForward> capture : capturedForwards) { - if (capture.childName() == null || capture.childName().equals(childName)) { - result.add(capture); - } - } - return result; - } - - /** - * Clear the captured forwarded data. - */ - @SuppressWarnings("unused") - public void resetForwards() { - capturedForwards.clear(); - } - - @Override - public void commit() { - committed = true; - } - - /** - * Whether {@link ProcessorContext#commit()} has been called in this context. - * - * @return {@code true} iff {@link ProcessorContext#commit()} has been called in this context since construction or reset. - */ - public boolean committed() { - return committed; - } - - /** - * Reset the commit capture to {@code false} (whether or not it was previously {@code true}). - */ - @SuppressWarnings("unused") - public void resetCommit() { - committed = false; - } - - @Override - public RecordCollector recordCollector() { - // This interface is assumed by state stores that add change-logging. - // Rather than risk a mysterious ClassCastException during unit tests, throw an explanatory exception. - - throw new UnsupportedOperationException( - "MockProcessorContext does not provide record collection. " + - "For processor unit tests, use an in-memory state store with change-logging disabled. " + - "Alternatively, use the TopologyTestDriver for testing processor/store/topology integration." - ); - } - - private static long getTimestamp(final To to) { - final long timestamp; - try { - final Field field = To.class.getDeclaredField("timestamp"); - field.setAccessible(true); - timestamp = (long) field.get(to); - } catch (final IllegalAccessException | NoSuchFieldException e) { - throw new RuntimeException(e); - } - return timestamp; - } -} diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/MockApiProcessorContextTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/MockApiProcessorContextTest.java deleted file mode 100644 index 6e9c5a6..0000000 --- a/streams/test-utils/src/test/java/org/apache/kafka/streams/MockApiProcessorContextTest.java +++ /dev/null @@ -1,405 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.streams; - -import org.apache.kafka.common.serialization.Serdes; -import org.apache.kafka.streams.processor.api.MockProcessorContext; -import org.apache.kafka.streams.processor.api.MockProcessorContext.CapturedForward; -import org.apache.kafka.streams.processor.PunctuationType; -import org.apache.kafka.streams.processor.Punctuator; -import org.apache.kafka.streams.processor.TaskId; -import org.apache.kafka.streams.processor.To; -import org.apache.kafka.streams.processor.api.Processor; -import org.apache.kafka.streams.processor.api.ProcessorContext; -import org.apache.kafka.streams.processor.internals.ProcessorContextReverseAdapter; -import org.apache.kafka.streams.state.KeyValueStore; -import org.apache.kafka.streams.state.StoreBuilder; -import org.apache.kafka.streams.state.Stores; -import org.junit.Test; - -import java.io.File; -import java.time.Duration; -import java.util.Iterator; -import java.util.Properties; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -public class MockApiProcessorContextTest { - @Test - public void shouldCaptureOutputRecords() { - final Processor<String, Long, String, Long> processor = new Processor<String, Long, String, Long>() { - private ProcessorContext<String, Long> context; - - @Override - public void init(final ProcessorContext<String, Long> context) { - this.context = context; - } - - @Override - public void process(final String key, final Long value) { - context.forward(key + value, key.length() + value); - } - }; - - final MockProcessorContext<String, Long> context = new MockProcessorContext<>(); - processor.init(context); - - processor.process("foo", 5L); - processor.process("barbaz", 50L); - - final Iterator<CapturedForward<String, Long>> forwarded = context.forwarded().iterator(); - assertEquals(new KeyValue<>("foo5", 8L), forwarded.next().keyValue()); - assertEquals(new KeyValue<>("barbaz50", 56L), forwarded.next().keyValue()); - assertFalse(forwarded.hasNext()); - - context.resetForwards(); - - assertEquals(0, context.forwarded().size()); - } - - @Test - public void shouldCaptureOutputRecordsUsingTo() { - final Processor<String, Long, String, Long> processor = new Processor<String, Long, String, Long>() { - private ProcessorContext<String, Long> context; - - @Override - public void init(final ProcessorContext<String, Long> context) { - this.context = context; - } - - @Override - public void process(final String key, final Long value) { - context.forward(key + value, key.length() + value, To.all()); - } - }; - - final MockProcessorContext<String, Long> context = new MockProcessorContext<>(); - - processor.init(context); - - processor.process("foo", 5L); - processor.process("barbaz", 50L); - - final Iterator<CapturedForward<String, Long>> forwarded = context.forwarded().iterator(); - assertEquals(new KeyValue<>("foo5", 8L), forwarded.next().keyValue()); - assertEquals(new KeyValue<>("barbaz50", 56L), forwarded.next().keyValue()); - assertFalse(forwarded.hasNext()); - - context.resetForwards(); - - assertEquals(0, context.forwarded().size()); - } - - @Test - public void shouldCaptureRecordsOutputToChildByName() { - final Processor<String, Long, String, Long> processor = new Processor<String, Long, String, Long>() { - private int count = 0; - private ProcessorContext<String, Long> context; - - @Override - public void init(final ProcessorContext<String, Long> context) { - this.context = context; - } - - @Override - public void process(final String key, final Long value) { - if (count == 0) { - context.forward("start", -1L, To.all()); // broadcast - } - final To toChild = count % 2 == 0 ? To.child("george") : To.child("pete"); - context.forward(key + value, key.length() + value, toChild); - count++; - } - }; - - final MockProcessorContext<String, Long> context = new MockProcessorContext<>(); - - processor.init(context); - - processor.process("foo", 5L); - processor.process("barbaz", 50L); - - { - final Iterator<CapturedForward<String, Long>> forwarded = context.forwarded().iterator(); - - final CapturedForward forward1 = forwarded.next(); - assertEquals(new KeyValue<>("start", -1L), forward1.keyValue()); - assertNull(forward1.childName()); - - final CapturedForward forward2 = forwarded.next(); - assertEquals(new KeyValue<>("foo5", 8L), forward2.keyValue()); - assertEquals("george", forward2.childName()); - - final CapturedForward forward3 = forwarded.next(); - assertEquals(new KeyValue<>("barbaz50", 56L), forward3.keyValue()); - assertEquals("pete", forward3.childName()); - - assertFalse(forwarded.hasNext()); - } - - { - final Iterator<CapturedForward<String, Long>> forwarded = context.forwarded("george").iterator(); - assertEquals(new KeyValue<>("start", -1L), forwarded.next().keyValue()); - assertEquals(new KeyValue<>("foo5", 8L), forwarded.next().keyValue()); - assertFalse(forwarded.hasNext()); - } - - { - final Iterator<CapturedForward<String, Long>> forwarded = context.forwarded("pete").iterator(); - assertEquals(new KeyValue<>("start", -1L), forwarded.next().keyValue()); - assertEquals(new KeyValue<>("barbaz50", 56L), forwarded.next().keyValue()); - assertFalse(forwarded.hasNext()); - } - - { - final Iterator<CapturedForward<String, Long>> forwarded = context.forwarded("steve").iterator(); - assertEquals(new KeyValue<>("start", -1L), forwarded.next().keyValue()); - assertFalse(forwarded.hasNext()); - } - } - - @Test - public void shouldCaptureCommitsAndAllowReset() { - final Processor<String, Long, Void, Void> processor = new Processor<String, Long, Void, Void>() { - private int count = 0; - private ProcessorContext<Void, Void> context; - - @Override - public void init(final ProcessorContext<Void, Void> context) { - this.context = context; - } - - @Override - public void process(final String key, final Long value) { - if (++count > 2) { - context.commit(); - } - } - }; - - final MockProcessorContext<Void, Void> context = new MockProcessorContext<>(); - - processor.init(context); - - processor.process("foo", 5L); - processor.process("barbaz", 50L); - - assertFalse(context.committed()); - - processor.process("foobar", 500L); - - assertTrue(context.committed()); - - context.resetCommit(); - - assertFalse(context.committed()); - } - - @SuppressWarnings("unchecked") - @Test - public void shouldStoreAndReturnStateStores() { - final Processor<String, Long, Void, Void> processor = new Processor<String, Long, Void, Void>() { - private ProcessorContext<Void, Void> context; - - @Override - public void init(final ProcessorContext<Void, Void> context) { - this.context = context; - } - - @Override - public void process(final String key, final Long value) { - final KeyValueStore<String, Long> stateStore = context.getStateStore("my-state"); - stateStore.put(key, (stateStore.get(key) == null ? 0 : stateStore.get(key)) + value); - stateStore.put("all", (stateStore.get("all") == null ? 0 : stateStore.get("all")) + value); - } - }; - - final MockProcessorContext<Void, Void> context = new MockProcessorContext<>(); - - final StoreBuilder<KeyValueStore<String, Long>> storeBuilder = Stores.keyValueStoreBuilder( - Stores.inMemoryKeyValueStore("my-state"), - Serdes.String(), - Serdes.Long()).withLoggingDisabled(); - - final KeyValueStore<String, Long> store = storeBuilder.build(); - - store.init(ProcessorContextReverseAdapter.adapt(context, new ProcessorContextReverseAdapter.DeprecatedForwarder() { - @Override - public <K, V> void forward(final K key, final V value, final int childIndex) { - throw new UnsupportedOperationException(); - } - }), store); - - processor.init(context); - - processor.process("foo", 5L); - processor.process("bar", 50L); - - assertEquals(5L, (long) store.get("foo")); - assertEquals(50L, (long) store.get("bar")); - assertEquals(55L, (long) store.get("all")); - } - - @Test - public void shouldCaptureApplicationAndRecordMetadata() { - final Properties config = new Properties(); - config.put(StreamsConfig.APPLICATION_ID_CONFIG, "testMetadata"); - config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, ""); - - final Processor<String, Object, String, Object> processor = new Processor<String, Object, String, Object>() { - private ProcessorContext<String, Object> context; - - @Override - public void init(final ProcessorContext<String, Object> context) { - this.context = context; - } - - @Override - public void process(final String key, final Object value) { - context.forward("appId", context.applicationId()); - context.forward("taskId", context.taskId()); - - context.forward("topic", context.topic()); - context.forward("partition", context.partition()); - context.forward("offset", context.offset()); - context.forward("timestamp", context.timestamp()); - - context.forward("key", key); - context.forward("value", value); - } - }; - - final MockProcessorContext<String, Object> context = new MockProcessorContext<>(config); - processor.init(context); - - try { - processor.process("foo", 5L); - fail("Should have thrown an exception."); - } catch (final IllegalStateException expected) { - // expected, since the record metadata isn't initialized - } - - context.resetForwards(); - context.setRecordMetadata("t1", 0, 0L, null, 0L); - - { - processor.process("foo", 5L); - final Iterator<CapturedForward<String, Object>> forwarded = context.forwarded().iterator(); - assertEquals(new KeyValue<>("appId", "testMetadata"), forwarded.next().keyValue()); - assertEquals(new KeyValue<>("taskId", new TaskId(0, 0)), forwarded.next().keyValue()); - assertEquals(new KeyValue<>("topic", "t1"), forwarded.next().keyValue()); - assertEquals(new KeyValue<>("partition", 0), forwarded.next().keyValue()); - assertEquals(new KeyValue<>("offset", 0L), forwarded.next().keyValue()); - assertEquals(new KeyValue<>("timestamp", 0L), forwarded.next().keyValue()); - assertEquals(new KeyValue<>("key", "foo"), forwarded.next().keyValue()); - assertEquals(new KeyValue<>("value", 5L), forwarded.next().keyValue()); - } - - context.resetForwards(); - - // record metadata should be "sticky" - context.setOffset(1L); - context.setTimestamp(10L); - - { - processor.process("bar", 50L); - final Iterator<CapturedForward<String, Object>> forwarded = context.forwarded().iterator(); - assertEquals(new KeyValue<>("appId", "testMetadata"), forwarded.next().keyValue()); - assertEquals(new KeyValue<>("taskId", new TaskId(0, 0)), forwarded.next().keyValue()); - assertEquals(new KeyValue<>("topic", "t1"), forwarded.next().keyValue()); - assertEquals(new KeyValue<>("partition", 0), forwarded.next().keyValue()); - assertEquals(new KeyValue<>("offset", 1L), forwarded.next().keyValue()); - assertEquals(new KeyValue<>("timestamp", 10L), forwarded.next().keyValue()); - assertEquals(new KeyValue<>("key", "bar"), forwarded.next().keyValue()); - assertEquals(new KeyValue<>("value", 50L), forwarded.next().keyValue()); - } - - context.resetForwards(); - // record metadata should be "sticky" - context.setTopic("t2"); - context.setPartition(30); - - { - processor.process("baz", 500L); - final Iterator<CapturedForward<String, Object>> forwarded = context.forwarded().iterator(); - assertEquals(new KeyValue<>("appId", "testMetadata"), forwarded.next().keyValue()); - assertEquals(new KeyValue<>("taskId", new TaskId(0, 0)), forwarded.next().keyValue()); - assertEquals(new KeyValue<>("topic", "t2"), forwarded.next().keyValue()); - assertEquals(new KeyValue<>("partition", 30), forwarded.next().keyValue()); - assertEquals(new KeyValue<>("offset", 1L), forwarded.next().keyValue()); - assertEquals(new KeyValue<>("timestamp", 10L), forwarded.next().keyValue()); - assertEquals(new KeyValue<>("key", "baz"), forwarded.next().keyValue()); - assertEquals(new KeyValue<>("value", 500L), forwarded.next().keyValue()); - } - } - - @Test - public void shouldCapturePunctuator() { - final Processor<String, Long, Void, Void> processor = new Processor<String, Long, Void, Void>() { - @Override - public void init(final ProcessorContext<Void, Void> context) { - context.schedule( - Duration.ofSeconds(1L), - PunctuationType.WALL_CLOCK_TIME, - timestamp -> context.commit() - ); - } - - @Override - public void process(final String key, final Long value) { - } - }; - - final MockProcessorContext<Void, Void> context = new MockProcessorContext<>(); - - processor.init(context); - - final MockProcessorContext.CapturedPunctuator capturedPunctuator = context.scheduledPunctuators().get(0); - assertEquals(1000L, capturedPunctuator.getIntervalMs()); - assertEquals(PunctuationType.WALL_CLOCK_TIME, capturedPunctuator.getType()); - assertFalse(capturedPunctuator.cancelled()); - - final Punctuator punctuator = capturedPunctuator.getPunctuator(); - assertFalse(context.committed()); - punctuator.punctuate(1234L); - assertTrue(context.committed()); - } - - @Test - public void fullConstructorShouldSetAllExpectedAttributes() { - final Properties config = new Properties(); - config.put(StreamsConfig.APPLICATION_ID_CONFIG, "testFullConstructor"); - config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, ""); - config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); - config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass()); - - final File dummyFile = new File(""); - final MockProcessorContext<Void, Void> context = new MockProcessorContext<>(config, new TaskId(1, 1), dummyFile); - - assertEquals("testFullConstructor", context.applicationId()); - assertEquals(new TaskId(1, 1), context.taskId()); - assertEquals("testFullConstructor", context.appConfigs().get(StreamsConfig.APPLICATION_ID_CONFIG)); - assertEquals("testFullConstructor", context.appConfigsWithPrefix("application.").get("id")); - assertEquals(Serdes.String().getClass(), context.keySerde().getClass()); - assertEquals(Serdes.Long().getClass(), context.valueSerde().getClass()); - assertEquals(dummyFile, context.stateDir()); - } -}
