This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new c929d26996e [Fix](Expr&code-style) check prepare&open before every VExpr execute (#26673)
c929d26996e is described below
commit c929d26996e76e6bb41ca1117b2efbd9d0a18c45
Author: zclllyybb <[email protected]>
AuthorDate: Mon Jan 22 18:04:22 2024 +0800
[Fix](Expr&code-style) check prepare&open before every VExpr execute (#26673)
---
be/src/exec/exec_node.cpp | 52 +++++++++---------
be/src/exec/exec_node.h | 19 ++-----
.../exec/multi_cast_data_stream_source.cpp | 8 ++-
be/src/pipeline/exec/operator.h | 18 ++-----
be/src/pipeline/exec/set_sink_operator.cpp | 16 +++---
be/src/pipeline/pipeline_fragment_context.cpp | 26 ++++-----
be/src/pipeline/pipeline_fragment_context.h | 4 +-
be/src/service/point_query_executor.cpp | 10 ++--
be/src/vec/columns/column_nullable.cpp | 6 +--
be/src/vec/columns/column_nullable.h | 4 +-
be/src/vec/exec/join/vjoin_node_base.h | 19 +++----
be/src/vec/exec/vset_operation_node.cpp | 7 +--
be/src/vec/exec/vset_operation_node.h | 7 +--
be/src/vec/exec/vunion_node.cpp | 21 +++-----
be/src/vec/exprs/vbitmap_predicate.cpp | 4 ++
be/src/vec/exprs/vbloom_predicate.cpp | 4 ++
be/src/vec/exprs/vcase_expr.cpp | 9 +++-
be/src/vec/exprs/vcast_expr.cpp | 4 ++
be/src/vec/exprs/vcolumn_ref.h | 10 ++++
be/src/vec/exprs/vdirect_in_predicate.h | 19 ++++++-
be/src/vec/exprs/vectorized_fn_call.cpp | 19 +++----
be/src/vec/exprs/vexpr.cpp | 62 ++++++++++-----------
be/src/vec/exprs/vexpr.h | 63 +++++++++++++---------
be/src/vec/exprs/vexpr_context.cpp | 28 ++++------
be/src/vec/exprs/vexpr_context.h | 15 +++---
be/src/vec/exprs/vin_predicate.cpp | 4 ++
be/src/vec/exprs/vlambda_function_call_expr.h | 12 ++++-
be/src/vec/exprs/vlambda_function_expr.h | 15 ++++++
be/src/vec/exprs/vliteral.cpp | 19 +++++--
be/src/vec/exprs/vliteral.h | 5 ++
be/src/vec/exprs/vmatch_predicate.cpp | 5 +-
be/src/vec/exprs/vruntimefilter_wrapper.cpp | 15 +++++-
be/src/vec/exprs/vslot_ref.cpp | 11 ++++
be/src/vec/exprs/vslot_ref.h | 5 +-
be/src/vec/exprs/vtuple_is_null_predicate.cpp | 9 ++++
be/src/vec/exprs/vtuple_is_null_predicate.h | 4 +-
be/src/vec/runtime/vdata_stream_recvr.h | 8 +--
37 files changed, 329 insertions(+), 237 deletions(-)
diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp
index d65e8f96bd2..ac6e7eae9a0 100644
--- a/be/src/exec/exec_node.cpp
+++ b/be/src/exec/exec_node.cpp
@@ -25,8 +25,8 @@
#include <thrift/protocol/TDebugProtocol.h>
#include <map>
+#include <memory>
#include <sstream>
-#include <typeinfo>
#include <utility>
#include "common/compiler_util.h" // IWYU pragma: keep
@@ -84,16 +84,10 @@ ExecNode::ExecNode(ObjectPool* pool, const TPlanNode&
tnode, const DescriptorTbl
_tuple_ids(tnode.row_tuples),
_row_descriptor(descs, tnode.row_tuples, tnode.nullable_tuples),
_resource_profile(tnode.resource_profile),
- _limit(tnode.limit),
- _num_rows_returned(0),
- _rows_returned_counter(nullptr),
- _rows_returned_rate(nullptr),
- _memory_used_counter(nullptr),
- _peak_memory_usage_counter(nullptr),
- _is_closed(false),
- _ref(0) {
+ _limit(tnode.limit) {
if (tnode.__isset.output_tuple_id) {
- _output_row_descriptor.reset(new RowDescriptor(descs,
{tnode.output_tuple_id}, {true}));
+ _output_row_descriptor = std::make_unique<RowDescriptor>(
+ descs, std::vector {tnode.output_tuple_id}, std::vector
{true});
}
_query_statistics = std::make_shared<QueryStatistics>();
}
@@ -108,7 +102,7 @@ Status ExecNode::init(const TPlanNode& tnode, RuntimeState*
state) {
RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(tnode.vconjunct,
context));
_conjuncts.emplace_back(context);
} else if (tnode.__isset.conjuncts) {
- for (auto& conjunct : tnode.conjuncts) {
+ for (const auto& conjunct : tnode.conjuncts) {
vectorized::VExprContextSPtr context;
RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(conjunct,
context));
_conjuncts.emplace_back(context);
@@ -136,8 +130,9 @@ Status ExecNode::prepare(RuntimeState* state) {
_projection_timer = ADD_TIMER(_runtime_profile, "ProjectionTime");
_rows_returned_rate = runtime_profile()->add_derived_counter(
ROW_THROUGHPUT_COUNTER, TUnit::UNIT_PER_SECOND,
- std::bind<int64_t>(&RuntimeProfile::units_per_second,
_rows_returned_counter,
- runtime_profile()->total_time_counter()),
+ [this, capture0 = runtime_profile()->total_time_counter()] {
+ return
RuntimeProfile::units_per_second(_rows_returned_counter, capture0);
+ },
"");
_memory_used_counter = ADD_LABEL_COUNTER(runtime_profile(), "MemoryUsage");
_peak_memory_usage_counter = _runtime_profile->AddHighWaterMarkCounter(
@@ -150,13 +145,13 @@ Status ExecNode::prepare(RuntimeState* state) {
RETURN_IF_ERROR(vectorized::VExpr::prepare(_projections, state,
intermediate_row_desc()));
- for (int i = 0; i < _children.size(); ++i) {
- RETURN_IF_ERROR(_children[i]->prepare(state));
+ for (auto& i : _children) {
+ RETURN_IF_ERROR(i->prepare(state));
}
return Status::OK();
}
-Status ExecNode::alloc_resource(doris::RuntimeState* state) {
+Status ExecNode::alloc_resource(RuntimeState* state) {
for (auto& conjunct : _conjuncts) {
RETURN_IF_ERROR(conjunct->open(state));
}
@@ -170,8 +165,8 @@ Status ExecNode::open(RuntimeState* state) {
Status ExecNode::reset(RuntimeState* state) {
_num_rows_returned = 0;
- for (int i = 0; i < _children.size(); ++i) {
- RETURN_IF_ERROR(_children[i]->reset(state));
+ for (auto& i : _children) {
+ RETURN_IF_ERROR(i->reset(state));
}
return Status::OK();
}
@@ -199,8 +194,8 @@ Status ExecNode::close(RuntimeState* state) {
_is_closed = true;
Status result;
- for (int i = 0; i < _children.size(); ++i) {
- auto st = _children[i]->close(state);
+ for (auto& i : _children) {
+ auto st = i->close(state);
if (result.ok() && !st.ok()) {
result = st;
}
@@ -227,7 +222,7 @@ void ExecNode::add_runtime_exec_option(const std::string&
str) {
Status ExecNode::create_tree(RuntimeState* state, ObjectPool* pool, const
TPlan& plan,
const DescriptorTbl& descs, ExecNode** root) {
- if (plan.nodes.size() == 0) {
+ if (plan.nodes.empty()) {
*root = nullptr;
return Status::OK();
}
@@ -305,6 +300,7 @@ Status ExecNode::create_tree_helper(RuntimeState* state,
ObjectPool* pool,
return Status::OK();
}
+// NOLINTBEGIN(readability-function-size)
Status ExecNode::create_node(RuntimeState* state, ObjectPool* pool, const
TPlanNode& tnode,
const DescriptorTbl& descs, ExecNode** node) {
VLOG_CRITICAL << "tnode:\n" << apache::thrift::ThriftDebugString(tnode);
@@ -428,8 +424,7 @@ Status ExecNode::create_node(RuntimeState* state,
ObjectPool* pool, const TPlanN
return Status::OK();
default:
- std::map<int, const char*>::const_iterator i =
- _TPlanNodeType_VALUES_TO_NAMES.find(tnode.node_type);
+ auto i = _TPlanNodeType_VALUES_TO_NAMES.find(tnode.node_type);
const char* str = "unknown node type";
if (i != _TPlanNodeType_VALUES_TO_NAMES.end()) {
@@ -443,6 +438,7 @@ Status ExecNode::create_node(RuntimeState* state,
ObjectPool* pool, const TPlanN
return Status::OK();
}
+// NOLINTEND(readability-function-size)
std::string ExecNode::debug_string() const {
std::stringstream out;
@@ -459,9 +455,9 @@ void ExecNode::debug_string(int indentation_level,
std::stringstream* out) const
}
*out << "]";
- for (int i = 0; i < _children.size(); ++i) {
+ for (auto* i : _children) {
*out << "\n";
- _children[i]->debug_string(indentation_level + 1, out);
+ i->debug_string(indentation_level + 1, out);
}
}
@@ -470,8 +466,8 @@ void ExecNode::collect_nodes(TPlanNodeType::type node_type,
std::vector<ExecNode
nodes->push_back(this);
}
- for (int i = 0; i < _children.size(); ++i) {
- _children[i]->collect_nodes(node_type, nodes);
+ for (auto& i : _children) {
+ i->collect_nodes(node_type, nodes);
}
}
@@ -488,7 +484,7 @@ void ExecNode::collect_scan_nodes(vector<ExecNode*>* nodes)
{
void ExecNode::init_runtime_profile(const std::string& name) {
std::stringstream ss;
ss << name << " (id=" << _id << ")";
- _runtime_profile.reset(new RuntimeProfile(ss.str()));
+ _runtime_profile = std::make_unique<RuntimeProfile>(ss.str());
_runtime_profile->set_metadata(_id);
}
diff --git a/be/src/exec/exec_node.h b/be/src/exec/exec_node.h
index f4b49cba6f5..903122ecded 100644
--- a/be/src/exec/exec_node.h
+++ b/be/src/exec/exec_node.h
@@ -21,10 +21,10 @@
#pragma once
#include <gen_cpp/PlanNodes_types.h>
-#include <stddef.h>
-#include <stdint.h>
#include <atomic>
+#include <cstddef>
+#include <cstdint>
#include <functional>
#include <memory>
#include <mutex>
@@ -267,7 +267,7 @@ protected:
const TBackendResourceProfile _resource_profile;
int64_t _limit; // -1: no limit
- int64_t _num_rows_returned;
+ int64_t _num_rows_returned = 0;
std::unique_ptr<RuntimeProfile> _runtime_profile;
@@ -303,15 +303,6 @@ protected:
bool is_closed() const { return _is_closed; }
- // TODO(zc)
- /// Pointer to the containing SubplanNode or nullptr if not inside a
subplan.
- /// Set by SubplanNode::Init(). Not owned.
- // SubplanNode* containing_subplan_;
-
- /// Returns true if this node is inside the right-hand side plan tree of a
SubplanNode.
- /// Valid to call in or after Prepare().
- bool is_in_subplan() const { return false; }
-
// Create a single exec node derived from thrift node; place exec node in
'pool'.
static Status create_node(RuntimeState* state, ObjectPool* pool, const
TPlanNode& tnode,
const DescriptorTbl& descs, ExecNode** node);
@@ -334,9 +325,9 @@ private:
ExecNode** root);
friend class pipeline::OperatorBase;
- bool _is_closed;
+ bool _is_closed = false;
bool _is_resource_released = false;
- std::atomic_int _ref; // used by pipeline operator to release resource.
+ std::atomic_int _ref = 0; // used by pipeline operator to release resource.
};
} // namespace doris
diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
index a4f3ff55a5c..6ac06ee5f10 100644
--- a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
+++ b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
@@ -17,13 +17,11 @@
#include "multi_cast_data_stream_source.h"
-#include <functional>
-
#include "common/status.h"
#include "pipeline/exec/multi_cast_data_streamer.h"
#include "pipeline/exec/operator.h"
-#include "runtime/query_statistics.h"
#include "vec/core/block.h"
+#include "vec/core/materialize_block.h"
namespace doris::pipeline {
@@ -108,7 +106,7 @@ Status
MultiCastDataStreamerSourceOperator::get_block(RuntimeState* state, vecto
if (!_output_expr_contexts.empty() && output_block->rows() > 0) {
RETURN_IF_ERROR(vectorized::VExprContext::get_output_block_after_execute_exprs(
_output_expr_contexts, *output_block, block, true));
- materialize_block_inplace(*block);
+ vectorized::materialize_block_inplace(*block);
}
if (eos) {
source_state = SourceState::FINISHED;
@@ -176,7 +174,7 @@ Status
MultiCastDataStreamerSourceOperatorX::get_block(RuntimeState* state,
if (!local_state._output_expr_contexts.empty() && output_block->rows() >
0) {
RETURN_IF_ERROR(vectorized::VExprContext::get_output_block_after_execute_exprs(
local_state._output_expr_contexts, *output_block, block,
true));
- materialize_block_inplace(*block);
+ vectorized::materialize_block_inplace(*block);
}
COUNTER_UPDATE(local_state._rows_returned_counter, block->rows());
if (eos) {
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index cd5fba5fee3..bf41c670e0c 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -19,20 +19,16 @@
#include <fmt/format.h>
#include <glog/logging.h>
-#include <stdint.h>
+#include <cstdint>
#include <functional>
#include <memory>
-#include <ostream>
#include <string>
-#include <type_traits>
#include <utility>
#include <vector>
#include "common/status.h"
#include "exec/exec_node.h"
-#include "pipeline/pipeline_x/dependency.h"
-#include "runtime/memory/mem_tracker.h"
#include "runtime/runtime_state.h"
#include "util/runtime_profile.h"
#include "vec/core/block.h"
@@ -105,7 +101,7 @@ using OperatorBuilders = std::vector<OperatorBuilderPtr>;
class OperatorBuilderBase {
public:
- OperatorBuilderBase(int32_t id, const std::string& name) : _id(id),
_name(name) {}
+ OperatorBuilderBase(int32_t id, std::string name) : _id(id),
_name(std::move(name)) {}
virtual ~OperatorBuilderBase() = default;
@@ -333,10 +329,7 @@ public:
return Status::OK();
}
- Status open(RuntimeState* state) override {
- RETURN_IF_ERROR(_node->alloc_resource(state));
- return Status::OK();
- }
+ Status open(RuntimeState* state) override { return
_node->alloc_resource(state); }
Status sink(RuntimeState* state, vectorized::Block* in_block,
SourceState source_state) override {
@@ -413,8 +406,7 @@ class StatefulOperator : public
StreamingOperator<StatefulNodeType> {
public:
StatefulOperator(OperatorBuilderBase* builder, ExecNode* node)
: StreamingOperator<StatefulNodeType>(builder, node),
- _child_block(vectorized::Block::create_shared()),
- _child_source_state(SourceState::DEPEND_ON_SOURCE) {}
+ _child_block(vectorized::Block::create_shared()) {}
virtual ~StatefulOperator() = default;
@@ -454,7 +446,7 @@ public:
protected:
std::shared_ptr<vectorized::Block> _child_block;
- SourceState _child_source_state;
+ SourceState _child_source_state {SourceState::DEPEND_ON_SOURCE};
};
} // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/set_sink_operator.cpp
b/be/src/pipeline/exec/set_sink_operator.cpp
index 6c18cab03f6..cb106d76edb 100644
--- a/be/src/pipeline/exec/set_sink_operator.cpp
+++ b/be/src/pipeline/exec/set_sink_operator.cpp
@@ -21,6 +21,7 @@
#include "pipeline/exec/operator.h"
#include "vec/common/hash_table/hash_table_set_build.h"
+#include "vec/core/materialize_block.h"
#include "vec/exec/vset_operation_node.h"
namespace doris {
@@ -139,10 +140,10 @@ Status
SetSinkOperatorX<is_intersect>::_extract_build_column(
block.get_by_position(result_col_id).column =
block.get_by_position(result_col_id).column->convert_to_full_column_if_const();
- auto column = block.get_by_position(result_col_id).column.get();
+ const auto* column = block.get_by_position(result_col_id).column.get();
- if (auto* nullable =
check_and_get_column<vectorized::ColumnNullable>(*column)) {
- auto& col_nested = nullable->get_nested_column();
+ if (const auto* nullable =
check_and_get_column<vectorized::ColumnNullable>(*column)) {
+ const auto& col_nested = nullable->get_nested_column();
if (local_state._shared_state->build_not_ignore_null[i]) {
raw_ptrs[i] = nullable;
} else {
@@ -165,7 +166,7 @@ Status SetSinkLocalState<is_intersect>::init(RuntimeState*
state, LocalSinkState
SCOPED_TIMER(_open_timer);
_build_timer = ADD_TIMER(_profile, "BuildTime");
- Parent& parent = _parent->cast<Parent>();
+ auto& parent = _parent->cast<Parent>();
_dependency->set_cur_child_id(parent._cur_child_id);
_child_exprs.resize(parent._child_exprs.size());
for (size_t i = 0; i < _child_exprs.size(); i++) {
@@ -175,16 +176,15 @@ Status
SetSinkLocalState<is_intersect>::init(RuntimeState* state, LocalSinkState
_shared_state->child_quantity = parent._child_quantity;
auto& child_exprs_lists = _shared_state->child_exprs_lists;
- DCHECK(child_exprs_lists.size() == 0 || child_exprs_lists.size() ==
parent._child_quantity);
- if (child_exprs_lists.size() == 0) {
+ DCHECK(child_exprs_lists.empty() || child_exprs_lists.size() ==
parent._child_quantity);
+ if (child_exprs_lists.empty()) {
child_exprs_lists.resize(parent._child_quantity);
}
child_exprs_lists[parent._cur_child_id] = _child_exprs;
_shared_state->hash_table_variants =
std::make_unique<vectorized::SetHashTableVariants>();
- for (int i = 0; i < child_exprs_lists[0].size(); ++i) {
- const auto& ctx = child_exprs_lists[0][i];
+ for (const auto& ctx : child_exprs_lists[0]) {
_shared_state->build_not_ignore_null.push_back(ctx->root()->is_nullable());
}
_shared_state->hash_table_init();
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp
b/be/src/pipeline/pipeline_fragment_context.cpp
index b9c2382ce86..538a2ce1bdb 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -22,13 +22,15 @@
#include <gen_cpp/PlanNodes_types.h>
#include <gen_cpp/Planner_types.h>
#include <pthread.h>
-#include <stdlib.h>
+
+#include <cstdlib>
// IWYU pragma: no_include <bits/chrono.h>
#include <fmt/format.h>
#include <fmt/ranges.h>
#include <chrono> // IWYU pragma: keep
#include <map>
+#include <memory>
#include <ostream>
#include <typeinfo>
#include <utility>
@@ -212,8 +214,7 @@ PipelinePtr
PipelineFragmentContext::add_pipeline(PipelinePtr parent, int idx) {
return pipeline;
}
-Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams&
request,
- const size_t idx) {
+Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams&
request, size_t idx) {
if (_prepared) {
return Status::InternalError("Already prepared");
}
@@ -299,16 +300,16 @@ Status PipelineFragmentContext::prepare(const
doris::TPipelineFragmentParams& re
<< local_params.per_node_scan_ranges.size();
// set scan range in ScanNode
- for (int i = 0; i < scan_nodes.size(); ++i) {
+ for (auto& i : scan_nodes) {
// TODO(cmy): this "if...else" should be removed once all ScanNode are
derived from VScanNode.
- ExecNode* node = scan_nodes[i];
+ ExecNode* node = i;
if (typeid(*node) == typeid(vectorized::NewOlapScanNode) ||
typeid(*node) == typeid(vectorized::NewFileScanNode) ||
typeid(*node) == typeid(vectorized::NewOdbcScanNode) ||
typeid(*node) == typeid(vectorized::NewEsScanNode) ||
typeid(*node) == typeid(vectorized::VMetaScanNode) ||
typeid(*node) == typeid(vectorized::NewJdbcScanNode)) {
- auto* scan_node =
static_cast<vectorized::VScanNode*>(scan_nodes[i]);
+ auto* scan_node = static_cast<vectorized::VScanNode*>(i);
auto scan_ranges =
find_with_default(local_params.per_node_scan_ranges, scan_node->id(),
no_scan_ranges);
const bool shared_scan =
@@ -316,7 +317,7 @@ Status PipelineFragmentContext::prepare(const
doris::TPipelineFragmentParams& re
scan_node->set_scan_ranges(_runtime_state.get(), scan_ranges);
scan_node->set_shared_scan(_runtime_state.get(), shared_scan);
} else {
- ScanNode* scan_node = static_cast<ScanNode*>(node);
+ auto* scan_node = static_cast<ScanNode*>(node);
auto scan_ranges =
find_with_default(local_params.per_node_scan_ranges, scan_node->id(),
no_scan_ranges);
static_cast<void>(scan_node->set_scan_ranges(_runtime_state.get(),
scan_ranges));
@@ -850,10 +851,10 @@ Status PipelineFragmentContext::_create_sink(int
sender_id, const TDataSink& thr
{false})
: sink_->row_desc();
// 1. create the data stream sender sink
- _multi_cast_stream_sink_senders[i].reset(new
vectorized::VDataStreamSender(
+ _multi_cast_stream_sink_senders[i] =
std::make_unique<vectorized::VDataStreamSender>(
_runtime_state.get(), _runtime_state->obj_pool(),
sender_id, row_desc,
thrift_sink.multi_cast_stream_sink.sinks[i],
- thrift_sink.multi_cast_stream_sink.destinations[i]));
+ thrift_sink.multi_cast_stream_sink.destinations[i]);
// 2. create and set the source operator of
multi_cast_data_stream_source for new pipeline
OperatorBuilderPtr source_op =
@@ -941,9 +942,10 @@ Status PipelineFragmentContext::send_report(bool done) {
_fragment_instance_id,
_backend_num,
_runtime_state.get(),
- std::bind(&PipelineFragmentContext::update_status, this,
std::placeholders::_1),
- std::bind(&PipelineFragmentContext::cancel, this,
std::placeholders::_1,
- std::placeholders::_2),
+ [this](auto&& PH1) { return
update_status(std::forward<decltype(PH1)>(PH1)); },
+ [this](auto&& PH1, auto&& PH2) {
+ cancel(std::forward<decltype(PH1)>(PH1),
std::forward<decltype(PH2)>(PH2));
+ },
_query_ctx->get_query_statistics()},
std::dynamic_pointer_cast<PipelineFragmentContext>(shared_from_this()));
}
diff --git a/be/src/pipeline/pipeline_fragment_context.h
b/be/src/pipeline/pipeline_fragment_context.h
index 353e7a06586..a7a45d8f07f 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -67,7 +67,7 @@ public:
const std::function<void(RuntimeState*, Status*)>&
call_back,
const report_status_callback& report_status_cb);
- virtual ~PipelineFragmentContext();
+ ~PipelineFragmentContext() override;
PipelinePtr add_pipeline();
@@ -89,7 +89,7 @@ public:
int32_t next_operator_builder_id() { return _next_operator_builder_id++; }
- Status prepare(const doris::TPipelineFragmentParams& request, const size_t
idx);
+ Status prepare(const doris::TPipelineFragmentParams& request, size_t idx);
virtual Status prepare(const doris::TPipelineFragmentParams& request) {
return Status::InternalError("Pipeline fragment context do not
implement prepare");
diff --git a/be/src/service/point_query_executor.cpp
b/be/src/service/point_query_executor.cpp
index a86d5ed90b8..83d059ba206 100644
--- a/be/src/service/point_query_executor.cpp
+++ b/be/src/service/point_query_executor.cpp
@@ -26,6 +26,7 @@
#include <unordered_map>
#include <vector>
+#include "common/status.h"
#include "gutil/integral_types.h"
#include "olap/lru_cache.h"
#include "olap/olap_tuple.h"
@@ -56,21 +57,22 @@ Status Reusable::init(const TDescriptorTable& t_desc_tbl,
const std::vector<TExp
RETURN_IF_ERROR(DescriptorTbl::create(_runtime_state->obj_pool(),
t_desc_tbl, &_desc_tbl));
_runtime_state->set_desc_tbl(_desc_tbl);
_block_pool.resize(block_size);
- for (int i = 0; i < _block_pool.size(); ++i) {
- _block_pool[i] =
vectorized::Block::create_unique(tuple_desc()->slots(), 2);
+ for (auto& i : _block_pool) {
+ i = vectorized::Block::create_unique(tuple_desc()->slots(), 2);
// Name is useless but cost space
- _block_pool[i]->clear_names();
+ i->clear_names();
}
RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(output_exprs,
_output_exprs_ctxs));
RowDescriptor row_desc(tuple_desc(), false);
// Prepare the exprs to run.
RETURN_IF_ERROR(vectorized::VExpr::prepare(_output_exprs_ctxs,
_runtime_state.get(), row_desc));
+ RETURN_IF_ERROR(vectorized::VExpr::open(_output_exprs_ctxs,
_runtime_state.get()));
_create_timestamp = butil::gettimeofday_ms();
_data_type_serdes =
vectorized::create_data_type_serdes(tuple_desc()->slots());
_col_default_values.resize(tuple_desc()->slots().size());
for (int i = 0; i < tuple_desc()->slots().size(); ++i) {
- auto slot = tuple_desc()->slots()[i];
+ auto* slot = tuple_desc()->slots()[i];
_col_uid_to_idx[slot->col_unique_id()] = i;
_col_default_values[i] = slot->col_default_value();
}
diff --git a/be/src/vec/columns/column_nullable.cpp
b/be/src/vec/columns/column_nullable.cpp
index 426de2d4f70..8b0008d6e2e 100644
--- a/be/src/vec/columns/column_nullable.cpp
+++ b/be/src/vec/columns/column_nullable.cpp
@@ -23,9 +23,7 @@
#include "vec/columns/column_const.h"
#include "vec/common/arena.h"
#include "vec/common/assert_cast.h"
-#include "vec/common/nan_utils.h"
#include "vec/common/sip_hash.h"
-#include "vec/common/typeid_cast.h"
#include "vec/core/sort_block.h"
#include "vec/data_types/data_type.h"
#include "vec/utils/util.hpp"
@@ -571,7 +569,9 @@ bool ColumnNullable::has_null(size_t size) const {
}
ColumnPtr make_nullable(const ColumnPtr& column, bool is_nullable) {
- if (is_column_nullable(*column)) return column;
+ if (is_column_nullable(*column)) {
+ return column;
+ }
if (is_column_const(*column)) {
return ColumnConst::create(
diff --git a/be/src/vec/columns/column_nullable.h
b/be/src/vec/columns/column_nullable.h
index 83cbe82e328..91128fb69a8 100644
--- a/be/src/vec/columns/column_nullable.h
+++ b/be/src/vec/columns/column_nullable.h
@@ -32,7 +32,6 @@
#include "olap/olap_common.h"
#include "runtime/define_primitive_type.h"
#include "vec/columns/column.h"
-#include "vec/columns/column_impl.h"
#include "vec/columns/column_vector.h"
#include "vec/columns/columns_number.h"
#include "vec/common/assert_cast.h"
@@ -77,8 +76,7 @@ public:
null_map_->assume_mutable());
}
- template <typename... Args,
- typename = typename
std::enable_if<IsMutableColumns<Args...>::value>::type>
+ template <typename... Args, typename =
std::enable_if_t<IsMutableColumns<Args...>::value>>
static MutablePtr create(Args&&... args) {
return Base::create(std::forward<Args>(args)...);
}
diff --git a/be/src/vec/exec/join/vjoin_node_base.h
b/be/src/vec/exec/join/vjoin_node_base.h
index a44bc5513a9..c918e26e6fe 100644
--- a/be/src/vec/exec/join/vjoin_node_base.h
+++ b/be/src/vec/exec/join/vjoin_node_base.h
@@ -23,7 +23,6 @@
#include <memory>
#include <type_traits>
#include <variant>
-#include <vector>
#include "common/status.h"
#include "exec/exec_node.h"
@@ -57,22 +56,20 @@ class VJoinNodeBase : public ExecNode {
public:
VJoinNodeBase(ObjectPool* pool, const TPlanNode& tnode, const
DescriptorTbl& descs);
- virtual Status prepare(RuntimeState* state) override;
+ Status prepare(RuntimeState* state) override;
- virtual Status close(RuntimeState* state) override;
+ Status close(RuntimeState* state) override;
- virtual Status open(RuntimeState* state) override;
+ Status open(RuntimeState* state) override;
- virtual const RowDescriptor& row_desc() const override { return
*_output_row_desc; }
+ const RowDescriptor& row_desc() const override { return *_output_row_desc;
}
- virtual const RowDescriptor& intermediate_row_desc() const override {
- return *_intermediate_row_desc;
- }
+ const RowDescriptor& intermediate_row_desc() const override { return
*_intermediate_row_desc; }
- virtual Status alloc_resource(RuntimeState* state) override;
- virtual void release_resource(RuntimeState* state) override;
+ Status alloc_resource(RuntimeState* state) override;
+ void release_resource(RuntimeState* state) override;
- virtual Status init(const TPlanNode& tnode, RuntimeState* state = nullptr)
override;
+ Status init(const TPlanNode& tnode, RuntimeState* state = nullptr)
override;
[[nodiscard]] bool can_terminate_early() override { return
_short_circuit_for_probe; }
diff --git a/be/src/vec/exec/vset_operation_node.cpp
b/be/src/vec/exec/vset_operation_node.cpp
index 4f4634fbd64..3c47638ef42 100644
--- a/be/src/vec/exec/vset_operation_node.cpp
+++ b/be/src/vec/exec/vset_operation_node.cpp
@@ -22,8 +22,6 @@
#include <gen_cpp/PlanNodes_types.h>
#include <glog/logging.h>
-#include <array>
-#include <atomic>
#include <ostream>
#include <string>
#include <type_traits>
@@ -31,12 +29,9 @@
#include "runtime/define_primitive_type.h"
#include "runtime/runtime_state.h"
-#include "util/defer_op.h"
#include "vec/columns/column_nullable.h"
-#include "vec/common/columns_hashing.h"
#include "vec/common/hash_table/hash_table_set_build.h"
#include "vec/common/hash_table/hash_table_set_probe.h"
-#include "vec/common/uint128.h"
#include "vec/core/column_with_type_and_name.h"
#include "vec/core/materialize_block.h"
#include "vec/core/types.h"
@@ -100,6 +95,8 @@ Status VSetOperationNode<is_intersect>::init(const
TPlanNode& tnode, RuntimeStat
template <bool is_intersect>
Status VSetOperationNode<is_intersect>::alloc_resource(RuntimeState* state) {
SCOPED_TIMER(_exec_timer);
+ // will open projections
+ RETURN_IF_ERROR(ExecNode::alloc_resource(state));
// open result expr lists.
for (const VExprContextSPtrs& exprs : _child_expr_lists) {
RETURN_IF_ERROR(VExpr::open(exprs, state));
diff --git a/be/src/vec/exec/vset_operation_node.h
b/be/src/vec/exec/vset_operation_node.h
index ae600c6490d..b1ab9c47650 100644
--- a/be/src/vec/exec/vset_operation_node.h
+++ b/be/src/vec/exec/vset_operation_node.h
@@ -17,10 +17,7 @@
#pragma once
-#include <stddef.h>
-#include <stdint.h>
-
-#include <functional>
+#include <cstdint>
#include <iosfwd>
#include <memory>
#include <unordered_map>
@@ -33,8 +30,6 @@
#include "vec/aggregate_functions/aggregate_function.h"
#include "vec/columns/column.h"
#include "vec/common/arena.h"
-#include "vec/common/hash_table/hash_map.h"
-#include "vec/common/string_ref.h"
#include "vec/core/block.h"
#include "vec/exec/join/process_hash_table_probe.h"
#include "vec/exec/join/vhash_join_node.h"
diff --git a/be/src/vec/exec/vunion_node.cpp b/be/src/vec/exec/vunion_node.cpp
index e77fd9fee90..eac8aa1b167 100644
--- a/be/src/vec/exec/vunion_node.cpp
+++ b/be/src/vec/exec/vunion_node.cpp
@@ -20,7 +20,6 @@
#include <gen_cpp/Exprs_types.h>
#include <gen_cpp/PlanNodes_types.h>
-#include <algorithm>
#include <boost/iterator/iterator_facade.hpp>
#include <functional>
#include <memory>
@@ -66,8 +65,8 @@ Status VUnionNode::init(const TPlanNode& tnode, RuntimeState*
state) {
_const_expr_lists.push_back(ctxs);
}
// Create result_expr_ctx_lists_ from thrift exprs.
- auto& result_texpr_lists = tnode.union_node.result_expr_lists;
- for (auto& texprs : result_texpr_lists) {
+ const auto& result_texpr_lists = tnode.union_node.result_expr_lists;
+ for (const auto& texprs : result_texpr_lists) {
VExprContextSPtrs ctxs;
RETURN_IF_ERROR(VExpr::create_expr_trees(texprs, ctxs));
_child_expr_lists.push_back(ctxs);
@@ -127,7 +126,6 @@ Status VUnionNode::alloc_resource(RuntimeState* state) {
Status VUnionNode::get_next_pass_through(RuntimeState* state, Block* block) {
DCHECK(!reached_limit());
- DCHECK(!is_in_subplan());
DCHECK_LT(_child_idx, _children.size());
DCHECK(is_child_passthrough(_child_idx));
if (_child_eos) {
@@ -196,12 +194,6 @@ Status VUnionNode::get_next_materialized(RuntimeState*
state, Block* block) {
// incremented '_num_rows_returned' yet.
DCHECK(!reached_limit());
if (_child_eos) {
- // Unless we are inside a subplan expecting to call
open()/get_next() on the child
- // again, the child can be closed at this point.
- // TODO: Recheck whether is_in_subplan() is right
- // if (!is_in_subplan()) {
- // child(_child_idx)->close(state);
- // }
++_child_idx;
}
}
@@ -275,7 +267,6 @@ Status VUnionNode::get_next(RuntimeState* state, Block*
block, bool* eos) {
// The previous child needs to be closed if passthrough was enabled
for it. In the non
// passthrough case, the child was already closed in the previous call
to get_next().
DCHECK(is_child_passthrough(_to_close_child_idx));
- DCHECK(!is_in_subplan());
static_cast<void>(child(_to_close_child_idx)->close(state));
_to_close_child_idx = -1;
}
@@ -317,8 +308,8 @@ void VUnionNode::debug_string(int indentation_level,
std::stringstream* out) con
*out << string(indentation_level * 2, ' ');
*out << "_union(_first_materialized_child_idx=" <<
_first_materialized_child_idx
<< " _child_expr_lists=[";
- for (int i = 0; i < _child_expr_lists.size(); ++i) {
- *out << VExpr::debug_string(_child_expr_lists[i]) << ", ";
+ for (const auto& _child_expr_list : _child_expr_lists) {
+ *out << VExpr::debug_string(_child_expr_list) << ", ";
}
*out << "] \n";
ExecNode::debug_string(indentation_level, out);
@@ -329,9 +320,9 @@ Status VUnionNode::materialize_block(Block* src_block, int
child_idx, Block* res
SCOPED_TIMER(_exec_timer);
const auto& child_exprs = _child_expr_lists[child_idx];
ColumnsWithTypeAndName colunms;
- for (size_t i = 0; i < child_exprs.size(); ++i) {
+ for (const auto& child_expr : child_exprs) {
int result_column_id = -1;
- RETURN_IF_ERROR(child_exprs[i]->execute(src_block, &result_column_id));
+ RETURN_IF_ERROR(child_expr->execute(src_block, &result_column_id));
colunms.emplace_back(src_block->get_by_position(result_column_id));
}
_child_row_idx += src_block->rows();
diff --git a/be/src/vec/exprs/vbitmap_predicate.cpp
b/be/src/vec/exprs/vbitmap_predicate.cpp
index 0e158298d85..8116311247b 100644
--- a/be/src/vec/exprs/vbitmap_predicate.cpp
+++ b/be/src/vec/exprs/vbitmap_predicate.cpp
@@ -66,19 +66,23 @@ doris::Status
vectorized::VBitmapPredicate::prepare(doris::RuntimeState* state,
auto column = child->data_type()->create_column();
argument_template.emplace_back(std::move(column), child->data_type(),
child->expr_name());
}
+ _prepare_finished = true;
return Status::OK();
}
doris::Status vectorized::VBitmapPredicate::open(doris::RuntimeState* state,
vectorized::VExprContext*
context,
FunctionContext::FunctionStateScope scope) {
+ DCHECK(_prepare_finished);
RETURN_IF_ERROR(VExpr::open(state, context, scope));
+ _open_finished = true;
return Status::OK();
}
doris::Status vectorized::VBitmapPredicate::execute(vectorized::VExprContext*
context,
doris::vectorized::Block*
block,
int* result_column_id) {
+ DCHECK(_open_finished || _getting_const_col);
doris::vectorized::ColumnNumbers arguments(_children.size());
for (int i = 0; i < _children.size(); ++i) {
int column_id = -1;
diff --git a/be/src/vec/exprs/vbloom_predicate.cpp
b/be/src/vec/exprs/vbloom_predicate.cpp
index f72657c528a..08f891b0e56 100644
--- a/be/src/vec/exprs/vbloom_predicate.cpp
+++ b/be/src/vec/exprs/vbloom_predicate.cpp
@@ -60,12 +60,15 @@ Status VBloomPredicate::prepare(RuntimeState* state, const
RowDescriptor& desc,
}
_be_exec_version = state->be_exec_version();
+ _prepare_finished = true;
return Status::OK();
}
Status VBloomPredicate::open(RuntimeState* state, VExprContext* context,
FunctionContext::FunctionStateScope scope) {
+ DCHECK(_prepare_finished);
RETURN_IF_ERROR(VExpr::open(state, context, scope));
+ _open_finished = true;
return Status::OK();
}
@@ -74,6 +77,7 @@ void VBloomPredicate::close(VExprContext* context,
FunctionContext::FunctionStat
}
Status VBloomPredicate::execute(VExprContext* context, Block* block, int*
result_column_id) {
+ DCHECK(_open_finished || _getting_const_col);
doris::vectorized::ColumnNumbers arguments(_children.size());
for (int i = 0; i < _children.size(); ++i) {
int column_id = -1;
diff --git a/be/src/vec/exprs/vcase_expr.cpp b/be/src/vec/exprs/vcase_expr.cpp
index e09d62bfb23..dee60b5a6f1 100644
--- a/be/src/vec/exprs/vcase_expr.cpp
+++ b/be/src/vec/exprs/vcase_expr.cpp
@@ -74,22 +74,26 @@ Status VCaseExpr::prepare(RuntimeState* state, const
RowDescriptor& desc, VExprC
}
VExpr::register_function_context(state, context);
+ _prepare_finished = true;
return Status::OK();
}
Status VCaseExpr::open(RuntimeState* state, VExprContext* context,
FunctionContext::FunctionStateScope scope) {
- for (int i = 0; i < _children.size(); ++i) {
- RETURN_IF_ERROR(_children[i]->open(state, context, scope));
+ DCHECK(_prepare_finished);
+ for (auto& i : _children) {
+ RETURN_IF_ERROR(i->open(state, context, scope));
}
RETURN_IF_ERROR(VExpr::init_function_context(context, scope, _function));
if (scope == FunctionContext::FRAGMENT_LOCAL) {
RETURN_IF_ERROR(VExpr::get_const_col(context, nullptr));
}
+ _open_finished = true;
return Status::OK();
}
void VCaseExpr::close(VExprContext* context,
FunctionContext::FunctionStateScope scope) {
+ DCHECK(_prepare_finished);
VExpr::close_function_context(context, scope, _function);
VExpr::close(context, scope);
}
@@ -98,6 +102,7 @@ Status VCaseExpr::execute(VExprContext* context, Block*
block, int* result_colum
if (is_const_and_have_executed()) { // const have execute in open function
return get_result_from_const(block, _expr_name, result_column_id);
}
+ DCHECK(_open_finished || _getting_const_col);
ColumnNumbers arguments(_children.size());
for (int i = 0; i < _children.size(); i++) {
int column_id = -1;
diff --git a/be/src/vec/exprs/vcast_expr.cpp b/be/src/vec/exprs/vcast_expr.cpp
index 3207ba5b541..f322c1d2fae 100644
--- a/be/src/vec/exprs/vcast_expr.cpp
+++ b/be/src/vec/exprs/vcast_expr.cpp
@@ -76,6 +76,7 @@ doris::Status VCastExpr::prepare(doris::RuntimeState* state,
const doris::RowDes
VExpr::register_function_context(state, context);
_expr_name = fmt::format("(CAST {}({}) TO {})", child_name,
child->data_type()->get_name(),
_target_data_type_name);
+ _prepare_finished = true;
return Status::OK();
}
@@ -85,6 +86,7 @@ const DataTypePtr& VCastExpr::get_target_type() const {
doris::Status VCastExpr::open(doris::RuntimeState* state, VExprContext*
context,
FunctionContext::FunctionStateScope scope) {
+ DCHECK(_prepare_finished);
for (int i = 0; i < _children.size(); ++i) {
RETURN_IF_ERROR(_children[i]->open(state, context, scope));
}
@@ -92,6 +94,7 @@ doris::Status VCastExpr::open(doris::RuntimeState* state,
VExprContext* context,
if (scope == FunctionContext::FRAGMENT_LOCAL) {
RETURN_IF_ERROR(VExpr::get_const_col(context, nullptr));
}
+ _open_finished = true;
return Status::OK();
}
@@ -102,6 +105,7 @@ void VCastExpr::close(VExprContext* context,
FunctionContext::FunctionStateScope
doris::Status VCastExpr::execute(VExprContext* context,
doris::vectorized::Block* block,
int* result_column_id) {
+ DCHECK(_open_finished || _getting_const_col);
// for each child call execute
int column_id = 0;
RETURN_IF_ERROR(_children[0]->execute(context, block, &column_id));
diff --git a/be/src/vec/exprs/vcolumn_ref.h b/be/src/vec/exprs/vcolumn_ref.h
index 25e35dec4a5..a763797880e 100644
--- a/be/src/vec/exprs/vcolumn_ref.h
+++ b/be/src/vec/exprs/vcolumn_ref.h
@@ -43,10 +43,20 @@ public:
"VColumnRef have invalid slot id: {}, _column_name: {},
desc: {}", _column_id,
_column_name, desc.debug_string());
}
+ _prepare_finished = true;
+ return Status::OK();
+ }
+
+ Status open(RuntimeState* state, VExprContext* context,
+ FunctionContext::FunctionStateScope scope) override {
+ DCHECK(_prepare_finished);
+ RETURN_IF_ERROR(VExpr::open(state, context, scope));
+ _open_finished = true;
return Status::OK();
}
Status execute(VExprContext* context, Block* block, int* result_column_id)
override {
+ DCHECK(_open_finished || _getting_const_col);
*result_column_id = _column_id;
return Status::OK();
}
diff --git a/be/src/vec/exprs/vdirect_in_predicate.h
b/be/src/vec/exprs/vdirect_in_predicate.h
index 5211e013466..a68a6c3121a 100644
--- a/be/src/vec/exprs/vdirect_in_predicate.h
+++ b/be/src/vec/exprs/vdirect_in_predicate.h
@@ -18,6 +18,7 @@
#pragma once
#include "common/status.h"
+#include "exprs/hybrid_set.h"
#include "vec/exprs/vexpr.h"
namespace doris::vectorized {
@@ -29,7 +30,23 @@ public:
: VExpr(node), _filter(nullptr), _expr_name("direct_in_predicate")
{}
~VDirectInPredicate() override = default;
+ Status prepare(RuntimeState* state, const RowDescriptor& row_desc,
+ VExprContext* context) override {
+ RETURN_IF_ERROR_OR_PREPARED(VExpr::prepare(state, row_desc, context));
+ _prepare_finished = true;
+ return Status::OK();
+ }
+
+ Status open(RuntimeState* state, VExprContext* context,
+ FunctionContext::FunctionStateScope scope) override {
+ DCHECK(_prepare_finished);
+ RETURN_IF_ERROR(VExpr::open(state, context, scope));
+ _open_finished = true;
+ return Status::OK();
+ }
+
Status execute(VExprContext* context, Block* block, int* result_column_id)
override {
+ DCHECK(_open_finished || _getting_const_col);
ColumnNumbers arguments(_children.size());
for (int i = 0; i < _children.size(); ++i) {
int column_id = -1;
@@ -47,7 +64,7 @@ public:
if (argument_column->is_nullable()) {
auto column_nested = static_cast<const
ColumnNullable*>(argument_column.get())
->get_nested_column_ptr();
- auto& null_map =
+ const auto& null_map =
static_cast<const
ColumnNullable*>(argument_column.get())->get_null_map_data();
_filter->find_batch_nullable(*column_nested, sz, null_map,
res_data_column->get_data());
} else {
diff --git a/be/src/vec/exprs/vectorized_fn_call.cpp
b/be/src/vec/exprs/vectorized_fn_call.cpp
index 48522b35500..bf38185f7df 100644
--- a/be/src/vec/exprs/vectorized_fn_call.cpp
+++ b/be/src/vec/exprs/vectorized_fn_call.cpp
@@ -21,8 +21,6 @@
#include <fmt/ranges.h> // IWYU pragma: keep
#include <gen_cpp/Types_types.h>
-#include <algorithm>
-#include <memory>
#include <ostream>
#include <string_view>
#include <utility>
@@ -115,19 +113,21 @@ Status VectorizedFnCall::prepare(RuntimeState* state,
const RowDescriptor& desc,
VExpr::register_function_context(state, context);
_function_name = _fn.name.function_name;
_can_fast_execute = _function->can_fast_execute();
-
+ _prepare_finished = true;
return Status::OK();
}
Status VectorizedFnCall::open(RuntimeState* state, VExprContext* context,
FunctionContext::FunctionStateScope scope) {
- for (int i = 0; i < _children.size(); ++i) {
- RETURN_IF_ERROR(_children[i]->open(state, context, scope));
+ DCHECK(_prepare_finished);
+ for (auto& i : _children) {
+ RETURN_IF_ERROR(i->open(state, context, scope));
}
RETURN_IF_ERROR(VExpr::init_function_context(context, scope, _function));
if (scope == FunctionContext::FRAGMENT_LOCAL) {
RETURN_IF_ERROR(VExpr::get_const_col(context, nullptr));
}
+ _open_finished = true;
return Status::OK();
}
@@ -142,6 +142,7 @@ Status VectorizedFnCall::execute(VExprContext* context,
vectorized::Block* block
return get_result_from_const(block, _expr_name, result_column_id);
}
+ DCHECK(_open_finished || _getting_const_col) << debug_string();
// TODO: not execute const expr again, but use the const column in
function context
vectorized::ColumnNumbers arguments(_children.size());
for (int i = 0; i < _children.size(); ++i) {
@@ -188,9 +189,9 @@ bool VectorizedFnCall::fast_execute(FunctionContext*
context, Block& block,
block.get_by_name(result_column_name).column->convert_to_full_column_if_const();
auto& result_info = block.get_by_position(result);
if (result_info.type->is_nullable()) {
- block.replace_by_position(result,
-
ColumnNullable::create(std::move(result_column),
-
ColumnUInt8::create(input_rows_count, 0)));
+ block.replace_by_position(
+ result,
+ ColumnNullable::create(result_column,
ColumnUInt8::create(input_rows_count, 0)));
} else {
block.replace_by_position(result, std::move(result_column));
}
@@ -208,7 +209,7 @@ std::string VectorizedFnCall::debug_string() const {
out << _expr_name;
out << "]{";
bool first = true;
- for (auto& input_expr : children()) {
+ for (const auto& input_expr : children()) {
if (first) {
first = false;
} else {
diff --git a/be/src/vec/exprs/vexpr.cpp b/be/src/vec/exprs/vexpr.cpp
index 7270126e563..4f6b984e8fc 100644
--- a/be/src/vec/exprs/vexpr.cpp
+++ b/be/src/vec/exprs/vexpr.cpp
@@ -17,6 +17,7 @@
#include "vec/exprs/vexpr.h"
+#include <fmt/format.h>
#include <gen_cpp/Exprs_types.h>
#include <thrift/protocol/TDebugProtocol.h>
@@ -27,7 +28,6 @@
#include "common/config.h"
#include "common/exception.h"
-#include "common/object_pool.h"
#include "common/status.h"
#include "vec/columns/column_vector.h"
#include "vec/columns/columns_number.h"
@@ -40,6 +40,7 @@
#include "vec/exprs/vcompound_pred.h"
#include "vec/exprs/vectorized_fn_call.h"
#include "vec/exprs/vexpr_context.h"
+#include "vec/exprs/vexpr_fwd.h"
#include "vec/exprs/vin_predicate.h"
#include "vec/exprs/vinfo_func.h"
#include "vec/exprs/vlambda_function_call_expr.h"
@@ -55,6 +56,9 @@
namespace doris {
class RowDescriptor;
class RuntimeState;
+
+// NOLINTBEGIN(readability-function-cognitive-complexity)
+// NOLINTBEGIN(readability-function-size)
TExprNode create_texpr_node_from(const void* data, const PrimitiveType& type,
int precision,
int scale) {
TExprNode node;
@@ -146,6 +150,8 @@ TExprNode create_texpr_node_from(const void* data, const
PrimitiveType& type, in
}
return node;
}
+// NOLINTEND(readability-function-size)
+// NOLINTEND(readability-function-cognitive-complexity)
} // namespace doris
namespace doris::vectorized {
@@ -162,9 +168,7 @@ bool VExpr::is_acting_on_a_slot(const VExpr& expr) {
VExpr::VExpr(const TExprNode& node)
: _node_type(node.node_type),
_opcode(node.__isset.opcode ? node.opcode :
TExprOpcode::INVALID_OPCODE),
- _type(TypeDescriptor::from_thrift(node.type)),
- _fn_context_index(-1),
- _prepared(false) {
+ _type(TypeDescriptor::from_thrift(node.type)) {
if (node.__isset.fn) {
_fn = node.fn;
}
@@ -183,10 +187,7 @@ VExpr::VExpr(const TExprNode& node)
VExpr::VExpr(const VExpr& vexpr) = default;
VExpr::VExpr(TypeDescriptor type, bool is_slotref, bool is_nullable)
- : _opcode(TExprOpcode::INVALID_OPCODE),
- _type(std::move(type)),
- _fn_context_index(-1),
- _prepared(false) {
+ : _opcode(TExprOpcode::INVALID_OPCODE), _type(std::move(type)) {
if (is_slotref) {
_node_type = TExprNodeType::SLOT_REF;
}
@@ -221,11 +222,12 @@ Status VExpr::open(RuntimeState* state, VExprContext*
context,
}
void VExpr::close(VExprContext* context, FunctionContext::FunctionStateScope
scope) {
- for (int i = 0; i < _children.size(); ++i) {
- _children[i]->close(context, scope);
+ for (auto& i : _children) {
+ i->close(context, scope);
}
}
+// NOLINTBEGIN(readability-function-size)
Status VExpr::create_expr(const TExprNode& expr_node, VExprSPtr& expr) {
try {
switch (expr_node.node_type) {
@@ -326,6 +328,7 @@ Status VExpr::create_expr(const TExprNode& expr_node,
VExprSPtr& expr) {
}
return Status::OK();
}
+// NOLINTEND(readability-function-size)
Status VExpr::create_tree_from_thrift(const std::vector<TExprNode>& nodes,
int* node_idx,
VExprSPtr& root_expr, VExprContextSPtr&
ctx) {
@@ -348,7 +351,7 @@ Status VExpr::create_tree_from_thrift(const
std::vector<TExprNode>& nodes, int*
// non-recursive traversal
std::stack<std::pair<VExprSPtr, int>> s;
- s.push({root, root_children});
+ s.emplace(root, root_children);
while (!s.empty()) {
auto& parent = s.top();
if (parent.second > 1) {
@@ -366,14 +369,14 @@ Status VExpr::create_tree_from_thrift(const
std::vector<TExprNode>& nodes, int*
parent.first->add_child(expr);
int num_children = nodes[*node_idx].num_children;
if (num_children > 0) {
- s.push({expr, num_children});
+ s.emplace(expr, num_children);
}
}
return Status::OK();
}
Status VExpr::create_expr_tree(const TExpr& texpr, VExprContextSPtr& ctx) {
- if (texpr.nodes.size() == 0) {
+ if (texpr.nodes.empty()) {
ctx = nullptr;
return Status::OK();
}
@@ -395,9 +398,9 @@ Status VExpr::create_expr_tree(const TExpr& texpr,
VExprContextSPtr& ctx) {
Status VExpr::create_expr_trees(const std::vector<TExpr>& texprs,
VExprContextSPtrs& ctxs) {
ctxs.clear();
- for (int i = 0; i < texprs.size(); ++i) {
+ for (const auto& texpr : texprs) {
VExprContextSPtr ctx;
- RETURN_IF_ERROR(create_expr_tree(texprs[i], ctx));
+ RETURN_IF_ERROR(create_expr_tree(texpr, ctx));
ctxs.push_back(ctx);
}
return Status::OK();
@@ -412,8 +415,8 @@ Status VExpr::prepare(const VExprContextSPtrs& ctxs,
RuntimeState* state,
}
Status VExpr::open(const VExprContextSPtrs& ctxs, RuntimeState* state) {
- for (int i = 0; i < ctxs.size(); ++i) {
- RETURN_IF_ERROR(ctxs[i]->open(state));
+ for (const auto& ctx : ctxs) {
+ RETURN_IF_ERROR(ctx->open(state));
}
return Status::OK();
}
@@ -423,8 +426,8 @@ Status VExpr::clone_if_not_exists(const VExprContextSPtrs&
ctxs, RuntimeState* s
if (!new_ctxs.empty()) {
// 'ctxs' was already cloned into '*new_ctxs', nothing to do.
DCHECK_EQ(new_ctxs.size(), ctxs.size());
- for (int i = 0; i < new_ctxs.size(); ++i) {
- DCHECK(new_ctxs[i]->_is_clone);
+ for (auto& new_ctx : new_ctxs) {
+ DCHECK(new_ctx->_is_clone);
}
return Status::OK();
}
@@ -461,20 +464,15 @@ std::string VExpr::debug_string(const VExprSPtrs& exprs) {
std::string VExpr::debug_string(const VExprContextSPtrs& ctxs) {
VExprSPtrs exprs;
- for (int i = 0; i < ctxs.size(); ++i) {
- exprs.push_back(ctxs[i]->root());
+ for (const auto& ctx : ctxs) {
+ exprs.push_back(ctx->root());
}
return debug_string(exprs);
}
bool VExpr::is_constant() const {
- for (int i = 0; i < _children.size(); ++i) {
- if (!_children[i]->is_constant()) {
- return false;
- }
- }
-
- return true;
+ return std::all_of(_children.begin(), _children.end(),
+ [](const VExprSPtr& expr) { return expr->is_constant();
});
}
Status VExpr::get_const_col(VExprContext* context,
@@ -494,7 +492,11 @@ Status VExpr::get_const_col(VExprContext* context,
// If block is empty, some functions will produce no result. So we insert
a column with
// single value here.
block.insert({ColumnUInt8::create(1), std::make_shared<DataTypeUInt8>(),
""});
+
+ _getting_const_col = true;
RETURN_IF_ERROR(execute(context, &block, &result));
+ _getting_const_col = false;
+
DCHECK(result != -1);
const auto& column = block.get_by_position(result).column;
_constant_col = std::make_shared<ColumnPtrWrapper>(column);
@@ -507,8 +509,8 @@ Status VExpr::get_const_col(VExprContext* context,
void VExpr::register_function_context(RuntimeState* state, VExprContext*
context) {
std::vector<TypeDescriptor> arg_types;
- for (int i = 0; i < _children.size(); ++i) {
- arg_types.push_back(_children[i]->type());
+ for (auto& i : _children) {
+ arg_types.push_back(i->type());
}
_fn_context_index = context->register_function_context(state, _type,
arg_types);
diff --git a/be/src/vec/exprs/vexpr.h b/be/src/vec/exprs/vexpr.h
index b6a2b4ac6bd..a852afeb2d2 100644
--- a/be/src/vec/exprs/vexpr.h
+++ b/be/src/vec/exprs/vexpr.h
@@ -21,15 +21,14 @@
#include <gen_cpp/Opcodes_types.h>
#include <gen_cpp/Types_types.h>
#include <glog/logging.h>
-#include <stddef.h>
+#include <cstddef>
#include <memory>
#include <ostream>
#include <string>
#include <utility>
#include <vector>
-#include "common/factory_creator.h"
#include "common/status.h"
#include "runtime/define_primitive_type.h"
#include "runtime/large_int_value.h"
@@ -57,10 +56,9 @@ namespace vectorized {
#define RETURN_IF_ERROR_OR_PREPARED(stmt) \
if (_prepared) { \
return Status::OK(); \
- } else { \
- _prepared = true; \
- RETURN_IF_ERROR(stmt); \
- }
+ } \
+ _prepared = true; \
+ RETURN_IF_ERROR(stmt);
// VExpr should be used as shared pointer because it will be passed between
classes
// like runtime filter to scan node, or from scannode to scanner. We could not
make sure
@@ -106,6 +104,14 @@ public:
virtual Status open(RuntimeState* state, VExprContext* context,
FunctionContext::FunctionStateScope scope);
+ // before execute, check if expr has been prepared+opened.
+ [[maybe_unused]] Status ready_status() const {
+ if (_prepare_finished && _open_finished) {
+ return Status::OK();
+ }
+ return Status::InternalError(expr_name() + " is not ready when
execute");
+ }
+
virtual Status execute(VExprContext* context, Block* block, int*
result_column_id) = 0;
/// Subclasses overriding this function should call VExpr::Close().
@@ -156,6 +162,8 @@ public:
static std::string debug_string(const VExprSPtrs& exprs);
static std::string debug_string(const VExprContextSPtrs& ctxs);
+ void set_getting_const_col(bool val = true) { _getting_const_col = val; }
+
bool is_and_expr() const { return _fn.name.function_name == "and"; }
virtual bool is_compound_predicate() const { return false; }
@@ -254,63 +262,69 @@ protected:
/// Index to pass to ExprContext::fn_context() to retrieve this expr's
FunctionContext.
/// Set in RegisterFunctionContext(). -1 if this expr does not need a
FunctionContext and
/// doesn't call RegisterFunctionContext().
- int _fn_context_index;
+ int _fn_context_index = -1;
// If this expr is constant, this will store and cache the value generated
by
// get_const_col()
std::shared_ptr<ColumnPtrWrapper> _constant_col;
- bool _prepared;
+ bool _prepared = false; // for base class VExpr
+ bool _getting_const_col =
+ false; // if true, current execute() is in prepare() (that is,
can't check _prepared)
+ // for concrete classes
+ bool _prepare_finished = false;
+ bool _open_finished = false;
};
} // namespace vectorized
+// NOLINTBEGIN(readability-function-size)
template <PrimitiveType T>
Status create_texpr_literal_node(const void* data, TExprNode* node, int
precision = 0,
int scale = 0) {
if constexpr (T == TYPE_BOOLEAN) {
- auto origin_value = reinterpret_cast<const bool*>(data);
+ const auto* origin_value = reinterpret_cast<const bool*>(data);
TBoolLiteral boolLiteral;
(*node).__set_node_type(TExprNodeType::BOOL_LITERAL);
boolLiteral.__set_value(*origin_value);
(*node).__set_bool_literal(boolLiteral);
(*node).__set_type(create_type_desc(PrimitiveType::TYPE_BOOLEAN));
} else if constexpr (T == TYPE_TINYINT) {
- auto origin_value = reinterpret_cast<const int8_t*>(data);
+ const auto* origin_value = reinterpret_cast<const int8_t*>(data);
(*node).__set_node_type(TExprNodeType::INT_LITERAL);
TIntLiteral intLiteral;
intLiteral.__set_value(*origin_value);
(*node).__set_int_literal(intLiteral);
(*node).__set_type(create_type_desc(PrimitiveType::TYPE_TINYINT));
} else if constexpr (T == TYPE_SMALLINT) {
- auto origin_value = reinterpret_cast<const int16_t*>(data);
+ const auto* origin_value = reinterpret_cast<const int16_t*>(data);
(*node).__set_node_type(TExprNodeType::INT_LITERAL);
TIntLiteral intLiteral;
intLiteral.__set_value(*origin_value);
(*node).__set_int_literal(intLiteral);
(*node).__set_type(create_type_desc(PrimitiveType::TYPE_SMALLINT));
} else if constexpr (T == TYPE_INT) {
- auto origin_value = reinterpret_cast<const int32_t*>(data);
+ const auto* origin_value = reinterpret_cast<const int32_t*>(data);
(*node).__set_node_type(TExprNodeType::INT_LITERAL);
TIntLiteral intLiteral;
intLiteral.__set_value(*origin_value);
(*node).__set_int_literal(intLiteral);
(*node).__set_type(create_type_desc(PrimitiveType::TYPE_INT));
} else if constexpr (T == TYPE_BIGINT) {
- auto origin_value = reinterpret_cast<const int64_t*>(data);
+ const auto* origin_value = reinterpret_cast<const int64_t*>(data);
(*node).__set_node_type(TExprNodeType::INT_LITERAL);
TIntLiteral intLiteral;
intLiteral.__set_value(*origin_value);
(*node).__set_int_literal(intLiteral);
(*node).__set_type(create_type_desc(PrimitiveType::TYPE_BIGINT));
} else if constexpr (T == TYPE_LARGEINT) {
- auto origin_value = reinterpret_cast<const int128_t*>(data);
+ const auto* origin_value = reinterpret_cast<const int128_t*>(data);
(*node).__set_node_type(TExprNodeType::LARGE_INT_LITERAL);
TLargeIntLiteral large_int_literal;
large_int_literal.__set_value(LargeIntValue::to_string(*origin_value));
(*node).__set_large_int_literal(large_int_literal);
(*node).__set_type(create_type_desc(PrimitiveType::TYPE_LARGEINT));
} else if constexpr ((T == TYPE_DATE) || (T == TYPE_DATETIME) || (T ==
TYPE_TIME)) {
- auto origin_value = reinterpret_cast<const VecDateTimeValue*>(data);
+ const auto* origin_value = reinterpret_cast<const
VecDateTimeValue*>(data);
TDateLiteral date_literal;
char convert_buffer[30];
origin_value->to_string(convert_buffer);
@@ -325,7 +339,7 @@ Status create_texpr_literal_node(const void* data,
TExprNode* node, int precisio
(*node).__set_type(create_type_desc(PrimitiveType::TYPE_TIME));
}
} else if constexpr (T == TYPE_DATEV2) {
- auto origin_value = reinterpret_cast<const
DateV2Value<DateV2ValueType>*>(data);
+ const auto* origin_value = reinterpret_cast<const
DateV2Value<DateV2ValueType>*>(data);
TDateLiteral date_literal;
char convert_buffer[30];
origin_value->to_string(convert_buffer);
@@ -334,7 +348,7 @@ Status create_texpr_literal_node(const void* data,
TExprNode* node, int precisio
(*node).__set_node_type(TExprNodeType::DATE_LITERAL);
(*node).__set_type(create_type_desc(PrimitiveType::TYPE_DATEV2));
} else if constexpr (T == TYPE_DATETIMEV2) {
- auto origin_value = reinterpret_cast<const
DateV2Value<DateTimeV2ValueType>*>(data);
+ const auto* origin_value = reinterpret_cast<const
DateV2Value<DateTimeV2ValueType>*>(data);
TDateLiteral date_literal;
char convert_buffer[30];
origin_value->to_string(convert_buffer);
@@ -343,28 +357,28 @@ Status create_texpr_literal_node(const void* data,
TExprNode* node, int precisio
(*node).__set_node_type(TExprNodeType::DATE_LITERAL);
(*node).__set_type(create_type_desc(PrimitiveType::TYPE_DATETIMEV2));
} else if constexpr (T == TYPE_DECIMALV2) {
- auto origin_value = reinterpret_cast<const DecimalV2Value*>(data);
+ const auto* origin_value = reinterpret_cast<const
DecimalV2Value*>(data);
(*node).__set_node_type(TExprNodeType::DECIMAL_LITERAL);
TDecimalLiteral decimal_literal;
decimal_literal.__set_value(origin_value->to_string());
(*node).__set_decimal_literal(decimal_literal);
(*node).__set_type(create_type_desc(PrimitiveType::TYPE_DECIMALV2,
precision, scale));
} else if constexpr (T == TYPE_DECIMAL32) {
- auto origin_value = reinterpret_cast<const
vectorized::Decimal<int32_t>*>(data);
+ const auto* origin_value = reinterpret_cast<const
vectorized::Decimal<int32_t>*>(data);
(*node).__set_node_type(TExprNodeType::DECIMAL_LITERAL);
TDecimalLiteral decimal_literal;
decimal_literal.__set_value(origin_value->to_string(scale));
(*node).__set_decimal_literal(decimal_literal);
(*node).__set_type(create_type_desc(PrimitiveType::TYPE_DECIMAL32,
precision, scale));
} else if constexpr (T == TYPE_DECIMAL64) {
- auto origin_value = reinterpret_cast<const
vectorized::Decimal<int64_t>*>(data);
+ const auto* origin_value = reinterpret_cast<const
vectorized::Decimal<int64_t>*>(data);
(*node).__set_node_type(TExprNodeType::DECIMAL_LITERAL);
TDecimalLiteral decimal_literal;
decimal_literal.__set_value(origin_value->to_string(scale));
(*node).__set_decimal_literal(decimal_literal);
(*node).__set_type(create_type_desc(PrimitiveType::TYPE_DECIMAL64,
precision, scale));
} else if constexpr (T == TYPE_DECIMAL128I) {
- auto origin_value = reinterpret_cast<const
vectorized::Decimal<int128_t>*>(data);
+ const auto* origin_value = reinterpret_cast<const
vectorized::Decimal<int128_t>*>(data);
(*node).__set_node_type(TExprNodeType::DECIMAL_LITERAL);
TDecimalLiteral decimal_literal;
decimal_literal.__set_value(origin_value->to_string(scale));
@@ -378,21 +392,21 @@ Status create_texpr_literal_node(const void* data,
TExprNode* node, int precisio
(*node).__set_decimal_literal(decimal_literal);
(*node).__set_type(create_type_desc(PrimitiveType::TYPE_DECIMAL256,
precision, scale));
} else if constexpr (T == TYPE_FLOAT) {
- auto origin_value = reinterpret_cast<const float*>(data);
+ const auto* origin_value = reinterpret_cast<const float*>(data);
(*node).__set_node_type(TExprNodeType::FLOAT_LITERAL);
TFloatLiteral float_literal;
float_literal.__set_value(*origin_value);
(*node).__set_float_literal(float_literal);
(*node).__set_type(create_type_desc(PrimitiveType::TYPE_FLOAT));
} else if constexpr (T == TYPE_DOUBLE) {
- auto origin_value = reinterpret_cast<const double*>(data);
+ const auto* origin_value = reinterpret_cast<const double*>(data);
(*node).__set_node_type(TExprNodeType::FLOAT_LITERAL);
TFloatLiteral float_literal;
float_literal.__set_value(*origin_value);
(*node).__set_float_literal(float_literal);
(*node).__set_type(create_type_desc(PrimitiveType::TYPE_DOUBLE));
} else if constexpr ((T == TYPE_STRING) || (T == TYPE_CHAR) || (T ==
TYPE_VARCHAR)) {
- auto origin_value = reinterpret_cast<const StringRef*>(data);
+ const auto* origin_value = reinterpret_cast<const StringRef*>(data);
(*node).__set_node_type(TExprNodeType::STRING_LITERAL);
TStringLiteral string_literal;
string_literal.__set_value(origin_value->to_string());
@@ -403,6 +417,7 @@ Status create_texpr_literal_node(const void* data,
TExprNode* node, int precisio
}
return Status::OK();
}
+// NOLINTEND(readability-function-size)
TExprNode create_texpr_node_from(const void* data, const PrimitiveType& type,
int precision = 0,
int scale = 0);
diff --git a/be/src/vec/exprs/vexpr_context.cpp
b/be/src/vec/exprs/vexpr_context.cpp
index 35eaae5c607..cebb7dd2e53 100644
--- a/be/src/vec/exprs/vexpr_context.cpp
+++ b/be/src/vec/exprs/vexpr_context.cpp
@@ -17,17 +17,14 @@
#include "vec/exprs/vexpr_context.h"
-#include <algorithm>
#include <ostream>
#include <string>
#include "common/compiler_util.h" // IWYU pragma: keep
#include "common/exception.h"
-#include "common/object_pool.h"
#include "runtime/runtime_state.h"
#include "runtime/thread_context.h"
#include "udf/udf.h"
-#include "util/stack_util.h"
#include "vec/columns/column_const.h"
#include "vec/core/column_with_type_and_name.h"
#include "vec/core/columns_with_type_and_name.h"
@@ -38,13 +35,6 @@ class RowDescriptor;
} // namespace doris
namespace doris::vectorized {
-VExprContext::VExprContext(const VExprSPtr& expr)
- : _root(expr),
- _is_clone(false),
- _prepared(false),
- _opened(false),
- _last_result_column_id(-1) {}
-
VExprContext::~VExprContext() {
// In runtime filter, only create expr context to get expr root, will not
call
// prepare or open, so that it is not need to call close. And call close
may core
@@ -154,19 +144,19 @@ Status VExprContext::execute_conjuncts(const
VExprContextSPtrs& ctxs,
return execute_conjuncts(ctxs, filters, false, block, result_filter,
can_filter_all);
}
-// TODO Performance Optimization
+// TODO: Performance Optimization
Status VExprContext::execute_conjuncts(const VExprContextSPtrs& ctxs,
const std::vector<IColumn::Filter*>*
filters,
- const bool accept_null, Block* block,
+ bool accept_null, Block* block,
IColumn::Filter* result_filter, bool*
can_filter_all) {
DCHECK(result_filter->size() == block->rows());
*can_filter_all = false;
auto* __restrict result_filter_data = result_filter->data();
- for (auto& ctx : ctxs) {
+ for (const auto& ctx : ctxs) {
int result_column_id = -1;
RETURN_IF_ERROR(ctx->execute(block, &result_column_id));
ColumnPtr& filter_column =
block->get_by_position(result_column_id).column;
- if (auto* nullable_column =
check_and_get_column<ColumnNullable>(*filter_column)) {
+ if (const auto* nullable_column =
check_and_get_column<ColumnNullable>(*filter_column)) {
size_t column_size = nullable_column->size();
if (column_size == 0) {
*can_filter_all = true;
@@ -175,9 +165,9 @@ Status VExprContext::execute_conjuncts(const
VExprContextSPtrs& ctxs,
const ColumnPtr& nested_column =
nullable_column->get_nested_column_ptr();
const IColumn::Filter& filter =
assert_cast<const
ColumnUInt8&>(*nested_column).get_data();
- auto* __restrict filter_data = filter.data();
+ const auto* __restrict filter_data = filter.data();
const size_t size = filter.size();
- auto* __restrict null_map_data =
nullable_column->get_null_map_data().data();
+ const auto* __restrict null_map_data =
nullable_column->get_null_map_data().data();
if (accept_null) {
for (size_t i = 0; i < size; ++i) {
@@ -194,7 +184,7 @@ Status VExprContext::execute_conjuncts(const
VExprContextSPtrs& ctxs,
return Status::OK();
}
}
- } else if (auto* const_column =
check_and_get_column<ColumnConst>(*filter_column)) {
+ } else if (const auto* const_column =
check_and_get_column<ColumnConst>(*filter_column)) {
// filter all
if (!const_column->get_bool(0)) {
*can_filter_all = true;
@@ -204,7 +194,7 @@ Status VExprContext::execute_conjuncts(const
VExprContextSPtrs& ctxs,
} else {
const IColumn::Filter& filter =
assert_cast<const ColumnUInt8&>(*filter_column).get_data();
- auto* __restrict filter_data = filter.data();
+ const auto* __restrict filter_data = filter.data();
const size_t size = filter.size();
for (size_t i = 0; i < size; ++i) {
@@ -297,7 +287,7 @@ Status VExprContext::get_output_block_after_execute_exprs(
auto rows = input_block.rows();
vectorized::Block tmp_block(input_block.get_columns_with_type_and_name());
vectorized::ColumnsWithTypeAndName result_columns;
- for (auto& vexpr_ctx : output_vexpr_ctxs) {
+ for (const auto& vexpr_ctx : output_vexpr_ctxs) {
int result_column_id = -1;
RETURN_IF_ERROR(vexpr_ctx->execute(&tmp_block, &result_column_id));
DCHECK(result_column_id != -1);
diff --git a/be/src/vec/exprs/vexpr_context.h b/be/src/vec/exprs/vexpr_context.h
index db5c4c87d8d..70bd37b1878 100644
--- a/be/src/vec/exprs/vexpr_context.h
+++ b/be/src/vec/exprs/vexpr_context.h
@@ -20,6 +20,7 @@
#include <glog/logging.h>
#include <memory>
+#include <utility>
#include <vector>
#include "common/factory_creator.h"
@@ -40,7 +41,7 @@ class VExprContext {
ENABLE_FACTORY_CREATOR(VExprContext);
public:
- VExprContext(const VExprSPtr& expr);
+ VExprContext(VExprSPtr expr) : _root(std::move(expr)) {}
~VExprContext();
[[nodiscard]] Status prepare(RuntimeState* state, const RowDescriptor&
row_desc);
[[nodiscard]] Status open(RuntimeState* state);
@@ -76,7 +77,7 @@ public:
[[nodiscard]] static Status execute_conjuncts(const VExprContextSPtrs&
ctxs,
const
std::vector<IColumn::Filter*>* filters,
- const bool accept_null,
Block* block,
+ bool accept_null, Block*
block,
IColumn::Filter*
result_filter,
bool* can_filter_all);
@@ -121,7 +122,7 @@ public:
_prepared = other._prepared;
_opened = other._opened;
- for (auto& fn : other._fn_contexts) {
+ for (const auto& fn : other._fn_contexts) {
_fn_contexts.emplace_back(fn->clone());
}
@@ -152,17 +153,17 @@ private:
VExprSPtr _root;
/// True if this context came from a Clone() call. Used to manage
FunctionStateScope.
- bool _is_clone;
+ bool _is_clone = false;
/// Variables keeping track of current state.
- bool _prepared;
- bool _opened;
+ bool _prepared = false;
+ bool _opened = false;
/// FunctionContexts for each registered expression. The FunctionContexts
are created
/// and owned by this VExprContext.
std::vector<std::unique_ptr<FunctionContext>> _fn_contexts;
- int _last_result_column_id;
+ int _last_result_column_id = -1;
/// The depth of expression-tree.
int _depth_num = 0;
diff --git a/be/src/vec/exprs/vin_predicate.cpp
b/be/src/vec/exprs/vin_predicate.cpp
index 9a25d3a2230..896b2a903d3 100644
--- a/be/src/vec/exprs/vin_predicate.cpp
+++ b/be/src/vec/exprs/vin_predicate.cpp
@@ -73,11 +73,13 @@ Status VInPredicate::prepare(RuntimeState* state, const
RowDescriptor& desc,
}
VExpr::register_function_context(state, context);
+ _prepare_finished = true;
return Status::OK();
}
Status VInPredicate::open(RuntimeState* state, VExprContext* context,
FunctionContext::FunctionStateScope scope) {
+ DCHECK(_prepare_finished);
for (int i = 0; i < _children.size(); ++i) {
RETURN_IF_ERROR(_children[i]->open(state, context, scope));
}
@@ -85,6 +87,7 @@ Status VInPredicate::open(RuntimeState* state, VExprContext*
context,
if (scope == FunctionContext::FRAGMENT_LOCAL) {
RETURN_IF_ERROR(VExpr::get_const_col(context, nullptr));
}
+ _open_finished = true;
return Status::OK();
}
@@ -97,6 +100,7 @@ Status VInPredicate::execute(VExprContext* context, Block*
block, int* result_co
if (is_const_and_have_executed()) { // const have execute in open function
return get_result_from_const(block, _expr_name, result_column_id);
}
+ DCHECK(_open_finished || _getting_const_col);
// TODO: not execute const expr again, but use the const column in
function context
doris::vectorized::ColumnNumbers arguments(_children.size());
for (int i = 0; i < _children.size(); ++i) {
diff --git a/be/src/vec/exprs/vlambda_function_call_expr.h
b/be/src/vec/exprs/vlambda_function_call_expr.h
index 44678498027..44d22b1f9eb 100644
--- a/be/src/vec/exprs/vlambda_function_call_expr.h
+++ b/be/src/vec/exprs/vlambda_function_call_expr.h
@@ -34,6 +34,8 @@ public:
VLambdaFunctionCallExpr(const TExprNode& node) : VExpr(node) {}
~VLambdaFunctionCallExpr() override = default;
+ const std::string& expr_name() const override { return _expr_name; }
+
Status prepare(RuntimeState* state, const RowDescriptor& desc,
VExprContext* context) override {
RETURN_IF_ERROR_OR_PREPARED(VExpr::prepare(state, desc, context));
@@ -48,12 +50,20 @@ public:
return Status::InternalError("Lambda Function {} is not
implemented.",
_fn.name.function_name);
}
+ _prepare_finished = true;
return Status::OK();
}
- const std::string& expr_name() const override { return _expr_name; }
+ Status open(RuntimeState* state, VExprContext* context,
+ FunctionContext::FunctionStateScope scope) override {
+ DCHECK(_prepare_finished);
+ RETURN_IF_ERROR(VExpr::open(state, context, scope));
+ _open_finished = true;
+ return Status::OK();
+ }
Status execute(VExprContext* context, Block* block, int* result_column_id)
override {
+ DCHECK(_open_finished || _getting_const_col);
return _lambda_function->execute(context, block, result_column_id,
_data_type, _children);
}
diff --git a/be/src/vec/exprs/vlambda_function_expr.h
b/be/src/vec/exprs/vlambda_function_expr.h
index 6d84abb937f..94571712e40 100644
--- a/be/src/vec/exprs/vlambda_function_expr.h
+++ b/be/src/vec/exprs/vlambda_function_expr.h
@@ -28,7 +28,22 @@ public:
VLambdaFunctionExpr(const TExprNode& node) : VExpr(node) {}
~VLambdaFunctionExpr() override = default;
+ Status prepare(RuntimeState* state, const RowDescriptor& desc,
VExprContext* context) override {
+ RETURN_IF_ERROR_OR_PREPARED(VExpr::prepare(state, desc, context));
+ _prepare_finished = true;
+ return Status::OK();
+ }
+
+ Status open(RuntimeState* state, VExprContext* context,
+ FunctionContext::FunctionStateScope scope) override {
+ DCHECK(_prepare_finished);
+ RETURN_IF_ERROR(VExpr::open(state, context, scope));
+ _open_finished = true;
+ return Status::OK();
+ }
+
Status execute(VExprContext* context, Block* block, int* result_column_id)
override {
+ DCHECK(_open_finished || _getting_const_col);
return get_child(0)->execute(context, block, result_column_id);
}
diff --git a/be/src/vec/exprs/vliteral.cpp b/be/src/vec/exprs/vliteral.cpp
index 03d1659eee6..c7fbb081675 100644
--- a/be/src/vec/exprs/vliteral.cpp
+++ b/be/src/vec/exprs/vliteral.cpp
@@ -47,9 +47,7 @@
#include "vec/data_types/data_type_decimal.h"
#include "vec/runtime/vdatetime_value.h"
-namespace doris {
-
-namespace vectorized {
+namespace doris::vectorized {
class VExprContext;
void VLiteral::init(const TExprNode& node) {
@@ -58,8 +56,20 @@ void VLiteral::init(const TExprNode& node) {
_column_ptr = _data_type->create_column_const(1, field);
}
+Status VLiteral::prepare(RuntimeState* state, const RowDescriptor& desc,
VExprContext* context) {
+ RETURN_IF_ERROR_OR_PREPARED(VExpr::prepare(state, desc, context));
+ return Status::OK();
+}
+
+Status VLiteral::open(RuntimeState* state, VExprContext* context,
+ FunctionContext::FunctionStateScope scope) {
+ RETURN_IF_ERROR(VExpr::open(state, context, scope));
+ return Status::OK();
+}
+
Status VLiteral::execute(VExprContext* context, vectorized::Block* block, int*
result_column_id) {
// Literal expr should return least one row.
+ // sometimes we just use a VLiteral without open or prepare. so can't
check it at this moment
size_t row_size = std::max(block->rows(), _column_ptr->size());
*result_column_id = VExpr::insert_param(block, {_column_ptr, _data_type,
_expr_name}, row_size);
return Status::OK();
@@ -86,5 +96,4 @@ std::string VLiteral::debug_string() const {
return out.str();
}
-} // namespace vectorized
-} // namespace doris
+} // namespace doris::vectorized
diff --git a/be/src/vec/exprs/vliteral.h b/be/src/vec/exprs/vliteral.h
index 78879c00d04..d443478ada5 100644
--- a/be/src/vec/exprs/vliteral.h
+++ b/be/src/vec/exprs/vliteral.h
@@ -42,7 +42,12 @@ public:
init(node);
}
}
+
+ Status prepare(RuntimeState* state, const RowDescriptor& desc,
VExprContext* context) override;
+ Status open(RuntimeState* state, VExprContext* context,
+ FunctionContext::FunctionStateScope scope) override;
Status execute(VExprContext* context, Block* block, int* result_column_id)
override;
+
const std::string& expr_name() const override { return _expr_name; }
std::string debug_string() const override;
diff --git a/be/src/vec/exprs/vmatch_predicate.cpp
b/be/src/vec/exprs/vmatch_predicate.cpp
index 23a34aae5ac..17326b5b23b 100644
--- a/be/src/vec/exprs/vmatch_predicate.cpp
+++ b/be/src/vec/exprs/vmatch_predicate.cpp
@@ -91,12 +91,13 @@ Status VMatchPredicate::prepare(RuntimeState* state, const
RowDescriptor& desc,
VExpr::register_function_context(state, context);
_expr_name = fmt::format("{}({})", _fn.name.function_name,
child_expr_name);
_function_name = _fn.name.function_name;
-
+ _prepare_finished = true;
return Status::OK();
}
Status VMatchPredicate::open(RuntimeState* state, VExprContext* context,
FunctionContext::FunctionStateScope scope) {
+ DCHECK(_prepare_finished);
for (int i = 0; i < _children.size(); ++i) {
RETURN_IF_ERROR(_children[i]->open(state, context, scope));
}
@@ -107,6 +108,7 @@ Status VMatchPredicate::open(RuntimeState* state,
VExprContext* context,
if (scope == FunctionContext::FRAGMENT_LOCAL) {
RETURN_IF_ERROR(VExpr::get_const_col(context, nullptr));
}
+ _open_finished = true;
return Status::OK();
}
@@ -116,6 +118,7 @@ void VMatchPredicate::close(VExprContext* context,
FunctionContext::FunctionStat
}
Status VMatchPredicate::execute(VExprContext* context, Block* block, int*
result_column_id) {
+ DCHECK(_open_finished || _getting_const_col);
// TODO: not execute const expr again, but use the const column in
function context
doris::vectorized::ColumnNumbers arguments(_children.size());
for (int i = 0; i < _children.size(); ++i) {
diff --git a/be/src/vec/exprs/vruntimefilter_wrapper.cpp
b/be/src/vec/exprs/vruntimefilter_wrapper.cpp
index 62ef2bbdb64..c623355d673 100644
--- a/be/src/vec/exprs/vruntimefilter_wrapper.cpp
+++ b/be/src/vec/exprs/vruntimefilter_wrapper.cpp
@@ -52,12 +52,16 @@ Status VRuntimeFilterWrapper::prepare(RuntimeState* state,
const RowDescriptor&
VExprContext* context) {
RETURN_IF_ERROR_OR_PREPARED(_impl->prepare(state, desc, context));
_expr_name = fmt::format("VRuntimeFilterWrapper({})", _impl->expr_name());
+ _prepare_finished = true;
return Status::OK();
}
Status VRuntimeFilterWrapper::open(RuntimeState* state, VExprContext* context,
FunctionContext::FunctionStateScope scope) {
- return _impl->open(state, context, scope);
+ DCHECK(_prepare_finished);
+ RETURN_IF_ERROR(_impl->open(state, context, scope));
+ _open_finished = true;
+ return Status::OK();
}
void VRuntimeFilterWrapper::close(VExprContext* context,
@@ -66,6 +70,7 @@ void VRuntimeFilterWrapper::close(VExprContext* context,
}
Status VRuntimeFilterWrapper::execute(VExprContext* context, Block* block,
int* result_column_id) {
+ DCHECK(_open_finished || _getting_const_col);
if (_always_true) {
auto res_data_column = ColumnVector<UInt8>::create(block->rows(), 1);
size_t num_columns_without_result = block->columns();
@@ -80,7 +85,15 @@ Status VRuntimeFilterWrapper::execute(VExprContext* context,
Block* block, int*
return Status::OK();
} else {
_scan_rows += block->rows();
+
+ if (_getting_const_col) {
+ _impl->set_getting_const_col(true);
+ }
RETURN_IF_ERROR(_impl->execute(context, block, result_column_id));
+ if (_getting_const_col) {
+ _impl->set_getting_const_col(false);
+ }
+
uint8_t* data = nullptr;
const ColumnWithTypeAndName& result_column =
block->get_by_position(*result_column_id);
if (is_column_const(*result_column.column)) {
diff --git a/be/src/vec/exprs/vslot_ref.cpp b/be/src/vec/exprs/vslot_ref.cpp
index 5a34999accb..b683a1fb156 100644
--- a/be/src/vec/exprs/vslot_ref.cpp
+++ b/be/src/vec/exprs/vslot_ref.cpp
@@ -51,6 +51,7 @@ Status VSlotRef::prepare(doris::RuntimeState* state, const
doris::RowDescriptor&
RETURN_IF_ERROR_OR_PREPARED(VExpr::prepare(state, desc, context));
DCHECK_EQ(_children.size(), 0);
if (_slot_id == -1) {
+ _prepare_finished = true;
return Status::OK();
}
const SlotDescriptor* slot_desc =
state->desc_tbl().get_slot_descriptor(_slot_id);
@@ -63,6 +64,7 @@ Status VSlotRef::prepare(doris::RuntimeState* state, const
doris::RowDescriptor&
if (!context->force_materialize_slot() && !slot_desc->need_materialize()) {
// slot should be ignored manually
_column_id = -1;
+ _prepare_finished = true;
return Status::OK();
}
_column_id = desc.get_column_id(_slot_id,
context->force_materialize_slot());
@@ -72,6 +74,15 @@ Status VSlotRef::prepare(doris::RuntimeState* state, const
doris::RowDescriptor&
*_column_name, _slot_id, desc.debug_string(),
slot_desc->debug_string(),
state->desc_tbl().debug_string());
}
+ _prepare_finished = true;
+ return Status::OK();
+}
+
+Status VSlotRef::open(RuntimeState* state, VExprContext* context,
+ FunctionContext::FunctionStateScope scope) {
+ DCHECK(_prepare_finished);
+ RETURN_IF_ERROR(VExpr::open(state, context, scope));
+ _open_finished = true;
return Status::OK();
}
diff --git a/be/src/vec/exprs/vslot_ref.h b/be/src/vec/exprs/vslot_ref.h
index 2084ae18715..c30ac64041f 100644
--- a/be/src/vec/exprs/vslot_ref.h
+++ b/be/src/vec/exprs/vslot_ref.h
@@ -38,8 +38,11 @@ class VSlotRef final : public VExpr {
public:
VSlotRef(const TExprNode& node);
VSlotRef(const SlotDescriptor* desc);
- Status execute(VExprContext* context, Block* block, int* result_column_id)
override;
Status prepare(RuntimeState* state, const RowDescriptor& desc,
VExprContext* context) override;
+ Status open(RuntimeState* state, VExprContext* context,
+ FunctionContext::FunctionStateScope scope) override;
+ Status execute(VExprContext* context, Block* block, int* result_column_id)
override;
+
const std::string& expr_name() const override;
std::string debug_string() const override;
bool is_constant() const override { return false; }
diff --git a/be/src/vec/exprs/vtuple_is_null_predicate.cpp
b/be/src/vec/exprs/vtuple_is_null_predicate.cpp
index b17428bfc00..641e34590a4 100644
--- a/be/src/vec/exprs/vtuple_is_null_predicate.cpp
+++ b/be/src/vec/exprs/vtuple_is_null_predicate.cpp
@@ -48,11 +48,20 @@ Status VTupleIsNullPredicate::prepare(RuntimeState* state,
const RowDescriptor&
DCHECK_EQ(0, _children.size());
_column_to_check =
_is_left_null_side ? desc.num_materialized_slots() :
desc.num_materialized_slots() + 1;
+ _prepare_finished = true;
+ return Status::OK();
+}
+Status VTupleIsNullPredicate::open(RuntimeState* state, VExprContext* context,
+ FunctionContext::FunctionStateScope scope) {
+ DCHECK(_prepare_finished);
+ RETURN_IF_ERROR(VExpr::open(state, context, scope));
+ _open_finished = true;
return Status::OK();
}
Status VTupleIsNullPredicate::execute(VExprContext* context, Block* block,
int* result_column_id) {
+ DCHECK(_open_finished || _getting_const_col);
*result_column_id = _column_to_check;
return Status::OK();
}
diff --git a/be/src/vec/exprs/vtuple_is_null_predicate.h
b/be/src/vec/exprs/vtuple_is_null_predicate.h
index 9d3b794fb8b..c42e7300d1d 100644
--- a/be/src/vec/exprs/vtuple_is_null_predicate.h
+++ b/be/src/vec/exprs/vtuple_is_null_predicate.h
@@ -42,8 +42,10 @@ class VTupleIsNullPredicate final : public VExpr {
public:
explicit VTupleIsNullPredicate(const TExprNode& node);
~VTupleIsNullPredicate() override = default;
- Status execute(VExprContext* context, Block* block, int* result_column_id)
override;
Status prepare(RuntimeState* state, const RowDescriptor& desc,
VExprContext* context) override;
+ Status open(RuntimeState* state, VExprContext* context,
+ FunctionContext::FunctionStateScope scope) override;
+ Status execute(VExprContext* context, Block* block, int* result_column_id)
override;
[[nodiscard]] bool is_constant() const override { return false; }
diff --git a/be/src/vec/runtime/vdata_stream_recvr.h
b/be/src/vec/runtime/vdata_stream_recvr.h
index 141a5c54b64..2da6f9f920b 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.h
+++ b/be/src/vec/runtime/vdata_stream_recvr.h
@@ -20,11 +20,11 @@
#include <gen_cpp/Types_types.h>
#include <glog/logging.h>
#include <google/protobuf/stubs/callback.h>
-#include <stddef.h>
-#include <stdint.h>
#include <atomic>
#include <condition_variable>
+#include <cstddef>
+#include <cstdint>
#include <deque>
#include <list>
#include <memory>
@@ -42,14 +42,10 @@
#include "common/object_pool.h"
#include "common/status.h"
#include "runtime/descriptors.h"
-#include "runtime/query_statistics.h"
#include "runtime/task_execution_context.h"
#include "util/runtime_profile.h"
#include "util/stopwatch.hpp"
-#include "vec/columns/column.h"
#include "vec/core/block.h"
-#include "vec/core/column_with_type_and_name.h"
-#include "vec/core/materialize_block.h"
#include "vec/exprs/vexpr_fwd.h"
namespace doris {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]