This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new 08ec72cab7a [Feature](join) support LEFT_SEMI_DIRECT_RETURN_OPT#59832
(#60577)
08ec72cab7a is described below
commit 08ec72cab7a82f12b5eb086c59c6e51fb75ada6f
Author: Pxl <[email protected]>
AuthorDate: Sat Feb 7 18:39:24 2026 +0800
[Feature](join) support LEFT_SEMI_DIRECT_RETURN_OPT#59832 (#60577)
#59832
---
be/src/olap/column_predicate.h | 30 ++++------
be/src/pipeline/dependency.h | 6 ++
be/src/pipeline/exec/hashjoin_build_sink.cpp | 11 ++++
be/src/pipeline/exec/hashjoin_build_sink.h | 8 +++
be/src/pipeline/exec/hashjoin_probe_operator.cpp | 13 +++-
.../pipeline/exec/join/process_hash_table_probe.h | 5 ++
.../exec/join/process_hash_table_probe_impl.h | 20 +++++++
be/src/pipeline/exec/scan_operator.cpp | 3 +-
be/src/runtime_filter/runtime_filter_consumer.cpp | 7 +++
be/src/runtime_filter/runtime_filter_consumer.h | 4 +-
be/src/runtime_filter/runtime_filter_producer.h | 11 ++++
.../runtime_filter_producer_helper.cpp | 12 ++++
.../runtime_filter_producer_helper.h | 2 +
be/src/runtime_filter/runtime_filter_selectivity.h | 36 ++++++-----
be/src/runtime_filter/runtime_filter_wrapper.h | 18 ++++++
be/src/vec/exprs/vexpr_context.h | 7 ++-
.../runtime_filter_selectivity_test.cpp | 69 ++++++++++++----------
.../java/org/apache/doris/qe/SessionVariable.java | 6 ++
gensrc/thrift/PaloInternalService.thrift | 1 +
.../data/query_p0/join/test_left_join1.out | 6 ++
.../suites/query_p0/join/test_left_join1.groovy | 19 ++++++
21 files changed, 223 insertions(+), 71 deletions(-)
diff --git a/be/src/olap/column_predicate.h b/be/src/olap/column_predicate.h
index c4a5736beb0..f1ffb18bf95 100644
--- a/be/src/olap/column_predicate.h
+++ b/be/src/olap/column_predicate.h
@@ -329,8 +329,10 @@ public:
void attach_profile_counter(
int filter_id, std::shared_ptr<RuntimeProfile::Counter>
predicate_filtered_rows_counter,
std::shared_ptr<RuntimeProfile::Counter>
predicate_input_rows_counter,
- std::shared_ptr<RuntimeProfile::Counter>
predicate_always_true_rows_counter) {
+ std::shared_ptr<RuntimeProfile::Counter>
predicate_always_true_rows_counter,
+ const RuntimeFilterSelectivity& rf_selectivity) {
_runtime_filter_id = filter_id;
+ _rf_selectivity = rf_selectivity;
DCHECK(predicate_filtered_rows_counter != nullptr);
DCHECK(predicate_input_rows_counter != nullptr);
@@ -389,7 +391,7 @@ public:
}
}
- bool always_true() const { return _always_true; }
+ bool always_true() const { return
_rf_selectivity.maybe_always_true_can_ignore(); }
// Return whether the ColumnPredicate was created by a runtime filter.
// If true, it was definitely created by a runtime filter.
// If false, it may still have been created by a runtime filter,
@@ -405,26 +407,17 @@ protected:
throw Exception(INTERNAL_ERROR, "Not Implemented _evaluate_inner");
}
- void reset_judge_selectivity() const {
- _always_true = false;
- _judge_counter = config::runtime_filter_sampling_frequency;
- _judge_input_rows = 0;
- _judge_filter_rows = 0;
- }
+ void reset_judge_selectivity() const {
_rf_selectivity.reset_judge_selectivity(); }
void try_reset_judge_selectivity() const {
- if (_can_ignore() && ((_judge_counter--) == 0)) {
- reset_judge_selectivity();
+ if (_can_ignore()) {
+ _rf_selectivity.update_judge_counter();
}
}
void do_judge_selectivity(uint64_t filter_rows, uint64_t input_rows) const
{
- if (!_always_true) {
- _judge_filter_rows += filter_rows;
- _judge_input_rows += input_rows;
-
RuntimeFilterSelectivity::judge_selectivity(get_ignore_threshold(),
_judge_filter_rows,
- _judge_input_rows,
_always_true);
- }
+ _rf_selectivity.update_judge_selectivity(_runtime_filter_id,
filter_rows, input_rows,
+ get_ignore_threshold());
}
uint32_t _column_id;
@@ -441,10 +434,7 @@ protected:
// is evaluated as true, the logic for always_true is applied for the rest
of that period
// without recalculating. At the beginning of the next period,
// reset_judge_selectivity is used to reset these variables.
- mutable int _judge_counter = 0;
- mutable uint64_t _judge_input_rows = 0;
- mutable uint64_t _judge_filter_rows = 0;
- mutable bool _always_true = false;
+ mutable RuntimeFilterSelectivity _rf_selectivity;
std::shared_ptr<RuntimeProfile::Counter> _predicate_filtered_rows_counter =
std::make_shared<RuntimeProfile::Counter>(TUnit::UNIT, 0);
diff --git a/be/src/pipeline/dependency.h b/be/src/pipeline/dependency.h
index 036ca3a90de..993149f0d36 100644
--- a/be/src/pipeline/dependency.h
+++ b/be/src/pipeline/dependency.h
@@ -625,6 +625,12 @@ struct HashJoinSharedState : public JoinSharedState {
// memory in `_hash_table_variants`. So before execution, we should use a
local _hash_table_variants
// which has a shared hash table in it.
std::vector<std::shared_ptr<JoinDataVariants>> hash_table_variant_vector;
+
+ // Whether the left semi join can return directly.
+ // If the runtime filters contain a local IN filter, we can be sure all input
rows are matched:
+ // a local filter is always applied, and an IN filter guarantees
precise filtering.
+ // ATTN: we must disable the always_true logic for the IN filter when we set
this flag.
+ bool left_semi_direct_return = false;
};
struct PartitionedHashJoinSharedState
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp
b/be/src/pipeline/exec/hashjoin_build_sink.cpp
index 778409087ee..91ba6f6781a 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.cpp
+++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp
@@ -239,6 +239,17 @@ Status HashJoinBuildSinkLocalState::close(RuntimeState*
state, Status exec_statu
RETURN_IF_ERROR(_runtime_filter_producer_helper->build(
state, _shared_state->build_block.get(),
p._use_shared_hash_table,
p._runtime_filters));
+ // Only a left semi join with a single join conjunct can return directly.
+ if (p.allow_left_semi_direct_return(state)) {
+ auto wrapper =
_runtime_filter_producer_helper->detect_local_in_filter(state);
+ if (wrapper) {
+ _shared_state->left_semi_direct_return = true;
+ wrapper->set_disable_always_true_logic();
+ custom_profile()->add_info_string(
+ "LeftSemiDirectReturn",
+
std::to_string(_shared_state->left_semi_direct_return));
+ }
+ }
RETURN_IF_ERROR(_runtime_filter_producer_helper->publish(state));
}
} catch (Exception& e) {
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h
b/be/src/pipeline/exec/hashjoin_build_sink.h
index f1c94f24c38..235333fd505 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.h
+++ b/be/src/pipeline/exec/hashjoin_build_sink.h
@@ -148,6 +148,14 @@ public:
}
std::vector<bool>& is_null_safe_eq_join() { return _is_null_safe_eq_join; }
+ bool allow_left_semi_direct_return(RuntimeState* state) const {
+ // Only a left semi join with a single join conjunct can return directly.
+ return _join_op == TJoinOp::LEFT_SEMI_JOIN && _build_expr_ctxs.size()
== 1 &&
+ !_have_other_join_conjunct && !_is_mark_join &&
+
state->query_options().__isset.enable_left_semi_direct_return_opt &&
+ state->query_options().enable_left_semi_direct_return_opt;
+ }
+
private:
friend class HashJoinBuildSinkLocalState;
diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.cpp
b/be/src/pipeline/exec/hashjoin_probe_operator.cpp
index 3bde47cf143..cb2c0df540c 100644
--- a/be/src/pipeline/exec/hashjoin_probe_operator.cpp
+++ b/be/src/pipeline/exec/hashjoin_probe_operator.cpp
@@ -232,6 +232,12 @@ Status HashJoinProbeOperatorX::pull(doris::RuntimeState*
state, vectorized::Bloc
if constexpr (!std::is_same_v<HashTableProbeType,
std::monostate>) {
using HashTableCtxType = std::decay_t<decltype(arg)>;
if constexpr (!std::is_same_v<HashTableCtxType,
std::monostate>) {
+ if
(local_state._shared_state->left_semi_direct_return) {
+ process_hashtable_ctx.process_direct_return(
+ arg, mutable_join_block, &temp_block,
+
cast_set<uint32_t>(local_state._probe_block.rows()));
+ return;
+ }
st = process_hashtable_ctx.process(
arg,
local_state._null_map_column
@@ -289,8 +295,11 @@ Status HashJoinProbeOperatorX::pull(doris::RuntimeState*
state, vectorized::Bloc
}
local_state._estimate_memory_usage += temp_block.allocated_bytes();
- RETURN_IF_ERROR(
- local_state.filter_data_and_build_output(state, output_block, eos,
&temp_block));
+ // If left_semi_direct_return is true,
+ // the output rows count is not increased here (it is just the same as
`_probe_block`'s rows count).
+ RETURN_IF_ERROR(local_state.filter_data_and_build_output(
+ state, output_block, eos, &temp_block,
+ !local_state._shared_state->left_semi_direct_return));
// Here make _join_block release the columns' ptr
local_state._join_block.set_columns(local_state._join_block.clone_empty_columns());
mutable_join_block.clear();
diff --git a/be/src/pipeline/exec/join/process_hash_table_probe.h
b/be/src/pipeline/exec/join/process_hash_table_probe.h
index 100a11fd2c7..52dcc6415fa 100644
--- a/be/src/pipeline/exec/join/process_hash_table_probe.h
+++ b/be/src/pipeline/exec/join/process_hash_table_probe.h
@@ -59,6 +59,11 @@ struct ProcessHashTableProbe {
vectorized::MutableBlock& mutable_block, vectorized::Block*
output_block,
uint32_t probe_rows, bool is_mark_join);
+ template <typename HashTableType>
+ void process_direct_return(HashTableType& hash_table_ctx,
+ vectorized::MutableBlock& mutable_block,
+ vectorized::Block* output_block, uint32_t
probe_rows);
+
// In the presence of other join conjunct, the process of join become more
complicated.
// each matching join column need to be processed by other join conjunct.
so the struct of mutable block
// and output block may be different
diff --git a/be/src/pipeline/exec/join/process_hash_table_probe_impl.h
b/be/src/pipeline/exec/join/process_hash_table_probe_impl.h
index 60261223cdf..85969391725 100644
--- a/be/src/pipeline/exec/join/process_hash_table_probe_impl.h
+++ b/be/src/pipeline/exec/join/process_hash_table_probe_impl.h
@@ -206,6 +206,22 @@ typename HashTableType::State
ProcessHashTableProbe<JoinOpType>::_init_probe_sid
return typename HashTableType::State(_parent->_probe_columns);
}
+template <int JoinOpType>
+template <typename HashTableType>
+void ProcessHashTableProbe<JoinOpType>::process_direct_return(
+ HashTableType& hash_table_ctx, vectorized::MutableBlock& mutable_block,
+ vectorized::Block* output_block, uint32_t probe_rows) {
+ _probe_indexs.resize(probe_rows);
+ auto* probe_indexs_data = _probe_indexs.get_data().data();
+ for (uint32_t i = 0; i < probe_rows; i++) {
+ probe_indexs_data[i] = i;
+ }
+ auto& mcol = mutable_block.mutable_columns();
+ probe_side_output_column(mcol);
+ output_block->swap(mutable_block.to_block());
+ _parent->_probe_index = probe_rows;
+}
+
template <int JoinOpType>
template <typename HashTableType>
Status ProcessHashTableProbe<JoinOpType>::process(HashTableType&
hash_table_ctx,
@@ -792,6 +808,10 @@ struct ExtractType<T(U)> {
ExtractType<void(T)>::Type & hash_table_ctx, const uint8_t*
null_map, \
vectorized::MutableBlock& mutable_block, vectorized::Block*
output_block, \
uint32_t probe_rows, bool is_mark_join);
\
+ template void
\
+
ProcessHashTableProbe<JoinOpType>::process_direct_return<ExtractType<void(T)>::Type>(
\
+ ExtractType<void(T)>::Type & hash_table_ctx,
vectorized::MutableBlock & mutable_block, \
+ vectorized::Block * output_block, uint32_t probe_rows);
\
template Status
ProcessHashTableProbe<JoinOpType>::finish_probing<ExtractType<void(T)>::Type>( \
ExtractType<void(T)>::Type & hash_table_ctx,
vectorized::MutableBlock & mutable_block, \
vectorized::Block * output_block, bool* eos, bool is_mark_join);
diff --git a/be/src/pipeline/exec/scan_operator.cpp
b/be/src/pipeline/exec/scan_operator.cpp
index 6a2d36819d7..1c8f2327814 100644
--- a/be/src/pipeline/exec/scan_operator.cpp
+++ b/be/src/pipeline/exec/scan_operator.cpp
@@ -368,7 +368,8 @@ Status
ScanLocalState<Derived>::_normalize_predicate(vectorized::VExprContext* c
rf_expr->filter_id(),
rf_expr->predicate_filtered_rows_counter(),
rf_expr->predicate_input_rows_counter(),
-
rf_expr->predicate_always_true_rows_counter());
+
rf_expr->predicate_always_true_rows_counter(),
+
context->get_runtime_filter_selectivity());
}
};
switch (expr->node_type()) {
diff --git a/be/src/runtime_filter/runtime_filter_consumer.cpp
b/be/src/runtime_filter/runtime_filter_consumer.cpp
index a99f7bfd874..2bf9998c536 100644
--- a/be/src/runtime_filter/runtime_filter_consumer.cpp
+++ b/be/src/runtime_filter/runtime_filter_consumer.cpp
@@ -83,6 +83,13 @@ Status
RuntimeFilterConsumer::_get_push_exprs(std::vector<vectorized::VRuntimeFi
auto real_filter_type = _wrapper->get_real_type();
bool null_aware = _wrapper->contain_null();
+
+ // Set sampling frequency based on disable_always_true_logic status
+ int sampling_frequency = _wrapper->disable_always_true_logic()
+ ?
RuntimeFilterSelectivity::DISABLE_SAMPLING
+ :
config::runtime_filter_sampling_frequency;
+
probe_ctx->get_runtime_filter_selectivity().set_sampling_frequency(sampling_frequency);
+
switch (real_filter_type) {
case RuntimeFilterType::IN_FILTER: {
TTypeDesc type_desc = create_type_desc(PrimitiveType::TYPE_BOOLEAN);
diff --git a/be/src/runtime_filter/runtime_filter_consumer.h
b/be/src/runtime_filter/runtime_filter_consumer.h
index e0e42e509d4..0e832b412ee 100644
--- a/be/src/runtime_filter/runtime_filter_consumer.h
+++ b/be/src/runtime_filter/runtime_filter_consumer.h
@@ -93,8 +93,10 @@ private:
_registration_time(MonotonicMillis()),
_rf_state(State::NOT_READY) {
// If bitmap filter is not applied, it will cause the query result to
be incorrect
+ // A local rf must wait until timeout; otherwise LEFT_SEMI_DIRECT_RETURN_OPT
may produce incorrect results.
bool wait_infinitely = query_ctx->runtime_filter_wait_infinitely() ||
- _runtime_filter_type ==
RuntimeFilterType::BITMAP_FILTER;
+ _runtime_filter_type ==
RuntimeFilterType::BITMAP_FILTER ||
+ !has_remote_target();
_rf_wait_time_ms = wait_infinitely ? query_ctx->execution_timeout() *
1000
:
query_ctx->runtime_filter_wait_time_ms();
DorisMetrics::instance()->runtime_filter_consumer_num->increment(1);
diff --git a/be/src/runtime_filter/runtime_filter_producer.h
b/be/src/runtime_filter/runtime_filter_producer.h
index 0edf85cd1d9..e686a97ee3a 100644
--- a/be/src/runtime_filter/runtime_filter_producer.h
+++ b/be/src/runtime_filter/runtime_filter_producer.h
@@ -131,6 +131,17 @@ public:
_wrapper = wrapper;
}
+ std::shared_ptr<RuntimeFilterWrapper> detect_in_filter() {
+ if (_has_remote_target) {
+ return nullptr;
+ }
+ std::unique_lock<std::recursive_mutex> l(_rmtx);
+ if (_wrapper->is_ready_in_filter()) {
+ return _wrapper;
+ }
+ return nullptr;
+ }
+
private:
RuntimeFilterProducer(const QueryContext* query_ctx, const
TRuntimeFilterDesc* desc)
: RuntimeFilter(desc), _is_broadcast_join(desc->is_broadcast_join)
{}
diff --git a/be/src/runtime_filter/runtime_filter_producer_helper.cpp
b/be/src/runtime_filter/runtime_filter_producer_helper.cpp
index 55e300cdd87..9611d38f7ac 100644
--- a/be/src/runtime_filter/runtime_filter_producer_helper.cpp
+++ b/be/src/runtime_filter/runtime_filter_producer_helper.cpp
@@ -164,4 +164,16 @@ void RuntimeFilterProducerHelper::collect_realtime_profile(
build_timer->set(_runtime_filter_compute_timer->value());
}
+std::shared_ptr<RuntimeFilterWrapper>
RuntimeFilterProducerHelper::detect_local_in_filter(
+ RuntimeState* state) {
+ // Return the wrapper of the first producer with a ready local IN filter, or nullptr if none.
+ // A local IN filter is used for LEFT_SEMI_DIRECT_RETURN_OPT.
+ for (const auto& filter : _producers) {
+ if (auto wrapper = filter->detect_in_filter(); wrapper != nullptr) {
+ return wrapper;
+ }
+ }
+ return nullptr;
+}
+
} // namespace doris
diff --git a/be/src/runtime_filter/runtime_filter_producer_helper.h
b/be/src/runtime_filter/runtime_filter_producer_helper.h
index 4e6d14089ee..bc0431ebe59 100644
--- a/be/src/runtime_filter/runtime_filter_producer_helper.h
+++ b/be/src/runtime_filter/runtime_filter_producer_helper.h
@@ -67,6 +67,8 @@ public:
void collect_realtime_profile(RuntimeProfile* parent_operator_profile);
+ std::shared_ptr<RuntimeFilterWrapper> detect_local_in_filter(RuntimeState*
state);
+
protected:
virtual void _init_expr(const vectorized::VExprContextSPtrs&
build_expr_ctxs,
const std::vector<TRuntimeFilterDesc>&
runtime_filter_descs);
diff --git a/be/src/runtime_filter/runtime_filter_selectivity.h
b/be/src/runtime_filter/runtime_filter_selectivity.h
index 1b0a82143de..694895edad9 100644
--- a/be/src/runtime_filter/runtime_filter_selectivity.h
+++ b/be/src/runtime_filter/runtime_filter_selectivity.h
@@ -19,7 +19,6 @@
#include <cstdint>
-#include "common/config.h"
#include "common/logging.h"
namespace doris {
@@ -34,9 +33,13 @@ class RuntimeFilterSelectivity {
public:
RuntimeFilterSelectivity() = default;
- RuntimeFilterSelectivity(const RuntimeFilterSelectivity&) = delete;
+ // If sampling_frequency is less than or equal to 0, the selectivity
tracking will be disabled
+ static constexpr int DISABLE_SAMPLING = -1;
+
+ void set_sampling_frequency(int frequency) { _sampling_frequency =
frequency; }
+
void update_judge_counter() {
- if ((_judge_counter++) >= config::runtime_filter_sampling_frequency) {
+ if ((_judge_counter++) >= _sampling_frequency) {
reset_judge_selectivity();
}
}
@@ -46,8 +49,8 @@ public:
if (!_always_true) {
_judge_filter_rows += filter_rows;
_judge_input_rows += input_rows;
- judge_selectivity(ignore_thredhold, _judge_filter_rows,
_judge_input_rows,
- _always_true);
+ _judge_selectivity(ignore_thredhold, _judge_filter_rows,
_judge_input_rows,
+ _always_true);
}
VLOG_ROW << fmt::format(
@@ -61,23 +64,13 @@ public:
bool maybe_always_true_can_ignore() const {
/// TODO: maybe we can use session variable to control this behavior ?
- if (config::runtime_filter_sampling_frequency <= 0) {
+ if (_sampling_frequency <= 0) {
return false;
} else {
return _always_true;
}
}
- static void judge_selectivity(double ignore_threshold, int64_t
filter_rows, int64_t input_rows,
- bool& always_true) {
- // if the judged input rows is too small, we think the selectivity is
not reliable
- if (input_rows > min_judge_input_rows) {
- always_true = (static_cast<double>(filter_rows) /
static_cast<double>(input_rows)) <
- ignore_threshold;
- }
- }
-
-private:
void reset_judge_selectivity() {
_always_true = false;
_judge_counter = 0;
@@ -85,10 +78,21 @@ private:
_judge_filter_rows = 0;
}
+private:
+ void _judge_selectivity(double ignore_threshold, int64_t filter_rows,
int64_t input_rows,
+ bool& always_true) {
+ // if the judged input rows is too small, we think the selectivity is
not reliable
+ if (input_rows > min_judge_input_rows) {
+ always_true = (static_cast<double>(filter_rows) /
static_cast<double>(input_rows)) <
+ ignore_threshold;
+ }
+ }
+
int64_t _judge_input_rows = 0;
int64_t _judge_filter_rows = 0;
int _judge_counter = 0;
bool _always_true = false;
+ int _sampling_frequency = -1;
constexpr static int64_t min_judge_input_rows = 4096 * 10;
};
diff --git a/be/src/runtime_filter/runtime_filter_wrapper.h
b/be/src/runtime_filter/runtime_filter_wrapper.h
index 1a7456a7190..9338c7b349d 100644
--- a/be/src/runtime_filter/runtime_filter_wrapper.h
+++ b/be/src/runtime_filter/runtime_filter_wrapper.h
@@ -86,6 +86,8 @@ public:
bool contain_null() const;
+ bool disable_always_true_logic() const { return
_disable_always_true_logic; }
+
std::string debug_string() const;
// set_state may called in SyncSizeClosure's rpc thread
@@ -120,6 +122,18 @@ public:
}
}
+ bool is_ready_in_filter() {
+ if (get_real_type() != RuntimeFilterType::IN_FILTER) {
+ return false;
+ }
+ if (_state != State::READY) {
+ return false;
+ }
+ return true;
+ }
+
+ void set_disable_always_true_logic() { _disable_always_true_logic = true; }
+
private:
// used by shuffle runtime filter
// assign this filter by protobuf
@@ -139,6 +153,10 @@ private:
std::shared_ptr<BloomFilterFuncBase> _bloom_filter_func;
std::shared_ptr<BitmapFilterFuncBase> _bitmap_filter_func;
+ // Disable the always_true logic when an IN filter is detected,
+ // so that left_semi_direct_return_opt works correctly.
+ bool _disable_always_true_logic = false;
+
// Wrapper is the core structure of runtime filter. If filter is local,
wrapper may be shared
// by producer and consumer. To avoid read-write conflict, we need a
rwlock to ensure operations
// on state is thread-safe.
diff --git a/be/src/vec/exprs/vexpr_context.h b/be/src/vec/exprs/vexpr_context.h
index de18da09a4d..a7824d80b43 100644
--- a/be/src/vec/exprs/vexpr_context.h
+++ b/be/src/vec/exprs/vexpr_context.h
@@ -274,7 +274,12 @@ public:
return _last_result_column_id;
}
- RuntimeFilterSelectivity& get_runtime_filter_selectivity() { return
*_rf_selectivity; }
+ RuntimeFilterSelectivity& get_runtime_filter_selectivity() {
+ if (!_rf_selectivity) {
+ throw Exception(ErrorCode::INTERNAL_ERROR,
"RuntimeFilterSelectivity is null");
+ }
+ return *_rf_selectivity;
+ }
FunctionContext::FunctionStateScope get_function_state_scope() const {
return _is_clone ? FunctionContext::THREAD_LOCAL :
FunctionContext::FRAGMENT_LOCAL;
diff --git a/be/test/runtime_filter/runtime_filter_selectivity_test.cpp
b/be/test/runtime_filter/runtime_filter_selectivity_test.cpp
index b8504f950c2..bba98f3ecf2 100644
--- a/be/test/runtime_filter/runtime_filter_selectivity_test.cpp
+++ b/be/test/runtime_filter/runtime_filter_selectivity_test.cpp
@@ -20,6 +20,8 @@
#include <glog/logging.h>
#include <gtest/gtest.h>
+#include "common/config.h"
+
namespace doris {
class RuntimeFilterSelectivityTest : public testing::Test {
@@ -39,13 +41,14 @@ protected:
TEST_F(RuntimeFilterSelectivityTest, basic_initialization) {
RuntimeFilterSelectivity selectivity;
+
selectivity.set_sampling_frequency(config::runtime_filter_sampling_frequency);
// Initially should be false (not always_true)
EXPECT_FALSE(selectivity.maybe_always_true_can_ignore());
}
TEST_F(RuntimeFilterSelectivityTest, disabled_sampling_frequency) {
RuntimeFilterSelectivity selectivity;
- config::runtime_filter_sampling_frequency = 0;
+ selectivity.set_sampling_frequency(0);
// Even if conditions are met, should return false when sampling is
disabled
selectivity.update_judge_selectivity(-1, 2000, 50000, 0.1);
@@ -54,37 +57,40 @@ TEST_F(RuntimeFilterSelectivityTest,
disabled_sampling_frequency) {
TEST_F(RuntimeFilterSelectivityTest, negative_sampling_frequency) {
RuntimeFilterSelectivity selectivity;
- config::runtime_filter_sampling_frequency = -1;
+ selectivity.set_sampling_frequency(-1);
selectivity.update_judge_selectivity(-1, 2000, 50000, 0.1);
EXPECT_FALSE(selectivity.maybe_always_true_can_ignore());
}
TEST_F(RuntimeFilterSelectivityTest, judge_selectivity_below_threshold) {
- bool always_true = false;
+ RuntimeFilterSelectivity selectivity;
+ selectivity.set_sampling_frequency(100);
// filter_rows/input_rows = 5/50000 = 0.0001 < 0.1
// input_rows (50000) > min_judge_input_rows (40960)
- RuntimeFilterSelectivity::judge_selectivity(0.1, 5, 50000, always_true);
- EXPECT_TRUE(always_true);
+ selectivity.update_judge_selectivity(-1, 5, 50000, 0.1);
+ EXPECT_TRUE(selectivity.maybe_always_true_can_ignore());
}
TEST_F(RuntimeFilterSelectivityTest, judge_selectivity_above_threshold) {
- bool always_true = false;
+ RuntimeFilterSelectivity selectivity;
+ selectivity.set_sampling_frequency(100);
// filter_rows/input_rows = 25000/50000 = 0.5 >= 0.1
- RuntimeFilterSelectivity::judge_selectivity(0.1, 25000, 50000,
always_true);
- EXPECT_FALSE(always_true);
+ selectivity.update_judge_selectivity(-1, 25000, 50000, 0.1);
+ EXPECT_FALSE(selectivity.maybe_always_true_can_ignore());
}
TEST_F(RuntimeFilterSelectivityTest,
judge_selectivity_insufficient_input_rows) {
- bool always_true = false;
+ RuntimeFilterSelectivity selectivity;
+ selectivity.set_sampling_frequency(100);
// Even though 5/100 = 0.05 < 0.1, input_rows (100) < min_judge_input_rows
(40960)
- RuntimeFilterSelectivity::judge_selectivity(0.1, 5, 100, always_true);
- EXPECT_FALSE(always_true);
+ selectivity.update_judge_selectivity(-1, 5, 100, 0.1);
+ EXPECT_FALSE(selectivity.maybe_always_true_can_ignore());
}
TEST_F(RuntimeFilterSelectivityTest, update_with_low_selectivity) {
- config::runtime_filter_sampling_frequency = 100;
RuntimeFilterSelectivity selectivity;
+ selectivity.set_sampling_frequency(100);
// filter_rows/input_rows = 2000/50000 = 0.04 < 0.1
selectivity.update_judge_selectivity(-1, 2000, 50000, 0.1);
@@ -92,8 +98,8 @@ TEST_F(RuntimeFilterSelectivityTest,
update_with_low_selectivity) {
}
TEST_F(RuntimeFilterSelectivityTest, update_with_high_selectivity) {
- config::runtime_filter_sampling_frequency = 100;
RuntimeFilterSelectivity selectivity;
+ selectivity.set_sampling_frequency(100);
// filter_rows/input_rows = 45000/50000 = 0.9 >= 0.1
selectivity.update_judge_selectivity(-1, 45000, 50000, 0.1);
@@ -101,9 +107,8 @@ TEST_F(RuntimeFilterSelectivityTest,
update_with_high_selectivity) {
}
TEST_F(RuntimeFilterSelectivityTest, once_always_true_stays_true) {
- config::runtime_filter_sampling_frequency = 100;
RuntimeFilterSelectivity selectivity;
-
+ selectivity.set_sampling_frequency(100);
// First update: low selectivity
selectivity.update_judge_selectivity(-1, 2000, 50000, 0.1);
EXPECT_TRUE(selectivity.maybe_always_true_can_ignore());
@@ -114,8 +119,8 @@ TEST_F(RuntimeFilterSelectivityTest,
once_always_true_stays_true) {
}
TEST_F(RuntimeFilterSelectivityTest, accumulated_selectivity_low) {
- config::runtime_filter_sampling_frequency = 100;
RuntimeFilterSelectivity selectivity;
+ selectivity.set_sampling_frequency(100);
// First update: 1000/50000 = 0.02
selectivity.update_judge_selectivity(-1, 1000, 50000, 0.1);
@@ -123,8 +128,8 @@ TEST_F(RuntimeFilterSelectivityTest,
accumulated_selectivity_low) {
}
TEST_F(RuntimeFilterSelectivityTest, accumulated_selectivity_high) {
- config::runtime_filter_sampling_frequency = 100;
RuntimeFilterSelectivity selectivity;
+ selectivity.set_sampling_frequency(100);
// First update: 20000/50000 = 0.4
selectivity.update_judge_selectivity(-1, 20000, 50000, 0.1);
@@ -136,8 +141,8 @@ TEST_F(RuntimeFilterSelectivityTest,
accumulated_selectivity_high) {
}
TEST_F(RuntimeFilterSelectivityTest, counter_triggers_reset) {
- config::runtime_filter_sampling_frequency = 3;
RuntimeFilterSelectivity selectivity;
+ selectivity.set_sampling_frequency(3);
// Mark as always_true
selectivity.update_judge_selectivity(-1, 2000, 50000, 0.1);
@@ -152,8 +157,8 @@ TEST_F(RuntimeFilterSelectivityTest,
counter_triggers_reset) {
}
TEST_F(RuntimeFilterSelectivityTest, reset_allows_reevaluation) {
- config::runtime_filter_sampling_frequency = 2;
RuntimeFilterSelectivity selectivity;
+ selectivity.set_sampling_frequency(2);
// First cycle: mark as always_true
selectivity.update_judge_selectivity(-1, 2000, 50000, 0.1);
@@ -169,25 +174,29 @@ TEST_F(RuntimeFilterSelectivityTest,
reset_allows_reevaluation) {
}
TEST_F(RuntimeFilterSelectivityTest, edge_case_zero_rows) {
- bool always_true = false;
- RuntimeFilterSelectivity::judge_selectivity(0.1, 0, 0, always_true);
- EXPECT_FALSE(always_true);
+ RuntimeFilterSelectivity selectivity;
+ selectivity.set_sampling_frequency(100);
+ selectivity.update_judge_selectivity(-1, 0, 0, 0.1);
+ EXPECT_FALSE(selectivity.maybe_always_true_can_ignore());
}
TEST_F(RuntimeFilterSelectivityTest, edge_case_exact_threshold) {
- bool always_true = false;
+ RuntimeFilterSelectivity selectivity;
+ selectivity.set_sampling_frequency(100);
// Exactly at threshold: 5000/50000 = 0.1, NOT less than 0.1
- RuntimeFilterSelectivity::judge_selectivity(0.1, 5000, 50000, always_true);
- EXPECT_FALSE(always_true);
+ selectivity.update_judge_selectivity(-1, 5000, 50000, 0.1);
+ EXPECT_FALSE(selectivity.maybe_always_true_can_ignore());
// Just below threshold: 4999/50000 = 0.09998 < 0.1
- RuntimeFilterSelectivity::judge_selectivity(0.1, 4999, 50000, always_true);
- EXPECT_TRUE(always_true);
+ RuntimeFilterSelectivity selectivity2;
+ selectivity2.set_sampling_frequency(100);
+ selectivity2.update_judge_selectivity(-1, 4999, 50000, 0.1);
+ EXPECT_TRUE(selectivity2.maybe_always_true_can_ignore());
}
TEST_F(RuntimeFilterSelectivityTest, multiple_updates_before_threshold) {
- config::runtime_filter_sampling_frequency = 100;
RuntimeFilterSelectivity selectivity;
+ selectivity.set_sampling_frequency(100);
// Multiple updates with insufficient rows each time
selectivity.update_judge_selectivity(-1, 100, 1000, 0.1); // 100/1000,
insufficient
@@ -202,11 +211,10 @@ TEST_F(RuntimeFilterSelectivityTest,
multiple_updates_before_threshold) {
}
TEST_F(RuntimeFilterSelectivityTest, different_thresholds) {
- config::runtime_filter_sampling_frequency = 100;
-
// Test with threshold 0.05
{
RuntimeFilterSelectivity selectivity;
+ selectivity.set_sampling_frequency(100);
selectivity.update_judge_selectivity(-1, 2000, 50000, 0.05); // 0.04 <
0.05
EXPECT_TRUE(selectivity.maybe_always_true_can_ignore());
}
@@ -214,6 +222,7 @@ TEST_F(RuntimeFilterSelectivityTest, different_thresholds) {
// Test with threshold 0.03
{
RuntimeFilterSelectivity selectivity;
+ selectivity.set_sampling_frequency(100);
selectivity.update_judge_selectivity(-1, 2000, 50000, 0.03); // 0.04
>= 0.03
EXPECT_FALSE(selectivity.maybe_always_true_can_ignore());
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 46d67bed8fb..abbde33d46e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -301,6 +301,8 @@ public class SessionVariable implements Serializable,
Writable {
public static final String ENABLE_SHARED_SCAN = "enable_shared_scan";
+ public static final String ENABLE_LEFT_SEMI_DIRECT_RETURN_OPT =
"enable_left_semi_direct_return_opt";
+
public static final String IGNORE_STORAGE_DATA_DISTRIBUTION =
"ignore_storage_data_distribution";
public static final String USE_SERIAL_EXCHANGE = "use_serial_exchange";
@@ -1356,6 +1358,9 @@ public class SessionVariable implements Serializable,
Writable {
@VarAttr(name = QUERY_CACHE_ENTRY_MAX_ROWS)
private long queryCacheEntryMaxRows = 500000;
+ @VariableMgr.VarAttr(name = ENABLE_LEFT_SEMI_DIRECT_RETURN_OPT)
+ public boolean enableLeftSemiDirectReturnOpt = true;
+
@VariableMgr.VarAttr(name = FORWARD_TO_MASTER)
public boolean forwardToMaster = true;
@@ -4876,6 +4881,7 @@ public class SessionVariable implements Serializable,
Writable {
tResult.setInvertedIndexCompatibleRead(invertedIndexCompatibleRead);
tResult.setEnableParallelScan(enableParallelScan);
+
tResult.setEnableLeftSemiDirectReturnOpt(enableLeftSemiDirectReturnOpt);
tResult.setParallelScanMaxScannersCount(parallelScanMaxScannersCount);
tResult.setParallelScanMinRowsPerScanner(parallelScanMinRowsPerScanner);
tResult.setOptimizeIndexScanParallelism(optimizeIndexScanParallelism);
diff --git a/gensrc/thrift/PaloInternalService.thrift
b/gensrc/thrift/PaloInternalService.thrift
index b15bfd8bd89..8c31acf6557 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -422,6 +422,7 @@ struct TQueryOptions {
179: optional bool enable_parquet_filter_by_bloom_filter = true;
+ 195: optional bool enable_left_semi_direct_return_opt;
// For cloud, to control if the content would be written into file cache
// In write path, to control if the content would be written into file cache.
diff --git a/regression-test/data/query_p0/join/test_left_join1.out
b/regression-test/data/query_p0/join/test_left_join1.out
index 7f19a1333eb..60ff1e296b5 100644
--- a/regression-test/data/query_p0/join/test_left_join1.out
+++ b/regression-test/data/query_p0/join/test_left_join1.out
@@ -5,3 +5,9 @@
3 125 3 125 3 125
4 126 4 126 4 126
+-- !select --
+1 123 1 123 \N \N
+2 124 2 124 2 124
+3 125 3 125 3 125
+4 126 4 126 4 126
+
diff --git a/regression-test/suites/query_p0/join/test_left_join1.groovy
b/regression-test/suites/query_p0/join/test_left_join1.groovy
index 104adab4a85..70180f621f8 100644
--- a/regression-test/suites/query_p0/join/test_left_join1.groovy
+++ b/regression-test/suites/query_p0/join/test_left_join1.groovy
@@ -23,6 +23,25 @@ suite("test_left_join1", "query,p0,arrow_flight_sql") {
sql """insert into ${tableName} values (1, 123),(2, 124),(3, 125),(4,
126);"""
+ qt_select """ SELECT
+ *
+ FROM
+ ( SELECT f_key, f_value FROM ${tableName} ) a
+ LEFT JOIN ( SELECT f_key, f_value FROM ${tableName} ) b ON
a.f_key = b.f_key
+ LEFT JOIN (
+ SELECT
+ *
+ FROM
+ ${tableName}
+ WHERE
+ f_key IN ( SELECT f_key FROM ${tableName} WHERE f_key IN (
SELECT f_key FROM ${tableName} WHERE f_value > 123 ) )
+ ) c ON a.f_key = c.f_key
+ ORDER BY
+ a.f_key;
+ """
+
+ sql "set runtime_filter_type=1,runtime_filter_max_in_num=0;"
+
qt_select """ SELECT
*
FROM
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]