[ 
https://issues.apache.org/jira/browse/KAFKA-13681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17497004#comment-17497004
 ] 

Guozhang Wang commented on KAFKA-13681:
---------------------------------------

Thanks for reporting, [~DrozD_0]. I think this is indeed a place for 
optimization, and it is related to 
https://issues.apache.org/jira/browse/KAFKA-13676. 

> Sink event duplicates for partition-stuck stream application
> ------------------------------------------------------------
>
>                 Key: KAFKA-13681
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13681
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.8.1
>            Reporter: Mikhail Dubrovin
>            Priority: Major
>         Attachments: fail_topology.txt
>
>
> Hello,
> We found the following unpredictable behavior in Kafka Streams:
> {code:java}
> public void buildStreams(final BuilderHelper builder) {
>     KTable<TableId, TableValue> table = builder.table();
>     KTable<TableId, ArrayList<InternalWorkflowDTO>> workflowTable =
>             workflowTable(builder);
>     table
>             .mapValues(value -> mappers.mainDTO(value))
>             .leftJoin(workflowTable, mappers::joinWorkflows)
>             .toStream()
>             .map((key, value) -> KeyValue.pair(
>                     AggregateId.newBuilder().setId(value.getId()).build(),
>                     mappers.aggregateDTO(value)))
>             .peek((k, v) -> logSinkRecord(v))
>             .filter((id, dto) -> !isReprocessing)
>             .to(...);
> }
>
> private static KTable<TableId, ArrayList<InternalWorkflowDTO>>
>         workflowTable(BuilderHelper builderHelper) {
>     return builderHelper.workflowTable()
>             .groupBy((id, workflow) -> KeyValue.pair(
>                     TableId.newBuilder().setId(workflow.getTableId()).build(),
>                     mappers.mapWorkflow(workflow)),
>                     Grouped.with(...))
>             .aggregate(ArrayList::new, (key, value, agg) -> {
>                 agg.add(value);
>                 return agg;
>             }, (key, value, agg) -> {
>                 agg.remove(value);
>                 return agg;
>             }, Materialized.with(...));
> }
> {code}
> This is a small part of our topology, but it shows the error flow.
> *Data structure:*
> We have two multi-partition topics: entity and workflow. Each topic is 
> represented as a KTable.
> *Data error that causes application shutdown:*
> Our final event (the join of the entity and workflow KTables) expects a 
> non-null field in the entity, but for some reason the field arrives as null 
> for one event. The whole aggregator fails in _mappers.aggregateDTO(value)_ 
> of the _buildStreams_ method.
> We have a health check that restarts the aggregator if it fails.
> When incorrect data arrives on one partition, processing of that partition 
> gets stuck, but the other partitions are still processed.
> As a result, at every restart the _workflowTable_ topology repeats the 
> .aggregate() add/remove flows and puts a new List into the repartition 
> topic, but offsets are not advanced for the processed partitions because of 
> the aggregator's shutdown.
> _This behavior sinks a lot of duplicate final entities at every restart, 
> because the flow succeeds for data from the non-corrupted partitions but 
> their offsets are not advanced._ 
> It also causes trouble if @EqualsAndHashCode is defined to compare all 
> fields. At every restart, the topology tries to remove the old value (which 
> no longer exists after the first run) and adds the new value at the end of 
> the list. The list grows after each restart (it contains repeated copies of 
> the same new value).
> I also attached the topology description. To visualize: 
> [https://zz85.github.io/kafka-streams-viz/]
> *Current workaround:*
> Redefine @EqualsAndHashCode to use only the entities' ids.
> *Unresolved issue:*
> Sink event duplication at every restart.
> Thank you in advance!
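
The list growth described in the report, and why the id-only equality 
workaround stops it, can be reproduced without Kafka. The following is only 
a minimal sketch in plain Java, not the reporter's code: the Workflow class, 
its fields, and listSizeAfterRuns are hypothetical stand-ins, and equals() is 
written by hand instead of being generated by @EqualsAndHashCode. Each loop 
iteration models one restart that reprocesses the same old-to-new update by 
applying the subtractor (remove old) and then the adder (add new).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

class ReplayedAggregateDemo {

    // Hypothetical stand-in for InternalWorkflowDTO with hand-written equality.
    static final class Workflow {
        final long id;
        final String status;
        final boolean idOnlyEquality;

        Workflow(long id, String status, boolean idOnlyEquality) {
            this.id = id;
            this.status = status;
            this.idOnlyEquality = idOnlyEquality;
        }

        @Override
        public boolean equals(Object other) {
            if (this == other) return true;
            if (!(other instanceof Workflow)) return false;
            Workflow that = (Workflow) other;
            if (id != that.id) return false;
            // Either stop at the id, or also compare the remaining field.
            return idOnlyEquality || Objects.equals(status, that.status);
        }

        @Override
        public int hashCode() {
            return idOnlyEquality ? Long.hashCode(id) : Objects.hash(id, status);
        }
    }

    // Replays the same OLD -> NEW update once per run and returns the list size.
    static int listSizeAfterRuns(int runs, boolean idOnlyEquality) {
        List<Workflow> agg = new ArrayList<>();
        agg.add(new Workflow(1L, "OLD", idOnlyEquality));
        for (int run = 0; run < runs; run++) {
            // Subtractor: after the first run, OLD is no longer in the list.
            agg.remove(new Workflow(1L, "OLD", idOnlyEquality));
            // Adder: appends NEW again on every replay.
            agg.add(new Workflow(1L, "NEW", idOnlyEquality));
        }
        return agg.size();
    }

    public static void main(String[] args) {
        System.out.println(listSizeAfterRuns(3, false)); // prints 3
        System.out.println(listSizeAfterRuns(3, true));  // prints 1
    }
}
```

With all-field equality the remove() call is a no-op from the second run on, 
so the list gains one copy of the new value per restart. With id-only 
equality the replayed remove() finds the entry added by the previous run, so 
the list stays at one element. This only addresses the list growth; it does 
not address the duplicate sink events.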



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
