[jira] [Updated] (KAFKA-13008) Stream will stop processing data for a long time while waiting for the partition lag
[ https://issues.apache.org/jira/browse/KAFKA-13008?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Matthias J. Sax updated KAFKA-13008:
    Component/s: streams

> Stream will stop processing data for a long time while waiting for the partition lag
>
>                 Key: KAFKA-13008
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13008
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 3.0.0
>            Reporter: Luke Chen
>            Priority: Blocker
>             Fix For: 3.0.0
>
>         Attachments: image-2021-07-07-11-19-55-630.png
>
>
> In KIP-695, we improved the task idling mechanism by checking partition lag.
> It's a good improvement for timestamp sync, but I found that it can cause the
> stream to stop processing data for a long time while waiting for the
> partition metadata.
>
> I've been investigating this case for a while, and found that the issue
> happens in the following situation (or similar ones):
> # Start 2 streams (each with 1 thread) to consume from topicA (with 3
> partitions: A-0, A-1, A-2).
> # After the 2 streams have started, the partition assignment is (I skipped
> some other processing-related partitions for simplicity):
> stream1-thread1: A-0, A-1
> stream2-thread1: A-2
> # Start processing some data. Assume the position and high watermark are now:
> A-0: offset: 2, highWM: 2
> A-1: offset: 2, highWM: 2
> A-2: offset: 2, highWM: 2
> # Now stream3 joins, which triggers a rebalance with this assignment:
> stream1-thread1: A-0
> stream2-thread1: A-2
> stream3-thread1: A-1
> # Suddenly stream3 leaves, so we rebalance again, back to the step 2
> assignment:
> stream1-thread1: A-0, *A-1*
> stream2-thread1: A-2
> (note: after initialization, the state of A-1 will be: position: null,
> highWM: null)
> # Note that partition A-1 used to be assigned to stream1-thread1, and now
> it's back. Also assume that partition A-1 has slow input (e.g. 1 record per
> 30 mins) and partition A-0 has fast input (e.g. 10K records / sec). Now
> stream1-thread1 won't process any data until we get input from partition A-1
> (even if partition A-0 has a lot of buffered records and
> {{max.task.idle.ms}} is set to 0).
>
> stream1-thread1 won't process any data because we can't get the lag of
> partition A-1. Why can't we get the lag? Because:
> # In KIP-695, we use the consumer's cache to get the partition lag, to avoid
> a remote call.
> # The lag for a partition is cleared if the assignment in this round doesn't
> include the partition. Check
> [here|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java#L272].
> So, in the above example, the metadata cache for partition A-1 is cleared in
> step 4 and re-initialized (to null) in step 5.
> # In KIP-227, we introduced fetch sessions to support incremental fetch
> requests/responses. That is, if a session exists, the client (consumer) gets
> an update for a fetched partition only when that partition has an update
> (e.g. new data). In the above case, partition A-1 has slow input (e.g. 1
> record per 30 mins), so it won't get an update for the next 30 mins, or until
> the fetch session has been inactive for (by default) 2 mins and is evicted.
> In either case, the metadata won't be updated for a while.
>
> In KIP-695, if we don't get the partition lag, we can't determine the
> partition's data status to do timestamp sync, so we keep waiting and don't
> process any data. That's why this issue happens.
>
> *Proposed solution:*
> # If we don't get the current lag for a partition, or the current lag is > 0,
> we start to wait for max.task.idle.ms, and reset the deadline when we get the
> partition lag, like what we did previously in KIP-353.
> # Introduce a waiting-time config for when there is no partition lag, or the
> partition lag stays > 0 (needs a KIP).
>
> [~vvcephei] [~guozhang], any suggestions?
>
> cc [~ableegoldman] [~mjsax], this is the root cause of the situation we
> discussed in [https://github.com/apache/kafka/pull/10736], where we thought
> there was data loss. FYI.

--
This message was sent by Atlassian Jira (v8.3.4#803005)
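The stall and the first proposed solution in the report above can be sketched as a small toy model. This is only an illustration under stated assumptions, not Kafka Streams code: the class IdlingSketch, the type PartitionView, and both method names are hypothetical, and the "as reported" rules (an unknown lag blocks the task, a lag > 0 waits for the fetch, a lag of 0 waits for max.task.idle.ms) are a reading of the description rather than a copy of the actual implementation. The "reset the deadline when we get the partition lag" part of the proposal is not modeled.

```java
import java.util.Arrays;
import java.util.List;
import java.util.OptionalLong;

/** Toy model of the idling decision described in the report; not Kafka Streams code. */
final class IdlingSketch {

    /** Hypothetical view of one input partition of a task. */
    static final class PartitionView {
        final int buffered;            // records already fetched and waiting locally
        final OptionalLong cachedLag;  // empty when the consumer has no cached lag

        PartitionView(int buffered, OptionalLong cachedLag) {
            this.buffered = buffered;
            this.cachedLag = cachedLag;
        }
    }

    /** Assumed current behavior: an empty partition with an unknown lag blocks the task. */
    static boolean readyAsReported(List<PartitionView> parts, long idleMs, long maxTaskIdleMs) {
        for (PartitionView p : parts) {
            if (p.buffered > 0) continue;                    // has data, nothing to wait for
            if (!p.cachedLag.isPresent()) return false;      // lag unknown: wait, no deadline
            if (p.cachedLag.getAsLong() > 0) return false;   // data exists remotely: wait for fetch
            if (idleMs < maxTaskIdleMs) return false;        // caught up: wait out the idle time
        }
        return true;
    }

    /** Proposed solution 1 (simplified): every empty partition waits at most max.task.idle.ms. */
    static boolean readyWithProposal(List<PartitionView> parts, long idleMs, long maxTaskIdleMs) {
        for (PartitionView p : parts) {
            if (p.buffered > 0) continue;
            if (idleMs < maxTaskIdleMs) return false;        // same deadline for unknown, > 0, or 0 lag
        }
        return true;
    }

    public static void main(String[] args) {
        // stream1-thread1 after the second rebalance: A-0 is busy, A-1 has no cached lag.
        List<PartitionView> task = Arrays.asList(
                new PartitionView(10_000, OptionalLong.of(5_000)), // A-0
                new PartitionView(0, OptionalLong.empty()));       // A-1
        long idleMs = 60_000;
        System.out.println(readyAsReported(task, idleMs, 0));   // false: stalled despite idle time 0
        System.out.println(readyWithProposal(task, idleMs, 0)); // true: deadline already passed
    }
}
```

With max.task.idle.ms set to 0, the first check never becomes true while A-1 has no cached lag, which matches the stall in step 6 of the report; the second check lets the task proceed as soon as the idle deadline has passed.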
[jira] [Updated] (KAFKA-13008) Stream will stop processing data for a long time while waiting for the partition lag
[ https://issues.apache.org/jira/browse/KAFKA-13008?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Konstantine Karantasis updated KAFKA-13008:
---
    Fix Version/s: 3.0.0

> Stream will stop processing data for a long time while waiting for the partition lag
>
>                 Key: KAFKA-13008
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13008
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 3.0.0
>            Reporter: Luke Chen
>            Priority: Major
>             Fix For: 3.0.0
>
>         Attachments: image-2021-07-07-11-19-55-630.png
[jira] [Updated] (KAFKA-13008) Stream will stop processing data for a long time while waiting for the partition lag
[ https://issues.apache.org/jira/browse/KAFKA-13008?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Konstantine Karantasis updated KAFKA-13008:
---
    Priority: Blocker  (was: Major)

> Stream will stop processing data for a long time while waiting for the partition lag
>
>                 Key: KAFKA-13008
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13008
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 3.0.0
>            Reporter: Luke Chen
>            Priority: Blocker
>             Fix For: 3.0.0
>
>         Attachments: image-2021-07-07-11-19-55-630.png
[jira] [Updated] (KAFKA-13008) Stream will stop processing data for a long time while waiting for the partition lag
[ https://issues.apache.org/jira/browse/KAFKA-13008?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Luke Chen updated KAFKA-13008:
--
    Attachment: image-2021-07-07-11-19-55-630.png

> Stream will stop processing data for a long time while waiting for the partition lag
>
>                 Key: KAFKA-13008
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13008
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 3.0.0
>            Reporter: Luke Chen
>            Priority: Major
>         Attachments: image-2021-07-07-11-19-55-630.png
[jira] [Updated] (KAFKA-13008) Stream will stop processing data for a long time while waiting for the partition lag
[ https://issues.apache.org/jira/browse/KAFKA-13008?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Luke Chen updated KAFKA-13008:
--
    Description:
In KIP-695, we improved the task idling mechanism by checking partition lag. It's a good improvement for timestamp sync. But I found it will cause the stream stop processing the data for a long time while waiting for the partition metadata.

I've been investigating this case for a while, and figuring out the issue will happen in below situation (or similar situation):
 # start 2 streams (each with 1 thread) to consume from a topicA (with 3 partitions: A-0, A-1, A-2)
 # After 2 streams started, the partitions assignment are: (I skipped some other processing related partitions for simplicity)
 stream1-thread1: A-0, A-1
 stream2-thread1: A-2
 # start processing some data, assume now, the position and high watermark is:
 A-0: offset: 2, highWM: 2
 A-1: offset: 2, highWM: 2
 A-2: offset: 2, highWM: 2
 # Now, stream3 joined, so trigger rebalance with this assignment:
 stream1-thread1: A-0
 stream2-thread1: A-2
 stream3-thread1: A-1
 # Suddenly, stream3 left, so now, rebalance again, with the step 2 assignment:
 stream1-thread1: A-0, *A-1*
 stream2-thread1: A-2
 (note: after initialization, the position of A-1 will be: position: null, highWM: null)
 # Now, note that, the partition A-1 used to get assigned to stream1-thread1, and now, it's back. And also, assume the partition A-1 has slow input (ex: 1 record per 30 mins), and partition A-0 has fast input (ex: 10K records / sec). So, now, the stream1-thread1 won't process any data until we got input from partition A-1 (even if partition A-0 is buffered a lot, and we have `{{max.task.idle.ms}}` set to 0).

The reason why the stream1-thread1 won't process any data is because we can't get the lag of partition A-1. And why we can't get the lag? It's because
 # In KIP-695, we use consumer's cache to get the partition lag, to avoid remote call
 # The lag for a partition will be cleared if the assignment in this round doesn't have this partition. check [here|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java#L272]. So, in the above example, the metadata cache for partition A-1 will be cleared in step 4, and re-initialized (to null) in step 5
 # In KIP-227, we introduced a fetch session to have incremental fetch request/response. That is, if the session existed, the client(consumer) will get the update only when the fetched partition have update (ex: new data). So, in the above case, the partition A-1 has slow input (ex: 1 record per 30 mins), it won't have update until next 30 mins, or wait for the fetch session become inactive for (default) 2 mins to be evicted. Either case, the metadata won't be updated for a while.

In KIP-695, if we don't get the partition lag, we can't determine the partition data status to do timestamp sync, so we'll keep waiting and not processing any data. That's why this issue will happen.

*Proposed solution:*
 # If we don't get the current lag for a partition, or the current lag > 0, we start to wait for max.task.idle.ms, and reset the deadline when we get the partition lag, like what we did in previous KIP-353
 # Introduce a waiting time config when no partition lag, or partition lag keeps > 0 (need KIP)

[~vvcephei] [~guozhang] , any suggestions?

cc [~ableegoldman] [~mjsax] , this is the root cause that in [https://github.com/apache/kafka/pull/10736,] we discussed and thought there's a data lose situation. FYI.

  was:
In KIP-695, we improved the task idling mechanism by checking partition lag. It's a good improvement for timestamp sync. But I found it will cause the stream stop processing the data for a long time while waiting for the partition metadata.

I've been investigating this case for a while, and figuring out the issue will happen in below situation (or similar situation):
 # start 2 streams (each with 1 thread) to consume from a topicA (with 3 partitions: A-0, A-1, A-2)
 # After 2 streams started, the partitions assignment are: (I skipped some other processing related partitions for simplicity)
 stream1-thread1: A-0, A-1
 stream2-thread1: A-2
 # start processing some data, assume now, the position and high watermark is:
 A-0: offset: 2, highWM: 2
 A-1: offset: 2, highWM: 2
 A-2: offset: 2, highWM: 2
 # Now, stream3 joined, so trigger rebalance with this assignment:
 stream1-thread1: A-0
 stream2-thread1: A-2
 stream3-thread1: A-1
 # Suddenly, stream3 left, so now, rebalance again, with the step 2 assignment:
 stream1-thread1: A-0, *A-1*
 stream2-thread1: A-2
 (note: after initialization, the position of A-1 will be: position: null, highWM: null)
 # Now, note that, the partition A-1 used to get assigned to stream1-thread1, and now, it's back. And also, assume the partition A-1 has
[jira] [Updated] (KAFKA-13008) Stream will stop processing data for a long time while waiting for the partition lag
[ https://issues.apache.org/jira/browse/KAFKA-13008?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Luke Chen updated KAFKA-13008:
--
    Description:
In KIP-695, we improved the task idling mechanism by checking partition lag. It's a good improvement for timestamp sync. But I found it will cause the stream stop processing the data for a long time while waiting for the partition metadata.

I've been investigating this case for a while, and figuring out the issue will happen in below situation (or similar situation):
 # start 2 streams (each with 1 thread) to consume from a topicA (with 3 partitions: A-0, A-1, A-2)
 # After 2 streams started, the partitions assignment are: (I skipped some other processing related partitions for simplicity)
 stream1-thread1: A-0, A-1
 stream2-thread1: A-2
 # start processing some data, assume now, the position and high watermark is:
 A-0: offset: 2, highWM: 2
 A-1: offset: 2, highWM: 2
 A-2: offset: 2, highWM: 2
 # Now, stream3 joined, so trigger rebalance with this assignment:
 stream1-thread1: A-0
 stream2-thread1: A-2
 stream3-thread1: A-1
 # Suddenly, stream3 left, so now, rebalance again, with the step 2 assignment:
 stream1-thread1: A-0, *A-1*
 stream2-thread1: A-2
 (note: after initialization, the position of A-1 will be: position: null, highWM: null)
 # Now, note that, the partition A-1 used to get assigned to stream1-thread1, and now, it's back. And also, assume the partition A-1 has slow input (ex: 1 record per 30 mins), and partition A-0 has fast input (ex: 10K records / sec). So, now, the stream1-thread1 won't process any data until we got input from partition A-1 (even if partition A-0 is buffered a lot, and we have `{{max.task.idle.ms}}` set to 0).

The reason why the stream1-thread1 won't process any data is because we can't get the metadata of partition A-1. And why we can't get the metadata? It's because
 # In KIP-695, we use consumer's cache to get the partition lag, to avoid remote call
 # The lag for a partition will be cleared if the assignment in this round doesn't have this partition. check [here|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java#L272]. So, in the above example, the metadata cache for partition A-1 will be cleared in step 4, and re-initialized (to null) in step 5
 # In KIP-227, we introduced a fetch session to have incremental fetch request/response. That is, if the session existed, the client(consumer) will get the update only when the fetched partition have update (ex: new data). So, in the above case, the partition A-1 has slow input (ex: 1 record per 30 mins), it won't have update until next 30 mins, or wait for the fetch session become inactive for (default) 2 mins to be evicted. Either case, the metadata won't be updated for a while.

In KIP-695, if we don't get the partition lag, we can't determine the partition data status to do timestamp sync, so we'll keep waiting and not processing any data. That's why this issue will happen.

*Proposed solution:*
 # If we don't get the current lag for a partition, or the current lag > 0, we start to wait for max.task.idle.ms, and reset the deadline when we get the partition lag, like what we did in previous KIP-353
 # Introduce a waiting time config when no partition lag, or partition lag keeps > 0 (need KIP)

[~vvcephei] [~guozhang] , any suggestions?

cc [~ableegoldman] [~mjsax] , this is the root cause that in [https://github.com/apache/kafka/pull/10736,] we discussed and thought there's a data lose situation. FYI.

  was:
In KIP-695, we improved the task idling mechanism by checking partition lag. It's a good improvement for timestamp sync. But I found it will cause the stream stop processing the data for a long time while waiting for the partition metadata.

I've been investigating this case for a while, and figuring out the issue will happen in below situation (or similar situation):
 # start 2 streams (each with 1 thread) to consume from a topicA (with 3 partitions: A-0, A-1, A-2)
 # After 2 streams started, the partitions assignment are: (I skipped some other processing related partitions for simplicity)
 stream1-thread1: A-0, A-1
 stream2-thread1: A-2
 # start processing some data, assume now, the position and high watermark is:
 A-0: offset: 2, highWM: 2
 A-1: offset: 2, highWM: 2
 A-2: offset: 2, highWM: 2
 # Now, stream3 joined, so trigger rebalance with this assignment:
 stream1-thread1: A-0
 stream2-thread1: A-2
 stream3-thread1: A-1
 # Suddenly, stream3 left, so now, rebalance again, with the step 2 assignment:
 stream1-thread1: A-0, *A-1*
 stream2-thread1: A-2
 # Now, note that, the partition A-1 used to get assigned to stream1-thread1, and now, it's back. And also, assume the partition A-1 has slow input (ex: 1 record per 30 mins), and partition A-0 has fast input (ex:
[jira] [Updated] (KAFKA-13008) Stream will stop processing data for a long time while waiting for the partition lag
[ https://issues.apache.org/jira/browse/KAFKA-13008?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Luke Chen updated KAFKA-13008:
--
    Description:
In KIP-695, we improved the task idling mechanism by checking partition lag. It's a good improvement for timestamp sync. But I found it will cause the stream stop processing the data for a long time while waiting for the partition metadata.

I've been investigating this case for a while, and figuring out the issue will happen in below situation (or similar situation):
 # start 2 streams (each with 1 thread) to consume from a topicA (with 3 partitions: A-0, A-1, A-2)
 # After 2 streams started, the partitions assignment are: (I skipped some other processing related partitions for simplicity)
 stream1-thread1: A-0, A-1
 stream2-thread1: A-2
 # start processing some data, assume now, the position and high watermark is:
 A-0: offset: 2, highWM: 2
 A-1: offset: 2, highWM: 2
 A-2: offset: 2, highWM: 2
 # Now, stream3 joined, so trigger rebalance with this assignment:
 stream1-thread1: A-0
 stream2-thread1: A-2
 stream3-thread1: A-1
 # Suddenly, stream3 left, so now, rebalance again, with the step 2 assignment:
 stream1-thread1: A-0, *A-1*
 stream2-thread1: A-2
 # Now, note that, the partition A-1 used to get assigned to stream1-thread1, and now, it's back. And also, assume the partition A-1 has slow input (ex: 1 record per 30 mins), and partition A-0 has fast input (ex: 10K records / sec). So, now, the stream1-thread1 won't process any data until we got input from partition A-1 (even if partition A-0 is buffered a lot, and we have `{{max.task.idle.ms}}` set to 0).

The reason why the stream1-thread1 won't process any data is because we can't get the metadata of partition A-1. And why we can't get the metadata? It's because
 # In KIP-695, we use consumer's cache to get the partition lag, to avoid remote call
 # The lag for a partition will be cleared if the assignment in this round doesn't have this partition. check here. So, in the above example, the metadata cache for partition A-1 will be cleared in step 4, and re-initialized (to null) in step 5
 # In KIP-227, we introduced a fetch session to have incremental fetch request/response. That is, if the session existed, the client(consumer) will get the update only when the fetched partition have update (ex: new data). So, in the above case, the partition A-1 has slow input (ex: 1 record per 30 mins), it won't have update until next 30 mins, or wait for the fetch session become inactive for (default) 2 mins to be evicted. Either case, the metadata won't be updated for a while.

In KIP-695, if we don't get the partition lag, we can't determine the partition data status to do timestamp sync, so we'll keep waiting and not processing any data. That's why this issue will happen.

*Proposed solution:*
 # If we don't get the current lag for a partition, or the current lag > 0, we start to wait for max.task.idle.ms, and reset the deadline when we get the partition lag, like what we did in previous KIP-353
 # Introduce a waiting time config when no partition lag, or partition lag keeps > 0 (need KIP)

[~vvcephei] [~guozhang] , any suggestions?

cc [~ableegoldman] [~mjsax] , this is the root cause that in [https://github.com/apache/kafka/pull/10736,] we discussed and thought there's a data lose situation. FYI.

  was:
In KIP-695, we improved the task idling mechanism by checking partition lag. It's a good improvement for timestamp sync. But I found it will cause the stream stop processing the data for a long time while waiting for the partition metadata.

I've been investigating this case for a while, and figuring out the issue will happen in below situation (or similar situation):
 # start 2 streams (each with 1 thread) to consume from a topicA (with 3 partitions: A-0, A-1, A-2)
 # After 2 streams started, the partitions assignment are: (I skipped some other processing related partitions for simplicity)
 stream1-thread1: A-0, A-1
 stream2-thread1: A-2
 # start processing some data, assume now, the position and high watermark is:
 A-0: offset: 2, highWM: 2
 A-1: offset: 2, highWM: 2
 A-2: offset: 2, highWM: 2
 # Now, stream3 joined, so trigger rebalance with this assignment:
 stream1-thread1: A-0
 stream2-thread1: A-2
 stream3-thread1: A-1
 # Suddenly, stream3 left, so now, rebalance again, with the step 2 assignment:
 stream1-thread1: A-0, *A-1*
 stream2-thread1: A-2
 # Now, note that, the partition A-1 used to get assigned to stream1-thread1, and now, it's back. And also, assume the partition A-1 has slow input (ex: 1 record per 30 mins), and partition A-0 has fast input (ex: 10K records / sec). So, now, the stream1-thread1 won't process any data until we got input from partition A-1 (even if partition A-0 is buffered a lot, and we have `{{max.task.idle.ms}}` set to 0).

The reason why the stream1-thread1