This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 4c9650ffcc2 [fix](nereids) fix can not create routine load job in follower node (#51535) 4c9650ffcc2 is described below commit 4c9650ffcc23a1606ce61311a0ddb5020bd5a6e2 Author: hui lai <lai...@selectdb.com> AuthorDate: Tue Jun 10 21:25:11 2025 +0800 [fix](nereids) fix can not create routine load job in follower node (#51535) ### What problem does this PR solve? Creating a routine load job on a follower node fails: ``` CREATE ROUTINE LOAD lineitem ON lineitem FROM KAFKA ( "kafka_broker_list" = "xxx", "kafka_topic" = "test" ); ERROR 1105 (HY000): errCode = 2, detailMessage = java.lang.IllegalStateException ``` --- .../apache/doris/nereids/parser/NereidsParser.java | 19 ++- .../test_routine_load_follower_fe.groovy | 152 +++++++++++++++++++++ 2 files changed, 170 insertions(+), 1 deletion(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/NereidsParser.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/NereidsParser.java index 3e4825cae40..4605ce314da 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/NereidsParser.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/NereidsParser.java @@ -36,6 +36,7 @@ import org.apache.doris.plugin.DialectConverterPlugin; import org.apache.doris.plugin.PluginMgr; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.GlobalVariable; +import org.apache.doris.qe.OriginStatement; import org.apache.doris.qe.SessionVariable; import com.google.common.collect.ImmutableMap; @@ -281,9 +282,25 @@ public class NereidsParser { return parseMultiple(sql, null); } + /** + * parse multiple sql statements. 
+ * + * @param sql sql string + * @param logicalPlanBuilder logical plan builder + * @return logical plan + */ public List<Pair<LogicalPlan, StatementContext>> parseMultiple(String sql, @Nullable LogicalPlanBuilder logicalPlanBuilder) { - return parse(sql, logicalPlanBuilder, DorisParser::multiStatements); + List<Pair<LogicalPlan, StatementContext>> result = parse(sql, logicalPlanBuilder, DorisParser::multiStatements); + // ensure each StatementContext has complete OriginStatement information + for (int i = 0; i < result.size(); i++) { + Pair<LogicalPlan, StatementContext> pair = result.get(i); + StatementContext statementContext = pair.second; + if (statementContext.getOriginStatement() == null) { + statementContext.setOriginStatement(new OriginStatement(sql, i)); + } + } + return result; } public Expression parseExpression(String expression) { diff --git a/regression-test/suites/load_p0/routine_load/test_routine_load_follower_fe.groovy b/regression-test/suites/load_p0/routine_load/test_routine_load_follower_fe.groovy new file mode 100644 index 00000000000..06101f41046 --- /dev/null +++ b/regression-test/suites/load_p0/routine_load/test_routine_load_follower_fe.groovy @@ -0,0 +1,152 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +import org.apache.doris.regression.suite.ClusterOptions +import org.apache.kafka.clients.admin.AdminClient +import org.apache.kafka.clients.producer.KafkaProducer +import org.apache.kafka.clients.producer.ProducerRecord +import org.apache.kafka.clients.producer.ProducerConfig + +suite("test_routine_load_follower_fe","docker") { + def options = new ClusterOptions() + // Configure 3 FE nodes cluster + options.setFeNum(3) + options.setBeNum(1) + + docker(options) { + def kafkaCsvTpoics = [ + "test_routine_load_follower_fe", + ] + String enabled = context.config.otherConfigs.get("enableKafkaTest") + String kafka_port = context.config.otherConfigs.get("kafka_port") + String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") + def kafka_broker = "${externalEnvIp}:${kafka_port}" + + if (enabled != null && enabled.equalsIgnoreCase("true")) { + // 1. send data to kafka + def props = new Properties() + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "${kafka_broker}".toString()) + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer") + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer") + def producer = new KafkaProducer<>(props) + + // Send test data to kafka topic + for (String kafkaCsvTopic in kafkaCsvTpoics) { + // Create simple test data + def testData = [ + "1,test_data_1,2023-01-01,value1,2023-01-01 10:00:00,extra1", + "2,test_data_2,2023-01-02,value2,2023-01-02 11:00:00,extra2", + "3,test_data_3,2023-01-03,value3,2023-01-03 12:00:00,extra3", + "4,test_data_4,2023-01-04,value4,2023-01-04 13:00:00,extra4", + "5,test_data_5,2023-01-05,value5,2023-01-05 14:00:00,extra5" + ] + + testData.each { line -> + logger.info("Sending data to kafka: ${line}") + def record = new ProducerRecord<>(kafkaCsvTopic, null, line) + 
producer.send(record) + } + } + producer.close() + + // 3. Connect to a follower FE and create table + def masterFe = cluster.getMasterFe() + def allFes = cluster.getAllFrontends() + def followerFes = allFes.findAll { fe -> fe.index != masterFe.index } + def followerFe = followerFes[0] + logger.info("Master FE: ${masterFe.host}") + logger.info("Using follower FE: ${followerFe.host}") + // Connect to follower FE + def url = String.format( + "jdbc:mysql://%s:%s/?useLocalSessionState=true&allowLoadLocalInfile=false", + followerFe.host, followerFe.queryPort) + logger.info("Connecting to follower FE: ${url}") + context.connectTo(url, context.config.jdbcUser, context.config.jdbcPassword) + + sql "drop database if exists test_routine_load_follower_fe" + sql "create database test_routine_load_follower_fe" + sql "use test_routine_load_follower_fe" + def tableName = "test_routine_load_follower_fe" + def job = "test_follower_routine_load" + sql """ DROP TABLE IF EXISTS ${tableName} """ + sql """ + CREATE TABLE IF NOT EXISTS ${tableName} ( + `k1` int(20) NULL, + `k2` string NULL, + `v1` date NULL, + `v2` string NULL, + `v3` datetime NULL, + `v4` string NULL + ) ENGINE=OLAP + DUPLICATE KEY(`k1`) + COMMENT 'OLAP' + DISTRIBUTED BY HASH(`k1`) BUCKETS 3 + PROPERTIES ("replication_allocation" = "tag.location.default: 1"); + """ + + try { + // 4. Create routine load job on follower FE + sql """ + CREATE ROUTINE LOAD ${job} ON ${tableName} + COLUMNS TERMINATED BY "," + PROPERTIES + ( + "max_batch_interval" = "20", + "max_batch_rows" = "300000", + "max_batch_size" = "209715200" + ) + FROM KAFKA + ( + "kafka_broker_list" = "${kafka_broker}", + "kafka_topic" = "${kafkaCsvTpoics[0]}", + "property.group.id" = "test-follower-consumer-group", + "property.client.id" = "test-follower-client-id", + "property.kafka_default_offsets" = "OFFSET_BEGINNING" + ); + """ + + // 5. 
Wait for routine load to process data + def count = 0 + def maxWaitCount = 60 // Wait up to 60 seconds + while (count < maxWaitCount) { + def state = sql "show routine load for ${job}" + def routineLoadState = state[0][8].toString() + def statistic = state[0][14].toString() + logger.info("Routine load state: ${routineLoadState}") + logger.info("Routine load statistic: ${statistic}") + + def rowCount = sql "select count(*) from ${tableName}" + // Check if routine load is running and has processed some data + if (routineLoadState == "RUNNING" && rowCount[0][0] > 0) { + break + } + + sleep(1000) + count++ + } + } catch (Exception e) { + logger.error("Test failed with exception: ${e.message}") + } finally { + try { + sql "stop routine load for ${job}" + } catch (Exception e) { + logger.warn("Failed to stop routine load job: ${e.message}") + } + } + } + } +} \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org