[ https://issues.apache.org/jira/browse/KAFKA-13681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17497502#comment-17497502 ]
Walker Carlson commented on KAFKA-13681:
----------------------------------------

[~DrozD_0] Right now it's only enabled via an experimental feature, as we are a bit concerned about the load these commits might place on the brokers. We are also adding a task backoff that should synergize well with [13676|https://issues.apache.org/jira/browse/KAFKA-13676]. Once we have a little experience with it we can try enabling it for more/all cases. I am not sure this is a "fix" that we can backport to older versions, though. [~guozhang] or [~mjsax] would probably have a better idea about that.

> Sink event duplicates for partition-stuck stream application
> ------------------------------------------------------------
>
>                 Key: KAFKA-13681
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13681
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.8.1
>            Reporter: Mikhail Dubrovin
>            Priority: Major
>         Attachments: fail_topology.txt
>
> Hello,
> We found the following unpredictable behavior of Kafka Streams:
> {code:java}
> public void buildStreams(final BuilderHelper builder) {
>     KTable<TableId, TableValue> table = builder.table();
>     KTable<TableId, ArrayList<InternalWorkflowDTO>> workflowTable =
>         workflowTable(builder);
>     table
>         .mapValues(value -> mappers.mainDTO(value))
>         .leftJoin(workflowTable, mappers::joinWorkflows)
>         .toStream()
>         .map((key, value) -> KeyValue.pair(
>             AggregateId.newBuilder().setId(value.getId()).build(),
>             mappers.aggregateDTO(value)))
>         .peek((k, v) -> logSinkRecord(v))
>         .filter((id, dto) -> !isReprocessing)
>         .to(...);
> }
>
> private static KTable<TableId, ArrayList<InternalWorkflowDTO>>
>         workflowTable(BuilderHelper builderHelper) {
>     return builderHelper.workflowTable()
>         .groupBy((id, workflow) -> KeyValue.pair(
>                 TableId.newBuilder().setId(workflow.getTableId()).build(),
>                 mappers.mapWorkflow(workflow)),
>             Grouped.with(...))
>         .aggregate(ArrayList::new,
>             (key, value, agg) -> {
>                 agg.add(value);
>                 return agg;
>             },
>             (key, value, agg) -> {
>                 agg.remove(value);
>                 return agg;
>             },
>             Materialized.with(...));
> }
> {code}
> It is a small part of our topology, but it shows the error flow.
>
> *Data structure:*
> We have two many-partition topics: entity and workflow. Each topic is represented as a KTable.
>
> *Data error that causes application shutdown:*
> Our final event (the join of the entity and workflow KTables) expects a non-null field in the entity, but for some reason it is missing for one event. The whole aggregator fails in _mappers.aggregateDTO(value)_ of the _buildStreams_ method.
> We have a health check which restarts the aggregator if it fails.
> When incorrect data comes to one partition, that partition's processing is stuck while the other partitions are processed.
> As a result, at every restart the _workflowTable_ topology repeats the .aggregate() add/remove flows and puts a new List into the repartition topic, but offsets are not moved for the processed partitions due to the aggregator's shutdown.
> _This behavior generates/sinks a lot of final entity duplicates at every restart, because the flow succeeds for data from the non-corrupted partitions but their offsets are never committed._
> It also causes trouble if @EqualsAndHashCode is defined to compare all fields. At every restart, the topology tries to remove the old value (which no longer exists after the first run) and appends a new value at the end of the list. The list therefore grows after each restart (it contains repeated copies of the same new value).
> I also attached the topology description. To visualize it:
> [https://zz85.github.io/kafka-streams-viz/]
>
> *Current workaround:*
> Redefine @EqualsAndHashCode to use the entities' ids only.
>
> *Not solved issue:*
> Sink event duplication at every restart.
>
> Thank you in advance!

--
This message was sent by Atlassian Jira
(v8.20.1#820001)
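The id-only equality workaround can be sketched as below. This is a minimal plain-Java illustration of what the Lombok annotation change accomplishes (class and field names here are hypothetical, not from the reporter's code): when equals/hashCode cover all fields, the subtractor's `agg.remove(value)` cannot find the previously added element after a restart replays it with a changed payload, so the list grows; restricting equality to the stable id makes the removal succeed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

// Hypothetical DTO: equality is restricted to the stable id field, mirroring
// @EqualsAndHashCode(onlyExplicitlyIncluded = true) with only the id included.
class WorkflowDTO {
    final String id;     // stable entity id, the only field compared
    final String status; // mutable payload, deliberately excluded from equals/hashCode

    WorkflowDTO(String id, String status) {
        this.id = id;
        this.status = status;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof WorkflowDTO)) return false;
        return Objects.equals(id, ((WorkflowDTO) o).id);
    }

    @Override
    public int hashCode() {
        return Objects.hash(id);
    }
}

public class EqualsWorkaroundDemo {
    public static void main(String[] args) {
        // Simulates the aggregate's state list after the first (pre-crash) run.
        List<WorkflowDTO> agg = new ArrayList<>();
        agg.add(new WorkflowDTO("w1", "OLD"));

        // After a restart the subtractor sees the record with a different
        // payload. With id-only equality, List.remove still matches the old
        // element, so the adder that follows leaves exactly one entry.
        agg.remove(new WorkflowDTO("w1", "NEW"));
        agg.add(new WorkflowDTO("w1", "NEW"));

        System.out.println(agg.size()); // prints 1, not 2
    }
}
```

Note this only fixes the ever-growing list; as the reporter says, it does not address the duplicate sink events caused by the uncommitted offsets.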