This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 1fb9022d073 [pipelineX](bug) Fix meta scan operator (#24963)
1fb9022d073 is described below
commit 1fb9022d07396d79bcc9028afb2b7225d8f5a545
Author: Gabriel <[email protected]>
AuthorDate: Wed Sep 27 20:34:47 2023 +0800
[pipelineX](bug) Fix meta scan operator (#24963)
---
be/src/pipeline/exec/meta_scan_operator.cpp | 4 +
be/src/pipeline/exec/meta_scan_operator.h | 1 +
be/src/pipeline/exec/scan_operator.cpp | 13 +-
be/src/pipeline/exec/scan_operator.h | 7 +-
be/src/pipeline/pipeline_x/operator.cpp | 156 +++++++++++++++++++++
be/src/pipeline/pipeline_x/operator.h | 139 ++----------------
.../java/org/apache/doris/qe/SessionVariable.java | 2 +-
7 files changed, 182 insertions(+), 140 deletions(-)
diff --git a/be/src/pipeline/exec/meta_scan_operator.cpp b/be/src/pipeline/exec/meta_scan_operator.cpp
index 4f639eb9f21..87f4e2187ad 100644
--- a/be/src/pipeline/exec/meta_scan_operator.cpp
+++ b/be/src/pipeline/exec/meta_scan_operator.cpp
@@ -43,6 +43,10 @@ void MetaScanLocalState::set_scan_ranges(const std::vector<TScanRangeParams>& sc
_scan_ranges = scan_ranges;
}
+Status MetaScanLocalState::_process_conjuncts() {
+ return Status::OK();
+}
+
MetaScanOperatorX::MetaScanOperatorX(ObjectPool* pool, const TPlanNode& tnode,
const DescriptorTbl& descs)
: ScanOperatorX<MetaScanLocalState>(pool, tnode, descs),
diff --git a/be/src/pipeline/exec/meta_scan_operator.h b/be/src/pipeline/exec/meta_scan_operator.h
index 0e9c8db791e..bbe67ba974d 100644
--- a/be/src/pipeline/exec/meta_scan_operator.h
+++ b/be/src/pipeline/exec/meta_scan_operator.h
@@ -50,6 +50,7 @@ private:
void set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges)
override;
Status _init_scanners(std::list<vectorized::VScannerSPtr>* scanners)
override;
+ Status _process_conjuncts() override;
std::vector<TScanRangeParams> _scan_ranges;
};
diff --git a/be/src/pipeline/exec/scan_operator.cpp b/be/src/pipeline/exec/scan_operator.cpp
index b10911528a8..ddc966edb61 100644
--- a/be/src/pipeline/exec/scan_operator.cpp
+++ b/be/src/pipeline/exec/scan_operator.cpp
@@ -172,9 +172,8 @@ Status ScanLocalState<Derived>::open(RuntimeState* state) {
RETURN_IF_ERROR(_acquire_runtime_filter());
RETURN_IF_ERROR(_process_conjuncts());
- auto status = _eos_dependency->read_blocked_by() == nullptr
- ? Status::OK()
- : _prepare_scanners(state->query_parallel_instance_num());
+ auto status =
+ _eos_dependency->read_blocked_by() == nullptr ? Status::OK() : _prepare_scanners();
if (_scanner_ctx) {
DCHECK(_eos_dependency->read_blocked_by() != nullptr &&
_num_scanners->value() > 0);
RETURN_IF_ERROR(_scanner_ctx->init());
@@ -1163,21 +1162,21 @@ Status ScanLocalState<Derived>::_normalize_match_predicate(
}
template <typename Derived>
-Status ScanLocalState<Derived>::_prepare_scanners(const int query_parallel_instance_num) {
+Status ScanLocalState<Derived>::_prepare_scanners() {
std::list<vectorized::VScannerSPtr> scanners;
RETURN_IF_ERROR(_init_scanners(&scanners));
if (scanners.empty()) {
_eos_dependency->set_ready_for_read();
} else {
COUNTER_SET(_num_scanners, static_cast<int64_t>(scanners.size()));
- RETURN_IF_ERROR(_start_scanners(scanners, query_parallel_instance_num));
+ RETURN_IF_ERROR(_start_scanners(scanners));
}
return Status::OK();
}
template <typename Derived>
-Status ScanLocalState<Derived>::_start_scanners(const std::list<vectorized::VScannerSPtr>& scanners,
- const int query_parallel_instance_num) {
+Status ScanLocalState<Derived>::_start_scanners(
+ const std::list<vectorized::VScannerSPtr>& scanners) {
auto& p = _parent->cast<typename Derived::Parent>();
_scanner_ctx = PipScannerContext::create_shared(state(), this,
p._output_tuple_desc, scanners,
p.limit(),
state()->scan_queue_mem_limit(),
diff --git a/be/src/pipeline/exec/scan_operator.h b/be/src/pipeline/exec/scan_operator.h
index ef9f54bff79..ba9cee464df 100644
--- a/be/src/pipeline/exec/scan_operator.h
+++ b/be/src/pipeline/exec/scan_operator.h
@@ -134,7 +134,7 @@ public:
[[nodiscard]] virtual int runtime_filter_num() const = 0;
- Status virtual clone_conjunct_ctxs(vectorized::VExprContextSPtrs& conjuncts) = 0;
+ virtual Status clone_conjunct_ctxs(vectorized::VExprContextSPtrs& conjuncts) = 0;
virtual void set_scan_ranges(const std::vector<TScanRangeParams>&
scan_ranges) = 0;
virtual TPushAggOp::type get_push_down_agg_type() = 0;
@@ -351,11 +351,10 @@ protected:
const ChangeFixedValueRangeFunc& func, const
std::string& fn_name,
int slot_ref_child = -1);
- Status _prepare_scanners(const int query_parallel_instance_num);
+ Status _prepare_scanners();
// Submit the scanner to the thread pool and start execution
- Status _start_scanners(const std::list<vectorized::VScannerSPtr>& scanners,
- const int query_parallel_instance_num);
+ Status _start_scanners(const std::list<vectorized::VScannerSPtr>&
scanners);
// Every time vconjunct_ctx_ptr is updated, the old ctx will be stored in
this vector
// so that it will be destroyed uniformly at the end of the query.
diff --git a/be/src/pipeline/pipeline_x/operator.cpp b/be/src/pipeline/pipeline_x/operator.cpp
index 9244a435856..672e585db73 100644
--- a/be/src/pipeline/pipeline_x/operator.cpp
+++ b/be/src/pipeline/pipeline_x/operator.cpp
@@ -337,6 +337,96 @@ Status OperatorX<UnionSourceLocalState>::setup_local_states(RuntimeState* state,
return Status::OK();
}
+template <typename DependencyType>
+Status PipelineXLocalState<DependencyType>::init(RuntimeState* state,
LocalStateInfo& info) {
+ _runtime_profile.reset(new RuntimeProfile(_parent->get_name() +
+ " (id=" +
std::to_string(_parent->id()) + ")"));
+ _runtime_profile->set_metadata(_parent->id());
+ info.parent_profile->add_child(_runtime_profile.get(), true, nullptr);
+ if constexpr (!std::is_same_v<FakeDependency, Dependency>) {
+ _dependency = (DependencyType*)info.dependency;
+ if (_dependency) {
+ _shared_state = (typename
DependencyType::SharedState*)_dependency->shared_state();
+ _wait_for_dependency_timer = ADD_TIMER(
+ _runtime_profile, "WaitForDependency[" +
_dependency->name() + "]Time");
+ }
+ }
+
+ _conjuncts.resize(_parent->_conjuncts.size());
+ _projections.resize(_parent->_projections.size());
+ for (size_t i = 0; i < _conjuncts.size(); i++) {
+ RETURN_IF_ERROR(_parent->_conjuncts[i]->clone(state, _conjuncts[i]));
+ }
+ for (size_t i = 0; i < _projections.size(); i++) {
+ RETURN_IF_ERROR(_parent->_projections[i]->clone(state,
_projections[i]));
+ }
+ _rows_returned_counter = ADD_COUNTER(_runtime_profile, "RowsReturned",
TUnit::UNIT);
+ _blocks_returned_counter = ADD_COUNTER(_runtime_profile, "BlocksReturned",
TUnit::UNIT);
+ _projection_timer = ADD_TIMER(_runtime_profile, "ProjectionTime");
+ _open_timer = ADD_TIMER(_runtime_profile, "OpenTime");
+ _close_timer = ADD_TIMER(_runtime_profile, "CloseTime");
+ _rows_returned_rate = profile()->add_derived_counter(
+ doris::ExecNode::ROW_THROUGHPUT_COUNTER, TUnit::UNIT_PER_SECOND,
+ std::bind<int64_t>(&RuntimeProfile::units_per_second,
_rows_returned_counter,
+ profile()->total_time_counter()),
+ "");
+ _mem_tracker = std::make_unique<MemTracker>("PipelineXLocalState:" +
_runtime_profile->name());
+ _memory_used_counter = ADD_LABEL_COUNTER(_runtime_profile, "MemoryUsage");
+ _peak_memory_usage_counter = _runtime_profile->AddHighWaterMarkCounter(
+ "PeakMemoryUsage", TUnit::BYTES, "MemoryUsage");
+ return Status::OK();
+}
+
+template <typename DependencyType>
+Status PipelineXLocalState<DependencyType>::close(RuntimeState* state) {
+ if (_closed) {
+ return Status::OK();
+ }
+ if (_dependency) {
+ COUNTER_SET(_wait_for_dependency_timer,
_dependency->read_watcher_elapse_time());
+ }
+ if (_rows_returned_counter != nullptr) {
+ COUNTER_SET(_rows_returned_counter, _num_rows_returned);
+ }
+ profile()->add_to_span(_span);
+ _closed = true;
+ return Status::OK();
+}
+
+template <typename DependencyType>
+Status PipelineXSinkLocalState<DependencyType>::init(RuntimeState* state,
+ LocalSinkStateInfo& info)
{
+ // create profile
+ _profile = state->obj_pool()->add(new RuntimeProfile(
+ _parent->get_name() + " (id=" + std::to_string(_parent->id()) +
")"));
+ if constexpr (!std::is_same_v<FakeDependency, Dependency>) {
+ _dependency = (DependencyType*)info.dependency;
+ if (_dependency) {
+ _shared_state = (typename
DependencyType::SharedState*)_dependency->shared_state();
+ _wait_for_dependency_timer =
+ ADD_TIMER(_profile, "WaitForDependency[" +
_dependency->name() + "]Time");
+ }
+ }
+ _rows_input_counter = ADD_COUNTER(_profile, "InputRows", TUnit::UNIT);
+ _open_timer = ADD_TIMER(_profile, "OpenTime");
+ _close_timer = ADD_TIMER(_profile, "CloseTime");
+ info.parent_profile->add_child(_profile, true, nullptr);
+ _mem_tracker = std::make_unique<MemTracker>(_parent->get_name());
+ return Status::OK();
+}
+
+template <typename DependencyType>
+Status PipelineXSinkLocalState<DependencyType>::close(RuntimeState* state,
Status exec_status) {
+ if (_closed) {
+ return Status::OK();
+ }
+ if (_dependency) {
+ COUNTER_SET(_wait_for_dependency_timer,
_dependency->write_watcher_elapse_time());
+ }
+ _closed = true;
+ return Status::OK();
+}
+
template <typename LocalStateType>
Status StreamingOperatorX<LocalStateType>::get_block(RuntimeState* state,
vectorized::Block* block,
SourceState&
source_state) {
@@ -377,6 +467,70 @@ Status StatefulOperatorX<LocalStateType>::get_block(RuntimeState* state, vectori
return Status::OK();
}
+template <typename Writer, typename Parent>
+Status AsyncWriterSink<Writer, Parent>::init(RuntimeState* state,
LocalSinkStateInfo& info) {
+ RETURN_IF_ERROR(PipelineXSinkLocalState<>::init(state, info));
+ _output_vexpr_ctxs.resize(_parent->cast<Parent>()._output_vexpr_ctxs.size());
+ for (size_t i = 0; i < _output_vexpr_ctxs.size(); i++) {
+ RETURN_IF_ERROR(
+ _parent->cast<Parent>()._output_vexpr_ctxs[i]->clone(state,
_output_vexpr_ctxs[i]));
+ }
+
+ _writer.reset(new Writer(info.tsink, _output_vexpr_ctxs));
+ _async_writer_dependency =
AsyncWriterDependency::create_shared(_parent->id());
+ _writer->set_dependency(_async_writer_dependency.get());
+
+ _wait_for_dependency_timer =
+ ADD_TIMER(_profile, "WaitForDependency[" +
_async_writer_dependency->name() + "]Time");
+ return Status::OK();
+}
+
+template <typename Writer, typename Parent>
+Status AsyncWriterSink<Writer, Parent>::open(RuntimeState* state) {
+ RETURN_IF_ERROR(PipelineXSinkLocalState<>::open(state));
+ _writer->start_writer(state, _profile);
+ return Status::OK();
+}
+
+template <typename Writer, typename Parent>
+Status AsyncWriterSink<Writer, Parent>::sink(RuntimeState* state,
vectorized::Block* block,
+ SourceState source_state) {
+ return _writer->sink(block, source_state == SourceState::FINISHED);
+}
+
+template <typename Writer, typename Parent>
+WriteDependency* AsyncWriterSink<Writer, Parent>::write_blocked_by() {
+ return _writer->write_blocked_by();
+}
+
+template <typename Writer, typename Parent>
+Status AsyncWriterSink<Writer, Parent>::close(RuntimeState* state, Status
exec_status) {
+ if (_closed) {
+ return Status::OK();
+ }
+ COUNTER_SET(_wait_for_dependency_timer,
_async_writer_dependency->write_watcher_elapse_time());
+ if (_writer->need_normal_close()) {
+ if (exec_status.ok() && !state->is_cancelled()) {
+ RETURN_IF_ERROR(_writer->commit_trans());
+ }
+ RETURN_IF_ERROR(_writer->close(exec_status));
+ }
+ return PipelineXSinkLocalState<>::close(state, exec_status);
+}
+
+template <typename Writer, typename Parent>
+Status AsyncWriterSink<Writer, Parent>::try_close(RuntimeState* state, Status
exec_status) {
+ if (state->is_cancelled() || !exec_status.ok()) {
+ _writer->force_close(!exec_status.ok() ? exec_status :
Status::Cancelled("Cancelled"));
+ }
+ return Status::OK();
+}
+
+template <typename Writer, typename Parent>
+bool AsyncWriterSink<Writer, Parent>::is_pending_finish() {
+ return _writer->is_pending_finish();
+}
+
#define DECLARE_OPERATOR_X(LOCAL_STATE) template class DataSinkOperatorX<LOCAL_STATE>;
DECLARE_OPERATOR_X(HashJoinBuildSinkLocalState)
DECLARE_OPERATOR_X(ResultSinkLocalState)
@@ -445,4 +599,6 @@ template class PipelineXLocalState<MultiCastDependency>;
template class PipelineXSinkLocalState<MultiCastDependency>;
template class PipelineXLocalState<PartitionSortDependency>;
+template class AsyncWriterSink<doris::vectorized::VFileResultWriter, ResultFileSinkOperatorX>;
+
} // namespace doris::pipeline
diff --git a/be/src/pipeline/pipeline_x/operator.h b/be/src/pipeline/pipeline_x/operator.h
index cb6c8f1e3a1..53a294412a2 100644
--- a/be/src/pipeline/pipeline_x/operator.h
+++ b/be/src/pipeline/pipeline_x/operator.h
@@ -319,60 +319,9 @@ public:
: PipelineXLocalStateBase(state, parent) {}
~PipelineXLocalState() override = default;
- Status init(RuntimeState* state, LocalStateInfo& info) override {
- _runtime_profile.reset(new RuntimeProfile(_parent->get_name() +
- " (id=" +
std::to_string(_parent->id()) + ")"));
- _runtime_profile->set_metadata(_parent->id());
- info.parent_profile->add_child(_runtime_profile.get(), true, nullptr);
- if constexpr (!std::is_same_v<FakeDependency, Dependency>) {
- _dependency = (DependencyType*)info.dependency;
- if (_dependency) {
- _shared_state = (typename
DependencyType::SharedState*)_dependency->shared_state();
- _wait_for_dependency_timer = ADD_TIMER(
- _runtime_profile, "WaitForDependency[" +
_dependency->name() + "]Time");
- }
- }
+ Status init(RuntimeState* state, LocalStateInfo& info) override;
- _conjuncts.resize(_parent->_conjuncts.size());
- _projections.resize(_parent->_projections.size());
- for (size_t i = 0; i < _conjuncts.size(); i++) {
- RETURN_IF_ERROR(_parent->_conjuncts[i]->clone(state,
_conjuncts[i]));
- }
- for (size_t i = 0; i < _projections.size(); i++) {
- RETURN_IF_ERROR(_parent->_projections[i]->clone(state,
_projections[i]));
- }
- _rows_returned_counter = ADD_COUNTER(_runtime_profile, "RowsReturned",
TUnit::UNIT);
- _blocks_returned_counter = ADD_COUNTER(_runtime_profile,
"BlocksReturned", TUnit::UNIT);
- _projection_timer = ADD_TIMER(_runtime_profile, "ProjectionTime");
- _open_timer = ADD_TIMER(_runtime_profile, "OpenTime");
- _close_timer = ADD_TIMER(_runtime_profile, "CloseTime");
- _rows_returned_rate = profile()->add_derived_counter(
- doris::ExecNode::ROW_THROUGHPUT_COUNTER,
TUnit::UNIT_PER_SECOND,
- std::bind<int64_t>(&RuntimeProfile::units_per_second,
_rows_returned_counter,
- profile()->total_time_counter()),
- "");
- _mem_tracker =
- std::make_unique<MemTracker>("PipelineXLocalState:" +
_runtime_profile->name());
- _memory_used_counter = ADD_LABEL_COUNTER(_runtime_profile,
"MemoryUsage");
- _peak_memory_usage_counter = _runtime_profile->AddHighWaterMarkCounter(
- "PeakMemoryUsage", TUnit::BYTES, "MemoryUsage");
- return Status::OK();
- }
-
- Status close(RuntimeState* state) override {
- if (_closed) {
- return Status::OK();
- }
- if (_dependency) {
- COUNTER_SET(_wait_for_dependency_timer,
_dependency->read_watcher_elapse_time());
- }
- if (_rows_returned_counter != nullptr) {
- COUNTER_SET(_rows_returned_counter, _num_rows_returned);
- }
- profile()->add_to_span(_span);
- _closed = true;
- return Status::OK();
- }
+ Status close(RuntimeState* state) override;
[[nodiscard]] std::string debug_string(int indentation_level = 0) const
override;
@@ -596,40 +545,13 @@ public:
: PipelineXSinkLocalStateBase(parent, state) {}
~PipelineXSinkLocalState() override = default;
- Status init(RuntimeState* state, LocalSinkStateInfo& info) override {
- // create profile
- _profile = state->obj_pool()->add(new RuntimeProfile(
- _parent->get_name() + " (id=" + std::to_string(_parent->id()) + ")"));
- if constexpr (!std::is_same_v<FakeDependency, Dependency>) {
- _dependency = (DependencyType*)info.dependency;
- if (_dependency) {
- _shared_state = (typename
DependencyType::SharedState*)_dependency->shared_state();
- _wait_for_dependency_timer =
- ADD_TIMER(_profile, "WaitForDependency[" +
_dependency->name() + "]Time");
- }
- }
- _rows_input_counter = ADD_COUNTER(_profile, "InputRows", TUnit::UNIT);
- _open_timer = ADD_TIMER(_profile, "OpenTime");
- _close_timer = ADD_TIMER(_profile, "CloseTime");
- info.parent_profile->add_child(_profile, true, nullptr);
- _mem_tracker = std::make_unique<MemTracker>(_parent->get_name());
- return Status::OK();
- }
+ Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
Status open(RuntimeState* state) override { return Status::OK(); }
Status try_close(RuntimeState* state, Status exec_status) override {
return Status::OK(); }
- Status close(RuntimeState* state, Status exec_status) override {
- if (_closed) {
- return Status::OK();
- }
- if (_dependency) {
- COUNTER_SET(_wait_for_dependency_timer,
_dependency->write_watcher_elapse_time());
- }
- _closed = true;
- return Status::OK();
- }
+ Status close(RuntimeState* state, Status exec_status) override;
[[nodiscard]] std::string debug_string(int indentation_level) const
override;
typename DependencyType::SharedState*& get_shared_state() { return
_shared_state; }
@@ -687,58 +609,19 @@ public:
AsyncWriterSink(DataSinkOperatorXBase* parent, RuntimeState* state)
: PipelineXSinkLocalState<>(parent, state),
_async_writer_dependency(nullptr) {}
- Status init(RuntimeState* state, LocalSinkStateInfo& info) override {
- RETURN_IF_ERROR(PipelineXSinkLocalState<>::init(state, info));
- _output_vexpr_ctxs.resize(_parent->cast<Parent>()._output_vexpr_ctxs.size());
- for (size_t i = 0; i < _output_vexpr_ctxs.size(); i++) {
- RETURN_IF_ERROR(_parent->cast<Parent>()._output_vexpr_ctxs[i]->clone(
- state, _output_vexpr_ctxs[i]));
- }
+ Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
- _writer.reset(new Writer(info.tsink, _output_vexpr_ctxs));
- _async_writer_dependency =
AsyncWriterDependency::create_shared(_parent->id());
- _writer->set_dependency(_async_writer_dependency.get());
-
- _wait_for_dependency_timer = ADD_TIMER(
- _profile, "WaitForDependency[" +
_async_writer_dependency->name() + "]Time");
- return Status::OK();
- }
-
- Status open(RuntimeState* state) override {
- RETURN_IF_ERROR(PipelineXSinkLocalState<>::open(state));
- _writer->start_writer(state, _profile);
- return Status::OK();
- }
+ Status open(RuntimeState* state) override;
- Status sink(RuntimeState* state, vectorized::Block* block, SourceState
source_state) {
- return _writer->sink(block, source_state == SourceState::FINISHED);
- }
+ Status sink(RuntimeState* state, vectorized::Block* block, SourceState
source_state);
- WriteDependency* write_blocked_by() { return _writer->write_blocked_by(); }
+ WriteDependency* write_blocked_by();
- Status close(RuntimeState* state, Status exec_status) override {
- if (_closed) {
- return Status::OK();
- }
- COUNTER_SET(_wait_for_dependency_timer,
- _async_writer_dependency->write_watcher_elapse_time());
- if (_writer->need_normal_close()) {
- if (exec_status.ok() && !state->is_cancelled()) {
- RETURN_IF_ERROR(_writer->commit_trans());
- }
- RETURN_IF_ERROR(_writer->close(exec_status));
- }
- return PipelineXSinkLocalState<>::close(state, exec_status);
- }
+ Status close(RuntimeState* state, Status exec_status) override;
- Status try_close(RuntimeState* state, Status exec_status) override {
- if (state->is_cancelled() || !exec_status.ok()) {
- _writer->force_close(!exec_status.ok() ? exec_status :
Status::Cancelled("Cancelled"));
- }
- return Status::OK();
- }
+ Status try_close(RuntimeState* state, Status exec_status) override;
- bool is_pending_finish() { return _writer->is_pending_finish(); }
+ bool is_pending_finish();
protected:
vectorized::VExprContextSPtrs _output_vexpr_ctxs;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 8a138aeda44..fbea68d0e1b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -1654,7 +1654,7 @@ public class SessionVariable implements Serializable, Writable {
int size = Env.getCurrentSystemInfo().getMinPipelineExecutorSize();
int autoInstance = (size + 1) / 2;
return Math.min(autoInstance, maxInstanceNum);
- } else if (enablePipelineEngine) {
+ } else if (getEnablePipelineEngine()) {
return parallelPipelineTaskNum;
} else {
return parallelExecInstanceNum;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]