This is an automated email from the ASF dual-hosted git repository.

mrhhsg pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new eeafe45c0c4 [fix](brpc) coredump caused by brpc checking (#44047) (#44188)
eeafe45c0c4 is described below

commit eeafe45c0c4981291ba503fee3b0003e13ba0bb7
Author: Jerry Hu <[email protected]>
AuthorDate: Tue Nov 19 14:56:46 2024 +0800

    [fix](brpc) coredump caused by brpc checking (#44047) (#44188)
    
    pick #44047
    ```
    /root/doris/be/src/runtime/fragment_mgr.cpp:1064:20: runtime error: member call on null pointer of type 'doris::PBackendService_Stub'
    
    SUMMARY: UndefinedBehaviorSanitizer: undefined-behavior /root/doris/be/src/runtime/fragment_mgr.cpp:1064:20 in
    *** Query id: 0-0 ***
    *** is nereids: 0 ***
    *** tablet id: 0 ***
    *** Aborted at 1731663847 (unix time) try "date -d @1731663847" if you are using GNU date ***
    *** Current BE git commitID: b663df0e50 ***
    *** SIGSEGV address not mapped to object (@0x0) received by PID 17169 (TID 17463 OR 0x7f746d21a700) from PID 0; stack trace: ***
    0# doris::signal::(anonymous namespace)::FailureSignalHandler(int, siginfo_t*, void*) at /root/doris/be/src/common/signal_handler.h:421
    1# PosixSignals::chained_handler(int, siginfo_t*, void*) [clone .part.0] in /usr/lib/jvm/java-17-openjdk-amd64/lib/server/libjvm.so
    2# JVM_handle_linux_signal in /usr/lib/jvm/java-17-openjdk-amd64/lib/server/libjvm.so
    3# 0x00007F7601263090 in /lib/x86_64-linux-gnu/libc.so.6
    4# doris::FragmentMgr::_check_brpc_available(std::shared_ptr<doris::PBackendService_Stub> const&, doris::FragmentMgr::BrpcItem const&) in /mnt/ssd01/pipline/OpenSourceDoris/clusterEnv/P0/Cluster0/be/lib/doris_be
    5# doris::FragmentMgr::cancel_worker() at /root/doris/be/src/runtime/fragment_mgr.cpp:1022
    6# doris::Thread::supervise_thread(void*) at /root/doris/be/src/util/thread.cpp:499
    7# start_thread at /build/glibc-SzIz7B/glibc-2.31/nptl/pthread_create.c:478
    8# __clone at ../sysdeps/unix/sysv/linux/x86_64/clone.S:97
    ```
    
    ### What problem does this PR solve?
    
    Issue Number: close #xxx
    
    Related PR: #xxx
    
    Problem Summary:
    
    ### Release note
    
    None
    
    ### Check List (For Author)
    
    - Test <!-- At least one of them must be included. -->
        - [ ] Regression test
        - [ ] Unit Test
        - [ ] Manual test (add detailed scripts or steps below)
        - [ ] No need to test or manual test. Explain why:
            - [ ] This is a refactor/code format and no logic has been changed.
            - [ ] Previous test can cover this change.
            - [ ] No code files have been changed.
            - [ ] Other reason <!-- Add your reason?  -->
    
    - Behavior changed:
        - [ ] No.
        - [ ] Yes. <!-- Explain the behavior change -->
    
    - Does this need documentation?
        - [ ] No.
        - [ ] Yes. <!-- Add document PR link here. eg:
    https://github.com/apache/doris-website/pull/1214 -->
    
    ### Check List (For Reviewer who merge this PR)
    
    - [ ] Confirm the release note
    - [ ] Confirm test cases
    - [ ] Confirm document
    - [ ] Add branch pick label <!-- Add branch pick label that this PR
    should merge into -->
---
 be/src/runtime/fragment_mgr.cpp         |  9 +++++++--
 be/src/vec/sink/vdata_stream_sender.cpp | 19 ++++++++++++++-----
 2 files changed, 21 insertions(+), 7 deletions(-)

diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index bcb48559178..53c9c01a470 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -1447,8 +1447,13 @@ void FragmentMgr::cancel_worker() {
                          std::string("Coordinator dead."));
         }
 
-        for (auto it : brpc_stub_with_queries) {
-            _check_brpc_available(it.first, it.second);
+        if (config::enable_brpc_connection_check) {
+            for (auto it : brpc_stub_with_queries) {
+                if (!it.first) {
+                    continue;
+                }
+                _check_brpc_available(it.first, it.second);
+            }
         }
     } while (!_stop_background_threads_latch.wait_for(
             
std::chrono::seconds(config::fragment_mgr_cancel_worker_interval_seconds)));
diff --git a/be/src/vec/sink/vdata_stream_sender.cpp 
b/be/src/vec/sink/vdata_stream_sender.cpp
index 0733c39621e..f4efa3aec02 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -90,9 +90,13 @@ Status Channel<Parent>::init(RuntimeState* state) {
                               _fragment_instance_id, _dest_node_id, 
&_local_recvr),
                       "");
     } else {
+        auto network_address = _brpc_dest_addr;
         if (_brpc_dest_addr.hostname == BackendOptions::get_localhost()) {
             _brpc_stub = 
state->exec_env()->brpc_internal_client_cache()->get_client(
                     "127.0.0.1", _brpc_dest_addr.port);
+            if (config::enable_brpc_connection_check) {
+                network_address.hostname = "127.0.0.1";
+            }
         } else {
             _brpc_stub =
                     
state->exec_env()->brpc_internal_client_cache()->get_client(_brpc_dest_addr);
@@ -104,6 +108,10 @@ Status Channel<Parent>::init(RuntimeState* state) {
             LOG(WARNING) << msg;
             return Status::InternalError(msg);
         }
+
+        if (config::enable_brpc_connection_check) {
+            state->get_query_ctx()->add_using_brpc_stub(network_address, 
_brpc_stub);
+        }
     }
 
     _serializer.set_is_local(_is_local);
@@ -129,19 +137,16 @@ Status Channel<Parent>::init_stub(RuntimeState* state) {
     if (_is_local) {
         return Status::OK();
     }
+
+    auto network_address = _brpc_dest_addr;
     if (_brpc_dest_addr.hostname == BackendOptions::get_localhost()) {
         _brpc_stub = 
state->exec_env()->brpc_internal_client_cache()->get_client(
                 "127.0.0.1", _brpc_dest_addr.port);
         if (config::enable_brpc_connection_check) {
-            auto network_address = _brpc_dest_addr;
             network_address.hostname = "127.0.0.1";
-            state->get_query_ctx()->add_using_brpc_stub(network_address, 
_brpc_stub);
         }
     } else {
         _brpc_stub = 
state->exec_env()->brpc_internal_client_cache()->get_client(_brpc_dest_addr);
-        if (config::enable_brpc_connection_check) {
-            state->get_query_ctx()->add_using_brpc_stub(_brpc_dest_addr, 
_brpc_stub);
-        }
     }
 
     if (!_brpc_stub) {
@@ -150,6 +155,10 @@ Status Channel<Parent>::init_stub(RuntimeState* state) {
         LOG(WARNING) << msg;
         return Status::InternalError(msg);
     }
+
+    if (config::enable_brpc_connection_check) {
+        state->get_query_ctx()->add_using_brpc_stub(network_address, 
_brpc_stub);
+    }
     return Status::OK();
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to