This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new 08ec72cab7a [Feature](join) support LEFT_SEMI_DIRECT_RETURN_OPT#59832 
(#60577)
08ec72cab7a is described below

commit 08ec72cab7a82f12b5eb086c59c6e51fb75ada6f
Author: Pxl <[email protected]>
AuthorDate: Sat Feb 7 18:39:24 2026 +0800

    [Feature](join) support LEFT_SEMI_DIRECT_RETURN_OPT#59832 (#60577)
    
    Backport of #59832 to branch-4.0.
---
 be/src/olap/column_predicate.h                     | 30 ++++------
 be/src/pipeline/dependency.h                       |  6 ++
 be/src/pipeline/exec/hashjoin_build_sink.cpp       | 11 ++++
 be/src/pipeline/exec/hashjoin_build_sink.h         |  8 +++
 be/src/pipeline/exec/hashjoin_probe_operator.cpp   | 13 +++-
 .../pipeline/exec/join/process_hash_table_probe.h  |  5 ++
 .../exec/join/process_hash_table_probe_impl.h      | 20 +++++++
 be/src/pipeline/exec/scan_operator.cpp             |  3 +-
 be/src/runtime_filter/runtime_filter_consumer.cpp  |  7 +++
 be/src/runtime_filter/runtime_filter_consumer.h    |  4 +-
 be/src/runtime_filter/runtime_filter_producer.h    | 11 ++++
 .../runtime_filter_producer_helper.cpp             | 12 ++++
 .../runtime_filter_producer_helper.h               |  2 +
 be/src/runtime_filter/runtime_filter_selectivity.h | 36 ++++++-----
 be/src/runtime_filter/runtime_filter_wrapper.h     | 18 ++++++
 be/src/vec/exprs/vexpr_context.h                   |  7 ++-
 .../runtime_filter_selectivity_test.cpp            | 69 ++++++++++++----------
 .../java/org/apache/doris/qe/SessionVariable.java  |  6 ++
 gensrc/thrift/PaloInternalService.thrift           |  1 +
 .../data/query_p0/join/test_left_join1.out         |  6 ++
 .../suites/query_p0/join/test_left_join1.groovy    | 19 ++++++
 21 files changed, 223 insertions(+), 71 deletions(-)

diff --git a/be/src/olap/column_predicate.h b/be/src/olap/column_predicate.h
index c4a5736beb0..f1ffb18bf95 100644
--- a/be/src/olap/column_predicate.h
+++ b/be/src/olap/column_predicate.h
@@ -329,8 +329,10 @@ public:
     void attach_profile_counter(
             int filter_id, std::shared_ptr<RuntimeProfile::Counter> 
predicate_filtered_rows_counter,
             std::shared_ptr<RuntimeProfile::Counter> 
predicate_input_rows_counter,
-            std::shared_ptr<RuntimeProfile::Counter> 
predicate_always_true_rows_counter) {
+            std::shared_ptr<RuntimeProfile::Counter> 
predicate_always_true_rows_counter,
+            const RuntimeFilterSelectivity& rf_selectivity) {
         _runtime_filter_id = filter_id;
+        _rf_selectivity = rf_selectivity;
         DCHECK(predicate_filtered_rows_counter != nullptr);
         DCHECK(predicate_input_rows_counter != nullptr);
 
@@ -389,7 +391,7 @@ public:
         }
     }
 
-    bool always_true() const { return _always_true; }
+    bool always_true() const { return 
_rf_selectivity.maybe_always_true_can_ignore(); }
     // Return whether the ColumnPredicate was created by a runtime filter.
     // If true, it was definitely created by a runtime filter.
     // If false, it may still have been created by a runtime filter,
@@ -405,26 +407,17 @@ protected:
         throw Exception(INTERNAL_ERROR, "Not Implemented _evaluate_inner");
     }
 
-    void reset_judge_selectivity() const {
-        _always_true = false;
-        _judge_counter = config::runtime_filter_sampling_frequency;
-        _judge_input_rows = 0;
-        _judge_filter_rows = 0;
-    }
+    void reset_judge_selectivity() const { 
_rf_selectivity.reset_judge_selectivity(); }
 
     void try_reset_judge_selectivity() const {
-        if (_can_ignore() && ((_judge_counter--) == 0)) {
-            reset_judge_selectivity();
+        if (_can_ignore()) {
+            _rf_selectivity.update_judge_counter();
         }
     }
 
     void do_judge_selectivity(uint64_t filter_rows, uint64_t input_rows) const 
{
-        if (!_always_true) {
-            _judge_filter_rows += filter_rows;
-            _judge_input_rows += input_rows;
-            
RuntimeFilterSelectivity::judge_selectivity(get_ignore_threshold(), 
_judge_filter_rows,
-                                                        _judge_input_rows, 
_always_true);
-        }
+        _rf_selectivity.update_judge_selectivity(_runtime_filter_id, 
filter_rows, input_rows,
+                                                 get_ignore_threshold());
     }
 
     uint32_t _column_id;
@@ -441,10 +434,7 @@ protected:
     // is evaluated as true, the logic for always_true is applied for the rest 
of that period
     // without recalculating. At the beginning of the next period,
     // reset_judge_selectivity is used to reset these variables.
-    mutable int _judge_counter = 0;
-    mutable uint64_t _judge_input_rows = 0;
-    mutable uint64_t _judge_filter_rows = 0;
-    mutable bool _always_true = false;
+    mutable RuntimeFilterSelectivity _rf_selectivity;
 
     std::shared_ptr<RuntimeProfile::Counter> _predicate_filtered_rows_counter =
             std::make_shared<RuntimeProfile::Counter>(TUnit::UNIT, 0);
diff --git a/be/src/pipeline/dependency.h b/be/src/pipeline/dependency.h
index 036ca3a90de..993149f0d36 100644
--- a/be/src/pipeline/dependency.h
+++ b/be/src/pipeline/dependency.h
@@ -625,6 +625,12 @@ struct HashJoinSharedState : public JoinSharedState {
     // memory in `_hash_table_variants`. So before execution, we should use a 
local _hash_table_variants
     // which has a shared hash table in it.
     std::vector<std::shared_ptr<JoinDataVariants>> hash_table_variant_vector;
+
+    // Whether the left semi join can output probe rows directly without probing the hash table.
+    // With a local IN filter among the runtime filters, every probe row reaching the join matches:
+    // a local filter is always applied, and an IN filter filters precisely.
+    // ATTN: always_true logic must be disabled for that IN filter when this flag is set.
+    bool left_semi_direct_return = false;
 };
 
 struct PartitionedHashJoinSharedState
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp 
b/be/src/pipeline/exec/hashjoin_build_sink.cpp
index 778409087ee..91ba6f6781a 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.cpp
+++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp
@@ -239,6 +239,17 @@ Status HashJoinBuildSinkLocalState::close(RuntimeState* 
state, Status exec_statu
             RETURN_IF_ERROR(_runtime_filter_producer_helper->build(
                     state, _shared_state->build_block.get(), 
p._use_shared_hash_table,
                     p._runtime_filters));
+            // only a left semi join with a single join conjunct can return directly
+            if (p.allow_left_semi_direct_return(state)) {
+                auto wrapper = 
_runtime_filter_producer_helper->detect_local_in_filter(state);
+                if (wrapper) {
+                    _shared_state->left_semi_direct_return = true;
+                    wrapper->set_disable_always_true_logic();
+                    custom_profile()->add_info_string(
+                            "LeftSemiDirectReturn",
+                            
std::to_string(_shared_state->left_semi_direct_return));
+                }
+            }
             RETURN_IF_ERROR(_runtime_filter_producer_helper->publish(state));
         }
     } catch (Exception& e) {
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h 
b/be/src/pipeline/exec/hashjoin_build_sink.h
index f1c94f24c38..235333fd505 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.h
+++ b/be/src/pipeline/exec/hashjoin_build_sink.h
@@ -148,6 +148,14 @@ public:
     }
     std::vector<bool>& is_null_safe_eq_join() { return _is_null_safe_eq_join; }
 
+    bool allow_left_semi_direct_return(RuntimeState* state) const {
+        // only a left semi join with a single join conjunct can return directly
+        return _join_op == TJoinOp::LEFT_SEMI_JOIN && _build_expr_ctxs.size() 
== 1 &&
+               !_have_other_join_conjunct && !_is_mark_join &&
+               
state->query_options().__isset.enable_left_semi_direct_return_opt &&
+               state->query_options().enable_left_semi_direct_return_opt;
+    }
+
 private:
     friend class HashJoinBuildSinkLocalState;
 
diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.cpp 
b/be/src/pipeline/exec/hashjoin_probe_operator.cpp
index 3bde47cf143..cb2c0df540c 100644
--- a/be/src/pipeline/exec/hashjoin_probe_operator.cpp
+++ b/be/src/pipeline/exec/hashjoin_probe_operator.cpp
@@ -232,6 +232,12 @@ Status HashJoinProbeOperatorX::pull(doris::RuntimeState* 
state, vectorized::Bloc
                     if constexpr (!std::is_same_v<HashTableProbeType, 
std::monostate>) {
                         using HashTableCtxType = std::decay_t<decltype(arg)>;
                         if constexpr (!std::is_same_v<HashTableCtxType, 
std::monostate>) {
+                            if 
(local_state._shared_state->left_semi_direct_return) {
+                                process_hashtable_ctx.process_direct_return(
+                                        arg, mutable_join_block, &temp_block,
+                                        
cast_set<uint32_t>(local_state._probe_block.rows()));
+                                return;
+                            }
                             st = process_hashtable_ctx.process(
                                     arg,
                                     local_state._null_map_column
@@ -289,8 +295,11 @@ Status HashJoinProbeOperatorX::pull(doris::RuntimeState* 
state, vectorized::Bloc
     }
 
     local_state._estimate_memory_usage += temp_block.allocated_bytes();
-    RETURN_IF_ERROR(
-            local_state.filter_data_and_build_output(state, output_block, eos, 
&temp_block));
+    // If left_semi_direct_return is true, the output row count is not increased here
+    // (it is the same as `_probe_block`'s row count).
+    RETURN_IF_ERROR(local_state.filter_data_and_build_output(
+            state, output_block, eos, &temp_block,
+            !local_state._shared_state->left_semi_direct_return));
     // Here make _join_block release the columns' ptr
     
local_state._join_block.set_columns(local_state._join_block.clone_empty_columns());
     mutable_join_block.clear();
diff --git a/be/src/pipeline/exec/join/process_hash_table_probe.h 
b/be/src/pipeline/exec/join/process_hash_table_probe.h
index 100a11fd2c7..52dcc6415fa 100644
--- a/be/src/pipeline/exec/join/process_hash_table_probe.h
+++ b/be/src/pipeline/exec/join/process_hash_table_probe.h
@@ -59,6 +59,11 @@ struct ProcessHashTableProbe {
                    vectorized::MutableBlock& mutable_block, vectorized::Block* 
output_block,
                    uint32_t probe_rows, bool is_mark_join);
 
+    template <typename HashTableType>
+    void process_direct_return(HashTableType& hash_table_ctx,
+                               vectorized::MutableBlock& mutable_block,
+                               vectorized::Block* output_block, uint32_t 
probe_rows);
+
     // In the presence of other join conjunct, the process of join become more 
complicated.
     // each matching join column need to be processed by other join conjunct. 
so the struct of mutable block
     // and output block may be different
diff --git a/be/src/pipeline/exec/join/process_hash_table_probe_impl.h 
b/be/src/pipeline/exec/join/process_hash_table_probe_impl.h
index 60261223cdf..85969391725 100644
--- a/be/src/pipeline/exec/join/process_hash_table_probe_impl.h
+++ b/be/src/pipeline/exec/join/process_hash_table_probe_impl.h
@@ -206,6 +206,22 @@ typename HashTableType::State 
ProcessHashTableProbe<JoinOpType>::_init_probe_sid
     return typename HashTableType::State(_parent->_probe_columns);
 }
 
+template <int JoinOpType>
+template <typename HashTableType>
+void ProcessHashTableProbe<JoinOpType>::process_direct_return(
+        HashTableType& hash_table_ctx, vectorized::MutableBlock& mutable_block,
+        vectorized::Block* output_block, uint32_t probe_rows) {
+    _probe_indexs.resize(probe_rows);
+    auto* probe_indexs_data = _probe_indexs.get_data().data();
+    for (uint32_t i = 0; i < probe_rows; i++) {
+        probe_indexs_data[i] = i;
+    }
+    auto& mcol = mutable_block.mutable_columns();
+    probe_side_output_column(mcol);
+    output_block->swap(mutable_block.to_block());
+    _parent->_probe_index = probe_rows;
+}
+
 template <int JoinOpType>
 template <typename HashTableType>
 Status ProcessHashTableProbe<JoinOpType>::process(HashTableType& 
hash_table_ctx,
@@ -792,6 +808,10 @@ struct ExtractType<T(U)> {
             ExtractType<void(T)>::Type & hash_table_ctx, const uint8_t* 
null_map,                  \
             vectorized::MutableBlock& mutable_block, vectorized::Block* 
output_block,              \
             uint32_t probe_rows, bool is_mark_join);                           
                    \
+    template void                                                              
                    \
+    
ProcessHashTableProbe<JoinOpType>::process_direct_return<ExtractType<void(T)>::Type>(
          \
+            ExtractType<void(T)>::Type & hash_table_ctx, 
vectorized::MutableBlock & mutable_block, \
+            vectorized::Block * output_block, uint32_t probe_rows);            
                    \
     template Status 
ProcessHashTableProbe<JoinOpType>::finish_probing<ExtractType<void(T)>::Type>( \
             ExtractType<void(T)>::Type & hash_table_ctx, 
vectorized::MutableBlock & mutable_block, \
             vectorized::Block * output_block, bool* eos, bool is_mark_join);
diff --git a/be/src/pipeline/exec/scan_operator.cpp 
b/be/src/pipeline/exec/scan_operator.cpp
index 6a2d36819d7..1c8f2327814 100644
--- a/be/src/pipeline/exec/scan_operator.cpp
+++ b/be/src/pipeline/exec/scan_operator.cpp
@@ -368,7 +368,8 @@ Status 
ScanLocalState<Derived>::_normalize_predicate(vectorized::VExprContext* c
                                         rf_expr->filter_id(),
                                         
rf_expr->predicate_filtered_rows_counter(),
                                         
rf_expr->predicate_input_rows_counter(),
-                                        
rf_expr->predicate_always_true_rows_counter());
+                                        
rf_expr->predicate_always_true_rows_counter(),
+                                        
context->get_runtime_filter_selectivity());
                             }
                         };
                         switch (expr->node_type()) {
diff --git a/be/src/runtime_filter/runtime_filter_consumer.cpp 
b/be/src/runtime_filter/runtime_filter_consumer.cpp
index a99f7bfd874..2bf9998c536 100644
--- a/be/src/runtime_filter/runtime_filter_consumer.cpp
+++ b/be/src/runtime_filter/runtime_filter_consumer.cpp
@@ -83,6 +83,13 @@ Status 
RuntimeFilterConsumer::_get_push_exprs(std::vector<vectorized::VRuntimeFi
 
     auto real_filter_type = _wrapper->get_real_type();
     bool null_aware = _wrapper->contain_null();
+
+    // Set sampling frequency based on disable_always_true_logic status
+    int sampling_frequency = _wrapper->disable_always_true_logic()
+                                     ? 
RuntimeFilterSelectivity::DISABLE_SAMPLING
+                                     : 
config::runtime_filter_sampling_frequency;
+    
probe_ctx->get_runtime_filter_selectivity().set_sampling_frequency(sampling_frequency);
+
     switch (real_filter_type) {
     case RuntimeFilterType::IN_FILTER: {
         TTypeDesc type_desc = create_type_desc(PrimitiveType::TYPE_BOOLEAN);
diff --git a/be/src/runtime_filter/runtime_filter_consumer.h 
b/be/src/runtime_filter/runtime_filter_consumer.h
index e0e42e509d4..0e832b412ee 100644
--- a/be/src/runtime_filter/runtime_filter_consumer.h
+++ b/be/src/runtime_filter/runtime_filter_consumer.h
@@ -93,8 +93,10 @@ private:
               _registration_time(MonotonicMillis()),
               _rf_state(State::NOT_READY) {
         // If bitmap filter is not applied, it will cause the query result to 
be incorrect
+        // local rf must wait until timeout, or LEFT_SEMI_DIRECT_RETURN_OPT may return wrong results
         bool wait_infinitely = query_ctx->runtime_filter_wait_infinitely() ||
-                               _runtime_filter_type == 
RuntimeFilterType::BITMAP_FILTER;
+                               _runtime_filter_type == 
RuntimeFilterType::BITMAP_FILTER ||
+                               !has_remote_target();
         _rf_wait_time_ms = wait_infinitely ? query_ctx->execution_timeout() * 
1000
                                            : 
query_ctx->runtime_filter_wait_time_ms();
         DorisMetrics::instance()->runtime_filter_consumer_num->increment(1);
diff --git a/be/src/runtime_filter/runtime_filter_producer.h 
b/be/src/runtime_filter/runtime_filter_producer.h
index 0edf85cd1d9..e686a97ee3a 100644
--- a/be/src/runtime_filter/runtime_filter_producer.h
+++ b/be/src/runtime_filter/runtime_filter_producer.h
@@ -131,6 +131,17 @@ public:
         _wrapper = wrapper;
     }
 
+    std::shared_ptr<RuntimeFilterWrapper> detect_in_filter() {
+        if (_has_remote_target) {
+            return nullptr;
+        }
+        std::unique_lock<std::recursive_mutex> l(_rmtx);
+        if (_wrapper->is_ready_in_filter()) {
+            return _wrapper;
+        }
+        return nullptr;
+    }
+
 private:
     RuntimeFilterProducer(const QueryContext* query_ctx, const 
TRuntimeFilterDesc* desc)
             : RuntimeFilter(desc), _is_broadcast_join(desc->is_broadcast_join) 
{}
diff --git a/be/src/runtime_filter/runtime_filter_producer_helper.cpp 
b/be/src/runtime_filter/runtime_filter_producer_helper.cpp
index 55e300cdd87..9611d38f7ac 100644
--- a/be/src/runtime_filter/runtime_filter_producer_helper.cpp
+++ b/be/src/runtime_filter/runtime_filter_producer_helper.cpp
@@ -164,4 +164,16 @@ void RuntimeFilterProducerHelper::collect_realtime_profile(
     build_timer->set(_runtime_filter_compute_timer->value());
 }
 
+std::shared_ptr<RuntimeFilterWrapper> 
RuntimeFilterProducerHelper::detect_local_in_filter(
+        RuntimeState* state) {
+    // Return the first ready local IN filter, or nullptr if there is none.
+    // A local IN filter enables LEFT_SEMI_DIRECT_RETURN_OPT.
+    for (const auto& filter : _producers) {
+        if (auto wrapper = filter->detect_in_filter(); wrapper != nullptr) {
+            return wrapper;
+        }
+    }
+    return nullptr;
+}
+
 } // namespace doris
diff --git a/be/src/runtime_filter/runtime_filter_producer_helper.h 
b/be/src/runtime_filter/runtime_filter_producer_helper.h
index 4e6d14089ee..bc0431ebe59 100644
--- a/be/src/runtime_filter/runtime_filter_producer_helper.h
+++ b/be/src/runtime_filter/runtime_filter_producer_helper.h
@@ -67,6 +67,8 @@ public:
 
     void collect_realtime_profile(RuntimeProfile* parent_operator_profile);
 
+    std::shared_ptr<RuntimeFilterWrapper> detect_local_in_filter(RuntimeState* 
state);
+
 protected:
     virtual void _init_expr(const vectorized::VExprContextSPtrs& 
build_expr_ctxs,
                             const std::vector<TRuntimeFilterDesc>& 
runtime_filter_descs);
diff --git a/be/src/runtime_filter/runtime_filter_selectivity.h 
b/be/src/runtime_filter/runtime_filter_selectivity.h
index 1b0a82143de..694895edad9 100644
--- a/be/src/runtime_filter/runtime_filter_selectivity.h
+++ b/be/src/runtime_filter/runtime_filter_selectivity.h
@@ -19,7 +19,6 @@
 
 #include <cstdint>
 
-#include "common/config.h"
 #include "common/logging.h"
 
 namespace doris {
@@ -34,9 +33,13 @@ class RuntimeFilterSelectivity {
 public:
     RuntimeFilterSelectivity() = default;
 
-    RuntimeFilterSelectivity(const RuntimeFilterSelectivity&) = delete;
+    // If sampling_frequency <= 0, maybe_always_true_can_ignore() always returns false
+    static constexpr int DISABLE_SAMPLING = -1;
+
+    void set_sampling_frequency(int frequency) { _sampling_frequency = 
frequency; }
+
     void update_judge_counter() {
-        if ((_judge_counter++) >= config::runtime_filter_sampling_frequency) {
+        if ((_judge_counter++) >= _sampling_frequency) {
             reset_judge_selectivity();
         }
     }
@@ -46,8 +49,8 @@ public:
         if (!_always_true) {
             _judge_filter_rows += filter_rows;
             _judge_input_rows += input_rows;
-            judge_selectivity(ignore_thredhold, _judge_filter_rows, 
_judge_input_rows,
-                              _always_true);
+            _judge_selectivity(ignore_thredhold, _judge_filter_rows, 
_judge_input_rows,
+                               _always_true);
         }
 
         VLOG_ROW << fmt::format(
@@ -61,23 +64,13 @@ public:
 
     bool maybe_always_true_can_ignore() const {
         /// TODO: maybe we can use session variable to control this behavior ?
-        if (config::runtime_filter_sampling_frequency <= 0) {
+        if (_sampling_frequency <= 0) {
             return false;
         } else {
             return _always_true;
         }
     }
 
-    static void judge_selectivity(double ignore_threshold, int64_t 
filter_rows, int64_t input_rows,
-                                  bool& always_true) {
-        // if the judged input rows is too small, we think the selectivity is 
not reliable
-        if (input_rows > min_judge_input_rows) {
-            always_true = (static_cast<double>(filter_rows) / 
static_cast<double>(input_rows)) <
-                          ignore_threshold;
-        }
-    }
-
-private:
     void reset_judge_selectivity() {
         _always_true = false;
         _judge_counter = 0;
@@ -85,10 +78,21 @@ private:
         _judge_filter_rows = 0;
     }
 
+private:
+    void _judge_selectivity(double ignore_threshold, int64_t filter_rows, 
int64_t input_rows,
+                            bool& always_true) {
+        // if the judged input rows is too small, we think the selectivity is 
not reliable
+        if (input_rows > min_judge_input_rows) {
+            always_true = (static_cast<double>(filter_rows) / 
static_cast<double>(input_rows)) <
+                          ignore_threshold;
+        }
+    }
+
     int64_t _judge_input_rows = 0;
     int64_t _judge_filter_rows = 0;
     int _judge_counter = 0;
     bool _always_true = false;
+    int _sampling_frequency = -1;
 
     constexpr static int64_t min_judge_input_rows = 4096 * 10;
 };
diff --git a/be/src/runtime_filter/runtime_filter_wrapper.h 
b/be/src/runtime_filter/runtime_filter_wrapper.h
index 1a7456a7190..9338c7b349d 100644
--- a/be/src/runtime_filter/runtime_filter_wrapper.h
+++ b/be/src/runtime_filter/runtime_filter_wrapper.h
@@ -86,6 +86,8 @@ public:
 
     bool contain_null() const;
 
+    bool disable_always_true_logic() const { return 
_disable_always_true_logic; }
+
     std::string debug_string() const;
 
     // set_state may called in SyncSizeClosure's rpc thread
@@ -120,6 +122,18 @@ public:
         }
     }
 
+    bool is_ready_in_filter() {
+        if (get_real_type() != RuntimeFilterType::IN_FILTER) {
+            return false;
+        }
+        if (_state != State::READY) {
+            return false;
+        }
+        return true;
+    }
+
+    void set_disable_always_true_logic() { _disable_always_true_logic = true; }
+
 private:
     // used by shuffle runtime filter
     // assign this filter by protobuf
@@ -139,6 +153,10 @@ private:
     std::shared_ptr<BloomFilterFuncBase> _bloom_filter_func;
     std::shared_ptr<BitmapFilterFuncBase> _bitmap_filter_func;
 
+    // set when this IN filter is used by LEFT_SEMI_DIRECT_RETURN_OPT, so that
+    // consumers disable the always_true logic and never skip the filter
+    bool _disable_always_true_logic = false;
+
     // Wrapper is the core structure of runtime filter. If filter is local, 
wrapper may be shared
     // by producer and consumer. To avoid read-write conflict, we need a 
rwlock to ensure operations
     // on state is thread-safe.
diff --git a/be/src/vec/exprs/vexpr_context.h b/be/src/vec/exprs/vexpr_context.h
index de18da09a4d..a7824d80b43 100644
--- a/be/src/vec/exprs/vexpr_context.h
+++ b/be/src/vec/exprs/vexpr_context.h
@@ -274,7 +274,12 @@ public:
         return _last_result_column_id;
     }
 
-    RuntimeFilterSelectivity& get_runtime_filter_selectivity() { return 
*_rf_selectivity; }
+    RuntimeFilterSelectivity& get_runtime_filter_selectivity() {
+        if (!_rf_selectivity) {
+            throw Exception(ErrorCode::INTERNAL_ERROR, 
"RuntimeFilterSelectivity is null");
+        }
+        return *_rf_selectivity;
+    }
 
     FunctionContext::FunctionStateScope get_function_state_scope() const {
         return _is_clone ? FunctionContext::THREAD_LOCAL : 
FunctionContext::FRAGMENT_LOCAL;
diff --git a/be/test/runtime_filter/runtime_filter_selectivity_test.cpp 
b/be/test/runtime_filter/runtime_filter_selectivity_test.cpp
index b8504f950c2..bba98f3ecf2 100644
--- a/be/test/runtime_filter/runtime_filter_selectivity_test.cpp
+++ b/be/test/runtime_filter/runtime_filter_selectivity_test.cpp
@@ -20,6 +20,8 @@
 #include <glog/logging.h>
 #include <gtest/gtest.h>
 
+#include "common/config.h"
+
 namespace doris {
 
 class RuntimeFilterSelectivityTest : public testing::Test {
@@ -39,13 +41,14 @@ protected:
 
 TEST_F(RuntimeFilterSelectivityTest, basic_initialization) {
     RuntimeFilterSelectivity selectivity;
+    
selectivity.set_sampling_frequency(config::runtime_filter_sampling_frequency);
     // Initially should be false (not always_true)
     EXPECT_FALSE(selectivity.maybe_always_true_can_ignore());
 }
 
 TEST_F(RuntimeFilterSelectivityTest, disabled_sampling_frequency) {
     RuntimeFilterSelectivity selectivity;
-    config::runtime_filter_sampling_frequency = 0;
+    selectivity.set_sampling_frequency(0);
 
     // Even if conditions are met, should return false when sampling is 
disabled
     selectivity.update_judge_selectivity(-1, 2000, 50000, 0.1);
@@ -54,37 +57,40 @@ TEST_F(RuntimeFilterSelectivityTest, 
disabled_sampling_frequency) {
 
 TEST_F(RuntimeFilterSelectivityTest, negative_sampling_frequency) {
     RuntimeFilterSelectivity selectivity;
-    config::runtime_filter_sampling_frequency = -1;
+    selectivity.set_sampling_frequency(-1);
 
     selectivity.update_judge_selectivity(-1, 2000, 50000, 0.1);
     EXPECT_FALSE(selectivity.maybe_always_true_can_ignore());
 }
 
 TEST_F(RuntimeFilterSelectivityTest, judge_selectivity_below_threshold) {
-    bool always_true = false;
+    RuntimeFilterSelectivity selectivity;
+    selectivity.set_sampling_frequency(100);
     // filter_rows/input_rows = 5/50000 = 0.0001 < 0.1
     // input_rows (50000) > min_judge_input_rows (40960)
-    RuntimeFilterSelectivity::judge_selectivity(0.1, 5, 50000, always_true);
-    EXPECT_TRUE(always_true);
+    selectivity.update_judge_selectivity(-1, 5, 50000, 0.1);
+    EXPECT_TRUE(selectivity.maybe_always_true_can_ignore());
 }
 
 TEST_F(RuntimeFilterSelectivityTest, judge_selectivity_above_threshold) {
-    bool always_true = false;
+    RuntimeFilterSelectivity selectivity;
+    selectivity.set_sampling_frequency(100);
     // filter_rows/input_rows = 25000/50000 = 0.5 >= 0.1
-    RuntimeFilterSelectivity::judge_selectivity(0.1, 25000, 50000, 
always_true);
-    EXPECT_FALSE(always_true);
+    selectivity.update_judge_selectivity(-1, 25000, 50000, 0.1);
+    EXPECT_FALSE(selectivity.maybe_always_true_can_ignore());
 }
 
 TEST_F(RuntimeFilterSelectivityTest, 
judge_selectivity_insufficient_input_rows) {
-    bool always_true = false;
+    RuntimeFilterSelectivity selectivity;
+    selectivity.set_sampling_frequency(100);
     // Even though 5/100 = 0.05 < 0.1, input_rows (100) < min_judge_input_rows 
(40960)
-    RuntimeFilterSelectivity::judge_selectivity(0.1, 5, 100, always_true);
-    EXPECT_FALSE(always_true);
+    selectivity.update_judge_selectivity(-1, 5, 100, 0.1);
+    EXPECT_FALSE(selectivity.maybe_always_true_can_ignore());
 }
 
 TEST_F(RuntimeFilterSelectivityTest, update_with_low_selectivity) {
-    config::runtime_filter_sampling_frequency = 100;
     RuntimeFilterSelectivity selectivity;
+    selectivity.set_sampling_frequency(100);
 
     // filter_rows/input_rows = 2000/50000 = 0.04 < 0.1
     selectivity.update_judge_selectivity(-1, 2000, 50000, 0.1);
@@ -92,8 +98,8 @@ TEST_F(RuntimeFilterSelectivityTest, 
update_with_low_selectivity) {
 }
 
 TEST_F(RuntimeFilterSelectivityTest, update_with_high_selectivity) {
-    config::runtime_filter_sampling_frequency = 100;
     RuntimeFilterSelectivity selectivity;
+    selectivity.set_sampling_frequency(100);
 
     // filter_rows/input_rows = 45000/50000 = 0.9 >= 0.1
     selectivity.update_judge_selectivity(-1, 45000, 50000, 0.1);
@@ -101,9 +107,8 @@ TEST_F(RuntimeFilterSelectivityTest, 
update_with_high_selectivity) {
 }
 
 TEST_F(RuntimeFilterSelectivityTest, once_always_true_stays_true) {
-    config::runtime_filter_sampling_frequency = 100;
     RuntimeFilterSelectivity selectivity;
-
+    selectivity.set_sampling_frequency(100);
     // First update: low selectivity
     selectivity.update_judge_selectivity(-1, 2000, 50000, 0.1);
     EXPECT_TRUE(selectivity.maybe_always_true_can_ignore());
@@ -114,8 +119,8 @@ TEST_F(RuntimeFilterSelectivityTest, 
once_always_true_stays_true) {
 }
 
 TEST_F(RuntimeFilterSelectivityTest, accumulated_selectivity_low) {
-    config::runtime_filter_sampling_frequency = 100;
     RuntimeFilterSelectivity selectivity;
+    selectivity.set_sampling_frequency(100);
 
     // First update: 1000/50000 = 0.02
     selectivity.update_judge_selectivity(-1, 1000, 50000, 0.1);
@@ -123,8 +128,8 @@ TEST_F(RuntimeFilterSelectivityTest, 
accumulated_selectivity_low) {
 }
 
 TEST_F(RuntimeFilterSelectivityTest, accumulated_selectivity_high) {
-    config::runtime_filter_sampling_frequency = 100;
     RuntimeFilterSelectivity selectivity;
+    selectivity.set_sampling_frequency(100);
 
     // First update: 20000/50000 = 0.4
     selectivity.update_judge_selectivity(-1, 20000, 50000, 0.1);
@@ -136,8 +141,8 @@ TEST_F(RuntimeFilterSelectivityTest, 
accumulated_selectivity_high) {
 }
 
 TEST_F(RuntimeFilterSelectivityTest, counter_triggers_reset) {
-    config::runtime_filter_sampling_frequency = 3;
     RuntimeFilterSelectivity selectivity;
+    selectivity.set_sampling_frequency(3);
 
     // Mark as always_true
     selectivity.update_judge_selectivity(-1, 2000, 50000, 0.1);
@@ -152,8 +157,8 @@ TEST_F(RuntimeFilterSelectivityTest, 
counter_triggers_reset) {
 }
 
 TEST_F(RuntimeFilterSelectivityTest, reset_allows_reevaluation) {
-    config::runtime_filter_sampling_frequency = 2;
     RuntimeFilterSelectivity selectivity;
+    selectivity.set_sampling_frequency(2);
 
     // First cycle: mark as always_true
     selectivity.update_judge_selectivity(-1, 2000, 50000, 0.1);
@@ -169,25 +174,29 @@ TEST_F(RuntimeFilterSelectivityTest, 
reset_allows_reevaluation) {
 }
 
 TEST_F(RuntimeFilterSelectivityTest, edge_case_zero_rows) {
-    bool always_true = false;
-    RuntimeFilterSelectivity::judge_selectivity(0.1, 0, 0, always_true);
-    EXPECT_FALSE(always_true);
+    RuntimeFilterSelectivity selectivity;
+    selectivity.set_sampling_frequency(100);
+    selectivity.update_judge_selectivity(-1, 0, 0, 0.1);
+    EXPECT_FALSE(selectivity.maybe_always_true_can_ignore());
 }
 
 TEST_F(RuntimeFilterSelectivityTest, edge_case_exact_threshold) {
-    bool always_true = false;
+    RuntimeFilterSelectivity selectivity;
+    selectivity.set_sampling_frequency(100);
     // Exactly at threshold: 5000/50000 = 0.1, NOT less than 0.1
-    RuntimeFilterSelectivity::judge_selectivity(0.1, 5000, 50000, always_true);
-    EXPECT_FALSE(always_true);
+    selectivity.update_judge_selectivity(-1, 5000, 50000, 0.1);
+    EXPECT_FALSE(selectivity.maybe_always_true_can_ignore());
 
     // Just below threshold: 4999/50000 = 0.09998 < 0.1
-    RuntimeFilterSelectivity::judge_selectivity(0.1, 4999, 50000, always_true);
-    EXPECT_TRUE(always_true);
+    RuntimeFilterSelectivity selectivity2;
+    selectivity2.set_sampling_frequency(100);
+    selectivity2.update_judge_selectivity(-1, 4999, 50000, 0.1);
+    EXPECT_TRUE(selectivity2.maybe_always_true_can_ignore());
 }
 
 TEST_F(RuntimeFilterSelectivityTest, multiple_updates_before_threshold) {
-    config::runtime_filter_sampling_frequency = 100;
     RuntimeFilterSelectivity selectivity;
+    selectivity.set_sampling_frequency(100);
 
     // Multiple updates with insufficient rows each time
     selectivity.update_judge_selectivity(-1, 100, 1000, 0.1); // 100/1000, 
insufficient
@@ -202,11 +211,10 @@ TEST_F(RuntimeFilterSelectivityTest, 
multiple_updates_before_threshold) {
 }
 
 TEST_F(RuntimeFilterSelectivityTest, different_thresholds) {
-    config::runtime_filter_sampling_frequency = 100;
-
     // Test with threshold 0.05
     {
         RuntimeFilterSelectivity selectivity;
+        selectivity.set_sampling_frequency(100);
         selectivity.update_judge_selectivity(-1, 2000, 50000, 0.05); // 0.04 < 
0.05
         EXPECT_TRUE(selectivity.maybe_always_true_can_ignore());
     }
@@ -214,6 +222,7 @@ TEST_F(RuntimeFilterSelectivityTest, different_thresholds) {
     // Test with threshold 0.03
     {
         RuntimeFilterSelectivity selectivity;
+        selectivity.set_sampling_frequency(100);
         selectivity.update_judge_selectivity(-1, 2000, 50000, 0.03); // 0.04 
>= 0.03
         EXPECT_FALSE(selectivity.maybe_always_true_can_ignore());
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 46d67bed8fb..abbde33d46e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -301,6 +301,8 @@ public class SessionVariable implements Serializable, 
Writable {
 
     public static final String ENABLE_SHARED_SCAN = "enable_shared_scan";
 
+    public static final String ENABLE_LEFT_SEMI_DIRECT_RETURN_OPT = 
"enable_left_semi_direct_return_opt";
+
     public static final String IGNORE_STORAGE_DATA_DISTRIBUTION = 
"ignore_storage_data_distribution";
 
     public static final String USE_SERIAL_EXCHANGE = "use_serial_exchange";
@@ -1356,6 +1358,9 @@ public class SessionVariable implements Serializable, 
Writable {
     @VarAttr(name = QUERY_CACHE_ENTRY_MAX_ROWS)
     private long queryCacheEntryMaxRows = 500000;
 
+    @VariableMgr.VarAttr(name = ENABLE_LEFT_SEMI_DIRECT_RETURN_OPT)
+    public boolean enableLeftSemiDirectReturnOpt = true;
+
     @VariableMgr.VarAttr(name = FORWARD_TO_MASTER)
     public boolean forwardToMaster = true;
 
@@ -4876,6 +4881,7 @@ public class SessionVariable implements Serializable, 
Writable {
         tResult.setInvertedIndexCompatibleRead(invertedIndexCompatibleRead);
 
         tResult.setEnableParallelScan(enableParallelScan);
+        
tResult.setEnableLeftSemiDirectReturnOpt(enableLeftSemiDirectReturnOpt);
         tResult.setParallelScanMaxScannersCount(parallelScanMaxScannersCount);
         
tResult.setParallelScanMinRowsPerScanner(parallelScanMinRowsPerScanner);
         tResult.setOptimizeIndexScanParallelism(optimizeIndexScanParallelism);
diff --git a/gensrc/thrift/PaloInternalService.thrift 
b/gensrc/thrift/PaloInternalService.thrift
index b15bfd8bd89..8c31acf6557 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -422,6 +422,7 @@ struct TQueryOptions {
 
   179: optional bool enable_parquet_filter_by_bloom_filter = true;
 
+  195: optional bool enable_left_semi_direct_return_opt;
 
   // For cloud, to control if the content would be written into file cache
   // In write path, to control if the content would be written into file cache.
diff --git a/regression-test/data/query_p0/join/test_left_join1.out 
b/regression-test/data/query_p0/join/test_left_join1.out
index 7f19a1333eb..60ff1e296b5 100644
--- a/regression-test/data/query_p0/join/test_left_join1.out
+++ b/regression-test/data/query_p0/join/test_left_join1.out
@@ -5,3 +5,9 @@
 3      125     3       125     3       125
 4      126     4       126     4       126
 
+-- !select --
+1      123     1       123     \N      \N
+2      124     2       124     2       124
+3      125     3       125     3       125
+4      126     4       126     4       126
+
diff --git a/regression-test/suites/query_p0/join/test_left_join1.groovy 
b/regression-test/suites/query_p0/join/test_left_join1.groovy
index 104adab4a85..70180f621f8 100644
--- a/regression-test/suites/query_p0/join/test_left_join1.groovy
+++ b/regression-test/suites/query_p0/join/test_left_join1.groovy
@@ -23,6 +23,25 @@ suite("test_left_join1", "query,p0,arrow_flight_sql") {
 
     sql """insert into ${tableName} values (1, 123),(2, 124),(3, 125),(4, 
126);"""
 
+    qt_select """ SELECT
+                          *
+                          FROM
+                  ( SELECT f_key, f_value FROM ${tableName} ) a
+                  LEFT JOIN ( SELECT f_key, f_value FROM ${tableName} ) b ON 
a.f_key = b.f_key
+                  LEFT JOIN (
+                          SELECT
+                  *
+                  FROM
+                  ${tableName}
+                  WHERE
+                  f_key IN ( SELECT f_key FROM ${tableName} WHERE f_key IN ( 
SELECT f_key FROM ${tableName} WHERE f_value > 123 ) )
+                  ) c ON a.f_key = c.f_key
+                  ORDER BY
+                  a.f_key; 
+             """
+
+    sql "set runtime_filter_type=1,runtime_filter_max_in_num=0;"
+
     qt_select """ SELECT
                           *
                           FROM


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to