[ 
https://issues.apache.org/jira/browse/KAFKA-14302?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Damien Gasparina updated KAFKA-14302:
-------------------------------------
    Description: 
If a store backed by a changelog topic has been fully emptied, it can generate 
infinite probing rebalances.

 

The scenario is the following:
 * A Kafka Streams application has a store with a changelog
 * Many entries are pushed into the changelog, so the log end offset is high, let's say 20,000
 * Then the store gets emptied, either due to data retention (windowing) or tombstones
 * Then an instance of the application is restarted and its local disk is deleted (e.g. Kubernetes without a Persistent Volume)
 * After the restart, the application restores the store from the changelog, but does not write a checkpoint file as there is no data
 * As there are no checkpoint entries, this instance reports a taskOffsetSums with the offset set to 0 in its subscriptionUserData
 * During the assignment, the group leader then computes a lag of 20,000 (end offset - task offset), which is greater than the default acceptable.recovery.lag, and thus decides to schedule a probing rebalance (see the sketch after this list)
 * In the next probing rebalance, nothing has changed, so... a new probing rebalance is scheduled
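
To make the leader-side decision concrete, here is a minimal sketch of the lag check that keeps triggering the probing rebalance. This is not the actual StreamsPartitionAssignor code; the variable names and hard-coded values (including the 10,000 default of acceptable.recovery.lag) are only illustrative:
{code:java}
// Hypothetical sketch of the leader-side lag check (names and values are illustrative).
long changelogEndOffset = 20_000L;     // log end offset of the (now empty) changelog
long reportedTaskOffsetSum = 0L;       // reported by the restarted instance: no checkpoint file
long acceptableRecoveryLag = 10_000L;  // default of acceptable.recovery.lag

long lag = changelogEndOffset - reportedTaskOffsetSum;  // 20,000
if (lag > acceptableRecoveryLag) {
    // The instance is treated as "not caught up": the active task is kept on another
    // client and a probing rebalance is scheduled. Because the changelog stays empty,
    // the reported offset never moves, so every probing rebalance reaches the same
    // conclusion and the loop never ends.
    System.out.println("lag " + lag + " > " + acceptableRecoveryLag + ": schedule probing rebalance");
}
{code}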

 

I was able to reproduce this locally with a simple topology:

 
{code:java}
// Stream-stream windowed join: the join creates window stores backed by changelog topics
var table = streamsBuilder.stream("table");
streamsBuilder
    .stream("stream")
    .join(table,
          (eSt, eTb) -> eSt.toString() + eTb.toString(),
          JoinWindows.of(Duration.ofSeconds(5)))
    .to("output");{code}

Due to this issue, applications with an empty changelog experience frequent 
rebalances:

!image-2022-10-14-12-04-01-190.png!

 

With assignments similar to:
{code:java}
[hello-world-8323d214-4c56-470f-bace-e4291cdf10eb-StreamThread-3] INFO 
org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor - 
stream-thread 
[hello-world-8323d214-4c56-470f-bace-e4291cdf10eb-StreamThread-3-consumer] 
Assigned tasks [0_5, 0_4, 0_3, 0_2, 0_1, 0_0] including stateful [0_5, 0_4, 
0_3, 0_2, 0_1, 0_0] to clients as: 
d0e2d556-2587-48e8-b9ab-43a4e8207be6=[activeTasks: ([]) standbyTasks: ([0_0, 
0_1, 0_2, 0_3, 0_4, 0_5])]
8323d214-4c56-470f-bace-e4291cdf10eb=[activeTasks: ([0_0, 0_1, 0_2, 0_3, 0_4, 
0_5]) standbyTasks: ([])].{code}



> Infinite probing rebalance if a changelog topic got emptied
> -----------------------------------------------------------
>
>                 Key: KAFKA-14302
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14302
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 3.3.1
>            Reporter: Damien Gasparina
>            Priority: Major
>         Attachments: image-2022-10-14-12-04-01-190.png
>


