This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new c929d26996e [Fix](Expr&code-style) check prepare&open before every 
VExpr execute (#26673)
c929d26996e is described below

commit c929d26996e76e6bb41ca1117b2efbd9d0a18c45
Author: zclllyybb <[email protected]>
AuthorDate: Mon Jan 22 18:04:22 2024 +0800

    [Fix](Expr&code-style) check prepare&open before every VExpr execute 
(#26673)
---
 be/src/exec/exec_node.cpp                          | 52 +++++++++---------
 be/src/exec/exec_node.h                            | 19 ++-----
 .../exec/multi_cast_data_stream_source.cpp         |  8 ++-
 be/src/pipeline/exec/operator.h                    | 18 ++-----
 be/src/pipeline/exec/set_sink_operator.cpp         | 16 +++---
 be/src/pipeline/pipeline_fragment_context.cpp      | 26 ++++-----
 be/src/pipeline/pipeline_fragment_context.h        |  4 +-
 be/src/service/point_query_executor.cpp            | 10 ++--
 be/src/vec/columns/column_nullable.cpp             |  6 +--
 be/src/vec/columns/column_nullable.h               |  4 +-
 be/src/vec/exec/join/vjoin_node_base.h             | 19 +++----
 be/src/vec/exec/vset_operation_node.cpp            |  7 +--
 be/src/vec/exec/vset_operation_node.h              |  7 +--
 be/src/vec/exec/vunion_node.cpp                    | 21 +++-----
 be/src/vec/exprs/vbitmap_predicate.cpp             |  4 ++
 be/src/vec/exprs/vbloom_predicate.cpp              |  4 ++
 be/src/vec/exprs/vcase_expr.cpp                    |  9 +++-
 be/src/vec/exprs/vcast_expr.cpp                    |  4 ++
 be/src/vec/exprs/vcolumn_ref.h                     | 10 ++++
 be/src/vec/exprs/vdirect_in_predicate.h            | 19 ++++++-
 be/src/vec/exprs/vectorized_fn_call.cpp            | 19 +++----
 be/src/vec/exprs/vexpr.cpp                         | 62 ++++++++++-----------
 be/src/vec/exprs/vexpr.h                           | 63 +++++++++++++---------
 be/src/vec/exprs/vexpr_context.cpp                 | 28 ++++------
 be/src/vec/exprs/vexpr_context.h                   | 15 +++---
 be/src/vec/exprs/vin_predicate.cpp                 |  4 ++
 be/src/vec/exprs/vlambda_function_call_expr.h      | 12 ++++-
 be/src/vec/exprs/vlambda_function_expr.h           | 15 ++++++
 be/src/vec/exprs/vliteral.cpp                      | 19 +++++--
 be/src/vec/exprs/vliteral.h                        |  5 ++
 be/src/vec/exprs/vmatch_predicate.cpp              |  5 +-
 be/src/vec/exprs/vruntimefilter_wrapper.cpp        | 15 +++++-
 be/src/vec/exprs/vslot_ref.cpp                     | 11 ++++
 be/src/vec/exprs/vslot_ref.h                       |  5 +-
 be/src/vec/exprs/vtuple_is_null_predicate.cpp      |  9 ++++
 be/src/vec/exprs/vtuple_is_null_predicate.h        |  4 +-
 be/src/vec/runtime/vdata_stream_recvr.h            |  8 +--
 37 files changed, 329 insertions(+), 237 deletions(-)

diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp
index d65e8f96bd2..ac6e7eae9a0 100644
--- a/be/src/exec/exec_node.cpp
+++ b/be/src/exec/exec_node.cpp
@@ -25,8 +25,8 @@
 #include <thrift/protocol/TDebugProtocol.h>
 
 #include <map>
+#include <memory>
 #include <sstream>
-#include <typeinfo>
 #include <utility>
 
 #include "common/compiler_util.h" // IWYU pragma: keep
@@ -84,16 +84,10 @@ ExecNode::ExecNode(ObjectPool* pool, const TPlanNode& 
tnode, const DescriptorTbl
           _tuple_ids(tnode.row_tuples),
           _row_descriptor(descs, tnode.row_tuples, tnode.nullable_tuples),
           _resource_profile(tnode.resource_profile),
-          _limit(tnode.limit),
-          _num_rows_returned(0),
-          _rows_returned_counter(nullptr),
-          _rows_returned_rate(nullptr),
-          _memory_used_counter(nullptr),
-          _peak_memory_usage_counter(nullptr),
-          _is_closed(false),
-          _ref(0) {
+          _limit(tnode.limit) {
     if (tnode.__isset.output_tuple_id) {
-        _output_row_descriptor.reset(new RowDescriptor(descs, 
{tnode.output_tuple_id}, {true}));
+        _output_row_descriptor = std::make_unique<RowDescriptor>(
+                descs, std::vector {tnode.output_tuple_id}, std::vector 
{true});
     }
     _query_statistics = std::make_shared<QueryStatistics>();
 }
@@ -108,7 +102,7 @@ Status ExecNode::init(const TPlanNode& tnode, RuntimeState* 
state) {
         RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(tnode.vconjunct, 
context));
         _conjuncts.emplace_back(context);
     } else if (tnode.__isset.conjuncts) {
-        for (auto& conjunct : tnode.conjuncts) {
+        for (const auto& conjunct : tnode.conjuncts) {
             vectorized::VExprContextSPtr context;
             RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(conjunct, 
context));
             _conjuncts.emplace_back(context);
@@ -136,8 +130,9 @@ Status ExecNode::prepare(RuntimeState* state) {
     _projection_timer = ADD_TIMER(_runtime_profile, "ProjectionTime");
     _rows_returned_rate = runtime_profile()->add_derived_counter(
             ROW_THROUGHPUT_COUNTER, TUnit::UNIT_PER_SECOND,
-            std::bind<int64_t>(&RuntimeProfile::units_per_second, 
_rows_returned_counter,
-                               runtime_profile()->total_time_counter()),
+            [this, capture0 = runtime_profile()->total_time_counter()] {
+                return 
RuntimeProfile::units_per_second(_rows_returned_counter, capture0);
+            },
             "");
     _memory_used_counter = ADD_LABEL_COUNTER(runtime_profile(), "MemoryUsage");
     _peak_memory_usage_counter = _runtime_profile->AddHighWaterMarkCounter(
@@ -150,13 +145,13 @@ Status ExecNode::prepare(RuntimeState* state) {
 
     RETURN_IF_ERROR(vectorized::VExpr::prepare(_projections, state, 
intermediate_row_desc()));
 
-    for (int i = 0; i < _children.size(); ++i) {
-        RETURN_IF_ERROR(_children[i]->prepare(state));
+    for (auto& i : _children) {
+        RETURN_IF_ERROR(i->prepare(state));
     }
     return Status::OK();
 }
 
-Status ExecNode::alloc_resource(doris::RuntimeState* state) {
+Status ExecNode::alloc_resource(RuntimeState* state) {
     for (auto& conjunct : _conjuncts) {
         RETURN_IF_ERROR(conjunct->open(state));
     }
@@ -170,8 +165,8 @@ Status ExecNode::open(RuntimeState* state) {
 
 Status ExecNode::reset(RuntimeState* state) {
     _num_rows_returned = 0;
-    for (int i = 0; i < _children.size(); ++i) {
-        RETURN_IF_ERROR(_children[i]->reset(state));
+    for (auto& i : _children) {
+        RETURN_IF_ERROR(i->reset(state));
     }
     return Status::OK();
 }
@@ -199,8 +194,8 @@ Status ExecNode::close(RuntimeState* state) {
     _is_closed = true;
 
     Status result;
-    for (int i = 0; i < _children.size(); ++i) {
-        auto st = _children[i]->close(state);
+    for (auto& i : _children) {
+        auto st = i->close(state);
         if (result.ok() && !st.ok()) {
             result = st;
         }
@@ -227,7 +222,7 @@ void ExecNode::add_runtime_exec_option(const std::string& 
str) {
 
 Status ExecNode::create_tree(RuntimeState* state, ObjectPool* pool, const 
TPlan& plan,
                              const DescriptorTbl& descs, ExecNode** root) {
-    if (plan.nodes.size() == 0) {
+    if (plan.nodes.empty()) {
         *root = nullptr;
         return Status::OK();
     }
@@ -305,6 +300,7 @@ Status ExecNode::create_tree_helper(RuntimeState* state, 
ObjectPool* pool,
     return Status::OK();
 }
 
+// NOLINTBEGIN(readability-function-size)
 Status ExecNode::create_node(RuntimeState* state, ObjectPool* pool, const 
TPlanNode& tnode,
                              const DescriptorTbl& descs, ExecNode** node) {
     VLOG_CRITICAL << "tnode:\n" << apache::thrift::ThriftDebugString(tnode);
@@ -428,8 +424,7 @@ Status ExecNode::create_node(RuntimeState* state, 
ObjectPool* pool, const TPlanN
         return Status::OK();
 
     default:
-        std::map<int, const char*>::const_iterator i =
-                _TPlanNodeType_VALUES_TO_NAMES.find(tnode.node_type);
+        auto i = _TPlanNodeType_VALUES_TO_NAMES.find(tnode.node_type);
         const char* str = "unknown node type";
 
         if (i != _TPlanNodeType_VALUES_TO_NAMES.end()) {
@@ -443,6 +438,7 @@ Status ExecNode::create_node(RuntimeState* state, 
ObjectPool* pool, const TPlanN
 
     return Status::OK();
 }
+// NOLINTEND(readability-function-size)
 
 std::string ExecNode::debug_string() const {
     std::stringstream out;
@@ -459,9 +455,9 @@ void ExecNode::debug_string(int indentation_level, 
std::stringstream* out) const
     }
     *out << "]";
 
-    for (int i = 0; i < _children.size(); ++i) {
+    for (auto* i : _children) {
         *out << "\n";
-        _children[i]->debug_string(indentation_level + 1, out);
+        i->debug_string(indentation_level + 1, out);
     }
 }
 
@@ -470,8 +466,8 @@ void ExecNode::collect_nodes(TPlanNodeType::type node_type, 
std::vector<ExecNode
         nodes->push_back(this);
     }
 
-    for (int i = 0; i < _children.size(); ++i) {
-        _children[i]->collect_nodes(node_type, nodes);
+    for (auto& i : _children) {
+        i->collect_nodes(node_type, nodes);
     }
 }
 
@@ -488,7 +484,7 @@ void ExecNode::collect_scan_nodes(vector<ExecNode*>* nodes) 
{
 void ExecNode::init_runtime_profile(const std::string& name) {
     std::stringstream ss;
     ss << name << " (id=" << _id << ")";
-    _runtime_profile.reset(new RuntimeProfile(ss.str()));
+    _runtime_profile = std::make_unique<RuntimeProfile>(ss.str());
     _runtime_profile->set_metadata(_id);
 }
 
diff --git a/be/src/exec/exec_node.h b/be/src/exec/exec_node.h
index f4b49cba6f5..903122ecded 100644
--- a/be/src/exec/exec_node.h
+++ b/be/src/exec/exec_node.h
@@ -21,10 +21,10 @@
 #pragma once
 
 #include <gen_cpp/PlanNodes_types.h>
-#include <stddef.h>
-#include <stdint.h>
 
 #include <atomic>
+#include <cstddef>
+#include <cstdint>
 #include <functional>
 #include <memory>
 #include <mutex>
@@ -267,7 +267,7 @@ protected:
     const TBackendResourceProfile _resource_profile;
 
     int64_t _limit; // -1: no limit
-    int64_t _num_rows_returned;
+    int64_t _num_rows_returned = 0;
 
     std::unique_ptr<RuntimeProfile> _runtime_profile;
 
@@ -303,15 +303,6 @@ protected:
 
     bool is_closed() const { return _is_closed; }
 
-    // TODO(zc)
-    /// Pointer to the containing SubplanNode or nullptr if not inside a 
subplan.
-    /// Set by SubplanNode::Init(). Not owned.
-    // SubplanNode* containing_subplan_;
-
-    /// Returns true if this node is inside the right-hand side plan tree of a 
SubplanNode.
-    /// Valid to call in or after Prepare().
-    bool is_in_subplan() const { return false; }
-
     // Create a single exec node derived from thrift node; place exec node in 
'pool'.
     static Status create_node(RuntimeState* state, ObjectPool* pool, const 
TPlanNode& tnode,
                               const DescriptorTbl& descs, ExecNode** node);
@@ -334,9 +325,9 @@ private:
                                      ExecNode** root);
 
     friend class pipeline::OperatorBase;
-    bool _is_closed;
+    bool _is_closed = false;
     bool _is_resource_released = false;
-    std::atomic_int _ref; // used by pipeline operator to release resource.
+    std::atomic_int _ref = 0; // used by pipeline operator to release resource.
 };
 
 } // namespace doris
diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp 
b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
index a4f3ff55a5c..6ac06ee5f10 100644
--- a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
+++ b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
@@ -17,13 +17,11 @@
 
 #include "multi_cast_data_stream_source.h"
 
-#include <functional>
-
 #include "common/status.h"
 #include "pipeline/exec/multi_cast_data_streamer.h"
 #include "pipeline/exec/operator.h"
-#include "runtime/query_statistics.h"
 #include "vec/core/block.h"
+#include "vec/core/materialize_block.h"
 
 namespace doris::pipeline {
 
@@ -108,7 +106,7 @@ Status 
MultiCastDataStreamerSourceOperator::get_block(RuntimeState* state, vecto
     if (!_output_expr_contexts.empty() && output_block->rows() > 0) {
         
RETURN_IF_ERROR(vectorized::VExprContext::get_output_block_after_execute_exprs(
                 _output_expr_contexts, *output_block, block, true));
-        materialize_block_inplace(*block);
+        vectorized::materialize_block_inplace(*block);
     }
     if (eos) {
         source_state = SourceState::FINISHED;
@@ -176,7 +174,7 @@ Status 
MultiCastDataStreamerSourceOperatorX::get_block(RuntimeState* state,
     if (!local_state._output_expr_contexts.empty() && output_block->rows() > 
0) {
         
RETURN_IF_ERROR(vectorized::VExprContext::get_output_block_after_execute_exprs(
                 local_state._output_expr_contexts, *output_block, block, 
true));
-        materialize_block_inplace(*block);
+        vectorized::materialize_block_inplace(*block);
     }
     COUNTER_UPDATE(local_state._rows_returned_counter, block->rows());
     if (eos) {
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index cd5fba5fee3..bf41c670e0c 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -19,20 +19,16 @@
 
 #include <fmt/format.h>
 #include <glog/logging.h>
-#include <stdint.h>
 
+#include <cstdint>
 #include <functional>
 #include <memory>
-#include <ostream>
 #include <string>
-#include <type_traits>
 #include <utility>
 #include <vector>
 
 #include "common/status.h"
 #include "exec/exec_node.h"
-#include "pipeline/pipeline_x/dependency.h"
-#include "runtime/memory/mem_tracker.h"
 #include "runtime/runtime_state.h"
 #include "util/runtime_profile.h"
 #include "vec/core/block.h"
@@ -105,7 +101,7 @@ using OperatorBuilders = std::vector<OperatorBuilderPtr>;
 
 class OperatorBuilderBase {
 public:
-    OperatorBuilderBase(int32_t id, const std::string& name) : _id(id), 
_name(name) {}
+    OperatorBuilderBase(int32_t id, std::string name) : _id(id), 
_name(std::move(name)) {}
 
     virtual ~OperatorBuilderBase() = default;
 
@@ -333,10 +329,7 @@ public:
         return Status::OK();
     }
 
-    Status open(RuntimeState* state) override {
-        RETURN_IF_ERROR(_node->alloc_resource(state));
-        return Status::OK();
-    }
+    Status open(RuntimeState* state) override { return 
_node->alloc_resource(state); }
 
     Status sink(RuntimeState* state, vectorized::Block* in_block,
                 SourceState source_state) override {
@@ -413,8 +406,7 @@ class StatefulOperator : public 
StreamingOperator<StatefulNodeType> {
 public:
     StatefulOperator(OperatorBuilderBase* builder, ExecNode* node)
             : StreamingOperator<StatefulNodeType>(builder, node),
-              _child_block(vectorized::Block::create_shared()),
-              _child_source_state(SourceState::DEPEND_ON_SOURCE) {}
+              _child_block(vectorized::Block::create_shared()) {}
 
     virtual ~StatefulOperator() = default;
 
@@ -454,7 +446,7 @@ public:
 
 protected:
     std::shared_ptr<vectorized::Block> _child_block;
-    SourceState _child_source_state;
+    SourceState _child_source_state {SourceState::DEPEND_ON_SOURCE};
 };
 
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/set_sink_operator.cpp 
b/be/src/pipeline/exec/set_sink_operator.cpp
index 6c18cab03f6..cb106d76edb 100644
--- a/be/src/pipeline/exec/set_sink_operator.cpp
+++ b/be/src/pipeline/exec/set_sink_operator.cpp
@@ -21,6 +21,7 @@
 
 #include "pipeline/exec/operator.h"
 #include "vec/common/hash_table/hash_table_set_build.h"
+#include "vec/core/materialize_block.h"
 #include "vec/exec/vset_operation_node.h"
 
 namespace doris {
@@ -139,10 +140,10 @@ Status 
SetSinkOperatorX<is_intersect>::_extract_build_column(
 
         block.get_by_position(result_col_id).column =
                 
block.get_by_position(result_col_id).column->convert_to_full_column_if_const();
-        auto column = block.get_by_position(result_col_id).column.get();
+        const auto* column = block.get_by_position(result_col_id).column.get();
 
-        if (auto* nullable = 
check_and_get_column<vectorized::ColumnNullable>(*column)) {
-            auto& col_nested = nullable->get_nested_column();
+        if (const auto* nullable = 
check_and_get_column<vectorized::ColumnNullable>(*column)) {
+            const auto& col_nested = nullable->get_nested_column();
             if (local_state._shared_state->build_not_ignore_null[i]) {
                 raw_ptrs[i] = nullable;
             } else {
@@ -165,7 +166,7 @@ Status SetSinkLocalState<is_intersect>::init(RuntimeState* 
state, LocalSinkState
     SCOPED_TIMER(_open_timer);
     _build_timer = ADD_TIMER(_profile, "BuildTime");
 
-    Parent& parent = _parent->cast<Parent>();
+    auto& parent = _parent->cast<Parent>();
     _dependency->set_cur_child_id(parent._cur_child_id);
     _child_exprs.resize(parent._child_exprs.size());
     for (size_t i = 0; i < _child_exprs.size(); i++) {
@@ -175,16 +176,15 @@ Status 
SetSinkLocalState<is_intersect>::init(RuntimeState* state, LocalSinkState
     _shared_state->child_quantity = parent._child_quantity;
 
     auto& child_exprs_lists = _shared_state->child_exprs_lists;
-    DCHECK(child_exprs_lists.size() == 0 || child_exprs_lists.size() == 
parent._child_quantity);
-    if (child_exprs_lists.size() == 0) {
+    DCHECK(child_exprs_lists.empty() || child_exprs_lists.size() == 
parent._child_quantity);
+    if (child_exprs_lists.empty()) {
         child_exprs_lists.resize(parent._child_quantity);
     }
     child_exprs_lists[parent._cur_child_id] = _child_exprs;
 
     _shared_state->hash_table_variants = 
std::make_unique<vectorized::SetHashTableVariants>();
 
-    for (int i = 0; i < child_exprs_lists[0].size(); ++i) {
-        const auto& ctx = child_exprs_lists[0][i];
+    for (const auto& ctx : child_exprs_lists[0]) {
         
_shared_state->build_not_ignore_null.push_back(ctx->root()->is_nullable());
     }
     _shared_state->hash_table_init();
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp 
b/be/src/pipeline/pipeline_fragment_context.cpp
index b9c2382ce86..538a2ce1bdb 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -22,13 +22,15 @@
 #include <gen_cpp/PlanNodes_types.h>
 #include <gen_cpp/Planner_types.h>
 #include <pthread.h>
-#include <stdlib.h>
+
+#include <cstdlib>
 // IWYU pragma: no_include <bits/chrono.h>
 #include <fmt/format.h>
 #include <fmt/ranges.h>
 
 #include <chrono> // IWYU pragma: keep
 #include <map>
+#include <memory>
 #include <ostream>
 #include <typeinfo>
 #include <utility>
@@ -212,8 +214,7 @@ PipelinePtr 
PipelineFragmentContext::add_pipeline(PipelinePtr parent, int idx) {
     return pipeline;
 }
 
-Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& 
request,
-                                        const size_t idx) {
+Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& 
request, size_t idx) {
     if (_prepared) {
         return Status::InternalError("Already prepared");
     }
@@ -299,16 +300,16 @@ Status PipelineFragmentContext::prepare(const 
doris::TPipelineFragmentParams& re
                   << local_params.per_node_scan_ranges.size();
 
     // set scan range in ScanNode
-    for (int i = 0; i < scan_nodes.size(); ++i) {
+    for (auto& i : scan_nodes) {
         // TODO(cmy): this "if...else" should be removed once all ScanNode are 
derived from VScanNode.
-        ExecNode* node = scan_nodes[i];
+        ExecNode* node = i;
         if (typeid(*node) == typeid(vectorized::NewOlapScanNode) ||
             typeid(*node) == typeid(vectorized::NewFileScanNode) ||
             typeid(*node) == typeid(vectorized::NewOdbcScanNode) ||
             typeid(*node) == typeid(vectorized::NewEsScanNode) ||
             typeid(*node) == typeid(vectorized::VMetaScanNode) ||
             typeid(*node) == typeid(vectorized::NewJdbcScanNode)) {
-            auto* scan_node = 
static_cast<vectorized::VScanNode*>(scan_nodes[i]);
+            auto* scan_node = static_cast<vectorized::VScanNode*>(i);
             auto scan_ranges = 
find_with_default(local_params.per_node_scan_ranges, scan_node->id(),
                                                  no_scan_ranges);
             const bool shared_scan =
@@ -316,7 +317,7 @@ Status PipelineFragmentContext::prepare(const 
doris::TPipelineFragmentParams& re
             scan_node->set_scan_ranges(_runtime_state.get(), scan_ranges);
             scan_node->set_shared_scan(_runtime_state.get(), shared_scan);
         } else {
-            ScanNode* scan_node = static_cast<ScanNode*>(node);
+            auto* scan_node = static_cast<ScanNode*>(node);
             auto scan_ranges = 
find_with_default(local_params.per_node_scan_ranges, scan_node->id(),
                                                  no_scan_ranges);
             static_cast<void>(scan_node->set_scan_ranges(_runtime_state.get(), 
scan_ranges));
@@ -850,10 +851,10 @@ Status PipelineFragmentContext::_create_sink(int 
sender_id, const TDataSink& thr
                                       {false})
                             : sink_->row_desc();
             // 1. create the data stream sender sink
-            _multi_cast_stream_sink_senders[i].reset(new 
vectorized::VDataStreamSender(
+            _multi_cast_stream_sink_senders[i] = 
std::make_unique<vectorized::VDataStreamSender>(
                     _runtime_state.get(), _runtime_state->obj_pool(), 
sender_id, row_desc,
                     thrift_sink.multi_cast_stream_sink.sinks[i],
-                    thrift_sink.multi_cast_stream_sink.destinations[i]));
+                    thrift_sink.multi_cast_stream_sink.destinations[i]);
 
             // 2. create and set the source operator of 
multi_cast_data_stream_source for new pipeline
             OperatorBuilderPtr source_op =
@@ -941,9 +942,10 @@ Status PipelineFragmentContext::send_report(bool done) {
              _fragment_instance_id,
              _backend_num,
              _runtime_state.get(),
-             std::bind(&PipelineFragmentContext::update_status, this, 
std::placeholders::_1),
-             std::bind(&PipelineFragmentContext::cancel, this, 
std::placeholders::_1,
-                       std::placeholders::_2),
+             [this](auto&& PH1) { return 
update_status(std::forward<decltype(PH1)>(PH1)); },
+             [this](auto&& PH1, auto&& PH2) {
+                 cancel(std::forward<decltype(PH1)>(PH1), 
std::forward<decltype(PH2)>(PH2));
+             },
              _query_ctx->get_query_statistics()},
             
std::dynamic_pointer_cast<PipelineFragmentContext>(shared_from_this()));
 }
diff --git a/be/src/pipeline/pipeline_fragment_context.h 
b/be/src/pipeline/pipeline_fragment_context.h
index 353e7a06586..a7a45d8f07f 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -67,7 +67,7 @@ public:
                             const std::function<void(RuntimeState*, Status*)>& 
call_back,
                             const report_status_callback& report_status_cb);
 
-    virtual ~PipelineFragmentContext();
+    ~PipelineFragmentContext() override;
 
     PipelinePtr add_pipeline();
 
@@ -89,7 +89,7 @@ public:
 
     int32_t next_operator_builder_id() { return _next_operator_builder_id++; }
 
-    Status prepare(const doris::TPipelineFragmentParams& request, const size_t 
idx);
+    Status prepare(const doris::TPipelineFragmentParams& request, size_t idx);
 
     virtual Status prepare(const doris::TPipelineFragmentParams& request) {
         return Status::InternalError("Pipeline fragment context do not 
implement prepare");
diff --git a/be/src/service/point_query_executor.cpp 
b/be/src/service/point_query_executor.cpp
index a86d5ed90b8..83d059ba206 100644
--- a/be/src/service/point_query_executor.cpp
+++ b/be/src/service/point_query_executor.cpp
@@ -26,6 +26,7 @@
 #include <unordered_map>
 #include <vector>
 
+#include "common/status.h"
 #include "gutil/integral_types.h"
 #include "olap/lru_cache.h"
 #include "olap/olap_tuple.h"
@@ -56,21 +57,22 @@ Status Reusable::init(const TDescriptorTable& t_desc_tbl, 
const std::vector<TExp
     RETURN_IF_ERROR(DescriptorTbl::create(_runtime_state->obj_pool(), 
t_desc_tbl, &_desc_tbl));
     _runtime_state->set_desc_tbl(_desc_tbl);
     _block_pool.resize(block_size);
-    for (int i = 0; i < _block_pool.size(); ++i) {
-        _block_pool[i] = 
vectorized::Block::create_unique(tuple_desc()->slots(), 2);
+    for (auto& i : _block_pool) {
+        i = vectorized::Block::create_unique(tuple_desc()->slots(), 2);
         // Name is useless but cost space
-        _block_pool[i]->clear_names();
+        i->clear_names();
     }
 
     RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(output_exprs, 
_output_exprs_ctxs));
     RowDescriptor row_desc(tuple_desc(), false);
     // Prepare the exprs to run.
     RETURN_IF_ERROR(vectorized::VExpr::prepare(_output_exprs_ctxs, 
_runtime_state.get(), row_desc));
+    RETURN_IF_ERROR(vectorized::VExpr::open(_output_exprs_ctxs, 
_runtime_state.get()));
     _create_timestamp = butil::gettimeofday_ms();
     _data_type_serdes = 
vectorized::create_data_type_serdes(tuple_desc()->slots());
     _col_default_values.resize(tuple_desc()->slots().size());
     for (int i = 0; i < tuple_desc()->slots().size(); ++i) {
-        auto slot = tuple_desc()->slots()[i];
+        auto* slot = tuple_desc()->slots()[i];
         _col_uid_to_idx[slot->col_unique_id()] = i;
         _col_default_values[i] = slot->col_default_value();
     }
diff --git a/be/src/vec/columns/column_nullable.cpp 
b/be/src/vec/columns/column_nullable.cpp
index 426de2d4f70..8b0008d6e2e 100644
--- a/be/src/vec/columns/column_nullable.cpp
+++ b/be/src/vec/columns/column_nullable.cpp
@@ -23,9 +23,7 @@
 #include "vec/columns/column_const.h"
 #include "vec/common/arena.h"
 #include "vec/common/assert_cast.h"
-#include "vec/common/nan_utils.h"
 #include "vec/common/sip_hash.h"
-#include "vec/common/typeid_cast.h"
 #include "vec/core/sort_block.h"
 #include "vec/data_types/data_type.h"
 #include "vec/utils/util.hpp"
@@ -571,7 +569,9 @@ bool ColumnNullable::has_null(size_t size) const {
 }
 
 ColumnPtr make_nullable(const ColumnPtr& column, bool is_nullable) {
-    if (is_column_nullable(*column)) return column;
+    if (is_column_nullable(*column)) {
+        return column;
+    }
 
     if (is_column_const(*column)) {
         return ColumnConst::create(
diff --git a/be/src/vec/columns/column_nullable.h 
b/be/src/vec/columns/column_nullable.h
index 83cbe82e328..91128fb69a8 100644
--- a/be/src/vec/columns/column_nullable.h
+++ b/be/src/vec/columns/column_nullable.h
@@ -32,7 +32,6 @@
 #include "olap/olap_common.h"
 #include "runtime/define_primitive_type.h"
 #include "vec/columns/column.h"
-#include "vec/columns/column_impl.h"
 #include "vec/columns/column_vector.h"
 #include "vec/columns/columns_number.h"
 #include "vec/common/assert_cast.h"
@@ -77,8 +76,7 @@ public:
                                       null_map_->assume_mutable());
     }
 
-    template <typename... Args,
-              typename = typename 
std::enable_if<IsMutableColumns<Args...>::value>::type>
+    template <typename... Args, typename = 
std::enable_if_t<IsMutableColumns<Args...>::value>>
     static MutablePtr create(Args&&... args) {
         return Base::create(std::forward<Args>(args)...);
     }
diff --git a/be/src/vec/exec/join/vjoin_node_base.h 
b/be/src/vec/exec/join/vjoin_node_base.h
index a44bc5513a9..c918e26e6fe 100644
--- a/be/src/vec/exec/join/vjoin_node_base.h
+++ b/be/src/vec/exec/join/vjoin_node_base.h
@@ -23,7 +23,6 @@
 #include <memory>
 #include <type_traits>
 #include <variant>
-#include <vector>
 
 #include "common/status.h"
 #include "exec/exec_node.h"
@@ -57,22 +56,20 @@ class VJoinNodeBase : public ExecNode {
 public:
     VJoinNodeBase(ObjectPool* pool, const TPlanNode& tnode, const 
DescriptorTbl& descs);
 
-    virtual Status prepare(RuntimeState* state) override;
+    Status prepare(RuntimeState* state) override;
 
-    virtual Status close(RuntimeState* state) override;
+    Status close(RuntimeState* state) override;
 
-    virtual Status open(RuntimeState* state) override;
+    Status open(RuntimeState* state) override;
 
-    virtual const RowDescriptor& row_desc() const override { return 
*_output_row_desc; }
+    const RowDescriptor& row_desc() const override { return *_output_row_desc; 
}
 
-    virtual const RowDescriptor& intermediate_row_desc() const override {
-        return *_intermediate_row_desc;
-    }
+    const RowDescriptor& intermediate_row_desc() const override { return 
*_intermediate_row_desc; }
 
-    virtual Status alloc_resource(RuntimeState* state) override;
-    virtual void release_resource(RuntimeState* state) override;
+    Status alloc_resource(RuntimeState* state) override;
+    void release_resource(RuntimeState* state) override;
 
-    virtual Status init(const TPlanNode& tnode, RuntimeState* state = nullptr) 
override;
+    Status init(const TPlanNode& tnode, RuntimeState* state = nullptr) 
override;
 
     [[nodiscard]] bool can_terminate_early() override { return 
_short_circuit_for_probe; }
 
diff --git a/be/src/vec/exec/vset_operation_node.cpp 
b/be/src/vec/exec/vset_operation_node.cpp
index 4f4634fbd64..3c47638ef42 100644
--- a/be/src/vec/exec/vset_operation_node.cpp
+++ b/be/src/vec/exec/vset_operation_node.cpp
@@ -22,8 +22,6 @@
 #include <gen_cpp/PlanNodes_types.h>
 #include <glog/logging.h>
 
-#include <array>
-#include <atomic>
 #include <ostream>
 #include <string>
 #include <type_traits>
@@ -31,12 +29,9 @@
 
 #include "runtime/define_primitive_type.h"
 #include "runtime/runtime_state.h"
-#include "util/defer_op.h"
 #include "vec/columns/column_nullable.h"
-#include "vec/common/columns_hashing.h"
 #include "vec/common/hash_table/hash_table_set_build.h"
 #include "vec/common/hash_table/hash_table_set_probe.h"
-#include "vec/common/uint128.h"
 #include "vec/core/column_with_type_and_name.h"
 #include "vec/core/materialize_block.h"
 #include "vec/core/types.h"
@@ -100,6 +95,8 @@ Status VSetOperationNode<is_intersect>::init(const 
TPlanNode& tnode, RuntimeStat
 template <bool is_intersect>
 Status VSetOperationNode<is_intersect>::alloc_resource(RuntimeState* state) {
     SCOPED_TIMER(_exec_timer);
+    // will open projections
+    RETURN_IF_ERROR(ExecNode::alloc_resource(state));
     // open result expr lists.
     for (const VExprContextSPtrs& exprs : _child_expr_lists) {
         RETURN_IF_ERROR(VExpr::open(exprs, state));
diff --git a/be/src/vec/exec/vset_operation_node.h 
b/be/src/vec/exec/vset_operation_node.h
index ae600c6490d..b1ab9c47650 100644
--- a/be/src/vec/exec/vset_operation_node.h
+++ b/be/src/vec/exec/vset_operation_node.h
@@ -17,10 +17,7 @@
 
 #pragma once
 
-#include <stddef.h>
-#include <stdint.h>
-
-#include <functional>
+#include <cstdint>
 #include <iosfwd>
 #include <memory>
 #include <unordered_map>
@@ -33,8 +30,6 @@
 #include "vec/aggregate_functions/aggregate_function.h"
 #include "vec/columns/column.h"
 #include "vec/common/arena.h"
-#include "vec/common/hash_table/hash_map.h"
-#include "vec/common/string_ref.h"
 #include "vec/core/block.h"
 #include "vec/exec/join/process_hash_table_probe.h"
 #include "vec/exec/join/vhash_join_node.h"
diff --git a/be/src/vec/exec/vunion_node.cpp b/be/src/vec/exec/vunion_node.cpp
index e77fd9fee90..eac8aa1b167 100644
--- a/be/src/vec/exec/vunion_node.cpp
+++ b/be/src/vec/exec/vunion_node.cpp
@@ -20,7 +20,6 @@
 #include <gen_cpp/Exprs_types.h>
 #include <gen_cpp/PlanNodes_types.h>
 
-#include <algorithm>
 #include <boost/iterator/iterator_facade.hpp>
 #include <functional>
 #include <memory>
@@ -66,8 +65,8 @@ Status VUnionNode::init(const TPlanNode& tnode, RuntimeState* 
state) {
         _const_expr_lists.push_back(ctxs);
     }
     // Create result_expr_ctx_lists_ from thrift exprs.
-    auto& result_texpr_lists = tnode.union_node.result_expr_lists;
-    for (auto& texprs : result_texpr_lists) {
+    const auto& result_texpr_lists = tnode.union_node.result_expr_lists;
+    for (const auto& texprs : result_texpr_lists) {
         VExprContextSPtrs ctxs;
         RETURN_IF_ERROR(VExpr::create_expr_trees(texprs, ctxs));
         _child_expr_lists.push_back(ctxs);
@@ -127,7 +126,6 @@ Status VUnionNode::alloc_resource(RuntimeState* state) {
 
 Status VUnionNode::get_next_pass_through(RuntimeState* state, Block* block) {
     DCHECK(!reached_limit());
-    DCHECK(!is_in_subplan());
     DCHECK_LT(_child_idx, _children.size());
     DCHECK(is_child_passthrough(_child_idx));
     if (_child_eos) {
@@ -196,12 +194,6 @@ Status VUnionNode::get_next_materialized(RuntimeState* 
state, Block* block) {
         // incremented '_num_rows_returned' yet.
         DCHECK(!reached_limit());
         if (_child_eos) {
-            // Unless we are inside a subplan expecting to call 
open()/get_next() on the child
-            // again, the child can be closed at this point.
-            // TODO: Recheck whether is_in_subplan() is right
-            //            if (!is_in_subplan()) {
-            //                child(_child_idx)->close(state);
-            //            }
             ++_child_idx;
         }
     }
@@ -275,7 +267,6 @@ Status VUnionNode::get_next(RuntimeState* state, Block* 
block, bool* eos) {
         // The previous child needs to be closed if passthrough was enabled 
for it. In the non
         // passthrough case, the child was already closed in the previous call 
to get_next().
         DCHECK(is_child_passthrough(_to_close_child_idx));
-        DCHECK(!is_in_subplan());
         static_cast<void>(child(_to_close_child_idx)->close(state));
         _to_close_child_idx = -1;
     }
@@ -317,8 +308,8 @@ void VUnionNode::debug_string(int indentation_level, 
std::stringstream* out) con
     *out << string(indentation_level * 2, ' ');
     *out << "_union(_first_materialized_child_idx=" << 
_first_materialized_child_idx
          << " _child_expr_lists=[";
-    for (int i = 0; i < _child_expr_lists.size(); ++i) {
-        *out << VExpr::debug_string(_child_expr_lists[i]) << ", ";
+    for (const auto& _child_expr_list : _child_expr_lists) {
+        *out << VExpr::debug_string(_child_expr_list) << ", ";
     }
     *out << "] \n";
     ExecNode::debug_string(indentation_level, out);
@@ -329,9 +320,9 @@ Status VUnionNode::materialize_block(Block* src_block, int 
child_idx, Block* res
     SCOPED_TIMER(_exec_timer);
     const auto& child_exprs = _child_expr_lists[child_idx];
     ColumnsWithTypeAndName colunms;
-    for (size_t i = 0; i < child_exprs.size(); ++i) {
+    for (const auto& child_expr : child_exprs) {
         int result_column_id = -1;
-        RETURN_IF_ERROR(child_exprs[i]->execute(src_block, &result_column_id));
+        RETURN_IF_ERROR(child_expr->execute(src_block, &result_column_id));
         colunms.emplace_back(src_block->get_by_position(result_column_id));
     }
     _child_row_idx += src_block->rows();
diff --git a/be/src/vec/exprs/vbitmap_predicate.cpp 
b/be/src/vec/exprs/vbitmap_predicate.cpp
index 0e158298d85..8116311247b 100644
--- a/be/src/vec/exprs/vbitmap_predicate.cpp
+++ b/be/src/vec/exprs/vbitmap_predicate.cpp
@@ -66,19 +66,23 @@ doris::Status 
vectorized::VBitmapPredicate::prepare(doris::RuntimeState* state,
         auto column = child->data_type()->create_column();
         argument_template.emplace_back(std::move(column), child->data_type(), 
child->expr_name());
     }
+    _prepare_finished = true;
     return Status::OK();
 }
 
 doris::Status vectorized::VBitmapPredicate::open(doris::RuntimeState* state,
                                                  vectorized::VExprContext* 
context,
                                                  
FunctionContext::FunctionStateScope scope) {
+    DCHECK(_prepare_finished);
     RETURN_IF_ERROR(VExpr::open(state, context, scope));
+    _open_finished = true;
     return Status::OK();
 }
 
 doris::Status vectorized::VBitmapPredicate::execute(vectorized::VExprContext* 
context,
                                                     doris::vectorized::Block* 
block,
                                                     int* result_column_id) {
+    DCHECK(_open_finished || _getting_const_col);
     doris::vectorized::ColumnNumbers arguments(_children.size());
     for (int i = 0; i < _children.size(); ++i) {
         int column_id = -1;
diff --git a/be/src/vec/exprs/vbloom_predicate.cpp 
b/be/src/vec/exprs/vbloom_predicate.cpp
index f72657c528a..08f891b0e56 100644
--- a/be/src/vec/exprs/vbloom_predicate.cpp
+++ b/be/src/vec/exprs/vbloom_predicate.cpp
@@ -60,12 +60,15 @@ Status VBloomPredicate::prepare(RuntimeState* state, const 
RowDescriptor& desc,
     }
 
     _be_exec_version = state->be_exec_version();
+    _prepare_finished = true;
     return Status::OK();
 }
 
 Status VBloomPredicate::open(RuntimeState* state, VExprContext* context,
                              FunctionContext::FunctionStateScope scope) {
+    DCHECK(_prepare_finished);
     RETURN_IF_ERROR(VExpr::open(state, context, scope));
+    _open_finished = true;
     return Status::OK();
 }
 
@@ -74,6 +77,7 @@ void VBloomPredicate::close(VExprContext* context, 
FunctionContext::FunctionStat
 }
 
 Status VBloomPredicate::execute(VExprContext* context, Block* block, int* 
result_column_id) {
+    DCHECK(_open_finished || _getting_const_col);
     doris::vectorized::ColumnNumbers arguments(_children.size());
     for (int i = 0; i < _children.size(); ++i) {
         int column_id = -1;
diff --git a/be/src/vec/exprs/vcase_expr.cpp b/be/src/vec/exprs/vcase_expr.cpp
index e09d62bfb23..dee60b5a6f1 100644
--- a/be/src/vec/exprs/vcase_expr.cpp
+++ b/be/src/vec/exprs/vcase_expr.cpp
@@ -74,22 +74,26 @@ Status VCaseExpr::prepare(RuntimeState* state, const 
RowDescriptor& desc, VExprC
     }
 
     VExpr::register_function_context(state, context);
+    _prepare_finished = true;
     return Status::OK();
 }
 
 Status VCaseExpr::open(RuntimeState* state, VExprContext* context,
                        FunctionContext::FunctionStateScope scope) {
-    for (int i = 0; i < _children.size(); ++i) {
-        RETURN_IF_ERROR(_children[i]->open(state, context, scope));
+    DCHECK(_prepare_finished);
+    for (auto& i : _children) {
+        RETURN_IF_ERROR(i->open(state, context, scope));
     }
     RETURN_IF_ERROR(VExpr::init_function_context(context, scope, _function));
     if (scope == FunctionContext::FRAGMENT_LOCAL) {
         RETURN_IF_ERROR(VExpr::get_const_col(context, nullptr));
     }
+    _open_finished = true;
     return Status::OK();
 }
 
 void VCaseExpr::close(VExprContext* context, 
FunctionContext::FunctionStateScope scope) {
+    DCHECK(_prepare_finished);
     VExpr::close_function_context(context, scope, _function);
     VExpr::close(context, scope);
 }
@@ -98,6 +102,7 @@ Status VCaseExpr::execute(VExprContext* context, Block* 
block, int* result_colum
     if (is_const_and_have_executed()) { // const have execute in open function
         return get_result_from_const(block, _expr_name, result_column_id);
     }
+    DCHECK(_open_finished || _getting_const_col);
     ColumnNumbers arguments(_children.size());
     for (int i = 0; i < _children.size(); i++) {
         int column_id = -1;
diff --git a/be/src/vec/exprs/vcast_expr.cpp b/be/src/vec/exprs/vcast_expr.cpp
index 3207ba5b541..f322c1d2fae 100644
--- a/be/src/vec/exprs/vcast_expr.cpp
+++ b/be/src/vec/exprs/vcast_expr.cpp
@@ -76,6 +76,7 @@ doris::Status VCastExpr::prepare(doris::RuntimeState* state, 
const doris::RowDes
     VExpr::register_function_context(state, context);
     _expr_name = fmt::format("(CAST {}({}) TO {})", child_name, 
child->data_type()->get_name(),
                              _target_data_type_name);
+    _prepare_finished = true;
     return Status::OK();
 }
 
@@ -85,6 +86,7 @@ const DataTypePtr& VCastExpr::get_target_type() const {
 
 doris::Status VCastExpr::open(doris::RuntimeState* state, VExprContext* 
context,
                               FunctionContext::FunctionStateScope scope) {
+    DCHECK(_prepare_finished);
     for (int i = 0; i < _children.size(); ++i) {
         RETURN_IF_ERROR(_children[i]->open(state, context, scope));
     }
@@ -92,6 +94,7 @@ doris::Status VCastExpr::open(doris::RuntimeState* state, 
VExprContext* context,
     if (scope == FunctionContext::FRAGMENT_LOCAL) {
         RETURN_IF_ERROR(VExpr::get_const_col(context, nullptr));
     }
+    _open_finished = true;
     return Status::OK();
 }
 
@@ -102,6 +105,7 @@ void VCastExpr::close(VExprContext* context, 
FunctionContext::FunctionStateScope
 
 doris::Status VCastExpr::execute(VExprContext* context, 
doris::vectorized::Block* block,
                                  int* result_column_id) {
+    DCHECK(_open_finished || _getting_const_col);
     // for each child call execute
     int column_id = 0;
     RETURN_IF_ERROR(_children[0]->execute(context, block, &column_id));
diff --git a/be/src/vec/exprs/vcolumn_ref.h b/be/src/vec/exprs/vcolumn_ref.h
index 25e35dec4a5..a763797880e 100644
--- a/be/src/vec/exprs/vcolumn_ref.h
+++ b/be/src/vec/exprs/vcolumn_ref.h
@@ -43,10 +43,20 @@ public:
                     "VColumnRef have invalid slot id: {}, _column_name: {}, 
desc: {}", _column_id,
                     _column_name, desc.debug_string());
         }
+        _prepare_finished = true;
+        return Status::OK();
+    }
+
+    Status open(RuntimeState* state, VExprContext* context,
+                FunctionContext::FunctionStateScope scope) override {
+        DCHECK(_prepare_finished);
+        RETURN_IF_ERROR(VExpr::open(state, context, scope));
+        _open_finished = true;
         return Status::OK();
     }
 
     Status execute(VExprContext* context, Block* block, int* result_column_id) 
override {
+        DCHECK(_open_finished || _getting_const_col);
         *result_column_id = _column_id;
         return Status::OK();
     }
diff --git a/be/src/vec/exprs/vdirect_in_predicate.h 
b/be/src/vec/exprs/vdirect_in_predicate.h
index 5211e013466..a68a6c3121a 100644
--- a/be/src/vec/exprs/vdirect_in_predicate.h
+++ b/be/src/vec/exprs/vdirect_in_predicate.h
@@ -18,6 +18,7 @@
 #pragma once
 
 #include "common/status.h"
+#include "exprs/hybrid_set.h"
 #include "vec/exprs/vexpr.h"
 
 namespace doris::vectorized {
@@ -29,7 +30,23 @@ public:
             : VExpr(node), _filter(nullptr), _expr_name("direct_in_predicate") 
{}
     ~VDirectInPredicate() override = default;
 
+    Status prepare(RuntimeState* state, const RowDescriptor& row_desc,
+                   VExprContext* context) override {
+        RETURN_IF_ERROR_OR_PREPARED(VExpr::prepare(state, row_desc, context));
+        _prepare_finished = true;
+        return Status::OK();
+    }
+
+    Status open(RuntimeState* state, VExprContext* context,
+                FunctionContext::FunctionStateScope scope) override {
+        DCHECK(_prepare_finished);
+        RETURN_IF_ERROR(VExpr::open(state, context, scope));
+        _open_finished = true;
+        return Status::OK();
+    }
+
     Status execute(VExprContext* context, Block* block, int* result_column_id) 
override {
+        DCHECK(_open_finished || _getting_const_col);
         ColumnNumbers arguments(_children.size());
         for (int i = 0; i < _children.size(); ++i) {
             int column_id = -1;
@@ -47,7 +64,7 @@ public:
         if (argument_column->is_nullable()) {
             auto column_nested = static_cast<const 
ColumnNullable*>(argument_column.get())
                                          ->get_nested_column_ptr();
-            auto& null_map =
+            const auto& null_map =
                     static_cast<const 
ColumnNullable*>(argument_column.get())->get_null_map_data();
             _filter->find_batch_nullable(*column_nested, sz, null_map, 
res_data_column->get_data());
         } else {
diff --git a/be/src/vec/exprs/vectorized_fn_call.cpp 
b/be/src/vec/exprs/vectorized_fn_call.cpp
index 48522b35500..bf38185f7df 100644
--- a/be/src/vec/exprs/vectorized_fn_call.cpp
+++ b/be/src/vec/exprs/vectorized_fn_call.cpp
@@ -21,8 +21,6 @@
 #include <fmt/ranges.h> // IWYU pragma: keep
 #include <gen_cpp/Types_types.h>
 
-#include <algorithm>
-#include <memory>
 #include <ostream>
 #include <string_view>
 #include <utility>
@@ -115,19 +113,21 @@ Status VectorizedFnCall::prepare(RuntimeState* state, 
const RowDescriptor& desc,
     VExpr::register_function_context(state, context);
     _function_name = _fn.name.function_name;
     _can_fast_execute = _function->can_fast_execute();
-
+    _prepare_finished = true;
     return Status::OK();
 }
 
 Status VectorizedFnCall::open(RuntimeState* state, VExprContext* context,
                               FunctionContext::FunctionStateScope scope) {
-    for (int i = 0; i < _children.size(); ++i) {
-        RETURN_IF_ERROR(_children[i]->open(state, context, scope));
+    DCHECK(_prepare_finished);
+    for (auto& i : _children) {
+        RETURN_IF_ERROR(i->open(state, context, scope));
     }
     RETURN_IF_ERROR(VExpr::init_function_context(context, scope, _function));
     if (scope == FunctionContext::FRAGMENT_LOCAL) {
         RETURN_IF_ERROR(VExpr::get_const_col(context, nullptr));
     }
+    _open_finished = true;
     return Status::OK();
 }
 
@@ -142,6 +142,7 @@ Status VectorizedFnCall::execute(VExprContext* context, 
vectorized::Block* block
         return get_result_from_const(block, _expr_name, result_column_id);
     }
 
+    DCHECK(_open_finished || _getting_const_col) << debug_string();
     // TODO: not execute const expr again, but use the const column in 
function context
     vectorized::ColumnNumbers arguments(_children.size());
     for (int i = 0; i < _children.size(); ++i) {
@@ -188,9 +189,9 @@ bool VectorizedFnCall::fast_execute(FunctionContext* 
context, Block& block,
             
block.get_by_name(result_column_name).column->convert_to_full_column_if_const();
     auto& result_info = block.get_by_position(result);
     if (result_info.type->is_nullable()) {
-        block.replace_by_position(result,
-                                  
ColumnNullable::create(std::move(result_column),
-                                                         
ColumnUInt8::create(input_rows_count, 0)));
+        block.replace_by_position(
+                result,
+                ColumnNullable::create(result_column, 
ColumnUInt8::create(input_rows_count, 0)));
     } else {
         block.replace_by_position(result, std::move(result_column));
     }
@@ -208,7 +209,7 @@ std::string VectorizedFnCall::debug_string() const {
     out << _expr_name;
     out << "]{";
     bool first = true;
-    for (auto& input_expr : children()) {
+    for (const auto& input_expr : children()) {
         if (first) {
             first = false;
         } else {
diff --git a/be/src/vec/exprs/vexpr.cpp b/be/src/vec/exprs/vexpr.cpp
index 7270126e563..4f6b984e8fc 100644
--- a/be/src/vec/exprs/vexpr.cpp
+++ b/be/src/vec/exprs/vexpr.cpp
@@ -17,6 +17,7 @@
 
 #include "vec/exprs/vexpr.h"
 
+#include <fmt/format.h>
 #include <gen_cpp/Exprs_types.h>
 #include <thrift/protocol/TDebugProtocol.h>
 
@@ -27,7 +28,6 @@
 
 #include "common/config.h"
 #include "common/exception.h"
-#include "common/object_pool.h"
 #include "common/status.h"
 #include "vec/columns/column_vector.h"
 #include "vec/columns/columns_number.h"
@@ -40,6 +40,7 @@
 #include "vec/exprs/vcompound_pred.h"
 #include "vec/exprs/vectorized_fn_call.h"
 #include "vec/exprs/vexpr_context.h"
+#include "vec/exprs/vexpr_fwd.h"
 #include "vec/exprs/vin_predicate.h"
 #include "vec/exprs/vinfo_func.h"
 #include "vec/exprs/vlambda_function_call_expr.h"
@@ -55,6 +56,9 @@
 namespace doris {
 class RowDescriptor;
 class RuntimeState;
+
+// NOLINTBEGIN(readability-function-cognitive-complexity)
+// NOLINTBEGIN(readability-function-size)
 TExprNode create_texpr_node_from(const void* data, const PrimitiveType& type, 
int precision,
                                  int scale) {
     TExprNode node;
@@ -146,6 +150,8 @@ TExprNode create_texpr_node_from(const void* data, const 
PrimitiveType& type, in
     }
     return node;
 }
+// NOLINTEND(readability-function-size)
+// NOLINTEND(readability-function-cognitive-complexity)
 } // namespace doris
 
 namespace doris::vectorized {
@@ -162,9 +168,7 @@ bool VExpr::is_acting_on_a_slot(const VExpr& expr) {
 VExpr::VExpr(const TExprNode& node)
         : _node_type(node.node_type),
           _opcode(node.__isset.opcode ? node.opcode : 
TExprOpcode::INVALID_OPCODE),
-          _type(TypeDescriptor::from_thrift(node.type)),
-          _fn_context_index(-1),
-          _prepared(false) {
+          _type(TypeDescriptor::from_thrift(node.type)) {
     if (node.__isset.fn) {
         _fn = node.fn;
     }
@@ -183,10 +187,7 @@ VExpr::VExpr(const TExprNode& node)
 VExpr::VExpr(const VExpr& vexpr) = default;
 
 VExpr::VExpr(TypeDescriptor type, bool is_slotref, bool is_nullable)
-        : _opcode(TExprOpcode::INVALID_OPCODE),
-          _type(std::move(type)),
-          _fn_context_index(-1),
-          _prepared(false) {
+        : _opcode(TExprOpcode::INVALID_OPCODE), _type(std::move(type)) {
     if (is_slotref) {
         _node_type = TExprNodeType::SLOT_REF;
     }
@@ -221,11 +222,12 @@ Status VExpr::open(RuntimeState* state, VExprContext* 
context,
 }
 
 void VExpr::close(VExprContext* context, FunctionContext::FunctionStateScope 
scope) {
-    for (int i = 0; i < _children.size(); ++i) {
-        _children[i]->close(context, scope);
+    for (auto& i : _children) {
+        i->close(context, scope);
     }
 }
 
+// NOLINTBEGIN(readability-function-size)
 Status VExpr::create_expr(const TExprNode& expr_node, VExprSPtr& expr) {
     try {
         switch (expr_node.node_type) {
@@ -326,6 +328,7 @@ Status VExpr::create_expr(const TExprNode& expr_node, 
VExprSPtr& expr) {
     }
     return Status::OK();
 }
+// NOLINTEND(readability-function-size)
 
 Status VExpr::create_tree_from_thrift(const std::vector<TExprNode>& nodes, 
int* node_idx,
                                       VExprSPtr& root_expr, VExprContextSPtr& 
ctx) {
@@ -348,7 +351,7 @@ Status VExpr::create_tree_from_thrift(const 
std::vector<TExprNode>& nodes, int*
 
     // non-recursive traversal
     std::stack<std::pair<VExprSPtr, int>> s;
-    s.push({root, root_children});
+    s.emplace(root, root_children);
     while (!s.empty()) {
         auto& parent = s.top();
         if (parent.second > 1) {
@@ -366,14 +369,14 @@ Status VExpr::create_tree_from_thrift(const 
std::vector<TExprNode>& nodes, int*
         parent.first->add_child(expr);
         int num_children = nodes[*node_idx].num_children;
         if (num_children > 0) {
-            s.push({expr, num_children});
+            s.emplace(expr, num_children);
         }
     }
     return Status::OK();
 }
 
 Status VExpr::create_expr_tree(const TExpr& texpr, VExprContextSPtr& ctx) {
-    if (texpr.nodes.size() == 0) {
+    if (texpr.nodes.empty()) {
         ctx = nullptr;
         return Status::OK();
     }
@@ -395,9 +398,9 @@ Status VExpr::create_expr_tree(const TExpr& texpr, 
VExprContextSPtr& ctx) {
 
 Status VExpr::create_expr_trees(const std::vector<TExpr>& texprs, 
VExprContextSPtrs& ctxs) {
     ctxs.clear();
-    for (int i = 0; i < texprs.size(); ++i) {
+    for (const auto& texpr : texprs) {
         VExprContextSPtr ctx;
-        RETURN_IF_ERROR(create_expr_tree(texprs[i], ctx));
+        RETURN_IF_ERROR(create_expr_tree(texpr, ctx));
         ctxs.push_back(ctx);
     }
     return Status::OK();
@@ -412,8 +415,8 @@ Status VExpr::prepare(const VExprContextSPtrs& ctxs, 
RuntimeState* state,
 }
 
 Status VExpr::open(const VExprContextSPtrs& ctxs, RuntimeState* state) {
-    for (int i = 0; i < ctxs.size(); ++i) {
-        RETURN_IF_ERROR(ctxs[i]->open(state));
+    for (const auto& ctx : ctxs) {
+        RETURN_IF_ERROR(ctx->open(state));
     }
     return Status::OK();
 }
@@ -423,8 +426,8 @@ Status VExpr::clone_if_not_exists(const VExprContextSPtrs& 
ctxs, RuntimeState* s
     if (!new_ctxs.empty()) {
         // 'ctxs' was already cloned into '*new_ctxs', nothing to do.
         DCHECK_EQ(new_ctxs.size(), ctxs.size());
-        for (int i = 0; i < new_ctxs.size(); ++i) {
-            DCHECK(new_ctxs[i]->_is_clone);
+        for (auto& new_ctx : new_ctxs) {
+            DCHECK(new_ctx->_is_clone);
         }
         return Status::OK();
     }
@@ -461,20 +464,15 @@ std::string VExpr::debug_string(const VExprSPtrs& exprs) {
 
 std::string VExpr::debug_string(const VExprContextSPtrs& ctxs) {
     VExprSPtrs exprs;
-    for (int i = 0; i < ctxs.size(); ++i) {
-        exprs.push_back(ctxs[i]->root());
+    for (const auto& ctx : ctxs) {
+        exprs.push_back(ctx->root());
     }
     return debug_string(exprs);
 }
 
 bool VExpr::is_constant() const {
-    for (int i = 0; i < _children.size(); ++i) {
-        if (!_children[i]->is_constant()) {
-            return false;
-        }
-    }
-
-    return true;
+    return std::all_of(_children.begin(), _children.end(),
+                       [](const VExprSPtr& expr) { return expr->is_constant(); 
});
 }
 
 Status VExpr::get_const_col(VExprContext* context,
@@ -494,7 +492,11 @@ Status VExpr::get_const_col(VExprContext* context,
     // If block is empty, some functions will produce no result. So we insert 
a column with
     // single value here.
     block.insert({ColumnUInt8::create(1), std::make_shared<DataTypeUInt8>(), 
""});
+
+    _getting_const_col = true;
     RETURN_IF_ERROR(execute(context, &block, &result));
+    _getting_const_col = false;
+
     DCHECK(result != -1);
     const auto& column = block.get_by_position(result).column;
     _constant_col = std::make_shared<ColumnPtrWrapper>(column);
@@ -507,8 +509,8 @@ Status VExpr::get_const_col(VExprContext* context,
 
 void VExpr::register_function_context(RuntimeState* state, VExprContext* 
context) {
     std::vector<TypeDescriptor> arg_types;
-    for (int i = 0; i < _children.size(); ++i) {
-        arg_types.push_back(_children[i]->type());
+    for (auto& i : _children) {
+        arg_types.push_back(i->type());
     }
 
     _fn_context_index = context->register_function_context(state, _type, 
arg_types);
diff --git a/be/src/vec/exprs/vexpr.h b/be/src/vec/exprs/vexpr.h
index b6a2b4ac6bd..a852afeb2d2 100644
--- a/be/src/vec/exprs/vexpr.h
+++ b/be/src/vec/exprs/vexpr.h
@@ -21,15 +21,14 @@
 #include <gen_cpp/Opcodes_types.h>
 #include <gen_cpp/Types_types.h>
 #include <glog/logging.h>
-#include <stddef.h>
 
+#include <cstddef>
 #include <memory>
 #include <ostream>
 #include <string>
 #include <utility>
 #include <vector>
 
-#include "common/factory_creator.h"
 #include "common/status.h"
 #include "runtime/define_primitive_type.h"
 #include "runtime/large_int_value.h"
@@ -57,10 +56,9 @@ namespace vectorized {
 #define RETURN_IF_ERROR_OR_PREPARED(stmt) \
     if (_prepared) {                      \
         return Status::OK();              \
-    } else {                              \
-        _prepared = true;                 \
-        RETURN_IF_ERROR(stmt);            \
-    }
+    }                                     \
+    _prepared = true;                     \
+    RETURN_IF_ERROR(stmt);
 
 // VExpr should be used as shared pointer because it will be passed between 
classes
 // like runtime filter to scan node, or from scannode to scanner. We could not 
make sure
@@ -106,6 +104,14 @@ public:
     virtual Status open(RuntimeState* state, VExprContext* context,
                         FunctionContext::FunctionStateScope scope);
 
+    // before execute, check if expr has been parepared+opened.
+    [[maybe_unused]] Status ready_status() const {
+        if (_prepare_finished && _open_finished) {
+            return Status::OK();
+        }
+        return Status::InternalError(expr_name() + " is not ready when 
execute");
+    }
+
     virtual Status execute(VExprContext* context, Block* block, int* 
result_column_id) = 0;
 
     /// Subclasses overriding this function should call VExpr::Close().
@@ -156,6 +162,8 @@ public:
     static std::string debug_string(const VExprSPtrs& exprs);
     static std::string debug_string(const VExprContextSPtrs& ctxs);
 
+    void set_getting_const_col(bool val = true) { _getting_const_col = val; }
+
     bool is_and_expr() const { return _fn.name.function_name == "and"; }
 
     virtual bool is_compound_predicate() const { return false; }
@@ -254,63 +262,69 @@ protected:
     /// Index to pass to ExprContext::fn_context() to retrieve this expr's 
FunctionContext.
     /// Set in RegisterFunctionContext(). -1 if this expr does not need a 
FunctionContext and
     /// doesn't call RegisterFunctionContext().
-    int _fn_context_index;
+    int _fn_context_index = -1;
 
     // If this expr is constant, this will store and cache the value generated 
by
     // get_const_col()
     std::shared_ptr<ColumnPtrWrapper> _constant_col;
-    bool _prepared;
+    bool _prepared = false; // for base class VExpr
+    bool _getting_const_col =
+            false; // if true, current execute() is in prepare() (that is, 
can't check _prepared)
+    // for concrete classes
+    bool _prepare_finished = false;
+    bool _open_finished = false;
 };
 
 } // namespace vectorized
 
+// NOLINTBEGIN(readability-function-size)
 template <PrimitiveType T>
 Status create_texpr_literal_node(const void* data, TExprNode* node, int 
precision = 0,
                                  int scale = 0) {
     if constexpr (T == TYPE_BOOLEAN) {
-        auto origin_value = reinterpret_cast<const bool*>(data);
+        const auto* origin_value = reinterpret_cast<const bool*>(data);
         TBoolLiteral boolLiteral;
         (*node).__set_node_type(TExprNodeType::BOOL_LITERAL);
         boolLiteral.__set_value(*origin_value);
         (*node).__set_bool_literal(boolLiteral);
         (*node).__set_type(create_type_desc(PrimitiveType::TYPE_BOOLEAN));
     } else if constexpr (T == TYPE_TINYINT) {
-        auto origin_value = reinterpret_cast<const int8_t*>(data);
+        const auto* origin_value = reinterpret_cast<const int8_t*>(data);
         (*node).__set_node_type(TExprNodeType::INT_LITERAL);
         TIntLiteral intLiteral;
         intLiteral.__set_value(*origin_value);
         (*node).__set_int_literal(intLiteral);
         (*node).__set_type(create_type_desc(PrimitiveType::TYPE_TINYINT));
     } else if constexpr (T == TYPE_SMALLINT) {
-        auto origin_value = reinterpret_cast<const int16_t*>(data);
+        const auto* origin_value = reinterpret_cast<const int16_t*>(data);
         (*node).__set_node_type(TExprNodeType::INT_LITERAL);
         TIntLiteral intLiteral;
         intLiteral.__set_value(*origin_value);
         (*node).__set_int_literal(intLiteral);
         (*node).__set_type(create_type_desc(PrimitiveType::TYPE_SMALLINT));
     } else if constexpr (T == TYPE_INT) {
-        auto origin_value = reinterpret_cast<const int32_t*>(data);
+        const auto* origin_value = reinterpret_cast<const int32_t*>(data);
         (*node).__set_node_type(TExprNodeType::INT_LITERAL);
         TIntLiteral intLiteral;
         intLiteral.__set_value(*origin_value);
         (*node).__set_int_literal(intLiteral);
         (*node).__set_type(create_type_desc(PrimitiveType::TYPE_INT));
     } else if constexpr (T == TYPE_BIGINT) {
-        auto origin_value = reinterpret_cast<const int64_t*>(data);
+        const auto* origin_value = reinterpret_cast<const int64_t*>(data);
         (*node).__set_node_type(TExprNodeType::INT_LITERAL);
         TIntLiteral intLiteral;
         intLiteral.__set_value(*origin_value);
         (*node).__set_int_literal(intLiteral);
         (*node).__set_type(create_type_desc(PrimitiveType::TYPE_BIGINT));
     } else if constexpr (T == TYPE_LARGEINT) {
-        auto origin_value = reinterpret_cast<const int128_t*>(data);
+        const auto* origin_value = reinterpret_cast<const int128_t*>(data);
         (*node).__set_node_type(TExprNodeType::LARGE_INT_LITERAL);
         TLargeIntLiteral large_int_literal;
         large_int_literal.__set_value(LargeIntValue::to_string(*origin_value));
         (*node).__set_large_int_literal(large_int_literal);
         (*node).__set_type(create_type_desc(PrimitiveType::TYPE_LARGEINT));
     } else if constexpr ((T == TYPE_DATE) || (T == TYPE_DATETIME) || (T == 
TYPE_TIME)) {
-        auto origin_value = reinterpret_cast<const VecDateTimeValue*>(data);
+        const auto* origin_value = reinterpret_cast<const 
VecDateTimeValue*>(data);
         TDateLiteral date_literal;
         char convert_buffer[30];
         origin_value->to_string(convert_buffer);
@@ -325,7 +339,7 @@ Status create_texpr_literal_node(const void* data, 
TExprNode* node, int precisio
             (*node).__set_type(create_type_desc(PrimitiveType::TYPE_TIME));
         }
     } else if constexpr (T == TYPE_DATEV2) {
-        auto origin_value = reinterpret_cast<const 
DateV2Value<DateV2ValueType>*>(data);
+        const auto* origin_value = reinterpret_cast<const 
DateV2Value<DateV2ValueType>*>(data);
         TDateLiteral date_literal;
         char convert_buffer[30];
         origin_value->to_string(convert_buffer);
@@ -334,7 +348,7 @@ Status create_texpr_literal_node(const void* data, 
TExprNode* node, int precisio
         (*node).__set_node_type(TExprNodeType::DATE_LITERAL);
         (*node).__set_type(create_type_desc(PrimitiveType::TYPE_DATEV2));
     } else if constexpr (T == TYPE_DATETIMEV2) {
-        auto origin_value = reinterpret_cast<const 
DateV2Value<DateTimeV2ValueType>*>(data);
+        const auto* origin_value = reinterpret_cast<const 
DateV2Value<DateTimeV2ValueType>*>(data);
         TDateLiteral date_literal;
         char convert_buffer[30];
         origin_value->to_string(convert_buffer);
@@ -343,28 +357,28 @@ Status create_texpr_literal_node(const void* data, 
TExprNode* node, int precisio
         (*node).__set_node_type(TExprNodeType::DATE_LITERAL);
         (*node).__set_type(create_type_desc(PrimitiveType::TYPE_DATETIMEV2));
     } else if constexpr (T == TYPE_DECIMALV2) {
-        auto origin_value = reinterpret_cast<const DecimalV2Value*>(data);
+        const auto* origin_value = reinterpret_cast<const 
DecimalV2Value*>(data);
         (*node).__set_node_type(TExprNodeType::DECIMAL_LITERAL);
         TDecimalLiteral decimal_literal;
         decimal_literal.__set_value(origin_value->to_string());
         (*node).__set_decimal_literal(decimal_literal);
         (*node).__set_type(create_type_desc(PrimitiveType::TYPE_DECIMALV2, 
precision, scale));
     } else if constexpr (T == TYPE_DECIMAL32) {
-        auto origin_value = reinterpret_cast<const 
vectorized::Decimal<int32_t>*>(data);
+        const auto* origin_value = reinterpret_cast<const 
vectorized::Decimal<int32_t>*>(data);
         (*node).__set_node_type(TExprNodeType::DECIMAL_LITERAL);
         TDecimalLiteral decimal_literal;
         decimal_literal.__set_value(origin_value->to_string(scale));
         (*node).__set_decimal_literal(decimal_literal);
         (*node).__set_type(create_type_desc(PrimitiveType::TYPE_DECIMAL32, 
precision, scale));
     } else if constexpr (T == TYPE_DECIMAL64) {
-        auto origin_value = reinterpret_cast<const 
vectorized::Decimal<int64_t>*>(data);
+        const auto* origin_value = reinterpret_cast<const 
vectorized::Decimal<int64_t>*>(data);
         (*node).__set_node_type(TExprNodeType::DECIMAL_LITERAL);
         TDecimalLiteral decimal_literal;
         decimal_literal.__set_value(origin_value->to_string(scale));
         (*node).__set_decimal_literal(decimal_literal);
         (*node).__set_type(create_type_desc(PrimitiveType::TYPE_DECIMAL64, 
precision, scale));
     } else if constexpr (T == TYPE_DECIMAL128I) {
-        auto origin_value = reinterpret_cast<const 
vectorized::Decimal<int128_t>*>(data);
+        const auto* origin_value = reinterpret_cast<const 
vectorized::Decimal<int128_t>*>(data);
         (*node).__set_node_type(TExprNodeType::DECIMAL_LITERAL);
         TDecimalLiteral decimal_literal;
         decimal_literal.__set_value(origin_value->to_string(scale));
@@ -378,21 +392,21 @@ Status create_texpr_literal_node(const void* data, 
TExprNode* node, int precisio
         (*node).__set_decimal_literal(decimal_literal);
         (*node).__set_type(create_type_desc(PrimitiveType::TYPE_DECIMAL256, 
precision, scale));
     } else if constexpr (T == TYPE_FLOAT) {
-        auto origin_value = reinterpret_cast<const float*>(data);
+        const auto* origin_value = reinterpret_cast<const float*>(data);
         (*node).__set_node_type(TExprNodeType::FLOAT_LITERAL);
         TFloatLiteral float_literal;
         float_literal.__set_value(*origin_value);
         (*node).__set_float_literal(float_literal);
         (*node).__set_type(create_type_desc(PrimitiveType::TYPE_FLOAT));
     } else if constexpr (T == TYPE_DOUBLE) {
-        auto origin_value = reinterpret_cast<const double*>(data);
+        const auto* origin_value = reinterpret_cast<const double*>(data);
         (*node).__set_node_type(TExprNodeType::FLOAT_LITERAL);
         TFloatLiteral float_literal;
         float_literal.__set_value(*origin_value);
         (*node).__set_float_literal(float_literal);
         (*node).__set_type(create_type_desc(PrimitiveType::TYPE_DOUBLE));
     } else if constexpr ((T == TYPE_STRING) || (T == TYPE_CHAR) || (T == 
TYPE_VARCHAR)) {
-        auto origin_value = reinterpret_cast<const StringRef*>(data);
+        const auto* origin_value = reinterpret_cast<const StringRef*>(data);
         (*node).__set_node_type(TExprNodeType::STRING_LITERAL);
         TStringLiteral string_literal;
         string_literal.__set_value(origin_value->to_string());
@@ -403,6 +417,7 @@ Status create_texpr_literal_node(const void* data, 
TExprNode* node, int precisio
     }
     return Status::OK();
 }
+// NOLINTEND(readability-function-size)
 
 TExprNode create_texpr_node_from(const void* data, const PrimitiveType& type, 
int precision = 0,
                                  int scale = 0);
diff --git a/be/src/vec/exprs/vexpr_context.cpp 
b/be/src/vec/exprs/vexpr_context.cpp
index 35eaae5c607..cebb7dd2e53 100644
--- a/be/src/vec/exprs/vexpr_context.cpp
+++ b/be/src/vec/exprs/vexpr_context.cpp
@@ -17,17 +17,14 @@
 
 #include "vec/exprs/vexpr_context.h"
 
-#include <algorithm>
 #include <ostream>
 #include <string>
 
 #include "common/compiler_util.h" // IWYU pragma: keep
 #include "common/exception.h"
-#include "common/object_pool.h"
 #include "runtime/runtime_state.h"
 #include "runtime/thread_context.h"
 #include "udf/udf.h"
-#include "util/stack_util.h"
 #include "vec/columns/column_const.h"
 #include "vec/core/column_with_type_and_name.h"
 #include "vec/core/columns_with_type_and_name.h"
@@ -38,13 +35,6 @@ class RowDescriptor;
 } // namespace doris
 
 namespace doris::vectorized {
-VExprContext::VExprContext(const VExprSPtr& expr)
-        : _root(expr),
-          _is_clone(false),
-          _prepared(false),
-          _opened(false),
-          _last_result_column_id(-1) {}
-
 VExprContext::~VExprContext() {
     // In runtime filter, only create expr context to get expr root, will not 
call
     // prepare or open, so that it is not need to call close. And call close 
may core
@@ -154,19 +144,19 @@ Status VExprContext::execute_conjuncts(const 
VExprContextSPtrs& ctxs,
     return execute_conjuncts(ctxs, filters, false, block, result_filter, 
can_filter_all);
 }
 
-// TODO Performance Optimization
+// TODO: Performance Optimization
 Status VExprContext::execute_conjuncts(const VExprContextSPtrs& ctxs,
                                        const std::vector<IColumn::Filter*>* 
filters,
-                                       const bool accept_null, Block* block,
+                                       bool accept_null, Block* block,
                                        IColumn::Filter* result_filter, bool* 
can_filter_all) {
     DCHECK(result_filter->size() == block->rows());
     *can_filter_all = false;
     auto* __restrict result_filter_data = result_filter->data();
-    for (auto& ctx : ctxs) {
+    for (const auto& ctx : ctxs) {
         int result_column_id = -1;
         RETURN_IF_ERROR(ctx->execute(block, &result_column_id));
         ColumnPtr& filter_column = 
block->get_by_position(result_column_id).column;
-        if (auto* nullable_column = 
check_and_get_column<ColumnNullable>(*filter_column)) {
+        if (const auto* nullable_column = 
check_and_get_column<ColumnNullable>(*filter_column)) {
             size_t column_size = nullable_column->size();
             if (column_size == 0) {
                 *can_filter_all = true;
@@ -175,9 +165,9 @@ Status VExprContext::execute_conjuncts(const 
VExprContextSPtrs& ctxs,
                 const ColumnPtr& nested_column = 
nullable_column->get_nested_column_ptr();
                 const IColumn::Filter& filter =
                         assert_cast<const 
ColumnUInt8&>(*nested_column).get_data();
-                auto* __restrict filter_data = filter.data();
+                const auto* __restrict filter_data = filter.data();
                 const size_t size = filter.size();
-                auto* __restrict null_map_data = 
nullable_column->get_null_map_data().data();
+                const auto* __restrict null_map_data = 
nullable_column->get_null_map_data().data();
 
                 if (accept_null) {
                     for (size_t i = 0; i < size; ++i) {
@@ -194,7 +184,7 @@ Status VExprContext::execute_conjuncts(const 
VExprContextSPtrs& ctxs,
                     return Status::OK();
                 }
             }
-        } else if (auto* const_column = 
check_and_get_column<ColumnConst>(*filter_column)) {
+        } else if (const auto* const_column = 
check_and_get_column<ColumnConst>(*filter_column)) {
             // filter all
             if (!const_column->get_bool(0)) {
                 *can_filter_all = true;
@@ -204,7 +194,7 @@ Status VExprContext::execute_conjuncts(const 
VExprContextSPtrs& ctxs,
         } else {
             const IColumn::Filter& filter =
                     assert_cast<const ColumnUInt8&>(*filter_column).get_data();
-            auto* __restrict filter_data = filter.data();
+            const auto* __restrict filter_data = filter.data();
 
             const size_t size = filter.size();
             for (size_t i = 0; i < size; ++i) {
@@ -297,7 +287,7 @@ Status VExprContext::get_output_block_after_execute_exprs(
     auto rows = input_block.rows();
     vectorized::Block tmp_block(input_block.get_columns_with_type_and_name());
     vectorized::ColumnsWithTypeAndName result_columns;
-    for (auto& vexpr_ctx : output_vexpr_ctxs) {
+    for (const auto& vexpr_ctx : output_vexpr_ctxs) {
         int result_column_id = -1;
         RETURN_IF_ERROR(vexpr_ctx->execute(&tmp_block, &result_column_id));
         DCHECK(result_column_id != -1);
diff --git a/be/src/vec/exprs/vexpr_context.h b/be/src/vec/exprs/vexpr_context.h
index db5c4c87d8d..70bd37b1878 100644
--- a/be/src/vec/exprs/vexpr_context.h
+++ b/be/src/vec/exprs/vexpr_context.h
@@ -20,6 +20,7 @@
 #include <glog/logging.h>
 
 #include <memory>
+#include <utility>
 #include <vector>
 
 #include "common/factory_creator.h"
@@ -40,7 +41,7 @@ class VExprContext {
     ENABLE_FACTORY_CREATOR(VExprContext);
 
 public:
-    VExprContext(const VExprSPtr& expr);
+    VExprContext(VExprSPtr expr) : _root(std::move(expr)) {}
     ~VExprContext();
     [[nodiscard]] Status prepare(RuntimeState* state, const RowDescriptor& 
row_desc);
     [[nodiscard]] Status open(RuntimeState* state);
@@ -76,7 +77,7 @@ public:
 
     [[nodiscard]] static Status execute_conjuncts(const VExprContextSPtrs& 
ctxs,
                                                   const 
std::vector<IColumn::Filter*>* filters,
-                                                  const bool accept_null, 
Block* block,
+                                                  bool accept_null, Block* 
block,
                                                   IColumn::Filter* 
result_filter,
                                                   bool* can_filter_all);
 
@@ -121,7 +122,7 @@ public:
         _prepared = other._prepared;
         _opened = other._opened;
 
-        for (auto& fn : other._fn_contexts) {
+        for (const auto& fn : other._fn_contexts) {
             _fn_contexts.emplace_back(fn->clone());
         }
 
@@ -152,17 +153,17 @@ private:
     VExprSPtr _root;
 
     /// True if this context came from a Clone() call. Used to manage 
FunctionStateScope.
-    bool _is_clone;
+    bool _is_clone = false;
 
     /// Variables keeping track of current state.
-    bool _prepared;
-    bool _opened;
+    bool _prepared = false;
+    bool _opened = false;
 
     /// FunctionContexts for each registered expression. The FunctionContexts 
are created
     /// and owned by this VExprContext.
     std::vector<std::unique_ptr<FunctionContext>> _fn_contexts;
 
-    int _last_result_column_id;
+    int _last_result_column_id = -1;
 
     /// The depth of expression-tree.
     int _depth_num = 0;
diff --git a/be/src/vec/exprs/vin_predicate.cpp 
b/be/src/vec/exprs/vin_predicate.cpp
index 9a25d3a2230..896b2a903d3 100644
--- a/be/src/vec/exprs/vin_predicate.cpp
+++ b/be/src/vec/exprs/vin_predicate.cpp
@@ -73,11 +73,13 @@ Status VInPredicate::prepare(RuntimeState* state, const 
RowDescriptor& desc,
     }
 
     VExpr::register_function_context(state, context);
+    _prepare_finished = true;
     return Status::OK();
 }
 
 Status VInPredicate::open(RuntimeState* state, VExprContext* context,
                           FunctionContext::FunctionStateScope scope) {
+    DCHECK(_prepare_finished);
     for (int i = 0; i < _children.size(); ++i) {
         RETURN_IF_ERROR(_children[i]->open(state, context, scope));
     }
@@ -85,6 +87,7 @@ Status VInPredicate::open(RuntimeState* state, VExprContext* 
context,
     if (scope == FunctionContext::FRAGMENT_LOCAL) {
         RETURN_IF_ERROR(VExpr::get_const_col(context, nullptr));
     }
+    _open_finished = true;
     return Status::OK();
 }
 
@@ -97,6 +100,7 @@ Status VInPredicate::execute(VExprContext* context, Block* 
block, int* result_co
     if (is_const_and_have_executed()) { // const have execute in open function
         return get_result_from_const(block, _expr_name, result_column_id);
     }
+    DCHECK(_open_finished || _getting_const_col);
     // TODO: not execute const expr again, but use the const column in 
function context
     doris::vectorized::ColumnNumbers arguments(_children.size());
     for (int i = 0; i < _children.size(); ++i) {
diff --git a/be/src/vec/exprs/vlambda_function_call_expr.h 
b/be/src/vec/exprs/vlambda_function_call_expr.h
index 44678498027..44d22b1f9eb 100644
--- a/be/src/vec/exprs/vlambda_function_call_expr.h
+++ b/be/src/vec/exprs/vlambda_function_call_expr.h
@@ -34,6 +34,8 @@ public:
     VLambdaFunctionCallExpr(const TExprNode& node) : VExpr(node) {}
     ~VLambdaFunctionCallExpr() override = default;
 
+    const std::string& expr_name() const override { return _expr_name; }
+
     Status prepare(RuntimeState* state, const RowDescriptor& desc, 
VExprContext* context) override {
         RETURN_IF_ERROR_OR_PREPARED(VExpr::prepare(state, desc, context));
 
@@ -48,12 +50,20 @@ public:
             return Status::InternalError("Lambda Function {} is not 
implemented.",
                                          _fn.name.function_name);
         }
+        _prepare_finished = true;
         return Status::OK();
     }
 
-    const std::string& expr_name() const override { return _expr_name; }
+    Status open(RuntimeState* state, VExprContext* context,
+                FunctionContext::FunctionStateScope scope) override {
+        DCHECK(_prepare_finished);
+        RETURN_IF_ERROR(VExpr::open(state, context, scope));
+        _open_finished = true;
+        return Status::OK();
+    }
 
     Status execute(VExprContext* context, Block* block, int* result_column_id) 
override {
+        DCHECK(_open_finished || _getting_const_col);
         return _lambda_function->execute(context, block, result_column_id, 
_data_type, _children);
     }
 
diff --git a/be/src/vec/exprs/vlambda_function_expr.h 
b/be/src/vec/exprs/vlambda_function_expr.h
index 6d84abb937f..94571712e40 100644
--- a/be/src/vec/exprs/vlambda_function_expr.h
+++ b/be/src/vec/exprs/vlambda_function_expr.h
@@ -28,7 +28,22 @@ public:
     VLambdaFunctionExpr(const TExprNode& node) : VExpr(node) {}
     ~VLambdaFunctionExpr() override = default;
 
+    Status prepare(RuntimeState* state, const RowDescriptor& desc, 
VExprContext* context) override {
+        RETURN_IF_ERROR_OR_PREPARED(VExpr::prepare(state, desc, context));
+        _prepare_finished = true;
+        return Status::OK();
+    }
+
+    Status open(RuntimeState* state, VExprContext* context,
+                FunctionContext::FunctionStateScope scope) override {
+        DCHECK(_prepare_finished);
+        RETURN_IF_ERROR(VExpr::open(state, context, scope));
+        _open_finished = true;
+        return Status::OK();
+    }
+
     Status execute(VExprContext* context, Block* block, int* result_column_id) 
override {
+        DCHECK(_open_finished || _getting_const_col);
         return get_child(0)->execute(context, block, result_column_id);
     }
 
diff --git a/be/src/vec/exprs/vliteral.cpp b/be/src/vec/exprs/vliteral.cpp
index 03d1659eee6..c7fbb081675 100644
--- a/be/src/vec/exprs/vliteral.cpp
+++ b/be/src/vec/exprs/vliteral.cpp
@@ -47,9 +47,7 @@
 #include "vec/data_types/data_type_decimal.h"
 #include "vec/runtime/vdatetime_value.h"
 
-namespace doris {
-
-namespace vectorized {
+namespace doris::vectorized {
 class VExprContext;
 
 void VLiteral::init(const TExprNode& node) {
@@ -58,8 +56,20 @@ void VLiteral::init(const TExprNode& node) {
     _column_ptr = _data_type->create_column_const(1, field);
 }
 
+Status VLiteral::prepare(RuntimeState* state, const RowDescriptor& desc, 
VExprContext* context) {
+    RETURN_IF_ERROR_OR_PREPARED(VExpr::prepare(state, desc, context));
+    return Status::OK();
+}
+
+Status VLiteral::open(RuntimeState* state, VExprContext* context,
+                      FunctionContext::FunctionStateScope scope) {
+    RETURN_IF_ERROR(VExpr::open(state, context, scope));
+    return Status::OK();
+}
+
 Status VLiteral::execute(VExprContext* context, vectorized::Block* block, int* 
result_column_id) {
     // Literal expr should return least one row.
+    // sometimes we just use a VLiteral without open or prepare. so can't 
check it at this moment
     size_t row_size = std::max(block->rows(), _column_ptr->size());
     *result_column_id = VExpr::insert_param(block, {_column_ptr, _data_type, 
_expr_name}, row_size);
     return Status::OK();
@@ -86,5 +96,4 @@ std::string VLiteral::debug_string() const {
     return out.str();
 }
 
-} // namespace vectorized
-} // namespace doris
+} // namespace doris::vectorized
diff --git a/be/src/vec/exprs/vliteral.h b/be/src/vec/exprs/vliteral.h
index 78879c00d04..d443478ada5 100644
--- a/be/src/vec/exprs/vliteral.h
+++ b/be/src/vec/exprs/vliteral.h
@@ -42,7 +42,12 @@ public:
             init(node);
         }
     }
+
+    Status prepare(RuntimeState* state, const RowDescriptor& desc, 
VExprContext* context) override;
+    Status open(RuntimeState* state, VExprContext* context,
+                FunctionContext::FunctionStateScope scope) override;
     Status execute(VExprContext* context, Block* block, int* result_column_id) 
override;
+
     const std::string& expr_name() const override { return _expr_name; }
     std::string debug_string() const override;
 
diff --git a/be/src/vec/exprs/vmatch_predicate.cpp 
b/be/src/vec/exprs/vmatch_predicate.cpp
index 23a34aae5ac..17326b5b23b 100644
--- a/be/src/vec/exprs/vmatch_predicate.cpp
+++ b/be/src/vec/exprs/vmatch_predicate.cpp
@@ -91,12 +91,13 @@ Status VMatchPredicate::prepare(RuntimeState* state, const 
RowDescriptor& desc,
     VExpr::register_function_context(state, context);
     _expr_name = fmt::format("{}({})", _fn.name.function_name, 
child_expr_name);
     _function_name = _fn.name.function_name;
-
+    _prepare_finished = true;
     return Status::OK();
 }
 
 Status VMatchPredicate::open(RuntimeState* state, VExprContext* context,
                              FunctionContext::FunctionStateScope scope) {
+    DCHECK(_prepare_finished);
     for (int i = 0; i < _children.size(); ++i) {
         RETURN_IF_ERROR(_children[i]->open(state, context, scope));
     }
@@ -107,6 +108,7 @@ Status VMatchPredicate::open(RuntimeState* state, 
VExprContext* context,
     if (scope == FunctionContext::FRAGMENT_LOCAL) {
         RETURN_IF_ERROR(VExpr::get_const_col(context, nullptr));
     }
+    _open_finished = true;
     return Status::OK();
 }
 
@@ -116,6 +118,7 @@ void VMatchPredicate::close(VExprContext* context, 
FunctionContext::FunctionStat
 }
 
 Status VMatchPredicate::execute(VExprContext* context, Block* block, int* 
result_column_id) {
+    DCHECK(_open_finished || _getting_const_col);
     // TODO: not execute const expr again, but use the const column in 
function context
     doris::vectorized::ColumnNumbers arguments(_children.size());
     for (int i = 0; i < _children.size(); ++i) {
diff --git a/be/src/vec/exprs/vruntimefilter_wrapper.cpp 
b/be/src/vec/exprs/vruntimefilter_wrapper.cpp
index 62ef2bbdb64..c623355d673 100644
--- a/be/src/vec/exprs/vruntimefilter_wrapper.cpp
+++ b/be/src/vec/exprs/vruntimefilter_wrapper.cpp
@@ -52,12 +52,16 @@ Status VRuntimeFilterWrapper::prepare(RuntimeState* state, 
const RowDescriptor&
                                       VExprContext* context) {
     RETURN_IF_ERROR_OR_PREPARED(_impl->prepare(state, desc, context));
     _expr_name = fmt::format("VRuntimeFilterWrapper({})", _impl->expr_name());
+    _prepare_finished = true;
     return Status::OK();
 }
 
 Status VRuntimeFilterWrapper::open(RuntimeState* state, VExprContext* context,
                                    FunctionContext::FunctionStateScope scope) {
-    return _impl->open(state, context, scope);
+    DCHECK(_prepare_finished);
+    RETURN_IF_ERROR(_impl->open(state, context, scope));
+    _open_finished = true;
+    return Status::OK();
 }
 
 void VRuntimeFilterWrapper::close(VExprContext* context,
@@ -66,6 +70,7 @@ void VRuntimeFilterWrapper::close(VExprContext* context,
 }
 
 Status VRuntimeFilterWrapper::execute(VExprContext* context, Block* block, 
int* result_column_id) {
+    DCHECK(_open_finished || _getting_const_col);
     if (_always_true) {
         auto res_data_column = ColumnVector<UInt8>::create(block->rows(), 1);
         size_t num_columns_without_result = block->columns();
@@ -80,7 +85,15 @@ Status VRuntimeFilterWrapper::execute(VExprContext* context, 
Block* block, int*
         return Status::OK();
     } else {
         _scan_rows += block->rows();
+
+        if (_getting_const_col) {
+            _impl->set_getting_const_col(true);
+        }
         RETURN_IF_ERROR(_impl->execute(context, block, result_column_id));
+        if (_getting_const_col) {
+            _impl->set_getting_const_col(false);
+        }
+
         uint8_t* data = nullptr;
         const ColumnWithTypeAndName& result_column = 
block->get_by_position(*result_column_id);
         if (is_column_const(*result_column.column)) {
diff --git a/be/src/vec/exprs/vslot_ref.cpp b/be/src/vec/exprs/vslot_ref.cpp
index 5a34999accb..b683a1fb156 100644
--- a/be/src/vec/exprs/vslot_ref.cpp
+++ b/be/src/vec/exprs/vslot_ref.cpp
@@ -51,6 +51,7 @@ Status VSlotRef::prepare(doris::RuntimeState* state, const 
doris::RowDescriptor&
     RETURN_IF_ERROR_OR_PREPARED(VExpr::prepare(state, desc, context));
     DCHECK_EQ(_children.size(), 0);
     if (_slot_id == -1) {
+        _prepare_finished = true;
         return Status::OK();
     }
     const SlotDescriptor* slot_desc = 
state->desc_tbl().get_slot_descriptor(_slot_id);
@@ -63,6 +64,7 @@ Status VSlotRef::prepare(doris::RuntimeState* state, const 
doris::RowDescriptor&
     if (!context->force_materialize_slot() && !slot_desc->need_materialize()) {
         // slot should be ignored manually
         _column_id = -1;
+        _prepare_finished = true;
         return Status::OK();
     }
     _column_id = desc.get_column_id(_slot_id, 
context->force_materialize_slot());
@@ -72,6 +74,15 @@ Status VSlotRef::prepare(doris::RuntimeState* state, const 
doris::RowDescriptor&
                 *_column_name, _slot_id, desc.debug_string(), 
slot_desc->debug_string(),
                 state->desc_tbl().debug_string());
     }
+    _prepare_finished = true;
+    return Status::OK();
+}
+
+Status VSlotRef::open(RuntimeState* state, VExprContext* context,
+                      FunctionContext::FunctionStateScope scope) {
+    DCHECK(_prepare_finished);
+    RETURN_IF_ERROR(VExpr::open(state, context, scope));
+    _open_finished = true;
     return Status::OK();
 }
 
diff --git a/be/src/vec/exprs/vslot_ref.h b/be/src/vec/exprs/vslot_ref.h
index 2084ae18715..c30ac64041f 100644
--- a/be/src/vec/exprs/vslot_ref.h
+++ b/be/src/vec/exprs/vslot_ref.h
@@ -38,8 +38,11 @@ class VSlotRef final : public VExpr {
 public:
     VSlotRef(const TExprNode& node);
     VSlotRef(const SlotDescriptor* desc);
-    Status execute(VExprContext* context, Block* block, int* result_column_id) 
override;
     Status prepare(RuntimeState* state, const RowDescriptor& desc, 
VExprContext* context) override;
+    Status open(RuntimeState* state, VExprContext* context,
+                FunctionContext::FunctionStateScope scope) override;
+    Status execute(VExprContext* context, Block* block, int* result_column_id) 
override;
+
     const std::string& expr_name() const override;
     std::string debug_string() const override;
     bool is_constant() const override { return false; }
diff --git a/be/src/vec/exprs/vtuple_is_null_predicate.cpp 
b/be/src/vec/exprs/vtuple_is_null_predicate.cpp
index b17428bfc00..641e34590a4 100644
--- a/be/src/vec/exprs/vtuple_is_null_predicate.cpp
+++ b/be/src/vec/exprs/vtuple_is_null_predicate.cpp
@@ -48,11 +48,20 @@ Status VTupleIsNullPredicate::prepare(RuntimeState* state, 
const RowDescriptor&
     DCHECK_EQ(0, _children.size());
     _column_to_check =
             _is_left_null_side ? desc.num_materialized_slots() : 
desc.num_materialized_slots() + 1;
+    _prepare_finished = true;
+    return Status::OK();
+}
 
+Status VTupleIsNullPredicate::open(RuntimeState* state, VExprContext* context,
+                                   FunctionContext::FunctionStateScope scope) {
+    DCHECK(_prepare_finished);
+    RETURN_IF_ERROR(VExpr::open(state, context, scope));
+    _open_finished = true;
     return Status::OK();
 }
 
 Status VTupleIsNullPredicate::execute(VExprContext* context, Block* block, 
int* result_column_id) {
+    DCHECK(_open_finished || _getting_const_col);
     *result_column_id = _column_to_check;
     return Status::OK();
 }
diff --git a/be/src/vec/exprs/vtuple_is_null_predicate.h 
b/be/src/vec/exprs/vtuple_is_null_predicate.h
index 9d3b794fb8b..c42e7300d1d 100644
--- a/be/src/vec/exprs/vtuple_is_null_predicate.h
+++ b/be/src/vec/exprs/vtuple_is_null_predicate.h
@@ -42,8 +42,10 @@ class VTupleIsNullPredicate final : public VExpr {
 public:
     explicit VTupleIsNullPredicate(const TExprNode& node);
     ~VTupleIsNullPredicate() override = default;
-    Status execute(VExprContext* context, Block* block, int* result_column_id) 
override;
     Status prepare(RuntimeState* state, const RowDescriptor& desc, 
VExprContext* context) override;
+    Status open(RuntimeState* state, VExprContext* context,
+                FunctionContext::FunctionStateScope scope) override;
+    Status execute(VExprContext* context, Block* block, int* result_column_id) 
override;
 
     [[nodiscard]] bool is_constant() const override { return false; }
 
diff --git a/be/src/vec/runtime/vdata_stream_recvr.h 
b/be/src/vec/runtime/vdata_stream_recvr.h
index 141a5c54b64..2da6f9f920b 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.h
+++ b/be/src/vec/runtime/vdata_stream_recvr.h
@@ -20,11 +20,11 @@
 #include <gen_cpp/Types_types.h>
 #include <glog/logging.h>
 #include <google/protobuf/stubs/callback.h>
-#include <stddef.h>
-#include <stdint.h>
 
 #include <atomic>
 #include <condition_variable>
+#include <cstddef>
+#include <cstdint>
 #include <deque>
 #include <list>
 #include <memory>
@@ -42,14 +42,10 @@
 #include "common/object_pool.h"
 #include "common/status.h"
 #include "runtime/descriptors.h"
-#include "runtime/query_statistics.h"
 #include "runtime/task_execution_context.h"
 #include "util/runtime_profile.h"
 #include "util/stopwatch.hpp"
-#include "vec/columns/column.h"
 #include "vec/core/block.h"
-#include "vec/core/column_with_type_and_name.h"
-#include "vec/core/materialize_block.h"
 #include "vec/exprs/vexpr_fwd.h"
 
 namespace doris {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to