This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 4c9650ffcc2 [fix](nereids) fix can not create routine load job in follower node (#51535)
4c9650ffcc2 is described below

commit 4c9650ffcc23a1606ce61311a0ddb5020bd5a6e2
Author: hui lai <lai...@selectdb.com>
AuthorDate: Tue Jun 10 21:25:11 2025 +0800

    [fix](nereids) fix can not create routine load job in follower node (#51535)
    
    ### What problem does this PR solve?
    
    Creating a routine load job on a follower FE node fails:
    ```
    CREATE ROUTINE LOAD lineitem ON lineitem
    FROM KAFKA
    (
           "kafka_broker_list" = "xxx",
           "kafka_topic" = "test"
    );
    ERROR 1105 (HY000): errCode = 2, detailMessage = java.lang.IllegalStateException
    ```
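    
    A follower FE forwards statements such as CREATE ROUTINE LOAD to the
    master FE, and that path relies on the original SQL text carried in the
    StatementContext's OriginStatement; `parseMultiple` never set it, which
    is presumably what surfaces as the IllegalStateException above. A
    minimal sketch of the backfill pattern the patch applies (illustration
    only; the names match the diff below):
    
    ```java
    // After parsing a multi-statement string, make sure every statement
    // still knows the full original SQL text and its own index within it.
    for (int i = 0; i < result.size(); i++) {
        StatementContext ctx = result.get(i).second;
        if (ctx.getOriginStatement() == null) {
            ctx.setOriginStatement(new OriginStatement(sql, i));
        }
    }
    ```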
---
 .../apache/doris/nereids/parser/NereidsParser.java |  19 ++-
 .../test_routine_load_follower_fe.groovy           | 154 +++++++++++++++++++++
 2 files changed, 172 insertions(+), 1 deletion(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/NereidsParser.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/NereidsParser.java
index 3e4825cae40..4605ce314da 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/NereidsParser.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/NereidsParser.java
@@ -36,6 +36,7 @@ import org.apache.doris.plugin.DialectConverterPlugin;
 import org.apache.doris.plugin.PluginMgr;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.GlobalVariable;
+import org.apache.doris.qe.OriginStatement;
 import org.apache.doris.qe.SessionVariable;
 
 import com.google.common.collect.ImmutableMap;
@@ -281,9 +282,25 @@ public class NereidsParser {
         return parseMultiple(sql, null);
     }
 
+    /**
+     * Parse multiple SQL statements.
+     *
+     * @param sql sql string containing one or more statements
+     * @param logicalPlanBuilder logical plan builder
+     * @return list of logical plans paired with their statement contexts
+     */
     public List<Pair<LogicalPlan, StatementContext>> parseMultiple(String sql,
                                                                    @Nullable LogicalPlanBuilder logicalPlanBuilder) {
-        return parse(sql, logicalPlanBuilder, DorisParser::multiStatements);
+        List<Pair<LogicalPlan, StatementContext>> result = parse(sql, logicalPlanBuilder, DorisParser::multiStatements);
+        // ensure each StatementContext has complete OriginStatement information
+        for (int i = 0; i < result.size(); i++) {
+            Pair<LogicalPlan, StatementContext> pair = result.get(i);
+            StatementContext statementContext = pair.second;
+            if (statementContext.getOriginStatement() == null) {
+                statementContext.setOriginStatement(new OriginStatement(sql, i));
+            }
+        }
+        return result;
     }
 
     public Expression parseExpression(String expression) {
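
To see the new contract end to end, a hypothetical sketch (not part of this commit; it assumes OriginStatement exposes public `originStmt`/`idx` fields and that NereidsParser has a no-arg constructor):

```java
NereidsParser parser = new NereidsParser();
String sql = "SELECT 1; SELECT 2;";
List<Pair<LogicalPlan, StatementContext>> parsed = parser.parseMultiple(sql);
for (int i = 0; i < parsed.size(); i++) {
    OriginStatement origin = parsed.get(i).second.getOriginStatement();
    // Before the fix this could be null; now each context carries the
    // full multi-statement text plus its statement's index.
    assert origin != null && origin.idx == i;
}
```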
diff --git a/regression-test/suites/load_p0/routine_load/test_routine_load_follower_fe.groovy b/regression-test/suites/load_p0/routine_load/test_routine_load_follower_fe.groovy
new file mode 100644
index 00000000000..06101f41046
--- /dev/null
+++ b/regression-test/suites/load_p0/routine_load/test_routine_load_follower_fe.groovy
@@ -0,0 +1,154 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.doris.regression.suite.ClusterOptions
+import org.apache.kafka.clients.admin.AdminClient
+import org.apache.kafka.clients.producer.KafkaProducer
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.clients.producer.ProducerConfig
+
+suite("test_routine_load_follower_fe","docker") {
+    def options = new ClusterOptions()
+    // Configure 3 FE nodes cluster
+    options.setFeNum(3)
+    options.setBeNum(1)
+
+    docker(options) {
+        def kafkaCsvTopics = [
+                  "test_routine_load_follower_fe",
+                ]
+        String enabled = context.config.otherConfigs.get("enableKafkaTest")
+        String kafka_port = context.config.otherConfigs.get("kafka_port")
+        String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+        def kafka_broker = "${externalEnvIp}:${kafka_port}"
+        
+        if (enabled != null && enabled.equalsIgnoreCase("true")) {
+            // 1. Send data to Kafka
+            def props = new Properties()
+            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "${kafka_broker}".toString())
+            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+            def producer = new KafkaProducer<>(props)
+            
+            // Send test data to kafka topic
+            for (String kafkaCsvTopic in kafkaCsvTopics) {
+                // Create simple test data
+                def testData = [
+                    "1,test_data_1,2023-01-01,value1,2023-01-01 
10:00:00,extra1",
+                    "2,test_data_2,2023-01-02,value2,2023-01-02 
11:00:00,extra2",
+                    "3,test_data_3,2023-01-03,value3,2023-01-03 
12:00:00,extra3",
+                    "4,test_data_4,2023-01-04,value4,2023-01-04 
13:00:00,extra4",
+                    "5,test_data_5,2023-01-05,value5,2023-01-05 
14:00:00,extra5"
+                ]
+                
+                testData.each { line ->
+                    logger.info("Sending data to kafka: ${line}")
+                    def record = new ProducerRecord<>(kafkaCsvTopic, null, line)
+                    producer.send(record)
+                }
+            }
+            producer.close()
+            
+            // 2. Connect to a follower FE and create table
+            def masterFe = cluster.getMasterFe()
+            def allFes = cluster.getAllFrontends()
+            def followerFes = allFes.findAll { fe -> fe.index != masterFe.index }
+            def followerFe = followerFes[0]
+            logger.info("Master FE: ${masterFe.host}")
+            logger.info("Using follower FE: ${followerFe.host}")
+            // Connect to follower FE
+            def url = String.format(
+                    "jdbc:mysql://%s:%s/?useLocalSessionState=true&allowLoadLocalInfile=false",
+                    followerFe.host, followerFe.queryPort)
+            logger.info("Connecting to follower FE: ${url}")
+            context.connectTo(url, context.config.jdbcUser, context.config.jdbcPassword)
+
+            sql "drop database if exists test_routine_load_follower_fe"
+            sql "create database test_routine_load_follower_fe"
+            sql "use test_routine_load_follower_fe"
+            def tableName = "test_routine_load_follower_fe"
+            def job = "test_follower_routine_load"
+            sql """ DROP TABLE IF EXISTS ${tableName} """
+            sql """
+                CREATE TABLE IF NOT EXISTS ${tableName} (
+                    `k1` int(20) NULL,
+                    `k2` string NULL,
+                    `v1` date  NULL,
+                    `v2` string  NULL,
+                    `v3` datetime  NULL,
+                    `v4` string  NULL
+                ) ENGINE=OLAP
+                DUPLICATE KEY(`k1`)
+                COMMENT 'OLAP'
+                DISTRIBUTED BY HASH(`k1`) BUCKETS 3
+                PROPERTIES ("replication_allocation" = "tag.location.default: 
1");
+            """
+
+            try {
+                // 3. Create routine load job on follower FE
+                sql """
+                    CREATE ROUTINE LOAD ${job} ON ${tableName}
+                    COLUMNS TERMINATED BY ","
+                    PROPERTIES
+                    (
+                        "max_batch_interval" = "20",
+                        "max_batch_rows" = "300000",
+                        "max_batch_size" = "209715200"
+                    )
+                    FROM KAFKA
+                    (
+                        "kafka_broker_list" = "${kafka_broker}",
+                        "kafka_topic" = "${kafkaCsvTpoics[0]}",
+                        "property.group.id" = "test-follower-consumer-group",
+                        "property.client.id" = "test-follower-client-id",
+                        "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+                    );
+                """
+
+                // 4. Wait for routine load to process data
+                def count = 0
+                def maxWaitCount = 60 // Wait up to 60 seconds
+                while (count < maxWaitCount) {
+                    def state = sql "show routine load for ${job}"
+                    def routineLoadState = state[0][8].toString()
+                    def statistic = state[0][14].toString()
+                    logger.info("Routine load state: ${routineLoadState}")
+                    logger.info("Routine load statistic: ${statistic}")
+                    
+                    def rowCount = sql "select count(*) from ${tableName}"
+                    // Check if routine load is running and has processed some data
+                    if (routineLoadState == "RUNNING" && rowCount[0][0] > 0) {
+                        break
+                    }
+
+                    sleep(1000)
+                    count++
+                }
+                assert count < maxWaitCount : "routine load did not reach RUNNING with data within ${maxWaitCount}s"
+            } catch (Exception e) {
+                logger.error("Test failed with exception: ${e.message}")
+                throw e
+            } finally {
+                try {
+                    sql "stop routine load for ${job}"
+                } catch (Exception e) {
+                    logger.warn("Failed to stop routine load job: 
${e.message}")
+                }
+            }
+        }
+    }
+} 
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org
