[ https://issues.apache.org/jira/browse/KAFKA-10659?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17809859#comment-17809859 ]
Stanislav Spiridonov commented on KAFKA-10659:
----------------------------------------------
Same issue here. The workaround of passing `Grouped.with(...)` to `groupByKey` works,
but I then have to create these topics manually.
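If these are the repartition topics created for the named groupings, their names should follow the Kafka Streams convention for internal topics, `<application.id>-<name>-repartition`. A minimal sketch for listing the topics to pre-create, assuming that convention; the application id `my-app` and the grouping names `group-a`, `group-b` and `group-c` are hypothetical, and partition counts and topic configs are not covered.

```java
import java.util.List;
import java.util.stream.Collectors;

public class RepartitionTopicNames {

    /**
     * Builds one repartition topic name per named grouping, assuming the
     * "<application.id>-<name>-repartition" convention for internal topics.
     */
    static List<String> repartitionTopics(String applicationId, List<String> groupedNames) {
        return groupedNames.stream()
                .map(name -> applicationId + "-" + name + "-repartition")
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Hypothetical application id and Grouped names, one per cogrouped input.
        List<String> topics = repartitionTopics("my-app", List.of("group-a", "group-b", "group-c"));
        // Prints one topic per line, starting with my-app-group-a-repartition.
        topics.forEach(System.out::println);
    }
}
```

Checking the generated names against `Topology#describe()` output for the real application is a safer source of truth than this convention alone.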
> Cogroup topology generation fails if input streams are repartitioned
> --------------------------------------------------------------------
>
> Key: KAFKA-10659
> URL: https://issues.apache.org/jira/browse/KAFKA-10659
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 2.6.0, 2.5.1
> Reporter: blueedgenick
> Priority: Major
>
> Example to reproduce:
>
> {code:java}
> KGroupedStream<String, A> groupedA = builder
>     .stream(topicA, Consumed.with(Serdes.String(), serdeA))
>     .selectKey((aKey, aVal) -> aVal.someId)
>     .groupByKey();
> KGroupedStream<String, B> groupedB = builder
>     .stream(topicB, Consumed.with(Serdes.String(), serdeB))
>     .selectKey((bKey, bVal) -> bVal.someId)
>     .groupByKey();
> KGroupedStream<String, C> groupedC = builder
>     .stream(topicC, Consumed.with(Serdes.String(), serdeC))
>     .selectKey((cKey, cVal) -> cVal.someId)
>     .groupByKey();
> CogroupedKStream<String, ABC> cogroup = groupedA.cogroup(AggregatorA)
>     .cogroup(groupedB, AggregatorB)
>     .cogroup(groupedC, AggregatorC);
> // Aggregate all streams of the cogroup
> KTable<String, ABC> agg = cogroup.aggregate(
>     () -> new ABC(),
>     Named.as("my-agg-proc-name"),
>     Materialized.<String, ABC, KeyValueStore<Bytes, byte[]>>as("abc-agg-store")
>         .withKeySerde(Serdes.String())
>         .withValueSerde(serdeABC)
> );
> {code}
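The reproducer aims to compute one `ABC` aggregate per `someId` from three differently typed inputs. Because `selectKey` changes the key, each input has to be repartitioned before it reaches the shared aggregate, which is where the repartition nodes in the exception come from. The intended result can be modelled in plain Java as a sketch: `A`, `B`, `C` and `ABC` are hypothetical stand-ins (only `someId` comes from the report), `Aggregator` is a local interface with the same shape as Kafka Streams' `Aggregator#apply(key, value, aggregate)`, and partitioning, time and serialization are ignored.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Supplier;

public class CogroupModel {

    /** Local stand-in with the same shape as Kafka Streams' Aggregator#apply(key, value, aggregate). */
    interface Aggregator<K, V, VA> {
        VA apply(K key, V value, VA aggregate);
    }

    // Hypothetical input types: only someId appears in the report.
    static final class A { final String someId; A(String someId) { this.someId = someId; } }
    static final class B { final String someId; B(String someId) { this.someId = someId; } }
    static final class C { final String someId; C(String someId) { this.someId = someId; } }

    /** Hypothetical aggregate that counts how many A, B and C records arrived per key. */
    static final class ABC {
        int a;
        int b;
        int c;
    }

    /** Re-keys each record (like selectKey) and folds it into the shared per-key aggregate. */
    static <V> void fold(Map<String, ABC> store, List<V> records, Function<V, String> keySelector,
                         Aggregator<String, V, ABC> aggregator, Supplier<ABC> initializer) {
        for (V value : records) {
            String key = keySelector.apply(value);
            ABC current = store.computeIfAbsent(key, k -> initializer.get());
            store.put(key, aggregator.apply(key, value, current));
        }
    }

    /** One map plays the role of "abc-agg-store"; each input has its own aggregator. */
    public static Map<String, ABC> run(List<A> as, List<B> bs, List<C> cs) {
        Map<String, ABC> store = new LinkedHashMap<>();
        fold(store, as, a -> a.someId, (k, v, agg) -> { agg.a++; return agg; }, ABC::new);
        fold(store, bs, b -> b.someId, (k, v, agg) -> { agg.b++; return agg; }, ABC::new);
        fold(store, cs, c -> c.someId, (k, v, agg) -> { agg.c++; return agg; }, ABC::new);
        return store;
    }

    public static void main(String[] args) {
        Map<String, ABC> store = run(
                List.of(new A("x"), new A("y")),
                List.of(new B("x")),
                List.of(new C("x"), new C("x")));
        store.forEach((id, abc) ->
                System.out.println(id + " -> a=" + abc.a + " b=" + abc.b + " c=" + abc.c));
    }
}
```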
>
>
> This throws an exception during topology generation:
>
> {code:java}
> org.apache.kafka.streams.errors.TopologyException: Invalid topology: Processor abc-agg-store-repartition-filter is already added.
> at org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.addProcessor(InternalTopologyBuilder.java:485)
> at org.apache.kafka.streams.kstream.internals.graph.OptimizableRepartitionNode.writeToTopology(OptimizableRepartitionNode.java:70)
> at org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder.buildAndOptimizeTopology(InternalStreamsBuilder.java:307)
> at org.apache.kafka.streams.StreamsBuilder.build(StreamsBuilder.java:564)
> at org.apache.kafka.streams.StreamsBuilder.build(StreamsBuilder.java:553)
> at ...
> {code}
>
> The same exception is observed if the `selectKey(...).groupByKey()` pattern
> is replaced with `groupBy(...)`.
> This behavior is observed whether topology optimization is left at its default,
> explicitly disabled, or explicitly enabled.
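The three optimization cases correspond to the `topology.optimization` Streams setting, whose values in these versions are `none` (the default) and `all`. One detail that is easy to miss: the optimizer only runs when the same properties are also passed to `StreamsBuilder.build(Properties)`; the no-argument `build()` does not apply it. A minimal sketch of the three property sets; the application id and bootstrap servers are hypothetical placeholders.

```java
import java.util.Properties;

public class OptimizationSettings {

    // String values of the topology optimization setting in the 2.5/2.6 line.
    static final String TOPOLOGY_OPTIMIZATION = "topology.optimization";
    static final String NO_OPTIMIZATION = "none"; // the default
    static final String OPTIMIZE = "all";

    /** Builds Streams properties for one of the three cases; null leaves the setting unset. */
    static Properties streamsProps(String optimization) {
        Properties props = new Properties();
        props.put("application.id", "my-app");            // hypothetical
        props.put("bootstrap.servers", "localhost:9092"); // hypothetical
        if (optimization != null) {
            props.put(TOPOLOGY_OPTIMIZATION, optimization);
        }
        return props;
    }

    public static void main(String[] args) {
        for (String setting : new String[] {null, NO_OPTIMIZATION, OPTIMIZE}) {
            Properties props = streamsProps(setting);
            // Effective value: an unset key falls back to the default, "none".
            System.out.println(props.getProperty(TOPOLOGY_OPTIMIZATION, NO_OPTIMIZATION));
            // In a real application: builder.build(props) and new KafkaStreams(topology, props).
        }
    }
}
```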
> Interestingly, the problem is avoided, and a workable topology is produced, if
> the grouping step is named by passing a `Grouped.with(...)` expression to
> either `groupByKey` or `groupBy`.
>
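Why naming the grouping helps can be illustrated with a toy model of the name clash. This is not Kafka Streams' code: `ToyTopologyBuilder`, `repartitionPrefix` and the other names are hypothetical, and the naming rule (use the `Grouped` name if one is given, otherwise the store name) is an assumption consistent with the processor name `abc-agg-store-repartition-filter` in the exception. With three unnamed inputs, every repartition filter gets the same name, so the second registration fails; with distinct `Grouped` names, all three register.

```java
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class RepartitionNameCollision {

    /** Toy stand-in for a topology builder's processor registry: duplicate names are rejected. */
    static final class ToyTopologyBuilder {
        private final Set<String> processors = new LinkedHashSet<>();

        void addProcessor(String name) {
            if (!processors.add(name)) {
                throw new IllegalStateException("Invalid topology: Processor " + name + " is already added.");
            }
        }
    }

    /** Assumed naming rule: the Grouped name if one was given, otherwise the state store name. */
    static String repartitionPrefix(String groupedName, String storeName) {
        return groupedName != null ? groupedName : storeName;
    }

    /** Registers one repartition filter per cogrouped input and returns the registered names. */
    static Set<String> build(List<String> groupedNames, String storeName) {
        ToyTopologyBuilder builder = new ToyTopologyBuilder();
        for (String groupedName : groupedNames) {
            builder.addProcessor(repartitionPrefix(groupedName, storeName) + "-repartition-filter");
        }
        return builder.processors;
    }

    /** Returns "ok: [...]" or the error message, so both outcomes can be printed side by side. */
    static String describe(List<String> groupedNames, String storeName) {
        try {
            return "ok: " + build(groupedNames, storeName);
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        // Unnamed groupings (null = no Grouped name): all three inputs derive the same name.
        System.out.println(describe(Collections.nCopies(3, (String) null), "abc-agg-store"));
        // Named groupings, as in the reported workaround: every name is distinct.
        System.out.println(describe(List.of("group-a", "group-b", "group-c"), "abc-agg-store"));
    }
}
```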
--
This message was sent by Atlassian Jira
(v8.20.10#820010)