This is an automated email from the ASF dual-hosted git repository.

vvcephei pushed a commit to branch kip-478-part-3
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit de3fab16b6d91c363718dcfb0ab875853b2db75f
Author: John Roesler <[email protected]>
AuthorDate: Tue Aug 25 19:12:30 2020 -0500

    cleanup
---
 .../streams/kstream/internals/KTableImpl.java      |  2 +-
 .../internals/graph/KTableKTableJoinNode.java      |  6 +-
 .../internals/graph/ProcessorParameters.java       |  6 +-
 .../streams/processor/api/ProcessorContext.java    |  2 +-
 .../processor/internals/ProcessorAdapter.java      |  2 +-
 .../internals/ProcessorContextAdapter.java         |  2 +-
 .../internals/GlobalProcessorContextImplTest.java  | 11 ++-
 .../internals/ProcessorContextImplTest.java        |  2 +-
 .../processor/internals/ProcessorTopologyTest.java | 79 ++++++++++------------
 .../processor/internals/StreamThreadTest.java      | 62 ++++++++---------
 .../kafka/streams/scala/kstream/KStreamTest.scala  |  9 ++-
 11 files changed, 87 insertions(+), 96 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index 25accd0..74939b3 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -851,7 +851,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, 
V> implements KTable<
     }
 
     /**
-     * We conflate V with Change<V> in many places. It might be nice to fix 
that eventually.
+     * We conflate V with Change<V> in many places. This will get fixed in the 
implementation of KIP-478.
      * For now, I'm just explicitly lying about the parameterized type.
      */
     @SuppressWarnings("unchecked")
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/KTableKTableJoinNode.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/KTableKTableJoinNode.java
index 9441a49..e2abfb5 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/KTableKTableJoinNode.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/KTableKTableJoinNode.java
@@ -210,7 +210,8 @@ public class KTableKTableJoinNode<K, V1, V2, VR> extends 
BaseJoinProcessorNode<K
 
         @SuppressWarnings("unchecked")
         public KTableKTableJoinNode<K, V1, V2, VR> build() {
-            return new KTableKTableJoinNode<>(nodeName,
+            return new KTableKTableJoinNode<>(
+                nodeName,
                 joinThisProcessorParameters,
                 joinOtherProcessorParameters,
                 new ProcessorParameters<>(
@@ -225,7 +226,8 @@ public class KTableKTableJoinNode<K, V1, V2, VR> extends 
BaseJoinProcessorNode<K
                 valueSerde,
                 joinThisStoreNames,
                 joinOtherStoreNames,
-                storeBuilder);
+                storeBuilder
+            );
         }
     }
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ProcessorParameters.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ProcessorParameters.java
index bfcc399..018d2b7 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ProcessorParameters.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ProcessorParameters.java
@@ -32,20 +32,22 @@ import 
org.apache.kafka.streams.processor.internals.ProcessorAdapter;
  */
 public class ProcessorParameters<KIn, VIn, KOut, VOut> {
 
+    // During the transition to KIP-478, we capture arguments passed from the
+    // old API to simplify the casts that we still need to perform. This will
+    // eventually be removed.
     private final org.apache.kafka.streams.processor.ProcessorSupplier<KIn, 
VIn> oldProcessorSupplier;
     private final ProcessorSupplier<KIn, VIn, KOut, VOut> processorSupplier;
     private final String processorName;
 
     public ProcessorParameters(final 
org.apache.kafka.streams.processor.ProcessorSupplier<KIn, VIn> 
processorSupplier,
                                final String processorName) {
-        this.oldProcessorSupplier = processorSupplier;
+        oldProcessorSupplier = processorSupplier;
         this.processorSupplier = () -> 
ProcessorAdapter.adapt(processorSupplier.get());
         this.processorName = processorName;
     }
 
     public ProcessorParameters(final ProcessorSupplier<KIn, VIn, KOut, VOut> 
processorSupplier,
                                final String processorName) {
-        this.oldProcessorSupplier = null;
+        oldProcessorSupplier = null;
         this.processorSupplier = processorSupplier;
         this.processorName = processorName;
     }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/api/ProcessorContext.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/api/ProcessorContext.java
index 9e71d5d..0e26ee5 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/api/ProcessorContext.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/api/ProcessorContext.java
@@ -102,7 +102,7 @@ public interface ProcessorContext<KForward, VForward> {
      * @param name The store name
      * @return The state store instance
      */
-    <S> S getStateStore(final String name);
+    <S extends StateStore> S getStateStore(final String name);
 
     /**
      * Schedules a periodic operation for processors. A processor may call 
this method during
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorAdapter.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorAdapter.java
index 948109e..d8e4af4 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorAdapter.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorAdapter.java
@@ -21,7 +21,7 @@ import org.apache.kafka.streams.processor.api.Processor;
 import org.apache.kafka.streams.processor.api.ProcessorContext;
 
 public final class ProcessorAdapter<KIn, VIn, KOut, VOut> implements 
Processor<KIn, VIn, KOut, VOut> {
-    public final org.apache.kafka.streams.processor.Processor<KIn, VIn> 
delegate;
+    private final org.apache.kafka.streams.processor.Processor<KIn, VIn> 
delegate;
 
     public static <KIn, VIn, KOut, VOut> Processor<KIn, VIn, KOut, VOut> 
adapt(final org.apache.kafka.streams.processor.Processor<KIn, VIn> delegate) {
         if (delegate == null) {
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextAdapter.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextAdapter.java
index 410adcf..85dbd52 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextAdapter.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextAdapter.java
@@ -170,7 +170,7 @@ public final class ProcessorContextAdapter<KForward, 
VForward>
 
     @SuppressWarnings("unchecked")
     @Override
-    public <S> S getStateStore(final String name) {
+    public <S extends StateStore> S getStateStore(final String name) {
         return (S) delegate.getStateStore(name);
     }
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java
index e180010..ad8cd0a 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImplTest.java
@@ -18,7 +18,6 @@ package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.To;
 import org.apache.kafka.streams.processor.internals.Task.TaskType;
@@ -143,7 +142,7 @@ public class GlobalProcessorContextImplTest {
     public void shouldNotAllowInitForKeyValueStore() {
         final StateStore store = 
globalContext.getStateStore(GLOBAL_KEY_VALUE_STORE_NAME);
         try {
-            store.init((ProcessorContext) null, null);
+            store.init(null, null);
             fail("Should have thrown UnsupportedOperationException.");
         } catch (final UnsupportedOperationException expected) { }
     }
@@ -152,7 +151,7 @@ public class GlobalProcessorContextImplTest {
     public void shouldNotAllowInitForTimestampedKeyValueStore() {
         final StateStore store = 
globalContext.getStateStore(GLOBAL_TIMESTAMPED_KEY_VALUE_STORE_NAME);
         try {
-            store.init((ProcessorContext) null, null);
+            store.init(null, null);
             fail("Should have thrown UnsupportedOperationException.");
         } catch (final UnsupportedOperationException expected) { }
     }
@@ -161,7 +160,7 @@ public class GlobalProcessorContextImplTest {
     public void shouldNotAllowInitForWindowStore() {
         final StateStore store = 
globalContext.getStateStore(GLOBAL_WINDOW_STORE_NAME);
         try {
-            store.init((ProcessorContext) null, null);
+            store.init(null, null);
             fail("Should have thrown UnsupportedOperationException.");
         } catch (final UnsupportedOperationException expected) { }
     }
@@ -170,7 +169,7 @@ public class GlobalProcessorContextImplTest {
     public void shouldNotAllowInitForTimestampedWindowStore() {
         final StateStore store = 
globalContext.getStateStore(GLOBAL_TIMESTAMPED_WINDOW_STORE_NAME);
         try {
-            store.init((ProcessorContext) null, null);
+            store.init(null, null);
             fail("Should have thrown UnsupportedOperationException.");
         } catch (final UnsupportedOperationException expected) { }
     }
@@ -179,7 +178,7 @@ public class GlobalProcessorContextImplTest {
     public void shouldNotAllowInitForSessionStore() {
         final StateStore store = 
globalContext.getStateStore(GLOBAL_SESSION_STORE_NAME);
         try {
-            store.init((ProcessorContext) null, null);
+            store.init(null, null);
             fail("Should have thrown UnsupportedOperationException.");
         } catch (final UnsupportedOperationException expected) { }
     }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java
index ab88efa..f4b62c3d 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java
@@ -767,7 +767,7 @@ public class ProcessorContextImplTest {
         assertTrue(store.persistent());
         assertTrue(store.isOpen());
 
-        checkThrowsUnsupportedOperation(() -> store.init((ProcessorContext) 
null, null), "init()");
+        checkThrowsUnsupportedOperation(() -> store.init(null, null), 
"init()");
         checkThrowsUnsupportedOperation(store::close, "close()");
     }
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
index cc637cb..77dc6af 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
@@ -81,7 +81,7 @@ public class ProcessorTopologyTest {
     private static final String THROUGH_TOPIC_1 = "through-topic-1";
 
     private static final Header HEADER = new RecordHeader("key", 
"value".getBytes());
-    private static final Headers HEADERS = new RecordHeaders(new Header[] 
{HEADER});
+    private static final Headers HEADERS = new RecordHeaders(new 
Header[]{HEADER});
 
     private final TopologyWrapper topology = new TopologyWrapper();
     private final MockApiProcessorSupplier<?, ?, ?, ?> mockProcessorSupplier = 
new MockApiProcessorSupplier<>();
@@ -178,7 +178,7 @@ public class ProcessorTopologyTest {
         driver = new TopologyTestDriver(createSimpleTopology(partition), 
props);
         final TestInputTopic<String, String> inputTopic = 
driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER, 
Instant.ofEpochMilli(0L), Duration.ZERO);
         final TestOutputTopic<String, String> outputTopic1 =
-            driver.createOutputTopic(OUTPUT_TOPIC_1, 
Serdes.String().deserializer(), Serdes.String().deserializer());
+                driver.createOutputTopic(OUTPUT_TOPIC_1, 
Serdes.String().deserializer(), Serdes.String().deserializer());
 
         inputTopic.pipeInput("key1", "value1");
         assertNextOutputRecord(outputTopic1.readRecord(), "key1", "value1");
@@ -203,9 +203,9 @@ public class ProcessorTopologyTest {
         driver = new TopologyTestDriver(createMultiplexingTopology(), props);
         final TestInputTopic<String, String> inputTopic = 
driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER, 
Instant.ofEpochMilli(0L), Duration.ZERO);
         final TestOutputTopic<String, String> outputTopic1 =
-            driver.createOutputTopic(OUTPUT_TOPIC_1, 
Serdes.String().deserializer(), Serdes.String().deserializer());
+                driver.createOutputTopic(OUTPUT_TOPIC_1, 
Serdes.String().deserializer(), Serdes.String().deserializer());
         final TestOutputTopic<String, String> outputTopic2 =
-            driver.createOutputTopic(OUTPUT_TOPIC_2, 
Serdes.String().deserializer(), Serdes.String().deserializer());
+                driver.createOutputTopic(OUTPUT_TOPIC_2, 
Serdes.String().deserializer(), Serdes.String().deserializer());
         inputTopic.pipeInput("key1", "value1");
         assertNextOutputRecord(outputTopic1.readRecord(), "key1", "value1(1)");
         assertNextOutputRecord(outputTopic2.readRecord(), "key1", "value1(2)");
@@ -231,9 +231,9 @@ public class ProcessorTopologyTest {
         final TestInputTopic<String, String> inputTopic = 
driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER, 
Instant.ofEpochMilli(0L), Duration.ZERO);
         inputTopic.pipeInput("key1", "value1");
         final TestOutputTopic<String, String> outputTopic1 =
-            driver.createOutputTopic(OUTPUT_TOPIC_1, 
Serdes.String().deserializer(), Serdes.String().deserializer());
+                driver.createOutputTopic(OUTPUT_TOPIC_1, 
Serdes.String().deserializer(), Serdes.String().deserializer());
         final TestOutputTopic<String, String> outputTopic2 =
-            driver.createOutputTopic(OUTPUT_TOPIC_2, 
Serdes.String().deserializer(), Serdes.String().deserializer());
+                driver.createOutputTopic(OUTPUT_TOPIC_2, 
Serdes.String().deserializer(), Serdes.String().deserializer());
         assertNextOutputRecord(outputTopic1.readRecord(), "key1", "value1(1)");
         assertNextOutputRecord(outputTopic2.readRecord(), "key1", "value1(2)");
 
@@ -258,7 +258,7 @@ public class ProcessorTopologyTest {
         driver = new TopologyTestDriver(createStatefulTopology(storeName), 
props);
         final TestInputTopic<String, String> inputTopic = 
driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
         final TestOutputTopic<Integer, String> outputTopic1 =
-            driver.createOutputTopic(OUTPUT_TOPIC_1, 
Serdes.Integer().deserializer(), Serdes.String().deserializer());
+                driver.createOutputTopic(OUTPUT_TOPIC_1, 
Serdes.Integer().deserializer(), Serdes.String().deserializer());
 
         inputTopic.pipeInput("key1", "value1");
         inputTopic.pipeInput("key2", "value2");
@@ -389,9 +389,9 @@ public class ProcessorTopologyTest {
         driver = new 
TopologyTestDriver(createSimpleMultiSourceTopology(partition), props);
         final TestInputTopic<String, String> inputTopic = 
driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER, 
Instant.ofEpochMilli(0L), Duration.ZERO);
         final TestOutputTopic<String, String> outputTopic1 =
-            driver.createOutputTopic(OUTPUT_TOPIC_1, 
Serdes.String().deserializer(), Serdes.String().deserializer());
+                driver.createOutputTopic(OUTPUT_TOPIC_1, 
Serdes.String().deserializer(), Serdes.String().deserializer());
         final TestOutputTopic<String, String> outputTopic2 =
-            driver.createOutputTopic(OUTPUT_TOPIC_2, 
Serdes.String().deserializer(), Serdes.String().deserializer());
+                driver.createOutputTopic(OUTPUT_TOPIC_2, 
Serdes.String().deserializer(), Serdes.String().deserializer());
 
         inputTopic.pipeInput("key1", "value1");
         assertNextOutputRecord(outputTopic1.readRecord(), "key1", "value1");
@@ -411,7 +411,7 @@ public class ProcessorTopologyTest {
         inputTopic.pipeInput("key2", "value2");
         inputTopic.pipeInput("key3", "value3");
         final TestOutputTopic<String, String> outputTopic2 =
-            driver.createOutputTopic(OUTPUT_TOPIC_2, 
Serdes.String().deserializer(), Serdes.String().deserializer());
+                driver.createOutputTopic(OUTPUT_TOPIC_2, 
Serdes.String().deserializer(), Serdes.String().deserializer());
         assertNextOutputRecord(outputTopic2.readRecord(), "key1", "value1");
         assertNextOutputRecord(outputTopic2.readRecord(), "key2", "value2");
         assertNextOutputRecord(outputTopic2.readRecord(), "key3", "value3");
@@ -439,11 +439,11 @@ public class ProcessorTopologyTest {
         inputTopic.pipeInput("key3", "value3@3000");
         final TestOutputTopic<String, String> outputTopic = 
driver.createOutputTopic(OUTPUT_TOPIC_1, STRING_DESERIALIZER, 
STRING_DESERIALIZER);
         assertThat(outputTopic.readRecord(),
-                   equalTo(new TestRecord<>("key1", "value1", null, 1000L)));
+                equalTo(new TestRecord<>("key1", "value1", null, 1000L)));
         assertThat(outputTopic.readRecord(),
-                   equalTo(new TestRecord<>("key2", "value2", null, 2000L)));
+                equalTo(new TestRecord<>("key2", "value2", null, 2000L)));
         assertThat(outputTopic.readRecord(),
-                   equalTo(new TestRecord<>("key3", "value3", null, 3000L)));
+                equalTo(new TestRecord<>("key3", "value3", null, 3000L)));
     }
 
     @Test
@@ -499,7 +499,7 @@ public class ProcessorTopologyTest {
         inputTopic.pipeInput("key2", "value2", 20L);
         inputTopic.pipeInput("key3", "value3", 30L);
         final TestOutputTopic<String, String> outputTopic1 =
-            driver.createOutputTopic(OUTPUT_TOPIC_1, 
Serdes.String().deserializer(), Serdes.String().deserializer());
+                driver.createOutputTopic(OUTPUT_TOPIC_1, 
Serdes.String().deserializer(), Serdes.String().deserializer());
         assertNextOutputRecord(outputTopic1.readRecord(), "key1", "value1", 
10L);
         assertNextOutputRecord(outputTopic1.readRecord(), "key2", "value2", 
20L);
         assertNextOutputRecord(outputTopic1.readRecord(), "key3", "value3", 
30L);
@@ -514,7 +514,7 @@ public class ProcessorTopologyTest {
         inputTopic.pipeInput("key2", "value2", 20L);
         inputTopic.pipeInput("key3", "value3", 30L);
         final TestOutputTopic<String, String> outputTopic1 =
-            driver.createOutputTopic(OUTPUT_TOPIC_1, 
Serdes.String().deserializer(), Serdes.String().deserializer());
+                driver.createOutputTopic(OUTPUT_TOPIC_1, 
Serdes.String().deserializer(), Serdes.String().deserializer());
         assertNextOutputRecord(outputTopic1.readRecord(), "key1", "value1", 
20L);
         assertNextOutputRecord(outputTopic1.readRecord(), "key2", "value2", 
30L);
         assertNextOutputRecord(outputTopic1.readRecord(), "key3", "value3", 
40L);
@@ -526,9 +526,9 @@ public class ProcessorTopologyTest {
         driver = new 
TopologyTestDriver(createMultiProcessorTimestampTopology(partition), props);
         final TestInputTopic<String, String> inputTopic = 
driver.createInputTopic(INPUT_TOPIC_1, STRING_SERIALIZER, STRING_SERIALIZER);
         final TestOutputTopic<String, String> outputTopic1 =
-            driver.createOutputTopic(OUTPUT_TOPIC_1, 
Serdes.String().deserializer(), Serdes.String().deserializer());
+                driver.createOutputTopic(OUTPUT_TOPIC_1, 
Serdes.String().deserializer(), Serdes.String().deserializer());
         final TestOutputTopic<String, String> outputTopic2 =
-            driver.createOutputTopic(OUTPUT_TOPIC_2, 
Serdes.String().deserializer(), Serdes.String().deserializer());
+                driver.createOutputTopic(OUTPUT_TOPIC_2, 
Serdes.String().deserializer(), Serdes.String().deserializer());
 
         inputTopic.pipeInput("key1", "value1", 10L);
         assertNextOutputRecord(outputTopic1.readRecord(), "key1", "value1", 
10L);
@@ -610,22 +610,11 @@ public class ProcessorTopologyTest {
         assertTrue(processorTopology.hasPersistentGlobalStore());
     }
 
-    private ProcessorTopology createLocalStoreTopologyWithOldAPI(final 
KeyValueBytesStoreSupplier storeSupplier) {
-        final TopologyWrapper topology = new TopologyWrapper();
-        final String processor = "processor";
-        final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
-            Stores.keyValueStoreBuilder(storeSupplier, Serdes.String(), 
Serdes.String());
-        topology.addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, 
"topic")
-                .addProcessor(processor, () -> new 
OldAPIStatefulProcessor(storeSupplier.name()), "source")
-                .addStateStore(storeBuilder, processor);
-        return topology.getInternalBuilder("anyAppId").buildTopology();
-    }
-
     private ProcessorTopology createLocalStoreTopology(final 
KeyValueBytesStoreSupplier storeSupplier) {
         final TopologyWrapper topology = new TopologyWrapper();
         final String processor = "processor";
         final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
-            Stores.keyValueStoreBuilder(storeSupplier, Serdes.String(), 
Serdes.String());
+                Stores.keyValueStoreBuilder(storeSupplier, Serdes.String(), 
Serdes.String());
         topology.addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, 
"topic")
                 .addProcessor(processor, () -> new 
StatefulProcessor(storeSupplier.name()), "source")
                 .addStateStore(storeBuilder, processor);
@@ -635,9 +624,9 @@ public class ProcessorTopologyTest {
     private ProcessorTopology createGlobalStoreTopology(final 
KeyValueBytesStoreSupplier storeSupplier) {
         final TopologyWrapper topology = new TopologyWrapper();
         final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
-            Stores.keyValueStoreBuilder(storeSupplier, Serdes.String(), 
Serdes.String()).withLoggingDisabled();
+                Stores.keyValueStoreBuilder(storeSupplier, Serdes.String(), 
Serdes.String()).withLoggingDisabled();
         topology.addGlobalStore(storeBuilder, "global", STRING_DESERIALIZER, 
STRING_DESERIALIZER, "topic", "processor",
-                                define(new 
OldAPIStatefulProcessor(storeSupplier.name())));
+                define(new OldAPIStatefulProcessor(storeSupplier.name())));
         return topology.getInternalBuilder("anyAppId").buildTopology();
     }
 
@@ -727,9 +716,9 @@ public class ProcessorTopologyTest {
 
     private Topology createInternalRepartitioningTopology() {
         topology.addSource("source", INPUT_TOPIC_1)
-                .addSink("sink0", THROUGH_TOPIC_1, "source")
-                .addSource("source1", THROUGH_TOPIC_1)
-                .addSink("sink1", OUTPUT_TOPIC_1, "source1");
+            .addSink("sink0", THROUGH_TOPIC_1, "source")
+            .addSource("source1", THROUGH_TOPIC_1)
+            .addSink("sink1", OUTPUT_TOPIC_1, "source1");
 
         // use wrapper to get the internal topology builder to add internal 
topic
         final InternalTopologyBuilder internalTopologyBuilder = 
TopologyWrapper.getInternalTopologyBuilder(topology);
@@ -754,24 +743,24 @@ public class ProcessorTopologyTest {
 
     private Topology createForwardToSourceTopology() {
         return topology.addSource("source-1", INPUT_TOPIC_1)
-                       .addSink("sink-1", OUTPUT_TOPIC_1, "source-1")
-                       .addSource("source-2", OUTPUT_TOPIC_1)
-                       .addSink("sink-2", OUTPUT_TOPIC_2, "source-2");
+                .addSink("sink-1", OUTPUT_TOPIC_1, "source-1")
+                .addSource("source-2", OUTPUT_TOPIC_1)
+                .addSink("sink-2", OUTPUT_TOPIC_2, "source-2");
     }
 
     private Topology createSimpleMultiSourceTopology(final int partition) {
         return topology.addSource("source-1", STRING_DESERIALIZER, 
STRING_DESERIALIZER, INPUT_TOPIC_1)
-                       .addProcessor("processor-1", define(new 
ForwardingProcessor()), "source-1")
-                       .addSink("sink-1", OUTPUT_TOPIC_1, 
constantPartitioner(partition), "processor-1")
-                       .addSource("source-2", STRING_DESERIALIZER, 
STRING_DESERIALIZER, INPUT_TOPIC_2)
-                       .addProcessor("processor-2", define(new 
ForwardingProcessor()), "source-2")
-                       .addSink("sink-2", OUTPUT_TOPIC_2, 
constantPartitioner(partition), "processor-2");
+                .addProcessor("processor-1", define(new 
ForwardingProcessor()), "source-1")
+                .addSink("sink-1", OUTPUT_TOPIC_1, 
constantPartitioner(partition), "processor-1")
+                .addSource("source-2", STRING_DESERIALIZER, 
STRING_DESERIALIZER, INPUT_TOPIC_2)
+                .addProcessor("processor-2", define(new 
ForwardingProcessor()), "source-2")
+                .addSink("sink-2", OUTPUT_TOPIC_2, 
constantPartitioner(partition), "processor-2");
     }
 
     private Topology createAddHeaderTopology() {
         return topology.addSource("source-1", STRING_DESERIALIZER, 
STRING_DESERIALIZER, INPUT_TOPIC_1)
-                       .addProcessor("processor-1", define(new 
AddHeaderProcessor()), "source-1")
-                       .addSink("sink-1", OUTPUT_TOPIC_1, "processor-1");
+                .addProcessor("processor-1", define(new AddHeaderProcessor()), 
"source-1")
+                .addSink("sink-1", OUTPUT_TOPIC_1, "processor-1");
     }
 
     /**
@@ -992,7 +981,7 @@ public class ProcessorTopologyTest {
     /**
      * A custom timestamp extractor that extracts the timestamp from the 
record's value if the value is in ".*@[0-9]+"
      * format. Otherwise, it returns the record's timestamp or the default 
timestamp if the record's timestamp is negative.
-     */
+    */
     public static class CustomTimestampExtractor implements TimestampExtractor 
{
         private static final long DEFAULT_TIMESTAMP = 1000L;
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 5b871f5..a0c09e5 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -61,7 +61,6 @@ import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.TaskMetadata;
 import org.apache.kafka.streams.processor.ThreadMetadata;
 import org.apache.kafka.streams.processor.api.Processor;
-import org.apache.kafka.streams.processor.api.ProcessorContext;
 import org.apache.kafka.streams.processor.api.ProcessorSupplier;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import 
org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
@@ -1196,33 +1195,25 @@ public class StreamThreadTest {
         internalTopologyBuilder.addSource(null, "name", null, null, null, 
topic1);
         final AtomicBoolean shouldThrow = new AtomicBoolean(false);
         final AtomicBoolean processed = new AtomicBoolean(false);
-        internalTopologyBuilder.addProcessor("proc", new 
ProcessorSupplier<Object, Object, Object, Object>() {
-            @Override
-            public Processor<Object, Object, Object, Object> get() {
-                return new Processor<Object, Object, Object, Object>() {
-                    private ProcessorContext<Object, Object> context;
-
-                    @Override
-                    public void init(final ProcessorContext<Object, Object> 
context) {
-                        this.context = context;
-                    }
+        internalTopologyBuilder.addProcessor(
+            "proc",
+            () -> new Processor<Object, Object, Object, Object>() {
 
-                    @Override
-                    public void process(final Object key, final Object value) {
-                        if (shouldThrow.get()) {
-                            throw new 
TaskCorruptedException(singletonMap(task1, new 
HashSet<TopicPartition>(singleton(storeChangelogTopicPartition))));
-                        } else {
-                            processed.set(true);
-                        }
+                @Override
+                public void process(final Object key, final Object value) {
+                    if (shouldThrow.get()) {
+                        throw new TaskCorruptedException(singletonMap(task1, 
new HashSet<>(singleton(storeChangelogTopicPartition))));
+                    } else {
+                        processed.set(true);
                     }
+                }
 
-                    @Override
-                    public void close() {
+                @Override
+                public void close() {
 
-                    }
-                };
-            }
-        }, "name");
+                }
+            },
+            "name");
         internalTopologyBuilder.addStateStore(
             Stores.keyValueStoreBuilder(
                 Stores.persistentKeyValueStore(storeName),
@@ -1596,19 +1587,20 @@ public class StreamThreadTest {
     public void shouldPunctuateActiveTask() {
         final List<Long> punctuatedStreamTime = new ArrayList<>();
         final List<Long> punctuatedWallClockTime = new ArrayList<>();
-        final org.apache.kafka.streams.processor.ProcessorSupplier<Object, 
Object> punctuateProcessor = () -> new 
org.apache.kafka.streams.processor.Processor<Object, Object>() {
-            @Override
-            public void init(final 
org.apache.kafka.streams.processor.ProcessorContext context) {
-                context.schedule(Duration.ofMillis(100L), 
PunctuationType.STREAM_TIME, punctuatedStreamTime::add);
-                context.schedule(Duration.ofMillis(100L), 
PunctuationType.WALL_CLOCK_TIME, punctuatedWallClockTime::add);
-            }
+        final org.apache.kafka.streams.processor.ProcessorSupplier<Object, 
Object> punctuateProcessor =
+            () -> new org.apache.kafka.streams.processor.Processor<Object, 
Object>() {
+                @Override
+                public void init(final 
org.apache.kafka.streams.processor.ProcessorContext context) {
+                    context.schedule(Duration.ofMillis(100L), 
PunctuationType.STREAM_TIME, punctuatedStreamTime::add);
+                    context.schedule(Duration.ofMillis(100L), 
PunctuationType.WALL_CLOCK_TIME, punctuatedWallClockTime::add);
+                }
 
-            @Override
-            public void process(final Object key, final Object value) {}
+                @Override
+                public void process(final Object key, final Object value) {}
 
-            @Override
-            public void close() {}
-        };
+                @Override
+                public void close() {}
+            };
 
         internalStreamsBuilder.stream(Collections.singleton(topic1), 
consumed).process(punctuateProcessor);
         internalStreamsBuilder.buildAndOptimizeTopology();
diff --git 
a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KStreamTest.scala
 
b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KStreamTest.scala
index 87c915f..9c3bc3d 100644
--- 
a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KStreamTest.scala
+++ 
b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KStreamTest.scala
@@ -22,7 +22,14 @@ import java.time.Duration.ofSeconds
 import java.time.Instant
 
 import org.apache.kafka.streams.KeyValue
-import org.apache.kafka.streams.kstream._
+import org.apache.kafka.streams.kstream.{
+  JoinWindows,
+  Transformer,
+  ValueTransformer,
+  ValueTransformerSupplier,
+  ValueTransformerWithKey,
+  ValueTransformerWithKeySupplier
+}
 import org.apache.kafka.streams.processor.ProcessorContext
 import org.apache.kafka.streams.scala.ImplicitConversions._
 import org.apache.kafka.streams.scala.Serdes._

Reply via email to