This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.1 by this push:
     new 3125bc3adaa [fix](be) Fix runtime filter crash with shared hash table (#63256)
3125bc3adaa is described below

commit 3125bc3adaa14eada808d8c15a682fc3de3f2208
Author: Pxl <[email protected]>
AuthorDate: Fri May 15 16:03:20 2026 +0800

    [fix](be) Fix runtime filter crash with shared hash table (#63256)
    
    ### What problem does this PR solve?
    
    Issue Number: close #xxx
    
    Problem Summary: Fix BE SIGSEGV in branch-4.1 when hash join build sink
    publishes runtime filters during close. The crash point is
    `RuntimeFilterWrapper::set_state()`, and the direct cause is that a
    `RuntimeFilterProducer` can hold a null `_wrapper` in the shared hash
    table runtime filter path.
    
    ### Root cause
    
    Bug introduced by: **#49556**
    
    #49556 refactored the broadcast/shared hash table controller and
    introduced the shared runtime filter wrapper handoff:
    
    - The builder task stores runtime filter wrappers in
    `HashJoinBuildSinkOperatorX::_runtime_filters`.
    - Non-builder tasks read wrappers from that same map when
    `use_shared_table=true` and `_should_build_hash_table=false`.
    - `HashJoinBuildSinkLocalState::close()` unconditionally sets `_signaled
    = true` for shared hash table, even if the builder task was terminated
    before it built the hash table and filled `_runtime_filters`.
    - The non-builder runtime filter path uses
    `DCHECK(runtime_filters.contains(...))` and then
    `runtime_filters[filter_id]`. In release builds the DCHECK is compiled
    out, so `operator[]` on a missing key inserts a default-constructed
    (null) `shared_ptr`.
    - The producer then calls `_wrapper->set_state()`, causing the SIGSEGV
    in `RuntimeFilterWrapper::set_state()`.
    
    I verified the branch-4.1 blame: both the unconditional `_signaled =
    true` and the `runtime_filters[filter_id]` shared-wrapper path come from
    #49556.
    
    ### Fix
    
    Fixed on master by: **#62056**
    
    #62056 fixed this shared hash table race by only setting `_signaled =
    true` when the builder task was not terminated. If the builder is
    terminated early, non-builder tasks return EOF and do not enter the
    shared hash table/runtime filter path with uninitialized shared state.
    
    This PR is a branch-4.1 pick/backport of the necessary #62056 logic.
    
    It also picks the relevant defensive idea from **#60563**: replace the
    runtime filter `DCHECK + operator[]` assumption with explicit
    `Status::InternalError` checks, so a missing/null wrapper returns an
    error instead of inserting a null wrapper and crashing.
    
    ### Related PR
    
    - #49556: introduced the shared hash table runtime filter wrapper
    handoff and the unsafe signal/map assumptions on branch-4.1.
    - #62056: fixed the shared hash table `_signaled` race on master.
    - #60563: changed runtime filter DCHECK/CHECK assumptions to explicit
    error handling on master.
    
    ### Release note
    
    Fixed a BE crash when publishing runtime filters for shared hash table
    hash joins.
    
    ### Check List (For Author)
    
    - Test: Manual test
        - build-support/check-format.sh
        - git diff --check
    - Behavior changed: No
    - Does this need documentation: No
    
    Co-authored-by: Copilot <[email protected]>
---
 be/src/exec/operator/hashjoin_build_sink.cpp       |  7 +++++-
 .../runtime_filter_producer_helper.cpp             | 29 ++++++++++++++++++----
 2 files changed, 30 insertions(+), 6 deletions(-)

diff --git a/be/src/exec/operator/hashjoin_build_sink.cpp b/be/src/exec/operator/hashjoin_build_sink.cpp
index 3f1cd4fe61b..fa473a1ea2a 100644
--- a/be/src/exec/operator/hashjoin_build_sink.cpp
+++ b/be/src/exec/operator/hashjoin_build_sink.cpp
@@ -244,7 +244,12 @@ Status HashJoinBuildSinkLocalState::close(RuntimeState* state, Status exec_statu
 
         if (p._use_shared_hash_table) {
             std::unique_lock lock(p._mutex);
-            p._signaled = true;
+            // Only signal non-builder tasks when the builder actually built the hash table.
+            // When the builder is terminated early, process_build_block() has not initialized the
+            // shared hash table or runtime filter wrappers, so non-builders must return EOF.
+            if (!_terminated) {
+                p._signaled = true;
+            }
             for (auto& dep : _shared_state->sink_deps) {
                 dep->set_ready();
             }
diff --git a/be/src/exec/runtime_filter/runtime_filter_producer_helper.cpp b/be/src/exec/runtime_filter/runtime_filter_producer_helper.cpp
index 9940f0d4366..9e774a71002 100644
--- a/be/src/exec/runtime_filter/runtime_filter_producer_helper.cpp
+++ b/be/src/exec/runtime_filter/runtime_filter_producer_helper.cpp
@@ -108,13 +108,32 @@ Status RuntimeFilterProducerHelper::build(
 
     for (const auto& filter : _producers) {
         if (use_shared_table) {
-            DCHECK(_is_broadcast_join);
+            if (!_is_broadcast_join) {
+                return Status::InternalError(
+                        "use_shared_table is true but _is_broadcast_join is false");
+            }
+            auto wrapper = filter->wrapper();
+            if (wrapper == nullptr) {
+                return Status::InternalError("runtime filter wrapper is null when building filter");
+            }
+            auto filter_id = wrapper->filter_id();
             if (_should_build_hash_table) {
-                DCHECK(!runtime_filters.contains(filter->wrapper()->filter_id()));
-                runtime_filters[filter->wrapper()->filter_id()] = filter->wrapper();
+                if (runtime_filters.contains(filter_id)) {
+                    return Status::InternalError(
+                            "runtime_filters already contains filter_id {} when building hash "
+                            "table",
+                            filter_id);
+                }
+                runtime_filters[filter_id] = wrapper;
             } else {
-                DCHECK(runtime_filters.contains(filter->wrapper()->filter_id()));
-                filter->set_wrapper(runtime_filters[filter->wrapper()->filter_id()]);
+                auto it = runtime_filters.find(filter_id);
+                if (it == runtime_filters.end() || it->second == nullptr) {
+                    return Status::InternalError(
+                            "runtime_filters does not contain valid filter_id {} when not building "
+                            "hash table",
+                            filter_id);
+                }
+                filter->set_wrapper(it->second);
             }
         }
         filter->set_wrapper_state_and_ready_to_publish(RuntimeFilterWrapper::State::READY);
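
The map pitfall named in the root cause can be reproduced outside Doris. The following is a minimal sketch, not the Doris code: `Wrapper`, `WrapperMap`, `lookup_unchecked`, and `lookup_checked` are hypothetical names, `std::map` is assumed as the container, and a plain error string stands in for `Status::InternalError`. It contrasts the old `operator[]` lookup with the `find`-based check that the patch switches to.

```cpp
#include <map>
#include <memory>
#include <string>

// Hypothetical stand-in for the runtime filter wrapper; only what the sketch needs.
struct Wrapper {
    int state = 0;
    void set_state(int s) { state = s; }
};

// Assumption: the shared wrappers live in an ordered map keyed by filter id.
using WrapperMap = std::map<int, std::shared_ptr<Wrapper>>;

// Old pattern: operator[] default-constructs a value for a missing key, so the
// caller receives a null shared_ptr and the map silently gains a null entry.
std::shared_ptr<Wrapper> lookup_unchecked(WrapperMap& m, int filter_id) {
    return m[filter_id];
}

// Patched pattern: find() never inserts. An empty return string means success;
// otherwise the string describes the error and `out` is left untouched.
std::string lookup_checked(const WrapperMap& m, int filter_id,
                           std::shared_ptr<Wrapper>& out) {
    auto it = m.find(filter_id);
    if (it == m.end() || it->second == nullptr) {
        return "runtime_filters does not contain valid filter_id " +
               std::to_string(filter_id);
    }
    out = it->second;
    return "";
}
```

Calling `lookup_unchecked` on an empty map returns a null pointer and leaves one entry behind, so a later `set_state()` through that pointer is undefined behavior. `lookup_checked` reports the missing entry and leaves the map unchanged.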

