Walker Carlson created KAFKA-9299:
-------------------------------------

    Summary: Over eager optimization
    Key: KAFKA-9299
    URL: https://issues.apache.org/jira/browse/KAFKA-9299
    Project: Kafka
    Issue Type: Task
    Components: streams
    Reporter: Walker Carlson
There are a few cases where the optimizer attempts an optimization that can cause a copartitioning failure. The known cases involve join and cogroup, but merge and other operators could also be affected.

Take, for example, three input topics A, B and C with 2, 3 and 4 partitions respectively:

    B' = B.map();
    B'.join(A);
    B'.join(C);

Because map() can change the key, each join requires B' to be repartitioned, and each repartition topic must be copartitioned with the other join input (2 partitions for A, 4 for C). The optimizer pushes the repartition upstream, merging these into a single repartition topic right after the map. That topic can only have one partition count, so copartitioning fails.

The same merging can be seen with the following test (stringConsumed, STRING_AGGREGATOR and STRING_INITIALIZER are assumed to be fixtures defined elsewhere in the test class):

    @Test
    public void shouldInsertRepartitionsTopicForCogroupsUsedTwice() {
        final StreamsBuilder builder = new StreamsBuilder();
        final Properties properties = new Properties();
        properties.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);

        final KStream<String, String> stream1 = builder.stream("one", stringConsumed);

        // Key-changing map followed by groupByKey() requires a repartition topic.
        final KGroupedStream<String, String> groupedOne =
            stream1.map((k, v) -> new KeyValue<>(v, k)).groupByKey(Grouped.as("foo"));

        // The same cogrouped stream is aggregated twice.
        final CogroupedKStream<String, String> one = groupedOne.cogroup(STRING_AGGREGATOR);
        one.aggregate(STRING_INITIALIZER);
        one.aggregate(STRING_INITIALIZER);

        final String topologyDescription = builder.build(properties).describe().toString();
        System.err.println(topologyDescription);
    }

The resulting topology shows both aggregations reading from the single foo-repartition topic:

    Topologies:
       Sub-topology: 0
        Source: KSTREAM-SOURCE-0000000000 (topics: [one])
          --> KSTREAM-MAP-0000000001
        Processor: KSTREAM-MAP-0000000001 (stores: [])
          --> foo-repartition-filter
          <-- KSTREAM-SOURCE-0000000000
        Processor: foo-repartition-filter (stores: [])
          --> foo-repartition-sink
          <-- KSTREAM-MAP-0000000001
        Sink: foo-repartition-sink (topic: foo-repartition)
          <-- foo-repartition-filter

       Sub-topology: 1
        Source: foo-repartition-source (topics: [foo-repartition])
          --> COGROUPKSTREAM-AGGREGATE-0000000006, COGROUPKSTREAM-AGGREGATE-0000000012
        Processor: COGROUPKSTREAM-AGGREGATE-0000000006 (stores: [COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000002])
          --> COGROUPKSTREAM-MERGE-0000000007
          <-- foo-repartition-source
        Processor: COGROUPKSTREAM-AGGREGATE-0000000012 (stores: [COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000008])
          --> COGROUPKSTREAM-MERGE-0000000013
          <-- foo-repartition-source
        Processor: COGROUPKSTREAM-MERGE-0000000007 (stores: [])
          --> none
          <-- COGROUPKSTREAM-AGGREGATE-0000000006
        Processor: COGROUPKSTREAM-MERGE-0000000013 (stores: [])
          --> none
          <-- COGROUPKSTREAM-AGGREGATE-0000000012
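As a rough illustration of why a single shared repartition topic cannot work in the join example, here is a plain-Java model with no Kafka dependency. It simplifies copartitioning to "the repartition topic must have the same partition count as the other join input" and treats the optimization as replacing the per-join repartition topics with one shared topic. CopartitionModel and its methods are hypothetical names for this sketch, not part of the Kafka Streams API.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical, simplified model of the copartitioning constraint in KAFKA-9299.
// Not part of the Kafka Streams API.
public class CopartitionModel {

    // Unoptimized plan (assumed): each join gets its own repartition topic,
    // sized to match the partition count of that join's other input.
    public static Map<String, Integer> perJoinRepartitionTopics(Map<String, Integer> joinPartners) {
        Map<String, Integer> topics = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> e : joinPartners.entrySet()) {
            topics.put("repartition-for-" + e.getKey(), e.getValue());
        }
        return topics;
    }

    // Optimized plan (assumed): one shared repartition topic right after the map.
    // It has a single partition count, so it is only copartitioned with every
    // join partner if all partners have the same number of partitions.
    public static boolean sharedRepartitionTopicIsValid(Map<String, Integer> joinPartners) {
        return joinPartners.values().stream().distinct().count() <= 1;
    }

    public static void main(String[] args) {
        // B' = B.map() is joined with A (2 partitions) and C (4 partitions).
        Map<String, Integer> partners = new LinkedHashMap<>();
        partners.put("A", 2);
        partners.put("C", 4);

        System.out.println("per-join topics: " + perJoinRepartitionTopics(partners));
        System.out.println("shared topic valid: " + sharedRepartitionTopicIsValid(partners));
    }
}
```

Running main lists two per-join repartition topics (2 and 4 partitions) and reports that one shared topic is not valid for partners A and C; with partners of equal size the shared topic would pass the check.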