This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new df81ddbf011 branch-3.0: [fix](load) disable fetching meta request to
disable_load or decommissioned BE node #50421 (#50516)
df81ddbf011 is described below
commit df81ddbf011ea68a3a5ed67e6f35aa43e56b9e70
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Tue Apr 29 15:09:31 2025 +0800
branch-3.0: [fix](load) disable fetching meta request to disable_load or
decommissioned BE node #50421 (#50516)
Cherry-picked from #50421
Co-authored-by: hui lai <[email protected]>
---
.../apache/doris/datasource/kafka/KafkaUtil.java | 8 +-
.../routine_load/data/test_disable_load.csv | 1 +
.../load_p0/routine_load/test_disable_load.groovy | 106 +++++++++++++++++++++
3 files changed, 114 insertions(+), 1 deletion(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/kafka/KafkaUtil.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/kafka/KafkaUtil.java
index 293f0875ac0..da683cc2b37 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/kafka/KafkaUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/kafka/KafkaUtil.java
@@ -229,7 +229,13 @@ public class KafkaUtil {
InternalService.PProxyResult result = null;
try {
while (retryTimes < 3) {
- List<Long> backendIds =
Env.getCurrentSystemInfo().getAllBackendIds(true);
+ List<Long> backendIds = new ArrayList<>();
+ for (Long beId :
Env.getCurrentSystemInfo().getAllBackendIds(true)) {
+ Backend backend =
Env.getCurrentSystemInfo().getBackend(beId);
+ if (backend != null && backend.isLoadAvailable() &&
!backend.isDecommissioned()) {
+ backendIds.add(beId);
+ }
+ }
if (backendIds.isEmpty()) {
MetricRepo.COUNTER_ROUTINE_LOAD_GET_META_FAIL_COUNT.increase(1L);
throw new LoadException("Failed to get info. No alive
backends");
diff --git
a/regression-test/suites/load_p0/routine_load/data/test_disable_load.csv
b/regression-test/suites/load_p0/routine_load/data/test_disable_load.csv
new file mode 100644
index 00000000000..b226b99ee4e
--- /dev/null
+++ b/regression-test/suites/load_p0/routine_load/data/test_disable_load.csv
@@ -0,0 +1 @@
+1,eab,2023-07-15,def,2023-07-20:05:48:31,"ghi"
\ No newline at end of file
diff --git
a/regression-test/suites/load_p0/routine_load/test_disable_load.groovy
b/regression-test/suites/load_p0/routine_load/test_disable_load.groovy
new file mode 100644
index 00000000000..fbac3393849
--- /dev/null
+++ b/regression-test/suites/load_p0/routine_load/test_disable_load.groovy
@@ -0,0 +1,106 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.kafka.clients.admin.AdminClient
+import org.apache.kafka.clients.producer.KafkaProducer
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.clients.producer.ProducerConfig
+
+// Regression case: with load disabled on a backend, a Kafka routine load job is expected to
+// report "Failed to get info. No alive backends" as its reason of state change.
+suite("test_disable_load","nonConcurrent,p0") {
+    String enabled = context.config.otherConfigs.get("enableKafkaTest") // runs only when set to "true"
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        // 1. send every line of data/<topic>.csv to the Kafka topic of the same name
+        def kafkaCsvTopics = ["test_disable_load"]
+        String kafka_port = context.config.otherConfigs.get("kafka_port")
+        String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+        def kafka_broker = "${externalEnvIp}:${kafka_port}"
+        def props = new Properties()
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "${kafka_broker}".toString())
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
+        def producer = new KafkaProducer<>(props)
+        for (String kafkaCsvTopic in kafkaCsvTopics) {
+            def txt = new File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
+            def lines = txt.readLines()
+            lines.each { line ->
+                logger.info("=====${line}========")
+                def record = new ProducerRecord<>(kafkaCsvTopic, null, line) // null key
+                producer.send(record)
+            }
+        }
+
+        // 2. create the target table, disable load on one backend, then create the routine load job
+        def tableName = "test_disable_load"
+        def job = "test_disable_load"
+        sql """ DROP TABLE IF EXISTS ${tableName} """
+        sql """
+            CREATE TABLE IF NOT EXISTS ${tableName} (
+                `k1` int(20) NULL,
+                `k2` string NULL,
+                `v1` date NULL,
+                `v2` string NULL,
+                `v3` datetime NULL,
+                `v4` string NULL
+            ) ENGINE=OLAP
+            DUPLICATE KEY(`k1`)
+            COMMENT 'OLAP'
+            DISTRIBUTED BY HASH(`k1`) BUCKETS 3
+            PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+        """
+
+        def backends = sql_return_maparray('show backends')
+        def beId = backends[0].BackendId // NOTE(review): only the first backend is disabled; presumably a single-BE cluster - confirm
+        try {
+            sql """ALTER SYSTEM MODIFY BACKEND "${beId}" SET ("disable_load" = "true"); """
+            sql """
+                CREATE ROUTINE LOAD ${job} ON ${tableName}
+                COLUMNS TERMINATED BY ","
+                FROM KAFKA
+                (
+                    "kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
+                    "kafka_topic" = "${kafkaCsvTopics[0]}"
+                );
+            """
+
+            // 3. check error info: poll once per second until the expected reason shows up
+            def count = 0
+            def res = 0 // row count, logged only for diagnostics
+            while (true) {
+                res = sql "select count(*) from ${tableName}"
+                log.info("res: ${res}")
+                def state = sql "show routine load for ${job}" // column 8 is logged as state, 17 as reason
+                log.info("routine load state: ${state[0][8].toString()}".toString())
+                log.info("reason of state changed: ${state[0][17].toString()}".toString())
+                if (state[0][17].toString().contains("Failed to get info. No alive backends")) {
+                    break
+                }
+                if (count >= 60) { // expected reason not seen after the polling budget: fail the case
+                    log.error("routine load test fail")
+                    assertEquals(1, 2)
+                    break
+                }
+                sleep(1000)
+                count++
+            }
+        } finally { // cleanup runs whether the check passed or failed
+            sql """ALTER SYSTEM MODIFY BACKEND "${beId}" SET ("disable_load" = "false"); """
+            sql "stop routine load for ${job}"
+        }
+    }
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]