This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new b93dd1d5f74 [enhancement](load) improve error msg for load when
cancelled by mem gc (#26809)
b93dd1d5f74 is described below
commit b93dd1d5f7469fa5775909a98ca0c1d273ae7c01
Author: zhengyu <[email protected]>
AuthorDate: Tue Nov 28 17:36:11 2023 +0800
[enhancement](load) improve error msg for load when cancelled by mem gc
(#26809)
Signed-off-by: freemandealer <[email protected]>
---
be/src/runtime/plan_fragment_executor.cpp | 2 +-
be/src/runtime/runtime_state.cpp | 4 +
be/src/runtime/runtime_state.h | 23 ++++--
be/src/vec/sink/writer/vtablet_writer.cpp | 10 +++
.../data/fault_injection_p0/baseall.txt | 16 ++++
.../test_mem_gc_when_load_fault_injection.groovy | 90 ++++++++++++++++++++++
6 files changed, 137 insertions(+), 8 deletions(-)
diff --git a/be/src/runtime/plan_fragment_executor.cpp
b/be/src/runtime/plan_fragment_executor.cpp
index 2fbd7d53c48..671c38b15c6 100644
--- a/be/src/runtime/plan_fragment_executor.cpp
+++ b/be/src/runtime/plan_fragment_executor.cpp
@@ -596,7 +596,7 @@ void PlanFragmentExecutor::cancel(const
PPlanFragmentCancelReason& reason, const
_is_report_on_cancel = false;
}
_cancel_msg = msg;
- _runtime_state->set_is_cancelled(true, msg);
+ _runtime_state->set_is_cancelled(msg);
// To notify wait_for_start()
_query_ctx->set_ready_to_execute(true);
diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp
index a8027d0d61b..c6df2daff0d 100644
--- a/be/src/runtime/runtime_state.cpp
+++ b/be/src/runtime/runtime_state.cpp
@@ -318,6 +318,10 @@ bool RuntimeState::is_cancelled() const {
return _is_cancelled.load() || (_query_ctx && _query_ctx->is_cancelled());
}
+std::string RuntimeState::cancel_reason() const {
+ return _cancel_reason;
+}
+
Status RuntimeState::set_mem_limit_exceeded(const std::string& msg) {
{
std::lock_guard<std::mutex> l(_process_status_lock);
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index 51778a54bc9..b8ab49ff276 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -175,14 +175,22 @@ public:
void get_unreported_errors(std::vector<std::string>* new_errors);
[[nodiscard]] bool is_cancelled() const;
+ std::string cancel_reason() const;
int codegen_level() const { return _query_options.codegen_level; }
- void set_is_cancelled(bool v, std::string msg) {
- _is_cancelled.store(v);
- // Create a error status, so that we could print error stack, and
- // we could know which path call cancel.
- LOG(WARNING) << "Task is cancelled, instance: "
- << PrintInstanceStandardInfo(_query_id,
_fragment_instance_id)
- << " st = " << Status::Error<ErrorCode::CANCELLED>(msg);
+ void set_is_cancelled(std::string msg) {
+ if (!_is_cancelled.exchange(true)) {
+ _cancel_reason = msg;
+ // Create an error status so that we can print the error stack and
+ // tell which code path called cancel.
+ LOG(WARNING) << "Task is cancelled, instance: "
+ << PrintInstanceStandardInfo(_query_id,
_fragment_instance_id)
+ << ", st = " <<
Status::Error<ErrorCode::CANCELLED>(msg);
+ } else {
+ LOG(WARNING) << "Task is already cancelled, instance: "
+ << PrintInstanceStandardInfo(_query_id,
_fragment_instance_id)
+ << ", original cancel msg: " << _cancel_reason
+ << ", new cancel msg: " <<
Status::Error<ErrorCode::CANCELLED>(msg);
+ }
}
void set_backend_id(int64_t backend_id) { _backend_id = backend_id; }
@@ -557,6 +565,7 @@ private:
// if true, execution should stop with a CANCELLED status
std::atomic<bool> _is_cancelled;
+ std::string _cancel_reason;
int _per_fragment_instance_idx;
int _num_per_fragment_instances = 0;
diff --git a/be/src/vec/sink/writer/vtablet_writer.cpp
b/be/src/vec/sink/writer/vtablet_writer.cpp
index c85e5520287..c61248b1b27 100644
--- a/be/src/vec/sink/writer/vtablet_writer.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer.cpp
@@ -73,8 +73,10 @@
#include "runtime/runtime_state.h"
#include "runtime/thread_context.h"
#include "service/backend_options.h"
+#include "util/debug_points.h"
#include "util/defer_op.h"
#include "util/doris_metrics.h"
+#include "util/mem_info.h"
#include "util/network_util.h"
#include "util/proto_util.h"
#include "util/ref_count_closure.h"
@@ -537,6 +539,9 @@ Status VNodeChannel::add_block(vectorized::Block* block,
const Payload* payload,
int VNodeChannel::try_send_and_fetch_status(RuntimeState* state,
std::unique_ptr<ThreadPoolToken>&
thread_pool_token) {
+ DBUG_EXECUTE_IF("VNodeChannel.try_send_and_fetch_status_full_gc",
+ { MemInfo::process_full_gc(); });
+
if (_cancelled || _send_finished) { // not run
return 0;
}
@@ -856,6 +861,7 @@ bool VNodeChannel::is_send_data_rpc_done() const {
}
Status VNodeChannel::close_wait(RuntimeState* state) {
+ DBUG_EXECUTE_IF("VNodeChannel.close_wait_full_gc", {
MemInfo::process_full_gc(); });
SCOPED_CONSUME_MEM_TRACKER(_node_channel_tracker.get());
// set _is_closed to true finally
Defer set_closed {[&]() {
@@ -881,6 +887,10 @@ Status VNodeChannel::close_wait(RuntimeState* state) {
}
_close_time_ms = UnixMillis() - _close_time_ms;
+ if (_cancelled || state->is_cancelled()) {
+ _cancel_with_msg(state->cancel_reason());
+ }
+
if (_add_batches_finished) {
_close_check();
state->tablet_commit_infos().insert(state->tablet_commit_infos().end(),
diff --git a/regression-test/data/fault_injection_p0/baseall.txt
b/regression-test/data/fault_injection_p0/baseall.txt
new file mode 100644
index 00000000000..72bbfe98aaf
--- /dev/null
+++ b/regression-test/data/fault_injection_p0/baseall.txt
@@ -0,0 +1,16 @@
+0,1,1989,1001,11011902,123.123,true,1989-03-21,1989-03-21
13:00:00,wangjuoo4,0.1,6.333,string12345,170141183460469231731687303715884105727
+0,2,1986,1001,11011903,1243.5,false,1901-12-31,1989-03-21
13:00:00,wangynnsf,20.268,789.25,string12345,-170141183460469231731687303715884105727
+0,3,1989,1002,11011905,24453.325,false,2012-03-14,2000-01-01
00:00:00,yunlj8@nk,78945,3654.0,string12345,0
+0,4,1991,3021,-11011907,243243.325,false,3124-10-10,2015-03-13
10:30:00,yanvjldjlll,2.06,-0.001,string12345,20220101
+0,5,1985,5014,-11011903,243.325,true,2015-01-01,2015-03-13
12:36:38,du3lnvl,-0.000,-365,string12345,20220102
+0,6,32767,3021,123456,604587.000,true,2014-11-11,2015-03-13
12:36:38,yanavnd,0.1,80699,string12345,20220104
+0,7,-32767,1002,7210457,3.141,false,1988-03-21,1901-01-01
00:00:00,jiw3n4,0.0,6058,string12345,-20220101
+1,8,255,2147483647,11011920,-0.123,true,1989-03-21,9999-11-11
12:12:00,wangjuoo5,987456.123,12.14,string12345,-2022
+1,9,1991,-2147483647,11011902,-654.654,true,1991-08-11,1989-03-21
13:11:00,wangjuoo4,0.000,69.123,string12345,11011903
+1,10,1991,5014,9223372036854775807,-258.369,false,2015-04-02,2013-04-02
15:16:52,wangynnsf,-123456.54,0.235,string12345,-11011903
+1,11,1989,25699,-9223372036854775807,0.666,true,2015-04-02,1989-03-21
13:11:00,yunlj8@nk,-987.001,4.336,string12345,1701411834604692317316873037158
+1,12,32767,-2147483647,9223372036854775807,243.325,false,1991-08-11,2013-04-02
15:16:52,lifsno,-564.898,3.141592654,string12345,1701604692317316873037158
+1,13,-32767,2147483647,-9223372036854775807,100.001,false,2015-04-02,2015-04-02
00:00:00,wenlsfnl,123.456,3.141592653,string12345,701411834604692317316873037158
+1,14,255,103,11011902,-0.000,false,2015-04-02,2015-04-02 00:00:00,
,3.141592654,2.036,string12345,701411834604692317316873
+1,15,1992,3021,11011920,0.00,true,9999-12-12,2015-04-02
00:00:00,,3.141592653,20.456,string12345,701411834604692317
+\N,\N,\N,\N,\N,\N,\N,\N,\N,\N,\N,\N,\N,\N
diff --git
a/regression-test/suites/fault_injection_p0/test_mem_gc_when_load_fault_injection.groovy
b/regression-test/suites/fault_injection_p0/test_mem_gc_when_load_fault_injection.groovy
new file mode 100644
index 00000000000..99ae467ff4d
--- /dev/null
+++
b/regression-test/suites/fault_injection_p0/test_mem_gc_when_load_fault_injection.groovy
@@ -0,0 +1,90 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.codehaus.groovy.runtime.IOGroovyMethods
+import org.apache.doris.regression.util.Http
+
+suite("mem_gc_when_load") {
+ // init query case data
+ sql """
+ CREATE TABLE IF NOT EXISTS `baseall` (
+ `k0` boolean null comment "",
+ `k1` tinyint(4) null comment "",
+ `k2` smallint(6) null comment "",
+ `k3` int(11) null comment "",
+ `k4` bigint(20) null comment "",
+ `k5` decimal(9, 3) null comment "",
+ `k6` char(5) null comment "",
+ `k10` date null comment "",
+ `k11` datetime null comment "",
+ `k7` varchar(20) null comment "",
+ `k8` double max null comment "",
+ `k9` float sum null comment "",
+ `k12` string replace null comment "",
+ `k13` largeint(40) replace null comment ""
+ ) engine=olap
+ DISTRIBUTED BY HASH(`k1`) BUCKETS 5 properties("replication_num" = "1")
+ """
+ sql """
+ CREATE TABLE IF NOT EXISTS `test` (
+ `k0` boolean null comment "",
+ `k1` tinyint(4) null comment "",
+ `k2` smallint(6) null comment "",
+ `k3` int(11) null comment "",
+ `k4` bigint(20) null comment "",
+ `k5` decimal(9, 3) null comment "",
+ `k6` char(5) null comment "",
+ `k10` date null comment "",
+ `k11` datetime null comment "",
+ `k7` varchar(20) null comment "",
+ `k8` double max null comment "",
+ `k9` float sum null comment "",
+ `k12` string replace_if_not_null null comment "",
+ `k13` largeint(40) replace null comment ""
+ ) engine=olap
+ DISTRIBUTED BY HASH(`k1`) BUCKETS 5 properties("replication_num" = "1")
+ """
+
+ GetDebugPoint().clearDebugPointsForAllBEs()
+ streamLoad {
+ table "baseall"
+ db "regression_test_fault_injection_p0"
+ set 'column_separator', ','
+ file "baseall.txt"
+ }
+
+ try {
+ // let the gc cancel the load
+
GetDebugPoint().enableDebugPointForAllBEs("VNodeChannel.try_send_and_fetch_status_full_gc")
+ sql "insert into test select * from baseall where k1 <= 3"
+ } catch(Exception e) {
+ assertTrue(e.getMessage().contains("Process has no memory available"))
// the msg should contain the root cause
+ } finally {
+
GetDebugPoint().disableDebugPointForAllBEs("VNodeChannel.try_send_and_fetch_status_full_gc")
+ }
+
+ try {
+ // let the gc cancel the load while it is in close_wait
+
GetDebugPoint().enableDebugPointForAllBEs("VNodeChannel.close_wait_full_gc")
+ sql "insert into test select * from baseall where k1 <= 3"
+ } catch(Exception e) {
+ assertTrue(e.getMessage().contains("Process has no memory available"))
// the msg should contain the root cause
+ } finally {
+
GetDebugPoint().disableDebugPointForAllBEs("VNodeChannel.close_wait_full_gc")
+ }
+}
+
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]