This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 7935dc9fae4bca7852c9b3ca7f3670f2459a657c
Author: HHoflittlefish777 <[email protected]>
AuthorDate: Thu Feb 1 18:35:11 2024 +0800

    [fix](routine-load) update partition offset cache timely to avoid negative 
lag #30455
---
 .../org/apache/doris/load/routineload/KafkaRoutineLoadJob.java   | 9 ++++++++-
 1 file changed, 8 insertions(+), 1 deletion(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
index 77171b6a4cb..2f4d2509304 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
@@ -707,6 +707,8 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
     // check if given partitions has more data to consume.
     // 'partitionIdToOffset' to the offset to be consumed.
     public boolean hasMoreDataToConsume(UUID taskId, Map<Integer, Long> 
partitionIdToOffset) throws UserException {
+        boolean needUpdateCache = false;
+        // it is need check all partitions, for some partitions offset may be 
out of time
         for (Map.Entry<Integer, Long> entry : partitionIdToOffset.entrySet()) {
             if (cachedPartitionWithLatestOffsets.containsKey(entry.getKey())
                     && entry.getValue() < 
cachedPartitionWithLatestOffsets.get(entry.getKey())) {
@@ -717,9 +719,14 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
                 //  query_watermark_offsets() will return 4.)
                 LOG.debug("has more data to consume. offsets to be consumed: 
{}, latest offsets: {}, task {}, job {}",
                         partitionIdToOffset, cachedPartitionWithLatestOffsets, 
taskId, id);
-                return true;
+            } else {
+                needUpdateCache = true;
+                break;
             }
         }
+        if (needUpdateCache == false) {
+            return true;
+        }
 
         try {
             // all offsets to be consumed are newer than offsets in 
cachedPartitionWithLatestOffsets,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to