This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new b5abc5f5e2b branch-4.0: [fix](job) fix routine load job incorrectly cancelled on FE restart after swap table #61046 (#61128)
b5abc5f5e2b is described below

commit b5abc5f5e2b4269e432a80b9af9b6aae99c19722
Author: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Mon Mar 9 14:15:32 2026 +0800

    branch-4.0: [fix](job) fix routine load job incorrectly cancelled on FE restart after swap table #61046 (#61128)
    
    Cherry-picked from #61046
    
    Co-authored-by: hui lai <[email protected]>
---
 .../doris/load/routineload/RoutineLoadJob.java     |  13 +++
 .../plans/commands/info/CreateRoutineLoadInfo.java |   4 +
 .../test_routine_load_swap_table_restart.groovy    | 104 +++++++++++++++++++++
 3 files changed, 121 insertions(+)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
index 33f38f733f8..3a5d95c9442 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java
@@ -1988,6 +1988,19 @@ public abstract class RoutineLoadJob
                 CreateRoutineLoadCommand command = (CreateRoutineLoadCommand) nereidsParser.parseSingle(
                         origStmt.originStmt);
                 CreateRoutineLoadInfo createRoutineLoadInfo = command.getCreateRoutineLoadInfo();
+                // If tableId is set, resolve the current table name by ID so that
+                // table rename / SWAP TABLE won't cause replay to fail with stale name in origStmt.
+                if (!isMultiTable && tableId != 0) {
+                    try {
+                        Database db = Env.getCurrentEnv().getInternalCatalog().getDb(dbId).orElse(null);
+                        if (db != null) {
+                            db.getTable(tableId).ifPresent(
+                                    table -> createRoutineLoadInfo.setTableName(table.getName()));
+                        }
+                    } catch (Exception ignored) {
+                        // fall through; let validate() surface the real error
+                    }
+                }
                 createRoutineLoadInfo.validate(ctx);
                 setRoutineLoadDesc(createRoutineLoadInfo.getRoutineLoadDesc());
             } finally {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateRoutineLoadInfo.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateRoutineLoadInfo.java
index 9a54bcaacad..f67ac2fd965 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateRoutineLoadInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateRoutineLoadInfo.java
@@ -278,6 +278,10 @@ public class CreateRoutineLoadInfo {
         return tableName;
     }
 
+    public void setTableName(String tableName) {
+        this.tableName = tableName;
+    }
+
     public String getDBName() {
         return dbName;
     }
diff --git a/regression-test/suites/load_p0/routine_load/test_routine_load_swap_table_restart.groovy b/regression-test/suites/load_p0/routine_load/test_routine_load_swap_table_restart.groovy
new file mode 100644
index 00000000000..d51f9cff139
--- /dev/null
+++ b/regression-test/suites/load_p0/routine_load/test_routine_load_swap_table_restart.groovy
@@ -0,0 +1,104 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+import org.apache.doris.regression.util.RoutineLoadTestUtils
+
+suite("test_routine_load_swap_table_restart", "docker") {
+    if (!RoutineLoadTestUtils.isKafkaTestEnabled(context)) {
+        return
+    }
+
+    def kafka_broker = RoutineLoadTestUtils.getKafkaBroker(context)
+    def producer = RoutineLoadTestUtils.createKafkaProducer(kafka_broker)
+    def topic = "test_routine_load_swap_table_restart"
+    def tblTest = "rl_swap_tbl_test"
+    def tblMain = "rl_swap_tbl_main"
+    def jobName = "test_rl_swap_table_job"
+
+    def options = new ClusterOptions()
+    options.setFeNum(1)
+    options.setBeNum(1)
+
+    docker(options) {
+        def runSql = { String q -> sql q }
+
+        sql "DROP TABLE IF EXISTS ${tblTest}"
+        sql "DROP TABLE IF EXISTS ${tblMain}"
+        def createTableSql = { name ->
+            """
+            CREATE TABLE IF NOT EXISTS ${name} (
+                `k1` int(20)  NULL,
+                `k2` string   NULL,
+                `v1` date     NULL,
+                `v2` string   NULL,
+                `v3` datetime NULL,
+                `v4` string   NULL
+            ) ENGINE=OLAP
+            DUPLICATE KEY(`k1`)
+            DISTRIBUTED BY HASH(`k1`) BUCKETS 3
+            PROPERTIES ("replication_num" = "1");
+            """
+        }
+        sql createTableSql(tblTest)
+        sql createTableSql(tblMain)
+
+        try {
+            sql """
+                CREATE ROUTINE LOAD ${jobName} ON ${tblTest}
+                COLUMNS TERMINATED BY ","
+                PROPERTIES ("max_batch_interval" = "5")
+                FROM KAFKA
+                (
+                    "kafka_broker_list"              = "${kafka_broker}",
+                    "kafka_topic"                    = "${topic}",
+                    "property.group.id"              = "test-swap-table-consumer-group",
+                    "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+                );
+            """
+
+            // Send data and wait for routine load to be RUNNING with rows loaded
+            RoutineLoadTestUtils.sendTestDataToKafka(producer, [topic])
+            RoutineLoadTestUtils.waitForTaskFinish(runSql, jobName, tblTest, 0)
+
+            // SWAP tblMain with tblTest:
+            //   after swap, tableId of tblTest (the RL target) is now registered under tblMain name
+            sql "ALTER TABLE ${tblMain} REPLACE WITH TABLE ${tblTest} PROPERTIES('swap' = 'true')"
+            // DROP tblTest (now holds the original empty tblMain data)
+            sql "DROP TABLE IF EXISTS ${tblTest}"
+            logger.info("Swapped and dropped ${tblTest}")
+        } finally {
+            // Restart FE to trigger gsonPostProcess() replay
+            cluster.restartFrontends()
+            sleep(30000)
+            context.reconnectFe()
+            logger.info("FE restarted and reconnected")
+
+            def res = sql "SHOW ROUTINE LOAD FOR ${jobName}"
+            def stateAfterRestart = res[0][8].toString()
+            logger.info("Routine load state after restart: ${stateAfterRestart}, reason: ${res[0][17]}")
+
+            assertNotEquals("CANCELLED", stateAfterRestart,
+                "Routine load must NOT be CANCELLED after FE restart following SWAP TABLE + DROP TABLE")
+
+            sql "STOP ROUTINE LOAD FOR ${jobName}"
+            sql "DROP TABLE IF EXISTS ${tblMain}"
+        }
+    }
+
+    producer.close()
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to