[ https://issues.apache.org/jira/browse/KAFKA-10659?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17809859#comment-17809859 ]

Stanislav Spiridonov commented on KAFKA-10659:
----------------------------------------------

The same issue here. The workaround of calling groupByKey with Grouped.with(...) works, but I need to create these topics manually.

> Cogroup topology generation fails if input streams are repartitioned
> --------------------------------------------------------------------
>
>                 Key: KAFKA-10659
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10659
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.6.0, 2.5.1
>            Reporter: blueedgenick
>            Priority: Major
>
> Example to reproduce:
>
> {code:java}
> KGroupedStream<String, A> groupedA = builder
>     .stream(topicA, Consumed.with(Serdes.String(), serdeA))
>     .selectKey((aKey, aVal) -> aVal.someId)
>     .groupByKey();
> KGroupedStream<String, B> groupedB = builder
>     .stream(topicB, Consumed.with(Serdes.String(), serdeB))
>     .selectKey((bKey, bVal) -> bVal.someId)
>     .groupByKey();
> KGroupedStream<String, C> groupedC = builder
>     .stream(topicC, Consumed.with(Serdes.String(), serdeC))
>     .selectKey((cKey, cVal) -> cVal.someId)
>     .groupByKey();
> CogroupedKStream<String, ABC> cogroup = groupedA.cogroup(AggregatorA)
>     .cogroup(groupedB, AggregatorB)
>     .cogroup(groupedC, AggregatorC);
> // Aggregate all streams of the cogroup
> KTable<String, ABC> agg = cogroup.aggregate(
>     () -> new ABC(),
>     Named.as("my-agg-proc-name"),
>     Materialized.<String, ABC, KeyValueStore<Bytes, byte[]>>as("abc-agg-store")
>         .withKeySerde(Serdes.String())
>         .withValueSerde(serdeABC)
> );
> {code}
>
> This throws an exception during topology generation:
>
> {code:java}
> org.apache.kafka.streams.errors.TopologyException: Invalid topology: Processor abc-agg-store-repartition-filter is already added.
>     at org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.addProcessor(InternalTopologyBuilder.java:485)
>     at org.apache.kafka.streams.kstream.internals.graph.OptimizableRepartitionNode.writeToTopology(OptimizableRepartitionNode.java:70)
>     at org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder.buildAndOptimizeTopology(InternalStreamsBuilder.java:307)
>     at org.apache.kafka.streams.StreamsBuilder.build(StreamsBuilder.java:564)
>     at org.apache.kafka.streams.StreamsBuilder.build(StreamsBuilder.java:553)
>     at ...
> {code}
>
> The same exception is observed if the `selectKey(...).groupByKey()` pattern is replaced with `groupBy(...)`.
> This behavior is observed with topology optimization in its default state, explicitly set off, or explicitly set on.
> Interestingly, the problem is avoided, and a workable topology is produced, if the grouping step is named by passing a `Grouped.with(...)` expression to either `groupByKey` or `groupBy`.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
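
For reference, the naming workaround mentioned in the report could look roughly like the sketch below. It is only an illustration under stated assumptions: the A/B/C value types are replaced by plain String values, and the topic names, grouping names ("group-a" and so on), the APPEND aggregator, and the namedGroup helper are all hypothetical stand-ins that do not come from the report. It assumes kafka-streams is on the classpath and uses the three-argument Grouped.with(name, keySerde, valueSerde) overload to give each grouping step its own name.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.CogroupedKStream;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.streams.state.KeyValueStore;

public class NamedGroupingCogroup {

    // Hypothetical aggregator: appends each incoming value to the running aggregate.
    static final Aggregator<String, String, String> APPEND =
        (key, value, aggregate) -> aggregate + value;

    // Hypothetical helper: re-keys a topic by its value (forcing a repartition)
    // and groups it under an explicit, distinct name.
    static KGroupedStream<String, String> namedGroup(
            StreamsBuilder builder, String topic, String groupName) {
        return builder
            .stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
            .selectKey((key, value) -> value)
            .groupByKey(Grouped.with(groupName, Serdes.String(), Serdes.String()));
    }

    static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();

        // Each input gets its own grouping name (names are assumptions).
        KGroupedStream<String, String> groupedA = namedGroup(builder, "topic-a", "group-a");
        KGroupedStream<String, String> groupedB = namedGroup(builder, "topic-b", "group-b");
        KGroupedStream<String, String> groupedC = namedGroup(builder, "topic-c", "group-c");

        CogroupedKStream<String, String> cogroup = groupedA.cogroup(APPEND)
            .cogroup(groupedB, APPEND)
            .cogroup(groupedC, APPEND);

        // Same store and processor names as in the report's reproduction.
        cogroup.aggregate(
            () -> "",
            Named.as("my-agg-proc-name"),
            Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("abc-agg-store")
                .withKeySerde(Serdes.String())
                .withValueSerde(Serdes.String()));

        return builder.build();
    }

    public static void main(String[] args) {
        // Print the generated topology so the repartition node names can be inspected.
        System.out.println(buildTopology().describe());
    }
}
```

Printing the topology description is a convenient way to check which repartition topic names were generated before deciding whether any of them have to be created by hand.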