[ https://issues.apache.org/jira/browse/KAFKA-8705?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16920779#comment-16920779 ]
Reynir Hübner commented on KAFKA-8705:
--------------------------------------

I get the same exception on version *2.2.1* when TOPOLOGY_OPTIMIZATION is set to OPTIMIZE.

> NullPointerException was thrown by topology optimization when two MergeNodes have common KeyChangingNode
> ---------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-8705
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8705
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.3.0
>            Reporter: Hiroshi Nakahara
>            Assignee: Bill Bejeck
>            Priority: Major
>
> A NullPointerException is thrown by topology optimization when two MergeNodes have a common KeyChangingNode.
> Kafka Streams version: 2.3.0
> h3. Code
> {code:java}
> import org.apache.kafka.common.serialization.Serdes;
> import org.apache.kafka.streams.StreamsBuilder;
> import org.apache.kafka.streams.StreamsConfig;
> import org.apache.kafka.streams.kstream.Consumed;
> import org.apache.kafka.streams.kstream.KStream;
>
> import java.util.Properties;
>
> public class Main {
>     public static void main(String[] args) {
>         final StreamsBuilder streamsBuilder = new StreamsBuilder();
>         final KStream<Integer, Integer> parentStream = streamsBuilder
>                 .stream("parentTopic", Consumed.with(Serdes.Integer(), Serdes.Integer()))
>                 .selectKey(Integer::sum); // selectKey makes parentStream a key-changing node
>
>         // Three children of the same key-changing node
>         final KStream<Integer, Integer> childStream1 = parentStream.mapValues(v -> v + 1);
>         final KStream<Integer, Integer> childStream2 = parentStream.mapValues(v -> v + 2);
>         final KStream<Integer, Integer> childStream3 = parentStream.mapValues(v -> v + 3);
>
>         // Two chained merges: both merge nodes descend from the same key-changing node
>         childStream1
>                 .merge(childStream2)
>                 .merge(childStream3)
>                 .to("outputTopic");
>
>         final Properties properties = new Properties();
>         properties.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);
>         streamsBuilder.build(properties);
>     }
> }
> {code}
> h3. Expected result
> streamsBuilder.build should create a Topology without throwing an exception. The expected topology is:
> {code:java}
> Topologies:
>    Sub-topology: 0
>     Source: KSTREAM-SOURCE-0000000000 (topics: [parentTopic])
>       --> KSTREAM-KEY-SELECT-0000000001
>     Processor: KSTREAM-KEY-SELECT-0000000001 (stores: [])
>       --> KSTREAM-MAPVALUES-0000000002, KSTREAM-MAPVALUES-0000000003, KSTREAM-MAPVALUES-0000000004
>       <-- KSTREAM-SOURCE-0000000000
>     Processor: KSTREAM-MAPVALUES-0000000002 (stores: [])
>       --> KSTREAM-MERGE-0000000005
>       <-- KSTREAM-KEY-SELECT-0000000001
>     Processor: KSTREAM-MAPVALUES-0000000003 (stores: [])
>       --> KSTREAM-MERGE-0000000005
>       <-- KSTREAM-KEY-SELECT-0000000001
>     Processor: KSTREAM-MAPVALUES-0000000004 (stores: [])
>       --> KSTREAM-MERGE-0000000006
>       <-- KSTREAM-KEY-SELECT-0000000001
>     Processor: KSTREAM-MERGE-0000000005 (stores: [])
>       --> KSTREAM-MERGE-0000000006
>       <-- KSTREAM-MAPVALUES-0000000002, KSTREAM-MAPVALUES-0000000003
>     Processor: KSTREAM-MERGE-0000000006 (stores: [])
>       --> KSTREAM-SINK-0000000007
>       <-- KSTREAM-MERGE-0000000005, KSTREAM-MAPVALUES-0000000004
>     Sink: KSTREAM-SINK-0000000007 (topic: outputTopic)
>       <-- KSTREAM-MERGE-0000000006
> {code}
> h3. Actual result
> A NullPointerException is thrown with the following stack trace:
> {code:java}
> Exception in thread "main" java.lang.NullPointerException
> 	at java.util.AbstractCollection.addAll(AbstractCollection.java:343)
> 	at org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder.maybeUpdateKeyChangingRepartitionNodeMap(InternalStreamsBuilder.java:397)
> 	at org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder.maybeOptimizeRepartitionOperations(InternalStreamsBuilder.java:315)
> 	at org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder.maybePerformOptimizations(InternalStreamsBuilder.java:304)
> 	at org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder.buildAndOptimizeTopology(InternalStreamsBuilder.java:275)
> 	at org.apache.kafka.streams.StreamsBuilder.build(StreamsBuilder.java:558)
> 	at Main.main(Main.java:24)
> {code}
> h3. Cause
> This exception occurs in InternalStreamsBuilder#maybeUpdateKeyChangingRepartitionNodeMap:
> {code:java}
> private void maybeUpdateKeyChangingRepartitionNodeMap() {
>     final Map<StreamsGraphNode, Set<StreamsGraphNode>> mergeNodesToKeyChangers = new HashMap<>();
>     for (final StreamsGraphNode mergeNode : mergeNodes) {
>         mergeNodesToKeyChangers.put(mergeNode, new LinkedHashSet<>());
>         final Collection<StreamsGraphNode> keys = keyChangingOperationsToOptimizableRepartitionNodes.keySet();
>         for (final StreamsGraphNode key : keys) {
>             final StreamsGraphNode maybeParentKey = findParentNodeMatching(mergeNode, node -> node.parentNodes().contains(key));
>             if (maybeParentKey != null) {
>                 mergeNodesToKeyChangers.get(mergeNode).add(key);
>             }
>         }
>     }
>
>     for (final Map.Entry<StreamsGraphNode, Set<StreamsGraphNode>> entry : mergeNodesToKeyChangers.entrySet()) {
>         final StreamsGraphNode mergeKey = entry.getKey();
>         final Collection<StreamsGraphNode> keyChangingParents = entry.getValue();
>         final LinkedHashSet<OptimizableRepartitionNode> repartitionNodes = new LinkedHashSet<>();
>         for (final StreamsGraphNode keyChangingParent : keyChangingParents) {
>             repartitionNodes.addAll(keyChangingOperationsToOptimizableRepartitionNodes.get(keyChangingParent));
>             keyChangingOperationsToOptimizableRepartitionNodes.remove(keyChangingParent);
>         }
>         keyChangingOperationsToOptimizableRepartitionNodes.put(mergeKey, repartitionNodes);
>     }
> }
> {code}
> In the example, mergeNodesToKeyChangers contains two entries (KSTREAM-MERGE-0000000005 and KSTREAM-MERGE-0000000006), and both share one common key-changing parent (KSTREAM-KEY-SELECT-0000000001). keyChangingOperationsToOptimizableRepartitionNodes also contains one entry (KSTREAM-KEY-SELECT-0000000001).
> When the first entry is processed in the second for loop, KSTREAM-MERGE-0000000005 is added to keyChangingOperationsToOptimizableRepartitionNodes and KSTREAM-KEY-SELECT-0000000001 is removed from it.
> As a result, when the second entry is processed in the second for loop, KSTREAM-KEY-SELECT-0000000001 is no longer found in keyChangingOperationsToOptimizableRepartitionNodes, so get() returns null and the call to addAll(null) throws the NullPointerException.
>
> This is the first issue I have reported for Kafka. Please let me know if further information is required. Thank you



--
This message was sent by Atlassian Jira
(v8.3.2#803003)
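The sequence described under Cause can be modeled without Kafka on the classpath. The sketch below is a simplified, hypothetical model: it assumes graph nodes can be represented as plain Strings (Kafka uses StreamsGraphNode), and the class name, the method names `buggyUpdate` and `deferredRemovalUpdate`, and the repartition node `REPARTITION-A` are all invented for illustration. `buggyUpdate` mirrors the second loop of maybeUpdateKeyChangingRepartitionNodeMap; `deferredRemovalUpdate` is one possible way to avoid the NPE (read from a snapshot, remove consumed parents only after every merge node is handled), not necessarily the fix that was merged upstream.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical model of the map updates in maybeUpdateKeyChangingRepartitionNodeMap.
final class MergeNodeNpeSketch {

    // keyChangingOperationsToOptimizableRepartitionNodes with a single key-changing node.
    // "REPARTITION-A" is a made-up repartition node name.
    static Map<String, Set<String>> initialRepartitionMap() {
        final Map<String, Set<String>> map = new HashMap<>();
        map.put("KSTREAM-KEY-SELECT-0000000001", new LinkedHashSet<>(Set.of("REPARTITION-A")));
        return map;
    }

    // mergeNodesToKeyChangers: both merge nodes share the same key-changing parent.
    static Map<String, Set<String>> mergeNodesToKeyChangers() {
        final Map<String, Set<String>> map = new LinkedHashMap<>();
        map.put("KSTREAM-MERGE-0000000005", Set.of("KSTREAM-KEY-SELECT-0000000001"));
        map.put("KSTREAM-MERGE-0000000006", Set.of("KSTREAM-KEY-SELECT-0000000001"));
        return map;
    }

    // Same shape as the reported loop: the shared parent is removed while handling
    // the first merge node, so get() returns null for the second and addAll(null) throws.
    static void buggyUpdate(final Map<String, Set<String>> repartitionMap,
                            final Map<String, Set<String>> mergeToKeyChangers) {
        for (final Map.Entry<String, Set<String>> entry : mergeToKeyChangers.entrySet()) {
            final Set<String> repartitionNodes = new LinkedHashSet<>();
            for (final String keyChangingParent : entry.getValue()) {
                repartitionNodes.addAll(repartitionMap.get(keyChangingParent));
                repartitionMap.remove(keyChangingParent);
            }
            repartitionMap.put(entry.getKey(), repartitionNodes);
        }
    }

    // One possible guard (assumption, not the upstream fix): look parents up in an
    // unmodified snapshot, skip missing ones, and remove consumed parents at the end.
    static void deferredRemovalUpdate(final Map<String, Set<String>> repartitionMap,
                                      final Map<String, Set<String>> mergeToKeyChangers) {
        final Map<String, Set<String>> snapshot = new HashMap<>(repartitionMap);
        final Set<String> consumed = new HashSet<>();
        for (final Map.Entry<String, Set<String>> entry : mergeToKeyChangers.entrySet()) {
            final Set<String> repartitionNodes = new LinkedHashSet<>();
            for (final String keyChangingParent : entry.getValue()) {
                final Set<String> nodes = snapshot.get(keyChangingParent);
                if (nodes != null) {
                    repartitionNodes.addAll(nodes);
                    consumed.add(keyChangingParent);
                }
            }
            repartitionMap.put(entry.getKey(), repartitionNodes);
        }
        for (final String parent : consumed) {
            repartitionMap.remove(parent);
        }
    }

    public static void main(final String[] args) {
        try {
            buggyUpdate(initialRepartitionMap(), mergeNodesToKeyChangers());
            System.out.println("no exception");
        } catch (final NullPointerException e) {
            System.out.println("buggy loop threw NullPointerException");
        }

        final Map<String, Set<String>> fixed = initialRepartitionMap();
        deferredRemovalUpdate(fixed, mergeNodesToKeyChangers());
        // TreeMap only to print the keys in a stable order
        System.out.println(new TreeMap<>(fixed));
    }
}
```

With deferred removal, both merge nodes receive the repartition nodes of their shared key-changing parent. Whether they should share them, or whether only one merge node should take them over, depends on what the optimizer intends and is not settled by this sketch.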