[ https://issues.apache.org/jira/browse/KAFKA-4601?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Guozhang Wang updated KAFKA-4601:
---------------------------------
    Issue Type: Sub-task  (was: Bug)
        Parent: KAFKA-6034

> Avoid duplicated repartitioning in KStream DSL
> ----------------------------------------------
>
>                 Key: KAFKA-4601
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4601
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: streams
>            Reporter: Guozhang Wang
>              Labels: performance
>
> Consider the following DSL:
> {code}
> KStream<String, String> source = builder.stream(Serdes.String(), Serdes.String(), "topic1");
> KStream<String, String> mapped = source.map(..);
> KTable<String, Long> counts = mapped
>     .groupByKey()
>     .count("Counts");
> KStream<String, String> sink = mapped.leftJoin(counts, ..);
> {code}
> The resulting topology looks like this:
> {code}
> ProcessorTopology:
>   KSTREAM-SOURCE-0000000000:
>     topics: [topic1]
>     children: [KSTREAM-MAP-0000000001]
>   KSTREAM-MAP-0000000001:
>     children: [KSTREAM-FILTER-0000000004, KSTREAM-FILTER-0000000007]
>   KSTREAM-FILTER-0000000004:
>     children: [KSTREAM-SINK-0000000003]
>   KSTREAM-SINK-0000000003:
>     topic: X-Counts-repartition
>   KSTREAM-FILTER-0000000007:
>     children: [KSTREAM-SINK-0000000006]
>   KSTREAM-SINK-0000000006:
>     topic: X-KSTREAM-MAP-0000000001-repartition
> ProcessorTopology:
>   KSTREAM-SOURCE-0000000008:
>     topics: [X-KSTREAM-MAP-0000000001-repartition]
>     children: [KSTREAM-LEFTJOIN-0000000009]
>   KSTREAM-LEFTJOIN-0000000009:
>     states: [Counts]
>   KSTREAM-SOURCE-0000000005:
>     topics: [X-Counts-repartition]
>     children: [KSTREAM-AGGREGATE-0000000002]
>   KSTREAM-AGGREGATE-0000000002:
>     states: [Counts]
> {code}
> I.e. there are two repartition topics, one for the aggregate and one for the
> join. This not only adds unnecessary overhead (each mapped record is written
> to, and read back from, Kafka twice) but also breaks the expected processing
> order: users expect each record to go through the aggregation first and then
> through the join operator, yet the two repartition topics are consumed
> independently, so the join may look up a record's key before the aggregate
> has counted that record.
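To make the ordering problem concrete, here is a toy, single-threaded simulation in plain Java. It is not Kafka Streams code: the class and method names are hypothetical, a `HashMap` stands in for the `Counts` store, and the two-topic case picks just one possible interleaving (the join's topic drained before the aggregate's) to show that the join can miss counts that a shared topic would have provided.

```java
import java.util.*;

// Hypothetical toy model, NOT the Kafka Streams API. It only illustrates how the
// relative consumption order of repartition topics changes what the join sees.
class RepartitionOrderingDemo {

    // Stands in for the "Counts" state store: key -> count.
    static final Map<String, Long> counts = new HashMap<>();

    // Aggregate: increments the count for the key.
    static void aggregate(String key) {
        counts.merge(key, 1L, Long::sum);
    }

    // Left join: reads the current count for the key (null if not counted yet).
    static String leftJoin(String key) {
        return key + "=" + counts.get(key);
    }

    // One shared repartition topic: each record goes to the aggregate, then the join.
    static List<String> sharedTopic(List<String> keys) {
        counts.clear();
        List<String> joined = new ArrayList<>();
        for (String k : keys) {
            aggregate(k);
            joined.add(leftJoin(k));
        }
        return joined;
    }

    // Two independent repartition topics. Here the join's topic happens to be
    // drained first: one legal interleaving, since nothing orders the two topics.
    static List<String> twoTopicsJoinFirst(List<String> keys) {
        counts.clear();
        List<String> joined = new ArrayList<>();
        for (String k : keys) joined.add(leftJoin(k)); // join input topic
        for (String k : keys) aggregate(k);            // aggregate input topic
        return joined;
    }

    public static void main(String[] args) {
        List<String> keys = List.of("a", "a", "b");
        System.out.println(sharedTopic(keys));        // [a=1, a=2, b=1]
        System.out.println(twoTopicsJoinFirst(keys)); // [a=null, a=null, b=null]
    }
}
```

In a real deployment the interleaving depends on how the two source topics are polled, so the result is not fixed; the point of the sketch is only that a single shared topic makes the aggregate-then-join order deterministic.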
> To get the following simpler topology, users today need to manually add a
> {{through}} operator after {{map}} to enforce repartitioning:
> {code}
> KStream<String, String> source = builder.stream(Serdes.String(), Serdes.String(), "topic1");
> KStream<String, String> repartitioned = source.map(..).through("topic2");
> KTable<String, Long> counts = repartitioned
>     .groupByKey()
>     .count("Counts");
> KStream<String, String> sink = repartitioned.leftJoin(counts, ..);
> {code}
> The resulting topology then looks like this:
> {code}
> ProcessorTopology:
>   KSTREAM-SOURCE-0000000000:
>     topics: [topic1]
>     children: [KSTREAM-MAP-0000000001]
>   KSTREAM-MAP-0000000001:
>     children: [KSTREAM-SINK-0000000002]
>   KSTREAM-SINK-0000000002:
>     topic: topic2
> ProcessorTopology:
>   KSTREAM-SOURCE-0000000003:
>     topics: [topic2]
>     children: [KSTREAM-AGGREGATE-0000000004, KSTREAM-LEFTJOIN-0000000005]
>   KSTREAM-AGGREGATE-0000000004:
>     states: [Counts]
>   KSTREAM-LEFTJOIN-0000000005:
>     states: [Counts]
> {code}
> Here the mapped stream is written to a single topic, and both the aggregate
> and the join hang off the same source node, so each record is fed to the
> aggregate first and then to the join. Streams should apply this kind of
> optimization automatically, which we can consider once we move beyond
> one-operator-at-a-time translation of the DSL.
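One way to picture the requested optimization is the toy sketch below: instead of giving every downstream operator that needs co-partitioned input its own repartition topic, a key-changing operator gets at most one repartition topic, shared by all such children. This is plain Java over a hypothetical `Node` plan type, not the Kafka Streams translation code, and it only inspects an operator's immediate parent, which is a simplification of how key changes propagate in a real DSL graph.

```java
import java.util.*;

// Hypothetical toy logical plan, NOT Kafka Streams internals.
class RepartitionDedupSketch {

    static final class Node {
        final String name;
        final boolean changesKey;            // e.g. map()
        final boolean needsPartitionedInput; // e.g. groupByKey().count(), leftJoin()
        final List<Node> children = new ArrayList<>();

        Node(String name, boolean changesKey, boolean needsPartitionedInput) {
            this.name = name;
            this.changesKey = changesKey;
            this.needsPartitionedInput = needsPartitionedInput;
        }

        // Adds a child and returns it, so plans can be built fluently.
        Node to(Node child) {
            children.add(child);
            return child;
        }
    }

    // One-operator-at-a-time: each partition-sensitive child of a key-changing
    // node gets its own repartition topic.
    static List<String> naive(Node node) {
        List<String> topics = new ArrayList<>();
        for (Node child : node.children) {
            if (node.changesKey && child.needsPartitionedInput) {
                topics.add(child.name + "-repartition");
            }
            topics.addAll(naive(child));
        }
        return topics;
    }

    // Deduplicated: a key-changing node gets at most one repartition topic,
    // shared by all of its partition-sensitive children.
    static List<String> deduplicated(Node node) {
        List<String> topics = new ArrayList<>();
        boolean anyChildNeedsIt = false;
        for (Node child : node.children) {
            anyChildNeedsIt |= child.needsPartitionedInput;
        }
        if (node.changesKey && anyChildNeedsIt) {
            topics.add(node.name + "-repartition");
        }
        for (Node child : node.children) {
            topics.addAll(deduplicated(child));
        }
        return topics;
    }

    public static void main(String[] args) {
        // Mirrors the DSL in this issue: source -> map -> {count, leftJoin}.
        Node source = new Node("source", false, false);
        Node map = source.to(new Node("map", true, false));
        map.to(new Node("count", false, true));
        map.to(new Node("leftJoin", false, true));

        System.out.println(naive(source));        // [count-repartition, leftJoin-repartition]
        System.out.println(deduplicated(source)); // [map-repartition]
    }
}
```

The two naive topics play the role of X-Counts-repartition and X-KSTREAM-MAP-0000000001-repartition in the first topology, while the single deduplicated topic plays the role of the hand-added topic2.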