This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new 186ed0ed80d [fix](routine-load) dealing with the high watermark of
Kafka may fallback (#35901) (#37454)
186ed0ed80d is described below
commit 186ed0ed80da2b5b05ad17589635b1a1a3aa2502
Author: hui lai <[email protected]>
AuthorDate: Mon Jul 8 17:33:58 2024 +0800
[fix](routine-load) dealing with the high watermark of Kafka may fallback
(#35901) (#37454)
pick (#35901)
---
.../org/apache/doris/load/routineload/KafkaRoutineLoadJob.java | 9 ++++++++-
1 file changed, 8 insertions(+), 1 deletion(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
index 5df7714ffd3..692d79d445e 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
@@ -743,7 +743,14 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
List<Pair<Integer, Long>> tmp = KafkaUtil.getLatestOffsets(id,
taskId, getBrokerList(),
getTopic(), getConvertedCustomProperties(),
Lists.newArrayList(partitionIdToOffset.keySet()));
for (Pair<Integer, Long> pair : tmp) {
- cachedPartitionWithLatestOffsets.put(pair.first, pair.second);
+ if (pair.second >=
cachedPartitionWithLatestOffsets.getOrDefault(pair.first, Long.MIN_VALUE)) {
+ cachedPartitionWithLatestOffsets.put(pair.first,
pair.second);
+ } else {
+ LOG.warn("Kafka offset fallback. partition: {}, cache
offset: {}"
+ + " get latest offset: {}, task {}, job {}",
+ pair.first,
cachedPartitionWithLatestOffsets.getOrDefault(pair.first, Long.MIN_VALUE),
+ pair.second, taskId, id);
+ }
}
} catch (Exception e) {
// It needs to pause job when can not get partition meta.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]