This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new cdb1b341c7 [pipelineX](runtime filter) Support runtime filter (#24054)
cdb1b341c7 is described below

commit cdb1b341c796aa8c07df73e0a04c33594d087c6d
Author: Gabriel <[email protected]>
AuthorDate: Fri Sep 8 10:17:22 2023 +0800

    [pipelineX](runtime filter) Support runtime filter (#24054)
---
 be/src/pipeline/exec/scan_operator.cpp             | 37 +++++++++++++++++-----
 be/src/pipeline/exec/scan_operator.h               |  7 ++--
 be/src/pipeline/pipeline_fragment_context.cpp      |  2 +-
 be/src/pipeline/pipeline_fragment_context.h        |  7 +++-
 be/src/pipeline/pipeline_x/operator.h              |  9 ++++++
 .../pipeline_x/pipeline_x_fragment_context.cpp     |  9 +++++-
 .../pipeline_x/pipeline_x_fragment_context.h       | 21 ++++++++++++
 be/src/pipeline/pipeline_x/pipeline_x_task.cpp     | 32 +++++++++++--------
 be/src/pipeline/pipeline_x/pipeline_x_task.h       |  4 ++-
 be/src/runtime/fragment_mgr.cpp                    | 19 +++++------
 be/src/runtime/runtime_filter_mgr.cpp              | 29 +++++++++--------
 .../suites/ssb_sf0.1_p1/sql/pipelinex_q1.1.sql     |  2 +-
 .../suites/ssb_sf0.1_p1/sql/pipelinex_q1.2.sql     |  2 +-
 .../suites/ssb_sf0.1_p1/sql/pipelinex_q1.3.sql     |  2 +-
 .../suites/ssb_sf0.1_p1/sql/pipelinex_q2.1.sql     |  2 +-
 .../suites/ssb_sf0.1_p1/sql/pipelinex_q2.2.sql     |  2 +-
 .../suites/ssb_sf0.1_p1/sql/pipelinex_q2.3.sql     |  2 +-
 .../suites/ssb_sf0.1_p1/sql/pipelinex_q3.1.sql     |  2 +-
 .../suites/ssb_sf0.1_p1/sql/pipelinex_q3.2.sql     |  2 +-
 .../suites/ssb_sf0.1_p1/sql/pipelinex_q3.3.sql     |  2 +-
 .../suites/ssb_sf0.1_p1/sql/pipelinex_q3.4.sql     |  2 +-
 .../suites/ssb_sf0.1_p1/sql/pipelinex_q4.1.sql     |  2 +-
 .../suites/ssb_sf0.1_p1/sql/pipelinex_q4.2.sql     |  2 +-
 .../suites/ssb_sf0.1_p1/sql/pipelinex_q4.3.sql     |  2 +-
 .../tpch_unique_sql_zstd_p0/sql/pipelinex_q01.sql  |  2 +-
 .../tpch_unique_sql_zstd_p0/sql/pipelinex_q02.sql  |  2 +-
 .../tpch_unique_sql_zstd_p0/sql/pipelinex_q03.sql  |  2 +-
 .../tpch_unique_sql_zstd_p0/sql/pipelinex_q04.sql  |  2 +-
 .../tpch_unique_sql_zstd_p0/sql/pipelinex_q05.sql  |  2 +-
 .../tpch_unique_sql_zstd_p0/sql/pipelinex_q06.sql  |  2 +-
 .../tpch_unique_sql_zstd_p0/sql/pipelinex_q07.sql  |  2 +-
 .../tpch_unique_sql_zstd_p0/sql/pipelinex_q08.sql  |  2 +-
 .../tpch_unique_sql_zstd_p0/sql/pipelinex_q09.sql  |  2 +-
 .../tpch_unique_sql_zstd_p0/sql/pipelinex_q10.sql  |  2 +-
 .../tpch_unique_sql_zstd_p0/sql/pipelinex_q11.sql  |  2 +-
 .../tpch_unique_sql_zstd_p0/sql/pipelinex_q12.sql  |  2 +-
 .../tpch_unique_sql_zstd_p0/sql/pipelinex_q13.sql  |  2 +-
 .../tpch_unique_sql_zstd_p0/sql/pipelinex_q14.sql  |  2 +-
 .../tpch_unique_sql_zstd_p0/sql/pipelinex_q15.sql  |  2 +-
 .../tpch_unique_sql_zstd_p0/sql/pipelinex_q16.sql  |  2 +-
 .../tpch_unique_sql_zstd_p0/sql/pipelinex_q17.sql  |  2 +-
 .../tpch_unique_sql_zstd_p0/sql/pipelinex_q18.sql  |  2 +-
 .../tpch_unique_sql_zstd_p0/sql/pipelinex_q19.sql  |  2 +-
 .../tpch_unique_sql_zstd_p0/sql/pipelinex_q20.sql  |  2 +-
 .../tpch_unique_sql_zstd_p0/sql/pipelinex_q21.sql  |  2 +-
 .../tpch_unique_sql_zstd_p0/sql/pipelinex_q22.sql  |  2 +-
 46 files changed, 161 insertions(+), 85 deletions(-)

diff --git a/be/src/pipeline/exec/scan_operator.cpp 
b/be/src/pipeline/exec/scan_operator.cpp
index 520400e77c..9fc033da19 100644
--- a/be/src/pipeline/exec/scan_operator.cpp
+++ b/be/src/pipeline/exec/scan_operator.cpp
@@ -113,6 +113,7 @@ bool ScanLocalState<Derived>::should_run_serial() const {
 template <typename Derived>
 Status ScanLocalState<Derived>::init(RuntimeState* state, LocalStateInfo& 
info) {
     RETURN_IF_ERROR(PipelineXLocalState<>::init(state, info));
+    RETURN_IF_ERROR(RuntimeFilterConsumer::init(state));
     auto& p = _parent->cast<typename Derived::Parent>();
     set_scan_ranges(info.scan_ranges);
     _common_expr_ctxs_push_down.resize(p._common_expr_ctxs_push_down.size());
@@ -140,7 +141,14 @@ Status ScanLocalState<Derived>::init(RuntimeState* state, 
LocalStateInfo& info)
 
     _open_timer = ADD_TIMER(_runtime_profile, "OpenTime");
     _alloc_resource_timer = ADD_TIMER(_runtime_profile, 
"AllocateResourceTime");
+    return Status::OK();
+}
 
+template <typename Derived>
+Status ScanLocalState<Derived>::open(RuntimeState* state) {
+    if (_opened) {
+        return Status::OK();
+    }
     RETURN_IF_ERROR(_acquire_runtime_filter());
     RETURN_IF_ERROR(_process_conjuncts());
 
@@ -150,6 +158,7 @@ Status ScanLocalState<Derived>::init(RuntimeState* state, 
LocalStateInfo& info)
         RETURN_IF_ERROR(_scanner_ctx->init());
         
RETURN_IF_ERROR(state->exec_env()->scanner_scheduler()->submit(_scanner_ctx.get()));
     }
+    _opened = true;
     return status;
 }
 
@@ -1239,17 +1248,21 @@ 
ScanOperatorX<LocalStateType>::ScanOperatorX(ObjectPool* pool, const TPlanNode&
 template <typename LocalStateType>
 bool ScanOperatorX<LocalStateType>::can_read(RuntimeState* state) {
     auto& local_state = state->get_local_state(id())->template 
cast<LocalStateType>();
-    if (local_state._eos || local_state._scanner_ctx->done()) {
-        // _eos: need eos
-        // _scanner_ctx->done(): need finish
-        // _scanner_ctx->no_schedule(): should schedule _scanner_ctx
+    if (!local_state._opened) {
         return true;
     } else {
-        if (local_state._scanner_ctx->get_num_running_scanners() == 0 &&
-            local_state._scanner_ctx->has_enough_space_in_blocks_queue()) {
-            local_state._scanner_ctx->reschedule_scanner_ctx();
+        if (local_state._eos || local_state._scanner_ctx->done()) {
+            // _eos: need eos
+            // _scanner_ctx->done(): need finish
+            // _scanner_ctx->no_schedule(): should schedule _scanner_ctx
+            return true;
+        } else {
+            if (local_state._scanner_ctx->get_num_running_scanners() == 0 &&
+                local_state._scanner_ctx->has_enough_space_in_blocks_queue()) {
+                local_state._scanner_ctx->reschedule_scanner_ctx();
+            }
+            return local_state.ready_to_read(); // there are some blocks to 
process
         }
-        return local_state.ready_to_read(); // there are some blocks to process
     }
 }
 
@@ -1321,6 +1334,14 @@ Status ScanLocalState<Derived>::close(RuntimeState* 
state) {
     return PipelineXLocalState<>::close(state);
 }
 
+template <typename LocalStateType>
+bool ScanOperatorX<LocalStateType>::runtime_filters_are_ready_or_timeout(
+        RuntimeState* state) const {
+    return state->get_local_state(id())
+            ->template cast<LocalStateType>()
+            .runtime_filters_are_ready_or_timeout();
+}
+
 template <typename LocalStateType>
 Status ScanOperatorX<LocalStateType>::get_block(RuntimeState* state, 
vectorized::Block* block,
                                                 SourceState& source_state) {
diff --git a/be/src/pipeline/exec/scan_operator.h 
b/be/src/pipeline/exec/scan_operator.h
index 00a842c69c..a04a5ca60f 100644
--- a/be/src/pipeline/exec/scan_operator.h
+++ b/be/src/pipeline/exec/scan_operator.h
@@ -60,7 +60,7 @@ public:
     ScanLocalStateBase(RuntimeState* state, OperatorXBase* parent)
             : PipelineXLocalState<>(state, parent),
               vectorized::RuntimeFilterConsumer(parent->id(), 
parent->runtime_filter_descs(),
-                                                parent->row_descriptor(), 
parent->conjuncts()) {}
+                                                parent->row_descriptor(), 
_conjuncts) {}
     virtual ~ScanLocalStateBase() = default;
 
     virtual bool ready_to_read() = 0;
@@ -128,6 +128,7 @@ class ScanLocalState : public ScanLocalStateBase {
     virtual ~ScanLocalState() = default;
 
     Status init(RuntimeState* state, LocalStateInfo& info) override;
+    Status open(RuntimeState* state) override;
     Status close(RuntimeState* state) override;
 
     bool ready_to_read() override;
@@ -337,12 +338,14 @@ protected:
     RuntimeProfile::Counter* _acquire_runtime_filter_timer = nullptr;
 
     doris::Mutex _block_lock;
+
+    std::atomic<bool> _opened = false;
 };
 
 template <typename LocalStateType>
 class ScanOperatorX : public OperatorX<LocalStateType> {
 public:
-    //    bool runtime_filters_are_ready_or_timeout() override;
+    bool runtime_filters_are_ready_or_timeout(RuntimeState* state) const 
override;
 
     Status try_close(RuntimeState* state) override;
 
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp 
b/be/src/pipeline/pipeline_fragment_context.cpp
index 97689ed001..9e6479be1b 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -211,7 +211,7 @@ Status PipelineFragmentContext::prepare(const 
doris::TPipelineFragmentParams& re
     _runtime_state->set_tracer(std::move(tracer));
 
     // TODO should be combine with plan_fragment_executor.prepare funciton
-    SCOPED_ATTACH_TASK(get_runtime_state());
+    SCOPED_ATTACH_TASK(_runtime_state.get());
     _runtime_state->runtime_filter_mgr()->init();
     _runtime_state->set_be_number(local_params.backend_num);
 
diff --git a/be/src/pipeline/pipeline_fragment_context.h 
b/be/src/pipeline/pipeline_fragment_context.h
index 4b35c206e5..8147eddd1c 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -71,7 +71,9 @@ public:
 
     TUniqueId get_fragment_instance_id() { return _fragment_instance_id; }
 
-    RuntimeState* get_runtime_state() { return _runtime_state.get(); }
+    virtual RuntimeState* get_runtime_state(UniqueId /*fragment_instance_id*/) 
{
+        return _runtime_state.get();
+    }
 
     // should be protected by lock?
     [[nodiscard]] bool is_canceled() const { return 
_runtime_state->is_cancelled(); }
@@ -112,6 +114,9 @@ public:
         _merge_controller_handler = handler;
     }
 
+    virtual void add_merge_controller_handler(
+            std::shared_ptr<RuntimeFilterMergeControllerEntity>& handler) {}
+
     void send_report(bool);
 
     virtual void report_profile();
diff --git a/be/src/pipeline/pipeline_x/operator.h 
b/be/src/pipeline/pipeline_x/operator.h
index 6658df039b..d5247542d4 100644
--- a/be/src/pipeline/pipeline_x/operator.h
+++ b/be/src/pipeline/pipeline_x/operator.h
@@ -63,6 +63,7 @@ public:
     }
 
     virtual Status init(RuntimeState* state, LocalStateInfo& info) = 0;
+    virtual Status open(RuntimeState* state) { return Status::OK(); }
     virtual Status close(RuntimeState* state) = 0;
 
     // If use projection, we should clear `_origin_block`.
@@ -175,6 +176,13 @@ public:
         return Status::OK();
     }
 
+    bool runtime_filters_are_ready_or_timeout() override {
+        LOG(FATAL) << "should not reach here!";
+        return true;
+    }
+
+    virtual bool runtime_filters_are_ready_or_timeout(RuntimeState* state) 
const { return true; }
+
     virtual Status close(RuntimeState* state) override;
 
     virtual bool can_read(RuntimeState* state) { return true; }
@@ -302,6 +310,7 @@ public:
                 "PeakMemoryUsage", TUnit::BYTES, "MemoryUsage");
         return Status::OK();
     }
+
     virtual Status close(RuntimeState* state) override {
         if (_closed) {
             return Status::OK();
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp 
b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index 8a487d794f..f240e4731a 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -173,7 +173,7 @@ Status PipelineXFragmentContext::prepare(const 
doris::TPipelineFragmentParams& r
     _runtime_state->set_query_mem_tracker(_query_ctx->query_mem_tracker);
     _runtime_state->set_tracer(std::move(tracer));
 
-    SCOPED_ATTACH_TASK(get_runtime_state());
+    SCOPED_ATTACH_TASK(_runtime_state.get());
     if (request.__isset.backend_id) {
         _runtime_state->set_backend_id(request.backend_id);
     }
@@ -362,6 +362,13 @@ Status PipelineXFragmentContext::_build_pipeline_tasks(
                 }
             }
         }
+
+        {
+            std::lock_guard<std::mutex> l(_state_map_lock);
+            _instance_id_to_runtime_state.insert(
+                    {UniqueId(_runtime_states[i]->fragment_instance_id()),
+                     _runtime_states[i].get()});
+        }
     }
     _build_side_pipelines.clear();
     _dag.clear();
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h 
b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
index af32f5e705..3a72adac11 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
@@ -75,6 +75,11 @@ public:
         }
     }
 
+    void add_merge_controller_handler(
+            std::shared_ptr<RuntimeFilterMergeControllerEntity>& handler) 
override {
+        _merge_controller_handlers.emplace_back(handler);
+    }
+
     //    bool is_canceled() const { return _runtime_state->is_cancelled(); }
 
     // Prepare global information including global states and the unique 
operator tree shared by all pipeline tasks.
@@ -92,6 +97,15 @@ public:
 
     void report_profile() override;
 
+    RuntimeState* get_runtime_state(UniqueId fragment_instance_id) override {
+        std::lock_guard<std::mutex> l(_state_map_lock);
+        if (_instance_id_to_runtime_state.count(fragment_instance_id) > 0) {
+            return _instance_id_to_runtime_state[fragment_instance_id];
+        } else {
+            return _runtime_state.get();
+        }
+    }
+
 private:
     void _close_action() override;
     Status _build_pipeline_tasks(const doris::TPipelineFragmentParams& 
request) override;
@@ -121,6 +135,9 @@ private:
     // Local runtime states for each pipeline task.
     std::vector<std::unique_ptr<RuntimeState>> _runtime_states;
 
+    // It is used to manage the lifecycle of RuntimeFilterMergeController
+    std::vector<std::shared_ptr<RuntimeFilterMergeControllerEntity>> 
_merge_controller_handlers;
+
     // TODO: remove the _sink and _multi_cast_stream_sink_senders to set both
     // of it in pipeline task not the fragment_context
     DataSinkOperatorXPtr _sink;
@@ -135,6 +152,10 @@ private:
     // ProbeSide first, and use `_pipelines_to_build` to store which pipeline 
the build operator
     // is in, so we can build BuildSide once we complete probe side.
     std::map<int, PipelinePtr> _build_side_pipelines;
+
+    std::map<UniqueId, RuntimeState*> _instance_id_to_runtime_state;
+    std::mutex _state_map_lock;
 };
+
 } // namespace pipeline
 } // namespace doris
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp 
b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
index e401b99e8c..07b4b85173 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
@@ -98,21 +98,27 @@ Status PipelineXTask::_open() {
     SCOPED_TIMER(_task_profile->total_time_counter());
     SCOPED_CPU_TIMER(_task_cpu_timer);
     SCOPED_TIMER(_open_timer);
-    Status st = Status::OK();
-    for (auto& o : _operators) {
-        Dependency* dep = _upstream_dependency.find(o->id()) == 
_upstream_dependency.end()
-                                  ? (Dependency*)nullptr
-                                  : 
_upstream_dependency.find(o->id())->second.get();
-        LocalStateInfo info {_scan_ranges, dep, _recvr};
-        Status cur_st = o->setup_local_state(_state, info);
-        if (!cur_st.ok()) {
-            st = cur_st;
+    if (!_init_local_state) {
+        Status st = Status::OK();
+        for (auto& o : _operators) {
+            Dependency* dep = _upstream_dependency.find(o->id()) == 
_upstream_dependency.end()
+                                      ? (Dependency*)nullptr
+                                      : 
_upstream_dependency.find(o->id())->second.get();
+            LocalStateInfo info {_scan_ranges, dep, _recvr};
+            Status cur_st = o->setup_local_state(_state, info);
+            if (!cur_st.ok()) {
+                st = cur_st;
+            }
         }
+        LocalSinkStateInfo info {_sender_id, _downstream_dependency.get(), 
_sender};
+        RETURN_IF_ERROR(_sink->setup_local_state(_state, info));
+        _dry_run = _sink->should_dry_run(_state);
+        RETURN_IF_ERROR(st);
+        _init_local_state = true;
+    }
+    for (auto& o : _operators) {
+        RETURN_IF_ERROR(_state->get_local_state(o->id())->open(_state));
     }
-    LocalSinkStateInfo info {_sender_id, _downstream_dependency.get(), 
_sender};
-    RETURN_IF_ERROR(_sink->setup_local_state(_state, info));
-    _dry_run = _sink->should_dry_run(_state);
-    RETURN_IF_ERROR(st);
     _opened = true;
     return Status::OK();
 }
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.h 
b/be/src/pipeline/pipeline_x/pipeline_x_task.h
index de865876be..3c6ac57118 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_task.h
+++ b/be/src/pipeline/pipeline_x/pipeline_x_task.h
@@ -79,7 +79,7 @@ public:
     }
 
     bool runtime_filters_are_ready_or_timeout() override {
-        return _source->runtime_filters_are_ready_or_timeout();
+        return _source->runtime_filters_are_ready_or_timeout(_state);
     }
 
     bool sink_can_write() override { return _sink->can_write(_state); }
@@ -131,5 +131,7 @@ private:
     std::shared_ptr<BufferControlBlock> _sender;
     std::shared_ptr<vectorized::VDataStreamRecvr> _recvr;
     bool _dry_run = false;
+    bool _init_local_state = false;
 };
+
 } // namespace doris::pipeline
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 8e347be172..96fe688297 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -711,12 +711,11 @@ Status FragmentMgr::exec_plan_fragment(const 
TPipelineFragmentParams& params,
         }
         g_fragmentmgr_prepare_latency << (duration_ns / 1000);
 
-        //        std::shared_ptr<RuntimeFilterMergeControllerEntity> handler;
-        //        _runtimefilter_controller.add_entity(params, local_params, 
&handler,
-        //                                             
context->get_runtime_state());
-        //        context->set_merge_controller_handler(handler);
-
         for (size_t i = 0; i < params.local_params.size(); i++) {
+            std::shared_ptr<RuntimeFilterMergeControllerEntity> handler;
+            _runtimefilter_controller.add_entity(params, 
params.local_params[i], &handler,
+                                                 
context->get_runtime_state(UniqueId()));
+            context->set_merge_controller_handler(handler);
             const TUniqueId& fragment_instance_id = 
params.local_params[i].fragment_instance_id;
             {
                 std::lock_guard<std::mutex> lock(_lock);
@@ -792,7 +791,7 @@ Status FragmentMgr::exec_plan_fragment(const 
TPipelineFragmentParams& params,
 
             std::shared_ptr<RuntimeFilterMergeControllerEntity> handler;
             _runtimefilter_controller.add_entity(params, local_params, 
&handler,
-                                                 context->get_runtime_state());
+                                                 
context->get_runtime_state(UniqueId()));
             context->set_merge_controller_handler(handler);
 
             {
@@ -1195,7 +1194,8 @@ Status FragmentMgr::apply_filter(const 
PPublishFilterRequest* request,
         pip_context = iter->second;
 
         DCHECK(pip_context != nullptr);
-        runtime_filter_mgr = 
pip_context->get_runtime_state()->runtime_filter_mgr();
+        runtime_filter_mgr =
+                
pip_context->get_runtime_state(fragment_instance_id)->runtime_filter_mgr();
     } else {
         std::unique_lock<std::mutex> lock(_lock);
         auto iter = _fragment_map.find(tfragment_instance_id);
@@ -1237,8 +1237,9 @@ Status FragmentMgr::apply_filterv2(const 
PPublishFilterRequestV2* request,
             pip_context = iter->second;
 
             DCHECK(pip_context != nullptr);
-            runtime_filter_mgr =
-                    
pip_context->get_runtime_state()->get_query_ctx()->runtime_filter_mgr();
+            runtime_filter_mgr = 
pip_context->get_runtime_state(fragment_instance_id)
+                                         ->get_query_ctx()
+                                         ->runtime_filter_mgr();
             pool = &pip_context->get_query_context()->obj_pool;
         } else {
             std::unique_lock<std::mutex> lock(_lock);
diff --git a/be/src/runtime/runtime_filter_mgr.cpp 
b/be/src/runtime/runtime_filter_mgr.cpp
index 54b83c5330..58cbc96e50 100644
--- a/be/src/runtime/runtime_filter_mgr.cpp
+++ b/be/src/runtime/runtime_filter_mgr.cpp
@@ -65,6 +65,7 @@ Status RuntimeFilterMgr::init() {
 Status RuntimeFilterMgr::get_producer_filter(const int filter_id, 
IRuntimeFilter** target) {
     int32_t key = filter_id;
 
+    std::lock_guard<std::mutex> l(_lock);
     auto iter = _producer_map.find(key);
     if (iter == _producer_map.end()) {
         LOG(WARNING) << "unknown runtime filter: " << key << ", role: 
PRODUCER";
@@ -77,6 +78,7 @@ Status RuntimeFilterMgr::get_producer_filter(const int 
filter_id, IRuntimeFilter
 
 Status RuntimeFilterMgr::get_consume_filter(const int filter_id, const int 
node_id,
                                             IRuntimeFilter** consumer_filter) {
+    std::lock_guard<std::mutex> l(_lock);
     auto iter = _consumer_map.find(filter_id);
     if (iter == _consumer_map.cend()) {
         LOG(WARNING) << "unknown runtime filter: " << filter_id << ", role: 
consumer";
@@ -97,6 +99,7 @@ Status RuntimeFilterMgr::get_consume_filter(const int 
filter_id, const int node_
 Status RuntimeFilterMgr::get_consume_filters(const int filter_id,
                                              std::vector<IRuntimeFilter*>& 
consumer_filters) {
     int32_t key = filter_id;
+    std::lock_guard<std::mutex> l(_lock);
     auto iter = _consumer_map.find(key);
     if (iter == _consumer_map.end()) {
         LOG(WARNING) << "unknown runtime filter: " << key << ", role: 
consumer";
@@ -114,29 +117,26 @@ Status RuntimeFilterMgr::register_consumer_filter(const 
TRuntimeFilterDesc& desc
     SCOPED_CONSUME_MEM_TRACKER(_tracker.get());
     int32_t key = desc.filter_id;
 
+    std::lock_guard<std::mutex> l(_lock);
     auto iter = _consumer_map.find(key);
     if (desc.__isset.opt_remote_rf && desc.opt_remote_rf && 
desc.has_remote_targets &&
         desc.type == TRuntimeFilterType::BLOOM) {
         // if this runtime filter has remote target (e.g. need merge), we 
reuse the runtime filter between all instances
         DCHECK(_query_ctx != nullptr);
 
-        {
-            std::lock_guard<std::mutex> l(_lock);
-
-            iter = _consumer_map.find(key);
-            if (iter != _consumer_map.end()) {
-                for (auto holder : iter->second) {
-                    if (holder.node_id == node_id) {
-                        return Status::OK();
-                    }
+        iter = _consumer_map.find(key);
+        if (iter != _consumer_map.end()) {
+            for (auto holder : iter->second) {
+                if (holder.node_id == node_id) {
+                    return Status::OK();
                 }
             }
-            IRuntimeFilter* filter;
-            RETURN_IF_ERROR(IRuntimeFilter::create(_query_ctx, 
&_query_ctx->obj_pool, &desc,
-                                                   &options, 
RuntimeFilterRole::CONSUMER, node_id,
-                                                   &filter, build_bf_exactly));
-            _consumer_map[key].emplace_back(node_id, filter);
         }
+        IRuntimeFilter* filter;
+        RETURN_IF_ERROR(IRuntimeFilter::create(_query_ctx, 
&_query_ctx->obj_pool, &desc, &options,
+                                               RuntimeFilterRole::CONSUMER, 
node_id, &filter,
+                                               build_bf_exactly));
+        _consumer_map[key].emplace_back(node_id, filter);
     } else {
         DCHECK(_state != nullptr);
 
@@ -162,6 +162,7 @@ Status RuntimeFilterMgr::register_producer_filter(const 
TRuntimeFilterDesc& desc
                                                   bool build_bf_exactly) {
     SCOPED_CONSUME_MEM_TRACKER(_tracker.get());
     int32_t key = desc.filter_id;
+    std::lock_guard<std::mutex> l(_lock);
     auto iter = _producer_map.find(key);
 
     DCHECK(_state != nullptr);
diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q1.1.sql 
b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q1.1.sql
index ec27845894..50b50bc368 100644
--- a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q1.1.sql
+++ b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q1.1.sql
@@ -15,7 +15,7 @@
 -- specific language governing permissions and limitations
 -- under the License.
 
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, 
runtime_filter_mode=OFF) */ SUM(lo_extendedprice*lo_discount) AS
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ 
SUM(lo_extendedprice*lo_discount) AS
 REVENUE
 FROM  lineorder, date
 WHERE  lo_orderdate = d_datekey
diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q1.2.sql 
b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q1.2.sql
index d4ed16a4a3..77c0262016 100644
--- a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q1.2.sql
+++ b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q1.2.sql
@@ -15,7 +15,7 @@
 -- specific language governing permissions and limitations
 -- under the License.
 
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, 
runtime_filter_mode=OFF) */ SUM(lo_extendedprice*lo_discount) AS
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ 
SUM(lo_extendedprice*lo_discount) AS
 REVENUE
 FROM  lineorder, date
 WHERE  lo_orderdate = d_datekey
diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q1.3.sql 
b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q1.3.sql
index df41035f99..0052db0aac 100644
--- a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q1.3.sql
+++ b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q1.3.sql
@@ -15,7 +15,7 @@
 -- specific language governing permissions and limitations
 -- under the License.
 
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, 
runtime_filter_mode=OFF) */ SUM(lo_extendedprice*lo_discount) AS
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ 
SUM(lo_extendedprice*lo_discount) AS
 REVENUE
 FROM  lineorder, date
 WHERE  lo_orderdate = d_datekey
diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q2.1.sql 
b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q2.1.sql
index e7a8529ea7..a47ec82b51 100644
--- a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q2.1.sql
+++ b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q2.1.sql
@@ -15,7 +15,7 @@
 -- specific language governing permissions and limitations
 -- under the License.
 
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, 
runtime_filter_mode=OFF) */ SUM(lo_revenue), d_year, p_brand
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ 
SUM(lo_revenue), d_year, p_brand
 FROM lineorder, date, part, supplier
 WHERE lo_orderdate = d_datekey
 AND lo_partkey = p_partkey
diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q2.2.sql 
b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q2.2.sql
index 221d0db794..9ab1a95d4d 100644
--- a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q2.2.sql
+++ b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q2.2.sql
@@ -15,7 +15,7 @@
 -- specific language governing permissions and limitations
 -- under the License.
 
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, 
runtime_filter_mode=OFF) */ SUM(lo_revenue), d_year, p_brand
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ 
SUM(lo_revenue), d_year, p_brand
 FROM lineorder, date, part, supplier
 WHERE lo_orderdate = d_datekey
 AND lo_partkey = p_partkey
diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q2.3.sql 
b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q2.3.sql
index 3fee8a0ade..b7e6bd7840 100644
--- a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q2.3.sql
+++ b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q2.3.sql
@@ -15,7 +15,7 @@
 -- specific language governing permissions and limitations
 -- under the License.
 
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, 
runtime_filter_mode=OFF) */ SUM(lo_revenue), d_year, p_brand
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ 
SUM(lo_revenue), d_year, p_brand
 FROM lineorder, date, part, supplier
 WHERE  lo_orderdate = d_datekey
 AND lo_partkey = p_partkey
diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q3.1.sql 
b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q3.1.sql
index 56b2cd68a4..85c470b708 100644
--- a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q3.1.sql
+++ b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q3.1.sql
@@ -15,7 +15,7 @@
 -- specific language governing permissions and limitations
 -- under the License.
 
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, 
runtime_filter_mode=OFF) */ c_nation, s_nation, d_year,
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ c_nation, 
s_nation, d_year,
 SUM(lo_revenue)  AS  REVENUE
 FROM customer, lineorder, supplier, date
 WHERE lo_custkey = c_custkey
diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q3.2.sql 
b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q3.2.sql
index 0d24c46376..cd0b320f87 100644
--- a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q3.2.sql
+++ b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q3.2.sql
@@ -15,7 +15,7 @@
 -- specific language governing permissions and limitations
 -- under the License.
 
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, 
runtime_filter_mode=OFF) */ c_city, s_city, d_year, sum(lo_revenue)
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ c_city, 
s_city, d_year, sum(lo_revenue)
 AS  REVENUE
 FROM customer, lineorder, supplier, date
 WHERE  lo_custkey = c_custkey
diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q3.3.sql 
b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q3.3.sql
index eb41f98453..89765c02d9 100644
--- a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q3.3.sql
+++ b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q3.3.sql
@@ -15,7 +15,7 @@
 -- specific language governing permissions and limitations
 -- under the License.
 
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, 
runtime_filter_mode=OFF) */ c_city, s_city, d_year, SUM(lo_revenue)
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ c_city, 
s_city, d_year, SUM(lo_revenue)
 AS  REVENUE
 FROM customer, lineorder, supplier, date
 WHERE lo_custkey = c_custkey
diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q3.4.sql 
b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q3.4.sql
index 43758bee15..5cef87a3fe 100644
--- a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q3.4.sql
+++ b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q3.4.sql
@@ -15,7 +15,7 @@
 -- specific language governing permissions and limitations
 -- under the License.
 
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, 
runtime_filter_mode=OFF) */ c_city, s_city, d_year, SUM(lo_revenue)
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ c_city, 
s_city, d_year, SUM(lo_revenue)
 AS  REVENUE
 FROM customer, lineorder, supplier, date
 WHERE lo_custkey = c_custkey
diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q4.1.sql 
b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q4.1.sql
index 66d98abfa0..3e0227c2ea 100644
--- a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q4.1.sql
+++ b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q4.1.sql
@@ -15,7 +15,7 @@
 -- specific language governing permissions and limitations
 -- under the License.
 
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, 
runtime_filter_mode=OFF) */ d_year, c_nation,
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ d_year, 
c_nation,
 SUM(lo_revenue - lo_supplycost) AS PROFIT
 FROM date, customer, supplier, part, lineorder
 WHERE lo_custkey = c_custkey
diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q4.2.sql 
b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q4.2.sql
index 6b9e21d1f2..1338e780ae 100644
--- a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q4.2.sql
+++ b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q4.2.sql
@@ -15,7 +15,7 @@
 -- specific language governing permissions and limitations
 -- under the License.
 
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, 
runtime_filter_mode=OFF) */ d_year, s_nation, p_category,
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ d_year, 
s_nation, p_category,
 SUM(lo_revenue - lo_supplycost) AS PROFIT
 FROM date, customer, supplier, part, lineorder
 WHERE lo_custkey = c_custkey
diff --git a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q4.3.sql 
b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q4.3.sql
index b70db54312..d8e6f7c42d 100644
--- a/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q4.3.sql
+++ b/regression-test/suites/ssb_sf0.1_p1/sql/pipelinex_q4.3.sql
@@ -15,7 +15,7 @@
 -- specific language governing permissions and limitations
 -- under the License.
 
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, 
runtime_filter_mode=OFF) */ d_year, s_city, p_brand,
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ d_year, 
s_city, p_brand,
 SUM(lo_revenue - lo_supplycost) AS PROFIT
 FROM date, customer, supplier, part, lineorder
 WHERE lo_custkey = c_custkey
diff --git 
a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q01.sql 
b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q01.sql
index fbb302e8c0..ded6754a97 100644
--- a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q01.sql
+++ b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q01.sql
@@ -1,5 +1,5 @@
 -- tables: lineitem
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, 
runtime_filter_mode=OFF) */
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
   l_returnflag,
   l_linestatus,
   sum(l_quantity)                                       AS sum_qty,
diff --git 
a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q02.sql 
b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q02.sql
index 46cf6d7e13..f102f7504d 100644
--- a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q02.sql
+++ b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q02.sql
@@ -1,5 +1,5 @@
 -- tables: part,supplier,partsupp,nation,region
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, 
runtime_filter_mode=OFF) */
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
   s_acctbal,
   s_name,
   n_name,
diff --git 
a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q03.sql 
b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q03.sql
index 770d0cf07e..8bd60f0e07 100644
--- a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q03.sql
+++ b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q03.sql
@@ -1,5 +1,5 @@
 -- tables: customer,orders,lineitem
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, 
runtime_filter_mode=OFF) */
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
   l_orderkey,
   sum(l_extendedprice * (1 - l_discount)) AS revenue,
   o_orderdate,
diff --git 
a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q04.sql 
b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q04.sql
index f0ef6bd650..3f44094729 100644
--- a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q04.sql
+++ b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q04.sql
@@ -1,5 +1,5 @@
 -- tables: orders,lineitem
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, 
runtime_filter_mode=OFF) */
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
   o_orderpriority,
   count(*) AS order_count
 FROM orders
diff --git 
a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q05.sql 
b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q05.sql
index 4e1c7f66c7..ed179f8b86 100644
--- a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q05.sql
+++ b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q05.sql
@@ -1,5 +1,5 @@
 -- tables: customer,orders,lineitem,supplier,nation,region
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, 
runtime_filter_mode=OFF) */
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
   n_name,
   sum(l_extendedprice * (1 - l_discount)) AS revenue
 FROM
diff --git 
a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q06.sql 
b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q06.sql
index 30d6b66b1c..2dd86f8c2c 100644
--- a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q06.sql
+++ b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q06.sql
@@ -1,6 +1,6 @@
 -- tables: lineitem
 
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, 
runtime_filter_mode=OFF) */ sum(l_extendedprice * l_discount) AS revenue
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ 
sum(l_extendedprice * l_discount) AS revenue
 FROM
   lineitem
 WHERE
diff --git 
a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q07.sql 
b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q07.sql
index 729c03716b..6453c1094a 100644
--- a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q07.sql
+++ b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q07.sql
@@ -1,5 +1,5 @@
 -- tables: supplier,lineitem,orders,customer,nation
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, 
runtime_filter_mode=OFF) */
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
   supp_nation,
   cust_nation,
   l_year,
diff --git 
a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q08.sql 
b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q08.sql
index 044d938555..e4c46fb084 100644
--- a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q08.sql
+++ b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q08.sql
@@ -1,5 +1,5 @@
 -- tables: part,supplier,lineitem,orders,customer,nation,region
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, 
runtime_filter_mode=OFF) */
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
   o_year,
   sum(CASE
       WHEN nation = 'BRAZIL'
diff --git 
a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q09.sql 
b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q09.sql
index ed99a0375b..cee9925fb5 100644
--- a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q09.sql
+++ b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q09.sql
@@ -1,5 +1,5 @@
 -- tables: part,supplier,lineitem,partsupp,orders,nation
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, 
runtime_filter_mode=OFF) */
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
   nation,
   o_year,
   sum(amount) AS sum_profit
diff --git 
a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q10.sql 
b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q10.sql
index 2164023726..c95a80fcee 100644
--- a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q10.sql
+++ b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q10.sql
@@ -1,5 +1,5 @@
 -- tables: customer,orders,lineitem,nation
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, 
runtime_filter_mode=OFF) */
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
   c_custkey,
   c_name,
   sum(l_extendedprice * (1 - l_discount)) AS revenue,
diff --git 
a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q11.sql 
b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q11.sql
index b33bd481cb..b23701e940 100644
--- a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q11.sql
+++ b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q11.sql
@@ -1,5 +1,5 @@
 -- tables: partsupp,supplier,nation
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, 
runtime_filter_mode=OFF) */
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
   ps_partkey,
   sum(ps_supplycost * ps_availqty) AS value
 FROM
diff --git 
a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q12.sql 
b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q12.sql
index 909fecda95..e8893e71e4 100644
--- a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q12.sql
+++ b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q12.sql
@@ -1,5 +1,5 @@
 -- tables: orders,lineitem
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, 
runtime_filter_mode=OFF) */
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
   l_shipmode,
   sum(CASE
       WHEN o_orderpriority = '1-URGENT'
diff --git 
a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q13.sql 
b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q13.sql
index b43ecb6418..9db2da60ee 100644
--- a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q13.sql
+++ b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q13.sql
@@ -1,5 +1,5 @@
 -- tables: customer
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, 
runtime_filter_mode=OFF) */
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
   c_count,
   count(*) AS custdist
 FROM (
diff --git 
a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q14.sql 
b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q14.sql
index 56be5be01b..70d7a57d07 100644
--- a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q14.sql
+++ b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q14.sql
@@ -1,5 +1,5 @@
 -- tables: lineitem,part
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, 
runtime_filter_mode=OFF) */ 100.00 * sum(CASE
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ 100.00 * 
sum(CASE
                     WHEN p_type LIKE 'PROMO%'
                       THEN l_extendedprice * (1 - l_discount)
                     ELSE 0
diff --git 
a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q15.sql 
b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q15.sql
index 6cd05dfab7..45f75ff985 100644
--- a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q15.sql
+++ b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q15.sql
@@ -1,4 +1,4 @@
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, 
runtime_filter_mode=OFF) */
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
   s_suppkey,
   s_name,
   s_address,
diff --git 
a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q16.sql 
b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q16.sql
index 55da2c2056..37a438c796 100644
--- a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q16.sql
+++ b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q16.sql
@@ -1,5 +1,5 @@
 -- tables: partsupp,part,supplier
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, 
runtime_filter_mode=OFF) */
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
   p_brand,
   p_type,
   p_size,
diff --git 
a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q17.sql 
b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q17.sql
index dd52bef885..62f39a750c 100644
--- a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q17.sql
+++ b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q17.sql
@@ -1,5 +1,5 @@
 -- tables: lineitem,part
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, 
runtime_filter_mode=OFF) */ sum(l_extendedprice) / 7.0 AS avg_yearly
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ 
sum(l_extendedprice) / 7.0 AS avg_yearly
 FROM
   lineitem,
   part
diff --git 
a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q18.sql 
b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q18.sql
index 4aa46be458..2eb2505c01 100644
--- a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q18.sql
+++ b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q18.sql
@@ -1,5 +1,5 @@
 -- tables: customer,orders,lineitem
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, 
runtime_filter_mode=OFF) */
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
   c_name,
   c_custkey,
   o_orderkey,
diff --git 
a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q19.sql 
b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q19.sql
index fe049badfe..16e543f87c 100644
--- a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q19.sql
+++ b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q19.sql
@@ -1,5 +1,5 @@
 -- tables: lineitem,part
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, 
runtime_filter_mode=OFF) */ sum(l_extendedprice * (1 - l_discount)) AS revenue
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */ 
sum(l_extendedprice * (1 - l_discount)) AS revenue
 FROM
   lineitem,
   part
diff --git 
a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q20.sql 
b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q20.sql
index 9dc41c145b..a2aca56790 100644
--- a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q20.sql
+++ b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q20.sql
@@ -1,5 +1,5 @@
 -- tables: supplier,nation,partsupp,lineitem,part
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, 
runtime_filter_mode=OFF) */
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
   s_name,
   s_address
 FROM
diff --git 
a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q21.sql 
b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q21.sql
index 5ac09109d2..7b4874f96c 100644
--- a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q21.sql
+++ b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q21.sql
@@ -1,5 +1,5 @@
 -- tables: supplier,lineitem,orders,nation
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, 
runtime_filter_mode=OFF) */
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
   s_name,
   count(*) AS numwait
 FROM
diff --git 
a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q22.sql 
b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q22.sql
index c8de2089a7..bf784175e0 100644
--- a/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q22.sql
+++ b/regression-test/suites/tpch_unique_sql_zstd_p0/sql/pipelinex_q22.sql
@@ -1,5 +1,5 @@
 -- tables: orders,customer
-SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true, 
runtime_filter_mode=OFF) */
+SELECT /*+SET_VAR(experimental_enable_pipeline_x_engine=true) */
   cntrycode,
   count(*)       AS numcust,
   sum(c_acctbal) AS totacctbal


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to