This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.3 by this push:
     new 87a886b3988 KAFKA-14388 - Fixes the NPE when using the new Processor 
API with the DSL (#12861)
87a886b3988 is described below

commit 87a886b39888b93976c5e54327a97d386d0ed142
Author: Bill Bejeck <[email protected]>
AuthorDate: Wed Nov 16 17:06:15 2022 -0500

    KAFKA-14388 - Fixes the NPE when using the new Processor API with the DSL 
(#12861)
    
    With the addition of the new Processor API, the newly added 
FixedKeyProcessorNodeFactory extends the ProcessorNodeFactory class. The 
ProcessorNodeFactory had a private field Set<String> stateStoreNames 
initialized to an empty set. The FixedKeyProcessorNodeFactory also had a 
private field Set<String> stateStoreNames.
    
    When InternalTopologyBuilder.build executes, the buildProcessorNode 
method is passed every node factory as a ProcessorNodeFactory. When the 
method references the stateStoreNames field, it points to the superclass 
field, which is empty, so the corresponding StoreBuilder(s) are never 
added - causing an NPE in the topology.
    
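    This PR makes the field non-private (package-private) on the 
ProcessorNodeFactory class and removes the duplicate field and the 
addStateStore method from FixedKeyProcessorNodeFactory, so 
FixedKeyProcessorNodeFactory inherits the superclass field.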
    
    The added test fails without this change.
    
    Reviewers: Matthias J. Sax <[email protected]>,  Sophie Blee-Goldman 
<[email protected]>, Jorge Esteban Quilcate Otoya <[email protected]>
---
 .../internals/InternalTopologyBuilder.java         |   7 +-
 .../internals/KStreamNewProcessorApiTest.java      | 138 +++++++++++++++++++++
 2 files changed, 139 insertions(+), 6 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
index b4db744389c..bdf521a3b8e 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
@@ -224,7 +224,7 @@ public class InternalTopologyBuilder {
 
     private static class ProcessorNodeFactory<KIn, VIn, KOut, VOut> extends 
NodeFactory<KIn, VIn, KOut, VOut> {
         private final ProcessorSupplier<KIn, VIn, KOut, VOut> supplier;
-        private final Set<String> stateStoreNames = new HashSet<>();
+        final Set<String> stateStoreNames = new HashSet<>();
 
         ProcessorNodeFactory(final String name,
                              final String[] predecessors,
@@ -250,7 +250,6 @@ public class InternalTopologyBuilder {
 
     private static class FixedKeyProcessorNodeFactory<KIn, VIn, VOut> extends 
ProcessorNodeFactory<KIn, VIn, KIn, VOut> {
         private final FixedKeyProcessorSupplier<KIn, VIn, VOut> supplier;
-        private final Set<String> stateStoreNames = new HashSet<>();
 
         FixedKeyProcessorNodeFactory(final String name,
                              final String[] predecessors,
@@ -259,10 +258,6 @@ public class InternalTopologyBuilder {
             this.supplier = supplier;
         }
 
-        public void addStateStore(final String stateStoreName) {
-            stateStoreNames.add(stateStoreName);
-        }
-
         @Override
         public ProcessorNode<KIn, VIn, KIn, VOut> build() {
             return new ProcessorNode<>(name, supplier.get(), stateStoreNames);
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamNewProcessorApiTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamNewProcessorApiTest.java
new file mode 100644
index 00000000000..da5bca43b84
--- /dev/null
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamNewProcessorApiTest.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.streams.processor.api.ContextualFixedKeyProcessor;
+import org.apache.kafka.streams.processor.api.FixedKeyProcessorContext;
+import org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier;
+import org.apache.kafka.streams.processor.api.FixedKeyRecord;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.Stores;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+
+import static java.util.Arrays.asList;
+
+
+public class KStreamNewProcessorApiTest {
+
+    @Test
+    @DisplayName("Should attach the state store using ConnectedStoreProvider")
+    void shouldGetStateStoreWithConnectedStoreProvider() {
+        runTest(false);
+    }
+
+    @Test
+    @DisplayName("Should attach the state store StreamBuilder.addStateStore")
+    void shouldGetStateStoreWithStreamBuilder() {
+        runTest(true);
+    }
+
+    private void runTest(final boolean shouldAddStoreDirectly) {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final StoreBuilder<?> storeBuilder = 
Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("store"), 
Serdes.String(), Serdes.String());
+
+        if (shouldAddStoreDirectly) {
+            builder.addStateStore(storeBuilder);
+        }
+        builder.stream("input", Consumed.with(Serdes.String(), 
Serdes.String()))
+                .processValues(new TransformerSupplier(shouldAddStoreDirectly 
? null : storeBuilder), "store")
+                .to("output", Produced.with(Serdes.String(), Serdes.String()));
+
+        final List<KeyValue<String, String>> words = 
Arrays.asList(KeyValue.pair("a", "foo"), KeyValue.pair("b", "bar"), 
KeyValue.pair("c", "baz"));
+        try (TopologyTestDriver testDriver = new 
TopologyTestDriver(builder.build())) {
+            final TestInputTopic<String, String>
+                    testDriverInputTopic =
+                    testDriver.createInputTopic("input", 
Serdes.String().serializer(), Serdes.String().serializer());
+
+            words.forEach(clk -> testDriverInputTopic.pipeInput(clk.key, 
clk.value));
+
+            final List<String> expectedOutput = asList("fooUpdated", 
"barUpdated", "bazUpdated");
+
+            final Deserializer<String> keyDeserializer = 
Serdes.String().deserializer();
+            final List<String> actualOutput =
+                    new ArrayList<>(testDriver.createOutputTopic("output", 
keyDeserializer, Serdes.String().deserializer()).readValuesToList());
+
+            final KeyValueStore<String, String> stateStore = 
testDriver.getKeyValueStore("store");
+
+            Assertions.assertEquals(expectedOutput, actualOutput);
+            Assertions.assertEquals(stateStore.get("a"), "fooUpdated");
+            Assertions.assertEquals(stateStore.get("b"), "barUpdated");
+            Assertions.assertEquals(stateStore.get("c"), "bazUpdated");
+        }
+    }
+    
+    private static class TransformerSupplier implements 
FixedKeyProcessorSupplier<String, String, String> {
+        private final StoreBuilder<?> storeBuilder;
+        public TransformerSupplier(final StoreBuilder<?> storeBuilder) {
+            this.storeBuilder = storeBuilder;
+        }
+
+
+        @Override
+        public ContextualFixedKeyProcessor<String, String, String> get() {
+            return new ContextualFixedKeyProcessor<String, String, String>() {
+                KeyValueStore<String, String> store;
+                FixedKeyProcessorContext<String, String> context;
+
+                @Override
+                public void init(final FixedKeyProcessorContext<String, 
String> context) {
+                    super.init(context);
+                    store = context.getStateStore("store");
+                    Objects.requireNonNull(store, "State store can't be null");
+                    this.context = context;
+                }
+
+                @Override
+                public void process(final FixedKeyRecord<String, String> 
record) {
+                    store.putIfAbsent(record.key(), record.value() + 
"Updated");
+                    context().forward(record.withValue(record.value() + 
"Updated"));
+                }
+
+                @Override
+                public void close() {
+
+                }
+            };
+        }
+        @Override
+        public Set<StoreBuilder<?>> stores() {
+            if (storeBuilder != null) {
+                return Collections.singleton(storeBuilder);
+            }
+            return null;
+        }
+    }
+}

Reply via email to