This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 1fbbff32b21 [fix](pipelinex) coredump caused by VRuntimeFilterSlots::_is_global was not set (#29446)
1fbbff32b21 is described below

commit 1fbbff32b216b477867103921c9c611038f108d4
Author: Jerry Hu <[email protected]>
AuthorDate: Wed Jan 3 12:40:41 2024 +0800

    [fix](pipelinex) coredump caused by VRuntimeFilterSlots::_is_global was not set (#29446)
---
 be/src/pipeline/exec/hashjoin_build_sink.cpp | 17 ++++++++++++++---
 be/src/vec/exec/join/vhash_join_node.h       |  5 +++--
 2 files changed, 17 insertions(+), 5 deletions(-)

diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp b/be/src/pipeline/exec/hashjoin_build_sink.cpp
index 3d9fd501917..98ecafdc5a8 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.cpp
+++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp
@@ -491,6 +491,9 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* 
state, vectorized::Block*
                        (*local_state._shared_state->build_block).bytes());
         RETURN_IF_ERROR(
                 local_state.process_build_block(state, 
(*local_state._shared_state->build_block)));
+
+        const bool use_global_rf =
+                
local_state._parent->cast<HashJoinBuildSinkOperatorX>()._use_global_rf;
         auto ret = std::visit(
                 Overload {[&](std::monostate&) -> Status {
                               LOG(FATAL) << "FATAL: uninited hash table";
@@ -498,7 +501,8 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* 
state, vectorized::Block*
                           },
                           [&](auto&& arg) -> Status {
                               vectorized::ProcessRuntimeFilterBuild 
runtime_filter_build_process;
-                              return runtime_filter_build_process(state, arg, 
&local_state);
+                              return runtime_filter_build_process(state, arg, 
&local_state,
+                                                                  
use_global_rf);
                           }},
                 *local_state._shared_state->hash_table_variants);
         if (!ret.ok()) {
@@ -528,6 +532,10 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* 
state, vectorized::Block*
         DCHECK(_shared_hash_table_context != nullptr);
         CHECK(_shared_hash_table_context->signaled);
 
+        if (!_shared_hash_table_context->status.ok()) {
+            return _shared_hash_table_context->status;
+        }
+
         local_state.profile()->add_info_string(
                 "SharedHashTableFrom",
                 print_id(
@@ -547,6 +555,8 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* 
state, vectorized::Block*
                         _shared_hash_table_context->hash_table_variants));
 
         local_state._shared_state->build_block = 
_shared_hash_table_context->block;
+        const bool use_global_rf =
+                
local_state._parent->cast<HashJoinBuildSinkOperatorX>()._use_global_rf;
 
         if (!_shared_hash_table_context->runtime_filters.empty()) {
             auto ret = std::visit(
@@ -560,8 +570,9 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* 
state, vectorized::Block*
                                     return Status::OK();
                                 }
                                 local_state._runtime_filter_slots =
-                                        std::make_shared<VRuntimeFilterSlots>(
-                                                _build_expr_ctxs, 
_runtime_filter_descs);
+                                        
std::make_shared<VRuntimeFilterSlots>(_build_expr_ctxs,
+                                                                              
_runtime_filter_descs,
+                                                                              
use_global_rf);
 
                                 
RETURN_IF_ERROR(local_state._runtime_filter_slots->init(
                                         state, arg.hash_table->size()));
diff --git a/be/src/vec/exec/join/vhash_join_node.h b/be/src/vec/exec/join/vhash_join_node.h
index 8304eedb290..35d97ccb033 100644
--- a/be/src/vec/exec/join/vhash_join_node.h
+++ b/be/src/vec/exec/join/vhash_join_node.h
@@ -76,12 +76,13 @@ class HashJoinNode;
 
 struct ProcessRuntimeFilterBuild {
     template <class HashTableContext, typename Parent>
-    Status operator()(RuntimeState* state, HashTableContext& hash_table_ctx, 
Parent* parent) {
+    Status operator()(RuntimeState* state, HashTableContext& hash_table_ctx, 
Parent* parent,
+                      bool is_global = false) {
         if (parent->runtime_filter_descs().empty()) {
             return Status::OK();
         }
         parent->_runtime_filter_slots = std::make_shared<VRuntimeFilterSlots>(
-                parent->_build_expr_ctxs, parent->runtime_filter_descs());
+                parent->_build_expr_ctxs, parent->runtime_filter_descs(), 
is_global);
 
         RETURN_IF_ERROR(
                 parent->_runtime_filter_slots->init(state, 
hash_table_ctx.hash_table->size()));


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to