This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 47832e3e323 [refactor](rename) Rename open function in operator (#48402)
47832e3e323 is described below
commit 47832e3e323db0dc36a9bfd13c595cb1beeb36cc
Author: Gabriel <[email protected]>
AuthorDate: Thu Feb 27 16:20:25 2025 +0800
[refactor](rename) Rename open function in operator (#48402)
Rename the `open` method to `prepare` in the operator classes.
---
be/src/pipeline/exec/aggregation_sink_operator.cpp | 4 ++--
be/src/pipeline/exec/aggregation_sink_operator.h | 2 +-
be/src/pipeline/exec/analytic_sink_operator.cpp | 4 ++--
be/src/pipeline/exec/analytic_sink_operator.h | 2 +-
be/src/pipeline/exec/analytic_source_operator.cpp | 4 ++--
be/src/pipeline/exec/analytic_source_operator.h | 2 +-
be/src/pipeline/exec/cache_source_operator.h | 5 -----
be/src/pipeline/exec/datagen_operator.cpp | 4 ++--
be/src/pipeline/exec/datagen_operator.h | 2 +-
.../distinct_streaming_aggregation_operator.cpp | 4 ++--
.../exec/distinct_streaming_aggregation_operator.h | 2 +-
be/src/pipeline/exec/es_scan_operator.cpp | 4 ++--
be/src/pipeline/exec/es_scan_operator.h | 2 +-
be/src/pipeline/exec/exchange_sink_operator.cpp | 4 ++--
be/src/pipeline/exec/exchange_sink_operator.h | 2 +-
be/src/pipeline/exec/exchange_source_operator.cpp | 4 ++--
be/src/pipeline/exec/exchange_source_operator.h | 2 +-
be/src/pipeline/exec/file_scan_operator.cpp | 4 ++--
be/src/pipeline/exec/file_scan_operator.h | 2 +-
.../exec/group_commit_block_sink_operator.cpp | 4 ++--
.../exec/group_commit_block_sink_operator.h | 2 +-
be/src/pipeline/exec/hashjoin_build_sink.cpp | 4 ++--
be/src/pipeline/exec/hashjoin_build_sink.h | 2 +-
be/src/pipeline/exec/hashjoin_probe_operator.cpp | 4 ++--
be/src/pipeline/exec/hashjoin_probe_operator.h | 2 +-
be/src/pipeline/exec/hive_table_sink_operator.h | 4 ++--
be/src/pipeline/exec/iceberg_table_sink_operator.h | 4 ++--
be/src/pipeline/exec/jdbc_table_sink_operator.cpp | 4 ++--
be/src/pipeline/exec/jdbc_table_sink_operator.h | 2 +-
.../pipeline/exec/memory_scratch_sink_operator.cpp | 4 ++--
.../pipeline/exec/memory_scratch_sink_operator.h | 2 +-
.../pipeline/exec/multi_cast_data_stream_source.h | 4 ++--
.../exec/nested_loop_join_build_operator.cpp | 4 ++--
.../exec/nested_loop_join_build_operator.h | 2 +-
.../exec/nested_loop_join_probe_operator.cpp | 4 ++--
.../exec/nested_loop_join_probe_operator.h | 2 +-
be/src/pipeline/exec/olap_table_sink_operator.h | 4 ++--
be/src/pipeline/exec/olap_table_sink_v2_operator.h | 4 ++--
be/src/pipeline/exec/operator.cpp | 4 ++--
be/src/pipeline/exec/operator.h | 6 +++---
.../pipeline/exec/partition_sort_sink_operator.cpp | 4 ++--
.../pipeline/exec/partition_sort_sink_operator.h | 2 +-
.../exec/partitioned_aggregation_sink_operator.cpp | 4 ++--
.../exec/partitioned_aggregation_sink_operator.h | 2 +-
.../partitioned_aggregation_source_operator.cpp | 6 +++---
.../exec/partitioned_aggregation_source_operator.h | 2 +-
.../exec/partitioned_hash_join_probe_operator.cpp | 8 ++++----
.../exec/partitioned_hash_join_probe_operator.h | 2 +-
.../exec/partitioned_hash_join_sink_operator.cpp | 6 +++---
.../exec/partitioned_hash_join_sink_operator.h | 2 +-
be/src/pipeline/exec/repeat_operator.cpp | 4 ++--
be/src/pipeline/exec/repeat_operator.h | 2 +-
be/src/pipeline/exec/result_file_sink_operator.cpp | 4 ++--
be/src/pipeline/exec/result_file_sink_operator.h | 2 +-
be/src/pipeline/exec/result_sink_operator.cpp | 4 ++--
be/src/pipeline/exec/result_sink_operator.h | 2 +-
be/src/pipeline/exec/scan_operator.cpp | 4 ++--
be/src/pipeline/exec/scan_operator.h | 2 +-
be/src/pipeline/exec/schema_scan_operator.cpp | 4 ++--
be/src/pipeline/exec/schema_scan_operator.h | 2 +-
be/src/pipeline/exec/set_probe_sink_operator.cpp | 4 ++--
be/src/pipeline/exec/set_probe_sink_operator.h | 2 +-
be/src/pipeline/exec/set_sink_operator.cpp | 4 ++--
be/src/pipeline/exec/set_sink_operator.h | 2 +-
be/src/pipeline/exec/sort_sink_operator.cpp | 4 ++--
be/src/pipeline/exec/sort_sink_operator.h | 2 +-
be/src/pipeline/exec/sort_source_operator.cpp | 4 ++--
be/src/pipeline/exec/sort_source_operator.h | 2 +-
be/src/pipeline/exec/spill_sort_sink_operator.cpp | 6 +++---
be/src/pipeline/exec/spill_sort_sink_operator.h | 2 +-
.../pipeline/exec/spill_sort_source_operator.cpp | 6 +++---
be/src/pipeline/exec/spill_sort_source_operator.h | 2 +-
.../exec/streaming_aggregation_operator.cpp | 4 ++--
.../pipeline/exec/streaming_aggregation_operator.h | 2 +-
be/src/pipeline/exec/table_function_operator.cpp | 4 ++--
be/src/pipeline/exec/table_function_operator.h | 2 +-
be/src/pipeline/exec/union_sink_operator.cpp | 4 ++--
be/src/pipeline/exec/union_sink_operator.h | 2 +-
be/src/pipeline/exec/union_source_operator.h | 4 ++--
.../local_exchange_sink_operator.cpp | 4 ++--
.../local_exchange/local_exchange_sink_operator.h | 2 +-
.../local_exchange_source_operator.h | 2 +-
be/src/pipeline/pipeline.cpp | 4 ++--
be/test/pipeline/operator/agg_operator_test.cpp | 24 +++++++++++-----------
.../operator/exchange_sink_operator_test.cpp | 2 +-
be/test/vec/exec/vfile_scanner_exception_test.cpp | 2 +-
be/test/vec/exec/vwal_scanner_test.cpp | 2 +-
87 files changed, 149 insertions(+), 154 deletions(-)
diff --git a/be/src/pipeline/exec/aggregation_sink_operator.cpp
b/be/src/pipeline/exec/aggregation_sink_operator.cpp
index a21d8aefdce..0f1c9c9ffeb 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_sink_operator.cpp
@@ -770,8 +770,8 @@ Status AggSinkOperatorX::init(const TPlanNode& tnode,
RuntimeState* state) {
return Status::OK();
}
-Status AggSinkOperatorX::open(RuntimeState* state) {
- RETURN_IF_ERROR(DataSinkOperatorX<AggSinkLocalState>::open(state));
+Status AggSinkOperatorX::prepare(RuntimeState* state) {
+ RETURN_IF_ERROR(DataSinkOperatorX<AggSinkLocalState>::prepare(state));
RETURN_IF_ERROR(_init_probe_expr_ctx(state));
diff --git a/be/src/pipeline/exec/aggregation_sink_operator.h
b/be/src/pipeline/exec/aggregation_sink_operator.h
index 7ee1d973be7..e664c3bf297 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/aggregation_sink_operator.h
@@ -150,7 +150,7 @@ public:
Status init(const TPlanNode& tnode, RuntimeState* state) override;
- Status open(RuntimeState* state) override;
+ Status prepare(RuntimeState* state) override;
Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos)
override;
diff --git a/be/src/pipeline/exec/analytic_sink_operator.cpp
b/be/src/pipeline/exec/analytic_sink_operator.cpp
index 939a924b9bd..fb3117c800f 100644
--- a/be/src/pipeline/exec/analytic_sink_operator.cpp
+++ b/be/src/pipeline/exec/analytic_sink_operator.cpp
@@ -644,8 +644,8 @@ Status AnalyticSinkOperatorX::init(const TPlanNode& tnode,
RuntimeState* state)
return Status::OK();
}
-Status AnalyticSinkOperatorX::open(RuntimeState* state) {
- RETURN_IF_ERROR(DataSinkOperatorX<AnalyticSinkLocalState>::open(state));
+Status AnalyticSinkOperatorX::prepare(RuntimeState* state) {
+ RETURN_IF_ERROR(DataSinkOperatorX<AnalyticSinkLocalState>::prepare(state));
for (const auto& ctx : _agg_expr_ctxs) {
RETURN_IF_ERROR(vectorized::VExpr::prepare(ctx, state,
_child->row_desc()));
}
diff --git a/be/src/pipeline/exec/analytic_sink_operator.h
b/be/src/pipeline/exec/analytic_sink_operator.h
index 31b15d5a245..99e09372302 100644
--- a/be/src/pipeline/exec/analytic_sink_operator.h
+++ b/be/src/pipeline/exec/analytic_sink_operator.h
@@ -176,7 +176,7 @@ public:
Status init(const TPlanNode& tnode, RuntimeState* state) override;
- Status open(RuntimeState* state) override;
+ Status prepare(RuntimeState* state) override;
Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos)
override;
DataDistribution required_data_distribution() const override {
diff --git a/be/src/pipeline/exec/analytic_source_operator.cpp
b/be/src/pipeline/exec/analytic_source_operator.cpp
index ada7fa511a8..44e334de6a1 100644
--- a/be/src/pipeline/exec/analytic_source_operator.cpp
+++ b/be/src/pipeline/exec/analytic_source_operator.cpp
@@ -86,8 +86,8 @@ Status AnalyticSourceOperatorX::get_block(RuntimeState*
state, vectorized::Block
return Status::OK();
}
-Status AnalyticSourceOperatorX::open(RuntimeState* state) {
- RETURN_IF_ERROR(OperatorX<AnalyticLocalState>::open(state));
+Status AnalyticSourceOperatorX::prepare(RuntimeState* state) {
+ RETURN_IF_ERROR(OperatorX<AnalyticLocalState>::prepare(state));
DCHECK(_child->row_desc().is_prefix_of(_row_descriptor));
return Status::OK();
}
diff --git a/be/src/pipeline/exec/analytic_source_operator.h
b/be/src/pipeline/exec/analytic_source_operator.h
index be1fdb2c9e5..b1828eeabe6 100644
--- a/be/src/pipeline/exec/analytic_source_operator.h
+++ b/be/src/pipeline/exec/analytic_source_operator.h
@@ -50,7 +50,7 @@ public:
bool is_source() const override { return true; }
- Status open(RuntimeState* state) override;
+ Status prepare(RuntimeState* state) override;
private:
friend class AnalyticLocalState;
diff --git a/be/src/pipeline/exec/cache_source_operator.h
b/be/src/pipeline/exec/cache_source_operator.h
index 146c984d04a..651f9ff5596 100644
--- a/be/src/pipeline/exec/cache_source_operator.h
+++ b/be/src/pipeline/exec/cache_source_operator.h
@@ -81,11 +81,6 @@ public:
bool is_source() const override { return true; }
- Status open(RuntimeState* state) override {
- static_cast<void>(Base::open(state));
- return Status::OK();
- }
-
const RowDescriptor& intermediate_row_desc() const override {
return _child->intermediate_row_desc();
}
diff --git a/be/src/pipeline/exec/datagen_operator.cpp
b/be/src/pipeline/exec/datagen_operator.cpp
index 4e6e4ec513f..7118574282a 100644
--- a/be/src/pipeline/exec/datagen_operator.cpp
+++ b/be/src/pipeline/exec/datagen_operator.cpp
@@ -50,8 +50,8 @@ Status DataGenSourceOperatorX::init(const TPlanNode& tnode,
RuntimeState* state)
return Status::OK();
}
-Status DataGenSourceOperatorX::open(RuntimeState* state) {
- RETURN_IF_ERROR(OperatorX<DataGenLocalState>::open(state));
+Status DataGenSourceOperatorX::prepare(RuntimeState* state) {
+ RETURN_IF_ERROR(OperatorX<DataGenLocalState>::prepare(state));
// get tuple desc
_tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id);
diff --git a/be/src/pipeline/exec/datagen_operator.h
b/be/src/pipeline/exec/datagen_operator.h
index 8284db35c76..a45f7381f57 100644
--- a/be/src/pipeline/exec/datagen_operator.h
+++ b/be/src/pipeline/exec/datagen_operator.h
@@ -58,7 +58,7 @@ public:
#endif
Status init(const TPlanNode& tnode, RuntimeState* state) override;
- Status open(RuntimeState* state) override;
+ Status prepare(RuntimeState* state) override;
Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos)
override;
[[nodiscard]] bool is_source() const override { return true; }
diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
index 6187e356351..24df662bb57 100644
--- a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
+++ b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
@@ -363,8 +363,8 @@ Status DistinctStreamingAggOperatorX::init(const TPlanNode&
tnode, RuntimeState*
return Status::OK();
}
-Status DistinctStreamingAggOperatorX::open(RuntimeState* state) {
- RETURN_IF_ERROR(StatefulOperatorX<DistinctStreamingAggLocalState>::open(state));
+Status DistinctStreamingAggOperatorX::prepare(RuntimeState* state) {
+ RETURN_IF_ERROR(StatefulOperatorX<DistinctStreamingAggLocalState>::prepare(state));
_intermediate_tuple_desc =
state->desc_tbl().get_tuple_descriptor(_intermediate_tuple_id);
_output_tuple_desc =
state->desc_tbl().get_tuple_descriptor(_output_tuple_id);
DCHECK_EQ(_intermediate_tuple_desc->slots().size(),
_output_tuple_desc->slots().size());
diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h
b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h
index 4c5fcd5efa7..1066ea37236 100644
--- a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h
+++ b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h
@@ -98,7 +98,7 @@ public:
DistinctStreamingAggOperatorX(ObjectPool* pool, int operator_id, const
TPlanNode& tnode,
const DescriptorTbl& descs, bool
require_bucket_distribution);
Status init(const TPlanNode& tnode, RuntimeState* state) override;
- Status open(RuntimeState* state) override;
+ Status prepare(RuntimeState* state) override;
Status pull(RuntimeState* state, vectorized::Block* block, bool* eos)
const override;
Status push(RuntimeState* state, vectorized::Block* input_block, bool eos)
const override;
bool need_more_input_data(RuntimeState* state) const override;
diff --git a/be/src/pipeline/exec/es_scan_operator.cpp
b/be/src/pipeline/exec/es_scan_operator.cpp
index 2cb3cd5e0b2..030753cd231 100644
--- a/be/src/pipeline/exec/es_scan_operator.cpp
+++ b/be/src/pipeline/exec/es_scan_operator.cpp
@@ -136,8 +136,8 @@ Status EsScanOperatorX::init(const TPlanNode& tnode,
RuntimeState* state) {
return Status::OK();
}
-Status EsScanOperatorX::open(RuntimeState* state) {
- RETURN_IF_ERROR(ScanOperatorX<EsScanLocalState>::open(state));
+Status EsScanOperatorX::prepare(RuntimeState* state) {
+ RETURN_IF_ERROR(ScanOperatorX<EsScanLocalState>::prepare(state));
_tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id);
if (_tuple_desc == nullptr) {
diff --git a/be/src/pipeline/exec/es_scan_operator.h
b/be/src/pipeline/exec/es_scan_operator.h
index 6e64110997e..daf391ae982 100644
--- a/be/src/pipeline/exec/es_scan_operator.h
+++ b/be/src/pipeline/exec/es_scan_operator.h
@@ -72,7 +72,7 @@ public:
const DescriptorTbl& descs, int parallel_tasks);
Status init(const TPlanNode& tnode, RuntimeState* state) override;
- Status open(RuntimeState* state) override;
+ Status prepare(RuntimeState* state) override;
private:
friend class EsScanLocalState;
diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp
b/be/src/pipeline/exec/exchange_sink_operator.cpp
index 71311e4d51f..18f3f0740c5 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -313,8 +313,8 @@ Status ExchangeSinkOperatorX::init(const TDataSink& tsink) {
return Status::OK();
}
-Status ExchangeSinkOperatorX::open(RuntimeState* state) {
- RETURN_IF_ERROR(DataSinkOperatorX<ExchangeSinkLocalState>::open(state));
+Status ExchangeSinkOperatorX::prepare(RuntimeState* state) {
+ RETURN_IF_ERROR(DataSinkOperatorX<ExchangeSinkLocalState>::prepare(state));
_state = state;
_compression_type = state->fragement_transmission_compression_type();
if (_part_type == TPartitionType::OLAP_TABLE_SINK_HASH_PARTITIONED) {
diff --git a/be/src/pipeline/exec/exchange_sink_operator.h
b/be/src/pipeline/exec/exchange_sink_operator.h
index eb82474d694..3254992bbd5 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.h
+++ b/be/src/pipeline/exec/exchange_sink_operator.h
@@ -195,7 +195,7 @@ public:
RuntimeState* state() { return _state; }
- Status open(RuntimeState* state) override;
+ Status prepare(RuntimeState* state) override;
Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos)
override;
diff --git a/be/src/pipeline/exec/exchange_source_operator.cpp
b/be/src/pipeline/exec/exchange_source_operator.cpp
index 2aefdf8cd3d..e81e2e5c7b4 100644
--- a/be/src/pipeline/exec/exchange_source_operator.cpp
+++ b/be/src/pipeline/exec/exchange_source_operator.cpp
@@ -128,8 +128,8 @@ Status ExchangeSourceOperatorX::init(const TPlanNode&
tnode, RuntimeState* state
return Status::OK();
}
-Status ExchangeSourceOperatorX::open(RuntimeState* state) {
- RETURN_IF_ERROR(OperatorX<ExchangeLocalState>::open(state));
+Status ExchangeSourceOperatorX::prepare(RuntimeState* state) {
+ RETURN_IF_ERROR(OperatorX<ExchangeLocalState>::prepare(state));
DCHECK_GT(_num_senders, 0);
if (_is_merging) {
diff --git a/be/src/pipeline/exec/exchange_source_operator.h
b/be/src/pipeline/exec/exchange_source_operator.h
index ff9c5840033..414b0f79c34 100644
--- a/be/src/pipeline/exec/exchange_source_operator.h
+++ b/be/src/pipeline/exec/exchange_source_operator.h
@@ -70,7 +70,7 @@ public:
ExchangeSourceOperatorX(ObjectPool* pool, const TPlanNode& tnode, int
operator_id,
const DescriptorTbl& descs, int num_senders);
Status init(const TPlanNode& tnode, RuntimeState* state) override;
- Status open(RuntimeState* state) override;
+ Status prepare(RuntimeState* state) override;
Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos)
override;
diff --git a/be/src/pipeline/exec/file_scan_operator.cpp
b/be/src/pipeline/exec/file_scan_operator.cpp
index 8d9dc657501..4fcf51f8320 100644
--- a/be/src/pipeline/exec/file_scan_operator.cpp
+++ b/be/src/pipeline/exec/file_scan_operator.cpp
@@ -125,8 +125,8 @@ Status FileScanLocalState::_process_conjuncts(RuntimeState*
state) {
return Status::OK();
}
-Status FileScanOperatorX::open(RuntimeState* state) {
- RETURN_IF_ERROR(ScanOperatorX<FileScanLocalState>::open(state));
+Status FileScanOperatorX::prepare(RuntimeState* state) {
+ RETURN_IF_ERROR(ScanOperatorX<FileScanLocalState>::prepare(state));
if (state->get_query_ctx() != nullptr &&
state->get_query_ctx()->file_scan_range_params_map.contains(node_id())) {
TFileScanRangeParams& params =
diff --git a/be/src/pipeline/exec/file_scan_operator.h
b/be/src/pipeline/exec/file_scan_operator.h
index 8b7b25a025e..73b8454c532 100644
--- a/be/src/pipeline/exec/file_scan_operator.h
+++ b/be/src/pipeline/exec/file_scan_operator.h
@@ -77,7 +77,7 @@ public:
_output_tuple_id = tnode.file_scan_node.tuple_id;
}
- Status open(RuntimeState* state) override;
+ Status prepare(RuntimeState* state) override;
bool is_file_scan_operator() const override { return true; }
diff --git a/be/src/pipeline/exec/group_commit_block_sink_operator.cpp
b/be/src/pipeline/exec/group_commit_block_sink_operator.cpp
index 9f99d55d3ea..958a60b66e3 100644
--- a/be/src/pipeline/exec/group_commit_block_sink_operator.cpp
+++ b/be/src/pipeline/exec/group_commit_block_sink_operator.cpp
@@ -269,8 +269,8 @@ Status GroupCommitBlockSinkOperatorX::init(const TDataSink&
t_sink) {
return Status::OK();
}
-Status GroupCommitBlockSinkOperatorX::open(RuntimeState* state) {
- RETURN_IF_ERROR(Base::open(state));
+Status GroupCommitBlockSinkOperatorX::prepare(RuntimeState* state) {
+ RETURN_IF_ERROR(Base::prepare(state));
// get table's tuple descriptor
_output_tuple_desc =
state->desc_tbl().get_tuple_descriptor(_tuple_desc_id);
if (_output_tuple_desc == nullptr) {
diff --git a/be/src/pipeline/exec/group_commit_block_sink_operator.h
b/be/src/pipeline/exec/group_commit_block_sink_operator.h
index a4d2fa16f96..89dbf76a6bc 100644
--- a/be/src/pipeline/exec/group_commit_block_sink_operator.h
+++ b/be/src/pipeline/exec/group_commit_block_sink_operator.h
@@ -100,7 +100,7 @@ public:
Status init(const TDataSink& sink) override;
- Status open(RuntimeState* state) override;
+ Status prepare(RuntimeState* state) override;
Status sink(RuntimeState* state, vectorized::Block* block, bool eos)
override;
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp
b/be/src/pipeline/exec/hashjoin_build_sink.cpp
index e271dbacba0..e2b911a583b 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.cpp
+++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp
@@ -574,8 +574,8 @@ Status HashJoinBuildSinkOperatorX::init(const TPlanNode&
tnode, RuntimeState* st
return Status::OK();
}
-Status HashJoinBuildSinkOperatorX::open(RuntimeState* state) {
- RETURN_IF_ERROR(JoinBuildSinkOperatorX<HashJoinBuildSinkLocalState>::open(state));
+Status HashJoinBuildSinkOperatorX::prepare(RuntimeState* state) {
+ RETURN_IF_ERROR(JoinBuildSinkOperatorX<HashJoinBuildSinkLocalState>::prepare(state));
if (_is_broadcast_join) {
if (state->enable_share_hash_table_for_broadcast_join()) {
_shared_hashtable_controller =
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h
b/be/src/pipeline/exec/hashjoin_build_sink.h
index b1f0cdca640..62a4be16792 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.h
+++ b/be/src/pipeline/exec/hashjoin_build_sink.h
@@ -121,7 +121,7 @@ public:
Status init(const TPlanNode& tnode, RuntimeState* state) override;
- Status open(RuntimeState* state) override;
+ Status prepare(RuntimeState* state) override;
Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos)
override;
diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.cpp
b/be/src/pipeline/exec/hashjoin_probe_operator.cpp
index e3212800d5c..a55379a02ad 100644
--- a/be/src/pipeline/exec/hashjoin_probe_operator.cpp
+++ b/be/src/pipeline/exec/hashjoin_probe_operator.cpp
@@ -493,8 +493,8 @@ Status HashJoinProbeOperatorX::init(const TPlanNode& tnode,
RuntimeState* state)
return Status::OK();
}
-Status HashJoinProbeOperatorX::open(RuntimeState* state) {
- RETURN_IF_ERROR(JoinProbeOperatorX<HashJoinProbeLocalState>::open(state));
+Status HashJoinProbeOperatorX::prepare(RuntimeState* state) {
+ RETURN_IF_ERROR(JoinProbeOperatorX<HashJoinProbeLocalState>::prepare(state));
// init left/right output slots flags, only column of slot_id in _hash_output_slot_ids need
// insert to output block of hash join.
// _left_output_slots_flags : column of left table need to output set flag = true
diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.h
b/be/src/pipeline/exec/hashjoin_probe_operator.h
index d36658b995a..7928c4bc411 100644
--- a/be/src/pipeline/exec/hashjoin_probe_operator.h
+++ b/be/src/pipeline/exec/hashjoin_probe_operator.h
@@ -129,7 +129,7 @@ public:
HashJoinProbeOperatorX(ObjectPool* pool, const TPlanNode& tnode, int
operator_id,
const DescriptorTbl& descs);
Status init(const TPlanNode& tnode, RuntimeState* state) override;
- Status open(RuntimeState* state) override;
+ Status prepare(RuntimeState* state) override;
Status push(RuntimeState* state, vectorized::Block* input_block, bool eos)
const override;
Status pull(doris::RuntimeState* state, vectorized::Block* output_block,
diff --git a/be/src/pipeline/exec/hive_table_sink_operator.h
b/be/src/pipeline/exec/hive_table_sink_operator.h
index 9304d15962c..725fd8e0490 100644
--- a/be/src/pipeline/exec/hive_table_sink_operator.h
+++ b/be/src/pipeline/exec/hive_table_sink_operator.h
@@ -60,8 +60,8 @@ public:
return Status::OK();
}
- Status open(RuntimeState* state) override {
- RETURN_IF_ERROR(Base::open(state));
+ Status prepare(RuntimeState* state) override {
+ RETURN_IF_ERROR(Base::prepare(state));
RETURN_IF_ERROR(vectorized::VExpr::prepare(_output_vexpr_ctxs, state,
_row_desc));
return vectorized::VExpr::open(_output_vexpr_ctxs, state);
}
diff --git a/be/src/pipeline/exec/iceberg_table_sink_operator.h
b/be/src/pipeline/exec/iceberg_table_sink_operator.h
index 9b7ed510936..284657a7219 100644
--- a/be/src/pipeline/exec/iceberg_table_sink_operator.h
+++ b/be/src/pipeline/exec/iceberg_table_sink_operator.h
@@ -59,8 +59,8 @@ public:
return Status::OK();
}
- Status open(RuntimeState* state) override {
- RETURN_IF_ERROR(Base::open(state));
+ Status prepare(RuntimeState* state) override {
+ RETURN_IF_ERROR(Base::prepare(state));
RETURN_IF_ERROR(vectorized::VExpr::prepare(_output_vexpr_ctxs, state,
_row_desc));
return vectorized::VExpr::open(_output_vexpr_ctxs, state);
}
diff --git a/be/src/pipeline/exec/jdbc_table_sink_operator.cpp
b/be/src/pipeline/exec/jdbc_table_sink_operator.cpp
index df47a6ea3ac..d0abe6aa0d2 100644
--- a/be/src/pipeline/exec/jdbc_table_sink_operator.cpp
+++ b/be/src/pipeline/exec/jdbc_table_sink_operator.cpp
@@ -39,8 +39,8 @@ Status JdbcTableSinkOperatorX::init(const TDataSink&
thrift_sink) {
return Status::OK();
}
-Status JdbcTableSinkOperatorX::open(RuntimeState* state) {
- RETURN_IF_ERROR(DataSinkOperatorX<JdbcTableSinkLocalState>::open(state));
+Status JdbcTableSinkOperatorX::prepare(RuntimeState* state) {
+ RETURN_IF_ERROR(DataSinkOperatorX<JdbcTableSinkLocalState>::prepare(state));
RETURN_IF_ERROR(vectorized::VExpr::prepare(_output_vexpr_ctxs, state,
_row_desc));
RETURN_IF_ERROR(vectorized::VExpr::open(_output_vexpr_ctxs, state));
return Status::OK();
diff --git a/be/src/pipeline/exec/jdbc_table_sink_operator.h
b/be/src/pipeline/exec/jdbc_table_sink_operator.h
index a0dae301a5f..d1b617402b2 100644
--- a/be/src/pipeline/exec/jdbc_table_sink_operator.h
+++ b/be/src/pipeline/exec/jdbc_table_sink_operator.h
@@ -45,7 +45,7 @@ public:
JdbcTableSinkOperatorX(const RowDescriptor& row_desc, int operator_id,
const std::vector<TExpr>& select_exprs);
Status init(const TDataSink& thrift_sink) override;
- Status open(RuntimeState* state) override;
+ Status prepare(RuntimeState* state) override;
Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos)
override;
diff --git a/be/src/pipeline/exec/memory_scratch_sink_operator.cpp
b/be/src/pipeline/exec/memory_scratch_sink_operator.cpp
index 69ef3ec8b58..86afd607432 100644
--- a/be/src/pipeline/exec/memory_scratch_sink_operator.cpp
+++ b/be/src/pipeline/exec/memory_scratch_sink_operator.cpp
@@ -77,8 +77,8 @@ Status MemoryScratchSinkOperatorX::init(const TDataSink&
thrift_sink) {
return Status::OK();
}
-Status MemoryScratchSinkOperatorX::open(RuntimeState* state) {
- RETURN_IF_ERROR(DataSinkOperatorX<MemoryScratchSinkLocalState>::open(state));
+Status MemoryScratchSinkOperatorX::prepare(RuntimeState* state) {
+ RETURN_IF_ERROR(DataSinkOperatorX<MemoryScratchSinkLocalState>::prepare(state));
RETURN_IF_ERROR(vectorized::VExpr::prepare(_output_vexpr_ctxs, state,
_row_desc));
_timezone_obj = state->timezone_obj();
RETURN_IF_ERROR(vectorized::VExpr::open(_output_vexpr_ctxs, state));
diff --git a/be/src/pipeline/exec/memory_scratch_sink_operator.h
b/be/src/pipeline/exec/memory_scratch_sink_operator.h
index 352826955fc..6b5c15feed7 100644
--- a/be/src/pipeline/exec/memory_scratch_sink_operator.h
+++ b/be/src/pipeline/exec/memory_scratch_sink_operator.h
@@ -56,7 +56,7 @@ public:
MemoryScratchSinkOperatorX(const RowDescriptor& row_desc, int operator_id,
const std::vector<TExpr>& t_output_expr);
Status init(const TDataSink& thrift_sink) override;
- Status open(RuntimeState* state) override;
+ Status prepare(RuntimeState* state) override;
Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos)
override;
diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.h
b/be/src/pipeline/exec/multi_cast_data_stream_source.h
index a69ef52ba1c..bfe6b00a29a 100644
--- a/be/src/pipeline/exec/multi_cast_data_stream_source.h
+++ b/be/src/pipeline/exec/multi_cast_data_stream_source.h
@@ -91,8 +91,8 @@ public:
};
~MultiCastDataStreamerSourceOperatorX() override = default;
- Status open(RuntimeState* state) override {
- RETURN_IF_ERROR(Base::open(state));
+ Status prepare(RuntimeState* state) override {
+ RETURN_IF_ERROR(Base::prepare(state));
// init profile for runtime filter
// RuntimeFilterConsumer::_init_profile(local_state._shared_state->_multi_cast_data_streamer->profile());
if (_t_data_stream_sink.__isset.output_exprs) {
diff --git a/be/src/pipeline/exec/nested_loop_join_build_operator.cpp
b/be/src/pipeline/exec/nested_loop_join_build_operator.cpp
index a4dd990f666..bc6c7e274da 100644
--- a/be/src/pipeline/exec/nested_loop_join_build_operator.cpp
+++ b/be/src/pipeline/exec/nested_loop_join_build_operator.cpp
@@ -105,8 +105,8 @@ Status NestedLoopJoinBuildSinkOperatorX::init(const
TPlanNode& tnode, RuntimeSta
return Status::OK();
}
-Status NestedLoopJoinBuildSinkOperatorX::open(RuntimeState* state) {
- RETURN_IF_ERROR(JoinBuildSinkOperatorX<NestedLoopJoinBuildSinkLocalState>::open(state));
+Status NestedLoopJoinBuildSinkOperatorX::prepare(RuntimeState* state) {
+ RETURN_IF_ERROR(JoinBuildSinkOperatorX<NestedLoopJoinBuildSinkLocalState>::prepare(state));
size_t num_build_tuples = _child->row_desc().tuple_descriptors().size();
for (size_t i = 0; i < num_build_tuples; ++i) {
diff --git a/be/src/pipeline/exec/nested_loop_join_build_operator.h
b/be/src/pipeline/exec/nested_loop_join_build_operator.h
index c5a9dd7dcde..2ffaec3873d 100644
--- a/be/src/pipeline/exec/nested_loop_join_build_operator.h
+++ b/be/src/pipeline/exec/nested_loop_join_build_operator.h
@@ -67,7 +67,7 @@ public:
Status init(const TPlanNode& tnode, RuntimeState* state) override;
- Status open(RuntimeState* state) override;
+ Status prepare(RuntimeState* state) override;
Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos)
override;
diff --git a/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp
b/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp
index b860b454e41..c602c6d82d4 100644
--- a/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp
+++ b/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp
@@ -409,8 +409,8 @@ Status NestedLoopJoinProbeOperatorX::init(const TPlanNode&
tnode, RuntimeState*
return Status::OK();
}
-Status NestedLoopJoinProbeOperatorX::open(RuntimeState* state) {
- RETURN_IF_ERROR(JoinProbeOperatorX<NestedLoopJoinProbeLocalState>::open(state));
+Status NestedLoopJoinProbeOperatorX::prepare(RuntimeState* state) {
+ RETURN_IF_ERROR(JoinProbeOperatorX<NestedLoopJoinProbeLocalState>::prepare(state));
for (auto& conjunct : _join_conjuncts) {
RETURN_IF_ERROR(conjunct->prepare(state, *_intermediate_row_desc));
}
diff --git a/be/src/pipeline/exec/nested_loop_join_probe_operator.h
b/be/src/pipeline/exec/nested_loop_join_probe_operator.h
index 4f50a98efdb..63ce1f2e2df 100644
--- a/be/src/pipeline/exec/nested_loop_join_probe_operator.h
+++ b/be/src/pipeline/exec/nested_loop_join_probe_operator.h
@@ -204,7 +204,7 @@ public:
NestedLoopJoinProbeOperatorX(ObjectPool* pool, const TPlanNode& tnode, int
operator_id,
const DescriptorTbl& descs);
Status init(const TPlanNode& tnode, RuntimeState* state) override;
- Status open(RuntimeState* state) override;
+ Status prepare(RuntimeState* state) override;
Status push(RuntimeState* state, vectorized::Block* input_block, bool eos)
const override;
Status pull(doris::RuntimeState* state, vectorized::Block* output_block,
diff --git a/be/src/pipeline/exec/olap_table_sink_operator.h
b/be/src/pipeline/exec/olap_table_sink_operator.h
index fe4fdb13b0b..755fa6cdc21 100644
--- a/be/src/pipeline/exec/olap_table_sink_operator.h
+++ b/be/src/pipeline/exec/olap_table_sink_operator.h
@@ -52,8 +52,8 @@ public:
return Status::OK();
}
- Status open(RuntimeState* state) override {
- RETURN_IF_ERROR(Base::open(state));
+ Status prepare(RuntimeState* state) override {
+ RETURN_IF_ERROR(Base::prepare(state));
RETURN_IF_ERROR(vectorized::VExpr::prepare(_output_vexpr_ctxs, state,
_row_desc));
return vectorized::VExpr::open(_output_vexpr_ctxs, state);
}
diff --git a/be/src/pipeline/exec/olap_table_sink_v2_operator.h
b/be/src/pipeline/exec/olap_table_sink_v2_operator.h
index 69ac9796866..af49c34d581 100644
--- a/be/src/pipeline/exec/olap_table_sink_v2_operator.h
+++ b/be/src/pipeline/exec/olap_table_sink_v2_operator.h
@@ -53,8 +53,8 @@ public:
return Status::OK();
}
- Status open(RuntimeState* state) override {
- RETURN_IF_ERROR(Base::open(state));
+ Status prepare(RuntimeState* state) override {
+ RETURN_IF_ERROR(Base::prepare(state));
RETURN_IF_ERROR(vectorized::VExpr::prepare(_output_vexpr_ctxs, state,
_row_desc));
return vectorized::VExpr::open(_output_vexpr_ctxs, state);
}
diff --git a/be/src/pipeline/exec/operator.cpp
b/be/src/pipeline/exec/operator.cpp
index 443f49b572f..d2d0d6a6827 100644
--- a/be/src/pipeline/exec/operator.cpp
+++ b/be/src/pipeline/exec/operator.cpp
@@ -208,7 +208,7 @@ Status OperatorXBase::init(const TPlanNode& tnode,
RuntimeState* /*state*/) {
return Status::OK();
}
-Status OperatorXBase::open(RuntimeState* state) {
+Status OperatorXBase::prepare(RuntimeState* state) {
for (auto& conjunct : _conjuncts) {
RETURN_IF_ERROR(conjunct->prepare(state, intermediate_row_desc()));
}
@@ -231,7 +231,7 @@ Status OperatorXBase::open(RuntimeState* state) {
RETURN_IF_ERROR(vectorized::VExpr::open(projections, state));
}
if (_child && !is_source()) {
- RETURN_IF_ERROR(_child->open(state));
+ RETURN_IF_ERROR(_child->prepare(state));
}
return Status::OK();
}
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index 85a9059db97..1f46fa7ea61 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -101,7 +101,7 @@ public:
[[nodiscard]] virtual Status init(const TDataSink& tsink) { return
Status::OK(); }
[[nodiscard]] virtual std::string get_name() const = 0;
- [[nodiscard]] virtual Status open(RuntimeState* state) = 0;
+ [[nodiscard]] virtual Status prepare(RuntimeState* state) = 0;
[[nodiscard]] virtual Status close(RuntimeState* state);
[[nodiscard]] virtual Status set_child(OperatorPtr child) {
@@ -550,7 +550,7 @@ public:
return Status::InternalError("init() is only implemented in local
exchange!");
}
- Status open(RuntimeState* state) override { return Status::OK(); }
+ Status prepare(RuntimeState* state) override { return Status::OK(); }
[[nodiscard]] bool is_finished(RuntimeState* state) const {
auto result = state->get_sink_local_state_result();
if (!result) {
@@ -807,7 +807,7 @@ public:
// Tablets should be hold before open phase.
[[nodiscard]] virtual Status hold_tablets(RuntimeState* state) { return
Status::OK(); }
- Status open(RuntimeState* state) override;
+ Status prepare(RuntimeState* state) override;
[[nodiscard]] virtual Status get_block(RuntimeState* state,
vectorized::Block* block,
bool* eos) = 0;
diff --git a/be/src/pipeline/exec/partition_sort_sink_operator.cpp
b/be/src/pipeline/exec/partition_sort_sink_operator.cpp
index 1569fc12170..b2444414dde 100644
--- a/be/src/pipeline/exec/partition_sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/partition_sort_sink_operator.cpp
@@ -94,8 +94,8 @@ Status PartitionSortSinkOperatorX::init(const TPlanNode&
tnode, RuntimeState* st
return Status::OK();
}
-Status PartitionSortSinkOperatorX::open(RuntimeState* state) {
- RETURN_IF_ERROR(DataSinkOperatorX<PartitionSortSinkLocalState>::open(state));
+Status PartitionSortSinkOperatorX::prepare(RuntimeState* state) {
+ RETURN_IF_ERROR(DataSinkOperatorX<PartitionSortSinkLocalState>::prepare(state));
RETURN_IF_ERROR(_vsort_exec_exprs.prepare(state, _child->row_desc(),
_row_descriptor));
RETURN_IF_ERROR(vectorized::VExpr::prepare(_partition_expr_ctxs, state,
_child->row_desc()));
RETURN_IF_ERROR(_vsort_exec_exprs.open(state));
diff --git a/be/src/pipeline/exec/partition_sort_sink_operator.h
b/be/src/pipeline/exec/partition_sort_sink_operator.h
index a6f18ea8448..a158a3772a9 100644
--- a/be/src/pipeline/exec/partition_sort_sink_operator.h
+++ b/be/src/pipeline/exec/partition_sort_sink_operator.h
@@ -79,7 +79,7 @@ public:
Status init(const TPlanNode& tnode, RuntimeState* state) override;
- Status open(RuntimeState* state) override;
+ Status prepare(RuntimeState* state) override;
Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos)
override;
DataDistribution required_data_distribution() const override {
if (_topn_phase == TPartTopNPhase::TWO_PHASE_GLOBAL) {
diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
index f70a115d686..e003ea23240 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
@@ -135,8 +135,8 @@ Status PartitionedAggSinkOperatorX::init(const TPlanNode&
tnode, RuntimeState* s
return _agg_sink_operator->init(tnode, state);
}
-Status PartitionedAggSinkOperatorX::open(RuntimeState* state) {
- return _agg_sink_operator->open(state);
+Status PartitionedAggSinkOperatorX::prepare(RuntimeState* state) {
+ return _agg_sink_operator->prepare(state);
}
Status PartitionedAggSinkOperatorX::sink(doris::RuntimeState* state,
vectorized::Block* in_block,
diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
index 47ca1a47a42..a625ac20d36 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
@@ -299,7 +299,7 @@ public:
Status init(const TPlanNode& tnode, RuntimeState* state) override;
- Status open(RuntimeState* state) override;
+ Status prepare(RuntimeState* state) override;
Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos)
override;
diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
index 5a68c69a7e2..524a8b7db64 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
@@ -106,9 +106,9 @@ Status PartitionedAggSourceOperatorX::init(const TPlanNode&
tnode, RuntimeState*
return _agg_source_operator->init(tnode, state);
}
-Status PartitionedAggSourceOperatorX::open(RuntimeState* state) {
- RETURN_IF_ERROR(OperatorXBase::open(state));
- return _agg_source_operator->open(state);
+Status PartitionedAggSourceOperatorX::prepare(RuntimeState* state) {
+ RETURN_IF_ERROR(OperatorXBase::prepare(state));
+ return _agg_source_operator->prepare(state);
}
Status PartitionedAggSourceOperatorX::close(RuntimeState* state) {
diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.h
b/be/src/pipeline/exec/partitioned_aggregation_source_operator.h
index bc800f02fb5..24e56df1be8 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.h
+++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.h
@@ -74,7 +74,7 @@ public:
Status init(const TPlanNode& tnode, RuntimeState* state) override;
- Status open(RuntimeState* state) override;
+ Status prepare(RuntimeState* state) override;
Status close(RuntimeState* state) override;
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
index 1aafab2a7cd..af84d515e4a 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
@@ -501,16 +501,16 @@ Status PartitionedHashJoinProbeOperatorX::init(const
TPlanNode& tnode, RuntimeSt
return Status::OK();
}
-Status PartitionedHashJoinProbeOperatorX::open(RuntimeState* state) {
+Status PartitionedHashJoinProbeOperatorX::prepare(RuntimeState* state) {
// to avoid open _child twice
auto child = std::move(_child);
- RETURN_IF_ERROR(JoinProbeOperatorX::open(state));
+ RETURN_IF_ERROR(JoinProbeOperatorX::prepare(state));
RETURN_IF_ERROR(_inner_probe_operator->set_child(child));
DCHECK(_build_side_child != nullptr);
_inner_probe_operator->set_build_side_child(_build_side_child);
RETURN_IF_ERROR(_inner_sink_operator->set_child(_build_side_child));
- RETURN_IF_ERROR(_inner_probe_operator->open(state));
- RETURN_IF_ERROR(_inner_sink_operator->open(state));
+ RETURN_IF_ERROR(_inner_probe_operator->prepare(state));
+ RETURN_IF_ERROR(_inner_sink_operator->prepare(state));
_child = std::move(child);
RETURN_IF_ERROR(_partitioner->prepare(state, _child->row_desc()));
RETURN_IF_ERROR(_partitioner->open(state));
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
index cceac79b357..4bbdfa7371c 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
@@ -118,7 +118,7 @@ public:
PartitionedHashJoinProbeOperatorX(ObjectPool* pool, const TPlanNode&
tnode, int operator_id,
const DescriptorTbl& descs, uint32_t
partition_count);
Status init(const TPlanNode& tnode, RuntimeState* state) override;
- Status open(RuntimeState* state) override;
+ Status prepare(RuntimeState* state) override;
[[nodiscard]] Status get_block(RuntimeState* state, vectorized::Block*
block,
bool* eos) override;
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
index 24c91ea3180..83f094bf92e 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
@@ -510,12 +510,12 @@ Status PartitionedHashJoinSinkOperatorX::init(const
TPlanNode& tnode, RuntimeSta
return Status::OK();
}
-Status PartitionedHashJoinSinkOperatorX::open(RuntimeState* state) {
- RETURN_IF_ERROR(JoinBuildSinkOperatorX<PartitionedHashJoinSinkLocalState>::open(state));
+Status PartitionedHashJoinSinkOperatorX::prepare(RuntimeState* state) {
+ RETURN_IF_ERROR(JoinBuildSinkOperatorX<PartitionedHashJoinSinkLocalState>::prepare(state));
RETURN_IF_ERROR(_inner_sink_operator->set_child(_child));
RETURN_IF_ERROR(_partitioner->prepare(state, _child->row_desc()));
RETURN_IF_ERROR(_partitioner->open(state));
- return _inner_sink_operator->open(state);
+ return _inner_sink_operator->prepare(state);
}
Status
PartitionedHashJoinSinkLocalState::_setup_internal_operator(RuntimeState*
state) {
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
index 155a1500e28..d073a69516b 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
@@ -101,7 +101,7 @@ public:
Status init(const TPlanNode& tnode, RuntimeState* state) override;
- Status open(RuntimeState* state) override;
+ Status prepare(RuntimeState* state) override;
Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos)
override;
diff --git a/be/src/pipeline/exec/repeat_operator.cpp
b/be/src/pipeline/exec/repeat_operator.cpp
index c3649a1f8c0..43f5a5f4944 100644
--- a/be/src/pipeline/exec/repeat_operator.cpp
+++ b/be/src/pipeline/exec/repeat_operator.cpp
@@ -63,9 +63,9 @@ Status RepeatOperatorX::init(const TPlanNode& tnode,
RuntimeState* state) {
return Status::OK();
}
-Status RepeatOperatorX::open(RuntimeState* state) {
+Status RepeatOperatorX::prepare(RuntimeState* state) {
VLOG_CRITICAL << "VRepeatNode::open";
- RETURN_IF_ERROR(OperatorXBase::open(state));
+ RETURN_IF_ERROR(OperatorXBase::prepare(state));
_output_tuple_desc =
state->desc_tbl().get_tuple_descriptor(_output_tuple_id);
if (_output_tuple_desc == nullptr) {
return Status::InternalError("Failed to get tuple descriptor.");
diff --git a/be/src/pipeline/exec/repeat_operator.h
b/be/src/pipeline/exec/repeat_operator.h
index 2c2af32de0b..337ce1e08c5 100644
--- a/be/src/pipeline/exec/repeat_operator.h
+++ b/be/src/pipeline/exec/repeat_operator.h
@@ -68,7 +68,7 @@ public:
const DescriptorTbl& descs);
Status init(const TPlanNode& tnode, RuntimeState* state) override;
- Status open(RuntimeState* state) override;
+ Status prepare(RuntimeState* state) override;
bool need_more_input_data(RuntimeState* state) const override;
Status pull(RuntimeState* state, vectorized::Block* output_block, bool*
eos) const override;
diff --git a/be/src/pipeline/exec/result_file_sink_operator.cpp
b/be/src/pipeline/exec/result_file_sink_operator.cpp
index 76d60bbdf1a..60e3663ee8c 100644
--- a/be/src/pipeline/exec/result_file_sink_operator.cpp
+++ b/be/src/pipeline/exec/result_file_sink_operator.cpp
@@ -71,8 +71,8 @@ Status ResultFileSinkOperatorX::init(const TDataSink& tsink) {
return Status::OK();
}
-Status ResultFileSinkOperatorX::open(RuntimeState* state) {
- RETURN_IF_ERROR(DataSinkOperatorX<ResultFileSinkLocalState>::open(state));
+Status ResultFileSinkOperatorX::prepare(RuntimeState* state) {
+ RETURN_IF_ERROR(DataSinkOperatorX<ResultFileSinkLocalState>::prepare(state));
RETURN_IF_ERROR(vectorized::VExpr::prepare(_output_vexpr_ctxs, state,
_row_desc));
if (state->query_options().enable_parallel_outfile) {
RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender(state->query_id(),
_buf_size,
diff --git a/be/src/pipeline/exec/result_file_sink_operator.h
b/be/src/pipeline/exec/result_file_sink_operator.h
index c3c5e345f77..6e570b8181b 100644
--- a/be/src/pipeline/exec/result_file_sink_operator.h
+++ b/be/src/pipeline/exec/result_file_sink_operator.h
@@ -60,7 +60,7 @@ public:
const std::vector<TExpr>& t_output_expr,
DescriptorTbl& descs);
Status init(const TDataSink& thrift_sink) override;
- Status open(RuntimeState* state) override;
+ Status prepare(RuntimeState* state) override;
Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos)
override;
diff --git a/be/src/pipeline/exec/result_sink_operator.cpp
b/be/src/pipeline/exec/result_sink_operator.cpp
index 256a90d8852..e9f1661b3a2 100644
--- a/be/src/pipeline/exec/result_sink_operator.cpp
+++ b/be/src/pipeline/exec/result_sink_operator.cpp
@@ -117,8 +117,8 @@ ResultSinkOperatorX::ResultSinkOperatorX(int operator_id,
const RowDescriptor& r
_name = "ResultSink";
}
-Status ResultSinkOperatorX::open(RuntimeState* state) {
- RETURN_IF_ERROR(DataSinkOperatorX<ResultSinkLocalState>::open(state));
+Status ResultSinkOperatorX::prepare(RuntimeState* state) {
+ RETURN_IF_ERROR(DataSinkOperatorX<ResultSinkLocalState>::prepare(state));
// prepare output_expr
// From the thrift expressions create the real exprs.
RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(_t_output_expr,
_output_vexpr_ctxs));
diff --git a/be/src/pipeline/exec/result_sink_operator.h
b/be/src/pipeline/exec/result_sink_operator.h
index 479343ed6d5..6659d19025f 100644
--- a/be/src/pipeline/exec/result_sink_operator.h
+++ b/be/src/pipeline/exec/result_sink_operator.h
@@ -146,7 +146,7 @@ class ResultSinkOperatorX final : public
DataSinkOperatorX<ResultSinkLocalState>
public:
ResultSinkOperatorX(int operator_id, const RowDescriptor& row_desc,
const std::vector<TExpr>& select_exprs, const
TResultSink& sink);
- Status open(RuntimeState* state) override;
+ Status prepare(RuntimeState* state) override;
Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos)
override;
diff --git a/be/src/pipeline/exec/scan_operator.cpp
b/be/src/pipeline/exec/scan_operator.cpp
index b1028be10d2..ea8ee0e5cbe 100644
--- a/be/src/pipeline/exec/scan_operator.cpp
+++ b/be/src/pipeline/exec/scan_operator.cpp
@@ -1212,10 +1212,10 @@ Status ScanOperatorX<LocalStateType>::init(const
TPlanNode& tnode, RuntimeState*
}
template <typename LocalStateType>
-Status ScanOperatorX<LocalStateType>::open(RuntimeState* state) {
+Status ScanOperatorX<LocalStateType>::prepare(RuntimeState* state) {
_input_tuple_desc =
state->desc_tbl().get_tuple_descriptor(_input_tuple_id);
_output_tuple_desc =
state->desc_tbl().get_tuple_descriptor(_output_tuple_id);
- RETURN_IF_ERROR(OperatorX<LocalStateType>::open(state));
+ RETURN_IF_ERROR(OperatorX<LocalStateType>::prepare(state));
const auto slots = _output_tuple_desc->slots();
for (auto* slot : slots) {
diff --git a/be/src/pipeline/exec/scan_operator.h
b/be/src/pipeline/exec/scan_operator.h
index a7e769319a9..aace8f40e24 100644
--- a/be/src/pipeline/exec/scan_operator.h
+++ b/be/src/pipeline/exec/scan_operator.h
@@ -350,7 +350,7 @@ template <typename LocalStateType>
class ScanOperatorX : public OperatorX<LocalStateType> {
public:
Status init(const TPlanNode& tnode, RuntimeState* state) override;
- Status open(RuntimeState* state) override;
+ Status prepare(RuntimeState* state) override;
Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos)
override;
Status get_block_after_projects(RuntimeState* state, vectorized::Block*
block,
bool* eos) override {
diff --git a/be/src/pipeline/exec/schema_scan_operator.cpp
b/be/src/pipeline/exec/schema_scan_operator.cpp
index af578637c31..d22943b740a 100644
--- a/be/src/pipeline/exec/schema_scan_operator.cpp
+++ b/be/src/pipeline/exec/schema_scan_operator.cpp
@@ -135,8 +135,8 @@ Status SchemaScanOperatorX::init(const TPlanNode& tnode,
RuntimeState* state) {
return Status::OK();
}
-Status SchemaScanOperatorX::open(RuntimeState* state) {
- RETURN_IF_ERROR(Base::open(state));
+Status SchemaScanOperatorX::prepare(RuntimeState* state) {
+ RETURN_IF_ERROR(Base::prepare(state));
// get dest tuple desc
_dest_tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id);
diff --git a/be/src/pipeline/exec/schema_scan_operator.h
b/be/src/pipeline/exec/schema_scan_operator.h
index 2d861002748..594901e812c 100644
--- a/be/src/pipeline/exec/schema_scan_operator.h
+++ b/be/src/pipeline/exec/schema_scan_operator.h
@@ -65,7 +65,7 @@ public:
~SchemaScanOperatorX() override = default;
Status init(const TPlanNode& tnode, RuntimeState* state) override;
- Status open(RuntimeState* state) override;
+ Status prepare(RuntimeState* state) override;
Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos)
override;
[[nodiscard]] bool is_source() const override { return true; }
diff --git a/be/src/pipeline/exec/set_probe_sink_operator.cpp
b/be/src/pipeline/exec/set_probe_sink_operator.cpp
index ee1d1bc128c..be8191ecc0f 100644
--- a/be/src/pipeline/exec/set_probe_sink_operator.cpp
+++ b/be/src/pipeline/exec/set_probe_sink_operator.cpp
@@ -57,8 +57,8 @@ Status SetProbeSinkOperatorX<is_intersect>::init(const
TPlanNode& tnode, Runtime
}
template <bool is_intersect>
-Status SetProbeSinkOperatorX<is_intersect>::open(RuntimeState* state) {
- RETURN_IF_ERROR(DataSinkOperatorX<SetProbeSinkLocalState<is_intersect>>::open(state));
+Status SetProbeSinkOperatorX<is_intersect>::prepare(RuntimeState* state) {
+ RETURN_IF_ERROR(DataSinkOperatorX<SetProbeSinkLocalState<is_intersect>>::prepare(state));
RETURN_IF_ERROR(vectorized::VExpr::prepare(_child_exprs, state,
_child->row_desc()));
return vectorized::VExpr::open(_child_exprs, state);
}
diff --git a/be/src/pipeline/exec/set_probe_sink_operator.h
b/be/src/pipeline/exec/set_probe_sink_operator.h
index 5e4098e863c..a90a9775d1b 100644
--- a/be/src/pipeline/exec/set_probe_sink_operator.h
+++ b/be/src/pipeline/exec/set_probe_sink_operator.h
@@ -94,7 +94,7 @@ public:
Status init(const TPlanNode& tnode, RuntimeState* state) override;
- Status open(RuntimeState* state) override;
+ Status prepare(RuntimeState* state) override;
Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos)
override;
DataDistribution required_data_distribution() const override {
diff --git a/be/src/pipeline/exec/set_sink_operator.cpp
b/be/src/pipeline/exec/set_sink_operator.cpp
index 82bf523c60a..4faeb975ef9 100644
--- a/be/src/pipeline/exec/set_sink_operator.cpp
+++ b/be/src/pipeline/exec/set_sink_operator.cpp
@@ -233,8 +233,8 @@ size_t
SetSinkOperatorX<is_intersect>::get_reserve_mem_size(RuntimeState* state,
}
template <bool is_intersect>
-Status SetSinkOperatorX<is_intersect>::open(RuntimeState* state) {
- RETURN_IF_ERROR(Base::open(state));
+Status SetSinkOperatorX<is_intersect>::prepare(RuntimeState* state) {
+ RETURN_IF_ERROR(Base::prepare(state));
RETURN_IF_ERROR(vectorized::VExpr::prepare(_child_exprs, state,
_child->row_desc()));
return vectorized::VExpr::open(_child_exprs, state);
}
diff --git a/be/src/pipeline/exec/set_sink_operator.h
b/be/src/pipeline/exec/set_sink_operator.h
index 4d57c0351f9..08f789f702a 100644
--- a/be/src/pipeline/exec/set_sink_operator.h
+++ b/be/src/pipeline/exec/set_sink_operator.h
@@ -87,7 +87,7 @@ public:
Status init(const TPlanNode& tnode, RuntimeState* state) override;
- Status open(RuntimeState* state) override;
+ Status prepare(RuntimeState* state) override;
Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos)
override;
DataDistribution required_data_distribution() const override {
diff --git a/be/src/pipeline/exec/sort_sink_operator.cpp
b/be/src/pipeline/exec/sort_sink_operator.cpp
index a1851d4ee41..12301c60501 100644
--- a/be/src/pipeline/exec/sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/sort_sink_operator.cpp
@@ -114,8 +114,8 @@ Status SortSinkOperatorX::init(const TPlanNode& tnode,
RuntimeState* state) {
return Status::OK();
}
-Status SortSinkOperatorX::open(RuntimeState* state) {
- RETURN_IF_ERROR(DataSinkOperatorX<SortSinkLocalState>::open(state));
+Status SortSinkOperatorX::prepare(RuntimeState* state) {
+ RETURN_IF_ERROR(DataSinkOperatorX<SortSinkLocalState>::prepare(state));
RETURN_IF_ERROR(_vsort_exec_exprs.prepare(state, _child->row_desc(),
_row_descriptor));
return _vsort_exec_exprs.open(state);
}
diff --git a/be/src/pipeline/exec/sort_sink_operator.h
b/be/src/pipeline/exec/sort_sink_operator.h
index d2edaf7c281..ee6d364fa57 100644
--- a/be/src/pipeline/exec/sort_sink_operator.h
+++ b/be/src/pipeline/exec/sort_sink_operator.h
@@ -64,7 +64,7 @@ public:
Status init(const TPlanNode& tnode, RuntimeState* state) override;
- Status open(RuntimeState* state) override;
+ Status prepare(RuntimeState* state) override;
Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos)
override;
DataDistribution required_data_distribution() const override {
if (_is_analytic_sort) {
diff --git a/be/src/pipeline/exec/sort_source_operator.cpp
b/be/src/pipeline/exec/sort_source_operator.cpp
index b57d422309d..d8982ce1466 100644
--- a/be/src/pipeline/exec/sort_source_operator.cpp
+++ b/be/src/pipeline/exec/sort_source_operator.cpp
@@ -41,8 +41,8 @@ Status SortSourceOperatorX::init(const TPlanNode& tnode,
RuntimeState* state) {
return Status::OK();
}
-Status SortSourceOperatorX::open(RuntimeState* state) {
- RETURN_IF_ERROR(Base::open(state));
+Status SortSourceOperatorX::prepare(RuntimeState* state) {
+ RETURN_IF_ERROR(Base::prepare(state));
// spill sort _child may be nullptr.
if (_child) {
RETURN_IF_ERROR(_vsort_exec_exprs.prepare(state, _child->row_desc(),
_row_descriptor));
diff --git a/be/src/pipeline/exec/sort_source_operator.h
b/be/src/pipeline/exec/sort_source_operator.h
index 7902e4815bf..14ac7b81da9 100644
--- a/be/src/pipeline/exec/sort_source_operator.h
+++ b/be/src/pipeline/exec/sort_source_operator.h
@@ -49,7 +49,7 @@ public:
Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos)
override;
Status init(const TPlanNode& tnode, RuntimeState* state) override;
- Status open(RuntimeState* state) override;
+ Status prepare(RuntimeState* state) override;
bool is_source() const override { return true; }
diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.cpp
b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
index 7893fd57bb4..5d1df0b4be0 100644
--- a/be/src/pipeline/exec/spill_sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
@@ -116,9 +116,9 @@ Status SpillSortSinkOperatorX::init(const TPlanNode& tnode,
RuntimeState* state)
return _sort_sink_operator->init(tnode, state);
}
-Status SpillSortSinkOperatorX::open(RuntimeState* state) {
- RETURN_IF_ERROR(DataSinkOperatorX<LocalStateType>::open(state));
- return _sort_sink_operator->open(state);
+Status SpillSortSinkOperatorX::prepare(RuntimeState* state) {
+ RETURN_IF_ERROR(DataSinkOperatorX<LocalStateType>::prepare(state));
+ return _sort_sink_operator->prepare(state);
}
size_t SpillSortSinkOperatorX::get_reserve_mem_size(RuntimeState* state, bool
eos) {
diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.h
b/be/src/pipeline/exec/spill_sort_sink_operator.h
index 0dda4e83e15..4094bb9a36f 100644
--- a/be/src/pipeline/exec/spill_sort_sink_operator.h
+++ b/be/src/pipeline/exec/spill_sort_sink_operator.h
@@ -69,7 +69,7 @@ public:
Status init(const TPlanNode& tnode, RuntimeState* state) override;
- Status open(RuntimeState* state) override;
+ Status prepare(RuntimeState* state) override;
Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos)
override;
DataDistribution required_data_distribution() const override {
return _sort_sink_operator->required_data_distribution();
diff --git a/be/src/pipeline/exec/spill_sort_source_operator.cpp
b/be/src/pipeline/exec/spill_sort_source_operator.cpp
index d815bfb7d1b..ef777f19403 100644
--- a/be/src/pipeline/exec/spill_sort_source_operator.cpp
+++ b/be/src/pipeline/exec/spill_sort_source_operator.cpp
@@ -235,9 +235,9 @@ Status SpillSortSourceOperatorX::init(const TPlanNode&
tnode, RuntimeState* stat
return _sort_source_operator->init(tnode, state);
}
-Status SpillSortSourceOperatorX::open(RuntimeState* state) {
- RETURN_IF_ERROR(OperatorXBase::open(state));
- return _sort_source_operator->open(state);
+Status SpillSortSourceOperatorX::prepare(RuntimeState* state) {
+ RETURN_IF_ERROR(OperatorXBase::prepare(state));
+ return _sort_source_operator->prepare(state);
}
Status SpillSortSourceOperatorX::close(RuntimeState* state) {
diff --git a/be/src/pipeline/exec/spill_sort_source_operator.h
b/be/src/pipeline/exec/spill_sort_source_operator.h
index fae64e051f4..1706f5dda72 100644
--- a/be/src/pipeline/exec/spill_sort_source_operator.h
+++ b/be/src/pipeline/exec/spill_sort_source_operator.h
@@ -73,7 +73,7 @@ public:
~SpillSortSourceOperatorX() override = default;
Status init(const TPlanNode& tnode, RuntimeState* state) override;
- Status open(RuntimeState* state) override;
+ Status prepare(RuntimeState* state) override;
Status close(RuntimeState* state) override;
diff --git a/be/src/pipeline/exec/streaming_aggregation_operator.cpp
b/be/src/pipeline/exec/streaming_aggregation_operator.cpp
index a2cc5e72b15..16ab1703059 100644
--- a/be/src/pipeline/exec/streaming_aggregation_operator.cpp
+++ b/be/src/pipeline/exec/streaming_aggregation_operator.cpp
@@ -1167,8 +1167,8 @@ Status StreamingAggOperatorX::init(const TPlanNode&
tnode, RuntimeState* state)
return Status::OK();
}
-Status StreamingAggOperatorX::open(RuntimeState* state) {
- RETURN_IF_ERROR(StatefulOperatorX<StreamingAggLocalState>::open(state));
+Status StreamingAggOperatorX::prepare(RuntimeState* state) {
+ RETURN_IF_ERROR(StatefulOperatorX<StreamingAggLocalState>::prepare(state));
_intermediate_tuple_desc =
state->desc_tbl().get_tuple_descriptor(_intermediate_tuple_id);
_output_tuple_desc =
state->desc_tbl().get_tuple_descriptor(_output_tuple_id);
DCHECK_EQ(_intermediate_tuple_desc->slots().size(),
_output_tuple_desc->slots().size());
diff --git a/be/src/pipeline/exec/streaming_aggregation_operator.h
b/be/src/pipeline/exec/streaming_aggregation_operator.h
index 3ee52eeb6ec..a75cb16a436 100644
--- a/be/src/pipeline/exec/streaming_aggregation_operator.h
+++ b/be/src/pipeline/exec/streaming_aggregation_operator.h
@@ -205,7 +205,7 @@ public:
const DescriptorTbl& descs);
~StreamingAggOperatorX() override = default;
Status init(const TPlanNode& tnode, RuntimeState* state) override;
- Status open(RuntimeState* state) override;
+ Status prepare(RuntimeState* state) override;
Status pull(RuntimeState* state, vectorized::Block* block, bool* eos)
const override;
Status push(RuntimeState* state, vectorized::Block* input_block, bool eos)
const override;
bool need_more_input_data(RuntimeState* state) const override;
diff --git a/be/src/pipeline/exec/table_function_operator.cpp
b/be/src/pipeline/exec/table_function_operator.cpp
index acd867fcc6d..975e5907177 100644
--- a/be/src/pipeline/exec/table_function_operator.cpp
+++ b/be/src/pipeline/exec/table_function_operator.cpp
@@ -287,8 +287,8 @@ Status TableFunctionOperatorX::init(const TPlanNode& tnode,
RuntimeState* state)
return Status::OK();
}
-Status TableFunctionOperatorX::open(doris::RuntimeState* state) {
- RETURN_IF_ERROR(Base::open(state));
+Status TableFunctionOperatorX::prepare(doris::RuntimeState* state) {
+ RETURN_IF_ERROR(Base::prepare(state));
for (auto* fn : _fns) {
RETURN_IF_ERROR(fn->prepare());
}
diff --git a/be/src/pipeline/exec/table_function_operator.h
b/be/src/pipeline/exec/table_function_operator.h
index 9aa26e9ae22..ada828cab6e 100644
--- a/be/src/pipeline/exec/table_function_operator.h
+++ b/be/src/pipeline/exec/table_function_operator.h
@@ -83,7 +83,7 @@ public:
TableFunctionOperatorX(ObjectPool* pool, const TPlanNode& tnode, int
operator_id,
const DescriptorTbl& descs);
Status init(const TPlanNode& tnode, RuntimeState* state) override;
- Status open(doris::RuntimeState* state) override;
+ Status prepare(doris::RuntimeState* state) override;
bool need_more_input_data(RuntimeState* state) const override {
auto& local_state =
state->get_local_state(operator_id())->cast<TableFunctionLocalState>();
diff --git a/be/src/pipeline/exec/union_sink_operator.cpp
b/be/src/pipeline/exec/union_sink_operator.cpp
index d04ae2130d8..4bbb5eba3e3 100644
--- a/be/src/pipeline/exec/union_sink_operator.cpp
+++ b/be/src/pipeline/exec/union_sink_operator.cpp
@@ -76,8 +76,8 @@ Status UnionSinkOperatorX::init(const TPlanNode& tnode,
RuntimeState* state) {
return Status::OK();
}
-Status UnionSinkOperatorX::open(RuntimeState* state) {
- RETURN_IF_ERROR(DataSinkOperatorX<UnionSinkLocalState>::open(state));
+Status UnionSinkOperatorX::prepare(RuntimeState* state) {
+ RETURN_IF_ERROR(DataSinkOperatorX<UnionSinkLocalState>::prepare(state));
RETURN_IF_ERROR(vectorized::VExpr::prepare(_child_expr, state,
_child->row_desc()));
RETURN_IF_ERROR(vectorized::VExpr::check_expr_output_type(_child_expr,
_row_descriptor));
// open const expr lists.
diff --git a/be/src/pipeline/exec/union_sink_operator.h
b/be/src/pipeline/exec/union_sink_operator.h
index 75b3adab49e..170b99f12f1 100644
--- a/be/src/pipeline/exec/union_sink_operator.h
+++ b/be/src/pipeline/exec/union_sink_operator.h
@@ -74,7 +74,7 @@ public:
Status init(const TPlanNode& tnode, RuntimeState* state) override;
- Status open(RuntimeState* state) override;
+ Status prepare(RuntimeState* state) override;
Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos)
override;
diff --git a/be/src/pipeline/exec/union_source_operator.h
b/be/src/pipeline/exec/union_source_operator.h
index 8a00c35b04b..6619b623ef5 100644
--- a/be/src/pipeline/exec/union_source_operator.h
+++ b/be/src/pipeline/exec/union_source_operator.h
@@ -83,8 +83,8 @@ public:
return Status::OK();
}
- Status open(RuntimeState* state) override {
- static_cast<void>(Base::open(state));
+ Status prepare(RuntimeState* state) override {
+ static_cast<void>(Base::prepare(state));
// Prepare const expr lists.
for (const vectorized::VExprContextSPtrs& exprs : _const_expr_lists) {
RETURN_IF_ERROR(vectorized::VExpr::prepare(exprs, state,
_row_descriptor));
diff --git a/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp
b/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp
index 1cf94af663e..45d99004750 100644
--- a/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp
+++ b/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp
@@ -66,8 +66,8 @@ Status LocalExchangeSinkOperatorX::init(ExchangeType type,
const int num_buckets
return Status::OK();
}
-Status LocalExchangeSinkOperatorX::open(RuntimeState* state) {
- RETURN_IF_ERROR(DataSinkOperatorX<LocalExchangeSinkLocalState>::open(state));
+Status LocalExchangeSinkOperatorX::prepare(RuntimeState* state) {
+ RETURN_IF_ERROR(DataSinkOperatorX<LocalExchangeSinkLocalState>::prepare(state));
if (_type == ExchangeType::HASH_SHUFFLE || _type ==
ExchangeType::BUCKET_HASH_SHUFFLE) {
RETURN_IF_ERROR(_partitioner->prepare(state, _child->row_desc()));
RETURN_IF_ERROR(_partitioner->open(state));
diff --git a/be/src/pipeline/local_exchange/local_exchange_sink_operator.h
b/be/src/pipeline/local_exchange/local_exchange_sink_operator.h
index 148dab29604..16e40c2a428 100644
--- a/be/src/pipeline/local_exchange/local_exchange_sink_operator.h
+++ b/be/src/pipeline/local_exchange/local_exchange_sink_operator.h
@@ -113,7 +113,7 @@ public:
Status init(ExchangeType type, const int num_buckets, const bool
use_global_hash_shuffle,
const std::map<int, int>& shuffle_idx_to_instance_idx)
override;
- Status open(RuntimeState* state) override;
+ Status prepare(RuntimeState* state) override;
Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos)
override;
diff --git a/be/src/pipeline/local_exchange/local_exchange_source_operator.h
b/be/src/pipeline/local_exchange/local_exchange_source_operator.h
index d6c8cecfef3..a71cdaafacc 100644
--- a/be/src/pipeline/local_exchange/local_exchange_source_operator.h
+++ b/be/src/pipeline/local_exchange/local_exchange_source_operator.h
@@ -78,7 +78,7 @@ public:
_exchange_type = type;
return Status::OK();
}
- Status open(RuntimeState* state) override { return Status::OK(); }
+ Status prepare(RuntimeState* state) override { return Status::OK(); }
const RowDescriptor& intermediate_row_desc() const override {
return _child->intermediate_row_desc();
}
diff --git a/be/src/pipeline/pipeline.cpp b/be/src/pipeline/pipeline.cpp
index 27980976d81..4ef3cff9dbd 100644
--- a/be/src/pipeline/pipeline.cpp
+++ b/be/src/pipeline/pipeline.cpp
@@ -86,8 +86,8 @@ Status Pipeline::add_operator(OperatorPtr& op, const int
parallelism) {
}
Status Pipeline::prepare(RuntimeState* state) {
- RETURN_IF_ERROR(_operators.back()->open(state));
- RETURN_IF_ERROR(_sink->open(state));
+ RETURN_IF_ERROR(_operators.back()->prepare(state));
+ RETURN_IF_ERROR(_sink->prepare(state));
_name.append(std::to_string(id()));
_name.push_back('-');
for (auto& op : _operators) {
diff --git a/be/test/pipeline/operator/agg_operator_test.cpp
b/be/test/pipeline/operator/agg_operator_test.cpp
index 7f0c0e6ae07..de9af15c1ba 100644
--- a/be/test/pipeline/operator/agg_operator_test.cpp
+++ b/be/test/pipeline/operator/agg_operator_test.cpp
@@ -96,7 +96,7 @@ std::shared_ptr<AggSinkOperatorX>
create_agg_sink_op(OperatorContext& ctx, bool
op->_aggregate_evaluators.push_back(
vectorized::create_mock_agg_fn_evaluator(ctx.pool, is_merge,
without_key));
op->_pool = &ctx.pool;
- EXPECT_TRUE(op->open(&ctx.state).ok());
+ EXPECT_TRUE(op->prepare(&ctx.state).ok());
return op;
}
@@ -107,7 +107,7 @@ std::shared_ptr<AggSourceOperatorX>
create_agg_source_op(OperatorContext& ctx, b
new MockRowDescriptor
{{std::make_shared<vectorized::DataTypeInt64>()}, &ctx.pool});
op->_without_key = without_key;
op->_needs_finalize = needs_finalize;
- EXPECT_TRUE(op->open(&ctx.state).ok());
+ EXPECT_TRUE(op->prepare(&ctx.state).ok());
return op;
}
@@ -231,7 +231,7 @@ TEST(AggOperatorTestWithOutGroupBy, test_multi_input) {
ctx.pool, MockSlotRef::create_mock_contexts(1,
std::make_shared<const DataTypeInt64>()),
false, true));
sink_op->_pool = &ctx.pool;
- EXPECT_TRUE(sink_op->open(&ctx.state).ok());
+ EXPECT_TRUE(sink_op->prepare(&ctx.state).ok());
auto source_op = std::make_shared<MockAggSourceOperator>();
source_op->mock_row_descriptor.reset(
@@ -240,7 +240,7 @@ TEST(AggOperatorTestWithOutGroupBy, test_multi_input) {
&ctx.pool});
source_op->_without_key = true;
source_op->_needs_finalize = true;
- EXPECT_TRUE(source_op->open(&ctx.state).ok());
+ EXPECT_TRUE(source_op->prepare(&ctx.state).ok());
auto shared_state = init_sink_and_source(sink_op, source_op, ctx);
@@ -297,7 +297,7 @@ TEST_F(AggOperatorTestWithGroupBy,
test_need_finalize_only_key) {
sink_op->_aggregate_evaluators.push_back(
vectorized::create_mock_agg_fn_evaluator(ctx.pool, false, false));
sink_op->_pool = &ctx.pool;
- EXPECT_TRUE(sink_op->open(&ctx.state).ok());
+ EXPECT_TRUE(sink_op->prepare(&ctx.state).ok());
sink_op->_probe_expr_ctxs =
MockSlotRef::create_mock_contexts(std::make_shared<DataTypeInt64>());
@@ -308,7 +308,7 @@ TEST_F(AggOperatorTestWithGroupBy,
test_need_finalize_only_key) {
&ctx.pool});
source_op->_without_key = false;
source_op->_needs_finalize = true;
- EXPECT_TRUE(source_op->open(&ctx.state).ok());
+ EXPECT_TRUE(source_op->prepare(&ctx.state).ok());
auto shared_state = init_sink_and_source(sink_op, source_op, ctx);
@@ -359,7 +359,7 @@ TEST_F(AggOperatorTestWithGroupBy, test_need_finalize) {
ctx.pool, MockSlotRef::create_mock_contexts(1,
std::make_shared<DataTypeInt64>()),
false, false));
sink_op->_pool = &ctx.pool;
- EXPECT_TRUE(sink_op->open(&ctx.state).ok());
+ EXPECT_TRUE(sink_op->prepare(&ctx.state).ok());
sink_op->_probe_expr_ctxs =
MockSlotRef::create_mock_contexts(0,
std::make_shared<DataTypeInt64>());
@@ -370,7 +370,7 @@ TEST_F(AggOperatorTestWithGroupBy, test_need_finalize) {
&ctx.pool});
source_op->_without_key = false;
source_op->_needs_finalize = true;
- EXPECT_TRUE(source_op->open(&ctx.state).ok());
+ EXPECT_TRUE(source_op->prepare(&ctx.state).ok());
auto shared_state = init_sink_and_source(sink_op, source_op, ctx);
@@ -425,7 +425,7 @@ TEST_F(AggOperatorTestWithGroupBy, test_2_phase) {
ctx.pool, MockSlotRef::create_mock_contexts(1,
std::make_shared<DataTypeInt64>()),
false, false));
sink_op->_pool = &ctx.pool;
- EXPECT_TRUE(sink_op->open(&ctx.state).ok());
+ EXPECT_TRUE(sink_op->prepare(&ctx.state).ok());
sink_op->_probe_expr_ctxs =
MockSlotRef::create_mock_contexts(0,
std::make_shared<DataTypeInt64>());
@@ -436,7 +436,7 @@ TEST_F(AggOperatorTestWithGroupBy, test_2_phase) {
&ctx.pool});
source_op->_without_key = false;
source_op->_needs_finalize = false;
- EXPECT_TRUE(source_op->open(&ctx.state).ok());
+ EXPECT_TRUE(source_op->prepare(&ctx.state).ok());
auto shared_state = init_sink_and_source(sink_op, source_op, ctx);
@@ -466,7 +466,7 @@ TEST_F(AggOperatorTestWithGroupBy, test_2_phase) {
ctx.pool, MockSlotRef::create_mock_contexts(1,
std::make_shared<DataTypeInt64>()),
true, false));
sink_op->_pool = &ctx.pool;
- EXPECT_TRUE(sink_op->open(&ctx.state).ok());
+ EXPECT_TRUE(sink_op->prepare(&ctx.state).ok());
sink_op->_probe_expr_ctxs =
MockSlotRef::create_mock_contexts(0,
std::make_shared<DataTypeInt64>());
@@ -477,7 +477,7 @@ TEST_F(AggOperatorTestWithGroupBy, test_2_phase) {
&ctx.pool});
source_op->_without_key = false;
source_op->_needs_finalize = true;
- EXPECT_TRUE(source_op->open(&ctx.state).ok());
+ EXPECT_TRUE(source_op->prepare(&ctx.state).ok());
auto shared_state = init_sink_and_source(sink_op, source_op, ctx);
diff --git a/be/test/pipeline/operator/exchange_sink_operator_test.cpp
b/be/test/pipeline/operator/exchange_sink_operator_test.cpp
index efa30c82b57..945eab84ec5 100644
--- a/be/test/pipeline/operator/exchange_sink_operator_test.cpp
+++ b/be/test/pipeline/operator/exchange_sink_operator_test.cpp
@@ -77,7 +77,7 @@ auto create_exchange_sink(std::vector<ChannelInfo>
channel_info) {
std::shared_ptr<MockExchangeSinkOperatorX> op =
std::make_shared<MockExchangeSinkOperatorX>(*ctx);
- EXPECT_TRUE(op->open(&ctx->state));
+ EXPECT_TRUE(op->prepare(&ctx->state));
auto local_state = std::make_unique<MockExchangeLocalState>(op.get(),
&ctx->state);
diff --git a/be/test/vec/exec/vfile_scanner_exception_test.cpp
b/be/test/vec/exec/vfile_scanner_exception_test.cpp
index 4b6ce46bd88..bf0d35d9eba 100644
--- a/be/test/vec/exec/vfile_scanner_exception_test.cpp
+++ b/be/test/vec/exec/vfile_scanner_exception_test.cpp
@@ -247,7 +247,7 @@ void VfileScannerExceptionTest::init() {
std::make_shared<pipeline::FileScanOperatorX>(&_obj_pool, _tnode,
0, *_desc_tbl, 1);
_scan_node->_output_tuple_desc =
_runtime_state.desc_tbl().get_tuple_descriptor(_dst_tuple_id);
WARN_IF_ERROR(_scan_node->init(_tnode, &_runtime_state), "fail to init
scan_node");
- WARN_IF_ERROR(_scan_node->open(&_runtime_state), "fail to open scan_node");
+ WARN_IF_ERROR(_scan_node->prepare(&_runtime_state), "fail to open
scan_node");
auto local_state =
pipeline::FileScanLocalState::create_unique(&_runtime_state,
_scan_node.get());
diff --git a/be/test/vec/exec/vwal_scanner_test.cpp
b/be/test/vec/exec/vwal_scanner_test.cpp
index 2e6d4bf5cde..af32b8677d4 100644
--- a/be/test/vec/exec/vwal_scanner_test.cpp
+++ b/be/test/vec/exec/vwal_scanner_test.cpp
@@ -261,7 +261,7 @@ void VWalScannerTest::init() {
std::make_shared<pipeline::FileScanOperatorX>(&_obj_pool, _tnode,
0, *_desc_tbl, 1);
_scan_node->_output_tuple_desc =
_runtime_state.desc_tbl().get_tuple_descriptor(_dst_tuple_id);
WARN_IF_ERROR(_scan_node->init(_tnode, &_runtime_state), "fail to init
scan_node");
- WARN_IF_ERROR(_scan_node->open(&_runtime_state), "fail to prepare
scan_node");
+ WARN_IF_ERROR(_scan_node->prepare(&_runtime_state), "fail to prepare
scan_node");
auto local_state =
pipeline::FileScanLocalState::create_unique(&_runtime_state,
_scan_node.get());
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]