This is an automated email from the ASF dual-hosted git repository. vvcephei pushed a commit to branch kip-478-part-3 in repository https://gitbox.apache.org/repos/asf/kafka.git
commit de3fab16b6d91c363718dcfb0ab875853b2db75f Author: John Roesler <[email protected]> AuthorDate: Tue Aug 25 19:12:30 2020 -0500 cleanup --- .../streams/kstream/internals/KTableImpl.java | 2 +- .../internals/graph/KTableKTableJoinNode.java | 6 +- .../internals/graph/ProcessorParameters.java | 6 +- .../streams/processor/api/ProcessorContext.java | 2 +- .../processor/internals/ProcessorAdapter.java | 2 +- .../internals/ProcessorContextAdapter.java | 2 +- .../internals/GlobalProcessorContextImplTest.java | 11 ++- .../internals/ProcessorContextImplTest.java | 2 +- .../processor/internals/ProcessorTopologyTest.java | 79 ++++++++++------------ .../processor/internals/StreamThreadTest.java | 62 ++++++++--------- .../kafka/streams/scala/kstream/KStreamTest.scala | 9 ++- 11 files changed, 87 insertions(+), 96 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java index 25accd0..74939b3 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java @@ -851,7 +851,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable< } /** - * We conflate V with Change<V> in many places. It might be nice to fix that eventually. + * We conflate V with Change<V> in many places. This will get fixed in the implementation of KIP-478. * For now, I'm just explicitly lying about the parameterized type. */ @SuppressWarnings("unchecked") diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/KTableKTableJoinNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/KTableKTableJoinNode.java index 9441a49..e2abfb5 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/KTableKTableJoinNode.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/KTableKTableJoinNode.java @@ -210,7 +210,8 @@ public class KTableKTableJoinNode<K, V1, V2, VR> extends BaseJoinProcessorNode<K @SuppressWarnings("unchecked") public KTableKTableJoinNode<K, V1, V2, VR> build() { - return new KTableKTableJoinNode<>(nodeName, + return new KTableKTableJoinNode<>( + nodeName, joinThisProcessorParameters, joinOtherProcessorParameters, new ProcessorParameters<>( @@ -225,7 +226,8 @@ public class KTableKTableJoinNode<K, V1, V2, VR> extends BaseJoinProcessorNode<K valueSerde, joinThisStoreNames, joinOtherStoreNames, - storeBuilder); + storeBuilder + ); } } } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ProcessorParameters.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ProcessorParameters.java index bfcc399..018d2b7 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ProcessorParameters.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ProcessorParameters.java @@ -32,20 +32,22 @@ import org.apache.kafka.streams.processor.internals.ProcessorAdapter; */ public class ProcessorParameters<KIn, VIn, KOut, VOut> { + // During the transition to KIP-478, we capture arguments passed from the old API to simplify + // the performance of casts that we still need to perform. This will eventually be removed. private final org.apache.kafka.streams.processor.ProcessorSupplier<KIn, VIn> oldProcessorSupplier; private final ProcessorSupplier<KIn, VIn, KOut, VOut> processorSupplier; private final String processorName; public ProcessorParameters(final org.apache.kafka.streams.processor.ProcessorSupplier<KIn, VIn> processorSupplier, final String processorName) { - this.oldProcessorSupplier = processorSupplier; + oldProcessorSupplier = processorSupplier; this.processorSupplier = () -> ProcessorAdapter.adapt(processorSupplier.get()); this.processorName = processorName; } public ProcessorParameters(final ProcessorSupplier<KIn, VIn, KOut, VOut> processorSupplier, final String processorName) { - this.oldProcessorSupplier = null; + oldProcessorSupplier = null; this.processorSupplier = processorSupplier; this.processorName = processorName; } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/api/ProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/api/ProcessorContext.java index 9e71d5d..0e26ee5 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/api/ProcessorContext.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/api/ProcessorContext.java @@ -102,7 +102,7 @@ public interface ProcessorContext<KForward, VForward> { * @param name The store name * @return The state store instance */ - <S> S getStateStore(final String name); + <S extends StateStore> S getStateStore(final String name); /** * Schedules a periodic operation for processors. A processor may call this method during diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorAdapter.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorAdapter.java index 948109e..d8e4af4 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorAdapter.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorAdapter.java @@ -21,7 +21,7 @@ import org.apache.kafka.streams.processor.api.Processor; import org.apache.kafka.streams.processor.api.ProcessorContext; public final class ProcessorAdapter<KIn, VIn, KOut, VOut> implements Processor<KIn, VIn, KOut, VOut> { - public final org.apache.kafka.streams.processor.Processor<KIn, VIn> delegate; + private final org.apache.kafka.streams.processor.Processor<KIn, VIn> delegate; public static <KIn, VIn, KOut, VOut> Processor<KIn, VIn, KOut, VOut> adapt(final org.apache.kafka.streams.processor.Processor<KIn, VIn> delegate) { if (delegate == null) { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextAdapter.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextAdapter.java index 410adcf..85dbd52 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextAdapter.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextAdapter.java @@ -170,7 +170,7 @@ public final class ProcessorContextAdapter<KForward, VForward> @SuppressWarnings("unchecked") @Override - public <S> S getStateStore(final String name) { + public <S extends StateStore> S getStateStore(final String name) { return (S) delegate.getStateStore(name); } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java index e180010..ad8cd0a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java @@ -18,7 +18,6 @@ package org.apache.kafka.streams.processor.internals; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.StreamsConfig; -import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.To; import org.apache.kafka.streams.processor.internals.Task.TaskType; @@ -143,7 +142,7 @@ public class GlobalProcessorContextImplTest { public void shouldNotAllowInitForKeyValueStore() { final StateStore store = globalContext.getStateStore(GLOBAL_KEY_VALUE_STORE_NAME); try { - store.init((ProcessorContext) null, null); + store.init(null, null); fail("Should have thrown UnsupportedOperationException."); } catch (final UnsupportedOperationException expected) { } } @@ -152,7 +151,7 @@ public class GlobalProcessorContextImplTest { public void shouldNotAllowInitForTimestampedKeyValueStore() { final StateStore store = globalContext.getStateStore(GLOBAL_TIMESTAMPED_KEY_VALUE_STORE_NAME); try { - store.init((ProcessorContext) null, null); + store.init(null, null); fail("Should have thrown UnsupportedOperationException."); } catch (final UnsupportedOperationException expected) { } } @@ -161,7 +160,7 @@ public class GlobalProcessorContextImplTest { public void shouldNotAllowInitForWindowStore() { final StateStore store = globalContext.getStateStore(GLOBAL_WINDOW_STORE_NAME); try { - store.init((ProcessorContext) null, null); + store.init(null, null); fail("Should have thrown UnsupportedOperationException."); } catch (final UnsupportedOperationException expected) { } } @@ -170,7 +169,7 @@ public class GlobalProcessorContextImplTest { public void shouldNotAllowInitForTimestampedWindowStore() { final StateStore store = globalContext.getStateStore(GLOBAL_TIMESTAMPED_WINDOW_STORE_NAME); try { - store.init((ProcessorContext) null, null); + store.init(null, null); fail("Should have thrown UnsupportedOperationException."); } catch (final UnsupportedOperationException expected) { } } @@ -179,7 +178,7 @@ public class GlobalProcessorContextImplTest { public void shouldNotAllowInitForSessionStore() { final StateStore store = globalContext.getStateStore(GLOBAL_SESSION_STORE_NAME); try { - store.init((ProcessorContext) null, null); + store.init(null, null); fail("Should have thrown UnsupportedOperationException."); } catch (final UnsupportedOperationException expected) { } } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java index ab88efa..f4b62c3d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java @@ -767,7 +767,7 @@ public class ProcessorContextImplTest { assertTrue(store.persistent()); assertTrue(store.isOpen()); - checkThrowsUnsupportedOperation(() -> store.init((ProcessorContext) null, null), "init()"); + checkThrowsUnsupportedOperation(() -> store.init(null, null), "init()"); checkThrowsUnsupportedOperation(store::close, "close()"); } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java index cc637cb..77dc6af 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java @@ -81,7 +81,7 @@ public class ProcessorTopologyTest { private static final String THROUGH_TOPIC_1 = "through-topic-1"; private static final Header HEADER = new RecordHeader("key", "value".getBytes()); - private static final Headers HEADERS = new RecordHeaders(new Header[] {HEADER}); + private static final Headers HEADERS = new RecordHeaders(new Header[]{HEADER}); private final TopologyWrapper topology = new TopologyWrapper(); private final MockApiProcessorSupplier<?, ?, ?, ?> mockProcessorSupplier = new MockApiProcessorSupplier<>(); @@ -178,7 +178,7 @@ public class ProcessorTopologyTest { driver = new TopologyTestDriver(createSimpleTopology(partition), props); final TestInputTopic<String, String> inputTopic = driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER, Instant.ofEpochMilli(0L), Duration.ZERO); final TestOutputTopic<String, String> outputTopic1 = - driver.createOutputTopic(OUTPUT_TOPIC_1, Serdes.String().deserializer(), Serdes.String().deserializer()); + driver.createOutputTopic(OUTPUT_TOPIC_1, Serdes.String().deserializer(), Serdes.String().deserializer()); inputTopic.pipeInput("key1", "value1"); assertNextOutputRecord(outputTopic1.readRecord(), "key1", "value1"); @@ -203,9 +203,9 @@ public class ProcessorTopologyTest { driver = new TopologyTestDriver(createMultiplexingTopology(), props); final TestInputTopic<String, String> inputTopic = driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER, Instant.ofEpochMilli(0L), Duration.ZERO); final TestOutputTopic<String, String> outputTopic1 = - driver.createOutputTopic(OUTPUT_TOPIC_1, Serdes.String().deserializer(), Serdes.String().deserializer()); + driver.createOutputTopic(OUTPUT_TOPIC_1, Serdes.String().deserializer(), Serdes.String().deserializer()); final TestOutputTopic<String, String> outputTopic2 = - driver.createOutputTopic(OUTPUT_TOPIC_2, Serdes.String().deserializer(), Serdes.String().deserializer()); + driver.createOutputTopic(OUTPUT_TOPIC_2, Serdes.String().deserializer(), Serdes.String().deserializer()); inputTopic.pipeInput("key1", "value1"); assertNextOutputRecord(outputTopic1.readRecord(), "key1", "value1(1)"); assertNextOutputRecord(outputTopic2.readRecord(), "key1", "value1(2)"); @@ -231,9 +231,9 @@ public class ProcessorTopologyTest { final TestInputTopic<String, String> inputTopic = driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER, Instant.ofEpochMilli(0L), Duration.ZERO); inputTopic.pipeInput("key1", "value1"); final TestOutputTopic<String, String> outputTopic1 = - driver.createOutputTopic(OUTPUT_TOPIC_1, Serdes.String().deserializer(), Serdes.String().deserializer()); + driver.createOutputTopic(OUTPUT_TOPIC_1, Serdes.String().deserializer(), Serdes.String().deserializer()); final TestOutputTopic<String, String> outputTopic2 = - driver.createOutputTopic(OUTPUT_TOPIC_2, Serdes.String().deserializer(), Serdes.String().deserializer()); + driver.createOutputTopic(OUTPUT_TOPIC_2, Serdes.String().deserializer(), Serdes.String().deserializer()); assertNextOutputRecord(outputTopic1.readRecord(), "key1", "value1(1)"); assertNextOutputRecord(outputTopic2.readRecord(), "key1", "value1(2)"); @@ -258,7 +258,7 @@ public class ProcessorTopologyTest { driver = new TopologyTestDriver(createStatefulTopology(storeName), props); final TestInputTopic<String, String> inputTopic = driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER); final TestOutputTopic<Integer, String> outputTopic1 = - driver.createOutputTopic(OUTPUT_TOPIC_1, Serdes.Integer().deserializer(), Serdes.String().deserializer()); + driver.createOutputTopic(OUTPUT_TOPIC_1, Serdes.Integer().deserializer(), Serdes.String().deserializer()); inputTopic.pipeInput("key1", "value1"); inputTopic.pipeInput("key2", "value2"); @@ -389,9 +389,9 @@ public class ProcessorTopologyTest { driver = new TopologyTestDriver(createSimpleMultiSourceTopology(partition), props); final TestInputTopic<String, String> inputTopic = driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER, Instant.ofEpochMilli(0L), Duration.ZERO); final TestOutputTopic<String, String> outputTopic1 = - driver.createOutputTopic(OUTPUT_TOPIC_1, Serdes.String().deserializer(), Serdes.String().deserializer()); + driver.createOutputTopic(OUTPUT_TOPIC_1, Serdes.String().deserializer(), Serdes.String().deserializer()); final TestOutputTopic<String, String> outputTopic2 = - driver.createOutputTopic(OUTPUT_TOPIC_2, Serdes.String().deserializer(), Serdes.String().deserializer()); + driver.createOutputTopic(OUTPUT_TOPIC_2, Serdes.String().deserializer(), Serdes.String().deserializer()); inputTopic.pipeInput("key1", "value1"); assertNextOutputRecord(outputTopic1.readRecord(), "key1", "value1"); @@ -411,7 +411,7 @@ public class ProcessorTopologyTest { inputTopic.pipeInput("key2", "value2"); inputTopic.pipeInput("key3", "value3"); final TestOutputTopic<String, String> outputTopic2 = - driver.createOutputTopic(OUTPUT_TOPIC_2, Serdes.String().deserializer(), Serdes.String().deserializer()); + driver.createOutputTopic(OUTPUT_TOPIC_2, Serdes.String().deserializer(), Serdes.String().deserializer()); assertNextOutputRecord(outputTopic2.readRecord(), "key1", "value1"); assertNextOutputRecord(outputTopic2.readRecord(), "key2", "value2"); assertNextOutputRecord(outputTopic2.readRecord(), "key3", "value3"); @@ -439,11 +439,11 @@ public class ProcessorTopologyTest { inputTopic.pipeInput("key3", "value3@3000"); final TestOutputTopic<String, String> outputTopic = driver.createOutputTopic(OUTPUT_TOPIC_1, STRING_DESERIALIZER, STRING_DESERIALIZER); assertThat(outputTopic.readRecord(), - equalTo(new TestRecord<>("key1", "value1", null, 1000L))); + equalTo(new TestRecord<>("key1", "value1", null, 1000L))); assertThat(outputTopic.readRecord(), - equalTo(new TestRecord<>("key2", "value2", null, 2000L))); + equalTo(new TestRecord<>("key2", "value2", null, 2000L))); assertThat(outputTopic.readRecord(), - equalTo(new TestRecord<>("key3", "value3", null, 3000L))); + equalTo(new TestRecord<>("key3", "value3", null, 3000L))); } @Test @@ -499,7 +499,7 @@ public class ProcessorTopologyTest { inputTopic.pipeInput("key2", "value2", 20L); inputTopic.pipeInput("key3", "value3", 30L); final TestOutputTopic<String, String> outputTopic1 = - driver.createOutputTopic(OUTPUT_TOPIC_1, Serdes.String().deserializer(), Serdes.String().deserializer()); + driver.createOutputTopic(OUTPUT_TOPIC_1, Serdes.String().deserializer(), Serdes.String().deserializer()); assertNextOutputRecord(outputTopic1.readRecord(), "key1", "value1", 10L); assertNextOutputRecord(outputTopic1.readRecord(), "key2", "value2", 20L); assertNextOutputRecord(outputTopic1.readRecord(), "key3", "value3", 30L); @@ -514,7 +514,7 @@ public class ProcessorTopologyTest { inputTopic.pipeInput("key2", "value2", 20L); inputTopic.pipeInput("key3", "value3", 30L); final TestOutputTopic<String, String> outputTopic1 = - driver.createOutputTopic(OUTPUT_TOPIC_1, Serdes.String().deserializer(), Serdes.String().deserializer()); + driver.createOutputTopic(OUTPUT_TOPIC_1, Serdes.String().deserializer(), Serdes.String().deserializer()); assertNextOutputRecord(outputTopic1.readRecord(), "key1", "value1", 20L); assertNextOutputRecord(outputTopic1.readRecord(), "key2", "value2", 30L); assertNextOutputRecord(outputTopic1.readRecord(), "key3", "value3", 40L); @@ -526,9 +526,9 @@ public class ProcessorTopologyTest { driver = new TopologyTestDriver(createMultiProcessorTimestampTopology(partition), props); final TestInputTopic<String, String> inputTopic = driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER); final TestOutputTopic<String, String> outputTopic1 = - driver.createOutputTopic(OUTPUT_TOPIC_1, Serdes.String().deserializer(), Serdes.String().deserializer()); + driver.createOutputTopic(OUTPUT_TOPIC_1, Serdes.String().deserializer(), Serdes.String().deserializer()); final TestOutputTopic<String, String> outputTopic2 = - driver.createOutputTopic(OUTPUT_TOPIC_2, Serdes.String().deserializer(), Serdes.String().deserializer()); + driver.createOutputTopic(OUTPUT_TOPIC_2, Serdes.String().deserializer(), Serdes.String().deserializer()); inputTopic.pipeInput("key1", "value1", 10L); assertNextOutputRecord(outputTopic1.readRecord(), "key1", "value1", 10L); @@ -610,22 +610,11 @@ public class ProcessorTopologyTest { assertTrue(processorTopology.hasPersistentGlobalStore()); } - private ProcessorTopology createLocalStoreTopologyWithOldAPI(final KeyValueBytesStoreSupplier storeSupplier) { - final TopologyWrapper topology = new TopologyWrapper(); - final String processor = "processor"; - final StoreBuilder<KeyValueStore<String, String>> storeBuilder = - Stores.keyValueStoreBuilder(storeSupplier, Serdes.String(), Serdes.String()); - topology.addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, "topic") - .addProcessor(processor, () -> new OldAPIStatefulProcessor(storeSupplier.name()), "source") - .addStateStore(storeBuilder, processor); - return topology.getInternalBuilder("anyAppId").buildTopology(); - } - private ProcessorTopology createLocalStoreTopology(final KeyValueBytesStoreSupplier storeSupplier) { final TopologyWrapper topology = new TopologyWrapper(); final String processor = "processor"; final StoreBuilder<KeyValueStore<String, String>> storeBuilder = - Stores.keyValueStoreBuilder(storeSupplier, Serdes.String(), Serdes.String()); + Stores.keyValueStoreBuilder(storeSupplier, Serdes.String(), Serdes.String()); topology.addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, "topic") .addProcessor(processor, () -> new StatefulProcessor(storeSupplier.name()), "source") .addStateStore(storeBuilder, processor); @@ -635,9 +624,9 @@ public class ProcessorTopologyTest { private ProcessorTopology createGlobalStoreTopology(final KeyValueBytesStoreSupplier storeSupplier) { final TopologyWrapper topology = new TopologyWrapper(); final StoreBuilder<KeyValueStore<String, String>> storeBuilder = - Stores.keyValueStoreBuilder(storeSupplier, Serdes.String(), Serdes.String()).withLoggingDisabled(); + Stores.keyValueStoreBuilder(storeSupplier, Serdes.String(), Serdes.String()).withLoggingDisabled(); topology.addGlobalStore(storeBuilder, "global", STRING_DESERIALIZER, STRING_DESERIALIZER, "topic", "processor", - define(new OldAPIStatefulProcessor(storeSupplier.name()))); + define(new OldAPIStatefulProcessor(storeSupplier.name()))); return topology.getInternalBuilder("anyAppId").buildTopology(); } @@ -727,9 +716,9 @@ public class ProcessorTopologyTest { private Topology createInternalRepartitioningTopology() { topology.addSource("source", INPUT_TOPIC_1) - .addSink("sink0", THROUGH_TOPIC_1, "source") - .addSource("source1", THROUGH_TOPIC_1) - .addSink("sink1", OUTPUT_TOPIC_1, "source1"); + .addSink("sink0", THROUGH_TOPIC_1, "source") + .addSource("source1", THROUGH_TOPIC_1) + .addSink("sink1", OUTPUT_TOPIC_1, "source1"); // use wrapper to get the internal topology builder to add internal topic final InternalTopologyBuilder internalTopologyBuilder = TopologyWrapper.getInternalTopologyBuilder(topology); @@ -754,24 +743,24 @@ public class ProcessorTopologyTest { private Topology createForwardToSourceTopology() { return topology.addSource("source-1", INPUT_TOPIC_1) - .addSink("sink-1", OUTPUT_TOPIC_1, "source-1") - .addSource("source-2", OUTPUT_TOPIC_1) - .addSink("sink-2", OUTPUT_TOPIC_2, "source-2"); + .addSink("sink-1", OUTPUT_TOPIC_1, "source-1") + .addSource("source-2", OUTPUT_TOPIC_1) + .addSink("sink-2", OUTPUT_TOPIC_2, "source-2"); } private Topology createSimpleMultiSourceTopology(final int partition) { return topology.addSource("source-1", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1) - .addProcessor("processor-1", define(new ForwardingProcessor()), "source-1") - .addSink("sink-1", OUTPUT_TOPIC_1, constantPartitioner(partition), "processor-1") - .addSource("source-2", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_2) - .addProcessor("processor-2", define(new ForwardingProcessor()), "source-2") - .addSink("sink-2", OUTPUT_TOPIC_2, constantPartitioner(partition), "processor-2"); + .addProcessor("processor-1", define(new ForwardingProcessor()), "source-1") + .addSink("sink-1", OUTPUT_TOPIC_1, constantPartitioner(partition), "processor-1") + .addSource("source-2", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_2) + .addProcessor("processor-2", define(new ForwardingProcessor()), "source-2") + .addSink("sink-2", OUTPUT_TOPIC_2, constantPartitioner(partition), "processor-2"); } private Topology createAddHeaderTopology() { return topology.addSource("source-1", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1) - .addProcessor("processor-1", define(new AddHeaderProcessor()), "source-1") - .addSink("sink-1", OUTPUT_TOPIC_1, "processor-1"); + .addProcessor("processor-1", define(new AddHeaderProcessor()), "source-1") + .addSink("sink-1", OUTPUT_TOPIC_1, "processor-1"); } /** @@ -992,7 +981,7 @@ public class ProcessorTopologyTest { /** * A custom timestamp extractor that extracts the timestamp from the record's value if the value is in ".*@[0-9]+" * format. Otherwise, it returns the record's timestamp or the default timestamp if the record's timestamp is negative. - */ + */ public static class CustomTimestampExtractor implements TimestampExtractor { private static final long DEFAULT_TIMESTAMP = 1000L; diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java index 5b871f5..a0c09e5 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java @@ -61,7 +61,6 @@ import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.TaskMetadata; import org.apache.kafka.streams.processor.ThreadMetadata; import org.apache.kafka.streams.processor.api.Processor; -import org.apache.kafka.streams.processor.api.ProcessorContext; import org.apache.kafka.streams.processor.api.ProcessorSupplier; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender; @@ -1196,33 +1195,25 @@ public class StreamThreadTest { internalTopologyBuilder.addSource(null, "name", null, null, null, topic1); final AtomicBoolean shouldThrow = new AtomicBoolean(false); final AtomicBoolean processed = new AtomicBoolean(false); - internalTopologyBuilder.addProcessor("proc", new ProcessorSupplier<Object, Object, Object, Object>() { - @Override - public Processor<Object, Object, Object, Object> get() { - return new Processor<Object, Object, Object, Object>() { - private ProcessorContext<Object, Object> context; - - @Override - public void init(final ProcessorContext<Object, Object> context) { - this.context = context; - } + internalTopologyBuilder.addProcessor( + "proc", + () -> new Processor<Object, Object, Object, Object>() { - @Override - public void process(final Object key, final Object value) { - if (shouldThrow.get()) { - throw new TaskCorruptedException(singletonMap(task1, new HashSet<TopicPartition>(singleton(storeChangelogTopicPartition)))); - } else { - processed.set(true); - } + @Override + public void process(final Object key, final Object value) { + if (shouldThrow.get()) { + throw new TaskCorruptedException(singletonMap(task1, new HashSet<>(singleton(storeChangelogTopicPartition)))); + } else { + processed.set(true); } + } - @Override - public void close() { + @Override + public void close() { - } - }; - } - }, "name"); + } + }, + "name"); internalTopologyBuilder.addStateStore( Stores.keyValueStoreBuilder( Stores.persistentKeyValueStore(storeName), @@ -1596,19 +1587,20 @@ public class StreamThreadTest { public void shouldPunctuateActiveTask() { final List<Long> punctuatedStreamTime = new ArrayList<>(); final List<Long> punctuatedWallClockTime = new ArrayList<>(); - final org.apache.kafka.streams.processor.ProcessorSupplier<Object, Object> punctuateProcessor = () -> new org.apache.kafka.streams.processor.Processor<Object, Object>() { - @Override - public void init(final org.apache.kafka.streams.processor.ProcessorContext context) { - context.schedule(Duration.ofMillis(100L), PunctuationType.STREAM_TIME, punctuatedStreamTime::add); - context.schedule(Duration.ofMillis(100L), PunctuationType.WALL_CLOCK_TIME, punctuatedWallClockTime::add); - } + final org.apache.kafka.streams.processor.ProcessorSupplier<Object, Object> punctuateProcessor = + () -> new org.apache.kafka.streams.processor.Processor<Object, Object>() { + @Override + public void init(final org.apache.kafka.streams.processor.ProcessorContext context) { + context.schedule(Duration.ofMillis(100L), PunctuationType.STREAM_TIME, punctuatedStreamTime::add); + context.schedule(Duration.ofMillis(100L), PunctuationType.WALL_CLOCK_TIME, punctuatedWallClockTime::add); + } - @Override - public void process(final Object key, final Object value) {} + @Override + public void process(final Object key, final Object value) {} - @Override - public void close() {} - }; + @Override + public void close() {} + }; internalStreamsBuilder.stream(Collections.singleton(topic1), consumed).process(punctuateProcessor); internalStreamsBuilder.buildAndOptimizeTopology(); diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KStreamTest.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KStreamTest.scala index 87c915f..9c3bc3d 100644 --- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KStreamTest.scala +++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KStreamTest.scala @@ -22,7 +22,14 @@ import java.time.Duration.ofSeconds import java.time.Instant import org.apache.kafka.streams.KeyValue -import org.apache.kafka.streams.kstream._ +import org.apache.kafka.streams.kstream.{ + JoinWindows, + Transformer, + ValueTransformer, + ValueTransformerSupplier, + ValueTransformerWithKey, + ValueTransformerWithKeySupplier +} import org.apache.kafka.streams.processor.ProcessorContext import org.apache.kafka.streams.scala.ImplicitConversions._ import org.apache.kafka.streams.scala.Serdes._
