Hiroshi Nakahara created KAFKA-8705:
---------------------------------------
Summary: NullPointerException was thrown by topology optimization
when two MergeNodes have a common KeyChangingNode
Key: KAFKA-8705
URL: https://issues.apache.org/jira/browse/KAFKA-8705
Project: Kafka
Issue Type: Bug
Components: streams
Affects Versions: 2.3.0
Reporter: Hiroshi Nakahara
A NullPointerException is thrown by topology optimization when two MergeNodes
have a common KeyChangingNode.
Kafka Streams version: 2.3.0
h3. Code
{code:java}
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import java.util.Properties;
public class Main {
public static void main(String[] args) {
final StreamsBuilder streamsBuilder = new StreamsBuilder();
final KStream<Integer, Integer> parentStream =
streamsBuilder.stream("parentTopic", Consumed.with(Serdes.Integer(),
Serdes.Integer()))
.selectKey(Integer::sum); // Makes parentStream a key-changing point
final KStream<Integer, Integer> childStream1 = parentStream.mapValues(v
-> v + 1);
final KStream<Integer, Integer> childStream2 = parentStream.mapValues(v
-> v + 2);
final KStream<Integer, Integer> childStream3 = parentStream.mapValues(v
-> v + 3);
childStream1
.merge(childStream2)
.merge(childStream3)
.to("outputTopic");
final Properties properties = new Properties();
properties.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION,
StreamsConfig.OPTIMIZE);
streamsBuilder.build(properties);
}
}
{code}
h3. Expected result
streamsBuilder.build(properties) should create the Topology without throwing
an exception. The expected topology is:
{code:java}
Topologies:
Sub-topology: 0
Source: KSTREAM-SOURCE-0000000000 (topics: [parentTopic])
--> KSTREAM-KEY-SELECT-0000000001
Processor: KSTREAM-KEY-SELECT-0000000001 (stores: [])
--> KSTREAM-MAPVALUES-0000000002, KSTREAM-MAPVALUES-0000000003,
KSTREAM-MAPVALUES-0000000004
<-- KSTREAM-SOURCE-0000000000
Processor: KSTREAM-MAPVALUES-0000000002 (stores: [])
--> KSTREAM-MERGE-0000000005
<-- KSTREAM-KEY-SELECT-0000000001
Processor: KSTREAM-MAPVALUES-0000000003 (stores: [])
--> KSTREAM-MERGE-0000000005
<-- KSTREAM-KEY-SELECT-0000000001
Processor: KSTREAM-MAPVALUES-0000000004 (stores: [])
--> KSTREAM-MERGE-0000000006
<-- KSTREAM-KEY-SELECT-0000000001
Processor: KSTREAM-MERGE-0000000005 (stores: [])
--> KSTREAM-MERGE-0000000006
<-- KSTREAM-MAPVALUES-0000000002, KSTREAM-MAPVALUES-0000000003
Processor: KSTREAM-MERGE-0000000006 (stores: [])
--> KSTREAM-SINK-0000000007
<-- KSTREAM-MERGE-0000000005, KSTREAM-MAPVALUES-0000000004
Sink: KSTREAM-SINK-0000000007 (topic: outputTopic)
<-- KSTREAM-MERGE-0000000006
{code}
h3. Actual result
A NullPointerException is thrown with the following stack trace.
{code:java}
Exception in thread "main" java.lang.NullPointerException
at java.util.AbstractCollection.addAll(AbstractCollection.java:343)
at
org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder.maybeUpdateKeyChangingRepartitionNodeMap(InternalStreamsBuilder.java:397)
at
org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder.maybeOptimizeRepartitionOperations(InternalStreamsBuilder.java:315)
at
org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder.maybePerformOptimizations(InternalStreamsBuilder.java:304)
at
org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder.buildAndOptimizeTopology(InternalStreamsBuilder.java:275)
at
org.apache.kafka.streams.StreamsBuilder.build(StreamsBuilder.java:558)
at Main.main(Main.java:24){code}
h3. Cause
This exception occurs in
InternalStreamsBuilder#maybeUpdateKeyChangingRepartitionNodeMap.
{code:java}
private void maybeUpdateKeyChangingRepartitionNodeMap() {
final Map<StreamsGraphNode, Set<StreamsGraphNode>>
mergeNodesToKeyChangers = new HashMap<>();
for (final StreamsGraphNode mergeNode : mergeNodes) {
mergeNodesToKeyChangers.put(mergeNode, new LinkedHashSet<>());
final Collection<StreamsGraphNode> keys =
keyChangingOperationsToOptimizableRepartitionNodes.keySet();
for (final StreamsGraphNode key : keys) {
final StreamsGraphNode maybeParentKey =
findParentNodeMatching(mergeNode, node -> node.parentNodes().contains(key));
if (maybeParentKey != null) {
mergeNodesToKeyChangers.get(mergeNode).add(key);
}
}
}
for (final Map.Entry<StreamsGraphNode, Set<StreamsGraphNode>> entry :
mergeNodesToKeyChangers.entrySet()) {
final StreamsGraphNode mergeKey = entry.getKey();
final Collection<StreamsGraphNode> keyChangingParents =
entry.getValue();
final LinkedHashSet<OptimizableRepartitionNode> repartitionNodes =
new LinkedHashSet<>();
for (final StreamsGraphNode keyChangingParent : keyChangingParents)
{
repartitionNodes.addAll(keyChangingOperationsToOptimizableRepartitionNodes.get(keyChangingParent));
keyChangingOperationsToOptimizableRepartitionNodes.remove(keyChangingParent);
}
keyChangingOperationsToOptimizableRepartitionNodes.put(mergeKey,
repartitionNodes);
}
}{code}
In the example, mergeNodesToKeyChangers contains two entries
(KSTREAM-MERGE-0000000005 and KSTREAM-MERGE-0000000006), and both have the
same key-changing parent (KSTREAM-KEY-SELECT-0000000001).
keyChangingOperationsToOptimizableRepartitionNodes contains a single entry,
keyed by KSTREAM-KEY-SELECT-0000000001.
When the first entry is processed in the second for loop,
KSTREAM-MERGE-0000000005 is added to
keyChangingOperationsToOptimizableRepartitionNodes, and
KSTREAM-KEY-SELECT-0000000001 is removed from it.
As a result, when the second entry is processed in the second for loop,
keyChangingOperationsToOptimizableRepartitionNodes.get(keyChangingParent)
returns null for KSTREAM-KEY-SELECT-0000000001, and passing that null to
repartitionNodes.addAll throws the NullPointerException.
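The failure mode described above can be sketched with plain collections, independent of Kafka. This is only an illustration: the class SharedKeyChangerSketch, the methods moveToMergeNodes and run, and the skipMissing flag are hypothetical names, strings stand in for StreamsGraphNode instances, and the null guard is one possible mitigation, not necessarily the fix the project should adopt.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

public class SharedKeyChangerSketch {

    static final String KEY_SELECT = "KSTREAM-KEY-SELECT-0000000001";
    static final String MERGE_5 = "KSTREAM-MERGE-0000000005";
    static final String MERGE_6 = "KSTREAM-MERGE-0000000006";

    // Mimics the second loop of maybeUpdateKeyChangingRepartitionNodeMap:
    // each merge node takes over the repartition nodes of its key changers.
    // skipMissing is a hypothetical guard, not part of the Kafka code.
    static void moveToMergeNodes(final Map<String, Set<String>> mergeNodesToKeyChangers,
                                 final Map<String, Set<String>> keyChangersToRepartitionNodes,
                                 final boolean skipMissing) {
        for (final Map.Entry<String, Set<String>> entry : mergeNodesToKeyChangers.entrySet()) {
            final Set<String> repartitionNodes = new LinkedHashSet<>();
            for (final String keyChanger : entry.getValue()) {
                final Set<String> nodes = keyChangersToRepartitionNodes.get(keyChanger);
                if (nodes == null && skipMissing) {
                    continue; // key changer was already claimed by an earlier merge node
                }
                repartitionNodes.addAll(nodes); // throws NullPointerException when nodes is null
                keyChangersToRepartitionNodes.remove(keyChanger);
            }
            keyChangersToRepartitionNodes.put(entry.getKey(), repartitionNodes);
        }
    }

    // Builds the state from the example: two merge nodes sharing one key changer.
    static Map<String, Set<String>> run(final boolean skipMissing) {
        final Map<String, Set<String>> mergeNodesToKeyChangers = new LinkedHashMap<>();
        mergeNodesToKeyChangers.put(MERGE_5, new LinkedHashSet<String>(Collections.singleton(KEY_SELECT)));
        mergeNodesToKeyChangers.put(MERGE_6, new LinkedHashSet<String>(Collections.singleton(KEY_SELECT)));

        final Map<String, Set<String>> keyChangersToRepartitionNodes = new LinkedHashMap<>();
        keyChangersToRepartitionNodes.put(KEY_SELECT, new LinkedHashSet<String>());

        moveToMergeNodes(mergeNodesToKeyChangers, keyChangersToRepartitionNodes, skipMissing);
        return keyChangersToRepartitionNodes;
    }

    public static void main(final String[] args) {
        try {
            run(false);
        } catch (final NullPointerException e) {
            System.out.println("NullPointerException reproduced without the guard");
        }
        System.out.println("Keys with the guard: " + run(true).keySet());
    }
}
```

Without the guard, the second merge node looks up a key changer that the first merge node already removed, which matches the stack trace above. With the guard, both merge nodes end up as keys and the shared key changer is removed exactly once.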
This is my first time reporting an issue in Kafka. Please let me know
if further information is required. Thank you.