This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.4 by this push:
     new 9c91e05  KAFKA-9739:  Fixes null key changing child node (#8416)
9c91e05 is described below

commit 9c91e052f8531fe65df989ad0761ab10623ac116
Author: Bill Bejeck <bbej...@gmail.com>
AuthorDate: Fri Apr 3 15:14:23 2020 -0400

    KAFKA-9739:  Fixes null key changing child node (#8416)
    
    2.4 port of #8400, since cherry-picking was not possible
    
    Reviewers: John Roesler <j...@confluent.io>
---
 .../kstream/internals/InternalStreamsBuilder.java  |  17 +-
 .../kstream/internals/graph/StreamsGraphTest.java  | 186 +++++++++++++++++++++
 2 files changed, 198 insertions(+), 5 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
index 9509431..cbb4a65 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
@@ -396,11 +396,13 @@ public class InternalStreamsBuilder implements 
InternalNameProvider {
         final Set<StreamsGraphNode> mergeNodeKeyChangingParentsToRemove = new 
HashSet<>();
         for (final StreamsGraphNode mergeNode : mergeNodes) {
             mergeNodesToKeyChangers.put(mergeNode, new LinkedHashSet<>());
-            final Collection<StreamsGraphNode> keys = 
keyChangingOperationsToOptimizableRepartitionNodes.keySet();
-            for (final StreamsGraphNode key : keys) {
-                final StreamsGraphNode maybeParentKey = 
findParentNodeMatching(mergeNode, node -> node.parentNodes().contains(key));
-                if (maybeParentKey != null) {
-                    mergeNodesToKeyChangers.get(mergeNode).add(key);
+            final Set<Map.Entry<StreamsGraphNode, 
LinkedHashSet<OptimizableRepartitionNode>>> entrySet = 
keyChangingOperationsToOptimizableRepartitionNodes.entrySet();
+            for (final Map.Entry<StreamsGraphNode, 
LinkedHashSet<OptimizableRepartitionNode>> entry : entrySet) {
+                if (mergeNodeHasRepartitionChildren(mergeNode, 
entry.getValue())) {
+                    final StreamsGraphNode maybeParentKey = 
findParentNodeMatching(mergeNode, node -> 
node.parentNodes().contains(entry.getKey()));
+                    if (maybeParentKey != null) {
+                        
mergeNodesToKeyChangers.get(mergeNode).add(entry.getKey());
+                    }
                 }
             }
         }
@@ -421,6 +423,11 @@ public class InternalStreamsBuilder implements 
InternalNameProvider {
         }
     }
 
+    private boolean mergeNodeHasRepartitionChildren(final StreamsGraphNode 
mergeNode,
+                                                    final 
LinkedHashSet<OptimizableRepartitionNode> repartitionNodes) {
+        return repartitionNodes.stream().allMatch(n -> 
findParentNodeMatching(n, gn -> gn.parentNodes().contains(mergeNode)) != null);
+    }
+
     @SuppressWarnings("unchecked")
     private OptimizableRepartitionNode createRepartitionNode(final String 
repartitionTopicName,
                                                              final Serde 
keySerde,
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java
index e2006e6..6a0a0b6 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java
@@ -22,14 +22,22 @@ import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.kstream.Aggregator;
 import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.Grouped;
+import org.apache.kafka.streams.kstream.Initializer;
 import org.apache.kafka.streams.kstream.JoinWindows;
+import org.apache.kafka.streams.kstream.Joined;
 import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.streams.kstream.Suppressed;
 import org.apache.kafka.streams.kstream.TimeWindows;
+import org.apache.kafka.streams.kstream.Transformer;
+import org.apache.kafka.streams.kstream.TransformerSupplier;
 import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.streams.processor.ProcessorContext;
 import org.junit.Test;
 
 import java.time.Duration;
@@ -47,6 +55,8 @@ import static org.junit.Assert.assertEquals;
 public class StreamsGraphTest {
 
     private final Pattern repartitionTopicPattern = Pattern.compile("Sink: 
.*-repartition");
+    private Initializer<String> initializer;
+    private Aggregator<String, String, String> aggregator;
 
     // Test builds topology in a successive manner, but only graph nodes not yet 
processed are written to the topology
 
@@ -102,6 +112,76 @@ public class StreamsGraphTest {
     }
 
     @Test
+    @SuppressWarnings("unchecked")
+    public void shouldNotThrowNPEWithMergeNodes() {
+        final Properties properties = new Properties();
+        properties.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, 
"test-application");
+        properties.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
"localhost:9092");
+        properties.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION, 
StreamsConfig.OPTIMIZE);
+
+        final StreamsBuilder builder = new StreamsBuilder();
+        initializer = () -> "";
+        aggregator = (aggKey, value, aggregate) -> aggregate + value.length();
+        final TransformerSupplier<String, String, KeyValue<String, String>> 
transformSupplier = () -> new Transformer<String, String, KeyValue<String, 
String>>() {
+            @Override
+            public void init(final ProcessorContext context) {
+
+            }
+
+            @Override
+            public KeyValue<String, String> transform(final String key, final 
String value) {
+                return KeyValue.pair(key, value);
+            }
+
+            @Override
+            public void close() {
+
+            }
+        };
+
+        final KStream<String, String> retryStream = 
builder.stream("retryTopic", Consumed.with(Serdes.String(), Serdes.String()))
+                .transform(transformSupplier)
+                .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
+                .aggregate(initializer,
+                        aggregator,
+                        Materialized.with(Serdes.String(), Serdes.String()))
+                .suppress(Suppressed.untilTimeLimit(Duration.ofSeconds(500), 
Suppressed.BufferConfig.maxBytes(64_000_000)))
+                .toStream()
+                .flatMap((k, v) -> new ArrayList<>());
+
+        final KTable<String, String> idTable = 
builder.stream("id-table-topic", Consumed.with(Serdes.String(), 
Serdes.String()))
+                .flatMap((k, v) -> new ArrayList<KeyValue<String, String>>())
+                .peek((subscriptionId, recipientId) -> 
System.out.println("data " + subscriptionId + " " + recipientId))
+                .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
+                .aggregate(initializer,
+                        aggregator,
+                        Materialized.with(Serdes.String(), Serdes.String()));
+
+        final KStream<String, String> joinStream = 
builder.stream("internal-topic-command", Consumed.with(Serdes.String(), 
Serdes.String()))
+                .peek((subscriptionId, command) -> 
System.out.println("stdoutput"))
+                .mapValues((k, v) -> v)
+                .merge(retryStream)
+                .leftJoin(idTable, (v1, v2) -> v1 + v2,
+                        Joined.with(Serdes.String(), Serdes.String(), 
Serdes.String()));
+
+        final KStream<String, String>[] branches = joinStream.branch((k, v) -> 
v.equals("some-value"), (k, v) -> true);
+
+        branches[0].map(KeyValue::pair)
+                .peek((recipientId, command) -> System.out.println("printing 
out"))
+                .to("external-command", Produced.with(Serdes.String(), 
Serdes.String()));
+
+        branches[1].filter((k, v) -> v != null)
+                .peek((subscriptionId, wrapper) -> 
System.out.println("Printing output"))
+                .mapValues((k, v) -> v)
+                .to("dlq-topic", Produced.with(Serdes.String(), 
Serdes.String()));
+
+        branches[1].map(KeyValue::pair).to("retryTopic", 
Produced.with(Serdes.String(), Serdes.String()));
+
+        final Topology topology = builder.build(properties);
+        assertEquals(expectedComplexMergeOptimizeTopology, 
topology.describe().toString());
+    }
+
+    @Test
     public void 
shouldNotOptimizeWithValueOrKeyChangingOperatorsAfterInitialKeyChange() {
 
         final Topology attemptedOptimize = 
getTopologyWithChangingValuesAfterChangingKey(StreamsConfig.OPTIMIZE);
@@ -291,4 +371,110 @@ public class StreamsGraphTest {
         "    Sink: KSTREAM-SINK-0000000007 (topic: output_topic)\n" +
         "      <-- KSTREAM-MERGE-0000000006\n\n";
 
+
+    private final String expectedComplexMergeOptimizeTopology = 
"Topologies:\n" +
+            "   Sub-topology: 0\n" +
+            "    Source: KSTREAM-SOURCE-0000000000 (topics: [retryTopic])\n" +
+            "      --> KSTREAM-TRANSFORM-0000000001\n" +
+            "    Processor: KSTREAM-TRANSFORM-0000000001 (stores: [])\n" +
+            "      --> KSTREAM-FILTER-0000000040\n" +
+            "      <-- KSTREAM-SOURCE-0000000000\n" +
+            "    Processor: KSTREAM-FILTER-0000000040 (stores: [])\n" +
+            "      --> KSTREAM-SINK-0000000039\n" +
+            "      <-- KSTREAM-TRANSFORM-0000000001\n" +
+            "    Sink: KSTREAM-SINK-0000000039 (topic: 
KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition)\n" +
+            "      <-- KSTREAM-FILTER-0000000040\n" +
+            "\n" +
+            "  Sub-topology: 1\n" +
+            "    Source: KSTREAM-SOURCE-0000000041 (topics: 
[KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition])\n" +
+            "      --> KSTREAM-AGGREGATE-0000000003\n" +
+            "    Processor: KSTREAM-AGGREGATE-0000000003 (stores: 
[KSTREAM-AGGREGATE-STATE-STORE-0000000002])\n" +
+            "      --> KTABLE-SUPPRESS-0000000007\n" +
+            "      <-- KSTREAM-SOURCE-0000000041\n" +
+            "    Source: KSTREAM-SOURCE-0000000019 (topics: 
[internal-topic-command])\n" +
+            "      --> KSTREAM-PEEK-0000000020\n" +
+            "    Processor: KTABLE-SUPPRESS-0000000007 (stores: 
[KTABLE-SUPPRESS-STATE-STORE-0000000008])\n" +
+            "      --> KTABLE-TOSTREAM-0000000009\n" +
+            "      <-- KSTREAM-AGGREGATE-0000000003\n" +
+            "    Processor: KSTREAM-PEEK-0000000020 (stores: [])\n" +
+            "      --> KSTREAM-MAPVALUES-0000000021\n" +
+            "      <-- KSTREAM-SOURCE-0000000019\n" +
+            "    Processor: KTABLE-TOSTREAM-0000000009 (stores: [])\n" +
+            "      --> KSTREAM-FLATMAP-0000000010\n" +
+            "      <-- KTABLE-SUPPRESS-0000000007\n" +
+            "    Processor: KSTREAM-FLATMAP-0000000010 (stores: [])\n" +
+            "      --> KSTREAM-MERGE-0000000022\n" +
+            "      <-- KTABLE-TOSTREAM-0000000009\n" +
+            "    Processor: KSTREAM-MAPVALUES-0000000021 (stores: [])\n" +
+            "      --> KSTREAM-MERGE-0000000022\n" +
+            "      <-- KSTREAM-PEEK-0000000020\n" +
+            "    Processor: KSTREAM-MERGE-0000000022 (stores: [])\n" +
+            "      --> KSTREAM-FILTER-0000000024\n" +
+            "      <-- KSTREAM-MAPVALUES-0000000021, 
KSTREAM-FLATMAP-0000000010\n" +
+            "    Processor: KSTREAM-FILTER-0000000024 (stores: [])\n" +
+            "      --> KSTREAM-SINK-0000000023\n" +
+            "      <-- KSTREAM-MERGE-0000000022\n" +
+            "    Sink: KSTREAM-SINK-0000000023 (topic: 
KSTREAM-MERGE-0000000022-repartition)\n" +
+            "      <-- KSTREAM-FILTER-0000000024\n" +
+            "\n" +
+            "  Sub-topology: 2\n" +
+            "    Source: KSTREAM-SOURCE-0000000011 (topics: 
[id-table-topic])\n" +
+            "      --> KSTREAM-FLATMAP-0000000012\n" +
+            "    Processor: KSTREAM-FLATMAP-0000000012 (stores: [])\n" +
+            "      --> KSTREAM-FILTER-0000000043\n" +
+            "      <-- KSTREAM-SOURCE-0000000011\n" +
+            "    Processor: KSTREAM-FILTER-0000000043 (stores: [])\n" +
+            "      --> KSTREAM-SINK-0000000042\n" +
+            "      <-- KSTREAM-FLATMAP-0000000012\n" +
+            "    Sink: KSTREAM-SINK-0000000042 (topic: 
KSTREAM-AGGREGATE-STATE-STORE-0000000014-repartition)\n" +
+            "      <-- KSTREAM-FILTER-0000000043\n" +
+            "\n" +
+            "  Sub-topology: 3\n" +
+            "    Source: KSTREAM-SOURCE-0000000025 (topics: 
[KSTREAM-MERGE-0000000022-repartition])\n" +
+            "      --> KSTREAM-LEFTJOIN-0000000026\n" +
+            "    Processor: KSTREAM-LEFTJOIN-0000000026 (stores: 
[KSTREAM-AGGREGATE-STATE-STORE-0000000014])\n" +
+            "      --> KSTREAM-BRANCH-0000000027\n" +
+            "      <-- KSTREAM-SOURCE-0000000025\n" +
+            "    Processor: KSTREAM-BRANCH-0000000027 (stores: [])\n" +
+            "      --> KSTREAM-BRANCHCHILD-0000000029, 
KSTREAM-BRANCHCHILD-0000000028\n" +
+            "      <-- KSTREAM-LEFTJOIN-0000000026\n" +
+            "    Processor: KSTREAM-BRANCHCHILD-0000000029 (stores: [])\n" +
+            "      --> KSTREAM-FILTER-0000000033, KSTREAM-MAP-0000000037\n" +
+            "      <-- KSTREAM-BRANCH-0000000027\n" +
+            "    Processor: KSTREAM-BRANCHCHILD-0000000028 (stores: [])\n" +
+            "      --> KSTREAM-MAP-0000000030\n" +
+            "      <-- KSTREAM-BRANCH-0000000027\n" +
+            "    Processor: KSTREAM-FILTER-0000000033 (stores: [])\n" +
+            "      --> KSTREAM-PEEK-0000000034\n" +
+            "      <-- KSTREAM-BRANCHCHILD-0000000029\n" +
+            "    Processor: KSTREAM-MAP-0000000030 (stores: [])\n" +
+            "      --> KSTREAM-PEEK-0000000031\n" +
+            "      <-- KSTREAM-BRANCHCHILD-0000000028\n" +
+            "    Processor: KSTREAM-PEEK-0000000034 (stores: [])\n" +
+            "      --> KSTREAM-MAPVALUES-0000000035\n" +
+            "      <-- KSTREAM-FILTER-0000000033\n" +
+            "    Source: KSTREAM-SOURCE-0000000044 (topics: 
[KSTREAM-AGGREGATE-STATE-STORE-0000000014-repartition])\n" +
+            "      --> KSTREAM-PEEK-0000000013\n" +
+            "    Processor: KSTREAM-MAP-0000000037 (stores: [])\n" +
+            "      --> KSTREAM-SINK-0000000038\n" +
+            "      <-- KSTREAM-BRANCHCHILD-0000000029\n" +
+            "    Processor: KSTREAM-MAPVALUES-0000000035 (stores: [])\n" +
+            "      --> KSTREAM-SINK-0000000036\n" +
+            "      <-- KSTREAM-PEEK-0000000034\n" +
+            "    Processor: KSTREAM-PEEK-0000000013 (stores: [])\n" +
+            "      --> KSTREAM-AGGREGATE-0000000015\n" +
+            "      <-- KSTREAM-SOURCE-0000000044\n" +
+            "    Processor: KSTREAM-PEEK-0000000031 (stores: [])\n" +
+            "      --> KSTREAM-SINK-0000000032\n" +
+            "      <-- KSTREAM-MAP-0000000030\n" +
+            "    Processor: KSTREAM-AGGREGATE-0000000015 (stores: 
[KSTREAM-AGGREGATE-STATE-STORE-0000000014])\n" +
+            "      --> none\n" +
+            "      <-- KSTREAM-PEEK-0000000013\n" +
+            "    Sink: KSTREAM-SINK-0000000032 (topic: external-command)\n" +
+            "      <-- KSTREAM-PEEK-0000000031\n" +
+            "    Sink: KSTREAM-SINK-0000000036 (topic: dlq-topic)\n" +
+            "      <-- KSTREAM-MAPVALUES-0000000035\n" +
+            "    Sink: KSTREAM-SINK-0000000038 (topic: retryTopic)\n" +
+            "      <-- KSTREAM-MAP-0000000037\n\n";
+
 }

Reply via email to