[ 
https://issues.apache.org/jira/browse/KAFKA-13008?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen updated KAFKA-13008:
------------------------------
    Description: 
In KIP-695, we improved the task idling mechanism by checking partition lag. 
It's a good improvement for timestamp synchronization. However, I found it can 
cause a stream to stop processing data for a long time while it waits for the 
partition metadata.

 

I've been investigating this case for a while, and found that the issue happens 
in the following situation (or similar ones):
 # Start 2 streams (each with 1 thread) consuming from topicA (with 3 
partitions: A-0, A-1, A-2)
 # After the 2 streams have started, the partition assignment is (I skipped some 
other processing-related partitions for simplicity):
 stream1-thread1: A-0, A-1 
 stream2-thread1: A-2
 # Start processing some data. Assume the position and high watermark are now:
 A-0: offset: 2, highWM: 2
 A-1: offset: 2, highWM: 2
 A-2: offset: 2, highWM: 2
 # Now, stream3 joins, which triggers a rebalance with this assignment:
 stream1-thread1: A-0 
 stream2-thread1: A-2
 stream3-thread1: A-1
 # Suddenly, stream3 leaves, so we rebalance again, back to the step 2 
assignment:
 stream1-thread1: A-0, *A-1* 
 stream2-thread1: A-2
 # Note that partition A-1 used to be assigned to stream1-thread1, and now it's 
back. Also assume that partition A-1 has slow input (ex: 1 record per 30 mins), 
while partition A-0 has fast input (ex: 10K records/sec). Now, stream1-thread1 
won't process any data until we get input from partition A-1 (even if partition 
A-0 has a lot of data buffered, and even with {{max.task.idle.ms}} set to 0).
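The stall in step 6 can be sketched as a small simulation (illustrative Python, not Kafka's actual Java internals; the function and field names are assumptions) of the KIP-695 idling rule: a task only processes when every input partition either has buffered records or is *known* to be fully caught up, so a single partition with unknown lag blocks the whole task.

```python
# Hypothetical simulation of the KIP-695 task-idling rule: a task is
# processable only when every input partition either has buffered records
# or has a known lag of 0. An unknown lag (None) blocks processing.

def task_ready(partitions):
    """partitions: dict of name -> {'buffered': int, 'lag': int or None}."""
    for state in partitions.values():
        if state['buffered'] > 0:
            continue            # data available for this partition
        if state['lag'] == 0:
            continue            # known to be fully caught up
        return False            # empty buffer and unknown/positive lag
    return True

# Step 6 of the scenario: A-0 has plenty of buffered data, but A-1 was
# just re-assigned, so its cached lag is unknown (None) -> the task stalls.
stalled = {'A-0': {'buffered': 10_000, 'lag': 0},
           'A-1': {'buffered': 0, 'lag': None}}
print(task_ready(stalled))   # False: stream1-thread1 processes nothing
```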

 

The reason stream1-thread1 won't process any data is that we can't get the 
metadata of partition A-1. And why can't we get the metadata? Because:
 # In KIP-695, we use the consumer's cache to get the partition lag, to avoid a 
remote call
 # The lag for a partition is cleared if that partition is not in the current 
round's assignment (check here). So, in the above example, the cached metadata 
for partition A-1 is cleared in step 4 and re-initialized (to null) in step 5
 # In KIP-227, we introduced fetch sessions for incremental fetch 
requests/responses. That is, if a session exists, the client (consumer) only 
gets an update when a fetched partition has an update (ex: new data). So, in the 
above case, since partition A-1 has slow input (ex: 1 record per 30 mins), it 
won't get an update until the next record arrives (up to 30 mins later), or 
until the fetch session has been inactive for (default) 2 mins and is evicted. 
Either way, the metadata won't be updated for a while.
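The interaction of points 2 and 3 can be sketched as follows (a minimal Python model, with assumed names, of the consumer-side behavior; not the real client code): the cached lag is reset to unknown on (re)assignment and is only refreshed by a fetch response, which an incremental fetch session withholds until the partition actually has new data.

```python
# Hypothetical sketch of how a cached per-partition lag goes stale:
# (re)assignment resets the entry to unknown, and only a fetch response
# fills it back in -- but with incremental fetch sessions (KIP-227),
# a quiet partition produces no response for a long time.

class LagCache:
    def __init__(self):
        self._lag = {}

    def assign(self, partitions):
        # Newly (re)assigned partitions start out unknown, mirroring the
        # re-initialization in step 5 of the scenario above.
        self._lag = {p: None for p in partitions}

    def on_fetch_response(self, partition, lag):
        self._lag[partition] = lag   # only a fetch response updates it

    def current_lag(self, partition):
        return self._lag.get(partition)   # None == unknown

cache = LagCache()
cache.assign(['A-0', 'A-1'])         # rebalance: A-1 comes back, lag reset
cache.on_fetch_response('A-0', 0)    # busy A-0 keeps getting responses
print(cache.current_lag('A-1'))      # None: no new data on A-1, no update
```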

 

In KIP-695, if we can't get the partition lag, we can't determine the 
partition's data status for timestamp sync, so we keep waiting and don't process 
any data. That's why this issue happens.

 

*Proposed solution:*
 # If we don't have the current lag for a partition, or the current lag is > 0, 
start waiting for max.task.idle.ms, and reset the deadline when we get the 
partition lag, like what we did previously in KIP-353
 # Introduce a waiting-time config for when there is no partition lag, or the 
partition lag stays > 0 (needs a KIP)
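Proposed solution 1 could look roughly like this (a minimal Python sketch of the assumed behavior, not a final design; the class and method names are hypothetical): an unknown or positive lag starts an idle deadline instead of blocking forever, and fresh lag information resets it, similar in spirit to KIP-353.

```python
# Sketch of proposed solution 1: when a partition's lag is unknown or > 0,
# wait at most max.task.idle.ms before processing, and restart the clock
# whenever fresh lag information arrives.

MAX_TASK_IDLE_MS = 100  # illustrative value for max.task.idle.ms

class IdleDeadline:
    def __init__(self):
        self.deadline_ms = None

    def update(self, now_ms, lag):
        if lag == 0:
            self.deadline_ms = None     # caught up: no need to idle
            return
        if lag is not None or self.deadline_ms is None:
            # Fresh lag info (or first sighting of an unknown lag):
            # (re)start the idling clock instead of blocking forever.
            self.deadline_ms = now_ms + MAX_TASK_IDLE_MS

    def may_process(self, now_ms):
        # Process once the idle deadline has elapsed without the
        # partition catching up.
        return self.deadline_ms is not None and now_ms >= self.deadline_ms

d = IdleDeadline()
d.update(now_ms=0, lag=None)   # A-1 re-assigned, lag unknown
print(d.may_process(50))       # False: still inside the idle window
print(d.may_process(150))      # True: deadline passed, resume processing
```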

[~vvcephei] [~guozhang], any suggestions?

 

cc [~ableegoldman] [~mjsax], this is the root cause of what we discussed in 
[https://github.com/apache/kafka/pull/10736], where we thought there was a data 
loss situation. FYI.


> Stream will stop processing data for a long time while waiting for the 
> partition lag
> ------------------------------------------------------------------------------------
>
>                 Key: KAFKA-13008
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13008
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 3.0.0
>            Reporter: Luke Chen
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
