This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 7a815036e9d branch-3.0: [fix](routine load) make routine load delay
eof schedule work #45528 (#45722)
7a815036e9d is described below
commit 7a815036e9d26a0adf9d7d668bd380c293cb7a1b
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Mon Dec 23 09:59:22 2024 +0800
branch-3.0: [fix](routine load) make routine load delay eof schedule work
#45528 (#45722)
Cherry-picked from #45528
Co-authored-by: hui lai <[email protected]>
---
.../load/routineload/KafkaRoutineLoadJob.java | 2 +-
.../doris/load/routineload/KafkaTaskInfo.java | 9 +-
.../load/routineload/RoutineLoadTaskInfo.java | 9 +-
.../load/routineload/KafkaRoutineLoadJobTest.java | 2 +-
.../routineload/RoutineLoadTaskSchedulerTest.java | 2 +-
.../transaction/GlobalTransactionMgrTest.java | 4 +-
.../suites/load_p0/routine_load/data/test_eof.csv | 1 +
.../routine_load/test_routine_load_eof.groovy | 178 +++++++++++++++++++++
8 files changed, 195 insertions(+), 12 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
index 6bdef3301a6..d0843eb9204 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaRoutineLoadJob.java
@@ -235,7 +235,7 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
}
KafkaTaskInfo kafkaTaskInfo = new
KafkaTaskInfo(UUID.randomUUID(), id,
maxBatchIntervalS *
Config.routine_load_task_timeout_multiplier * 1000,
- taskKafkaProgress, isMultiTable());
+ taskKafkaProgress, isMultiTable(), -1, false);
routineLoadTaskInfoList.add(kafkaTaskInfo);
result.add(kafkaTaskInfo);
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java
index f1578269529..e3292dc671f 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/KafkaTaskInfo.java
@@ -49,16 +49,17 @@ public class KafkaTaskInfo extends RoutineLoadTaskInfo {
private Map<Integer, Long> partitionIdToOffset;
public KafkaTaskInfo(UUID id, long jobId,
- long timeoutMs, Map<Integer, Long>
partitionIdToOffset, boolean isMultiTable) {
- super(id, jobId, timeoutMs, isMultiTable);
+ long timeoutMs, Map<Integer, Long>
partitionIdToOffset, boolean isMultiTable,
+ long lastScheduledTime, boolean isEof) {
+ super(id, jobId, timeoutMs, isMultiTable, lastScheduledTime, isEof);
this.partitionIdToOffset = partitionIdToOffset;
}
public KafkaTaskInfo(KafkaTaskInfo kafkaTaskInfo, Map<Integer, Long>
partitionIdToOffset, boolean isMultiTable) {
super(UUID.randomUUID(), kafkaTaskInfo.getJobId(),
- kafkaTaskInfo.getTimeoutMs(), kafkaTaskInfo.getBeId(),
isMultiTable);
+ kafkaTaskInfo.getTimeoutMs(), kafkaTaskInfo.getBeId(),
isMultiTable,
+ kafkaTaskInfo.getLastScheduledTime(),
kafkaTaskInfo.getIsEof());
this.partitionIdToOffset = partitionIdToOffset;
- this.isEof = kafkaTaskInfo.getIsEof();
}
public List<Integer> getPartitions() {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
index 1ff825d97b9..5075311299d 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadTaskInfo.java
@@ -79,17 +79,20 @@ public abstract class RoutineLoadTaskInfo {
// so that user or other logic can know the status of the corresponding
txn.
protected TransactionStatus txnStatus = TransactionStatus.UNKNOWN;
- public RoutineLoadTaskInfo(UUID id, long jobId, long timeoutMs, boolean
isMultiTable) {
+ public RoutineLoadTaskInfo(UUID id, long jobId, long timeoutMs, boolean
isMultiTable,
+ long lastScheduledTime, boolean isEof) {
this.id = id;
this.jobId = jobId;
this.createTimeMs = System.currentTimeMillis();
this.timeoutMs = timeoutMs;
this.isMultiTable = isMultiTable;
+ this.lastScheduledTime = lastScheduledTime;
+ this.isEof = isEof;
}
public RoutineLoadTaskInfo(UUID id, long jobId, long timeoutMs, long
previousBeId,
- boolean isMultiTable) {
- this(id, jobId, timeoutMs, isMultiTable);
+ boolean isMultiTable, long lastScheduledTime,
boolean isEof) {
+ this(id, jobId, timeoutMs, isMultiTable, lastScheduledTime, isEof);
this.previousBeId = previousBeId;
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java
b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java
index 20cb626ff37..63452a5d59c 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/KafkaRoutineLoadJobTest.java
@@ -225,7 +225,7 @@ public class KafkaRoutineLoadJobTest {
Map<Integer, Long> partitionIdsToOffset = Maps.newHashMap();
partitionIdsToOffset.put(100, 0L);
KafkaTaskInfo kafkaTaskInfo = new KafkaTaskInfo(new UUID(1, 1), 1L,
- maxBatchIntervalS * 2 * 1000, partitionIdsToOffset, false);
+ maxBatchIntervalS * 2 * 1000, partitionIdsToOffset, false, -1,
false);
kafkaTaskInfo.setExecuteStartTimeMs(System.currentTimeMillis() -
maxBatchIntervalS * 2 * 1000 - 1);
routineLoadTaskInfoList.add(kafkaTaskInfo);
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java
b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java
index 95c2423de71..6e11fc5f71a 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/load/routineload/RoutineLoadTaskSchedulerTest.java
@@ -70,7 +70,7 @@ public class RoutineLoadTaskSchedulerTest {
LinkedBlockingDeque<RoutineLoadTaskInfo> routineLoadTaskInfoQueue =
new LinkedBlockingDeque<>();
KafkaTaskInfo routineLoadTaskInfo1 = new KafkaTaskInfo(new UUID(1, 1),
1L, 20000,
- partitionIdToOffset, false);
+ partitionIdToOffset, false, -1, false);
routineLoadTaskInfoQueue.addFirst(routineLoadTaskInfo1);
Map<Long, RoutineLoadTaskInfo> idToRoutineLoadTask = Maps.newHashMap();
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
b/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
index 420800a4bb3..c4ec468c651 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/transaction/GlobalTransactionMgrTest.java
@@ -302,7 +302,7 @@ public class GlobalTransactionMgrTest {
Map<Integer, Long> partitionIdToOffset = Maps.newHashMap();
partitionIdToOffset.put(1, 0L);
KafkaTaskInfo routineLoadTaskInfo = new
KafkaTaskInfo(UUID.randomUUID(), 1L, 20000,
- partitionIdToOffset, false);
+ partitionIdToOffset, false, -1, false);
Deencapsulation.setField(routineLoadTaskInfo, "txnId", 1L);
routineLoadTaskInfoList.add(routineLoadTaskInfo);
TransactionState transactionState = new TransactionState(1L,
Lists.newArrayList(1L), 1L, "label", null,
@@ -368,7 +368,7 @@ public class GlobalTransactionMgrTest {
Map<Integer, Long> partitionIdToOffset = Maps.newHashMap();
partitionIdToOffset.put(1, 0L);
KafkaTaskInfo routineLoadTaskInfo = new
KafkaTaskInfo(UUID.randomUUID(), 1L, 20000,
- partitionIdToOffset, false);
+ partitionIdToOffset, false, -1, false);
Deencapsulation.setField(routineLoadTaskInfo, "txnId", 1L);
routineLoadTaskInfoList.add(routineLoadTaskInfo);
TransactionState transactionState = new TransactionState(1L,
Lists.newArrayList(1L), 1L, "label", null,
diff --git a/regression-test/suites/load_p0/routine_load/data/test_eof.csv
b/regression-test/suites/load_p0/routine_load/data/test_eof.csv
new file mode 100644
index 00000000000..bc857cabcfd
--- /dev/null
+++ b/regression-test/suites/load_p0/routine_load/data/test_eof.csv
@@ -0,0 +1 @@
+57|2023-08-19|TRUE|2|-25462|-74112029|6458082754318544493|-7910671781690629051|-15205.859375|-306870797.484914|759730669.0|-628556336.0|2023-07-10
18:39:10|2023-02-12|2023-01-27
07:26:06|y||Xi9nDVrLv8m6AwEpUxmtzFAuK48sQ|{"name": "John", "age": 25, "city":
"New York"}
\ No newline at end of file
diff --git
a/regression-test/suites/load_p0/routine_load/test_routine_load_eof.groovy
b/regression-test/suites/load_p0/routine_load/test_routine_load_eof.groovy
new file mode 100644
index 00000000000..6eeb9a4e51c
--- /dev/null
+++ b/regression-test/suites/load_p0/routine_load/test_routine_load_eof.groovy
@@ -0,0 +1,178 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.kafka.clients.admin.AdminClient
+import org.apache.kafka.clients.producer.KafkaProducer
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.clients.producer.ProducerConfig
+
+suite("test_routine_load_eof","p0") {
+ def kafkaCsvTpoics = [
+ "test_eof",
+ ]
+
+ String enabled = context.config.otherConfigs.get("enableKafkaTest")
+ String kafka_port = context.config.otherConfigs.get("kafka_port")
+ String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+ def kafka_broker = "${externalEnvIp}:${kafka_port}"
+
+ if (enabled != null && enabled.equalsIgnoreCase("true")) {
+ def thread = Thread.start {
+ // define kafka
+ def props = new Properties()
+ props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
"${kafka_broker}".toString())
+ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
+ props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
+ // Create kafka producer
+ def producer = new KafkaProducer<>(props)
+
+ while(true) {
+ Thread.sleep(1000)
+ for (String kafkaCsvTopic in kafkaCsvTpoics) {
+ def txt = new
File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
+ def lines = txt.readLines()
+ lines.each { line ->
+ logger.info("=====${line}========")
+ def record = new ProducerRecord<>(kafkaCsvTopic, null,
line)
+ producer.send(record)
+ }
+ }
+ }
+ }
+
+ sleep(2 * 1000)
+
+ def jobName = "testEof"
+ def tableName = "test_routine_load_eof"
+ try {
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName}
+ (
+ k00 INT NOT NULL,
+ k01 DATE NOT NULL,
+ k02 BOOLEAN NULL,
+ k03 TINYINT NULL,
+ k04 SMALLINT NULL,
+ k05 INT NULL,
+ k06 BIGINT NULL,
+ k07 LARGEINT NULL,
+ k08 FLOAT NULL,
+ k09 DOUBLE NULL,
+ k10 DECIMAL(9,1) NULL,
+ k11 DECIMALV3(9,1) NULL,
+ k12 DATETIME NULL,
+ k13 DATEV2 NULL,
+ k14 DATETIMEV2 NULL,
+ k15 CHAR NULL,
+ k16 VARCHAR NULL,
+ k17 STRING NULL,
+ k18 JSON NULL,
+ kd01 BOOLEAN NOT NULL DEFAULT "TRUE",
+ kd02 TINYINT NOT NULL DEFAULT "1",
+ kd03 SMALLINT NOT NULL DEFAULT "2",
+ kd04 INT NOT NULL DEFAULT "3",
+ kd05 BIGINT NOT NULL DEFAULT "4",
+ kd06 LARGEINT NOT NULL DEFAULT "5",
+ kd07 FLOAT NOT NULL DEFAULT "6.0",
+ kd08 DOUBLE NOT NULL DEFAULT "7.0",
+ kd09 DECIMAL NOT NULL DEFAULT "888888888",
+ kd10 DECIMALV3 NOT NULL DEFAULT "999999999",
+ kd11 DATE NOT NULL DEFAULT "2023-08-24",
+ kd12 DATETIME NOT NULL DEFAULT "2023-08-24 12:00:00",
+ kd13 DATEV2 NOT NULL DEFAULT "2023-08-24",
+ kd14 DATETIMEV2 NOT NULL DEFAULT "2023-08-24 12:00:00",
+ kd15 CHAR(255) NOT NULL DEFAULT "我能吞下玻璃而不伤身体",
+ kd16 VARCHAR(300) NOT NULL DEFAULT "我能吞下玻璃而不伤身体",
+ kd17 STRING NOT NULL DEFAULT "我能吞下玻璃而不伤身体",
+ kd18 JSON NULL,
+
+ INDEX idx_inverted_k104 (`k05`) USING INVERTED,
+ INDEX idx_inverted_k110 (`k11`) USING INVERTED,
+ INDEX idx_inverted_k113 (`k13`) USING INVERTED,
+ INDEX idx_inverted_k114 (`k14`) USING INVERTED,
+ INDEX idx_inverted_k117 (`k17`) USING INVERTED
PROPERTIES("parser" = "english"),
+ INDEX idx_ngrambf_k115 (`k15`) USING NGRAM_BF
PROPERTIES("gram_size"="3", "bf_size"="256"),
+ INDEX idx_ngrambf_k116 (`k16`) USING NGRAM_BF
PROPERTIES("gram_size"="3", "bf_size"="256"),
+ INDEX idx_ngrambf_k117 (`k17`) USING NGRAM_BF
PROPERTIES("gram_size"="3", "bf_size"="256"),
+
+ INDEX idx_bitmap_k104 (`k02`) USING BITMAP,
+ INDEX idx_bitmap_k110 (`kd01`) USING BITMAP
+
+ )
+ DUPLICATE KEY(k00)
+ PARTITION BY RANGE(k01)
+ (
+ PARTITION p1 VALUES [('2023-08-01'), ('2023-08-11')),
+ PARTITION p2 VALUES [('2023-08-11'), ('2023-08-21')),
+ PARTITION p3 VALUES [('2023-08-21'), ('2023-09-01'))
+ )
+ DISTRIBUTED BY HASH(k00) BUCKETS 32
+ PROPERTIES (
+ "bloom_filter_columns"="k05",
+ "replication_num" = "1"
+ );
+ """
+ sql "sync"
+
+ sql """
+ CREATE ROUTINE LOAD ${jobName} on ${tableName}
+
COLUMNS(k00,k01,k02,k03,k04,k05,k06,k07,k08,k09,k10,k11,k12,k13,k14,k15,k16,k17,k18),
+ COLUMNS TERMINATED BY "|"
+ PROPERTIES
+ (
+ "max_batch_interval" = "5",
+ "max_batch_rows" = "300000",
+ "max_batch_size" = "209715200"
+ )
+ FROM KAFKA
+ (
+ "kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
+ "kafka_topic" = "test_eof",
+ "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+ );
+ """
+ sql "sync"
+
+ def count = 0
+ while (true) {
+ sleep(1000)
+ def res = sql "show routine load for ${jobName}"
+ def state = res[0][8].toString()
+ if (state != "RUNNING") {
+ count++
+ if (count > 60) {
+ assertEquals(1, 2)
+ }
+ continue;
+ }
+ break;
+ }
+ sleep(60 * 1000)
+ def res = sql "show routine load for ${jobName}"
+ def statistic = res[0][14].toString()
+ def json = parseJson(res[0][14])
+ log.info("routine load statistic:
${res[0][14].toString()}".toString())
+ if (json.committedTaskNum > 20) {
+ assertEquals(1, 2)
+ }
+ } finally {
+ sql "stop routine load for ${jobName}"
+ sql "DROP TABLE IF EXISTS ${tableName}"
+ }
+ thread.interrupt()
+ }
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]