This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new b5abc5f5e2b branch-4.0: [fix](job) fix routine load job incorrectly
cancelled on FE restart after swap table #61046 (#61128)
b5abc5f5e2b is described below
commit b5abc5f5e2b4269e432a80b9af9b6aae99c19722
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Mon Mar 9 14:15:32 2026 +0800
branch-4.0: [fix](job) fix routine load job incorrectly cancelled on FE
restart after swap table #61046 (#61128)
Cherry-picked from #61046
Co-authored-by: hui lai <[email protected]>
---
.../doris/load/routineload/RoutineLoadJob.java | 13 +++
.../plans/commands/info/CreateRoutineLoadInfo.java | 4 +
.../test_routine_load_swap_table_restart.groovy | 104 +++++++++++++++++++++
3 files changed, 121 insertions(+)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
index 33f38f733f8..3a5d95c9442 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
@@ -1988,6 +1988,19 @@ public abstract class RoutineLoadJob
CreateRoutineLoadCommand command = (CreateRoutineLoadCommand)
nereidsParser.parseSingle(
origStmt.originStmt);
CreateRoutineLoadInfo createRoutineLoadInfo =
command.getCreateRoutineLoadInfo();
+ // If tableId is set, resolve the current table name by ID so that
+ // table rename / SWAP TABLE won't cause replay to fail with a stale name in origStmt.
+ if (!isMultiTable && tableId != 0) {
+ try {
+ Database db =
Env.getCurrentEnv().getInternalCatalog().getDb(dbId).orElse(null);
+ if (db != null) {
+ db.getTable(tableId).ifPresent(
+ table ->
createRoutineLoadInfo.setTableName(table.getName()));
+ }
+ } catch (Exception ignored) {
+ // fall through; let validate() surface the real error
+ }
+ }
createRoutineLoadInfo.validate(ctx);
setRoutineLoadDesc(createRoutineLoadInfo.getRoutineLoadDesc());
} finally {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateRoutineLoadInfo.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateRoutineLoadInfo.java
index 9a54bcaacad..f67ac2fd965 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateRoutineLoadInfo.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateRoutineLoadInfo.java
@@ -278,6 +278,10 @@ public class CreateRoutineLoadInfo {
return tableName;
}
+ public void setTableName(String tableName) { // Overwrites the table name held by this info object with the given value.
+ this.tableName = tableName; // Plain assignment, no checks here; the RoutineLoadJob replay path in this commit calls validate(ctx) right after setting it.
+ }
+
public String getDBName() {
return dbName;
}
diff --git
a/regression-test/suites/load_p0/routine_load/test_routine_load_swap_table_restart.groovy
b/regression-test/suites/load_p0/routine_load/test_routine_load_swap_table_restart.groovy
new file mode 100644
index 00000000000..d51f9cff139
--- /dev/null
+++
b/regression-test/suites/load_p0/routine_load/test_routine_load_swap_table_restart.groovy
@@ -0,0 +1,104 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+import org.apache.doris.regression.util.RoutineLoadTestUtils
+
+ suite("test_routine_load_swap_table_restart", "docker") { // Regression: a routine load job must not end up CANCELLED after SWAP TABLE + DROP TABLE followed by an FE restart.
+ if (!RoutineLoadTestUtils.isKafkaTestEnabled(context)) { // Return early unless isKafkaTestEnabled(context) is true.
+ return
+ }
+
+ def kafka_broker = RoutineLoadTestUtils.getKafkaBroker(context)
+ def producer = RoutineLoadTestUtils.createKafkaProducer(kafka_broker)
+ def topic = "test_routine_load_swap_table_restart"
+ def tblTest = "rl_swap_tbl_test"
+ def tblMain = "rl_swap_tbl_main"
+ def jobName = "test_rl_swap_table_job"
+
+ def options = new ClusterOptions()
+ options.setFeNum(1)
+ options.setBeNum(1)
+
+ docker(options) {
+ def runSql = { String q -> sql q } // Closure wrapper around sql; handed to waitForTaskFinish below.
+
+ sql "DROP TABLE IF EXISTS ${tblTest}"
+ sql "DROP TABLE IF EXISTS ${tblMain}"
+ def createTableSql = { name -> // Builds the CREATE TABLE statement; both tables are created from this one template.
+ """
+ CREATE TABLE IF NOT EXISTS ${name} (
+ `k1` int(20) NULL,
+ `k2` string NULL,
+ `v1` date NULL,
+ `v2` string NULL,
+ `v3` datetime NULL,
+ `v4` string NULL
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`k1`)
+ DISTRIBUTED BY HASH(`k1`) BUCKETS 3
+ PROPERTIES ("replication_num" = "1");
+ """
+ }
+ sql createTableSql(tblTest)
+ sql createTableSql(tblMain)
+
+ try {
+ sql """
+ CREATE ROUTINE LOAD ${jobName} ON ${tblTest}
+ COLUMNS TERMINATED BY ","
+ PROPERTIES ("max_batch_interval" = "5")
+ FROM KAFKA
+ (
+ "kafka_broker_list" = "${kafka_broker}",
+ "kafka_topic" = "${topic}",
+ "property.group.id" = "test-swap-table-consumer-group",
+ "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+ );
+ """
+
+ // Send data and wait for routine load to be RUNNING with rows loaded
+ RoutineLoadTestUtils.sendTestDataToKafka(producer, [topic])
+ RoutineLoadTestUtils.waitForTaskFinish(runSql, jobName, tblTest, 0)
+
+ // SWAP tblMain with tblTest:
+ // after swap, tableId of tblTest (the RL target) is now registered under tblMain name
+ sql "ALTER TABLE ${tblMain} REPLACE WITH TABLE ${tblTest} PROPERTIES('swap' = 'true')"
+ // DROP tblTest (now holds the original empty tblMain data)
+ sql "DROP TABLE IF EXISTS ${tblTest}"
+ logger.info("Swapped and dropped ${tblTest}")
+ } finally { // NOTE(review): the verification below lives in finally, so it also runs when the setup above throws.
+ // Restart FE to trigger gsonPostProcess() replay
+ cluster.restartFrontends()
+ sleep(30000) // NOTE(review): fixed 30 s pause; presumably gives the FE time to come back before reconnecting - confirm.
+ context.reconnectFe()
+ logger.info("FE restarted and reconnected")
+
+ def res = sql "SHOW ROUTINE LOAD FOR ${jobName}"
+ def stateAfterRestart = res[0][8].toString() // Column 8 of the first row is read as the job state; column 17 is logged as the reason.
+ logger.info("Routine load state after restart: ${stateAfterRestart}, reason: ${res[0][17]}")
+
+ assertNotEquals("CANCELLED", stateAfterRestart,
+ "Routine load must NOT be CANCELLED after FE restart following SWAP TABLE + DROP TABLE")
+
+ sql "STOP ROUTINE LOAD FOR ${jobName}"
+ sql "DROP TABLE IF EXISTS ${tblMain}"
+ }
+ }
+
+ producer.close() // NOTE(review): not reached if the docker block above throws.
+ }
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org