This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new e58fb2e2e4d branch-3.0: [fix](load) fix multi table load repeated failures and retries when meet data quality error #49938 (#50026)
e58fb2e2e4d is described below
commit e58fb2e2e4d4678a738afc2e054a3f6e56b86921
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Tue Apr 15 17:47:34 2025 +0800
branch-3.0: [fix](load) fix multi table load repeated failures and retries when meet data quality error #49938 (#50026)
Cherry-picked from #49938
Co-authored-by: hui lai <[email protected]>
---
be/src/io/fs/multi_table_pipe.cpp | 25 +++--
be/src/io/fs/multi_table_pipe.h | 1 +
.../apache/doris/planner/StreamLoadPlanner.java | 7 +-
.../test_multi_table_load_data_quality_error.out | Bin 0 -> 111 bytes
.../data/multi_table_load_data_quality.csv | 2 +
...test_multi_table_load_data_quality_error.groovy | 120 +++++++++++++++++++++
6 files changed, 141 insertions(+), 14 deletions(-)
diff --git a/be/src/io/fs/multi_table_pipe.cpp b/be/src/io/fs/multi_table_pipe.cpp
index 463f002596a..5e27ef35ab6 100644
--- a/be/src/io/fs/multi_table_pipe.cpp
+++ b/be/src/io/fs/multi_table_pipe.cpp
@@ -193,6 +193,7 @@ Status MultiTablePipe::request_and_exec_plans() {
request.__set_memtable_on_sink_node(_ctx->memtable_on_sink_node);
request.__set_user(_ctx->qualified_user);
request.__set_cloud_cluster(_ctx->cloud_cluster);
+ request.__set_max_filter_ratio(1.0);
// no need to register new_load_stream_mgr coz it is already done in routineload submit task
// plan this load
@@ -271,21 +272,19 @@ Status MultiTablePipe::exec_plans(ExecEnv* exec_env,
std::vector<ExecParam> para
_number_loaded_rows += state->num_rows_load_success();
_number_filtered_rows += state->num_rows_load_filtered();
_number_unselected_rows +=
state->num_rows_load_unselected();
- _ctx->error_url =
to_load_error_http_path(state->get_error_log_file_path());
- // check filtered ratio for this plan fragment
- int64_t num_selected_rows =
- state->num_rows_load_total() -
state->num_rows_load_unselected();
- if (num_selected_rows > 0 &&
- (double)state->num_rows_load_filtered() /
num_selected_rows >
- _ctx->max_filter_ratio) {
- *status = Status::DataQualityError("too many filtered
rows");
- }
// if any of the plan fragment exec failed, set the status to the first failed plan
- if (!status->ok()) {
- LOG(WARNING)
- << "plan fragment exec failed. errmsg=" <<
*status << _ctx->brief();
- _status = *status;
+ {
+ std::lock_guard<std::mutex> l(_callback_lock);
+ if (!state->get_error_log_file_path().empty()) {
+ _ctx->error_url =
+
to_load_error_http_path(state->get_error_log_file_path());
+ }
+ if (!status->ok()) {
+ LOG(WARNING) << "plan fragment exec failed.
errmsg=" << *status
+ << _ctx->brief();
+ _status = *status;
+ }
}
auto inflight_cnt = _inflight_cnt.fetch_sub(1);
diff --git a/be/src/io/fs/multi_table_pipe.h b/be/src/io/fs/multi_table_pipe.h
index f1d2e523652..fc1da272f12 100644
--- a/be/src/io/fs/multi_table_pipe.h
+++ b/be/src/io/fs/multi_table_pipe.h
@@ -95,6 +95,7 @@ private:
std::atomic<int64_t> _number_unselected_rows {0};
std::mutex _pipe_map_lock;
+ std::mutex _callback_lock;
std::unordered_map<TUniqueId /*instance id*/,
std::shared_ptr<io::StreamLoadPipe>> _pipe_map;
uint32_t _row_threshold = config::multi_table_batch_plan_threshold;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
index 6e830f2fd64..5f6c448b930 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
@@ -45,6 +45,7 @@ import org.apache.doris.common.UserException;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.load.BrokerFileGroup;
import org.apache.doris.load.loadv2.LoadTask;
+import org.apache.doris.load.routineload.RoutineLoadJob;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.task.LoadTaskInfo;
@@ -365,7 +366,11 @@ public class StreamLoadPlanner {
queryGlobals.setNowString(TimeUtils.getDatetimeFormatWithTimeZone().format(LocalDateTime.now()));
queryGlobals.setTimestampMs(System.currentTimeMillis());
queryGlobals.setTimeZone(taskInfo.getTimezone());
- queryGlobals.setLoadZeroTolerance(taskInfo.getMaxFilterRatio() <= 0.0);
+ if (taskInfo instanceof RoutineLoadJob) {
+ queryGlobals.setLoadZeroTolerance(false);
+ } else {
+ queryGlobals.setLoadZeroTolerance(taskInfo.getMaxFilterRatio() <=
0.0);
+ }
queryGlobals.setNanoSeconds(LocalDateTime.now().getNano());
params.setQueryGlobals(queryGlobals);
diff --git a/regression-test/data/load_p0/routine_load/test_multi_table_load_data_quality_error.out b/regression-test/data/load_p0/routine_load/test_multi_table_load_data_quality_error.out
new file mode 100644
index 00000000000..343c45fa998
Binary files /dev/null and b/regression-test/data/load_p0/routine_load/test_multi_table_load_data_quality_error.out differ
diff --git a/regression-test/suites/load_p0/routine_load/data/multi_table_load_data_quality.csv b/regression-test/suites/load_p0/routine_load/data/multi_table_load_data_quality.csv
new file mode 100644
index 00000000000..b74967971d6
--- /dev/null
+++ b/regression-test/suites/load_p0/routine_load/data/multi_table_load_data_quality.csv
@@ -0,0 +1,2 @@
+test_multi_table_load_data_quality|1,a
+test_multi_table_load_data_quality_error|a,a
\ No newline at end of file
diff --git a/regression-test/suites/load_p0/routine_load/test_multi_table_load_data_quality_error.groovy b/regression-test/suites/load_p0/routine_load/test_multi_table_load_data_quality_error.groovy
new file mode 100644
index 00000000000..549dbdf3f59
--- /dev/null
+++ b/regression-test/suites/load_p0/routine_load/test_multi_table_load_data_quality_error.groovy
@@ -0,0 +1,120 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.apache.kafka.clients.admin.AdminClient
+import org.apache.kafka.clients.producer.KafkaProducer
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.clients.producer.ProducerConfig
+
+suite("test_multi_table_load_data_quality_error","p0") {
+ def kafkaCsvTpoics = [
+ "multi_table_load_data_quality",
+ ]
+ String enabled = context.config.otherConfigs.get("enableKafkaTest")
+ String kafka_port = context.config.otherConfigs.get("kafka_port")
+ String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+ def kafka_broker = "${externalEnvIp}:${kafka_port}"
+ if (enabled != null && enabled.equalsIgnoreCase("true")) {
+ // define kafka
+ def props = new Properties()
+ props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
"${kafka_broker}".toString())
+ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
+ props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
+ // Create kafka producer
+ def producer = new KafkaProducer<>(props)
+
+ for (String kafkaCsvTopic in kafkaCsvTpoics) {
+ def txt = new
File("""${context.file.parent}/data/${kafkaCsvTopic}.csv""").text
+ def lines = txt.readLines()
+ lines.each { line ->
+ logger.info("=====${line}========")
+ def record = new ProducerRecord<>(kafkaCsvTopic, null, line)
+ producer.send(record)
+ }
+ }
+ }
+
+ if (enabled != null && enabled.equalsIgnoreCase("true")) {
+ def tableName = "test_multi_table_load_data_quality"
+ def tableName1 = "test_multi_table_load_data_quality_error"
+ def jobName = "test_multi_table_load_data_quality_error"
+ sql """ DROP TABLE IF EXISTS ${tableName} """
+ sql """ DROP TABLE IF EXISTS ${tableName1} """
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName} (
+ `k1` int(20) NULL,
+ `k2` string NULL,
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`k1`)
+ COMMENT 'OLAP'
+ DISTRIBUTED BY HASH(`k1`) BUCKETS 3
+ PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+ """
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName1} (
+ `k1` int(20) NULL,
+ `k2` string NULL,
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`k1`)
+ COMMENT 'OLAP'
+ DISTRIBUTED BY HASH(`k1`) BUCKETS 3
+ PROPERTIES ("replication_allocation" = "tag.location.default: 1");
+ """
+
+ try {
+ sql """
+ CREATE ROUTINE LOAD ${jobName}
+ COLUMNS TERMINATED BY ","
+ PROPERTIES
+ (
+ "strict_mode" = "true"
+ )
+ FROM KAFKA
+ (
+ "kafka_broker_list" = "${externalEnvIp}:${kafka_port}",
+ "kafka_topic" = "${kafkaCsvTpoics[0]}",
+ "property.kafka_default_offsets" = "OFFSET_BEGINNING"
+ );
+ """
+ sql "sync"
+
+ def count = 0
+ while (true) {
+ def res = sql "select count(*) from ${tableName}"
+ def state = sql "show routine load for ${jobName}"
+ log.info("routine load state:
${state[0][8].toString()}".toString())
+ log.info("routine load statistic:
${state[0][14].toString()}".toString())
+ log.info("reason of state changed:
${state[0][17].toString()}".toString())
+ log.info("error url: ${state[0][18].toString()}".toString())
+ if (res[0][0] > 0 && state[0][18].toString() != "") {
+ break
+ }
+ if (count >= 120) {
+ log.error("routine load can not visible for long time")
+ assertEquals(20, res[0][0])
+ break
+ }
+ sleep(1000)
+ count++
+ }
+ qt_sql "select * from ${tableName} order by k1"
+
+ } finally {
+ sql "stop routine load for ${jobName}"
+ }
+ }
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]