[ https://issues.apache.org/jira/browse/KAFKA-8705?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Bill Bejeck reassigned KAFKA-8705:
----------------------------------

    Assignee: Bill Bejeck

> NullPointerException was thrown by topology optimization when two MergeNodes have a common KeyChangingNode
> ---------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-8705
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8705
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.3.0
>            Reporter: Hiroshi Nakahara
>            Assignee: Bill Bejeck
>            Priority: Major
>
> A NullPointerException is thrown by topology optimization when two MergeNodes have a common KeyChangingNode.
> Kafka Streams version: 2.3.0
> h3. Code
> {code:java}
> import org.apache.kafka.common.serialization.Serdes;
> import org.apache.kafka.streams.StreamsBuilder;
> import org.apache.kafka.streams.StreamsConfig;
> import org.apache.kafka.streams.kstream.Consumed;
> import org.apache.kafka.streams.kstream.KStream;
> import java.util.Properties;
> public class Main {
>     public static void main(String[] args) {
>         final StreamsBuilder streamsBuilder = new StreamsBuilder();
>         final KStream<Integer, Integer> parentStream = streamsBuilder.stream("parentTopic", Consumed.with(Serdes.Integer(), Serdes.Integer()))
>                 .selectKey(Integer::sum);  // Make parentStream a key-changing point
>         final KStream<Integer, Integer> childStream1 = parentStream.mapValues(v -> v + 1);
>         final KStream<Integer, Integer> childStream2 = parentStream.mapValues(v -> v + 2);
>         final KStream<Integer, Integer> childStream3 = parentStream.mapValues(v -> v + 3);
>         childStream1
>                 .merge(childStream2)
>                 .merge(childStream3)
>                 .to("outputTopic");
>         final Properties properties = new Properties();
>         properties.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);
>         streamsBuilder.build(properties);
>     }
> }
> {code}
> h3. Expected result
> streamsBuilder.build should create Topology without throwing Exception.  The 
> expected topology is:
> {code:java}
> Topologies:
>    Sub-topology: 0
>     Source: KSTREAM-SOURCE-0000000000 (topics: [parentTopic])
>       --> KSTREAM-KEY-SELECT-0000000001
>     Processor: KSTREAM-KEY-SELECT-0000000001 (stores: [])
>       --> KSTREAM-MAPVALUES-0000000002, KSTREAM-MAPVALUES-0000000003, KSTREAM-MAPVALUES-0000000004
>       <-- KSTREAM-SOURCE-0000000000
>     Processor: KSTREAM-MAPVALUES-0000000002 (stores: [])
>       --> KSTREAM-MERGE-0000000005
>       <-- KSTREAM-KEY-SELECT-0000000001
>     Processor: KSTREAM-MAPVALUES-0000000003 (stores: [])
>       --> KSTREAM-MERGE-0000000005
>       <-- KSTREAM-KEY-SELECT-0000000001
>     Processor: KSTREAM-MAPVALUES-0000000004 (stores: [])
>       --> KSTREAM-MERGE-0000000006
>       <-- KSTREAM-KEY-SELECT-0000000001
>     Processor: KSTREAM-MERGE-0000000005 (stores: [])
>       --> KSTREAM-MERGE-0000000006
>       <-- KSTREAM-MAPVALUES-0000000002, KSTREAM-MAPVALUES-0000000003
>     Processor: KSTREAM-MERGE-0000000006 (stores: [])
>       --> KSTREAM-SINK-0000000007
>       <-- KSTREAM-MERGE-0000000005, KSTREAM-MAPVALUES-0000000004
>     Sink: KSTREAM-SINK-0000000007 (topic: outputTopic)
>       <-- KSTREAM-MERGE-0000000006
> {code}
> h3. Actual result
> A NullPointerException was thrown with the following stack trace:
> {code:java}
> Exception in thread "main" java.lang.NullPointerException
>       at java.util.AbstractCollection.addAll(AbstractCollection.java:343)
>       at org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder.maybeUpdateKeyChangingRepartitionNodeMap(InternalStreamsBuilder.java:397)
>       at org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder.maybeOptimizeRepartitionOperations(InternalStreamsBuilder.java:315)
>       at org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder.maybePerformOptimizations(InternalStreamsBuilder.java:304)
>       at org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder.buildAndOptimizeTopology(InternalStreamsBuilder.java:275)
>       at org.apache.kafka.streams.StreamsBuilder.build(StreamsBuilder.java:558)
>       at Main.main(Main.java:24){code}
> h3. Cause
> This exception occurs in InternalStreamsBuilder#maybeUpdateKeyChangingRepartitionNodeMap.
> {code:java}
>     private void maybeUpdateKeyChangingRepartitionNodeMap() {
>         final Map<StreamsGraphNode, Set<StreamsGraphNode>> mergeNodesToKeyChangers = new HashMap<>();
>         for (final StreamsGraphNode mergeNode : mergeNodes) {
>             mergeNodesToKeyChangers.put(mergeNode, new LinkedHashSet<>());
>             final Collection<StreamsGraphNode> keys = keyChangingOperationsToOptimizableRepartitionNodes.keySet();
>             for (final StreamsGraphNode key : keys) {
>                 final StreamsGraphNode maybeParentKey = findParentNodeMatching(mergeNode, node -> node.parentNodes().contains(key));
>                 if (maybeParentKey != null) {
>                     mergeNodesToKeyChangers.get(mergeNode).add(key);
>                 }
>             }
>         }
>         for (final Map.Entry<StreamsGraphNode, Set<StreamsGraphNode>> entry : mergeNodesToKeyChangers.entrySet()) {
>             final StreamsGraphNode mergeKey = entry.getKey();
>             final Collection<StreamsGraphNode> keyChangingParents = entry.getValue();
>             final LinkedHashSet<OptimizableRepartitionNode> repartitionNodes = new LinkedHashSet<>();
>             for (final StreamsGraphNode keyChangingParent : keyChangingParents) {
>                 // get() returns null here if an earlier merge node already removed this shared parent
>                 repartitionNodes.addAll(keyChangingOperationsToOptimizableRepartitionNodes.get(keyChangingParent));
>                 keyChangingOperationsToOptimizableRepartitionNodes.remove(keyChangingParent);
>             }
>             keyChangingOperationsToOptimizableRepartitionNodes.put(mergeKey, repartitionNodes);
>         }
>     }{code}
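The failure in the second loop can be isolated from the rest of the Streams graph. The following is a simplified, hypothetical model, not Kafka code: plain Strings stand in for StreamsGraphNode and OptimizableRepartitionNode, the node names are taken from the expected topology above, and "repartition-node" is an illustrative placeholder. It replays the quoted loop for two merge nodes that share one key-changing parent.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model of the second loop in
// maybeUpdateKeyChangingRepartitionNodeMap. Strings stand in for
// StreamsGraphNode and OptimizableRepartitionNode; this is not Kafka code.
public class MergeNodeNpeModel {

    // Replays the original loop for the reported graph.
    // Returns true if a NullPointerException is thrown.
    static boolean originalLoopThrows() {
        // Key-changing node -> its repartition nodes (one entry, as in the report).
        final Map<String, Set<String>> keyChangersToRepartitionNodes = new HashMap<>();
        keyChangersToRepartitionNodes.put("KSTREAM-KEY-SELECT-0000000001",
                new LinkedHashSet<>(Collections.singleton("repartition-node")));

        // Both merge nodes share the same key-changing parent. A LinkedHashMap fixes the
        // iteration order; with the original HashMap either order fails the same way.
        final Map<String, Set<String>> mergeNodesToKeyChangers = new LinkedHashMap<>();
        mergeNodesToKeyChangers.put("KSTREAM-MERGE-0000000005",
                Collections.singleton("KSTREAM-KEY-SELECT-0000000001"));
        mergeNodesToKeyChangers.put("KSTREAM-MERGE-0000000006",
                Collections.singleton("KSTREAM-KEY-SELECT-0000000001"));

        try {
            for (final Map.Entry<String, Set<String>> entry : mergeNodesToKeyChangers.entrySet()) {
                final Set<String> repartitionNodes = new LinkedHashSet<>();
                for (final String keyChangingParent : entry.getValue()) {
                    // For MERGE-0000000006, get() returns null: the shared parent
                    // was already removed while handling MERGE-0000000005.
                    repartitionNodes.addAll(keyChangersToRepartitionNodes.get(keyChangingParent));
                    keyChangersToRepartitionNodes.remove(keyChangingParent);
                }
                keyChangersToRepartitionNodes.put(entry.getKey(), repartitionNodes);
            }
            return false;
        } catch (final NullPointerException e) {
            return true;
        }
    }

    public static void main(final String[] args) {
        // Prints "NullPointerException thrown: true"
        System.out.println("NullPointerException thrown: " + originalLoopThrows());
    }
}
```

As in the stack trace, the exception comes from addAll receiving the null returned by Map.get.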
> In the example, mergeNodesToKeyChangers contains two entries (KSTREAM-MERGE-0000000005 and KSTREAM-MERGE-0000000006), and both entries have the same single key-changing parent (KSTREAM-KEY-SELECT-0000000001).
> keyChangingOperationsToOptimizableRepartitionNodes initially has one entry (KSTREAM-KEY-SELECT-0000000001).
> When the first entry is processed in the second for loop, KSTREAM-MERGE-0000000005 is added to keyChangingOperationsToOptimizableRepartitionNodes and KSTREAM-KEY-SELECT-0000000001 is removed from it.
> As a result, when the second entry is processed in the second for loop, KSTREAM-KEY-SELECT-0000000001 is no longer found in keyChangingOperationsToOptimizableRepartitionNodes. get() returns null, and passing that null to addAll causes the NullPointerException.
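Because the problem is that a shared parent is removed before every merge node has looked it up, one possible approach is to defer the removals until the loop finishes. The sketch below is hypothetical and uses the same String-based model (not Kafka's types); it is not the patch that was applied upstream, and the names rewire and reportedScenario are illustrative only.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of one possible fix, modeled with Strings instead of
// StreamsGraphNode/OptimizableRepartitionNode. Not the upstream Kafka patch.
public class DeferredRemovalSketch {

    // Re-keys repartition nodes from key-changing parents to merge nodes,
    // removing the parents only after every merge node has been processed.
    static Map<String, Set<String>> rewire(final Map<String, Set<String>> keyChangersToRepartitionNodes,
                                           final Map<String, Set<String>> mergeNodesToKeyChangers) {
        final Set<String> parentsToRemove = new HashSet<>();
        for (final Map.Entry<String, Set<String>> entry : mergeNodesToKeyChangers.entrySet()) {
            final Set<String> repartitionNodes = new LinkedHashSet<>();
            for (final String keyChangingParent : entry.getValue()) {
                // The shared parent is still present, so get() does not return null.
                repartitionNodes.addAll(keyChangersToRepartitionNodes.get(keyChangingParent));
                parentsToRemove.add(keyChangingParent);
            }
            keyChangersToRepartitionNodes.put(entry.getKey(), repartitionNodes);
        }
        // All lookups are finished; dropping the parents is now safe.
        keyChangersToRepartitionNodes.keySet().removeAll(parentsToRemove);
        return keyChangersToRepartitionNodes;
    }

    // Builds the scenario from the report: two merge nodes, one shared key changer.
    static Map<String, Set<String>> reportedScenario() {
        final Map<String, Set<String>> keyChangers = new HashMap<>();
        keyChangers.put("KSTREAM-KEY-SELECT-0000000001",
                new LinkedHashSet<>(Collections.singleton("repartition-node")));
        final Map<String, Set<String>> mergeNodes = new LinkedHashMap<>();
        mergeNodes.put("KSTREAM-MERGE-0000000005", Collections.singleton("KSTREAM-KEY-SELECT-0000000001"));
        mergeNodes.put("KSTREAM-MERGE-0000000006", Collections.singleton("KSTREAM-KEY-SELECT-0000000001"));
        return rewire(keyChangers, mergeNodes);
    }

    public static void main(final String[] args) {
        System.out.println(reportedScenario());
    }
}
```

With this ordering both merge nodes end up mapped to the same repartition node. Whether the optimizer should then produce one repartition topic or two for such a graph is a separate question that this sketch does not address.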
>  
> This is the first time I have reported an issue for Kafka. Please let me know if further information is required. Thank you.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)
