This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new b93dd1d5f74 [enhancement](load) improve error msg for load when cancelled by mem gc (#26809)
b93dd1d5f74 is described below

commit b93dd1d5f7469fa5775909a98ca0c1d273ae7c01
Author: zhengyu <[email protected]>
AuthorDate: Tue Nov 28 17:36:11 2023 +0800

    [enhancement](load) improve error msg for load when cancelled by mem gc (#26809)
    
    
    Signed-off-by: freemandealer <[email protected]>
---
 be/src/runtime/plan_fragment_executor.cpp          |  2 +-
 be/src/runtime/runtime_state.cpp                   |  4 +
 be/src/runtime/runtime_state.h                     | 23 ++++--
 be/src/vec/sink/writer/vtablet_writer.cpp          | 10 +++
 .../data/fault_injection_p0/baseall.txt            | 16 ++++
 .../test_mem_gc_when_load_fault_injection.groovy   | 90 ++++++++++++++++++++++
 6 files changed, 137 insertions(+), 8 deletions(-)

diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp
index 2fbd7d53c48..671c38b15c6 100644
--- a/be/src/runtime/plan_fragment_executor.cpp
+++ b/be/src/runtime/plan_fragment_executor.cpp
@@ -596,7 +596,7 @@ void PlanFragmentExecutor::cancel(const PPlanFragmentCancelReason& reason, const
         _is_report_on_cancel = false;
     }
     _cancel_msg = msg;
-    _runtime_state->set_is_cancelled(true, msg);
+    _runtime_state->set_is_cancelled(msg);
     // To notify wait_for_start()
     _query_ctx->set_ready_to_execute(true);
 
diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp
index a8027d0d61b..c6df2daff0d 100644
--- a/be/src/runtime/runtime_state.cpp
+++ b/be/src/runtime/runtime_state.cpp
@@ -318,6 +318,10 @@ bool RuntimeState::is_cancelled() const {
     return _is_cancelled.load() || (_query_ctx && _query_ctx->is_cancelled());
 }
 
+std::string RuntimeState::cancel_reason() const {
+    return _cancel_reason;
+}
+
 Status RuntimeState::set_mem_limit_exceeded(const std::string& msg) {
     {
         std::lock_guard<std::mutex> l(_process_status_lock);
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index 51778a54bc9..b8ab49ff276 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -175,14 +175,22 @@ public:
     void get_unreported_errors(std::vector<std::string>* new_errors);
 
     [[nodiscard]] bool is_cancelled() const;
+    std::string cancel_reason() const;
     int codegen_level() const { return _query_options.codegen_level; }
-    void set_is_cancelled(bool v, std::string msg) {
-        _is_cancelled.store(v);
-        // Create a error status, so that we could print error stack, and
-        // we could know which path call cancel.
-        LOG(WARNING) << "Task is cancelled, instance: "
-                     << PrintInstanceStandardInfo(_query_id, 
_fragment_instance_id)
-                     << " st = " << Status::Error<ErrorCode::CANCELLED>(msg);
+    void set_is_cancelled(std::string msg) {
+        if (!_is_cancelled.exchange(true)) {
+            _cancel_reason = msg;
+            // Create a error status, so that we could print error stack, and
+            // we could know which path call cancel.
+            LOG(WARNING) << "Task is cancelled, instance: "
+                         << PrintInstanceStandardInfo(_query_id, 
_fragment_instance_id)
+                         << ", st = " << 
Status::Error<ErrorCode::CANCELLED>(msg);
+        } else {
+            LOG(WARNING) << "Task is already cancelled, instance: "
+                         << PrintInstanceStandardInfo(_query_id, 
_fragment_instance_id)
+                         << ", original cancel msg: " << _cancel_reason
+                         << ", new cancel msg: " << 
Status::Error<ErrorCode::CANCELLED>(msg);
+        }
     }
 
     void set_backend_id(int64_t backend_id) { _backend_id = backend_id; }
@@ -557,6 +565,7 @@ private:
 
     // if true, execution should stop with a CANCELLED status
     std::atomic<bool> _is_cancelled;
+    std::string _cancel_reason;
 
     int _per_fragment_instance_idx;
     int _num_per_fragment_instances = 0;
diff --git a/be/src/vec/sink/writer/vtablet_writer.cpp b/be/src/vec/sink/writer/vtablet_writer.cpp
index c85e5520287..c61248b1b27 100644
--- a/be/src/vec/sink/writer/vtablet_writer.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer.cpp
@@ -73,8 +73,10 @@
 #include "runtime/runtime_state.h"
 #include "runtime/thread_context.h"
 #include "service/backend_options.h"
+#include "util/debug_points.h"
 #include "util/defer_op.h"
 #include "util/doris_metrics.h"
+#include "util/mem_info.h"
 #include "util/network_util.h"
 #include "util/proto_util.h"
 #include "util/ref_count_closure.h"
@@ -537,6 +539,9 @@ Status VNodeChannel::add_block(vectorized::Block* block, const Payload* payload,
 
 int VNodeChannel::try_send_and_fetch_status(RuntimeState* state,
                                             std::unique_ptr<ThreadPoolToken>& 
thread_pool_token) {
+    DBUG_EXECUTE_IF("VNodeChannel.try_send_and_fetch_status_full_gc",
+                    { MemInfo::process_full_gc(); });
+
     if (_cancelled || _send_finished) { // not run
         return 0;
     }
@@ -856,6 +861,7 @@ bool VNodeChannel::is_send_data_rpc_done() const {
 }
 
 Status VNodeChannel::close_wait(RuntimeState* state) {
+    DBUG_EXECUTE_IF("VNodeChannel.close_wait_full_gc", { 
MemInfo::process_full_gc(); });
     SCOPED_CONSUME_MEM_TRACKER(_node_channel_tracker.get());
     // set _is_closed to true finally
     Defer set_closed {[&]() {
@@ -881,6 +887,10 @@ Status VNodeChannel::close_wait(RuntimeState* state) {
     }
     _close_time_ms = UnixMillis() - _close_time_ms;
 
+    if (_cancelled || state->is_cancelled()) {
+        _cancel_with_msg(state->cancel_reason());
+    }
+
     if (_add_batches_finished) {
         _close_check();
         state->tablet_commit_infos().insert(state->tablet_commit_infos().end(),
diff --git a/regression-test/data/fault_injection_p0/baseall.txt b/regression-test/data/fault_injection_p0/baseall.txt
new file mode 100644
index 00000000000..72bbfe98aaf
--- /dev/null
+++ b/regression-test/data/fault_injection_p0/baseall.txt
@@ -0,0 +1,16 @@
+0,1,1989,1001,11011902,123.123,true,1989-03-21,1989-03-21 
13:00:00,wangjuoo4,0.1,6.333,string12345,170141183460469231731687303715884105727
+0,2,1986,1001,11011903,1243.5,false,1901-12-31,1989-03-21 
13:00:00,wangynnsf,20.268,789.25,string12345,-170141183460469231731687303715884105727
+0,3,1989,1002,11011905,24453.325,false,2012-03-14,2000-01-01 
00:00:00,yunlj8@nk,78945,3654.0,string12345,0
+0,4,1991,3021,-11011907,243243.325,false,3124-10-10,2015-03-13 
10:30:00,yanvjldjlll,2.06,-0.001,string12345,20220101
+0,5,1985,5014,-11011903,243.325,true,2015-01-01,2015-03-13 
12:36:38,du3lnvl,-0.000,-365,string12345,20220102
+0,6,32767,3021,123456,604587.000,true,2014-11-11,2015-03-13 
12:36:38,yanavnd,0.1,80699,string12345,20220104
+0,7,-32767,1002,7210457,3.141,false,1988-03-21,1901-01-01 
00:00:00,jiw3n4,0.0,6058,string12345,-20220101
+1,8,255,2147483647,11011920,-0.123,true,1989-03-21,9999-11-11 
12:12:00,wangjuoo5,987456.123,12.14,string12345,-2022
+1,9,1991,-2147483647,11011902,-654.654,true,1991-08-11,1989-03-21 
13:11:00,wangjuoo4,0.000,69.123,string12345,11011903
+1,10,1991,5014,9223372036854775807,-258.369,false,2015-04-02,2013-04-02 
15:16:52,wangynnsf,-123456.54,0.235,string12345,-11011903
+1,11,1989,25699,-9223372036854775807,0.666,true,2015-04-02,1989-03-21 
13:11:00,yunlj8@nk,-987.001,4.336,string12345,1701411834604692317316873037158
+1,12,32767,-2147483647,9223372036854775807,243.325,false,1991-08-11,2013-04-02 
15:16:52,lifsno,-564.898,3.141592654,string12345,1701604692317316873037158
+1,13,-32767,2147483647,-9223372036854775807,100.001,false,2015-04-02,2015-04-02
 
00:00:00,wenlsfnl,123.456,3.141592653,string12345,701411834604692317316873037158
+1,14,255,103,11011902,-0.000,false,2015-04-02,2015-04-02 00:00:00, 
,3.141592654,2.036,string12345,701411834604692317316873
+1,15,1992,3021,11011920,0.00,true,9999-12-12,2015-04-02 
00:00:00,,3.141592653,20.456,string12345,701411834604692317
+\N,\N,\N,\N,\N,\N,\N,\N,\N,\N,\N,\N,\N,\N
diff --git a/regression-test/suites/fault_injection_p0/test_mem_gc_when_load_fault_injection.groovy b/regression-test/suites/fault_injection_p0/test_mem_gc_when_load_fault_injection.groovy
new file mode 100644
index 00000000000..99ae467ff4d
--- /dev/null
+++ b/regression-test/suites/fault_injection_p0/test_mem_gc_when_load_fault_injection.groovy
@@ -0,0 +1,90 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.codehaus.groovy.runtime.IOGroovyMethods
+import org.apache.doris.regression.util.Http
+
+suite("mem_gc_when_load") {
+    // init query case data
+    sql """
+        CREATE TABLE IF NOT EXISTS `baseall` (
+            `k0` boolean null comment "",
+            `k1` tinyint(4) null comment "",
+            `k2` smallint(6) null comment "",
+            `k3` int(11) null comment "",
+            `k4` bigint(20) null comment "",
+            `k5` decimal(9, 3) null comment "",
+            `k6` char(5) null comment "",
+            `k10` date null comment "",
+            `k11` datetime null comment "",
+            `k7` varchar(20) null comment "",
+            `k8` double max null comment "",
+            `k9` float sum null comment "",
+            `k12` string replace null comment "",
+            `k13` largeint(40) replace null comment ""
+        ) engine=olap
+        DISTRIBUTED BY HASH(`k1`) BUCKETS 5 properties("replication_num" = "1")
+        """
+    sql """
+        CREATE TABLE IF NOT EXISTS `test` (
+            `k0` boolean null comment "",
+            `k1` tinyint(4) null comment "",
+            `k2` smallint(6) null comment "",
+            `k3` int(11) null comment "",
+            `k4` bigint(20) null comment "",
+            `k5` decimal(9, 3) null comment "",
+            `k6` char(5) null comment "",
+            `k10` date null comment "",
+            `k11` datetime null comment "",
+            `k7` varchar(20) null comment "",
+            `k8` double max null comment "",
+            `k9` float sum null comment "",
+            `k12` string replace_if_not_null null comment "",
+            `k13` largeint(40) replace null comment ""
+        ) engine=olap
+        DISTRIBUTED BY HASH(`k1`) BUCKETS 5 properties("replication_num" = "1")
+        """
+
+    GetDebugPoint().clearDebugPointsForAllBEs()
+    streamLoad {
+        table "baseall"
+        db "regression_test_fault_injection_p0"
+        set 'column_separator', ','
+        file "baseall.txt"
+    }
+
+    try {
+        // let the gc cancel the load
+        
GetDebugPoint().enableDebugPointForAllBEs("VNodeChannel.try_send_and_fetch_status_full_gc")
+        sql "insert into test select * from baseall where k1 <= 3"
+    } catch(Exception e) {
+        assertTrue(e.getMessage().contains("Process has no memory available")) 
 // the msg should contain the root cause
+    } finally {
+        
GetDebugPoint().disableDebugPointForAllBEs("VNodeChannel.try_send_and_fetch_status_full_gc")
+    }
+
+    try {
+        // let the gc cancel the load when load is close_wait
+        
GetDebugPoint().enableDebugPointForAllBEs("VNodeChannel.close_wait_full_gc")
+        sql "insert into test select * from baseall where k1 <= 3"
+    } catch(Exception e) {
+        assertTrue(e.getMessage().contains("Process has no memory available")) 
 // the msg should contain the root cause
+    } finally {
+        
GetDebugPoint().disableDebugPointForAllBEs("VNodeChannel.close_wait_full_gc")
+    }
+}
+


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to