This is an automated email from the ASF dual-hosted git repository.
Mryange pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new c27fd0ba968 [refactor](be) Add operator IO wrappers (#64139)
c27fd0ba968 is described below
commit c27fd0ba968daffe8329186205315f8b1cd147b7
Author: Mryange <[email protected]>
AuthorDate: Sat Jun 6 17:46:46 2026 +0800
[refactor](be) Add operator IO wrappers (#64139)
### What problem does this PR solve?
Issue Number: N/A
Problem Summary:
Pipeline operator source and sink paths need a common place to validate
output and input blocks. Before this change, `sink` and `get_block` were
the virtual override points, so common validation either had to stay in
call sites or be duplicated across operator implementations.
Root cause: the public operator data-flow entry points were also the
polymorphic implementation hooks, which left no wrapper layer for shared
checks.
This change makes `DataSinkOperatorXBase::sink` and
`OperatorXBase::get_block` non-virtual wrappers. The wrappers run
`Block::check_type_and_column()` at the source/sink boundary and then
dispatch to the new virtual `sink_impl` and `get_block_impl` methods.
All pipeline operator implementations, exchange operators, scan
operators, and related BE test mocks are migrated to the new impl
methods. The scan projection path is updated to call the base
`get_block` wrapper so the shared checks still apply.
---
.../exec/exchange/local_exchange_sink_operator.cpp | 2 +-
.../exec/exchange/local_exchange_sink_operator.h | 2 +-
.../exchange/local_exchange_source_operator.cpp | 2 +-
.../exec/exchange/local_exchange_source_operator.h | 2 +-
be/src/exec/operator/aggregation_sink_operator.cpp | 2 +-
be/src/exec/operator/aggregation_sink_operator.h | 2 +-
.../exec/operator/aggregation_source_operator.cpp | 2 +-
be/src/exec/operator/aggregation_source_operator.h | 2 +-
be/src/exec/operator/analytic_sink_operator.cpp | 2 +-
be/src/exec/operator/analytic_sink_operator.h | 2 +-
be/src/exec/operator/analytic_source_operator.cpp | 3 ++-
be/src/exec/operator/analytic_source_operator.h | 2 +-
be/src/exec/operator/blackhole_sink_operator.cpp | 2 +-
be/src/exec/operator/blackhole_sink_operator.h | 2 +-
.../bucketed_aggregation_sink_operator.cpp | 2 +-
.../operator/bucketed_aggregation_sink_operator.h | 2 +-
.../bucketed_aggregation_source_operator.cpp | 2 +-
.../bucketed_aggregation_source_operator.h | 2 +-
be/src/exec/operator/cache_sink_operator.cpp | 2 +-
be/src/exec/operator/cache_sink_operator.h | 2 +-
be/src/exec/operator/cache_source_operator.cpp | 2 +-
be/src/exec/operator/cache_source_operator.h | 2 +-
be/src/exec/operator/datagen_operator.cpp | 2 +-
be/src/exec/operator/datagen_operator.h | 2 +-
be/src/exec/operator/dict_sink_operator.cpp | 2 +-
be/src/exec/operator/dict_sink_operator.h | 2 +-
be/src/exec/operator/empty_set_operator.cpp | 2 +-
be/src/exec/operator/empty_set_operator.h | 2 +-
be/src/exec/operator/exchange_sink_operator.cpp | 2 +-
be/src/exec/operator/exchange_sink_operator.h | 2 +-
be/src/exec/operator/exchange_source_operator.cpp | 2 +-
be/src/exec/operator/exchange_source_operator.h | 2 +-
.../operator/group_commit_block_sink_operator.cpp | 2 +-
.../operator/group_commit_block_sink_operator.h | 2 +-
.../exec/operator/group_commit_scan_operator.cpp | 2 +-
be/src/exec/operator/group_commit_scan_operator.h | 2 +-
be/src/exec/operator/hashjoin_build_sink.cpp | 2 +-
be/src/exec/operator/hashjoin_build_sink.h | 2 +-
be/src/exec/operator/hive_table_sink_operator.h | 2 +-
.../exec/operator/iceberg_delete_sink_operator.h | 2 +-
be/src/exec/operator/iceberg_merge_sink_operator.h | 2 +-
be/src/exec/operator/iceberg_table_sink_operator.h | 2 +-
be/src/exec/operator/jdbc_table_sink_operator.cpp | 2 +-
be/src/exec/operator/jdbc_table_sink_operator.h | 2 +-
.../operator/local_merge_sort_source_operator.cpp | 2 +-
.../operator/local_merge_sort_source_operator.h | 2 +-
.../exec/operator/maxcompute_table_sink_operator.h | 2 +-
.../exec/operator/memory_scratch_sink_operator.cpp | 2 +-
.../exec/operator/memory_scratch_sink_operator.h | 2 +-
be/src/exec/operator/mock_operator.h | 2 +-
be/src/exec/operator/mock_scan_operator.h | 2 +-
.../exec/operator/multi_cast_data_stream_sink.cpp | 2 +-
be/src/exec/operator/multi_cast_data_stream_sink.h | 2 +-
.../operator/multi_cast_data_stream_source.cpp | 4 ++--
.../exec/operator/multi_cast_data_stream_source.h | 2 +-
.../operator/nested_loop_join_build_operator.cpp | 3 ++-
.../operator/nested_loop_join_build_operator.h | 2 +-
be/src/exec/operator/olap_table_sink_operator.h | 2 +-
be/src/exec/operator/olap_table_sink_v2_operator.h | 2 +-
be/src/exec/operator/operator.cpp | 6 ++++--
be/src/exec/operator/operator.h | 23 ++++++++++++++++------
.../exec/operator/partition_sort_sink_operator.cpp | 2 +-
.../exec/operator/partition_sort_sink_operator.h | 2 +-
.../operator/partition_sort_source_operator.cpp | 4 ++--
.../exec/operator/partition_sort_source_operator.h | 2 +-
.../partitioned_aggregation_sink_operator.cpp | 3 ++-
.../partitioned_aggregation_sink_operator.h | 2 +-
.../partitioned_aggregation_source_operator.cpp | 2 +-
.../partitioned_aggregation_source_operator.h | 2 +-
.../partitioned_hash_join_probe_operator.cpp | 3 ++-
.../partitioned_hash_join_probe_operator.h | 2 +-
.../partitioned_hash_join_sink_operator.cpp | 2 +-
.../operator/partitioned_hash_join_sink_operator.h | 2 +-
.../exec/operator/rec_cte_anchor_sink_operator.h | 2 +-
be/src/exec/operator/rec_cte_scan_operator.h | 2 +-
be/src/exec/operator/rec_cte_sink_operator.h | 2 +-
be/src/exec/operator/rec_cte_source_operator.h | 2 +-
be/src/exec/operator/result_file_sink_operator.cpp | 2 +-
be/src/exec/operator/result_file_sink_operator.h | 2 +-
be/src/exec/operator/result_sink_operator.cpp | 2 +-
be/src/exec/operator/result_sink_operator.h | 2 +-
be/src/exec/operator/scan_operator.cpp | 2 +-
be/src/exec/operator/scan_operator.h | 4 ++--
be/src/exec/operator/schema_scan_operator.cpp | 2 +-
be/src/exec/operator/schema_scan_operator.h | 2 +-
be/src/exec/operator/set_probe_sink_operator.cpp | 3 ++-
be/src/exec/operator/set_probe_sink_operator.h | 2 +-
be/src/exec/operator/set_sink_operator.cpp | 2 +-
be/src/exec/operator/set_sink_operator.h | 2 +-
be/src/exec/operator/set_source_operator.cpp | 3 ++-
be/src/exec/operator/set_source_operator.h | 2 +-
be/src/exec/operator/sort_sink_operator.cpp | 2 +-
be/src/exec/operator/sort_sink_operator.h | 2 +-
be/src/exec/operator/sort_source_operator.cpp | 2 +-
be/src/exec/operator/sort_source_operator.h | 2 +-
.../operator/spill_iceberg_table_sink_operator.cpp | 2 +-
.../operator/spill_iceberg_table_sink_operator.h | 2 +-
be/src/exec/operator/spill_sort_sink_operator.cpp | 2 +-
be/src/exec/operator/spill_sort_sink_operator.h | 2 +-
.../exec/operator/spill_sort_source_operator.cpp | 2 +-
be/src/exec/operator/spill_sort_source_operator.h | 2 +-
be/src/exec/operator/tvf_table_sink_operator.h | 2 +-
be/src/exec/operator/union_sink_operator.cpp | 2 +-
be/src/exec/operator/union_sink_operator.h | 2 +-
be/src/exec/operator/union_source_operator.cpp | 2 +-
be/src/exec/operator/union_source_operator.h | 2 +-
be/src/exec/pipeline/pipeline_task.cpp | 3 +--
be/test/exec/operator/agg_operator_test.cpp | 2 +-
.../exec/operator/analytic_sink_operator_test.cpp | 4 +++-
.../operator/partition_sort_sink_operator_test.cpp | 4 +++-
.../operator/partitioned_aggregation_test_helper.h | 4 +++-
.../operator/partitioned_hash_join_test_helper.h | 4 +++-
.../exec/operator/query_cache_operator_test.cpp | 4 +++-
be/test/exec/operator/sort_operator_test.cpp | 4 +++-
be/test/exec/operator/spill_sort_test_helper.h | 2 +-
.../exec/operator/streaming_agg_operator_test.cpp | 4 +++-
.../exec/operator/table_function_operator_test.cpp | 4 +++-
be/test/testutil/mock/mock_operators.h | 4 ++--
be/test/util/profile_spec_test.cpp | 4 ++--
119 files changed, 165 insertions(+), 131 deletions(-)
diff --git a/be/src/exec/exchange/local_exchange_sink_operator.cpp
b/be/src/exec/exchange/local_exchange_sink_operator.cpp
index 2f9443208e7..0a2aeccd811 100644
--- a/be/src/exec/exchange/local_exchange_sink_operator.cpp
+++ b/be/src/exec/exchange/local_exchange_sink_operator.cpp
@@ -142,7 +142,7 @@ std::string LocalExchangeSinkLocalState::debug_string(int
indentation_level) con
return fmt::to_string(debug_string_buffer);
}
-Status LocalExchangeSinkOperatorX::sink(RuntimeState* state, Block* in_block,
bool eos) {
+Status LocalExchangeSinkOperatorX::sink_impl(RuntimeState* state, Block*
in_block, bool eos) {
auto& local_state = get_local_state(state);
SCOPED_TIMER(local_state.exec_time_counter());
COUNTER_UPDATE(local_state.rows_input_counter(),
(int64_t)in_block->rows());
diff --git a/be/src/exec/exchange/local_exchange_sink_operator.h
b/be/src/exec/exchange/local_exchange_sink_operator.h
index f1ca1935457..ddaeb906b26 100644
--- a/be/src/exec/exchange/local_exchange_sink_operator.h
+++ b/be/src/exec/exchange/local_exchange_sink_operator.h
@@ -103,7 +103,7 @@ public:
Status prepare(RuntimeState* state) override;
- Status sink(RuntimeState* state, Block* in_block, bool eos) override;
+ Status sink_impl(RuntimeState* state, Block* in_block, bool eos) override;
void set_low_memory_mode(RuntimeState* state) override {
auto& local_state = get_local_state(state);
diff --git a/be/src/exec/exchange/local_exchange_source_operator.cpp
b/be/src/exec/exchange/local_exchange_source_operator.cpp
index ad092656f21..ace8573d704 100644
--- a/be/src/exec/exchange/local_exchange_source_operator.cpp
+++ b/be/src/exec/exchange/local_exchange_source_operator.cpp
@@ -89,7 +89,7 @@ std::string LocalExchangeSourceLocalState::debug_string(int
indentation_level) c
return fmt::to_string(debug_string_buffer);
}
-Status LocalExchangeSourceOperatorX::get_block(RuntimeState* state, Block*
block, bool* eos) {
+Status LocalExchangeSourceOperatorX::get_block_impl(RuntimeState* state,
Block* block, bool* eos) {
auto& local_state = get_local_state(state);
SCOPED_TIMER(local_state.exec_time_counter());
RETURN_IF_ERROR(local_state._exchanger->get_block(
diff --git a/be/src/exec/exchange/local_exchange_source_operator.h
b/be/src/exec/exchange/local_exchange_source_operator.h
index 3fdf90b50f0..c315680901f 100644
--- a/be/src/exec/exchange/local_exchange_source_operator.h
+++ b/be/src/exec/exchange/local_exchange_source_operator.h
@@ -78,7 +78,7 @@ public:
RowDescriptor& row_descriptor() override { return
_child->row_descriptor(); }
const RowDescriptor& row_desc() const override { return
_child->row_desc(); }
- Status get_block(RuntimeState* state, Block* block, bool* eos) override;
+ Status get_block_impl(RuntimeState* state, Block* block, bool* eos)
override;
bool is_source() const override { return true; }
diff --git a/be/src/exec/operator/aggregation_sink_operator.cpp
b/be/src/exec/operator/aggregation_sink_operator.cpp
index 8e40a53d3d4..4adaecec6a9 100644
--- a/be/src/exec/operator/aggregation_sink_operator.cpp
+++ b/be/src/exec/operator/aggregation_sink_operator.cpp
@@ -988,7 +988,7 @@ Status AggSinkOperatorX::_check_agg_fn_output() {
return Status::OK();
}
-Status AggSinkOperatorX::sink(doris::RuntimeState* state, Block* in_block,
bool eos) {
+Status AggSinkOperatorX::sink_impl(doris::RuntimeState* state, Block*
in_block, bool eos) {
auto& local_state = get_local_state(state);
SCOPED_TIMER(local_state.exec_time_counter());
COUNTER_UPDATE(local_state.rows_input_counter(),
(int64_t)in_block->rows());
diff --git a/be/src/exec/operator/aggregation_sink_operator.h
b/be/src/exec/operator/aggregation_sink_operator.h
index 8d3273dde2a..4d103d54119 100644
--- a/be/src/exec/operator/aggregation_sink_operator.h
+++ b/be/src/exec/operator/aggregation_sink_operator.h
@@ -154,7 +154,7 @@ public:
Status prepare(RuntimeState* state) override;
- Status sink(RuntimeState* state, Block* in_block, bool eos) override;
+ Status sink_impl(RuntimeState* state, Block* in_block, bool eos) override;
DataDistribution required_data_distribution(RuntimeState* state) const
override {
if (_partition_exprs.empty()) {
diff --git a/be/src/exec/operator/aggregation_source_operator.cpp
b/be/src/exec/operator/aggregation_source_operator.cpp
index 0699d6a6002..945fb24abe8 100644
--- a/be/src/exec/operator/aggregation_source_operator.cpp
+++ b/be/src/exec/operator/aggregation_source_operator.cpp
@@ -562,7 +562,7 @@ AggSourceOperatorX::AggSourceOperatorX(ObjectPool* pool,
const TPlanNode& tnode,
_needs_finalize(tnode.agg_node.need_finalize),
_without_key(tnode.agg_node.grouping_exprs.empty()) {}
-Status AggSourceOperatorX::get_block(RuntimeState* state, Block* block, bool*
eos) {
+Status AggSourceOperatorX::get_block_impl(RuntimeState* state, Block* block,
bool* eos) {
auto& local_state = get_local_state(state);
SCOPED_TIMER(local_state.exec_time_counter());
SCOPED_PEAK_MEM(&local_state._estimate_memory_usage);
diff --git a/be/src/exec/operator/aggregation_source_operator.h
b/be/src/exec/operator/aggregation_source_operator.h
index c9348826ca4..090f493ae28 100644
--- a/be/src/exec/operator/aggregation_source_operator.h
+++ b/be/src/exec/operator/aggregation_source_operator.h
@@ -102,7 +102,7 @@ public:
AggSourceOperatorX() = default;
#endif
- Status get_block(RuntimeState* state, Block* block, bool* eos) override;
+ Status get_block_impl(RuntimeState* state, Block* block, bool* eos)
override;
bool is_source() const override { return true; }
diff --git a/be/src/exec/operator/analytic_sink_operator.cpp
b/be/src/exec/operator/analytic_sink_operator.cpp
index f4fb437d31c..cd15757a6eb 100644
--- a/be/src/exec/operator/analytic_sink_operator.cpp
+++ b/be/src/exec/operator/analytic_sink_operator.cpp
@@ -744,7 +744,7 @@ Status AnalyticSinkOperatorX::prepare(RuntimeState* state) {
return Status::OK();
}
-Status AnalyticSinkOperatorX::sink(doris::RuntimeState* state, Block*
input_block, bool eos) {
+Status AnalyticSinkOperatorX::sink_impl(doris::RuntimeState* state, Block*
input_block, bool eos) {
auto& local_state = get_local_state(state);
SCOPED_TIMER(local_state.exec_time_counter());
COUNTER_UPDATE(local_state.rows_input_counter(),
(int64_t)input_block->rows());
diff --git a/be/src/exec/operator/analytic_sink_operator.h
b/be/src/exec/operator/analytic_sink_operator.h
index 9a64ba1d0b7..62d8bcde692 100644
--- a/be/src/exec/operator/analytic_sink_operator.h
+++ b/be/src/exec/operator/analytic_sink_operator.h
@@ -209,7 +209,7 @@ public:
Status prepare(RuntimeState* state) override;
- Status sink(RuntimeState* state, Block* in_block, bool eos) override;
+ Status sink_impl(RuntimeState* state, Block* in_block, bool eos) override;
DataDistribution required_data_distribution(RuntimeState* /*state*/) const
override {
if (_partition_by_eq_expr_ctxs.empty()) {
return {ExchangeType::PASSTHROUGH};
diff --git a/be/src/exec/operator/analytic_source_operator.cpp
b/be/src/exec/operator/analytic_source_operator.cpp
index 3d25b20c7a4..d3bb0de3975 100644
--- a/be/src/exec/operator/analytic_source_operator.cpp
+++ b/be/src/exec/operator/analytic_source_operator.cpp
@@ -42,7 +42,8 @@ AnalyticSourceOperatorX::AnalyticSourceOperatorX(ObjectPool*
pool, const TPlanNo
int operator_id, const
DescriptorTbl& descs)
: OperatorX<AnalyticLocalState>(pool, tnode, operator_id, descs) {}
-Status AnalyticSourceOperatorX::get_block(RuntimeState* state, Block*
output_block, bool* eos) {
+Status AnalyticSourceOperatorX::get_block_impl(RuntimeState* state, Block*
output_block,
+ bool* eos) {
RETURN_IF_CANCELLED(state);
auto& local_state = get_local_state(state);
SCOPED_TIMER(local_state.exec_time_counter());
diff --git a/be/src/exec/operator/analytic_source_operator.h
b/be/src/exec/operator/analytic_source_operator.h
index 696ec74cb69..72273ebd98a 100644
--- a/be/src/exec/operator/analytic_source_operator.h
+++ b/be/src/exec/operator/analytic_source_operator.h
@@ -46,7 +46,7 @@ public:
#ifdef BE_TEST
AnalyticSourceOperatorX() = default;
#endif
- Status get_block(RuntimeState* state, Block* block, bool* eos) override;
+ Status get_block_impl(RuntimeState* state, Block* block, bool* eos)
override;
bool is_source() const override { return true; }
diff --git a/be/src/exec/operator/blackhole_sink_operator.cpp
b/be/src/exec/operator/blackhole_sink_operator.cpp
index 0745c3285cc..e8daabec852 100644
--- a/be/src/exec/operator/blackhole_sink_operator.cpp
+++ b/be/src/exec/operator/blackhole_sink_operator.cpp
@@ -44,7 +44,7 @@ Status BlackholeSinkOperatorX::init(const TDataSink& tsink) {
return Status::OK();
}
-Status BlackholeSinkOperatorX::sink(RuntimeState* state, Block* block, bool
eos) {
+Status BlackholeSinkOperatorX::sink_impl(RuntimeState* state, Block* block,
bool eos) {
auto& local_state = get_local_state(state);
SCOPED_TIMER(local_state.exec_time_counter());
COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)block->rows());
diff --git a/be/src/exec/operator/blackhole_sink_operator.h
b/be/src/exec/operator/blackhole_sink_operator.h
index 23a2e995306..8bb32e3d4f7 100644
--- a/be/src/exec/operator/blackhole_sink_operator.h
+++ b/be/src/exec/operator/blackhole_sink_operator.h
@@ -68,7 +68,7 @@ public:
Status init(const TDataSink& tsink) override;
- Status sink(RuntimeState* state, Block* block, bool eos) override;
+ Status sink_impl(RuntimeState* state, Block* block, bool eos) override;
Status close(RuntimeState* state) override;
diff --git a/be/src/exec/operator/bucketed_aggregation_sink_operator.cpp
b/be/src/exec/operator/bucketed_aggregation_sink_operator.cpp
index 8cb58b2d532..aee32541c5a 100644
--- a/be/src/exec/operator/bucketed_aggregation_sink_operator.cpp
+++ b/be/src/exec/operator/bucketed_aggregation_sink_operator.cpp
@@ -472,7 +472,7 @@ Status BucketedAggSinkOperatorX::prepare(RuntimeState*
state) {
return Status::OK();
}
-Status BucketedAggSinkOperatorX::sink(RuntimeState* state, Block* in_block,
bool eos) {
+Status BucketedAggSinkOperatorX::sink_impl(RuntimeState* state, Block*
in_block, bool eos) {
auto& local_state = get_local_state(state);
SCOPED_TIMER(local_state.exec_time_counter());
COUNTER_UPDATE(local_state.rows_input_counter(),
(int64_t)in_block->rows());
diff --git a/be/src/exec/operator/bucketed_aggregation_sink_operator.h
b/be/src/exec/operator/bucketed_aggregation_sink_operator.h
index 34b8103155e..4f2abe0a96a 100644
--- a/be/src/exec/operator/bucketed_aggregation_sink_operator.h
+++ b/be/src/exec/operator/bucketed_aggregation_sink_operator.h
@@ -110,7 +110,7 @@ public:
Status init(const TPlanNode& tnode, RuntimeState* state) override;
Status prepare(RuntimeState* state) override;
- Status sink(RuntimeState* state, Block* in_block, bool eos) override;
+ Status sink_impl(RuntimeState* state, Block* in_block, bool eos) override;
// No local exchange needed — each instance builds its own hash tables
independently.
DataDistribution required_data_distribution(RuntimeState* state) const
override {
diff --git a/be/src/exec/operator/bucketed_aggregation_source_operator.cpp
b/be/src/exec/operator/bucketed_aggregation_source_operator.cpp
index e1bd7108955..ac571248b5d 100644
--- a/be/src/exec/operator/bucketed_aggregation_source_operator.cpp
+++ b/be/src/exec/operator/bucketed_aggregation_source_operator.cpp
@@ -730,7 +730,7 @@
BucketedAggSourceOperatorX::BucketedAggSourceOperatorX(ObjectPool* pool, const T
: Base(pool, tnode, operator_id, descs),
_needs_finalize(tnode.bucketed_agg_node.need_finalize) {}
-Status BucketedAggSourceOperatorX::get_block(RuntimeState* state, Block*
block, bool* eos) {
+Status BucketedAggSourceOperatorX::get_block_impl(RuntimeState* state, Block*
block, bool* eos) {
auto& local_state = get_local_state(state);
SCOPED_TIMER(local_state.exec_time_counter());
diff --git a/be/src/exec/operator/bucketed_aggregation_source_operator.h
b/be/src/exec/operator/bucketed_aggregation_source_operator.h
index 1a90c464ce7..454b6f067a2 100644
--- a/be/src/exec/operator/bucketed_aggregation_source_operator.h
+++ b/be/src/exec/operator/bucketed_aggregation_source_operator.h
@@ -108,7 +108,7 @@ public:
const DescriptorTbl& descs);
~BucketedAggSourceOperatorX() override = default;
- Status get_block(RuntimeState* state, Block* block, bool* eos) override;
+ Status get_block_impl(RuntimeState* state, Block* block, bool* eos)
override;
bool is_source() const override { return true; }
diff --git a/be/src/exec/operator/cache_sink_operator.cpp
b/be/src/exec/operator/cache_sink_operator.cpp
index 88c1580f8d2..97c42b99c25 100644
--- a/be/src/exec/operator/cache_sink_operator.cpp
+++ b/be/src/exec/operator/cache_sink_operator.cpp
@@ -49,7 +49,7 @@ CacheSinkOperatorX::CacheSinkOperatorX(int sink_id, int
child_id, int dest_id)
_name = "CACHE_SINK_OPERATOR";
}
-Status CacheSinkOperatorX::sink(RuntimeState* state, Block* in_block, bool
eos) {
+Status CacheSinkOperatorX::sink_impl(RuntimeState* state, Block* in_block,
bool eos) {
auto& local_state = get_local_state(state);
SCOPED_TIMER(local_state.exec_time_counter());
COUNTER_UPDATE(local_state.rows_input_counter(),
(int64_t)in_block->rows());
diff --git a/be/src/exec/operator/cache_sink_operator.h
b/be/src/exec/operator/cache_sink_operator.h
index a644a89416c..6204b20eb95 100644
--- a/be/src/exec/operator/cache_sink_operator.h
+++ b/be/src/exec/operator/cache_sink_operator.h
@@ -57,7 +57,7 @@ public:
DataSinkOperatorX<CacheSinkLocalState>::_name);
}
- Status sink(RuntimeState* state, Block* in_block, bool eos) override;
+ Status sink_impl(RuntimeState* state, Block* in_block, bool eos) override;
std::shared_ptr<BasicSharedState> create_shared_state() const override {
std::shared_ptr<BasicSharedState> ss =
std::make_shared<DataQueueSharedState>();
diff --git a/be/src/exec/operator/cache_source_operator.cpp
b/be/src/exec/operator/cache_source_operator.cpp
index 6f2dc9e084e..ec7d947680a 100644
--- a/be/src/exec/operator/cache_source_operator.cpp
+++ b/be/src/exec/operator/cache_source_operator.cpp
@@ -118,7 +118,7 @@ std::string CacheSourceLocalState::debug_string(int
indentation_level) const {
return fmt::to_string(debug_string_buffer);
}
-Status CacheSourceOperatorX::get_block(RuntimeState* state, Block* block,
bool* eos) {
+Status CacheSourceOperatorX::get_block_impl(RuntimeState* state, Block* block,
bool* eos) {
auto& local_state = get_local_state(state);
SCOPED_TIMER(local_state.exec_time_counter());
diff --git a/be/src/exec/operator/cache_source_operator.h
b/be/src/exec/operator/cache_source_operator.h
index fed5e3bb4f2..3a68bd337fc 100644
--- a/be/src/exec/operator/cache_source_operator.h
+++ b/be/src/exec/operator/cache_source_operator.h
@@ -78,7 +78,7 @@ public:
#endif
~CacheSourceOperatorX() override = default;
- Status get_block(RuntimeState* state, Block* block, bool* eos) override;
+ Status get_block_impl(RuntimeState* state, Block* block, bool* eos)
override;
bool is_source() const override { return true; }
diff --git a/be/src/exec/operator/datagen_operator.cpp
b/be/src/exec/operator/datagen_operator.cpp
index a51b4fc978d..a9e0003c779 100644
--- a/be/src/exec/operator/datagen_operator.cpp
+++ b/be/src/exec/operator/datagen_operator.cpp
@@ -60,7 +60,7 @@ Status DataGenSourceOperatorX::prepare(RuntimeState* state) {
return Status::OK();
}
-Status DataGenSourceOperatorX::get_block(RuntimeState* state, Block* block,
bool* eos) {
+Status DataGenSourceOperatorX::get_block_impl(RuntimeState* state, Block*
block, bool* eos) {
if (state == nullptr || block == nullptr) {
return Status::InternalError("input is NULL pointer");
}
diff --git a/be/src/exec/operator/datagen_operator.h
b/be/src/exec/operator/datagen_operator.h
index 376f5960483..7b2a5d4f69c 100644
--- a/be/src/exec/operator/datagen_operator.h
+++ b/be/src/exec/operator/datagen_operator.h
@@ -58,7 +58,7 @@ public:
Status init(const TPlanNode& tnode, RuntimeState* state) override;
Status prepare(RuntimeState* state) override;
- Status get_block(RuntimeState* state, Block* block, bool* eos) override;
+ Status get_block_impl(RuntimeState* state, Block* block, bool* eos)
override;
[[nodiscard]] bool is_source() const override { return true; }
diff --git a/be/src/exec/operator/dict_sink_operator.cpp
b/be/src/exec/operator/dict_sink_operator.cpp
index bb0d1d9d146..8f8a5a13685 100644
--- a/be/src/exec/operator/dict_sink_operator.cpp
+++ b/be/src/exec/operator/dict_sink_operator.cpp
@@ -158,7 +158,7 @@ Status DictSinkOperatorX::prepare(RuntimeState* state) {
return Status::OK();
}
-Status DictSinkOperatorX::sink(RuntimeState* state, Block* in_block, bool eos)
{
+Status DictSinkOperatorX::sink_impl(RuntimeState* state, Block* in_block, bool
eos) {
auto& local_state = get_local_state(state);
SCOPED_TIMER(local_state.exec_time_counter());
COUNTER_UPDATE(local_state.rows_input_counter(),
(int64_t)in_block->rows());
diff --git a/be/src/exec/operator/dict_sink_operator.h
b/be/src/exec/operator/dict_sink_operator.h
index 2ae82b67a89..23747a99601 100644
--- a/be/src/exec/operator/dict_sink_operator.h
+++ b/be/src/exec/operator/dict_sink_operator.h
@@ -49,7 +49,7 @@ public:
const std::vector<TExpr>& dict_input_expr, const
TDictionarySink& dict_sink);
Status prepare(RuntimeState* state) override;
- Status sink(RuntimeState* state, Block* in_block, bool eos) override;
+ Status sink_impl(RuntimeState* state, Block* in_block, bool eos) override;
private:
friend class DictSinkLocalState;
diff --git a/be/src/exec/operator/empty_set_operator.cpp
b/be/src/exec/operator/empty_set_operator.cpp
index 58c69bf91bc..72c91997f4c 100644
--- a/be/src/exec/operator/empty_set_operator.cpp
+++ b/be/src/exec/operator/empty_set_operator.cpp
@@ -23,7 +23,7 @@
namespace doris {
-Status EmptySetSourceOperatorX::get_block(RuntimeState* state, Block* block,
bool* eos) {
+Status EmptySetSourceOperatorX::get_block_impl(RuntimeState* state, Block*
block, bool* eos) {
*eos = true;
return Status::OK();
}
diff --git a/be/src/exec/operator/empty_set_operator.h
b/be/src/exec/operator/empty_set_operator.h
index 62dd855ea21..0f16236373c 100644
--- a/be/src/exec/operator/empty_set_operator.h
+++ b/be/src/exec/operator/empty_set_operator.h
@@ -42,7 +42,7 @@ public:
EmptySetSourceOperatorX() = default;
#endif
- Status get_block(RuntimeState* state, Block* block, bool* eos) override;
+ Status get_block_impl(RuntimeState* state, Block* block, bool* eos)
override;
[[nodiscard]] bool is_source() const override { return true; }
};
diff --git a/be/src/exec/operator/exchange_sink_operator.cpp
b/be/src/exec/operator/exchange_sink_operator.cpp
index e65dd979ad2..2011a44d2ca 100644
--- a/be/src/exec/operator/exchange_sink_operator.cpp
+++ b/be/src/exec/operator/exchange_sink_operator.cpp
@@ -392,7 +392,7 @@ Status
ExchangeSinkOperatorX::_handle_eof_channel(RuntimeState* state, ChannelPt
return channel->close(state);
}
-Status ExchangeSinkOperatorX::sink(RuntimeState* state, Block* block, bool
eos) {
+Status ExchangeSinkOperatorX::sink_impl(RuntimeState* state, Block* block,
bool eos) {
auto& local_state = get_local_state(state);
COUNTER_UPDATE(local_state.rows_input_counter(),
(int64_t)block->rows()); // for auto-partition, may decease
when do_partitioning
diff --git a/be/src/exec/operator/exchange_sink_operator.h
b/be/src/exec/operator/exchange_sink_operator.h
index ea224ed99bd..10351154d1d 100644
--- a/be/src/exec/operator/exchange_sink_operator.h
+++ b/be/src/exec/operator/exchange_sink_operator.h
@@ -198,7 +198,7 @@ public:
// TaskExecutionContext.
Status prepare(RuntimeState* state) override;
- Status sink(RuntimeState* state, Block* in_block, bool eos) override;
+ Status sink_impl(RuntimeState* state, Block* in_block, bool eos) override;
bool is_serial_operator() const override { return true; }
void set_low_memory_mode(RuntimeState* state) override {
diff --git a/be/src/exec/operator/exchange_source_operator.cpp
b/be/src/exec/operator/exchange_source_operator.cpp
index e008d599078..1e4d24de8e8 100644
--- a/be/src/exec/operator/exchange_source_operator.cpp
+++ b/be/src/exec/operator/exchange_source_operator.cpp
@@ -146,7 +146,7 @@ Status ExchangeSourceOperatorX::prepare(RuntimeState*
state) {
return Status::OK();
}
-Status ExchangeSourceOperatorX::get_block(RuntimeState* state, Block* block,
bool* eos) {
+Status ExchangeSourceOperatorX::get_block_impl(RuntimeState* state, Block*
block, bool* eos) {
auto& local_state = get_local_state(state);
Defer is_eos([&]() {
if (*eos) {
diff --git a/be/src/exec/operator/exchange_source_operator.h
b/be/src/exec/operator/exchange_source_operator.h
index da00e088586..0c4aa1e43eb 100644
--- a/be/src/exec/operator/exchange_source_operator.h
+++ b/be/src/exec/operator/exchange_source_operator.h
@@ -100,7 +100,7 @@ public:
Status reset(RuntimeState* state) override;
- Status get_block(RuntimeState* state, Block* block, bool* eos) override;
+ Status get_block_impl(RuntimeState* state, Block* block, bool* eos)
override;
std::string debug_string(int indentation_level = 0) const override;
diff --git a/be/src/exec/operator/group_commit_block_sink_operator.cpp
b/be/src/exec/operator/group_commit_block_sink_operator.cpp
index a72755720d5..38ea6d3c18a 100644
--- a/be/src/exec/operator/group_commit_block_sink_operator.cpp
+++ b/be/src/exec/operator/group_commit_block_sink_operator.cpp
@@ -280,7 +280,7 @@ Status GroupCommitBlockSinkOperatorX::prepare(RuntimeState*
state) {
return VExpr::open(_output_vexpr_ctxs, state);
}
-Status GroupCommitBlockSinkOperatorX::sink(RuntimeState* state, Block*
input_block, bool eos) {
+Status GroupCommitBlockSinkOperatorX::sink_impl(RuntimeState* state, Block*
input_block, bool eos) {
auto& local_state = get_local_state(state);
SCOPED_TIMER(local_state.exec_time_counter());
COUNTER_UPDATE(local_state.rows_input_counter(),
(int64_t)input_block->rows());
diff --git a/be/src/exec/operator/group_commit_block_sink_operator.h
b/be/src/exec/operator/group_commit_block_sink_operator.h
index 644100d42b0..854f2f3cc1d 100644
--- a/be/src/exec/operator/group_commit_block_sink_operator.h
+++ b/be/src/exec/operator/group_commit_block_sink_operator.h
@@ -102,7 +102,7 @@ public:
Status prepare(RuntimeState* state) override;
- Status sink(RuntimeState* state, Block* block, bool eos) override;
+ Status sink_impl(RuntimeState* state, Block* block, bool eos) override;
private:
friend class GroupCommitBlockSinkLocalState;
diff --git a/be/src/exec/operator/group_commit_scan_operator.cpp
b/be/src/exec/operator/group_commit_scan_operator.cpp
index e591ed22edd..209481d86bf 100644
--- a/be/src/exec/operator/group_commit_scan_operator.cpp
+++ b/be/src/exec/operator/group_commit_scan_operator.cpp
@@ -28,7 +28,7 @@ GroupCommitOperatorX::GroupCommitOperatorX(ObjectPool* pool,
const TPlanNode& tn
_output_tuple_id = tnode.file_scan_node.tuple_id;
}
-Status GroupCommitOperatorX::get_block(RuntimeState* state, Block* block,
bool* eos) {
+Status GroupCommitOperatorX::get_block_impl(RuntimeState* state, Block* block,
bool* eos) {
auto& local_state = get_local_state(state);
SCOPED_TIMER(local_state.exec_time_counter());
bool find_node = false;
diff --git a/be/src/exec/operator/group_commit_scan_operator.h
b/be/src/exec/operator/group_commit_scan_operator.h
index 7ed01c16931..679b46b1125 100644
--- a/be/src/exec/operator/group_commit_scan_operator.h
+++ b/be/src/exec/operator/group_commit_scan_operator.h
@@ -54,7 +54,7 @@ public:
GroupCommitOperatorX(ObjectPool* pool, const TPlanNode& tnode, int
operator_id,
const DescriptorTbl& descs, int parallel_tasks);
- Status get_block(RuntimeState* state, Block* block, bool* eos) override;
+ Status get_block_impl(RuntimeState* state, Block* block, bool* eos)
override;
protected:
friend class GroupCommitLocalState;
diff --git a/be/src/exec/operator/hashjoin_build_sink.cpp
b/be/src/exec/operator/hashjoin_build_sink.cpp
index 7d295588710..14772cf3662 100644
--- a/be/src/exec/operator/hashjoin_build_sink.cpp
+++ b/be/src/exec/operator/hashjoin_build_sink.cpp
@@ -814,7 +814,7 @@ Status HashJoinBuildSinkOperatorX::prepare(RuntimeState*
state) {
return VExpr::open(_build_expr_ctxs, state);
}
-Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, Block* in_block,
bool eos) {
+Status HashJoinBuildSinkOperatorX::sink_impl(RuntimeState* state, Block*
in_block, bool eos) {
auto& local_state = get_local_state(state);
SCOPED_TIMER(local_state.exec_time_counter());
COUNTER_UPDATE(local_state.rows_input_counter(),
(int64_t)in_block->rows());
diff --git a/be/src/exec/operator/hashjoin_build_sink.h
b/be/src/exec/operator/hashjoin_build_sink.h
index 3c3faabcdb5..dda24cef91b 100644
--- a/be/src/exec/operator/hashjoin_build_sink.h
+++ b/be/src/exec/operator/hashjoin_build_sink.h
@@ -118,7 +118,7 @@ public:
Status prepare(RuntimeState* state) override;
- Status sink(RuntimeState* state, Block* in_block, bool eos) override;
+ Status sink_impl(RuntimeState* state, Block* in_block, bool eos) override;
size_t get_reserve_mem_size(RuntimeState* state, bool eos) override;
diff --git a/be/src/exec/operator/hive_table_sink_operator.h
b/be/src/exec/operator/hive_table_sink_operator.h
index a18af89f5df..51161809e68 100644
--- a/be/src/exec/operator/hive_table_sink_operator.h
+++ b/be/src/exec/operator/hive_table_sink_operator.h
@@ -65,7 +65,7 @@ public:
return VExpr::open(_output_vexpr_ctxs, state);
}
- Status sink(RuntimeState* state, Block* in_block, bool eos) override {
+ Status sink_impl(RuntimeState* state, Block* in_block, bool eos) override {
auto& local_state = get_local_state(state);
SCOPED_TIMER(local_state.exec_time_counter());
COUNTER_UPDATE(local_state.rows_input_counter(),
(int64_t)in_block->rows());
diff --git a/be/src/exec/operator/iceberg_delete_sink_operator.h
b/be/src/exec/operator/iceberg_delete_sink_operator.h
index f1d292a8b59..12f93bfc8b6 100644
--- a/be/src/exec/operator/iceberg_delete_sink_operator.h
+++ b/be/src/exec/operator/iceberg_delete_sink_operator.h
@@ -64,7 +64,7 @@ public:
return VExpr::open(_output_vexpr_ctxs, state);
}
- Status sink(RuntimeState* state, Block* in_block, bool eos) override {
+ Status sink_impl(RuntimeState* state, Block* in_block, bool eos) override {
auto& local_state = get_local_state(state);
SCOPED_TIMER(local_state.exec_time_counter());
COUNTER_UPDATE(local_state.rows_input_counter(),
(int64_t)in_block->rows());
diff --git a/be/src/exec/operator/iceberg_merge_sink_operator.h
b/be/src/exec/operator/iceberg_merge_sink_operator.h
index 6de1fbe59eb..0cf64681c1d 100644
--- a/be/src/exec/operator/iceberg_merge_sink_operator.h
+++ b/be/src/exec/operator/iceberg_merge_sink_operator.h
@@ -63,7 +63,7 @@ public:
return VExpr::open(_output_vexpr_ctxs, state);
}
- Status sink(RuntimeState* state, Block* in_block, bool eos) override {
+ Status sink_impl(RuntimeState* state, Block* in_block, bool eos) override {
auto& local_state = get_local_state(state);
SCOPED_TIMER(local_state.exec_time_counter());
COUNTER_UPDATE(local_state.rows_input_counter(),
(int64_t)in_block->rows());
diff --git a/be/src/exec/operator/iceberg_table_sink_operator.h
b/be/src/exec/operator/iceberg_table_sink_operator.h
index 2a9b59d7e36..0dec306edeb 100644
--- a/be/src/exec/operator/iceberg_table_sink_operator.h
+++ b/be/src/exec/operator/iceberg_table_sink_operator.h
@@ -64,7 +64,7 @@ public:
return VExpr::open(_output_vexpr_ctxs, state);
}
- Status sink(RuntimeState* state, Block* in_block, bool eos) override {
+ Status sink_impl(RuntimeState* state, Block* in_block, bool eos) override {
auto& local_state = get_local_state(state);
SCOPED_TIMER(local_state.exec_time_counter());
COUNTER_UPDATE(local_state.rows_input_counter(),
(int64_t)in_block->rows());
diff --git a/be/src/exec/operator/jdbc_table_sink_operator.cpp
b/be/src/exec/operator/jdbc_table_sink_operator.cpp
index ac5354df988..9ee8828714d 100644
--- a/be/src/exec/operator/jdbc_table_sink_operator.cpp
+++ b/be/src/exec/operator/jdbc_table_sink_operator.cpp
@@ -46,7 +46,7 @@ Status JdbcTableSinkOperatorX::prepare(RuntimeState* state) {
return Status::OK();
}
-Status JdbcTableSinkOperatorX::sink(RuntimeState* state, Block* block, bool
eos) {
+Status JdbcTableSinkOperatorX::sink_impl(RuntimeState* state, Block* block,
bool eos) {
auto& local_state = get_local_state(state);
SCOPED_TIMER(local_state.exec_time_counter());
COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)block->rows());
diff --git a/be/src/exec/operator/jdbc_table_sink_operator.h
b/be/src/exec/operator/jdbc_table_sink_operator.h
index 58a8e52cc7f..a557ec80a8d 100644
--- a/be/src/exec/operator/jdbc_table_sink_operator.h
+++ b/be/src/exec/operator/jdbc_table_sink_operator.h
@@ -45,7 +45,7 @@ public:
Status init(const TDataSink& thrift_sink) override;
Status prepare(RuntimeState* state) override;
- Status sink(RuntimeState* state, Block* in_block, bool eos) override;
+ Status sink_impl(RuntimeState* state, Block* in_block, bool eos) override;
private:
friend class JdbcTableSinkLocalState;
diff --git a/be/src/exec/operator/local_merge_sort_source_operator.cpp
b/be/src/exec/operator/local_merge_sort_source_operator.cpp
index dacd97357d3..dc3bf09d7f8 100644
--- a/be/src/exec/operator/local_merge_sort_source_operator.cpp
+++ b/be/src/exec/operator/local_merge_sort_source_operator.cpp
@@ -116,7 +116,7 @@ void
LocalMergeSortSourceOperatorX::init_dependencies_and_sorter() {
_sorters.resize(_parallel_tasks);
}
-Status LocalMergeSortSourceOperatorX::get_block(RuntimeState* state, Block*
block, bool* eos) {
+Status LocalMergeSortSourceOperatorX::get_block_impl(RuntimeState* state,
Block* block, bool* eos) {
auto& local_state = get_local_state(state);
SCOPED_TIMER(local_state.exec_time_counter());
SCOPED_PEAK_MEM(&local_state._estimate_memory_usage);
diff --git a/be/src/exec/operator/local_merge_sort_source_operator.h
b/be/src/exec/operator/local_merge_sort_source_operator.h
index 6219edc4224..b22193c3686 100644
--- a/be/src/exec/operator/local_merge_sort_source_operator.h
+++ b/be/src/exec/operator/local_merge_sort_source_operator.h
@@ -83,7 +83,7 @@ public:
LocalMergeSortSourceOperatorX() : _merge_by_exchange(false), _offset(0) {}
#endif
- Status get_block(RuntimeState* state, Block* block, bool* eos) override;
+ Status get_block_impl(RuntimeState* state, Block* block, bool* eos)
override;
Status init(const TPlanNode& tnode, RuntimeState* state) override;
Status prepare(RuntimeState* state) override;
diff --git a/be/src/exec/operator/maxcompute_table_sink_operator.h
b/be/src/exec/operator/maxcompute_table_sink_operator.h
index e1ec3b20ab1..3332143080b 100644
--- a/be/src/exec/operator/maxcompute_table_sink_operator.h
+++ b/be/src/exec/operator/maxcompute_table_sink_operator.h
@@ -62,7 +62,7 @@ public:
return VExpr::open(_output_vexpr_ctxs, state);
}
- Status sink(RuntimeState* state, Block* in_block, bool eos) override {
+ Status sink_impl(RuntimeState* state, Block* in_block, bool eos) override {
auto& local_state = get_local_state(state);
SCOPED_TIMER(local_state.exec_time_counter());
COUNTER_UPDATE(local_state.rows_input_counter(),
(int64_t)in_block->rows());
diff --git a/be/src/exec/operator/memory_scratch_sink_operator.cpp
b/be/src/exec/operator/memory_scratch_sink_operator.cpp
index ff9e4d4f508..5635a65168f 100644
--- a/be/src/exec/operator/memory_scratch_sink_operator.cpp
+++ b/be/src/exec/operator/memory_scratch_sink_operator.cpp
@@ -86,7 +86,7 @@ Status MemoryScratchSinkOperatorX::prepare(RuntimeState*
state) {
return Status::OK();
}
-Status MemoryScratchSinkOperatorX::sink(RuntimeState* state, Block*
input_block, bool eos) {
+Status MemoryScratchSinkOperatorX::sink_impl(RuntimeState* state, Block*
input_block, bool eos) {
auto& local_state = get_local_state(state);
SCOPED_TIMER(local_state.exec_time_counter());
if (nullptr == input_block || 0 == input_block->rows()) {
diff --git a/be/src/exec/operator/memory_scratch_sink_operator.h
b/be/src/exec/operator/memory_scratch_sink_operator.h
index 77fadc0c8da..f12e118e75e 100644
--- a/be/src/exec/operator/memory_scratch_sink_operator.h
+++ b/be/src/exec/operator/memory_scratch_sink_operator.h
@@ -57,7 +57,7 @@ public:
Status init(const TDataSink& thrift_sink) override;
Status prepare(RuntimeState* state) override;
- Status sink(RuntimeState* state, Block* in_block, bool eos) override;
+ Status sink_impl(RuntimeState* state, Block* in_block, bool eos) override;
private:
friend class MemoryScratchSinkLocalState;
diff --git a/be/src/exec/operator/mock_operator.h
b/be/src/exec/operator/mock_operator.h
index a83b5de5448..3c8d133def4 100644
--- a/be/src/exec/operator/mock_operator.h
+++ b/be/src/exec/operator/mock_operator.h
@@ -42,7 +42,7 @@ public:
ENABLE_FACTORY_CREATOR(MockOperatorX);
MockOperatorX() = default;
- Status get_block(RuntimeState* state, Block* block, bool* eos) override {
+ Status get_block_impl(RuntimeState* state, Block* block, bool* eos)
override {
if (_outout_blocks.empty()) {
*eos = true;
return Status::OK();
diff --git a/be/src/exec/operator/mock_scan_operator.h
b/be/src/exec/operator/mock_scan_operator.h
index 3250a4219b3..2217dc6f87b 100644
--- a/be/src/exec/operator/mock_scan_operator.h
+++ b/be/src/exec/operator/mock_scan_operator.h
@@ -88,7 +88,7 @@ public:
_output_blocks.push_back(std::move(block));
}
- Status get_block(RuntimeState* state, Block* block, bool* eos) override {
+ Status get_block_impl(RuntimeState* state, Block* block, bool* eos)
override {
if (_output_blocks.empty()) {
*eos = true;
return Status::OK();
diff --git a/be/src/exec/operator/multi_cast_data_stream_sink.cpp
b/be/src/exec/operator/multi_cast_data_stream_sink.cpp
index 2d2ddcce458..7a13a05eaeb 100644
--- a/be/src/exec/operator/multi_cast_data_stream_sink.cpp
+++ b/be/src/exec/operator/multi_cast_data_stream_sink.cpp
@@ -66,7 +66,7 @@ std::string
MultiCastDataStreamSinkLocalState::debug_string(int indentation_leve
return fmt::to_string(debug_string_buffer);
}
-Status MultiCastDataStreamSinkOperatorX::sink(RuntimeState* state, Block*
in_block, bool eos) {
+Status MultiCastDataStreamSinkOperatorX::sink_impl(RuntimeState* state, Block*
in_block, bool eos) {
auto& local_state = get_local_state(state);
SCOPED_TIMER(local_state.exec_time_counter());
if (in_block->rows() > 0 || eos) {
diff --git a/be/src/exec/operator/multi_cast_data_stream_sink.h
b/be/src/exec/operator/multi_cast_data_stream_sink.h
index d1fcc878924..0b0dc7ca537 100644
--- a/be/src/exec/operator/multi_cast_data_stream_sink.h
+++ b/be/src/exec/operator/multi_cast_data_stream_sink.h
@@ -56,7 +56,7 @@ public:
_num_dests(sources.size()) {}
~MultiCastDataStreamSinkOperatorX() override = default;
- Status sink(RuntimeState* state, Block* in_block, bool eos) override;
+ Status sink_impl(RuntimeState* state, Block* in_block, bool eos) override;
std::shared_ptr<BasicSharedState> create_shared_state() const override;
diff --git a/be/src/exec/operator/multi_cast_data_stream_source.cpp
b/be/src/exec/operator/multi_cast_data_stream_source.cpp
index 8a669d3fbff..2b9e663ce07 100644
--- a/be/src/exec/operator/multi_cast_data_stream_source.cpp
+++ b/be/src/exec/operator/multi_cast_data_stream_source.cpp
@@ -79,8 +79,8 @@ Status
MultiCastDataStreamSourceLocalState::close(RuntimeState* state) {
return Base::close(state);
}
-Status MultiCastDataStreamerSourceOperatorX::get_block(RuntimeState* state,
Block* block,
- bool* eos) {
+Status MultiCastDataStreamerSourceOperatorX::get_block_impl(RuntimeState*
state, Block* block,
+ bool* eos) {
//auto& local_state = get_local_state(state);
auto& local_state = get_local_state(state);
SCOPED_TIMER(local_state.exec_time_counter());
diff --git a/be/src/exec/operator/multi_cast_data_stream_source.h
b/be/src/exec/operator/multi_cast_data_stream_source.h
index f7ed78e376c..31488df78da 100644
--- a/be/src/exec/operator/multi_cast_data_stream_source.h
+++ b/be/src/exec/operator/multi_cast_data_stream_source.h
@@ -104,7 +104,7 @@ public:
return Status::OK();
}
- Status get_block(RuntimeState* state, Block* block, bool* eos) override;
+ Status get_block_impl(RuntimeState* state, Block* block, bool* eos)
override;
bool is_source() const override { return true; }
diff --git a/be/src/exec/operator/nested_loop_join_build_operator.cpp
b/be/src/exec/operator/nested_loop_join_build_operator.cpp
index b55b22d1a58..857ac9318f2 100644
--- a/be/src/exec/operator/nested_loop_join_build_operator.cpp
+++ b/be/src/exec/operator/nested_loop_join_build_operator.cpp
@@ -93,7 +93,8 @@ Status
NestedLoopJoinBuildSinkOperatorX::prepare(RuntimeState* state) {
return VExpr::open(_filter_src_expr_ctxs, state);
}
-Status NestedLoopJoinBuildSinkOperatorX::sink(doris::RuntimeState* state,
Block* block, bool eos) {
+Status NestedLoopJoinBuildSinkOperatorX::sink_impl(doris::RuntimeState* state,
Block* block,
+ bool eos) {
auto& local_state = get_local_state(state);
SCOPED_TIMER(local_state.exec_time_counter());
COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)block->rows());
diff --git a/be/src/exec/operator/nested_loop_join_build_operator.h
b/be/src/exec/operator/nested_loop_join_build_operator.h
index 265a8767b3e..5984aeedf05 100644
--- a/be/src/exec/operator/nested_loop_join_build_operator.h
+++ b/be/src/exec/operator/nested_loop_join_build_operator.h
@@ -64,7 +64,7 @@ public:
Status prepare(RuntimeState* state) override;
- Status sink(RuntimeState* state, Block* in_block, bool eos) override;
+ Status sink_impl(RuntimeState* state, Block* in_block, bool eos) override;
DataDistribution required_data_distribution(RuntimeState* /*state*/) const
override {
if (_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
diff --git a/be/src/exec/operator/olap_table_sink_operator.h
b/be/src/exec/operator/olap_table_sink_operator.h
index e8261daf46b..9567b82e082 100644
--- a/be/src/exec/operator/olap_table_sink_operator.h
+++ b/be/src/exec/operator/olap_table_sink_operator.h
@@ -57,7 +57,7 @@ public:
return VExpr::open(_output_vexpr_ctxs, state);
}
- Status sink(RuntimeState* state, Block* in_block, bool eos) override {
+ Status sink_impl(RuntimeState* state, Block* in_block, bool eos) override {
auto& local_state = get_local_state(state);
SCOPED_TIMER(local_state.exec_time_counter());
COUNTER_UPDATE(local_state.rows_input_counter(),
(int64_t)in_block->rows());
diff --git a/be/src/exec/operator/olap_table_sink_v2_operator.h
b/be/src/exec/operator/olap_table_sink_v2_operator.h
index 358cb4c10e2..038484c83ee 100644
--- a/be/src/exec/operator/olap_table_sink_v2_operator.h
+++ b/be/src/exec/operator/olap_table_sink_v2_operator.h
@@ -58,7 +58,7 @@ public:
return VExpr::open(_output_vexpr_ctxs, state);
}
- Status sink(RuntimeState* state, Block* in_block, bool eos) override {
+ Status sink_impl(RuntimeState* state, Block* in_block, bool eos) override {
auto& local_state = get_local_state(state);
SCOPED_TIMER(local_state.exec_time_counter());
COUNTER_UPDATE(local_state.rows_input_counter(),
(int64_t)in_block->rows());
diff --git a/be/src/exec/operator/operator.cpp
b/be/src/exec/operator/operator.cpp
index 1ce7dc8727d..b833ca83ff3 100644
--- a/be/src/exec/operator/operator.cpp
+++ b/be/src/exec/operator/operator.cpp
@@ -729,13 +729,15 @@ Status
PipelineXSinkLocalState<SharedState>::close(RuntimeState* state, Status e
}
template <typename LocalStateType>
-Status StreamingOperatorX<LocalStateType>::get_block(RuntimeState* state,
Block* block, bool* eos) {
+Status StreamingOperatorX<LocalStateType>::get_block_impl(RuntimeState* state,
Block* block,
+ bool* eos) {
RETURN_IF_ERROR(OperatorX<LocalStateType>::_child->get_block_after_projects(state,
block, eos));
return pull(state, block, eos);
}
template <typename LocalStateType>
-Status StatefulOperatorX<LocalStateType>::get_block(RuntimeState* state,
Block* block, bool* eos) {
+Status StatefulOperatorX<LocalStateType>::get_block_impl(RuntimeState* state,
Block* block,
+ bool* eos) {
auto& local_state = get_local_state(state);
if (need_more_input_data(state)) {
local_state._child_block->clear_column_data(
diff --git a/be/src/exec/operator/operator.h b/be/src/exec/operator/operator.h
index 565a650a0da..a09638f6d71 100644
--- a/be/src/exec/operator/operator.h
+++ b/be/src/exec/operator/operator.h
@@ -620,7 +620,12 @@ public:
return result.value()->is_finished();
}
- [[nodiscard]] virtual Status sink(RuntimeState* state, Block* block, bool
eos) = 0;
+ [[nodiscard]] Status sink(RuntimeState* state, Block* block, bool eos) {
+ RETURN_IF_ERROR(block->check_type_and_column());
+ return sink_impl(state, block, eos);
+ }
+
+ [[nodiscard]] virtual Status sink_impl(RuntimeState* state, Block* block,
bool eos) = 0;
[[nodiscard]] virtual Status setup_local_state(RuntimeState* state,
LocalSinkStateInfo& info) =
0;
@@ -877,7 +882,13 @@ public:
Status prepare(RuntimeState* state) override;
Status terminate(RuntimeState* state) override;
- [[nodiscard]] virtual Status get_block(RuntimeState* state, Block* block,
bool* eos) = 0;
+ [[nodiscard]] Status get_block(RuntimeState* state, Block* block, bool*
eos) {
+ RETURN_IF_ERROR(get_block_impl(state, block, eos));
+ RETURN_IF_ERROR(block->check_type_and_column());
+ return Status::OK();
+ }
+
+ [[nodiscard]] virtual Status get_block_impl(RuntimeState* state, Block*
block, bool* eos) = 0;
Status close(RuntimeState* state) override;
@@ -1070,7 +1081,7 @@ public:
virtual ~StreamingOperatorX() = default;
- Status get_block(RuntimeState* state, Block* block, bool* eos) override;
+ Status get_block_impl(RuntimeState* state, Block* block, bool* eos)
override;
virtual Status pull(RuntimeState* state, Block* block, bool* eos) = 0;
};
@@ -1096,7 +1107,7 @@ public:
using OperatorX<LocalStateType>::get_local_state;
- [[nodiscard]] Status get_block(RuntimeState* state, Block* block, bool*
eos) override;
+ [[nodiscard]] Status get_block_impl(RuntimeState* state, Block* block,
bool* eos) override;
[[nodiscard]] virtual Status pull(RuntimeState* state, Block* block, bool*
eos) const = 0;
[[nodiscard]] virtual Status push(RuntimeState* state, Block* input_block,
bool eos) const = 0;
@@ -1170,7 +1181,7 @@ public:
[[nodiscard]] bool is_source() const override { return true; }
- Status get_block(RuntimeState* state, Block* block, bool* eos) override {
+ Status get_block_impl(RuntimeState* state, Block* block, bool* eos)
override {
*eos = _eos;
return Status::OK();
}
@@ -1225,7 +1236,7 @@ class DummySinkOperatorX final : public
DataSinkOperatorX<DummySinkLocalState> {
public:
DummySinkOperatorX(int op_id, int node_id, int dest_id)
: DataSinkOperatorX<DummySinkLocalState>(op_id, node_id, dest_id)
{}
- Status sink(RuntimeState* state, Block* in_block, bool eos) override {
+ Status sink_impl(RuntimeState* state, Block* in_block, bool eos) override {
return _return_eof ? Status::Error<ErrorCode::END_OF_FILE>("source
have closed")
: Status::OK();
}
diff --git a/be/src/exec/operator/partition_sort_sink_operator.cpp
b/be/src/exec/operator/partition_sort_sink_operator.cpp
index 76695b8ee32..0f358fb70ad 100644
--- a/be/src/exec/operator/partition_sort_sink_operator.cpp
+++ b/be/src/exec/operator/partition_sort_sink_operator.cpp
@@ -111,7 +111,7 @@ Status PartitionSortSinkOperatorX::prepare(RuntimeState*
state) {
return Status::OK();
}
-Status PartitionSortSinkOperatorX::sink(RuntimeState* state, Block*
input_block, bool eos) {
+Status PartitionSortSinkOperatorX::sink_impl(RuntimeState* state, Block*
input_block, bool eos) {
auto& local_state = get_local_state(state);
auto current_rows = input_block->rows();
SCOPED_TIMER(local_state.exec_time_counter());
diff --git a/be/src/exec/operator/partition_sort_sink_operator.h
b/be/src/exec/operator/partition_sort_sink_operator.h
index 48b709d3f1f..68a698dff8f 100644
--- a/be/src/exec/operator/partition_sort_sink_operator.h
+++ b/be/src/exec/operator/partition_sort_sink_operator.h
@@ -92,7 +92,7 @@ public:
Status init(const TPlanNode& tnode, RuntimeState* state) override;
Status prepare(RuntimeState* state) override;
- Status sink(RuntimeState* state, Block* in_block, bool eos) override;
+ Status sink_impl(RuntimeState* state, Block* in_block, bool eos) override;
DataDistribution required_data_distribution(RuntimeState* /*state*/) const
override {
if (_topn_phase == TPartTopNPhase::TWO_PHASE_GLOBAL) {
return DataDistribution(ExchangeType::HASH_SHUFFLE,
_distribute_exprs);
diff --git a/be/src/exec/operator/partition_sort_source_operator.cpp
b/be/src/exec/operator/partition_sort_source_operator.cpp
index 3db89fb4cd2..7e07cd9d307 100644
--- a/be/src/exec/operator/partition_sort_source_operator.cpp
+++ b/be/src/exec/operator/partition_sort_source_operator.cpp
@@ -32,8 +32,8 @@ Status PartitionSortSourceLocalState::init(RuntimeState*
state, LocalStateInfo&
return Status::OK();
}
-Status PartitionSortSourceOperatorX::get_block(RuntimeState* state, Block*
output_block,
- bool* eos) {
+Status PartitionSortSourceOperatorX::get_block_impl(RuntimeState* state,
Block* output_block,
+ bool* eos) {
RETURN_IF_CANCELLED(state);
auto& local_state = get_local_state(state);
SCOPED_TIMER(local_state.exec_time_counter());
diff --git a/be/src/exec/operator/partition_sort_source_operator.h
b/be/src/exec/operator/partition_sort_source_operator.h
index 82bf052f1c0..def4cab8ea0 100644
--- a/be/src/exec/operator/partition_sort_source_operator.h
+++ b/be/src/exec/operator/partition_sort_source_operator.h
@@ -52,7 +52,7 @@ public:
#ifdef BE_TEST
PartitionSortSourceOperatorX() = default;
#endif
- Status get_block(RuntimeState* state, Block* block, bool* eos) override;
+ Status get_block_impl(RuntimeState* state, Block* block, bool* eos)
override;
bool is_source() const override { return true; }
diff --git a/be/src/exec/operator/partitioned_aggregation_sink_operator.cpp
b/be/src/exec/operator/partitioned_aggregation_sink_operator.cpp
index 42739b9b2ac..5563eafdc50 100644
--- a/be/src/exec/operator/partitioned_aggregation_sink_operator.cpp
+++ b/be/src/exec/operator/partitioned_aggregation_sink_operator.cpp
@@ -149,7 +149,8 @@ Status PartitionedAggSinkOperatorX::prepare(RuntimeState*
state) {
return _agg_sink_operator->prepare(state);
}
-Status PartitionedAggSinkOperatorX::sink(doris::RuntimeState* state, Block*
in_block, bool eos) {
+Status PartitionedAggSinkOperatorX::sink_impl(doris::RuntimeState* state,
Block* in_block,
+ bool eos) {
auto& local_state = get_local_state(state);
SCOPED_TIMER(local_state.exec_time_counter());
COUNTER_UPDATE(local_state.rows_input_counter(),
(int64_t)in_block->rows());
diff --git a/be/src/exec/operator/partitioned_aggregation_sink_operator.h
b/be/src/exec/operator/partitioned_aggregation_sink_operator.h
index 56834757bd3..b7916f755c4 100644
--- a/be/src/exec/operator/partitioned_aggregation_sink_operator.h
+++ b/be/src/exec/operator/partitioned_aggregation_sink_operator.h
@@ -114,7 +114,7 @@ public:
Status prepare(RuntimeState* state) override;
- Status sink(RuntimeState* state, Block* in_block, bool eos) override;
+ Status sink_impl(RuntimeState* state, Block* in_block, bool eos) override;
void update_operator(const TPlanNode& tnode, bool
followed_by_shuffled_operator,
bool require_bucket_distribution) override {
diff --git a/be/src/exec/operator/partitioned_aggregation_source_operator.cpp
b/be/src/exec/operator/partitioned_aggregation_source_operator.cpp
index 4d69c49bb7a..c23d3c83dda 100644
--- a/be/src/exec/operator/partitioned_aggregation_source_operator.cpp
+++ b/be/src/exec/operator/partitioned_aggregation_source_operator.cpp
@@ -226,7 +226,7 @@ Status
PartitionedAggSourceOperatorX::revoke_memory(RuntimeState* state) {
return Status::OK();
}
-Status PartitionedAggSourceOperatorX::get_block(RuntimeState* state, Block*
block, bool* eos) {
+Status PartitionedAggSourceOperatorX::get_block_impl(RuntimeState* state,
Block* block, bool* eos) {
auto& local_state = get_local_state(state);
Status status;
diff --git a/be/src/exec/operator/partitioned_aggregation_source_operator.h
b/be/src/exec/operator/partitioned_aggregation_source_operator.h
index 3e631d7c10c..540b60064bb 100644
--- a/be/src/exec/operator/partitioned_aggregation_source_operator.h
+++ b/be/src/exec/operator/partitioned_aggregation_source_operator.h
@@ -126,7 +126,7 @@ public:
Status close(RuntimeState* state) override;
- Status get_block(RuntimeState* state, Block* block, bool* eos) override;
+ Status get_block_impl(RuntimeState* state, Block* block, bool* eos)
override;
bool is_source() const override { return true; }
diff --git a/be/src/exec/operator/partitioned_hash_join_probe_operator.cpp
b/be/src/exec/operator/partitioned_hash_join_probe_operator.cpp
index 4810aca89fc..410556911f0 100644
--- a/be/src/exec/operator/partitioned_hash_join_probe_operator.cpp
+++ b/be/src/exec/operator/partitioned_hash_join_probe_operator.cpp
@@ -1013,7 +1013,8 @@ Status
PartitionedHashJoinProbeOperatorX::revoke_memory(RuntimeState* state) {
return local_state.revoke_build_data(state);
}
-Status PartitionedHashJoinProbeOperatorX::get_block(RuntimeState* state,
Block* block, bool* eos) {
+Status PartitionedHashJoinProbeOperatorX::get_block_impl(RuntimeState* state,
Block* block,
+ bool* eos) {
*eos = false;
auto& local_state = get_local_state(state);
const bool is_spilled = local_state._shared_state->_is_spilled;
diff --git a/be/src/exec/operator/partitioned_hash_join_probe_operator.h
b/be/src/exec/operator/partitioned_hash_join_probe_operator.h
index 15767504d85..01787014c8d 100644
--- a/be/src/exec/operator/partitioned_hash_join_probe_operator.h
+++ b/be/src/exec/operator/partitioned_hash_join_probe_operator.h
@@ -222,7 +222,7 @@ public:
Status init(const TPlanNode& tnode, RuntimeState* state) override;
Status prepare(RuntimeState* state) override;
- [[nodiscard]] Status get_block(RuntimeState* state, Block* block, bool*
eos) override;
+ [[nodiscard]] Status get_block_impl(RuntimeState* state, Block* block,
bool* eos) override;
Status push(RuntimeState* state, Block* input_block, bool eos) const
override;
Status pull(doris::RuntimeState* state, Block* output_block, bool* eos)
const override;
diff --git a/be/src/exec/operator/partitioned_hash_join_sink_operator.cpp
b/be/src/exec/operator/partitioned_hash_join_sink_operator.cpp
index 4a0d4d0f42e..11c137ba8c2 100644
--- a/be/src/exec/operator/partitioned_hash_join_sink_operator.cpp
+++ b/be/src/exec/operator/partitioned_hash_join_sink_operator.cpp
@@ -513,7 +513,7 @@ void
PartitionedHashJoinSinkLocalState::update_profile_from_inner() {
#undef UPDATE_COUNTER_FROM_INNER
-Status PartitionedHashJoinSinkOperatorX::sink(RuntimeState* state, Block*
in_block, bool eos) {
+Status PartitionedHashJoinSinkOperatorX::sink_impl(RuntimeState* state, Block*
in_block, bool eos) {
auto& local_state = get_local_state(state);
SCOPED_TIMER(local_state.exec_time_counter());
const auto rows = in_block->rows();
diff --git a/be/src/exec/operator/partitioned_hash_join_sink_operator.h
b/be/src/exec/operator/partitioned_hash_join_sink_operator.h
index c6dc2cdfb94..f48fe8f7137 100644
--- a/be/src/exec/operator/partitioned_hash_join_sink_operator.h
+++ b/be/src/exec/operator/partitioned_hash_join_sink_operator.h
@@ -115,7 +115,7 @@ public:
Status prepare(RuntimeState* state) override;
- Status sink(RuntimeState* state, Block* in_block, bool eos) override;
+ Status sink_impl(RuntimeState* state, Block* in_block, bool eos) override;
bool should_dry_run(RuntimeState* state) override { return false; }
diff --git a/be/src/exec/operator/rec_cte_anchor_sink_operator.h
b/be/src/exec/operator/rec_cte_anchor_sink_operator.h
index 42661ee4181..4e8596497ad 100644
--- a/be/src/exec/operator/rec_cte_anchor_sink_operator.h
+++ b/be/src/exec/operator/rec_cte_anchor_sink_operator.h
@@ -79,7 +79,7 @@ public:
return Base::close(state);
}
- Status sink(RuntimeState* state, Block* input_block, bool eos) override {
+ Status sink_impl(RuntimeState* state, Block* input_block, bool eos)
override {
auto& local_state = get_local_state(state);
RETURN_IF_ERROR(_notify_rec_side_ready_if_needed(state));
diff --git a/be/src/exec/operator/rec_cte_scan_operator.h
b/be/src/exec/operator/rec_cte_scan_operator.h
index adea08c4188..c374fe509ca 100644
--- a/be/src/exec/operator/rec_cte_scan_operator.h
+++ b/be/src/exec/operator/rec_cte_scan_operator.h
@@ -68,7 +68,7 @@ public:
const DescriptorTbl& descs)
: OperatorX<RecCTEScanLocalState>(pool, tnode, operator_id, descs)
{}
- Status get_block(RuntimeState* state, Block* block, bool* eos) override {
+ Status get_block_impl(RuntimeState* state, Block* block, bool* eos)
override {
auto& local_state = get_local_state(state);
if (local_state._blocks.empty()) {
diff --git a/be/src/exec/operator/rec_cte_sink_operator.h
b/be/src/exec/operator/rec_cte_sink_operator.h
index a071cf55761..a351effb34a 100644
--- a/be/src/exec/operator/rec_cte_sink_operator.h
+++ b/be/src/exec/operator/rec_cte_sink_operator.h
@@ -80,7 +80,7 @@ public:
return {ExchangeType::NOOP};
}
- Status sink(RuntimeState* state, Block* input_block, bool eos) override {
+ Status sink_impl(RuntimeState* state, Block* input_block, bool eos)
override {
auto& local_state = get_local_state(state);
COUNTER_UPDATE(local_state.rows_input_counter(),
(int64_t)input_block->rows());
diff --git a/be/src/exec/operator/rec_cte_source_operator.h
b/be/src/exec/operator/rec_cte_source_operator.h
index 83e55937998..07f1c2c39a6 100644
--- a/be/src/exec/operator/rec_cte_source_operator.h
+++ b/be/src/exec/operator/rec_cte_source_operator.h
@@ -209,7 +209,7 @@ public:
return {ExchangeType::NOOP};
}
- Status get_block(RuntimeState* state, Block* block, bool* eos) override {
+ Status get_block_impl(RuntimeState* state, Block* block, bool* eos)
override {
auto& local_state = get_local_state(state);
auto& ctx = local_state._shared_state;
ctx->update_ready_to_return();
diff --git a/be/src/exec/operator/result_file_sink_operator.cpp
b/be/src/exec/operator/result_file_sink_operator.cpp
index edc12412fce..7207a0dc503 100644
--- a/be/src/exec/operator/result_file_sink_operator.cpp
+++ b/be/src/exec/operator/result_file_sink_operator.cpp
@@ -150,7 +150,7 @@ Status ResultFileSinkLocalState::close(RuntimeState* state,
Status exec_status)
return Base::close(state, exec_status);
}
-Status ResultFileSinkOperatorX::sink(RuntimeState* state, Block* in_block,
bool eos) {
+Status ResultFileSinkOperatorX::sink_impl(RuntimeState* state, Block*
in_block, bool eos) {
auto& local_state = get_local_state(state);
SCOPED_TIMER(local_state.exec_time_counter());
COUNTER_UPDATE(local_state.rows_input_counter(),
(int64_t)in_block->rows());
diff --git a/be/src/exec/operator/result_file_sink_operator.h
b/be/src/exec/operator/result_file_sink_operator.h
index 8c759456b8e..6151918df9b 100644
--- a/be/src/exec/operator/result_file_sink_operator.h
+++ b/be/src/exec/operator/result_file_sink_operator.h
@@ -61,7 +61,7 @@ public:
Status prepare(RuntimeState* state) override;
- Status sink(RuntimeState* state, Block* in_block, bool eos) override;
+ Status sink_impl(RuntimeState* state, Block* in_block, bool eos) override;
private:
friend class ResultFileSinkLocalState;
diff --git a/be/src/exec/operator/result_sink_operator.cpp
b/be/src/exec/operator/result_sink_operator.cpp
index 2384a2e5f60..021ffb60983 100644
--- a/be/src/exec/operator/result_sink_operator.cpp
+++ b/be/src/exec/operator/result_sink_operator.cpp
@@ -132,7 +132,7 @@ Status ResultSinkOperatorX::prepare(RuntimeState* state) {
return VExpr::open(_output_vexpr_ctxs, state);
}
-Status ResultSinkOperatorX::sink(RuntimeState* state, Block* block, bool eos) {
+Status ResultSinkOperatorX::sink_impl(RuntimeState* state, Block* block, bool
eos) {
auto& local_state = get_local_state(state);
SCOPED_TIMER(local_state.exec_time_counter());
COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)block->rows());
diff --git a/be/src/exec/operator/result_sink_operator.h
b/be/src/exec/operator/result_sink_operator.h
index b3a7c504a5d..4ead2985d85 100644
--- a/be/src/exec/operator/result_sink_operator.h
+++ b/be/src/exec/operator/result_sink_operator.h
@@ -159,7 +159,7 @@ public:
const std::vector<TExpr>& select_exprs, const
TResultSink& sink);
Status prepare(RuntimeState* state) override;
- Status sink(RuntimeState* state, Block* in_block, bool eos) override;
+ Status sink_impl(RuntimeState* state, Block* in_block, bool eos) override;
private:
friend class ResultSinkLocalState;
diff --git a/be/src/exec/operator/scan_operator.cpp
b/be/src/exec/operator/scan_operator.cpp
index 7904413f77b..8498512b16b 100644
--- a/be/src/exec/operator/scan_operator.cpp
+++ b/be/src/exec/operator/scan_operator.cpp
@@ -1348,7 +1348,7 @@ Status ScanLocalState<Derived>::close(RuntimeState*
state) {
}
template <typename LocalStateType>
-Status ScanOperatorX<LocalStateType>::get_block(RuntimeState* state, Block*
block, bool* eos) {
+Status ScanOperatorX<LocalStateType>::get_block_impl(RuntimeState* state,
Block* block, bool* eos) {
auto& local_state = get_local_state(state);
SCOPED_TIMER(local_state.exec_time_counter());
diff --git a/be/src/exec/operator/scan_operator.h
b/be/src/exec/operator/scan_operator.h
index f3d65df35ad..bebcc7ec708 100644
--- a/be/src/exec/operator/scan_operator.h
+++ b/be/src/exec/operator/scan_operator.h
@@ -360,9 +360,9 @@ class ScanOperatorX : public OperatorX<LocalStateType> {
public:
Status init(const TPlanNode& tnode, RuntimeState* state) override;
Status prepare(RuntimeState* state) override;
- Status get_block(RuntimeState* state, Block* block, bool* eos) override;
+ Status get_block_impl(RuntimeState* state, Block* block, bool* eos)
override;
Status get_block_after_projects(RuntimeState* state, Block* block, bool*
eos) override {
- Status status = get_block(state, block, eos);
+ Status status = OperatorX<LocalStateType>::get_block(state, block,
eos);
if (status.ok()) {
state->get_local_state(operator_id())->update_output_block_counters(*block);
}
diff --git a/be/src/exec/operator/schema_scan_operator.cpp
b/be/src/exec/operator/schema_scan_operator.cpp
index 3d5922573b9..f50d6a003bb 100644
--- a/be/src/exec/operator/schema_scan_operator.cpp
+++ b/be/src/exec/operator/schema_scan_operator.cpp
@@ -208,7 +208,7 @@ Status SchemaScanOperatorX::prepare(RuntimeState* state) {
return Status::OK();
}
-Status SchemaScanOperatorX::get_block(RuntimeState* state, Block* block, bool*
eos) {
+Status SchemaScanOperatorX::get_block_impl(RuntimeState* state, Block* block,
bool* eos) {
auto& local_state = get_local_state(state);
SCOPED_TIMER(local_state.exec_time_counter());
RETURN_IF_CANCELLED(state);
diff --git a/be/src/exec/operator/schema_scan_operator.h
b/be/src/exec/operator/schema_scan_operator.h
index 49729cabf33..fc7c79067d9 100644
--- a/be/src/exec/operator/schema_scan_operator.h
+++ b/be/src/exec/operator/schema_scan_operator.h
@@ -63,7 +63,7 @@ public:
Status init(const TPlanNode& tnode, RuntimeState* state) override;
Status prepare(RuntimeState* state) override;
- Status get_block(RuntimeState* state, Block* block, bool* eos) override;
+ Status get_block_impl(RuntimeState* state, Block* block, bool* eos)
override;
[[nodiscard]] bool is_source() const override { return true; }
diff --git a/be/src/exec/operator/set_probe_sink_operator.cpp
b/be/src/exec/operator/set_probe_sink_operator.cpp
index 6954c93ee65..c3fb08a3725 100644
--- a/be/src/exec/operator/set_probe_sink_operator.cpp
+++ b/be/src/exec/operator/set_probe_sink_operator.cpp
@@ -61,7 +61,8 @@ Status
SetProbeSinkOperatorX<is_intersect>::prepare(RuntimeState* state) {
}
template <bool is_intersect>
-Status SetProbeSinkOperatorX<is_intersect>::sink(RuntimeState* state, Block*
in_block, bool eos) {
+Status SetProbeSinkOperatorX<is_intersect>::sink_impl(RuntimeState* state,
Block* in_block,
+ bool eos) {
RETURN_IF_CANCELLED(state);
auto& local_state = get_local_state(state);
SCOPED_TIMER(local_state.exec_time_counter());
diff --git a/be/src/exec/operator/set_probe_sink_operator.h
b/be/src/exec/operator/set_probe_sink_operator.h
index c736e2af229..7c294fe7254 100644
--- a/be/src/exec/operator/set_probe_sink_operator.h
+++ b/be/src/exec/operator/set_probe_sink_operator.h
@@ -101,7 +101,7 @@ public:
Status prepare(RuntimeState* state) override;
- Status sink(RuntimeState* state, Block* in_block, bool eos) override;
+ Status sink_impl(RuntimeState* state, Block* in_block, bool eos) override;
DataDistribution required_data_distribution(RuntimeState* /*state*/) const
override {
return _is_colocate ?
DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
: DataDistribution(ExchangeType::HASH_SHUFFLE,
_partition_exprs);
diff --git a/be/src/exec/operator/set_sink_operator.cpp
b/be/src/exec/operator/set_sink_operator.cpp
index ec2c717c2cf..614a45afbc8 100644
--- a/be/src/exec/operator/set_sink_operator.cpp
+++ b/be/src/exec/operator/set_sink_operator.cpp
@@ -67,7 +67,7 @@ Status SetSinkLocalState<is_intersect>::close(RuntimeState*
state, Status exec_s
}
template <bool is_intersect>
-Status SetSinkOperatorX<is_intersect>::sink(RuntimeState* state, Block*
in_block, bool eos) {
+Status SetSinkOperatorX<is_intersect>::sink_impl(RuntimeState* state, Block*
in_block, bool eos) {
RETURN_IF_CANCELLED(state);
auto& local_state = get_local_state(state);
diff --git a/be/src/exec/operator/set_sink_operator.h
b/be/src/exec/operator/set_sink_operator.h
index e1c937f1471..5ac71634cd8 100644
--- a/be/src/exec/operator/set_sink_operator.h
+++ b/be/src/exec/operator/set_sink_operator.h
@@ -112,7 +112,7 @@ public:
Status prepare(RuntimeState* state) override;
- Status sink(RuntimeState* state, Block* in_block, bool eos) override;
+ Status sink_impl(RuntimeState* state, Block* in_block, bool eos) override;
DataDistribution required_data_distribution(RuntimeState* /*state*/) const
override {
return _is_colocate ?
DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
: DataDistribution(ExchangeType::HASH_SHUFFLE,
_partition_exprs);
diff --git a/be/src/exec/operator/set_source_operator.cpp
b/be/src/exec/operator/set_source_operator.cpp
index 5cc299e7dbb..63afdae0814 100644
--- a/be/src/exec/operator/set_source_operator.cpp
+++ b/be/src/exec/operator/set_source_operator.cpp
@@ -74,7 +74,8 @@ Status SetSourceLocalState<is_intersect>::open(RuntimeState*
state) {
}
template <bool is_intersect>
-Status SetSourceOperatorX<is_intersect>::get_block(RuntimeState* state, Block*
block, bool* eos) {
+Status SetSourceOperatorX<is_intersect>::get_block_impl(RuntimeState* state,
Block* block,
+ bool* eos) {
RETURN_IF_CANCELLED(state);
auto& local_state = get_local_state(state);
SCOPED_TIMER(local_state.exec_time_counter());
diff --git a/be/src/exec/operator/set_source_operator.h
b/be/src/exec/operator/set_source_operator.h
index 40887f35fab..cacfd6a335f 100644
--- a/be/src/exec/operator/set_source_operator.h
+++ b/be/src/exec/operator/set_source_operator.h
@@ -85,7 +85,7 @@ public:
: DataDistribution(ExchangeType::HASH_SHUFFLE);
}
- Status get_block(RuntimeState* state, Block* block, bool* eos) override;
+ Status get_block_impl(RuntimeState* state, Block* block, bool* eos)
override;
Status set_child(OperatorPtr child) override {
Base::_child = child;
return Status::OK();
diff --git a/be/src/exec/operator/sort_sink_operator.cpp
b/be/src/exec/operator/sort_sink_operator.cpp
index 9b045da6d8c..9c77b52bd02 100644
--- a/be/src/exec/operator/sort_sink_operator.cpp
+++ b/be/src/exec/operator/sort_sink_operator.cpp
@@ -131,7 +131,7 @@ Status SortSinkOperatorX::prepare(RuntimeState* state) {
return VExpr::open(_ordering_expr_ctxs, state);
}
-Status SortSinkOperatorX::sink(doris::RuntimeState* state, Block* in_block,
bool eos) {
+Status SortSinkOperatorX::sink_impl(doris::RuntimeState* state, Block*
in_block, bool eos) {
auto& local_state = get_local_state(state);
SCOPED_TIMER(local_state.exec_time_counter());
COUNTER_UPDATE(local_state.rows_input_counter(),
(int64_t)in_block->rows());
diff --git a/be/src/exec/operator/sort_sink_operator.h
b/be/src/exec/operator/sort_sink_operator.h
index eda5bb31b53..b0c66483b9b 100644
--- a/be/src/exec/operator/sort_sink_operator.h
+++ b/be/src/exec/operator/sort_sink_operator.h
@@ -76,7 +76,7 @@ public:
Status init(const TPlanNode& tnode, RuntimeState* state) override;
Status prepare(RuntimeState* state) override;
- Status sink(RuntimeState* state, Block* in_block, bool eos) override;
+ Status sink_impl(RuntimeState* state, Block* in_block, bool eos) override;
DataDistribution required_data_distribution(RuntimeState* /*state*/) const
override {
if (_is_analytic_sort) {
return _is_colocate && _require_bucket_distribution
diff --git a/be/src/exec/operator/sort_source_operator.cpp
b/be/src/exec/operator/sort_source_operator.cpp
index 681411d46ef..88ab9261b4a 100644
--- a/be/src/exec/operator/sort_source_operator.cpp
+++ b/be/src/exec/operator/sort_source_operator.cpp
@@ -30,7 +30,7 @@ SortSourceOperatorX::SortSourceOperatorX(ObjectPool* pool,
const TPlanNode& tnod
const DescriptorTbl& descs)
: OperatorX<SortLocalState>(pool, tnode, operator_id, descs) {}
-Status SortSourceOperatorX::get_block(RuntimeState* state, Block* block, bool*
eos) {
+Status SortSourceOperatorX::get_block_impl(RuntimeState* state, Block* block,
bool* eos) {
auto& local_state = get_local_state(state);
SCOPED_TIMER(local_state.exec_time_counter());
SCOPED_PEAK_MEM(&local_state._estimate_memory_usage);
diff --git a/be/src/exec/operator/sort_source_operator.h
b/be/src/exec/operator/sort_source_operator.h
index c2a63b82ccd..79e59f635e0 100644
--- a/be/src/exec/operator/sort_source_operator.h
+++ b/be/src/exec/operator/sort_source_operator.h
@@ -43,7 +43,7 @@ public:
#ifdef BE_TEST
SortSourceOperatorX() = default;
#endif
- Status get_block(RuntimeState* state, Block* block, bool* eos) override;
+ Status get_block_impl(RuntimeState* state, Block* block, bool* eos)
override;
bool is_source() const override { return true; }
diff --git a/be/src/exec/operator/spill_iceberg_table_sink_operator.cpp
b/be/src/exec/operator/spill_iceberg_table_sink_operator.cpp
index 67040e2762d..5bc2bab0d14 100644
--- a/be/src/exec/operator/spill_iceberg_table_sink_operator.cpp
+++ b/be/src/exec/operator/spill_iceberg_table_sink_operator.cpp
@@ -118,7 +118,7 @@ Status
SpillIcebergTableSinkOperatorX::prepare(RuntimeState* state) {
return VExpr::open(_output_vexpr_ctxs, state);
}
-Status SpillIcebergTableSinkOperatorX::sink(RuntimeState* state, Block*
in_block, bool eos) {
+Status SpillIcebergTableSinkOperatorX::sink_impl(RuntimeState* state, Block*
in_block, bool eos) {
auto& local_state = get_local_state(state);
SCOPED_TIMER(local_state.exec_time_counter());
COUNTER_UPDATE(local_state.rows_input_counter(),
(int64_t)in_block->rows());
diff --git a/be/src/exec/operator/spill_iceberg_table_sink_operator.h
b/be/src/exec/operator/spill_iceberg_table_sink_operator.h
index b754e998896..6da926ae20f 100644
--- a/be/src/exec/operator/spill_iceberg_table_sink_operator.h
+++ b/be/src/exec/operator/spill_iceberg_table_sink_operator.h
@@ -63,7 +63,7 @@ public:
Status prepare(RuntimeState* state) override;
- Status sink(RuntimeState* state, Block* in_block, bool eos) override;
+ Status sink_impl(RuntimeState* state, Block* in_block, bool eos) override;
size_t get_reserve_mem_size(RuntimeState* state, bool eos) override;
diff --git a/be/src/exec/operator/spill_sort_sink_operator.cpp
b/be/src/exec/operator/spill_sort_sink_operator.cpp
index f88774d3190..82a1bea731c 100644
--- a/be/src/exec/operator/spill_sort_sink_operator.cpp
+++ b/be/src/exec/operator/spill_sort_sink_operator.cpp
@@ -143,7 +143,7 @@ size_t
SpillSortSinkOperatorX::revocable_mem_size(RuntimeState* state) const {
return mem_size > state->spill_min_revocable_mem() ? mem_size : 0;
}
-Status SpillSortSinkOperatorX::sink(doris::RuntimeState* state, Block*
in_block, bool eos) {
+Status SpillSortSinkOperatorX::sink_impl(doris::RuntimeState* state, Block*
in_block, bool eos) {
auto& local_state = get_local_state(state);
SCOPED_TIMER(local_state.exec_time_counter());
COUNTER_UPDATE(local_state.rows_input_counter(),
(int64_t)in_block->rows());
diff --git a/be/src/exec/operator/spill_sort_sink_operator.h
b/be/src/exec/operator/spill_sort_sink_operator.h
index 19cfa1b9274..cf27bd3765b 100644
--- a/be/src/exec/operator/spill_sort_sink_operator.h
+++ b/be/src/exec/operator/spill_sort_sink_operator.h
@@ -79,7 +79,7 @@ public:
Status init(const TPlanNode& tnode, RuntimeState* state) override;
Status prepare(RuntimeState* state) override;
- Status sink(RuntimeState* state, Block* in_block, bool eos) override;
+ Status sink_impl(RuntimeState* state, Block* in_block, bool eos) override;
DataDistribution required_data_distribution(RuntimeState* state) const
override {
return _sort_sink_operator->required_data_distribution(state);
}
diff --git a/be/src/exec/operator/spill_sort_source_operator.cpp
b/be/src/exec/operator/spill_sort_source_operator.cpp
index a745bf2858d..c40ab3e255a 100644
--- a/be/src/exec/operator/spill_sort_source_operator.cpp
+++ b/be/src/exec/operator/spill_sort_source_operator.cpp
@@ -244,7 +244,7 @@ Status SpillSortSourceOperatorX::close(RuntimeState* state)
{
return _sort_source_operator->close(state);
}
-Status SpillSortSourceOperatorX::get_block(RuntimeState* state, Block* block,
bool* eos) {
+Status SpillSortSourceOperatorX::get_block_impl(RuntimeState* state, Block*
block, bool* eos) {
auto& local_state = get_local_state(state);
SCOPED_TIMER(local_state.exec_time_counter());
diff --git a/be/src/exec/operator/spill_sort_source_operator.h
b/be/src/exec/operator/spill_sort_source_operator.h
index 196ab6474b2..dbc8c3120ea 100644
--- a/be/src/exec/operator/spill_sort_source_operator.h
+++ b/be/src/exec/operator/spill_sort_source_operator.h
@@ -81,7 +81,7 @@ public:
Status close(RuntimeState* state) override;
- Status get_block(RuntimeState* state, Block* block, bool* eos) override;
+ Status get_block_impl(RuntimeState* state, Block* block, bool* eos)
override;
bool is_source() const override { return true; }
diff --git a/be/src/exec/operator/tvf_table_sink_operator.h
b/be/src/exec/operator/tvf_table_sink_operator.h
index 7c20f3c482e..e1a06e675c9 100644
--- a/be/src/exec/operator/tvf_table_sink_operator.h
+++ b/be/src/exec/operator/tvf_table_sink_operator.h
@@ -64,7 +64,7 @@ public:
return VExpr::open(_output_vexpr_ctxs, state);
}
- Status sink(RuntimeState* state, Block* in_block, bool eos) override {
+ Status sink_impl(RuntimeState* state, Block* in_block, bool eos) override {
auto& local_state = get_local_state(state);
SCOPED_TIMER(local_state.exec_time_counter());
COUNTER_UPDATE(local_state.rows_input_counter(),
(int64_t)in_block->rows());
diff --git a/be/src/exec/operator/union_sink_operator.cpp
b/be/src/exec/operator/union_sink_operator.cpp
index dc6fb2e0d9a..b0f7cf619a8 100644
--- a/be/src/exec/operator/union_sink_operator.cpp
+++ b/be/src/exec/operator/union_sink_operator.cpp
@@ -93,7 +93,7 @@ Status UnionSinkOperatorX::prepare(RuntimeState* state) {
return Status::OK();
}
-Status UnionSinkOperatorX::sink(RuntimeState* state, Block* in_block, bool
eos) {
+Status UnionSinkOperatorX::sink_impl(RuntimeState* state, Block* in_block,
bool eos) {
auto& local_state = get_local_state(state);
if (local_state.low_memory_mode()) {
set_low_memory_mode(state);
diff --git a/be/src/exec/operator/union_sink_operator.h
b/be/src/exec/operator/union_sink_operator.h
index 14978ae4526..4da532b9f0b 100644
--- a/be/src/exec/operator/union_sink_operator.h
+++ b/be/src/exec/operator/union_sink_operator.h
@@ -99,7 +99,7 @@ public:
Status prepare(RuntimeState* state) override;
- Status sink(RuntimeState* state, Block* in_block, bool eos) override;
+ Status sink_impl(RuntimeState* state, Block* in_block, bool eos) override;
std::shared_ptr<BasicSharedState> create_shared_state() const override {
if (_cur_child_id > 0) {
diff --git a/be/src/exec/operator/union_source_operator.cpp
b/be/src/exec/operator/union_source_operator.cpp
index 0efe4ed4efd..6cc39ebb7ce 100644
--- a/be/src/exec/operator/union_source_operator.cpp
+++ b/be/src/exec/operator/union_source_operator.cpp
@@ -100,7 +100,7 @@ std::string UnionSourceLocalState::debug_string(int
indentation_level) const {
return fmt::to_string(debug_string_buffer);
}
-Status UnionSourceOperatorX::get_block(RuntimeState* state, Block* block,
bool* eos) {
+Status UnionSourceOperatorX::get_block_impl(RuntimeState* state, Block* block,
bool* eos) {
auto& local_state = get_local_state(state);
Defer set_eos {[&]() {
// the eos check of union operator is complex, need check all logical
if you want modify
diff --git a/be/src/exec/operator/union_source_operator.h
b/be/src/exec/operator/union_source_operator.h
index 748c9a48e60..01de0cbb05b 100644
--- a/be/src/exec/operator/union_source_operator.h
+++ b/be/src/exec/operator/union_source_operator.h
@@ -68,7 +68,7 @@ public:
#ifdef BE_TEST
UnionSourceOperatorX(int child_size) : _child_size(child_size) {}
#endif
- Status get_block(RuntimeState* state, Block* block, bool* eos) override;
+ Status get_block_impl(RuntimeState* state, Block* block, bool* eos)
override;
bool is_source() const override { return true; }
diff --git a/be/src/exec/pipeline/pipeline_task.cpp
b/be/src/exec/pipeline/pipeline_task.cpp
index fe75d7b499d..dfbb0955145 100644
--- a/be/src/exec/pipeline/pipeline_task.cpp
+++ b/be/src/exec/pipeline/pipeline_task.cpp
@@ -647,7 +647,6 @@ Status PipelineTask::execute(bool* done) {
bool eos = false;
RETURN_IF_ERROR(_root->get_block_after_projects(_state, block,
&eos));
- RETURN_IF_ERROR(block->check_type_and_column());
_eos = eos;
}
@@ -718,7 +717,7 @@ Status PipelineTask::execute(bool* done) {
}
}
});
- RETURN_IF_ERROR(block->check_type_and_column());
+
status = _sink->sink(_state, block, _eos);
if (_eos) {
diff --git a/be/test/exec/operator/agg_operator_test.cpp
b/be/test/exec/operator/agg_operator_test.cpp
index 02b6a79bb72..bac6f0da381 100644
--- a/be/test/exec/operator/agg_operator_test.cpp
+++ b/be/test/exec/operator/agg_operator_test.cpp
@@ -104,7 +104,7 @@ class MockDistributionOperator final : public
OperatorX<MockLocalState> {
public:
MockDistributionOperator(ExchangeType exchange_type) :
_exchange_type(exchange_type) {}
- Status get_block(RuntimeState* /*state*/, Block* /*block*/, bool* eos)
override {
+ Status get_block_impl(RuntimeState* /*state*/, Block* /*block*/, bool*
eos) override {
*eos = true;
return Status::OK();
}
diff --git a/be/test/exec/operator/analytic_sink_operator_test.cpp
b/be/test/exec/operator/analytic_sink_operator_test.cpp
index a64d16c676f..bd865b9aca9 100644
--- a/be/test/exec/operator/analytic_sink_operator_test.cpp
+++ b/be/test/exec/operator/analytic_sink_operator_test.cpp
@@ -40,7 +40,9 @@ public:
return Status::OK();
}
- Status get_block(RuntimeState* state, Block* block, bool* eos) override {
return Status::OK(); }
+ Status get_block_impl(RuntimeState* state, Block* block, bool* eos)
override {
+ return Status::OK();
+ }
Status setup_local_state(RuntimeState* state, LocalStateInfo& info)
override {
return Status::OK();
}
diff --git a/be/test/exec/operator/partition_sort_sink_operator_test.cpp
b/be/test/exec/operator/partition_sort_sink_operator_test.cpp
index 744ca8e8452..36a90bf5a38 100644
--- a/be/test/exec/operator/partition_sort_sink_operator_test.cpp
+++ b/be/test/exec/operator/partition_sort_sink_operator_test.cpp
@@ -37,7 +37,9 @@ public:
return Status::OK();
}
- Status get_block(RuntimeState* state, Block* block, bool* eos) override {
return Status::OK(); }
+ Status get_block_impl(RuntimeState* state, Block* block, bool* eos)
override {
+ return Status::OK();
+ }
Status setup_local_state(RuntimeState* state, LocalStateInfo& info)
override {
return Status::OK();
}
diff --git a/be/test/exec/operator/partitioned_aggregation_test_helper.h
b/be/test/exec/operator/partitioned_aggregation_test_helper.h
index 5ecfe8dd297..da0881e84ea 100644
--- a/be/test/exec/operator/partitioned_aggregation_test_helper.h
+++ b/be/test/exec/operator/partitioned_aggregation_test_helper.h
@@ -83,7 +83,9 @@ public:
return Status::OK();
}
- Status sink(RuntimeState* state, Block* in_block, bool eos) override {
return Status::OK(); }
+ Status sink_impl(RuntimeState* state, Block* in_block, bool eos) override {
+ return Status::OK();
+ }
};
class MockPartitionedAggLocalState : public PartitionedAggLocalState {
diff --git a/be/test/exec/operator/partitioned_hash_join_test_helper.h
b/be/test/exec/operator/partitioned_hash_join_test_helper.h
index 9dcbb7335f5..ba4ff661288 100644
--- a/be/test/exec/operator/partitioned_hash_join_test_helper.h
+++ b/be/test/exec/operator/partitioned_hash_join_test_helper.h
@@ -115,7 +115,9 @@ public:
return Status::OK();
}
- Status sink(RuntimeState* state, Block* in_block, bool eos) override {
return Status::OK(); }
+ Status sink_impl(RuntimeState* state, Block* in_block, bool eos) override {
+ return Status::OK();
+ }
std::string get_memory_usage_debug_str(RuntimeState* state) const override
{ return "mock"; }
};
diff --git a/be/test/exec/operator/query_cache_operator_test.cpp
b/be/test/exec/operator/query_cache_operator_test.cpp
index a99e9bcb9d9..91c73b99077 100644
--- a/be/test/exec/operator/query_cache_operator_test.cpp
+++ b/be/test/exec/operator/query_cache_operator_test.cpp
@@ -36,7 +36,9 @@ public:
return Status::OK();
}
- Status get_block(RuntimeState* state, Block* block, bool* eos) override {
return Status::OK(); }
+ Status get_block_impl(RuntimeState* state, Block* block, bool* eos)
override {
+ return Status::OK();
+ }
Status setup_local_state(RuntimeState* state, LocalStateInfo& info)
override {
return Status::OK();
}
diff --git a/be/test/exec/operator/sort_operator_test.cpp
b/be/test/exec/operator/sort_operator_test.cpp
index 310f3ffb4b6..0a26844b3bc 100644
--- a/be/test/exec/operator/sort_operator_test.cpp
+++ b/be/test/exec/operator/sort_operator_test.cpp
@@ -37,7 +37,9 @@ public:
return Status::OK();
}
- Status get_block(RuntimeState* state, Block* block, bool* eos) override {
return Status::OK(); }
+ Status get_block_impl(RuntimeState* state, Block* block, bool* eos)
override {
+ return Status::OK();
+ }
Status setup_local_state(RuntimeState* state, LocalStateInfo& info)
override {
return Status::OK();
}
diff --git a/be/test/exec/operator/spill_sort_test_helper.h
b/be/test/exec/operator/spill_sort_test_helper.h
index c887212b2fd..81ca44ce2bd 100644
--- a/be/test/exec/operator/spill_sort_test_helper.h
+++ b/be/test/exec/operator/spill_sort_test_helper.h
@@ -53,7 +53,7 @@ public:
const DescriptorTbl& descs)
: SortSourceOperatorX(pool, tnode, operator_id, descs) {}
- Status get_block(RuntimeState* state, Block* block, bool* eos) override {
+ Status get_block_impl(RuntimeState* state, Block* block, bool* eos)
override {
std::swap(*block, this->block);
*eos = this->eos;
return Status::OK();
diff --git a/be/test/exec/operator/streaming_agg_operator_test.cpp
b/be/test/exec/operator/streaming_agg_operator_test.cpp
index bbe54ebec5d..4596f040b03 100644
--- a/be/test/exec/operator/streaming_agg_operator_test.cpp
+++ b/be/test/exec/operator/streaming_agg_operator_test.cpp
@@ -66,7 +66,9 @@ public:
return Status::OK();
}
- Status get_block(RuntimeState* state, Block* block, bool* eos) override {
return Status::OK(); }
+ Status get_block_impl(RuntimeState* state, Block* block, bool* eos)
override {
+ return Status::OK();
+ }
Status setup_local_state(RuntimeState* state, LocalStateInfo& info)
override {
return Status::OK();
}
diff --git a/be/test/exec/operator/table_function_operator_test.cpp
b/be/test/exec/operator/table_function_operator_test.cpp
index 913687414be..72e189d0fa8 100644
--- a/be/test/exec/operator/table_function_operator_test.cpp
+++ b/be/test/exec/operator/table_function_operator_test.cpp
@@ -53,7 +53,9 @@ public:
return Status::OK();
}
- Status get_block(RuntimeState* state, Block* block, bool* eos) override {
return Status::OK(); }
+ Status get_block_impl(RuntimeState* state, Block* block, bool* eos)
override {
+ return Status::OK();
+ }
Status setup_local_state(RuntimeState* state, LocalStateInfo& info)
override {
return Status::OK();
}
diff --git a/be/test/testutil/mock/mock_operators.h
b/be/test/testutil/mock/mock_operators.h
index bba11eb7473..1077a767018 100644
--- a/be/test/testutil/mock/mock_operators.h
+++ b/be/test/testutil/mock/mock_operators.h
@@ -34,7 +34,7 @@ public:
return Status::OK();
}
- Status get_block(RuntimeState* state, Block* block, bool* eos) override {
+ Status get_block_impl(RuntimeState* state, Block* block, bool* eos)
override {
block->swap(_block);
*eos = _eos;
return Status::OK();
@@ -57,7 +57,7 @@ public:
class MockSinkOperator final : public DataSinkOperatorXBase {
public:
- Status sink(RuntimeState* state, Block* block, bool eos) override { return
Status::OK(); }
+ Status sink_impl(RuntimeState* state, Block* block, bool eos) override {
return Status::OK(); }
Status setup_local_state(RuntimeState* state, LocalSinkStateInfo& info)
override {
return Status::OK();
diff --git a/be/test/util/profile_spec_test.cpp
b/be/test/util/profile_spec_test.cpp
index 68a535c9cd0..172427395d0 100644
--- a/be/test/util/profile_spec_test.cpp
+++ b/be/test/util/profile_spec_test.cpp
@@ -97,7 +97,7 @@ private:
Status prepare(RuntimeState* state) override { return Status::OK(); }
Status open(RuntimeState* state) { return Status::OK(); }
Status close(RuntimeState* state) override { return Status::OK(); }
- Status get_block(RuntimeState* state, Block* block, bool* eos)
override {
+ Status get_block_impl(RuntimeState* state, Block* block, bool* eos)
override {
return Status::OK();
}
};
@@ -113,7 +113,7 @@ private:
Status prepare(RuntimeState* state) override { return Status::OK(); }
Status close(RuntimeState* state) override { return Status::OK(); }
- Status get_block(RuntimeState* state, Block* block, bool* eos)
override {
+ Status get_block_impl(RuntimeState* state, Block* block, bool* eos)
override {
*eos = true;
block->swap(_block);
return Status::OK();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]