[ 
https://issues.apache.org/jira/browse/KAFKA-10659?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17809859#comment-17809859
 ] 

Stanislav Spiridonov commented on KAFKA-10659:
----------------------------------------------

I see the same issue here. The workaround of passing `Grouped.with(...)` to 
`groupByKey` works, but I then need to create these topics manually.

> Cogroup topology generation fails if input streams are repartitioned
> --------------------------------------------------------------------
>
>                 Key: KAFKA-10659
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10659
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.6.0, 2.5.1
>            Reporter: blueedgenick
>            Priority: Major
>
> Example to reproduce:
>  
> {code:java}
> KGroupedStream<String, A> groupedA = builder
>   .stream(topicA, Consumed.with(Serdes.String(), serdeA))
>   .selectKey((aKey, aVal) -> aVal.someId)
>   .groupByKey();
> KGroupedStream<String, B> groupedB = builder
>   .stream(topicB, Consumed.with(Serdes.String(), serdeB))
>   .selectKey((bKey, bVal) -> bVal.someId)
>   .groupByKey();
> KGroupedStream<String, C> groupedC = builder
>   .stream(topicC, Consumed.with(Serdes.String(), serdeC))
>   .selectKey((cKey, cVal) -> cVal.someId)
>   .groupByKey();
> CogroupedKStream<String, ABC> cogroup = groupedA.cogroup(AggregatorA)
>   .cogroup(groupedB, AggregatorB)
>   .cogroup(groupedC, AggregatorC);
> // Aggregate all streams of the cogroup
> KTable<String, ABC> agg = cogroup.aggregate(
>   () -> new ABC(),
>   Named.as("my-agg-proc-name"),
>   Materialized.<String, ABC, KeyValueStore<Bytes, byte[]>>as("abc-agg-store")
>     .withKeySerde(Serdes.String())
>     .withValueSerde(serdeABC)
> );
> {code}
>  
>  
> This throws an exception during topology generation: 
>  
> {code:java}
> org.apache.kafka.streams.errors.TopologyException: Invalid topology: Processor abc-agg-store-repartition-filter is already added.
>  at org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.addProcessor(InternalTopologyBuilder.java:485)
>  at org.apache.kafka.streams.kstream.internals.graph.OptimizableRepartitionNode.writeToTopology(OptimizableRepartitionNode.java:70)
>  at org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder.buildAndOptimizeTopology(InternalStreamsBuilder.java:307)
>  at org.apache.kafka.streams.StreamsBuilder.build(StreamsBuilder.java:564)
>  at org.apache.kafka.streams.StreamsBuilder.build(StreamsBuilder.java:553)
>  at ...
> {code}
>  
> The same exception is observed if the `selectKey(...).groupByKey()` pattern
> is replaced with `groupBy(...)`.
> This behavior is observed whether topology optimization is left at its
> default, explicitly set off, or explicitly set on.
> Interestingly, the problem is avoided, and a workable topology produced, if
> the grouping step is named by passing a `Grouped.with(...)` expression to
> either `groupByKey` or `groupBy`.
>  
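For illustration, the naming workaround described in the issue might look like the sketch below. It is not the reporter's code. It assumes the kafka-streams library is on the classpath, uses plain String values in place of the A/B/C types, and the topic names ("topic-a", "topic-b") and grouping names ("group-a", "group-b") are hypothetical. Each grouping step gets its own name via `Grouped.with(name, keySerde, valueSerde)`.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.streams.state.KeyValueStore;

public class CogroupWorkaround {

    // Builds a two-input cogroup topology in which every grouping step is named.
    static Topology build() {
        StreamsBuilder builder = new StreamsBuilder();

        // Hypothetical topic and grouping names; String values stand in for A and B.
        KGroupedStream<String, String> groupedA = builder
            .stream("topic-a", Consumed.with(Serdes.String(), Serdes.String()))
            .selectKey((key, value) -> value)
            .groupByKey(Grouped.with("group-a", Serdes.String(), Serdes.String()));

        KGroupedStream<String, String> groupedB = builder
            .stream("topic-b", Consumed.with(Serdes.String(), Serdes.String()))
            .selectKey((key, value) -> value)
            .groupByKey(Grouped.with("group-b", Serdes.String(), Serdes.String()));

        // Toy aggregator: appends each incoming value to the running aggregate.
        Aggregator<String, String, String> concat = (key, value, agg) -> agg + value;

        KTable<String, String> agg = groupedA
            .cogroup(concat)
            .cogroup(groupedB, concat)
            .aggregate(
                () -> "",
                Named.as("my-agg-proc-name"),
                Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("abc-agg-store")
                    .withKeySerde(Serdes.String())
                    .withValueSerde(Serdes.String()));

        return builder.build();
    }

    public static void main(String[] args) {
        // Print the generated topology so the repartition node names can be inspected.
        System.out.println(build().describe());
    }
}
```

Printing `describe()` is only a convenient way to check which repartition nodes were generated for each input; no particular output is asserted here.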



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
