This is an automated email from the ASF dual-hosted git repository. bbejeck pushed a commit to branch 2.4 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.4 by this push: new 9c91e05 KAFKA-9739: Fixes null key changing child node (#8416) 9c91e05 is described below commit 9c91e052f8531fe65df989ad0761ab10623ac116 Author: Bill Bejeck <bbej...@gmail.com> AuthorDate: Fri Apr 3 15:14:23 2020 -0400 KAFKA-9739: Fixes null key changing child node (#8416) 2.4 port of #8400 since cherry-picking not possible Reviewers: John Roesler <j...@confluent.io> --- .../kstream/internals/InternalStreamsBuilder.java | 17 +- .../kstream/internals/graph/StreamsGraphTest.java | 186 +++++++++++++++++++++ 2 files changed, 198 insertions(+), 5 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java index 9509431..cbb4a65 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java @@ -396,11 +396,13 @@ public class InternalStreamsBuilder implements InternalNameProvider { final Set<StreamsGraphNode> mergeNodeKeyChangingParentsToRemove = new HashSet<>(); for (final StreamsGraphNode mergeNode : mergeNodes) { mergeNodesToKeyChangers.put(mergeNode, new LinkedHashSet<>()); - final Collection<StreamsGraphNode> keys = keyChangingOperationsToOptimizableRepartitionNodes.keySet(); - for (final StreamsGraphNode key : keys) { - final StreamsGraphNode maybeParentKey = findParentNodeMatching(mergeNode, node -> node.parentNodes().contains(key)); - if (maybeParentKey != null) { - mergeNodesToKeyChangers.get(mergeNode).add(key); + final Set<Map.Entry<StreamsGraphNode, LinkedHashSet<OptimizableRepartitionNode>>> entrySet = keyChangingOperationsToOptimizableRepartitionNodes.entrySet(); + for (final Map.Entry<StreamsGraphNode, LinkedHashSet<OptimizableRepartitionNode>> entry : entrySet) { + if (mergeNodeHasRepartitionChildren(mergeNode, entry.getValue())) { + final StreamsGraphNode maybeParentKey = findParentNodeMatching(mergeNode, node -> node.parentNodes().contains(entry.getKey())); + if (maybeParentKey != null) { + mergeNodesToKeyChangers.get(mergeNode).add(entry.getKey()); + } } } } @@ -421,6 +423,11 @@ public class InternalStreamsBuilder implements InternalNameProvider { } } + private boolean mergeNodeHasRepartitionChildren(final StreamsGraphNode mergeNode, + final LinkedHashSet<OptimizableRepartitionNode> repartitionNodes) { + return repartitionNodes.stream().allMatch(n -> findParentNodeMatching(n, gn -> gn.parentNodes().contains(mergeNode)) != null); + } + @SuppressWarnings("unchecked") private OptimizableRepartitionNode createRepartitionNode(final String repartitionTopicName, final Serde keySerde, diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java index e2006e6..6a0a0b6 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java @@ -22,14 +22,22 @@ import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.kstream.Aggregator; import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.kstream.Grouped; +import org.apache.kafka.streams.kstream.Initializer; import org.apache.kafka.streams.kstream.JoinWindows; +import org.apache.kafka.streams.kstream.Joined; import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.kstream.Produced; +import org.apache.kafka.streams.kstream.Suppressed; import org.apache.kafka.streams.kstream.TimeWindows; +import org.apache.kafka.streams.kstream.Transformer; +import org.apache.kafka.streams.kstream.TransformerSupplier; import org.apache.kafka.streams.kstream.ValueJoiner; +import org.apache.kafka.streams.processor.ProcessorContext; import org.junit.Test; import java.time.Duration; @@ -47,6 +55,8 @@ import static org.junit.Assert.assertEquals; public class StreamsGraphTest { private final Pattern repartitionTopicPattern = Pattern.compile("Sink: .*-repartition"); + private Initializer<String> initializer; + private Aggregator<String, String, String> aggregator; // Test builds topology in succesive manner but only graph node not yet processed written to topology @@ -102,6 +112,76 @@ public class StreamsGraphTest { } @Test + @SuppressWarnings("unchecked") + public void shouldNotThrowNPEWithMergeNodes() { + final Properties properties = new Properties(); + properties.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "test-application"); + properties.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); + properties.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE); + + final StreamsBuilder builder = new StreamsBuilder(); + initializer = () -> ""; + aggregator = (aggKey, value, aggregate) -> aggregate + value.length(); + final TransformerSupplier<String, String, KeyValue<String, String>> transformSupplier = () -> new Transformer<String, String, KeyValue<String, String>>() { + @Override + public void init(final ProcessorContext context) { + + } + + @Override + public KeyValue<String, String> transform(final String key, final String value) { + return KeyValue.pair(key, value); + } + + @Override + public void close() { + + } + }; + + final KStream<String, String> retryStream = builder.stream("retryTopic", Consumed.with(Serdes.String(), Serdes.String())) + .transform(transformSupplier) + .groupByKey(Grouped.with(Serdes.String(), Serdes.String())) + .aggregate(initializer, + aggregator, + Materialized.with(Serdes.String(), Serdes.String())) + .suppress(Suppressed.untilTimeLimit(Duration.ofSeconds(500), Suppressed.BufferConfig.maxBytes(64_000_000))) + .toStream() + .flatMap((k, v) -> new ArrayList<>()); + + final KTable<String, String> idTable = builder.stream("id-table-topic", Consumed.with(Serdes.String(), Serdes.String())) + .flatMap((k, v) -> new ArrayList<KeyValue<String, String>>()) + .peek((subscriptionId, recipientId) -> System.out.println("data " + subscriptionId + " " + recipientId)) + .groupByKey(Grouped.with(Serdes.String(), Serdes.String())) + .aggregate(initializer, + aggregator, + Materialized.with(Serdes.String(), Serdes.String())); + + final KStream<String, String> joinStream = builder.stream("internal-topic-command", Consumed.with(Serdes.String(), Serdes.String())) + .peek((subscriptionId, command) -> System.out.println("stdoutput")) + .mapValues((k, v) -> v) + .merge(retryStream) + .leftJoin(idTable, (v1, v2) -> v1 + v2, + Joined.with(Serdes.String(), Serdes.String(), Serdes.String())); + + final KStream<String, String>[] branches = joinStream.branch((k, v) -> v.equals("some-value"), (k, v) -> true); + + branches[0].map(KeyValue::pair) + .peek((recipientId, command) -> System.out.println("printing out")) + .to("external-command", Produced.with(Serdes.String(), Serdes.String())); + + branches[1].filter((k, v) -> v != null) + .peek((subscriptionId, wrapper) -> System.out.println("Printing output")) + .mapValues((k, v) -> v) + .to("dlq-topic", Produced.with(Serdes.String(), Serdes.String())); + + branches[1].map(KeyValue::pair).to("retryTopic", Produced.with(Serdes.String(), Serdes.String())); + + final Topology topology = builder.build(properties); + assertEquals(expectedComplexMergeOptimizeTopology, topology.describe().toString()); + } + + @Test public void shouldNotOptimizeWithValueOrKeyChangingOperatorsAfterInitialKeyChange() { final Topology attemptedOptimize = getTopologyWithChangingValuesAfterChangingKey(StreamsConfig.OPTIMIZE); @@ -291,4 +371,110 @@ public class StreamsGraphTest { " Sink: KSTREAM-SINK-0000000007 (topic: output_topic)\n" + " <-- KSTREAM-MERGE-0000000006\n\n"; + + private final String expectedComplexMergeOptimizeTopology = "Topologies:\n" + + " Sub-topology: 0\n" + + " Source: KSTREAM-SOURCE-0000000000 (topics: [retryTopic])\n" + + " --> KSTREAM-TRANSFORM-0000000001\n" + + " Processor: KSTREAM-TRANSFORM-0000000001 (stores: [])\n" + + " --> KSTREAM-FILTER-0000000040\n" + + " <-- KSTREAM-SOURCE-0000000000\n" + + " Processor: KSTREAM-FILTER-0000000040 (stores: [])\n" + + " --> KSTREAM-SINK-0000000039\n" + + " <-- KSTREAM-TRANSFORM-0000000001\n" + + " Sink: KSTREAM-SINK-0000000039 (topic: KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition)\n" + + " <-- KSTREAM-FILTER-0000000040\n" + + "\n" + + " Sub-topology: 1\n" + + " Source: KSTREAM-SOURCE-0000000041 (topics: [KSTREAM-AGGREGATE-STATE-STORE-0000000002-repartition])\n" + + " --> KSTREAM-AGGREGATE-0000000003\n" + + " Processor: KSTREAM-AGGREGATE-0000000003 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000002])\n" + + " --> KTABLE-SUPPRESS-0000000007\n" + + " <-- KSTREAM-SOURCE-0000000041\n" + + " Source: KSTREAM-SOURCE-0000000019 (topics: [internal-topic-command])\n" + + " --> KSTREAM-PEEK-0000000020\n" + + " Processor: KTABLE-SUPPRESS-0000000007 (stores: [KTABLE-SUPPRESS-STATE-STORE-0000000008])\n" + + " --> KTABLE-TOSTREAM-0000000009\n" + + " <-- KSTREAM-AGGREGATE-0000000003\n" + + " Processor: KSTREAM-PEEK-0000000020 (stores: [])\n" + + " --> KSTREAM-MAPVALUES-0000000021\n" + + " <-- KSTREAM-SOURCE-0000000019\n" + + " Processor: KTABLE-TOSTREAM-0000000009 (stores: [])\n" + + " --> KSTREAM-FLATMAP-0000000010\n" + + " <-- KTABLE-SUPPRESS-0000000007\n" + + " Processor: KSTREAM-FLATMAP-0000000010 (stores: [])\n" + + " --> KSTREAM-MERGE-0000000022\n" + + " <-- KTABLE-TOSTREAM-0000000009\n" + + " Processor: KSTREAM-MAPVALUES-0000000021 (stores: [])\n" + + " --> KSTREAM-MERGE-0000000022\n" + + " <-- KSTREAM-PEEK-0000000020\n" + + " Processor: KSTREAM-MERGE-0000000022 (stores: [])\n" + + " --> KSTREAM-FILTER-0000000024\n" + + " <-- KSTREAM-MAPVALUES-0000000021, KSTREAM-FLATMAP-0000000010\n" + + " Processor: KSTREAM-FILTER-0000000024 (stores: [])\n" + + " --> KSTREAM-SINK-0000000023\n" + + " <-- KSTREAM-MERGE-0000000022\n" + + " Sink: KSTREAM-SINK-0000000023 (topic: KSTREAM-MERGE-0000000022-repartition)\n" + + " <-- KSTREAM-FILTER-0000000024\n" + + "\n" + + " Sub-topology: 2\n" + + " Source: KSTREAM-SOURCE-0000000011 (topics: [id-table-topic])\n" + + " --> KSTREAM-FLATMAP-0000000012\n" + + " Processor: KSTREAM-FLATMAP-0000000012 (stores: [])\n" + + " --> KSTREAM-FILTER-0000000043\n" + + " <-- KSTREAM-SOURCE-0000000011\n" + + " Processor: KSTREAM-FILTER-0000000043 (stores: [])\n" + + " --> KSTREAM-SINK-0000000042\n" + + " <-- KSTREAM-FLATMAP-0000000012\n" + + " Sink: KSTREAM-SINK-0000000042 (topic: KSTREAM-AGGREGATE-STATE-STORE-0000000014-repartition)\n" + + " <-- KSTREAM-FILTER-0000000043\n" + + "\n" + + " Sub-topology: 3\n" + + " Source: KSTREAM-SOURCE-0000000025 (topics: [KSTREAM-MERGE-0000000022-repartition])\n" + + " --> KSTREAM-LEFTJOIN-0000000026\n" + + " Processor: KSTREAM-LEFTJOIN-0000000026 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000014])\n" + + " --> KSTREAM-BRANCH-0000000027\n" + + " <-- KSTREAM-SOURCE-0000000025\n" + + " Processor: KSTREAM-BRANCH-0000000027 (stores: [])\n" + + " --> KSTREAM-BRANCHCHILD-0000000029, KSTREAM-BRANCHCHILD-0000000028\n" + + " <-- KSTREAM-LEFTJOIN-0000000026\n" + + " Processor: KSTREAM-BRANCHCHILD-0000000029 (stores: [])\n" + + " --> KSTREAM-FILTER-0000000033, KSTREAM-MAP-0000000037\n" + + " <-- KSTREAM-BRANCH-0000000027\n" + + " Processor: KSTREAM-BRANCHCHILD-0000000028 (stores: [])\n" + + " --> KSTREAM-MAP-0000000030\n" + + " <-- KSTREAM-BRANCH-0000000027\n" + + " Processor: KSTREAM-FILTER-0000000033 (stores: [])\n" + + " --> KSTREAM-PEEK-0000000034\n" + + " <-- KSTREAM-BRANCHCHILD-0000000029\n" + + " Processor: KSTREAM-MAP-0000000030 (stores: [])\n" + + " --> KSTREAM-PEEK-0000000031\n" + + " <-- KSTREAM-BRANCHCHILD-0000000028\n" + + " Processor: KSTREAM-PEEK-0000000034 (stores: [])\n" + + " --> KSTREAM-MAPVALUES-0000000035\n" + + " <-- KSTREAM-FILTER-0000000033\n" + + " Source: KSTREAM-SOURCE-0000000044 (topics: [KSTREAM-AGGREGATE-STATE-STORE-0000000014-repartition])\n" + + " --> KSTREAM-PEEK-0000000013\n" + + " Processor: KSTREAM-MAP-0000000037 (stores: [])\n" + + " --> KSTREAM-SINK-0000000038\n" + + " <-- KSTREAM-BRANCHCHILD-0000000029\n" + + " Processor: KSTREAM-MAPVALUES-0000000035 (stores: [])\n" + + " --> KSTREAM-SINK-0000000036\n" + + " <-- KSTREAM-PEEK-0000000034\n" + + " Processor: KSTREAM-PEEK-0000000013 (stores: [])\n" + + " --> KSTREAM-AGGREGATE-0000000015\n" + + " <-- KSTREAM-SOURCE-0000000044\n" + + " Processor: KSTREAM-PEEK-0000000031 (stores: [])\n" + + " --> KSTREAM-SINK-0000000032\n" + + " <-- KSTREAM-MAP-0000000030\n" + + " Processor: KSTREAM-AGGREGATE-0000000015 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000014])\n" + + " --> none\n" + + " <-- KSTREAM-PEEK-0000000013\n" + + " Sink: KSTREAM-SINK-0000000032 (topic: external-command)\n" + + " <-- KSTREAM-PEEK-0000000031\n" + + " Sink: KSTREAM-SINK-0000000036 (topic: dlq-topic)\n" + + " <-- KSTREAM-MAPVALUES-0000000035\n" + + " Sink: KSTREAM-SINK-0000000038 (topic: retryTopic)\n" + + " <-- KSTREAM-MAP-0000000037\n\n"; + }