This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new cdb1b341c7 [pipelineX](runtime filter) Support runtime filter (#24054)
cdb1b341c7 is described below
commit cdb1b341c796aa8c07df73e0a04c33594d087c6d
Author: Gabriel <[email protected]>
AuthorDate: Fri Sep 8 10:17:22 2023 +0800
[pipelineX](runtime filter) Support runtime filter (#24054)
---
be/src/pipeline/exec/scan_operator.cpp | 37 +++++++++++++++++-----
be/src/pipeline/exec/scan_operator.h | 7 ++--
be/src/pipeline/pipeline_fragment_context.cpp | 2 +-
be/src/pipeline/pipeline_fragment_context.h | 7 +++-
be/src/pipeline/pipeline_x/operator.h | 9 ++++++
.../pipeline_x/pipeline_x_fragment_context.cpp | 9 +++++-
.../pipeline_x/pipeline_x_fragment_context.h | 21 ++++++++++++
be/src/pipeline/pipeline_x/pipeline_x_task.cpp | 32 +++++++++++--------
be/src/pipeline/pipeline_x/pipeline_x_task.h | 4 ++-
be/src/runtime/fragment_mgr.cpp | 19 +++++------
be/src/runtime/runtime_filter_mgr.cpp | 29 +++++++++--------
.../suites/ssb_sf0.1_p1/sql/pipelinex_q1.1.sql | 2 +-
.../suites/ssb_sf0.1_p1/sql/pipelinex_q1.2.sql | 2 +-
.../suites/ssb_sf0.1_p1/sql/pipelinex_q1.3.sql | 2 +-
.../suites/ssb_sf0.1_p1/sql/pipelinex_q2.1.sql | 2 +-
.../suites/ssb_sf0.1_p1/sql/pipelinex_q2.2.sql | 2 +-
.../suites/ssb_sf0.1_p1/sql/pipelinex_q2.3.sql | 2 +-
.../suites/ssb_sf0.1_p1/sql/pipelinex_q3.1.sql | 2 +-
.../suites/ssb_sf0.1_p1/sql/pipelinex_q3.2.sql | 2 +-
.../suites/ssb_sf0.1_p1/sql/pipelinex_q3.3.sql | 2 +-
.../suites/ssb_sf0.1_p1/sql/pipelinex_q3.4.sql | 2 +-
.../suites/ssb_sf0.1_p1/sql/pipelinex_q4.1.sql | 2 +-
.../suites/ssb_sf0.1_p1/sql/pipelinex_q4.2.sql | 2 +-
.../suites/ssb_sf0.1_p1/sql/pipelinex_q4.3.sql | 2 +-
.../tpch_unique_sql_zstd_p0/sql/pipelinex_q01.sql | 2 +-
.../tpch_unique_sql_zstd_p0/sql/pipelinex_q02.sql | 2 +-
.../tpch_unique_sql_zstd_p0/sql/pipelinex_q03.sql | 2 +-
.../tpch_unique_sql_zstd_p0/sql/pipelinex_q04.sql | 2 +-
.../tpch_unique_sql_zstd_p0/sql/pipelinex_q05.sql | 2 +-
.../tpch_unique_sql_zstd_p0/sql/pipelinex_q06.sql | 2 +-
.../tpch_unique_sql_zstd_p0/sql/pipelinex_q07.sql | 2 +-
.../tpch_unique_sql_zstd_p0/sql/pipelinex_q08.sql | 2 +-
.../tpch_unique_sql_zstd_p0/sql/pipelinex_q09.sql | 2 +-
.../tpch_unique_sql_zstd_p0/sql/pipelinex_q10.sql | 2 +-
.../tpch_unique_sql_zstd_p0/sql/pipelinex_q11.sql | 2 +-
.../tpch_unique_sql_zstd_p0/sql/pipelinex_q12.sql | 2 +-
.../tpch_unique_sql_zstd_p0/sql/pipelinex_q13.sql | 2 +-
.../tpch_unique_sql_zstd_p0/sql/pipelinex_q14.sql | 2 +-
.../tpch_unique_sql_zstd_p0/sql/pipelinex_q15.sql | 2 +-
.../tpch_unique_sql_zstd_p0/sql/pipelinex_q16.sql | 2 +-
.../tpch_unique_sql_zstd_p0/sql/pipelinex_q17.sql | 2 +-
.../tpch_unique_sql_zstd_p0/sql/pipelinex_q18.sql | 2 +-
.../tpch_unique_sql_zstd_p0/sql/pipelinex_q19.sql | 2 +-
.../tpch_unique_sql_zstd_p0/sql/pipelinex_q20.sql | 2 +-
.../tpch_unique_sql_zstd_p0/sql/pipelinex_q21.sql | 2 +-
.../tpch_unique_sql_zstd_p0/sql/pipelinex_q22.sql | 2 +-
46 files changed, 161 insertions(+), 85 deletions(-)
diff --git a/be/src/pipeline/exec/scan_operator.cpp
b/be/src/pipeline/exec/scan_operator.cpp
index 520400e77c..9fc033da19 100644
--- a/be/src/pipeline/exec/scan_operator.cpp
+++ b/be/src/pipeline/exec/scan_operator.cpp
@@ -113,6 +113,7 @@ bool ScanLocalState<Derived>::should_run_serial() const {
template <typename Derived>
Status ScanLocalState<Derived>::init(RuntimeState* state, LocalStateInfo&
info) {
RETURN_IF_ERROR(PipelineXLocalState<>::init(state, info));
+ RETURN_IF_ERROR(RuntimeFilterConsumer::init(state));
auto& p = _parent->cast<typename Derived::Parent>();
set_scan_ranges(info.scan_ranges);
_common_expr_ctxs_push_down.resize(p._common_expr_ctxs_push_down.size());
@@ -140,7 +141,14 @@ Status ScanLocalState<Derived>::init(RuntimeState* state,
LocalStateInfo& info)
_open_timer = ADD_TIMER(_runtime_profile, "OpenTime");
_alloc_resource_timer = ADD_TIMER(_runtime_profile,
"AllocateResourceTime");
+ return Status::OK();
+}
+template <typename Derived>
+Status ScanLocalState<Derived>::open(RuntimeState* state) {
+ if (_opened) {
+ return Status::OK();
+ }
RETURN_IF_ERROR(_acquire_runtime_filter());
RETURN_IF_ERROR(_process_conjuncts());
@@ -150,6 +158,7 @@ Status ScanLocalState<Derived>::init(RuntimeState* state,
LocalStateInfo& info)
RETURN_IF_ERROR(_scanner_ctx->init());
RETURN_IF_ERROR(state->exec_env()->scanner_scheduler()->submit(_scanner_ctx.get()));
}
+ _opened = true;
return status;
}
@@ -1239,17 +1248,21 @@
ScanOperatorX<LocalStateType>::ScanOperatorX(ObjectPool* pool, const TPlanNode&
template <typename LocalStateType>
bool ScanOperatorX<LocalStateType>::can_read(RuntimeState* state) {
auto& local_state = state->get_local_state(id())->template
cast<LocalStateType>();
- if (local_state._eos || local_state._scanner_ctx->done()) {
- // _eos: need eos
- // _scanner_ctx->done(): need finish
- // _scanner_ctx->no_schedule(): should schedule _scanner_ctx
+ if (!local_state._opened) {
return true;
} else {
- if (local_state._scanner_ctx->get_num_running_scanners() == 0 &&
- local_state._scanner_ctx->has_enough_space_in_blocks_queue()) {
- local_state._scanner_ctx->reschedule_scanner_ctx();
+ if (local_state._eos || local_state._scanner_ctx->done()) {
+ // _eos: need eos
+ // _scanner_ctx->done(): need finish
+ // _scanner_ctx->no_schedule(): should schedule _scanner_ctx
+ return true;
+ } else {
+ if (local_state._scanner_ctx->get_num_running_scanners() == 0 &&
+ local_state._scanner_ctx->has_enough_space_in_blocks_queue()) {
+ local_state._scanner_ctx->reschedule_scanner_ctx();
+ }
+ return local_state.ready_to_read(); // there are some blocks to
process
}
- return local_state.ready_to_read(); // there are some blocks to process
}
}
@@ -1321,6 +1334,14 @@ Status ScanLocalState<Derived>::close(RuntimeState*
state) {
return PipelineXLocalState<>::close(state);
}
+template <typename LocalStateType>
+bool ScanOperatorX<LocalStateType>::runtime_filters_are_ready_or_timeout(
+ RuntimeState* state) const {
+ return state->get_local_state(id())
+ ->template cast<LocalStateType>()
+ .runtime_filters_are_ready_or_timeout();
+}
+
template <typename LocalStateType>
Status ScanOperatorX<LocalStateType>::get_block(RuntimeState* state,
vectorized::Block* block,
SourceState& source_state) {
diff --git a/be/src/pipeline/exec/scan_operator.h
b/be/src/pipeline/exec/scan_operator.h
index 00a842c69c..a04a5ca60f 100644
--- a/be/src/pipeline/exec/scan_operator.h
+++ b/be/src/pipeline/exec/scan_operator.h
@@ -60,7 +60,7 @@ public:
ScanLocalStateBase(RuntimeState* state, OperatorXBase* parent)
: PipelineXLocalState<>(state, parent),
vectorized::RuntimeFilterConsumer(parent->id(),
parent->runtime_filter_descs(),
- parent->row_descriptor(),
parent->conjuncts()) {}
+ parent->row_descriptor(),
_conjuncts) {}
virtual ~ScanLocalStateBase() = default;
virtual bool ready_to_read() = 0;
@@ -128,6 +128,7 @@ class ScanLocalState : public ScanLocalStateBase {
virtual ~ScanLocalState() = default;
Status init(RuntimeState* state, LocalStateInfo& info) override;
+ Status open(RuntimeState* state) override;
Status close(RuntimeState* state) override;
bool ready_to_read() override;
@@ -337,12 +338,14 @@ protected:
RuntimeProfile::Counter* _acquire_runtime_filter_timer = nullptr;
doris::Mutex _block_lock;
+
+ std::atomic<bool> _opened = false;
};
template <typename LocalStateType>
class ScanOperatorX : public OperatorX<LocalStateType> {
public:
- // bool runtime_filters_are_ready_or_timeout() override;
+ bool runtime_filters_are_ready_or_timeout(RuntimeState* state) const
override;
Status try_close(RuntimeState* state) override;
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp
b/be/src/pipeline/pipeline_fragment_context.cpp
index 97689ed001..9e6479be1b 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -211,7 +211,7 @@ Status PipelineFragmentContext::prepare(const
doris::TPipelineFragmentParams& re
_runtime_state->set_tracer(std::move(tracer));
// TODO should be combine with plan_fragment_executor.prepare funciton
- SCOPED_ATTACH_TASK(get_runtime_state());
+ SCOPED_ATTACH_TASK(_runtime_state.get());
_runtime_state->runtime_filter_mgr()->init();
_runtime_state->set_be_number(local_params.backend_num);
diff --git a/be/src/pipeline/pipeline_fragment_context.h
b/be/src/pipeline/pipeline_fragment_context.h
index 4b35c206e5..8147eddd1c 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -71,7 +71,9 @@ public:
TUniqueId get_fragment_instance_id() { return _fragment_instance_id; }
- RuntimeState* get_runtime_state() { return _runtime_state.get(); }
+ virtual RuntimeState* get_runtime_state(UniqueId /*fragment_instance_id*/)
{
+ return _runtime_state.get();
+ }
// should be protected by lock?
[[nodiscard]] bool is_canceled() const { return
_runtime_state->is_cancelled(); }
@@ -112,6 +114,9 @@ public:
_merge_controller_handler = handler;
}
+ virtual void add_merge_controller_handler(
+ std::shared_ptr<RuntimeFilterMergeControllerEntity>& handler) {}
+
void send_report(bool);
virtual void report_profile();
diff --git a/be/src/pipeline/pipeline_x/operator.h
b/be/src/pipeline/pipeline_x/operator.h
index 6658df039b..d5247542d4 100644
--- a/be/src/pipeline/pipeline_x/operator.h
+++ b/be/src/pipeline/pipeline_x/operator.h
@@ -63,6 +63,7 @@ public:
}
virtual Status init(RuntimeState* state, LocalStateInfo& info) = 0;
+ virtual Status open(RuntimeState* state) { return Status::OK(); }
virtual Status close(RuntimeState* state) = 0;
// If use projection, we should clear `_origin_block`.
@@ -175,6 +176,13 @@ public:
return Status::OK();
}
+ bool runtime_filters_are_ready_or_timeout() override {
+ LOG(FATAL) << "should not reach here!";
+ return true;
+ }
+
+ virtual bool runtime_filters_are_ready_or_timeout(RuntimeState* state)
const { return true; }
+
virtual Status close(RuntimeState* state) override;
virtual bool can_read(RuntimeState* state) { return true; }
@@ -302,6 +310,7 @@ public:
"PeakMemoryUsage", TUnit::BYTES, "MemoryUsage");
return Status::OK();
}
+
virtual Status close(RuntimeState* state) override {
if (_closed) {
return Status::OK();
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index 8a487d794f..f240e4731a 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -173,7 +173,7 @@ Status PipelineXFragmentContext::prepare(const
doris::TPipelineFragmentParams& r
_runtime_state->set_query_mem_tracker(_query_ctx->query_mem_tracker);
_runtime_state->set_tracer(std::move(tracer));
- SCOPED_ATTACH_TASK(get_runtime_state());
+ SCOPED_ATTACH_TASK(_runtime_state.get());
if (request.__isset.backend_id) {
_runtime_state->set_backend_id(request.backend_id);
}
@@ -362,6 +362,13 @@ Status PipelineXFragmentContext::_build_pipeline_tasks(
}
}
}
+
+ {
+ std::lock_guard<std::mutex> l(_state_map_lock);
+ _instance_id_to_runtime_state.insert(
+ {UniqueId(_runtime_states[i]->fragment_instance_id()),
+ _runtime_states[i].get()});
+ }
}
_build_side_pipelines.clear();
_dag.clear();
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
index af32f5e705..3a72adac11 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
@@ -75,6 +75,11 @@ public:
}
}
+ void add_merge_controller_handler(
+ std::shared_ptr<RuntimeFilterMergeControllerEntity>& handler)
override {
+ _merge_controller_handlers.emplace_back(handler);
+ }
+
// bool is_canceled() const { return _runtime_state->is_cancelled(); }
// Prepare global information including global states and the unique
operator tree shared by all pipeline tasks.
@@ -92,6 +97,15 @@ public:
void report_profile() override;
+ RuntimeState* get_runtime_state(UniqueId fragment_instance_id) override {
+ std::lock_guard<std::mutex> l(_state_map_lock);
+ if (_instance_id_to_runtime_state.count(fragment_instance_id) > 0) {
+ return _instance_id_to_runtime_state[fragment_instance_id];
+ } else {
+ return _runtime_state.get();
+ }
+ }
+
private:
void _close_action() override;
Status _build_pipeline_tasks(const doris::TPipelineFragmentParams&
request) override;
@@ -121,6 +135,9 @@ private:
// Local runtime states for each pipeline task.
std::vector<std::unique_ptr<RuntimeState>> _runtime_states;
+ // It is used to manage the lifecycle of RuntimeFilterMergeController
+ std::vector<std::shared_ptr<RuntimeFilterMergeControllerEntity>>
_merge_controller_handlers;
+
// TODO: remove the _sink and _multi_cast_stream_sink_senders to set both
// of it in pipeline task not the fragment_context
DataSinkOperatorXPtr _sink;
@@ -135,6 +152,10 @@ private:
// ProbeSide first, and use `_pipelines_to_build` to store which pipeline
the build operator
// is in, so we can build BuildSide once we complete probe side.
std::map<int, PipelinePtr> _build_side_pipelines;
+
+ std::map<UniqueId, RuntimeState*> _instance_id_to_runtime_state;
+ std::mutex _state_map_lock;
};
+
} // namespace pipeline
} // namespace doris
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
index e401b99e8c..07b4b85173 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
@@ -98,21 +98,27 @@ Status PipelineXTask::_open() {
SCOPED_TIMER(_task_profile->total_time_counter());
SCOPED_CPU_TIMER(_task_cpu_timer);
SCOPED_TIMER(_open_timer);
- Status st = Status::OK();
- for (auto& o : _operators) {
- Dependency* dep = _upstream_dependency.find(o->id()) ==
_upstream_dependency.end()
- ? (Dependency*)nullptr
- :
_upstream_dependency.find(o->id())->second.get();
- LocalStateInfo info {_scan_ranges, dep, _recvr};
- Status cur_st = o->setup_local_state(_state, info);
- if (!cur_st.ok()) {
- st = cur_st;
+ if (!_init_local_state) {
+ Status st = Status::OK();
+ for (auto& o : _operators) {
+ Dependency* dep = _upstream_dependency.find(o->id()) ==
_upstream_dependency.end()
+ ? (Dependency*)nullptr
+ :
_upstream_dependency.find(o->id())->second.get();
+ LocalStateInfo info {_scan_ranges, dep, _recvr};
+ Status cur_st = o->setup_local_state(_state, info);
+ if (!cur_st.ok()) {
+ st = cur_st;
+ }
}
+ LocalSinkStateInfo info {_sender_id, _downstream_dependency.get(),
_sender};
+ RETURN_IF_ERROR(_sink->setup_local_state(_state, info));
+ _dry_run = _sink->should_dry_run(_state);
+ RETURN_IF_ERROR(st);
+ _init_local_state = true;
+ }
+ for (auto& o : _operators) {
+ RETURN_IF_ERROR(_state->get_local_state(o->id())->open(_state));
}
- LocalSinkStateInfo info {_sender_id, _downstream_dependency.get(),
_sender};
- RETURN_IF_ERROR(_sink->setup_local_state(_state, info));
- _dry_run = _sink->should_dry_run(_state);
- RETURN_IF_ERROR(st);
_opened = true;
return Status::OK();
}
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.h
b/be/src/pipeline/pipeline_x/pipeline_x_task.h
index de865876be..3c6ac57118 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_task.h
+++ b/be/src/pipeline/pipeline_x/pipeline_x_task.h
@@ -79,7 +79,7 @@ public:
}
bool runtime_filters_are_ready_or_timeout() override {
- return _source->runtime_filters_are_ready_or_timeout();
+ return _source->runtime_filters_are_ready_or_timeout(_state);
}
bool sink_can_write() override { return _sink->can_write(_state); }
@@ -131,5 +131,7 @@ private:
std::shared_ptr<BufferControlBlock> _sender;
std::shared_ptr<vectorized::VDataStreamRecvr> _recvr;
bool _dry_run = false;
+ bool _init_local_state = false;
};
+
} // namespace doris::pipeline
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 8e347be172..96fe688297 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -711,12 +711,11 @@ Status FragmentMgr::exec_plan_fragment(const
TPipelineFragmentParams& params,
}
g_fragmentmgr_prepare_latency << (duration_ns / 1000);
- // std::shared_ptr<RuntimeFilterMergeControllerEntity> handler;
- // _runtimefilter_controller.add_entity(params, local_params,
&handler,
- //
context->get_runtime_state());
- // context->set_merge_controller_handler(handler);
-
for (size_t i = 0; i < params.local_params.size(); i++) {
+ std::shared_ptr<RuntimeFilterMergeControllerEntity> handler;
+ _runtimefilter_controller.add_entity(params,
params.local_params[i], &handler,
+
context->get_runtime_state(UniqueId()));
+ context->set_merge_controller_handler(handler);
const TUniqueId& fragment_instance_id =
params.local_params[i].fragment_instance_id;
{
std::lock_guard<std::mutex> lock(_lock);
@@ -792,7 +791,7 @@ Status FragmentMgr::exec_plan_fragment(const
TPipelineFragmentParams& params,
std::shared_ptr<RuntimeFilterMergeControllerEntity> handler;
_runtimefilter_controller.add_entity(params, local_params,
&handler,
- context->get_runtime_state());
+
context->get_runtime_state(UniqueId()));
context->set_merge_controller_handler(handler);
{
@@ -1195,7 +1194,8 @@ Status FragmentMgr::apply_filter(const
PPublishFilterRequest* request,
pip_context = iter->second;
DCHECK(pip_context != nullptr);
- runtime_filter_mgr =
pip_context->get_runtime_state()->runtime_filter_mgr();
+ runtime_filter_mgr =
+
pip_context->get_runtime_state(fragment_instance_id)->runtime_filter_mgr();
} else {
std::unique_lock<std::mutex> lock(_lock);
auto iter = _fragment_map.find(tfragment_instance_id);
@@ -1237,8 +1237,9 @@ Status FragmentMgr::apply_filterv2(const
PPublishFilterRequestV2* request,
pip_context = iter->second;
DCHECK(pip_context != nullptr);
- runtime_filter_mgr =
-
pip_context->get_runtime_state()->get_query_ctx()->runtime_filter_mgr();
+ runtime_filter_mgr =
pip_context->get_runtime_state(fragment_instance_id)
+ ->get_query_ctx()
+ ->runtime_filter_mgr();
pool = &pip_context->get_query_context()->obj_pool;
} else {
std::unique_lock<std::mutex> lock(_lock);
diff --git a/be/src/runtime/runtime_filter_mgr.cpp
b/be/src/runtime/runtime_filter_mgr.cpp
index 54b83c5330..58cbc96e50 100644
--- a/be/src/runtime/runtime_filter_mgr.cpp
+++ b/be/src/runtime/runtime_filter_mgr.cpp
@@ -65,6 +65,7 @@ Status RuntimeFilterMgr::init() {
Status RuntimeFilterMgr::get_producer_filter(const int filter_id,
IRuntimeFilter** target) {
int32_t key = filter_id;
+ std::lock_guard<std::mutex> l(_lock);
auto iter = _producer_map.find(key);
if (iter == _producer_map.end()) {
LOG(WARNING) << "unknown runtime filter: " << key << ", role:
PRODUCER";
@@ -77,6 +78,7 @@ Status RuntimeFilterMgr::get_producer_filter(const int
filter_id, IRuntimeFilter
Status RuntimeFilterMgr::get_consume_filter(const int filter_id, const int
node_id,
IRuntimeFilter** consumer_filter) {
+ std::lock_guard<std::mutex> l(_lock);
auto iter = _consumer_map.find(filter_id);
if (iter == _consumer_map.cend()) {
LOG(WARNING) << "unknown runtime filter: " << filter_id << ", role:
consumer";
@@ -97,6 +99,7 @@ Status RuntimeFilterMgr::get_consume_filter(const int
filter_id, const int node_
Status RuntimeFilterMgr::get_consume_filters(const int filter_id,
std::vector<IRuntimeFilter*>&
consumer_filters) {
int32_t key = filter_id;
+ std::lock_guard<std::mutex> l(_lock);
auto iter = _consumer_map.find(key);
if (iter == _consumer_map.end()) {
LOG(WARNING) << "unknown runtime filter: " << key << ", role:
consumer";
@@ -114,29 +117,26 @@ Status RuntimeFilterMgr::register_consumer_filter(const
TRuntimeFilterDesc& desc
SCOPED_CONSUME_MEM_TRACKER(_tracker.get());
int32_t key = desc.filter_id;
+ std::lock_guard<std::mutex> l(_lock);
auto iter = _consumer_map.find(key);
if (desc.__isset.opt_remote_rf && desc.opt_remote_rf &&
desc.has_remote_targets &&
desc.type == TRuntimeFilterType::BLOOM) {
// if this runtime filter has remote target (e.g. need merge), we
reuse the runtime filter between all instances
DCHECK(_query_ctx != nullptr);
- {
- std::lock_guard<std::mutex> l(_lock);
-
- iter = _consumer_map.find(key);
- if (iter != _consumer_map.end()) {
- for (auto holder : iter->second) {
- if (holder.node_id == node_id) {
- return Status::OK();
- }
+ iter = _consumer_map.find(key);
+ if (iter != _consumer_map.end()) {
+ for (auto holder : iter->second) {
+ if (holder.node_id == node_id) {
+ return Status::OK();
}
}
- IRuntimeFilter* filter;
- RETURN_IF_ERROR(IRuntimeFilter::create(_query_ctx,
&_query_ctx->obj_pool, &desc,
- &options,
RuntimeFilterRole::CONSUMER, node_id,
- &filter, build_bf_exactly));
- _consumer_map[key].emplace_back(node_id, filter);
}
+ IRuntimeFilter* filter;
+ RETURN_IF_ERROR(IRuntimeFilter::create(_query_ctx,
&_query_ctx->obj_pool, &desc, &options,
+ RuntimeFilterRole::CONSUMER,
node_id, &filter,
+ build_bf_exactly));
+ _consumer_map[key].emplace_back(node_id, filter);
} else {
DCHECK(_state != nullptr);
@@ -162,6 +162,7 @@ Status RuntimeFilterMgr::register_producer_filter(const
TRuntimeFilterDesc& desc
bool build_bf_exactly) {
SCOPED_CONSUME_MEM_TRACKER(_tracker.get());
int32_t key = desc.filter_id;
+ std::lock_guard<std::mutex> l(_lock);
auto iter = _producer_map.find(key);
DCHECK(_state != nullptr);
diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q1.1.sql
b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q1.1.sql
index ec27845894..50b50bc368 100644
--- a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q1.1.sql
+++ b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q1.1.sql
@@ -15,7 +15,7 @@
-- specific language governing permissions and limitations
-- under the License.
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true,
runtime_filter_mode=OFF) */ SUM(lo_extendedprice*lo_discount) AS
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
SUM(lo_extendedprice*lo_discount) AS
REVENUE
FROM lineorder, date
WHERE lo_orderdate = d_datekey
diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q1.2.sql
b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q1.2.sql
index d4ed16a4a3..77c0262016 100644
--- a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q1.2.sql
+++ b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q1.2.sql
@@ -15,7 +15,7 @@
-- specific language governing permissions and limitations
-- under the License.
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true,
runtime_filter_mode=OFF) */ SUM(lo_extendedprice*lo_discount) AS
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
SUM(lo_extendedprice*lo_discount) AS
REVENUE
FROM lineorder, date
WHERE lo_orderdate = d_datekey
diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q1.3.sql
b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q1.3.sql
index df41035f99..0052db0aac 100644
--- a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q1.3.sql
+++ b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q1.3.sql
@@ -15,7 +15,7 @@
-- specific language governing permissions and limitations
-- under the License.
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true,
runtime_filter_mode=OFF) */ SUM(lo_extendedprice*lo_discount) AS
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
SUM(lo_extendedprice*lo_discount) AS
REVENUE
FROM lineorder, date
WHERE lo_orderdate = d_datekey
diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q2.1.sql
b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q2.1.sql
index e7a8529ea7..a47ec82b51 100644
--- a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q2.1.sql
+++ b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q2.1.sql
@@ -15,7 +15,7 @@
-- specific language governing permissions and limitations
-- under the License.
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true,
runtime_filter_mode=OFF) */ SUM(lo_revenue), d_year, p_brand
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
SUM(lo_revenue), d_year, p_brand
FROM lineorder, date, part, supplier
WHERE lo_orderdate = d_datekey
AND lo_partkey = p_partkey
diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q2.2.sql
b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q2.2.sql
index 221d0db794..9ab1a95d4d 100644
--- a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q2.2.sql
+++ b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q2.2.sql
@@ -15,7 +15,7 @@
-- specific language governing permissions and limitations
-- under the License.
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true,
runtime_filter_mode=OFF) */ SUM(lo_revenue), d_year, p_brand
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
SUM(lo_revenue), d_year, p_brand
FROM lineorder, date, part, supplier
WHERE lo_orderdate = d_datekey
AND lo_partkey = p_partkey
diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q2.3.sql
b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q2.3.sql
index 3fee8a0ade..b7e6bd7840 100644
--- a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q2.3.sql
+++ b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q2.3.sql
@@ -15,7 +15,7 @@
-- specific language governing permissions and limitations
-- under the License.
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true,
runtime_filter_mode=OFF) */ SUM(lo_revenue), d_year, p_brand
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
SUM(lo_revenue), d_year, p_brand
FROM lineorder, date, part, supplier
WHERE lo_orderdate = d_datekey
AND lo_partkey = p_partkey
diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q3.1.sql
b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q3.1.sql
index 56b2cd68a4..85c470b708 100644
--- a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q3.1.sql
+++ b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q3.1.sql
@@ -15,7 +15,7 @@
-- specific language governing permissions and limitations
-- under the License.
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true,
runtime_filter_mode=OFF) */ c_nation, s_nation, d_year,
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ c_nation,
s_nation, d_year,
SUM(lo_revenue) AS REVENUE
FROM customer, lineorder, supplier, date
WHERE lo_custkey = c_custkey
diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q3.2.sql
b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q3.2.sql
index 0d24c46376..cd0b320f87 100644
--- a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q3.2.sql
+++ b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q3.2.sql
@@ -15,7 +15,7 @@
-- specific language governing permissions and limitations
-- under the License.
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true,
runtime_filter_mode=OFF) */ c_city, s_city, d_year, sum(lo_revenue)
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ c_city,
s_city, d_year, sum(lo_revenue)
AS REVENUE
FROM customer, lineorder, supplier, date
WHERE lo_custkey = c_custkey
diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q3.3.sql
b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q3.3.sql
index eb41f98453..89765c02d9 100644
--- a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q3.3.sql
+++ b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q3.3.sql
@@ -15,7 +15,7 @@
-- specific language governing permissions and limitations
-- under the License.
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true,
runtime_filter_mode=OFF) */ c_city, s_city, d_year, SUM(lo_revenue)
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ c_city,
s_city, d_year, SUM(lo_revenue)
AS REVENUE
FROM customer, lineorder, supplier, date
WHERE lo_custkey = c_custkey
diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q3.4.sql
b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q3.4.sql
index 43758bee15..5cef87a3fe 100644
--- a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q3.4.sql
+++ b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q3.4.sql
@@ -15,7 +15,7 @@
-- specific language governing permissions and limitations
-- under the License.
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true,
runtime_filter_mode=OFF) */ c_city, s_city, d_year, SUM(lo_revenue)
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ c_city,
s_city, d_year, SUM(lo_revenue)
AS REVENUE
FROM customer, lineorder, supplier, date
WHERE lo_custkey = c_custkey
diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q4.1.sql
b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q4.1.sql
index 66d98abfa0..3e0227c2ea 100644
--- a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q4.1.sql
+++ b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q4.1.sql
@@ -15,7 +15,7 @@
-- specific language governing permissions and limitations
-- under the License.
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true,
runtime_filter_mode=OFF) */ d_year, c_nation,
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ d_year,
c_nation,
SUM(lo_revenue - lo_supplycost) AS PROFIT
FROM date, customer, supplier, part, lineorder
WHERE lo_custkey = c_custkey
diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q4.2.sql
b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q4.2.sql
index 6b9e21d1f2..1338e780ae 100644
--- a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q4.2.sql
+++ b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q4.2.sql
@@ -15,7 +15,7 @@
-- specific language governing permissions and limitations
-- under the License.
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true,
runtime_filter_mode=OFF) */ d_year, s_nation, p_category,
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ d_year,
s_nation, p_category,
SUM(lo_revenue - lo_supplycost) AS PROFIT
FROM date, customer, supplier, part, lineorder
WHERE lo_custkey = c_custkey
diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q4.3.sql
b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q4.3.sql
index b70db54312..d8e6f7c42d 100644
--- a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q4.3.sql
+++ b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q4.3.sql
@@ -15,7 +15,7 @@
-- specific language governing permissions and limitations
-- under the License.
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true,
runtime_filter_mode=OFF) */ d_year, s_city, p_brand,
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ d_year,
s_city, p_brand,
SUM(lo_revenue - lo_supplycost) AS PROFIT
FROM date, customer, supplier, part, lineorder
WHERE lo_custkey = c_custkey
diff --git
a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q01.sql
b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q01.sql
index fbb302e8c0..ded6754a97 100644
--- a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q01.sql
+++ b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q01.sql
@@ -1,5 +1,5 @@
-- tables: lineitem
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true,
runtime_filter_mode=OFF) */
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
l_returnflag,
l_linestatus,
sum(l_quantity) AS sum_qty,
diff --git
a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q02.sql
b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q02.sql
index 46cf6d7e13..f102f7504d 100644
--- a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q02.sql
+++ b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q02.sql
@@ -1,5 +1,5 @@
-- tables: part,supplier,partsupp,nation,region
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true,
runtime_filter_mode=OFF) */
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
s_acctbal,
s_name,
n_name,
diff --git
a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q03.sql
b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q03.sql
index 770d0cf07e..8bd60f0e07 100644
--- a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q03.sql
+++ b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q03.sql
@@ -1,5 +1,5 @@
-- tables: customer,orders,lineitem
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true,
runtime_filter_mode=OFF) */
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
l_orderkey,
sum(l_extendedprice * (1 - l_discount)) AS revenue,
o_orderdate,
diff --git
a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q04.sql
b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q04.sql
index f0ef6bd650..3f44094729 100644
--- a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q04.sql
+++ b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q04.sql
@@ -1,5 +1,5 @@
-- tables: orders,lineitem
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true,
runtime_filter_mode=OFF) */
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
o_orderpriority,
count(*) AS order_count
FROM orders
diff --git
a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q05.sql
b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q05.sql
index 4e1c7f66c7..ed179f8b86 100644
--- a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q05.sql
+++ b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q05.sql
@@ -1,5 +1,5 @@
-- tables: customer,orders,lineitem,supplier,nation,region
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true,
runtime_filter_mode=OFF) */
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
n_name,
sum(l_extendedprice * (1 - l_discount)) AS revenue
FROM
diff --git
a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q06.sql
b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q06.sql
index 30d6b66b1c..2dd86f8c2c 100644
--- a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q06.sql
+++ b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q06.sql
@@ -1,6 +1,6 @@
-- tables: lineitem
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true,
runtime_filter_mode=OFF) */ sum(l_extendedprice * l_discount) AS revenue
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
sum(l_extendedprice * l_discount) AS revenue
FROM
lineitem
WHERE
diff --git
a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q07.sql
b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q07.sql
index 729c03716b..6453c1094a 100644
--- a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q07.sql
+++ b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q07.sql
@@ -1,5 +1,5 @@
-- tables: supplier,lineitem,orders,customer,nation
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true,
runtime_filter_mode=OFF) */
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
supp_nation,
cust_nation,
l_year,
diff --git
a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q08.sql
b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q08.sql
index 044d938555..e4c46fb084 100644
--- a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q08.sql
+++ b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q08.sql
@@ -1,5 +1,5 @@
-- tables: part,supplier,lineitem,orders,customer,nation,region
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true,
runtime_filter_mode=OFF) */
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
o_year,
sum(CASE
WHEN nation = 'BRAZIL'
diff --git
a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q09.sql
b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q09.sql
index ed99a0375b..cee9925fb5 100644
--- a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q09.sql
+++ b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q09.sql
@@ -1,5 +1,5 @@
-- tables: part,supplier,lineitem,partsupp,orders,nation
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true,
runtime_filter_mode=OFF) */
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
nation,
o_year,
sum(amount) AS sum_profit
diff --git
a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q10.sql
b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q10.sql
index 2164023726..c95a80fcee 100644
--- a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q10.sql
+++ b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q10.sql
@@ -1,5 +1,5 @@
-- tables: customer,orders,lineitem,nation
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true,
runtime_filter_mode=OFF) */
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
c_custkey,
c_name,
sum(l_extendedprice * (1 - l_discount)) AS revenue,
diff --git
a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q11.sql
b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q11.sql
index b33bd481cb..b23701e940 100644
--- a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q11.sql
+++ b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q11.sql
@@ -1,5 +1,5 @@
-- tables: partsupp,supplier,nation
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true,
runtime_filter_mode=OFF) */
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
ps_partkey,
sum(ps_supplycost * ps_availqty) AS value
FROM
diff --git
a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q12.sql
b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q12.sql
index 909fecda95..e8893e71e4 100644
--- a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q12.sql
+++ b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q12.sql
@@ -1,5 +1,5 @@
-- tables: orders,lineitem
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true,
runtime_filter_mode=OFF) */
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
l_shipmode,
sum(CASE
WHEN o_orderpriority = '1-URGENT'
diff --git
a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q13.sql
b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q13.sql
index b43ecb6418..9db2da60ee 100644
--- a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q13.sql
+++ b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q13.sql
@@ -1,5 +1,5 @@
-- tables: customer
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true,
runtime_filter_mode=OFF) */
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
c_count,
count(*) AS custdist
FROM (
diff --git
a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q14.sql
b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q14.sql
index 56be5be01b..70d7a57d07 100644
--- a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q14.sql
+++ b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q14.sql
@@ -1,5 +1,5 @@
-- tables: lineitem,part
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true,
runtime_filter_mode=OFF) */ 100.00 * sum(CASE
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ 100.00 *
sum(CASE
WHEN p_type LIKE 'PROMO%'
THEN l_extendedprice * (1 - l_discount)
ELSE 0
diff --git
a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q15.sql
b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q15.sql
index 6cd05dfab7..45f75ff985 100644
--- a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q15.sql
+++ b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q15.sql
@@ -1,4 +1,4 @@
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true,
runtime_filter_mode=OFF) */
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
s_suppkey,
s_name,
s_address,
diff --git
a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q16.sql
b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q16.sql
index 55da2c2056..37a438c796 100644
--- a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q16.sql
+++ b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q16.sql
@@ -1,5 +1,5 @@
-- tables: partsupp,part,supplier
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true,
runtime_filter_mode=OFF) */
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
p_brand,
p_type,
p_size,
diff --git
a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q17.sql
b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q17.sql
index dd52bef885..62f39a750c 100644
--- a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q17.sql
+++ b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q17.sql
@@ -1,5 +1,5 @@
-- tables: lineitem,part
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true,
runtime_filter_mode=OFF) */ sum(l_extendedprice) / 7.0 AS avg_yearly
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
sum(l_extendedprice) / 7.0 AS avg_yearly
FROM
lineitem,
part
diff --git
a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q18.sql
b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q18.sql
index 4aa46be458..2eb2505c01 100644
--- a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q18.sql
+++ b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q18.sql
@@ -1,5 +1,5 @@
-- tables: customer,orders,lineitem
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true,
runtime_filter_mode=OFF) */
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
c_name,
c_custkey,
o_orderkey,
diff --git
a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q19.sql
b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q19.sql
index fe049badfe..16e543f87c 100644
--- a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q19.sql
+++ b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q19.sql
@@ -1,5 +1,5 @@
-- tables: lineitem,part
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true,
runtime_filter_mode=OFF) */ sum(l_extendedprice * (1 - l_discount)) AS revenue
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
sum(l_extendedprice * (1 - l_discount)) AS revenue
FROM
lineitem,
part
diff --git
a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q20.sql
b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q20.sql
index 9dc41c145b..a2aca56790 100644
--- a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q20.sql
+++ b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q20.sql
@@ -1,5 +1,5 @@
-- tables: supplier,nation,partsupp,lineitem,part
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true,
runtime_filter_mode=OFF) */
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
s_name,
s_address
FROM
diff --git
a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q21.sql
b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q21.sql
index 5ac09109d2..7b4874f96c 100644
--- a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q21.sql
+++ b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q21.sql
@@ -1,5 +1,5 @@
-- tables: supplier,lineitem,orders,nation
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true,
runtime_filter_mode=OFF) */
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
s_name,
count(*) AS numwait
FROM
diff --git
a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q22.sql
b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q22.sql
index c8de2089a7..bf784175e0 100644
--- a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q22.sql
+++ b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q22.sql
@@ -1,5 +1,5 @@
-- tables: orders,customer
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true,
runtime_filter_mode=OFF) */
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
cntrycode,
count(*) AS numcust,
sum(c_acctbal) AS totacctbal
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]