This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new 88feffd32d1 branch-4.0: [enhance](job) refresh routine load lag more timely (#63654) (#64283)
88feffd32d1 is described below
commit 88feffd32d1bfa2d67e2b25c3dbbaad9c8ed8cd6
Author: hui lai <[email protected]>
AuthorDate: Thu Jun 11 11:17:35 2026 +0800
branch-4.0: [enhance](job) refresh routine load lag more timely (#63654) (#64283)
pick https://github.com/apache/doris/pull/63654
---
.../load/routineload/KafkaRoutineLoadJob.java | 87 ++++++++++++++++------
.../doris/load/routineload/RoutineLoadJob.java | 3 +
.../doris/load/routineload/RoutineLoadManager.java | 14 ++++
.../load/routineload/RoutineLoadScheduler.java | 2 +
.../load/routineload/KafkaRoutineLoadJobTest.java | 68 +++++++++++++++++
5 files changed, 151 insertions(+), 23 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
index 36e6faf5582..c338dbfeda4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
@@ -181,9 +181,14 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
@Override
public void prepare() throws UserException {
- // should reset converted properties each time the job being prepared.
- // because the file info can be changed anytime.
- convertCustomProperties(true);
+ writeLock();
+ try {
+ // should reset converted properties each time the job being
prepared.
+ // because the file info can be changed anytime.
+ convertCustomProperties(true);
+ } finally {
+ writeUnlock();
+ }
}
private void convertCustomProperties(boolean rebuild) throws DdlException {
@@ -332,15 +337,22 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
private void updateProgressAndOffsetsCache(RLTaskTxnCommitAttachment
attachment) {
((KafkaProgress)
attachment.getProgress()).getOffsetByPartition().entrySet().stream()
- .forEach(entity -> {
- if
(cachedPartitionWithLatestOffsets.containsKey(entity.getKey())
- &&
cachedPartitionWithLatestOffsets.get(entity.getKey()) < entity.getValue() + 1) {
- cachedPartitionWithLatestOffsets.put(entity.getKey(),
entity.getValue() + 1);
- }
- });
+ .forEach(entity ->
cachedPartitionWithLatestOffsets.computeIfPresent(entity.getKey(),
+ (partitionId, cachedOffset) -> Math.max(cachedOffset,
entity.getValue() + 1)));
this.progress.update(attachment);
}
+ private void updateLatestOffsetsCache(List<Pair<Integer, Long>>
latestOffsets, UUID taskId) {
+ for (Pair<Integer, Long> pair : latestOffsets) {
+ Long updatedOffset =
cachedPartitionWithLatestOffsets.merge(pair.first, pair.second, Math::max);
+ if (updatedOffset > pair.second) {
+ LOG.warn("Kafka offset fallback. partition: {}, cache offset:
{}"
+ + " get latest offset: {}, task {}, job {}",
+ pair.first, updatedOffset, pair.second, taskId,
id);
+ }
+ }
+ }
+
@Override
protected void updateProgress(RLTaskTxnCommitAttachment attachment) throws
UserException {
updateProgressAndOffsetsCache(attachment);
@@ -619,14 +631,15 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
super.setOptional(info);
KafkaDataSourceProperties kafkaDataSourceProperties
= (KafkaDataSourceProperties) info.getDataSourceProperties();
- if
(CollectionUtils.isNotEmpty(kafkaDataSourceProperties.getKafkaPartitionOffsets()))
{
- setCustomKafkaPartitions(kafkaDataSourceProperties);
- }
if
(MapUtils.isNotEmpty(kafkaDataSourceProperties.getCustomKafkaProperties())) {
setCustomKafkaProperties(kafkaDataSourceProperties.getCustomKafkaProperties());
}
// set group id if not specified
this.customProperties.putIfAbsent(PROP_GROUP_ID, name + "_" +
UUID.randomUUID());
+ convertCustomProperties(true);
+ if
(CollectionUtils.isNotEmpty(kafkaDataSourceProperties.getKafkaPartitionOffsets()))
{
+ setCustomKafkaPartitions(kafkaDataSourceProperties);
+ }
}
// this is an unprotected method which is called in the initialization
function
@@ -866,18 +879,21 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
try {
// all offsets to be consumed are newer than offsets in
cachedPartitionWithLatestOffsets,
// maybe the cached offset is out-of-date, fetch from kafka server
again
- List<Pair<Integer, Long>> tmp = KafkaUtil.getLatestOffsets(id,
taskId, getBrokerList(),
- getTopic(), getConvertedCustomProperties(),
Lists.newArrayList(partitionIdToOffset.keySet()));
- for (Pair<Integer, Long> pair : tmp) {
- if (pair.second >=
cachedPartitionWithLatestOffsets.getOrDefault(pair.first, Long.MIN_VALUE)) {
- cachedPartitionWithLatestOffsets.put(pair.first,
pair.second);
- } else {
- LOG.warn("Kafka offset fallback. partition: {}, cache
offset: {}"
- + " get latest offset: {}, task {}, job {}",
- pair.first,
cachedPartitionWithLatestOffsets.getOrDefault(pair.first, Long.MIN_VALUE),
- pair.second, taskId, id);
- }
+ String brokerListSnapshot;
+ String topicSnapshot;
+ Map<String, String> customPropertiesSnapshot;
+ writeLock();
+ try {
+ convertCustomProperties(false);
+ brokerListSnapshot = brokerList;
+ topicSnapshot = topic;
+ customPropertiesSnapshot =
Maps.newHashMap(convertedCustomProperties);
+ } finally {
+ writeUnlock();
}
+ List<Pair<Integer, Long>> tmp = KafkaUtil.getLatestOffsets(id,
taskId, brokerListSnapshot,
+ topicSnapshot, customPropertiesSnapshot,
Lists.newArrayList(partitionIdToOffset.keySet()));
+ updateLatestOffsetsCache(tmp, taskId);
} catch (Exception e) {
// It needs to pause job when can not get partition meta.
// To ensure the stability of the routine load,
@@ -919,6 +935,31 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
return false;
}
+ @Override
+ public void updateLag() throws UserException {
+ List<Integer> partitionIds;
+ String brokerListSnapshot;
+ String topicSnapshot;
+ Map<String, String> customPropertiesSnapshot;
+ writeLock();
+ try {
+ convertCustomProperties(false);
+ partitionIds = Lists.newArrayList(((KafkaProgress)
progress).getOffsetByPartition().keySet());
+ if (partitionIds.isEmpty()) {
+ return;
+ }
+ brokerListSnapshot = brokerList;
+ topicSnapshot = topic;
+ customPropertiesSnapshot =
Maps.newHashMap(convertedCustomProperties);
+ } finally {
+ writeUnlock();
+ }
+ UUID taskId = UUID.randomUUID();
+ List<Pair<Integer, Long>> latestOffsets =
KafkaUtil.getLatestOffsets(id, taskId, brokerListSnapshot,
+ topicSnapshot, customPropertiesSnapshot, partitionIds);
+ updateLatestOffsetsCache(latestOffsets, taskId);
+ }
+
@Override
public String getLag() {
Map<Integer, Long> partitionIdToOffsetLag = ((KafkaProgress)
progress).getLag(cachedPartitionWithLatestOffsets);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
index 3a5d95c9442..37c9a6a1c52 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
@@ -1001,6 +1001,9 @@ public abstract class RoutineLoadJob
return 0L;
}
+ public void updateLag() throws UserException {
+ }
+
abstract RoutineLoadTaskInfo unprotectRenewTask(RoutineLoadTaskInfo
routineLoadTaskInfo, boolean delaySchedule);
// call before first scheduling
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
index 3b1f498bcfc..26f806a76ff 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadManager.java
@@ -870,6 +870,20 @@ public class RoutineLoadManager implements Writable {
}
}
+ public void updateRoutineLoadJobLag() {
+ for (RoutineLoadJob routineLoadJob : idToRoutineLoadJob.values()) {
+ if (!routineLoadJob.state.isFinalState()) {
+ try {
+ routineLoadJob.updateLag();
+ } catch (UserException e) {
+ LOG.warn(new LogBuilder(LogKey.ROUTINE_LOAD_JOB,
routineLoadJob.getId())
+ .add("msg", "failed to update routine load lag")
+ .build(), e);
+ }
+ }
+ }
+ }
+
public void replayCreateRoutineLoadJob(RoutineLoadJob routineLoadJob) {
unprotectedAddJob(routineLoadJob);
LOG.info(new LogBuilder(LogKey.ROUTINE_LOAD_JOB,
routineLoadJob.getId())
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadScheduler.java
index 023cd239e09..6fe09e27a0b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadScheduler.java
@@ -62,6 +62,8 @@ public class RoutineLoadScheduler extends MasterDaemon {
private void process() throws UserException {
// update
routineLoadManager.updateRoutineLoadJob();
+ // refresh lag cache after job progress and partition metadata are
updated
+ routineLoadManager.updateRoutineLoadJobLag();
// get need schedule routine jobs
List<RoutineLoadJob> routineLoadJobList = null;
try {
diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java
index 1d8b58257dc..5113c57ec88 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java
@@ -203,6 +203,74 @@ public class KafkaRoutineLoadJobTest {
}
}
+ @Test
+ public void testUpdateLagRefreshesLatestOffsetCache() throws UserException
{
+ KafkaRoutineLoadJob routineLoadJob = new KafkaRoutineLoadJob(1L,
"kafka_routine_load_job", 1L,
+ 1L, "127.0.0.1:9020", "topic1", UserIdentity.ADMIN);
+ Map<Integer, Long> partitionIdToOffset = Maps.newHashMap();
+ partitionIdToOffset.put(1, 10L);
+ partitionIdToOffset.put(2, 20L);
+ Deencapsulation.setField(routineLoadJob, "progress", new
KafkaProgress(partitionIdToOffset));
+
+ new MockUp<KafkaUtil>() {
+ @Mock
+ public List<Pair<Integer, Long>> getLatestOffsets(long jobId, UUID
taskId, String brokerList, String topic,
+ Map<String,
String> convertedCustomProperties,
+ List<Integer>
partitionIds) {
+ Assert.assertEquals(1L, jobId);
+ Assert.assertEquals("127.0.0.1:9020", brokerList);
+ Assert.assertEquals("topic1", topic);
+ Assert.assertTrue(partitionIds.contains(1));
+ Assert.assertTrue(partitionIds.contains(2));
+ return Lists.newArrayList(Pair.of(1, 15L), Pair.of(2, 30L));
+ }
+ };
+
+ routineLoadJob.updateLag();
+
+ Assert.assertEquals(15L, routineLoadJob.totalLag().longValue());
+ }
+
+ @Test
+ public void testUpdateLagRebuildsConvertedPropertiesAfterReplay(@Mocked
Env env) throws UserException {
+ KafkaRoutineLoadJob routineLoadJob = new KafkaRoutineLoadJob(1L,
"kafka_routine_load_job", 1L,
+ 1L, "127.0.0.1:9020", "topic1", UserIdentity.ADMIN);
+ Deencapsulation.setField(routineLoadJob, "customKafkaPartitions",
Lists.newArrayList(1));
+
+ Map<String, String> customProperties = Maps.newHashMap();
+ customProperties.put("security.protocol", "SASL_PLAINTEXT");
+ customProperties.put("sasl.mechanism", "PLAIN");
+ Deencapsulation.setField(routineLoadJob, "customProperties",
customProperties);
+ Deencapsulation.setField(routineLoadJob, "convertedCustomProperties",
Maps.newHashMap());
+
+ Map<Integer, Long> partitionIdToOffset = Maps.newHashMap();
+ partitionIdToOffset.put(1, 10L);
+ Deencapsulation.setField(routineLoadJob, "progress", new
KafkaProgress(partitionIdToOffset));
+
+ new MockUp<Env>() {
+ @Mock
+ public Env getCurrentEnv() {
+ return env;
+ }
+ };
+ new MockUp<KafkaUtil>() {
+ @Mock
+ public List<Pair<Integer, Long>> getLatestOffsets(long jobId, UUID
taskId, String brokerList, String topic,
+ Map<String,
String> convertedCustomProperties,
+ List<Integer>
partitionIds) {
+ Assert.assertEquals("SASL_PLAINTEXT",
convertedCustomProperties.get("security.protocol"));
+ Assert.assertEquals("PLAIN",
convertedCustomProperties.get("sasl.mechanism"));
+ Assert.assertEquals(1, partitionIds.size());
+ Assert.assertTrue(partitionIds.contains(1));
+ return Lists.newArrayList(Pair.of(1, 15L));
+ }
+ };
+
+ routineLoadJob.updateLag();
+
+ Assert.assertEquals(5L, routineLoadJob.totalLag().longValue());
+ }
+
@Test
public void testProcessTimeOutTasks(@Injectable GlobalTransactionMgr
globalTransactionMgr,
@Injectable RoutineLoadManager
routineLoadManager,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]