[jira] [Commented] (KAFKA-13681) Sink event duplicates for partition-stuck stream application

2022-02-25 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17498236#comment-17498236
 ] 

Guozhang Wang commented on KAFKA-13681:
---

[~DrozD_0] We have not yet decided when it would be applied as a general 
optimization, but since we usually only backport non-critical changes to the 
releases of the past year, it is unlikely that we are going to have new 2.x 
bug-fix releases.

> Sink event duplicates for partition-stuck stream application
> 
>
> Key: KAFKA-13681
> URL: https://issues.apache.org/jira/browse/KAFKA-13681
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.8.1
>Reporter: Mikhail Dubrovin
>Priority: Major
> Attachments: fail_topology.txt
>
>
> Hello,
> We found the following unpredictable behavior of Kafka Streams:
> {code:java}
> // Entity and WorkflowDTO are illustrative placeholders for the real value
> // types; the TableId key type is inferred from the groupBy/leftJoin below.
> public void buildStreams(final BuilderHelper builder) {
>     KTable<TableId, Entity> table = builder.table();
>     KTable<TableId, List<WorkflowDTO>> workflowTable = workflowTable(builder);
>
>     table
>             .mapValues(value -> mappers.mainDTO(value))
>             .leftJoin(workflowTable, mappers::joinWorkflows)
>             .toStream()
>             .map((key, value) -> KeyValue.pair(
>                     AggregateId.newBuilder().setId(value.getId()).build(),
>                     mappers.aggregateDTO(value)))
>             .peek((k, v) -> logSinkRecord(v))
>             .filter((id, dto) -> !isReprocessing)
>             .to(...);
> }
>
> private static KTable<TableId, List<WorkflowDTO>> workflowTable(BuilderHelper builderHelper) {
>     return builderHelper.workflowTable()
>             .groupBy((id, workflow) -> KeyValue.pair(
>                             TableId.newBuilder().setId(workflow.getTableId()).build(),
>                             mappers.mapWorkflow(workflow)),
>                     Grouped.with(...))
>             .aggregate(ArrayList::new,
>                     (key, value, agg) -> {   // adder
>                         agg.add(value);
>                         return agg;
>                     },
>                     (key, value, agg) -> {   // subtractor
>                         agg.remove(value);
>                         return agg;
>                     },
>                     Materialized.with(...));
> } {code}
> This is only a small part of our topology, but it shows the failing flow.
> *Data structure:*
> We have two multi-partition topics: entity and workflow. Each topic is 
> represented as a KTable.
> *Data error that causes application shutdown:*
> Our final event (a join of the entity and workflow KTables) expects a non-null 
> field in the entity, but for one event that field is missing. The whole 
> aggregator then fails in _mappers.aggregateDTO(value)_ of the _buildStreams_ 
> method.
> We have a health check which restarts the aggregator if it fails.
> When incorrect data arrives on one partition, processing of that partition is 
> stuck while the other partitions continue to be processed.
> As a result, at every restart the _workflowTable_ topology repeats the 
> .aggregate() add/remove flow and puts a new List into the repartition topic, 
> but the offsets of the already-processed partitions are not advanced because 
> of the aggregator's shutdown.
> _This behavior generates/sinks a lot of duplicates of the final entity at 
> every restart: the flow succeeds for data from the non-corrupted partitions, 
> yet their offsets are never committed._ 
> It also causes trouble if @EqualsAndHashCode is defined to compare all 
> fields. At every restart, the subtractor tries to remove the old value (which 
> no longer exists in the list after the first run) while the adder appends the 
> new value to the end of the list, so the list grows after each restart and 
> ends up containing several copies of the same (new) value, as illustrated in 
> the sketch below.
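> A minimal, hypothetical sketch of that effect (the _WorkflowDTO_ stand-in and 
> its fields are illustrative, not the real types): with an all-fields equals(), 
> the replayed old value no longer matches anything stored during the first run, 
> so the subtractor's remove() is a no-op and the adder appends a second copy.
> {code:java}
> import java.util.ArrayList;
> import java.util.List;
>
> public class DuplicateGrowthSketch {
>     // Illustrative stand-in for the real workflow value; a record's equals() compares all fields.
>     record WorkflowDTO(String id, String state) {}
>
>     public static void main(String[] args) {
>         List<WorkflowDTO> agg = new ArrayList<>();
>         agg.add(new WorkflowDTO("wf-1", "after-first-run"));      // state left by the first run
>
>         // the same update is replayed after the restart because offsets were not committed
>         WorkflowDTO oldValue = new WorkflowDTO("wf-1", "before-first-run");
>         WorkflowDTO newValue = new WorkflowDTO("wf-1", "after-first-run");
>
>         agg.remove(oldValue);   // no-op: nothing in the list equals oldValue on all fields
>         agg.add(newValue);      // an equal copy is appended to the end of the list
>
>         System.out.println(agg.size());   // 2 -> the list grows on every restart
>     }
> } {code}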
> I also attached the topology description. To visualize: 
> [https://zz85.github.io/kafka-streams-viz/]
> *Current workaround:*
> Redefine @EqualsAndHashCode to compare entities by their ids only (see the 
> sketch below).
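> A minimal sketch of that workaround, assuming Lombok; the _WorkflowDTO_ class 
> and its fields are illustrative placeholders, not the real entity:
> {code:java}
> import lombok.EqualsAndHashCode;
> import lombok.Getter;
> import lombok.Setter;
>
> // Only the id participates in equals()/hashCode(), so the subtractor can still
> // find and remove the previously stored element even if other fields changed.
> @Getter
> @Setter
> @EqualsAndHashCode(onlyExplicitlyIncluded = true)
> public class WorkflowDTO {
>     @EqualsAndHashCode.Include
>     private String id;
>
>     private String state;   // excluded from equals()/hashCode()
> } {code}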
> *Unsolved issue:*
> Sink event duplication at every restart.
> Thank you in advance!



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (KAFKA-13681) Sink event duplicates for partition-stuck stream application

2022-02-24 Thread Walker Carlson (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17497502#comment-17497502
 ] 

Walker Carlson commented on KAFKA-13681:


[~DrozD_0] Right now it's only enabled as an experimental feature, as we are a 
bit concerned about the load these commits might place on the brokers. We are 
also adding a task backoff that should synergize well with 
[13676|https://issues.apache.org/jira/browse/KAFKA-13676]. Once we have a little 
experience with it, we can try enabling it for more/all cases.

 

I am not sure this is a "fix" that we can backport to older versions, though. 
[~guozhang] or [~mjsax] would probably have a better idea about that.



[jira] [Commented] (KAFKA-13681) Sink event duplicates for partition-stuck stream application

2022-02-23 Thread Mikhail Dubrovin (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17497211#comment-17497211
 ] 

Mikhail Dubrovin commented on KAFKA-13681:
--

[~guozhang] thank you for the reply. What's the plan? Do you want to propagate 
the changes from https://issues.apache.org/jira/browse/KAFKA-13676 to the Kafka 
2.x versions?

cc: [~wcarlson5] [~mjsax] 



[jira] [Commented] (KAFKA-13681) Sink event duplicates for partition-stuck stream application

2022-02-23 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17497004#comment-17497004
 ] 

Guozhang Wang commented on KAFKA-13681:
---

Thanks for reporting [~DrozD_0], I think this is indeed a place for 
optimization and is related to 
https://issues.apache.org/jira/browse/KAFKA-13676. 
