This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 1fb9022d073 [pipelineX](bug) Fix meta scan operator (#24963)
1fb9022d073 is described below

commit 1fb9022d07396d79bcc9028afb2b7225d8f5a545
Author: Gabriel <[email protected]>
AuthorDate: Wed Sep 27 20:34:47 2023 +0800

    [pipelineX](bug) Fix meta scan operator (#24963)
---
 be/src/pipeline/exec/meta_scan_operator.cpp        |   4 +
 be/src/pipeline/exec/meta_scan_operator.h          |   1 +
 be/src/pipeline/exec/scan_operator.cpp             |  13 +-
 be/src/pipeline/exec/scan_operator.h               |   7 +-
 be/src/pipeline/pipeline_x/operator.cpp            | 156 +++++++++++++++++++++
 be/src/pipeline/pipeline_x/operator.h              | 139 ++----------------
 .../java/org/apache/doris/qe/SessionVariable.java  |   2 +-
 7 files changed, 182 insertions(+), 140 deletions(-)

diff --git a/be/src/pipeline/exec/meta_scan_operator.cpp 
b/be/src/pipeline/exec/meta_scan_operator.cpp
index 4f639eb9f21..87f4e2187ad 100644
--- a/be/src/pipeline/exec/meta_scan_operator.cpp
+++ b/be/src/pipeline/exec/meta_scan_operator.cpp
@@ -43,6 +43,10 @@ void MetaScanLocalState::set_scan_ranges(const 
std::vector<TScanRangeParams>& sc
     _scan_ranges = scan_ranges;
 }
 
+Status MetaScanLocalState::_process_conjuncts() {
+    return Status::OK();
+}
+
 MetaScanOperatorX::MetaScanOperatorX(ObjectPool* pool, const TPlanNode& tnode,
                                      const DescriptorTbl& descs)
         : ScanOperatorX<MetaScanLocalState>(pool, tnode, descs),
diff --git a/be/src/pipeline/exec/meta_scan_operator.h 
b/be/src/pipeline/exec/meta_scan_operator.h
index 0e9c8db791e..bbe67ba974d 100644
--- a/be/src/pipeline/exec/meta_scan_operator.h
+++ b/be/src/pipeline/exec/meta_scan_operator.h
@@ -50,6 +50,7 @@ private:
 
     void set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) 
override;
     Status _init_scanners(std::list<vectorized::VScannerSPtr>* scanners) 
override;
+    Status _process_conjuncts() override;
 
     std::vector<TScanRangeParams> _scan_ranges;
 };
diff --git a/be/src/pipeline/exec/scan_operator.cpp 
b/be/src/pipeline/exec/scan_operator.cpp
index b10911528a8..ddc966edb61 100644
--- a/be/src/pipeline/exec/scan_operator.cpp
+++ b/be/src/pipeline/exec/scan_operator.cpp
@@ -172,9 +172,8 @@ Status ScanLocalState<Derived>::open(RuntimeState* state) {
     RETURN_IF_ERROR(_acquire_runtime_filter());
     RETURN_IF_ERROR(_process_conjuncts());
 
-    auto status = _eos_dependency->read_blocked_by() == nullptr
-                          ? Status::OK()
-                          : 
_prepare_scanners(state->query_parallel_instance_num());
+    auto status =
+            _eos_dependency->read_blocked_by() == nullptr ? Status::OK() : 
_prepare_scanners();
     if (_scanner_ctx) {
         DCHECK(_eos_dependency->read_blocked_by() != nullptr && 
_num_scanners->value() > 0);
         RETURN_IF_ERROR(_scanner_ctx->init());
@@ -1163,21 +1162,21 @@ Status 
ScanLocalState<Derived>::_normalize_match_predicate(
 }
 
 template <typename Derived>
-Status ScanLocalState<Derived>::_prepare_scanners(const int 
query_parallel_instance_num) {
+Status ScanLocalState<Derived>::_prepare_scanners() {
     std::list<vectorized::VScannerSPtr> scanners;
     RETURN_IF_ERROR(_init_scanners(&scanners));
     if (scanners.empty()) {
         _eos_dependency->set_ready_for_read();
     } else {
         COUNTER_SET(_num_scanners, static_cast<int64_t>(scanners.size()));
-        RETURN_IF_ERROR(_start_scanners(scanners, 
query_parallel_instance_num));
+        RETURN_IF_ERROR(_start_scanners(scanners));
     }
     return Status::OK();
 }
 
 template <typename Derived>
-Status ScanLocalState<Derived>::_start_scanners(const 
std::list<vectorized::VScannerSPtr>& scanners,
-                                                const int 
query_parallel_instance_num) {
+Status ScanLocalState<Derived>::_start_scanners(
+        const std::list<vectorized::VScannerSPtr>& scanners) {
     auto& p = _parent->cast<typename Derived::Parent>();
     _scanner_ctx = PipScannerContext::create_shared(state(), this, 
p._output_tuple_desc, scanners,
                                                     p.limit(), 
state()->scan_queue_mem_limit(),
diff --git a/be/src/pipeline/exec/scan_operator.h 
b/be/src/pipeline/exec/scan_operator.h
index ef9f54bff79..ba9cee464df 100644
--- a/be/src/pipeline/exec/scan_operator.h
+++ b/be/src/pipeline/exec/scan_operator.h
@@ -134,7 +134,7 @@ public:
 
     [[nodiscard]] virtual int runtime_filter_num() const = 0;
 
-    Status virtual clone_conjunct_ctxs(vectorized::VExprContextSPtrs& 
conjuncts) = 0;
+    virtual Status clone_conjunct_ctxs(vectorized::VExprContextSPtrs& 
conjuncts) = 0;
     virtual void set_scan_ranges(const std::vector<TScanRangeParams>& 
scan_ranges) = 0;
 
     virtual TPushAggOp::type get_push_down_agg_type() = 0;
@@ -351,11 +351,10 @@ protected:
                                const ChangeFixedValueRangeFunc& func, const 
std::string& fn_name,
                                int slot_ref_child = -1);
 
-    Status _prepare_scanners(const int query_parallel_instance_num);
+    Status _prepare_scanners();
 
     // Submit the scanner to the thread pool and start execution
-    Status _start_scanners(const std::list<vectorized::VScannerSPtr>& scanners,
-                           const int query_parallel_instance_num);
+    Status _start_scanners(const std::list<vectorized::VScannerSPtr>& 
scanners);
 
     // Every time vconjunct_ctx_ptr is updated, the old ctx will be stored in 
this vector
     // so that it will be destroyed uniformly at the end of the query.
diff --git a/be/src/pipeline/pipeline_x/operator.cpp 
b/be/src/pipeline/pipeline_x/operator.cpp
index 9244a435856..672e585db73 100644
--- a/be/src/pipeline/pipeline_x/operator.cpp
+++ b/be/src/pipeline/pipeline_x/operator.cpp
@@ -337,6 +337,96 @@ Status 
OperatorX<UnionSourceLocalState>::setup_local_states(RuntimeState* state,
     return Status::OK();
 }
 
+template <typename DependencyType>
+Status PipelineXLocalState<DependencyType>::init(RuntimeState* state, 
LocalStateInfo& info) {
+    _runtime_profile.reset(new RuntimeProfile(_parent->get_name() +
+                                              " (id=" + 
std::to_string(_parent->id()) + ")"));
+    _runtime_profile->set_metadata(_parent->id());
+    info.parent_profile->add_child(_runtime_profile.get(), true, nullptr);
+    if constexpr (!std::is_same_v<FakeDependency, Dependency>) {
+        _dependency = (DependencyType*)info.dependency;
+        if (_dependency) {
+            _shared_state = (typename 
DependencyType::SharedState*)_dependency->shared_state();
+            _wait_for_dependency_timer = ADD_TIMER(
+                    _runtime_profile, "WaitForDependency[" + 
_dependency->name() + "]Time");
+        }
+    }
+
+    _conjuncts.resize(_parent->_conjuncts.size());
+    _projections.resize(_parent->_projections.size());
+    for (size_t i = 0; i < _conjuncts.size(); i++) {
+        RETURN_IF_ERROR(_parent->_conjuncts[i]->clone(state, _conjuncts[i]));
+    }
+    for (size_t i = 0; i < _projections.size(); i++) {
+        RETURN_IF_ERROR(_parent->_projections[i]->clone(state, 
_projections[i]));
+    }
+    _rows_returned_counter = ADD_COUNTER(_runtime_profile, "RowsReturned", 
TUnit::UNIT);
+    _blocks_returned_counter = ADD_COUNTER(_runtime_profile, "BlocksReturned", 
TUnit::UNIT);
+    _projection_timer = ADD_TIMER(_runtime_profile, "ProjectionTime");
+    _open_timer = ADD_TIMER(_runtime_profile, "OpenTime");
+    _close_timer = ADD_TIMER(_runtime_profile, "CloseTime");
+    _rows_returned_rate = profile()->add_derived_counter(
+            doris::ExecNode::ROW_THROUGHPUT_COUNTER, TUnit::UNIT_PER_SECOND,
+            std::bind<int64_t>(&RuntimeProfile::units_per_second, 
_rows_returned_counter,
+                               profile()->total_time_counter()),
+            "");
+    _mem_tracker = std::make_unique<MemTracker>("PipelineXLocalState:" + 
_runtime_profile->name());
+    _memory_used_counter = ADD_LABEL_COUNTER(_runtime_profile, "MemoryUsage");
+    _peak_memory_usage_counter = _runtime_profile->AddHighWaterMarkCounter(
+            "PeakMemoryUsage", TUnit::BYTES, "MemoryUsage");
+    return Status::OK();
+}
+
+template <typename DependencyType>
+Status PipelineXLocalState<DependencyType>::close(RuntimeState* state) {
+    if (_closed) {
+        return Status::OK();
+    }
+    if (_dependency) {
+        COUNTER_SET(_wait_for_dependency_timer, 
_dependency->read_watcher_elapse_time());
+    }
+    if (_rows_returned_counter != nullptr) {
+        COUNTER_SET(_rows_returned_counter, _num_rows_returned);
+    }
+    profile()->add_to_span(_span);
+    _closed = true;
+    return Status::OK();
+}
+
+template <typename DependencyType>
+Status PipelineXSinkLocalState<DependencyType>::init(RuntimeState* state,
+                                                     LocalSinkStateInfo& info) 
{
+    // create profile
+    _profile = state->obj_pool()->add(new RuntimeProfile(
+            _parent->get_name() + " (id=" + std::to_string(_parent->id()) + 
")"));
+    if constexpr (!std::is_same_v<FakeDependency, Dependency>) {
+        _dependency = (DependencyType*)info.dependency;
+        if (_dependency) {
+            _shared_state = (typename 
DependencyType::SharedState*)_dependency->shared_state();
+            _wait_for_dependency_timer =
+                    ADD_TIMER(_profile, "WaitForDependency[" + 
_dependency->name() + "]Time");
+        }
+    }
+    _rows_input_counter = ADD_COUNTER(_profile, "InputRows", TUnit::UNIT);
+    _open_timer = ADD_TIMER(_profile, "OpenTime");
+    _close_timer = ADD_TIMER(_profile, "CloseTime");
+    info.parent_profile->add_child(_profile, true, nullptr);
+    _mem_tracker = std::make_unique<MemTracker>(_parent->get_name());
+    return Status::OK();
+}
+
+template <typename DependencyType>
+Status PipelineXSinkLocalState<DependencyType>::close(RuntimeState* state, 
Status exec_status) {
+    if (_closed) {
+        return Status::OK();
+    }
+    if (_dependency) {
+        COUNTER_SET(_wait_for_dependency_timer, 
_dependency->write_watcher_elapse_time());
+    }
+    _closed = true;
+    return Status::OK();
+}
+
 template <typename LocalStateType>
 Status StreamingOperatorX<LocalStateType>::get_block(RuntimeState* state, 
vectorized::Block* block,
                                                      SourceState& 
source_state) {
@@ -377,6 +467,70 @@ Status 
StatefulOperatorX<LocalStateType>::get_block(RuntimeState* state, vectori
     return Status::OK();
 }
 
+template <typename Writer, typename Parent>
+Status AsyncWriterSink<Writer, Parent>::init(RuntimeState* state, 
LocalSinkStateInfo& info) {
+    RETURN_IF_ERROR(PipelineXSinkLocalState<>::init(state, info));
+    
_output_vexpr_ctxs.resize(_parent->cast<Parent>()._output_vexpr_ctxs.size());
+    for (size_t i = 0; i < _output_vexpr_ctxs.size(); i++) {
+        RETURN_IF_ERROR(
+                _parent->cast<Parent>()._output_vexpr_ctxs[i]->clone(state, 
_output_vexpr_ctxs[i]));
+    }
+
+    _writer.reset(new Writer(info.tsink, _output_vexpr_ctxs));
+    _async_writer_dependency = 
AsyncWriterDependency::create_shared(_parent->id());
+    _writer->set_dependency(_async_writer_dependency.get());
+
+    _wait_for_dependency_timer =
+            ADD_TIMER(_profile, "WaitForDependency[" + 
_async_writer_dependency->name() + "]Time");
+    return Status::OK();
+}
+
+template <typename Writer, typename Parent>
+Status AsyncWriterSink<Writer, Parent>::open(RuntimeState* state) {
+    RETURN_IF_ERROR(PipelineXSinkLocalState<>::open(state));
+    _writer->start_writer(state, _profile);
+    return Status::OK();
+}
+
+template <typename Writer, typename Parent>
+Status AsyncWriterSink<Writer, Parent>::sink(RuntimeState* state, 
vectorized::Block* block,
+                                             SourceState source_state) {
+    return _writer->sink(block, source_state == SourceState::FINISHED);
+}
+
+template <typename Writer, typename Parent>
+WriteDependency* AsyncWriterSink<Writer, Parent>::write_blocked_by() {
+    return _writer->write_blocked_by();
+}
+
+template <typename Writer, typename Parent>
+Status AsyncWriterSink<Writer, Parent>::close(RuntimeState* state, Status 
exec_status) {
+    if (_closed) {
+        return Status::OK();
+    }
+    COUNTER_SET(_wait_for_dependency_timer, 
_async_writer_dependency->write_watcher_elapse_time());
+    if (_writer->need_normal_close()) {
+        if (exec_status.ok() && !state->is_cancelled()) {
+            RETURN_IF_ERROR(_writer->commit_trans());
+        }
+        RETURN_IF_ERROR(_writer->close(exec_status));
+    }
+    return PipelineXSinkLocalState<>::close(state, exec_status);
+}
+
+template <typename Writer, typename Parent>
+Status AsyncWriterSink<Writer, Parent>::try_close(RuntimeState* state, Status 
exec_status) {
+    if (state->is_cancelled() || !exec_status.ok()) {
+        _writer->force_close(!exec_status.ok() ? exec_status : 
Status::Cancelled("Cancelled"));
+    }
+    return Status::OK();
+}
+
+template <typename Writer, typename Parent>
+bool AsyncWriterSink<Writer, Parent>::is_pending_finish() {
+    return _writer->is_pending_finish();
+}
+
 #define DECLARE_OPERATOR_X(LOCAL_STATE) template class 
DataSinkOperatorX<LOCAL_STATE>;
 DECLARE_OPERATOR_X(HashJoinBuildSinkLocalState)
 DECLARE_OPERATOR_X(ResultSinkLocalState)
@@ -445,4 +599,6 @@ template class PipelineXLocalState<MultiCastDependency>;
 template class PipelineXSinkLocalState<MultiCastDependency>;
 template class PipelineXLocalState<PartitionSortDependency>;
 
+template class AsyncWriterSink<doris::vectorized::VFileResultWriter, 
ResultFileSinkOperatorX>;
+
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/pipeline_x/operator.h 
b/be/src/pipeline/pipeline_x/operator.h
index cb6c8f1e3a1..53a294412a2 100644
--- a/be/src/pipeline/pipeline_x/operator.h
+++ b/be/src/pipeline/pipeline_x/operator.h
@@ -319,60 +319,9 @@ public:
             : PipelineXLocalStateBase(state, parent) {}
     ~PipelineXLocalState() override = default;
 
-    Status init(RuntimeState* state, LocalStateInfo& info) override {
-        _runtime_profile.reset(new RuntimeProfile(_parent->get_name() +
-                                                  " (id=" + 
std::to_string(_parent->id()) + ")"));
-        _runtime_profile->set_metadata(_parent->id());
-        info.parent_profile->add_child(_runtime_profile.get(), true, nullptr);
-        if constexpr (!std::is_same_v<FakeDependency, Dependency>) {
-            _dependency = (DependencyType*)info.dependency;
-            if (_dependency) {
-                _shared_state = (typename 
DependencyType::SharedState*)_dependency->shared_state();
-                _wait_for_dependency_timer = ADD_TIMER(
-                        _runtime_profile, "WaitForDependency[" + 
_dependency->name() + "]Time");
-            }
-        }
+    Status init(RuntimeState* state, LocalStateInfo& info) override;
 
-        _conjuncts.resize(_parent->_conjuncts.size());
-        _projections.resize(_parent->_projections.size());
-        for (size_t i = 0; i < _conjuncts.size(); i++) {
-            RETURN_IF_ERROR(_parent->_conjuncts[i]->clone(state, 
_conjuncts[i]));
-        }
-        for (size_t i = 0; i < _projections.size(); i++) {
-            RETURN_IF_ERROR(_parent->_projections[i]->clone(state, 
_projections[i]));
-        }
-        _rows_returned_counter = ADD_COUNTER(_runtime_profile, "RowsReturned", 
TUnit::UNIT);
-        _blocks_returned_counter = ADD_COUNTER(_runtime_profile, 
"BlocksReturned", TUnit::UNIT);
-        _projection_timer = ADD_TIMER(_runtime_profile, "ProjectionTime");
-        _open_timer = ADD_TIMER(_runtime_profile, "OpenTime");
-        _close_timer = ADD_TIMER(_runtime_profile, "CloseTime");
-        _rows_returned_rate = profile()->add_derived_counter(
-                doris::ExecNode::ROW_THROUGHPUT_COUNTER, 
TUnit::UNIT_PER_SECOND,
-                std::bind<int64_t>(&RuntimeProfile::units_per_second, 
_rows_returned_counter,
-                                   profile()->total_time_counter()),
-                "");
-        _mem_tracker =
-                std::make_unique<MemTracker>("PipelineXLocalState:" + 
_runtime_profile->name());
-        _memory_used_counter = ADD_LABEL_COUNTER(_runtime_profile, 
"MemoryUsage");
-        _peak_memory_usage_counter = _runtime_profile->AddHighWaterMarkCounter(
-                "PeakMemoryUsage", TUnit::BYTES, "MemoryUsage");
-        return Status::OK();
-    }
-
-    Status close(RuntimeState* state) override {
-        if (_closed) {
-            return Status::OK();
-        }
-        if (_dependency) {
-            COUNTER_SET(_wait_for_dependency_timer, 
_dependency->read_watcher_elapse_time());
-        }
-        if (_rows_returned_counter != nullptr) {
-            COUNTER_SET(_rows_returned_counter, _num_rows_returned);
-        }
-        profile()->add_to_span(_span);
-        _closed = true;
-        return Status::OK();
-    }
+    Status close(RuntimeState* state) override;
 
     [[nodiscard]] std::string debug_string(int indentation_level = 0) const 
override;
 
@@ -596,40 +545,13 @@ public:
             : PipelineXSinkLocalStateBase(parent, state) {}
     ~PipelineXSinkLocalState() override = default;
 
-    Status init(RuntimeState* state, LocalSinkStateInfo& info) override {
-        // create profile
-        _profile = state->obj_pool()->add(new RuntimeProfile(
-                _parent->get_name() + " (id=" + std::to_string(_parent->id()) 
+ ")"));
-        if constexpr (!std::is_same_v<FakeDependency, Dependency>) {
-            _dependency = (DependencyType*)info.dependency;
-            if (_dependency) {
-                _shared_state = (typename 
DependencyType::SharedState*)_dependency->shared_state();
-                _wait_for_dependency_timer =
-                        ADD_TIMER(_profile, "WaitForDependency[" + 
_dependency->name() + "]Time");
-            }
-        }
-        _rows_input_counter = ADD_COUNTER(_profile, "InputRows", TUnit::UNIT);
-        _open_timer = ADD_TIMER(_profile, "OpenTime");
-        _close_timer = ADD_TIMER(_profile, "CloseTime");
-        info.parent_profile->add_child(_profile, true, nullptr);
-        _mem_tracker = std::make_unique<MemTracker>(_parent->get_name());
-        return Status::OK();
-    }
+    Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
 
     Status open(RuntimeState* state) override { return Status::OK(); }
 
     Status try_close(RuntimeState* state, Status exec_status) override { 
return Status::OK(); }
 
-    Status close(RuntimeState* state, Status exec_status) override {
-        if (_closed) {
-            return Status::OK();
-        }
-        if (_dependency) {
-            COUNTER_SET(_wait_for_dependency_timer, 
_dependency->write_watcher_elapse_time());
-        }
-        _closed = true;
-        return Status::OK();
-    }
+    Status close(RuntimeState* state, Status exec_status) override;
 
     [[nodiscard]] std::string debug_string(int indentation_level) const 
override;
     typename DependencyType::SharedState*& get_shared_state() { return 
_shared_state; }
@@ -687,58 +609,19 @@ public:
     AsyncWriterSink(DataSinkOperatorXBase* parent, RuntimeState* state)
             : PipelineXSinkLocalState<>(parent, state), 
_async_writer_dependency(nullptr) {}
 
-    Status init(RuntimeState* state, LocalSinkStateInfo& info) override {
-        RETURN_IF_ERROR(PipelineXSinkLocalState<>::init(state, info));
-        
_output_vexpr_ctxs.resize(_parent->cast<Parent>()._output_vexpr_ctxs.size());
-        for (size_t i = 0; i < _output_vexpr_ctxs.size(); i++) {
-            
RETURN_IF_ERROR(_parent->cast<Parent>()._output_vexpr_ctxs[i]->clone(
-                    state, _output_vexpr_ctxs[i]));
-        }
+    Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
 
-        _writer.reset(new Writer(info.tsink, _output_vexpr_ctxs));
-        _async_writer_dependency = 
AsyncWriterDependency::create_shared(_parent->id());
-        _writer->set_dependency(_async_writer_dependency.get());
-
-        _wait_for_dependency_timer = ADD_TIMER(
-                _profile, "WaitForDependency[" + 
_async_writer_dependency->name() + "]Time");
-        return Status::OK();
-    }
-
-    Status open(RuntimeState* state) override {
-        RETURN_IF_ERROR(PipelineXSinkLocalState<>::open(state));
-        _writer->start_writer(state, _profile);
-        return Status::OK();
-    }
+    Status open(RuntimeState* state) override;
 
-    Status sink(RuntimeState* state, vectorized::Block* block, SourceState 
source_state) {
-        return _writer->sink(block, source_state == SourceState::FINISHED);
-    }
+    Status sink(RuntimeState* state, vectorized::Block* block, SourceState 
source_state);
 
-    WriteDependency* write_blocked_by() { return _writer->write_blocked_by(); }
+    WriteDependency* write_blocked_by();
 
-    Status close(RuntimeState* state, Status exec_status) override {
-        if (_closed) {
-            return Status::OK();
-        }
-        COUNTER_SET(_wait_for_dependency_timer,
-                    _async_writer_dependency->write_watcher_elapse_time());
-        if (_writer->need_normal_close()) {
-            if (exec_status.ok() && !state->is_cancelled()) {
-                RETURN_IF_ERROR(_writer->commit_trans());
-            }
-            RETURN_IF_ERROR(_writer->close(exec_status));
-        }
-        return PipelineXSinkLocalState<>::close(state, exec_status);
-    }
+    Status close(RuntimeState* state, Status exec_status) override;
 
-    Status try_close(RuntimeState* state, Status exec_status) override {
-        if (state->is_cancelled() || !exec_status.ok()) {
-            _writer->force_close(!exec_status.ok() ? exec_status : 
Status::Cancelled("Cancelled"));
-        }
-        return Status::OK();
-    }
+    Status try_close(RuntimeState* state, Status exec_status) override;
 
-    bool is_pending_finish() { return _writer->is_pending_finish(); }
+    bool is_pending_finish();
 
 protected:
     vectorized::VExprContextSPtrs _output_vexpr_ctxs;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 8a138aeda44..fbea68d0e1b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -1654,7 +1654,7 @@ public class SessionVariable implements Serializable, 
Writable {
             int size = Env.getCurrentSystemInfo().getMinPipelineExecutorSize();
             int autoInstance = (size + 1) / 2;
             return Math.min(autoInstance, maxInstanceNum);
-        } else if (enablePipelineEngine) {
+        } else if (getEnablePipelineEngine()) {
             return parallelPipelineTaskNum;
         } else {
             return parallelExecInstanceNum;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to