[ https://issues.apache.org/jira/browse/KAFKA-8705?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16892227#comment-16892227 ]

ASF GitHub Bot commented on KAFKA-8705:
---------------------------------------

bbejeck commented on pull request #7109: KAFKA-8705: Remove parent node after leaving loop to prevent NPE
URL: https://github.com/apache/kafka/pull/7109
 
 
   Fixes the case where merging multiple children of a key-changing node causes an NPE.
   
   I've updated the tests.
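   A minimal sketch of the pattern named in the PR title (collect the key-changing parents during the loop and remove them only after leaving it), assuming a simplified model in which String names stand in for StreamsGraphNode and OptimizableRepartitionNode. The class, method, and placeholder node names are hypothetical; this is not the code from PR #7109.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model: String names stand in for StreamsGraphNode
// and OptimizableRepartitionNode. This is not the code from PR #7109.
public class DeferredRemovalSketch {

    static void updateMap(final Map<String, Set<String>> mergeNodesToKeyChangers,
                          final Map<String, Set<String>> keyChangerToRepartitionNodes) {
        // Collect key-changing parents instead of removing them right away,
        // so a later merge node sharing the same parent can still look it up.
        final Set<String> parentsToRemove = new HashSet<>();
        for (final Map.Entry<String, Set<String>> entry : mergeNodesToKeyChangers.entrySet()) {
            final Set<String> repartitionNodes = new LinkedHashSet<>();
            for (final String keyChangingParent : entry.getValue()) {
                repartitionNodes.addAll(keyChangerToRepartitionNodes.get(keyChangingParent));
                parentsToRemove.add(keyChangingParent);
            }
            keyChangerToRepartitionNodes.put(entry.getKey(), repartitionNodes);
        }
        // Remove the parents only after leaving the loop.
        for (final String parent : parentsToRemove) {
            keyChangerToRepartitionNodes.remove(parent);
        }
    }

    // Scenario from KAFKA-8705: two merge nodes share one key-changing parent.
    static Map<String, Set<String>> runIssueScenario() {
        final String keySelect = "KSTREAM-KEY-SELECT-0000000001";
        final Map<String, Set<String>> mergeNodesToKeyChangers = new LinkedHashMap<>();
        mergeNodesToKeyChangers.put("KSTREAM-MERGE-0000000005", Collections.singleton(keySelect));
        mergeNodesToKeyChangers.put("KSTREAM-MERGE-0000000006", Collections.singleton(keySelect));

        final Map<String, Set<String>> keyChangerToRepartitionNodes = new LinkedHashMap<>();
        // "repartition-node" is a placeholder name, not a real Kafka node name.
        keyChangerToRepartitionNodes.put(keySelect, Collections.singleton("repartition-node"));

        updateMap(mergeNodesToKeyChangers, keyChangerToRepartitionNodes);
        return keyChangerToRepartitionNodes;
    }

    public static void main(final String[] args) {
        // Completes without an exception; both merge nodes map to the repartition node.
        System.out.println(runIssueScenario());
    }
}
```

   Running main prints {KSTREAM-MERGE-0000000005=[repartition-node], KSTREAM-MERGE-0000000006=[repartition-node]}: both merge nodes inherit the shared parent's repartition node, and the parent itself is no longer in the map. Deferring the removal keeps every lookup inside the loop valid, however many merge nodes share the same key-changing parent.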
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> NullPointerException was thrown by topology optimization when two MergeNodes have a common KeyChangingNode
> ---------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-8705
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8705
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.3.0
>            Reporter: Hiroshi Nakahara
>            Assignee: Bill Bejeck
>            Priority: Major
>
> NullPointerException was thrown by topology optimization when two MergeNodes have a common KeyChangingNode.
> Kafka Streams version: 2.3.0
> h3. Code
> {code:java}
> import org.apache.kafka.common.serialization.Serdes;
> import org.apache.kafka.streams.StreamsBuilder;
> import org.apache.kafka.streams.StreamsConfig;
> import org.apache.kafka.streams.kstream.Consumed;
> import org.apache.kafka.streams.kstream.KStream;
> import java.util.Properties;
> public class Main {
>     public static void main(String[] args) {
>         final StreamsBuilder streamsBuilder = new StreamsBuilder();
>         final KStream<Integer, Integer> parentStream =
>                 streamsBuilder.stream("parentTopic", Consumed.with(Serdes.Integer(), Serdes.Integer()))
>                         .selectKey(Integer::sum);  // Makes parentStream a key-changing point
>         final KStream<Integer, Integer> childStream1 = parentStream.mapValues(v -> v + 1);
>         final KStream<Integer, Integer> childStream2 = parentStream.mapValues(v -> v + 2);
>         final KStream<Integer, Integer> childStream3 = parentStream.mapValues(v -> v + 3);
>         childStream1
>                 .merge(childStream2)
>                 .merge(childStream3)
>                 .to("outputTopic");
>         final Properties properties = new Properties();
>         properties.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);
>         streamsBuilder.build(properties);
>     }
> }
> {code}
> h3. Expected result
> streamsBuilder.build should create a Topology without throwing an exception. The expected topology is:
> {code:java}
> Topologies:
>    Sub-topology: 0
>     Source: KSTREAM-SOURCE-0000000000 (topics: [parentTopic])
>       --> KSTREAM-KEY-SELECT-0000000001
>     Processor: KSTREAM-KEY-SELECT-0000000001 (stores: [])
>       --> KSTREAM-MAPVALUES-0000000002, KSTREAM-MAPVALUES-0000000003, KSTREAM-MAPVALUES-0000000004
>       <-- KSTREAM-SOURCE-0000000000
>     Processor: KSTREAM-MAPVALUES-0000000002 (stores: [])
>       --> KSTREAM-MERGE-0000000005
>       <-- KSTREAM-KEY-SELECT-0000000001
>     Processor: KSTREAM-MAPVALUES-0000000003 (stores: [])
>       --> KSTREAM-MERGE-0000000005
>       <-- KSTREAM-KEY-SELECT-0000000001
>     Processor: KSTREAM-MAPVALUES-0000000004 (stores: [])
>       --> KSTREAM-MERGE-0000000006
>       <-- KSTREAM-KEY-SELECT-0000000001
>     Processor: KSTREAM-MERGE-0000000005 (stores: [])
>       --> KSTREAM-MERGE-0000000006
>       <-- KSTREAM-MAPVALUES-0000000002, KSTREAM-MAPVALUES-0000000003
>     Processor: KSTREAM-MERGE-0000000006 (stores: [])
>       --> KSTREAM-SINK-0000000007
>       <-- KSTREAM-MERGE-0000000005, KSTREAM-MAPVALUES-0000000004
>     Sink: KSTREAM-SINK-0000000007 (topic: outputTopic)
>       <-- KSTREAM-MERGE-0000000006
> {code}
> h3. Actual result
> A NullPointerException was thrown with the following stack trace:
> {code:java}
> Exception in thread "main" java.lang.NullPointerException
>       at java.util.AbstractCollection.addAll(AbstractCollection.java:343)
>       at org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder.maybeUpdateKeyChangingRepartitionNodeMap(InternalStreamsBuilder.java:397)
>       at org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder.maybeOptimizeRepartitionOperations(InternalStreamsBuilder.java:315)
>       at org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder.maybePerformOptimizations(InternalStreamsBuilder.java:304)
>       at org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder.buildAndOptimizeTopology(InternalStreamsBuilder.java:275)
>       at org.apache.kafka.streams.StreamsBuilder.build(StreamsBuilder.java:558)
>       at Main.main(Main.java:24)
> {code}
> h3. Cause
> This exception occurs in InternalStreamsBuilder#maybeUpdateKeyChangingRepartitionNodeMap:
> {code:java}
>     private void maybeUpdateKeyChangingRepartitionNodeMap() {
>         final Map<StreamsGraphNode, Set<StreamsGraphNode>> mergeNodesToKeyChangers = new HashMap<>();
>         for (final StreamsGraphNode mergeNode : mergeNodes) {
>             mergeNodesToKeyChangers.put(mergeNode, new LinkedHashSet<>());
>             final Collection<StreamsGraphNode> keys = keyChangingOperationsToOptimizableRepartitionNodes.keySet();
>             for (final StreamsGraphNode key : keys) {
>                 final StreamsGraphNode maybeParentKey = findParentNodeMatching(mergeNode, node -> node.parentNodes().contains(key));
>                 if (maybeParentKey != null) {
>                     mergeNodesToKeyChangers.get(mergeNode).add(key);
>                 }
>             }
>         }
>         for (final Map.Entry<StreamsGraphNode, Set<StreamsGraphNode>> entry : mergeNodesToKeyChangers.entrySet()) {
>             final StreamsGraphNode mergeKey = entry.getKey();
>             final Collection<StreamsGraphNode> keyChangingParents = entry.getValue();
>             final LinkedHashSet<OptimizableRepartitionNode> repartitionNodes = new LinkedHashSet<>();
>             for (final StreamsGraphNode keyChangingParent : keyChangingParents) {
>                 repartitionNodes.addAll(keyChangingOperationsToOptimizableRepartitionNodes.get(keyChangingParent));
>                 keyChangingOperationsToOptimizableRepartitionNodes.remove(keyChangingParent);
>             }
>             keyChangingOperationsToOptimizableRepartitionNodes.put(mergeKey, repartitionNodes);
>         }
>     }
> {code}
> In the example, mergeNodesToKeyChangers contains two entries (KSTREAM-MERGE-0000000005 and KSTREAM-MERGE-0000000006), and both share one key-changing parent (KSTREAM-KEY-SELECT-0000000001).
> keyChangingOperationsToOptimizableRepartitionNodes contains one entry (KSTREAM-KEY-SELECT-0000000001).
> When the first entry is processed in the second for loop, KSTREAM-MERGE-0000000005 is added to keyChangingOperationsToOptimizableRepartitionNodes and KSTREAM-KEY-SELECT-0000000001 is removed from it.
> As a result, when the second entry is processed in the second for loop, KSTREAM-KEY-SELECT-0000000001 is no longer in keyChangingOperationsToOptimizableRepartitionNodes, so get() returns null, and passing that null to addAll() throws the NullPointerException.
>  
> This is my first time reporting an issue for Kafka. Please let me know if further information is required. Thank you.
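
The failure analyzed in the Cause section above can be reproduced without Kafka using a hypothetical, simplified model: String names stand in for the graph nodes, and a LinkedHashMap replaces the HashMap so the iteration order is fixed. The loop mirrors the second loop of the quoted maybeUpdateKeyChangingRepartitionNodeMap, removing each key-changing parent inside the loop. Because both merge nodes share KSTREAM-KEY-SELECT-0000000001, the second lookup fails whichever merge node is processed first. All class, method, and placeholder names here are illustrative assumptions.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical reproduction: String names stand in for StreamsGraphNode and
// OptimizableRepartitionNode. This is not Kafka's actual code.
public class SharedKeyChangerNpe {

    // Mirrors the quoted loop: each key-changing parent is removed inside the loop.
    static void updateMapRemovingInsideLoop(final Map<String, Set<String>> mergeNodesToKeyChangers,
                                            final Map<String, Set<String>> keyChangerToRepartitionNodes) {
        for (final Map.Entry<String, Set<String>> entry : mergeNodesToKeyChangers.entrySet()) {
            final Set<String> repartitionNodes = new LinkedHashSet<>();
            for (final String keyChangingParent : entry.getValue()) {
                // For the second merge node this get() returns null because the
                // shared parent was already removed; addAll(null) then throws.
                repartitionNodes.addAll(keyChangerToRepartitionNodes.get(keyChangingParent));
                keyChangerToRepartitionNodes.remove(keyChangingParent);
            }
            keyChangerToRepartitionNodes.put(entry.getKey(), repartitionNodes);
        }
    }

    // Returns true if the scenario from the issue triggers a NullPointerException.
    static boolean reproducesNpe() {
        final String keySelect = "KSTREAM-KEY-SELECT-0000000001";
        final Map<String, Set<String>> mergeNodesToKeyChangers = new LinkedHashMap<>();
        mergeNodesToKeyChangers.put("KSTREAM-MERGE-0000000005", Collections.singleton(keySelect));
        mergeNodesToKeyChangers.put("KSTREAM-MERGE-0000000006", Collections.singleton(keySelect));

        final Map<String, Set<String>> keyChangerToRepartitionNodes = new LinkedHashMap<>();
        // "repartition-node" is a placeholder name.
        keyChangerToRepartitionNodes.put(keySelect, Collections.singleton("repartition-node"));
        try {
            updateMapRemovingInsideLoop(mergeNodesToKeyChangers, keyChangerToRepartitionNodes);
            return false;
        } catch (final NullPointerException e) {
            return true;
        }
    }

    public static void main(final String[] args) {
        System.out.println(reproducesNpe() ? "NullPointerException reproduced" : "no exception");
    }
}
```

The exception originates in AbstractCollection.addAll, which iterates over its argument and so fails on null, matching the top frame of the reported stack trace.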



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)
