This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 97c2fe75d19 [feature](pipelineX) use expected<T, Status> in
local_state (#25878)
97c2fe75d19 is described below
commit 97c2fe75d19f41524a3f3804eea5847bd671c65f
Author: Mryange <[email protected]>
AuthorDate: Wed Oct 25 15:23:17 2023 +0800
[feature](pipelineX) use expected<T, Status> in local_state (#25878)
---
be/src/common/status.h | 2 ++
be/src/pipeline/exec/aggregation_sink_operator.cpp | 2 +-
be/src/pipeline/exec/aggregation_sink_operator.h | 1 +
.../pipeline/exec/aggregation_source_operator.cpp | 2 +-
be/src/pipeline/exec/analytic_sink_operator.cpp | 2 +-
be/src/pipeline/exec/analytic_source_operator.cpp | 2 +-
be/src/pipeline/exec/assert_num_rows_operator.cpp | 2 +-
be/src/pipeline/exec/datagen_operator.cpp | 2 +-
...istinct_streaming_aggregation_sink_operator.cpp | 2 +-
...tinct_streaming_aggregation_source_operator.cpp | 2 +-
be/src/pipeline/exec/exchange_sink_operator.cpp | 4 ++--
be/src/pipeline/exec/exchange_source_operator.cpp | 2 +-
be/src/pipeline/exec/hashjoin_build_sink.cpp | 2 +-
be/src/pipeline/exec/hashjoin_probe_operator.cpp | 4 ++--
be/src/pipeline/exec/jdbc_table_sink_operator.cpp | 2 +-
be/src/pipeline/exec/multi_cast_data_stream_sink.h | 2 +-
.../exec/multi_cast_data_stream_source.cpp | 3 ++-
.../exec/nested_loop_join_build_operator.cpp | 2 +-
.../exec/nested_loop_join_probe_operator.cpp | 4 ++--
be/src/pipeline/exec/olap_table_sink_operator.h | 2 +-
.../pipeline/exec/partition_sort_sink_operator.cpp | 2 +-
.../exec/partition_sort_source_operator.cpp | 2 +-
be/src/pipeline/exec/repeat_operator.cpp | 4 ++--
be/src/pipeline/exec/result_file_sink_operator.cpp | 2 +-
be/src/pipeline/exec/result_sink_operator.cpp | 2 +-
be/src/pipeline/exec/scan_operator.cpp | 4 ++--
be/src/pipeline/exec/scan_operator.h | 1 +
be/src/pipeline/exec/schema_scan_operator.cpp | 2 +-
be/src/pipeline/exec/select_operator.h | 2 +-
be/src/pipeline/exec/set_probe_sink_operator.cpp | 2 +-
be/src/pipeline/exec/set_probe_sink_operator.h | 1 +
be/src/pipeline/exec/set_sink_operator.cpp | 2 +-
be/src/pipeline/exec/set_sink_operator.h | 1 +
be/src/pipeline/exec/set_source_operator.cpp | 2 +-
be/src/pipeline/exec/set_source_operator.h | 1 +
be/src/pipeline/exec/sort_sink_operator.cpp | 2 +-
be/src/pipeline/exec/sort_source_operator.cpp | 2 +-
.../exec/streaming_aggregation_sink_operator.cpp | 2 +-
.../exec/streaming_aggregation_source_operator.cpp | 2 +-
be/src/pipeline/exec/table_function_operator.h | 4 ++--
be/src/pipeline/exec/union_sink_operator.cpp | 2 +-
be/src/pipeline/exec/union_sink_operator.h | 2 +-
be/src/pipeline/exec/union_source_operator.cpp | 2 +-
be/src/pipeline/pipeline_x/operator.h | 6 ++++++
be/src/pipeline/pipeline_x/pipeline_x_task.cpp | 12 +++++++++--
be/src/runtime/runtime_state.cpp | 23 ++++++++++++++++++++++
be/src/runtime/runtime_state.h | 16 +++++++++------
47 files changed, 100 insertions(+), 51 deletions(-)
diff --git a/be/src/common/status.h b/be/src/common/status.h
index fa1d5dbfc8b..88977ce310d 100644
--- a/be/src/common/status.h
+++ b/be/src/common/status.h
@@ -596,6 +596,8 @@ inline std::string Status::to_string() const {
template <typename T>
using Result = expected<T, Status>;
+using ResultError = unexpected<Status>;
+
#define RETURN_IF_ERROR_RESULT(stmt) \
do { \
Status _status_ = (stmt); \
diff --git a/be/src/pipeline/exec/aggregation_sink_operator.cpp
b/be/src/pipeline/exec/aggregation_sink_operator.cpp
index 403343b76e6..93b2e2d968c 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_sink_operator.cpp
@@ -783,7 +783,7 @@ template <typename LocalStateType>
Status AggSinkOperatorX<LocalStateType>::sink(doris::RuntimeState* state,
vectorized::Block* in_block,
SourceState source_state) {
- CREATE_SINK_LOCAL_STATE_RETURN_IF_ERROR(local_state);
+ auto& local_state = get_local_state(state);
SCOPED_TIMER(local_state.profile()->total_time_counter());
COUNTER_UPDATE(local_state.rows_input_counter(),
(int64_t)in_block->rows());
local_state._shared_state->input_num_rows += in_block->rows();
diff --git a/be/src/pipeline/exec/aggregation_sink_operator.h
b/be/src/pipeline/exec/aggregation_sink_operator.h
index bee007b3b47..9615228ca13 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/aggregation_sink_operator.h
@@ -330,6 +330,7 @@ public:
using DataSinkOperatorX<LocalStateType>::id;
using DataSinkOperatorX<LocalStateType>::operator_id;
+ using DataSinkOperatorX<LocalStateType>::get_local_state;
protected:
using LocalState = LocalStateType;
diff --git a/be/src/pipeline/exec/aggregation_source_operator.cpp
b/be/src/pipeline/exec/aggregation_source_operator.cpp
index 69900fc8f27..482a263887b 100644
--- a/be/src/pipeline/exec/aggregation_source_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_source_operator.cpp
@@ -508,7 +508,7 @@ AggSourceOperatorX::AggSourceOperatorX(ObjectPool* pool,
const TPlanNode& tnode,
Status AggSourceOperatorX::get_block(RuntimeState* state, vectorized::Block*
block,
SourceState& source_state) {
- CREATE_LOCAL_STATE_RETURN_IF_ERROR(local_state);
+ auto& local_state = get_local_state(state);
SCOPED_TIMER(local_state.profile()->total_time_counter());
RETURN_IF_ERROR(local_state._executor.get_result(state, block,
source_state));
local_state.make_nullable_output_key(block);
diff --git a/be/src/pipeline/exec/analytic_sink_operator.cpp
b/be/src/pipeline/exec/analytic_sink_operator.cpp
index 7260ae9d77f..7109fa9deab 100644
--- a/be/src/pipeline/exec/analytic_sink_operator.cpp
+++ b/be/src/pipeline/exec/analytic_sink_operator.cpp
@@ -133,7 +133,7 @@ Status AnalyticSinkOperatorX::open(RuntimeState* state) {
Status AnalyticSinkOperatorX::sink(doris::RuntimeState* state,
vectorized::Block* input_block,
SourceState source_state) {
- CREATE_SINK_LOCAL_STATE_RETURN_IF_ERROR(local_state);
+ auto& local_state = get_local_state(state);
SCOPED_TIMER(local_state.profile()->total_time_counter());
COUNTER_UPDATE(local_state.rows_input_counter(),
(int64_t)input_block->rows());
local_state._shared_state->input_eos = source_state ==
SourceState::FINISHED;
diff --git a/be/src/pipeline/exec/analytic_source_operator.cpp
b/be/src/pipeline/exec/analytic_source_operator.cpp
index a139d76fdde..7c310db1237 100644
--- a/be/src/pipeline/exec/analytic_source_operator.cpp
+++ b/be/src/pipeline/exec/analytic_source_operator.cpp
@@ -378,7 +378,7 @@ Status AnalyticSourceOperatorX::init(const TPlanNode&
tnode, RuntimeState* state
Status AnalyticSourceOperatorX::get_block(RuntimeState* state,
vectorized::Block* block,
SourceState& source_state) {
- CREATE_LOCAL_STATE_RETURN_IF_ERROR(local_state);
+ auto& local_state = get_local_state(state);
SCOPED_TIMER(local_state.profile()->total_time_counter());
if (local_state._shared_state->input_eos &&
(local_state._output_block_index ==
local_state._shared_state->input_blocks.size() ||
diff --git a/be/src/pipeline/exec/assert_num_rows_operator.cpp
b/be/src/pipeline/exec/assert_num_rows_operator.cpp
index b2b97fee42e..f58f2568cfc 100644
--- a/be/src/pipeline/exec/assert_num_rows_operator.cpp
+++ b/be/src/pipeline/exec/assert_num_rows_operator.cpp
@@ -37,7 +37,7 @@ AssertNumRowsOperatorX::AssertNumRowsOperatorX(ObjectPool*
pool, const TPlanNode
Status AssertNumRowsOperatorX::pull(doris::RuntimeState* state,
vectorized::Block* block,
SourceState& source_state) {
- CREATE_LOCAL_STATE_RETURN_IF_ERROR(local_state);
+ auto& local_state = get_local_state(state);
SCOPED_TIMER(local_state.profile()->total_time_counter());
local_state.add_num_rows_returned(block->rows());
int64_t num_rows_returned = local_state.num_rows_returned();
diff --git a/be/src/pipeline/exec/datagen_operator.cpp
b/be/src/pipeline/exec/datagen_operator.cpp
index 240e2c44e2d..46a0dacb78e 100644
--- a/be/src/pipeline/exec/datagen_operator.cpp
+++ b/be/src/pipeline/exec/datagen_operator.cpp
@@ -80,7 +80,7 @@ Status DataGenSourceOperatorX::get_block(RuntimeState* state,
vectorized::Block*
return Status::InternalError("input is NULL pointer");
}
RETURN_IF_CANCELLED(state);
- CREATE_LOCAL_STATE_RETURN_IF_ERROR(local_state);
+ auto& local_state = get_local_state(state);
bool eos = false;
Status res = local_state._table_func->get_next(state, block, &eos);
source_state = eos ? SourceState::FINISHED : source_state;
diff --git
a/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.cpp
b/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.cpp
index fec9f7e3c65..ef2551124a3 100644
--- a/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.cpp
@@ -193,7 +193,7 @@ Status DistinctStreamingAggSinkOperatorX::init(const
TPlanNode& tnode, RuntimeSt
Status DistinctStreamingAggSinkOperatorX::sink(RuntimeState* state,
vectorized::Block* in_block,
SourceState source_state) {
- CREATE_SINK_LOCAL_STATE_RETURN_IF_ERROR(local_state);
+ auto& local_state = get_local_state(state);
SCOPED_TIMER(local_state.profile()->total_time_counter());
COUNTER_UPDATE(local_state.rows_input_counter(),
(int64_t)in_block->rows());
local_state._shared_state->input_num_rows += in_block->rows();
diff --git
a/be/src/pipeline/exec/distinct_streaming_aggregation_source_operator.cpp
b/be/src/pipeline/exec/distinct_streaming_aggregation_source_operator.cpp
index 1abfb6510db..43456f10e3e 100644
--- a/be/src/pipeline/exec/distinct_streaming_aggregation_source_operator.cpp
+++ b/be/src/pipeline/exec/distinct_streaming_aggregation_source_operator.cpp
@@ -106,7 +106,7 @@
DistinctStreamingAggSourceOperatorX::DistinctStreamingAggSourceOperatorX(ObjectP
Status DistinctStreamingAggSourceOperatorX::get_block(RuntimeState* state,
vectorized::Block* block,
SourceState&
source_state) {
- CREATE_LOCAL_STATE_RETURN_IF_ERROR(local_state);
+ auto& local_state = get_local_state(state);
SCOPED_TIMER(local_state.profile()->total_time_counter());
std::unique_ptr<vectorized::Block> agg_block;
RETURN_IF_ERROR(local_state._shared_state->data_queue->get_block_from_queue(&agg_block));
diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp
b/be/src/pipeline/exec/exchange_sink_operator.cpp
index 4528d3411d4..aa22f4cbd69 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -288,7 +288,7 @@ void
ExchangeSinkOperatorX::_handle_eof_channel(RuntimeState* state, ChannelPtrT
Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
block,
SourceState source_state) {
- CREATE_SINK_LOCAL_STATE_RETURN_IF_ERROR(local_state);
+ auto& local_state = get_local_state(state);
COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)block->rows());
SCOPED_TIMER(local_state.profile()->total_time_counter());
local_state._peak_memory_usage_counter->set(_mem_tracker->peak_consumption());
@@ -475,7 +475,7 @@ Status
ExchangeSinkOperatorX::channel_add_rows(RuntimeState* state, Channels& ch
}
Status ExchangeSinkOperatorX::try_close(RuntimeState* state, Status
exec_status) {
- CREATE_SINK_LOCAL_STATE_RETURN_IF_ERROR(local_state);
+ auto& local_state = get_local_state(state);
local_state._serializer.reset_block();
Status final_st = Status::OK();
Status final_status = exec_status;
diff --git a/be/src/pipeline/exec/exchange_source_operator.cpp
b/be/src/pipeline/exec/exchange_source_operator.cpp
index 4a3260b51e9..362853fa18e 100644
--- a/be/src/pipeline/exec/exchange_source_operator.cpp
+++ b/be/src/pipeline/exec/exchange_source_operator.cpp
@@ -124,7 +124,7 @@ Status ExchangeSourceOperatorX::open(RuntimeState* state) {
Status ExchangeSourceOperatorX::get_block(RuntimeState* state,
vectorized::Block* block,
SourceState& source_state) {
- CREATE_LOCAL_STATE_RETURN_IF_ERROR(local_state);
+ auto& local_state = get_local_state(state);
SCOPED_TIMER(local_state.profile()->total_time_counter());
if (_is_merging && !local_state.is_ready) {
RETURN_IF_ERROR(local_state.stream_recvr->create_merger(
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp
b/be/src/pipeline/exec/hashjoin_build_sink.cpp
index e04229f7a29..1af98cd77aa 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.cpp
+++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp
@@ -401,7 +401,7 @@ Status HashJoinBuildSinkOperatorX::open(RuntimeState*
state) {
Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state,
vectorized::Block* in_block,
SourceState source_state) {
- CREATE_SINK_LOCAL_STATE_RETURN_IF_ERROR(local_state);
+ auto& local_state = get_local_state(state);
SCOPED_TIMER(local_state.profile()->total_time_counter());
COUNTER_UPDATE(local_state.rows_input_counter(),
(int64_t)in_block->rows());
diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.cpp
b/be/src/pipeline/exec/hashjoin_probe_operator.cpp
index 5fe5dcc7436..61f244c8dec 100644
--- a/be/src/pipeline/exec/hashjoin_probe_operator.cpp
+++ b/be/src/pipeline/exec/hashjoin_probe_operator.cpp
@@ -226,7 +226,7 @@ HashJoinProbeOperatorX::HashJoinProbeOperatorX(ObjectPool*
pool, const TPlanNode
Status HashJoinProbeOperatorX::pull(doris::RuntimeState* state,
vectorized::Block* output_block,
SourceState& source_state) const {
- CREATE_LOCAL_STATE_RETURN_IF_ERROR(local_state);
+ auto& local_state = get_local_state(state);
local_state.init_for_probe(state);
SCOPED_TIMER(local_state._probe_timer);
if (local_state._shared_state->short_circuit_for_probe) {
@@ -456,7 +456,7 @@ Status
HashJoinProbeOperatorX::_do_evaluate(vectorized::Block& block,
Status HashJoinProbeOperatorX::push(RuntimeState* state, vectorized::Block*
input_block,
SourceState source_state) const {
- CREATE_LOCAL_STATE_RETURN_IF_ERROR(local_state);
+ auto& local_state = get_local_state(state);
local_state.prepare_for_next();
local_state._probe_eos = source_state == SourceState::FINISHED;
if (input_block->rows() > 0) {
diff --git a/be/src/pipeline/exec/jdbc_table_sink_operator.cpp
b/be/src/pipeline/exec/jdbc_table_sink_operator.cpp
index e68ce9500a0..139c6c434a8 100644
--- a/be/src/pipeline/exec/jdbc_table_sink_operator.cpp
+++ b/be/src/pipeline/exec/jdbc_table_sink_operator.cpp
@@ -57,7 +57,7 @@ Status JdbcTableSinkOperatorX::open(RuntimeState* state) {
Status JdbcTableSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
block,
SourceState source_state) {
- CREATE_SINK_LOCAL_STATE_RETURN_IF_ERROR(local_state);
+ auto& local_state = get_local_state(state);
SCOPED_TIMER(local_state.profile()->total_time_counter());
RETURN_IF_ERROR(local_state.sink(state, block, source_state));
return Status::OK();
diff --git a/be/src/pipeline/exec/multi_cast_data_stream_sink.h
b/be/src/pipeline/exec/multi_cast_data_stream_sink.h
index c0e4edd3e30..83c5390a2fb 100644
--- a/be/src/pipeline/exec/multi_cast_data_stream_sink.h
+++ b/be/src/pipeline/exec/multi_cast_data_stream_sink.h
@@ -76,7 +76,7 @@ public:
Status sink(RuntimeState* state, vectorized::Block* in_block,
SourceState source_state) override {
- CREATE_SINK_LOCAL_STATE_RETURN_IF_ERROR(local_state);
+ auto& local_state = get_local_state(state);
SCOPED_TIMER(local_state.profile()->total_time_counter());
COUNTER_UPDATE(local_state.rows_input_counter(),
(int64_t)in_block->rows());
if (in_block->rows() > 0 || source_state == SourceState::FINISHED) {
diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
index bbd66973f8d..1d3565c65b5 100644
--- a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
+++ b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
@@ -151,7 +151,8 @@ Status
MultiCastDataStreamSourceLocalState::init(RuntimeState* state, LocalState
Status MultiCastDataStreamerSourceOperatorX::get_block(RuntimeState* state,
vectorized::Block*
block,
SourceState&
source_state) {
- CREATE_LOCAL_STATE_RETURN_IF_ERROR(local_state);
+ //auto& local_state = get_local_state(state);
+ auto& local_state = get_local_state(state);
SCOPED_TIMER(local_state.profile()->total_time_counter());
bool eos = false;
vectorized::Block tmp_block;
diff --git a/be/src/pipeline/exec/nested_loop_join_build_operator.cpp
b/be/src/pipeline/exec/nested_loop_join_build_operator.cpp
index e09d1041c0e..152bec5d7d6 100644
--- a/be/src/pipeline/exec/nested_loop_join_build_operator.cpp
+++ b/be/src/pipeline/exec/nested_loop_join_build_operator.cpp
@@ -92,7 +92,7 @@ Status NestedLoopJoinBuildSinkOperatorX::open(RuntimeState*
state) {
Status NestedLoopJoinBuildSinkOperatorX::sink(doris::RuntimeState* state,
vectorized::Block* block,
SourceState source_state) {
- CREATE_SINK_LOCAL_STATE_RETURN_IF_ERROR(local_state);
+ auto& local_state = get_local_state(state);
SCOPED_TIMER(local_state.profile()->total_time_counter());
COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)block->rows());
auto rows = block->rows();
diff --git a/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp
b/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp
index 718d66decfb..ca51d0f2714 100644
--- a/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp
+++ b/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp
@@ -487,7 +487,7 @@ bool
NestedLoopJoinProbeOperatorX::need_more_input_data(RuntimeState* state) con
Status NestedLoopJoinProbeOperatorX::push(doris::RuntimeState* state,
vectorized::Block* block,
SourceState source_state) const {
- CREATE_LOCAL_STATE_RETURN_IF_ERROR(local_state);
+ auto& local_state = get_local_state(state);
COUNTER_UPDATE(local_state._probe_rows_counter, block->rows());
local_state._cur_probe_row_visited_flags.resize(block->rows());
std::fill(local_state._cur_probe_row_visited_flags.begin(),
@@ -513,7 +513,7 @@ Status
NestedLoopJoinProbeOperatorX::push(doris::RuntimeState* state, vectorized
Status NestedLoopJoinProbeOperatorX::pull(RuntimeState* state,
vectorized::Block* block,
SourceState& source_state) const {
- CREATE_LOCAL_STATE_RETURN_IF_ERROR(local_state);
+ auto& local_state = get_local_state(state);
if (_is_output_left_side_only) {
RETURN_IF_ERROR(local_state._build_output_block(local_state._child_block.get(),
block));
source_state =
diff --git a/be/src/pipeline/exec/olap_table_sink_operator.h
b/be/src/pipeline/exec/olap_table_sink_operator.h
index ed7ac10cfb7..6c84f3ef523 100644
--- a/be/src/pipeline/exec/olap_table_sink_operator.h
+++ b/be/src/pipeline/exec/olap_table_sink_operator.h
@@ -94,7 +94,7 @@ public:
}
Status sink(RuntimeState* state, vectorized::Block* in_block,
SourceState source_state) override {
- CREATE_SINK_LOCAL_STATE_RETURN_IF_ERROR(local_state);
+ auto& local_state = get_local_state(state);
SCOPED_TIMER(local_state.profile()->total_time_counter());
COUNTER_UPDATE(local_state.rows_input_counter(),
(int64_t)in_block->rows());
return local_state.sink(state, in_block, source_state);
diff --git a/be/src/pipeline/exec/partition_sort_sink_operator.cpp
b/be/src/pipeline/exec/partition_sort_sink_operator.cpp
index 4379a088a35..5abbecbef89 100644
--- a/be/src/pipeline/exec/partition_sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/partition_sort_sink_operator.cpp
@@ -96,7 +96,7 @@ Status PartitionSortSinkOperatorX::open(RuntimeState* state) {
Status PartitionSortSinkOperatorX::sink(RuntimeState* state,
vectorized::Block* input_block,
SourceState source_state) {
- CREATE_SINK_LOCAL_STATE_RETURN_IF_ERROR(local_state);
+ auto& local_state = get_local_state(state);
auto current_rows = input_block->rows();
SCOPED_TIMER(local_state.profile()->total_time_counter());
if (current_rows > 0) {
diff --git a/be/src/pipeline/exec/partition_sort_source_operator.cpp
b/be/src/pipeline/exec/partition_sort_source_operator.cpp
index 9800efd8eaa..84a20a2f640 100644
--- a/be/src/pipeline/exec/partition_sort_source_operator.cpp
+++ b/be/src/pipeline/exec/partition_sort_source_operator.cpp
@@ -42,7 +42,7 @@ Status PartitionSortSourceLocalState::init(RuntimeState*
state, LocalStateInfo&
Status PartitionSortSourceOperatorX::get_block(RuntimeState* state,
vectorized::Block* output_block,
SourceState& source_state) {
RETURN_IF_CANCELLED(state);
- CREATE_LOCAL_STATE_RETURN_IF_ERROR(local_state);
+ auto& local_state = get_local_state(state);
SCOPED_TIMER(local_state.profile()->total_time_counter());
SCOPED_TIMER(local_state._get_next_timer);
output_block->clear_column_data();
diff --git a/be/src/pipeline/exec/repeat_operator.cpp
b/be/src/pipeline/exec/repeat_operator.cpp
index e804685bc45..c9e0a38ec4c 100644
--- a/be/src/pipeline/exec/repeat_operator.cpp
+++ b/be/src/pipeline/exec/repeat_operator.cpp
@@ -183,7 +183,7 @@ Status
RepeatLocalState::get_repeated_block(vectorized::Block* child_block, int
Status RepeatOperatorX::push(RuntimeState* state, vectorized::Block*
input_block,
SourceState source_state) const {
- CREATE_LOCAL_STATE_RETURN_IF_ERROR(local_state);
+ auto& local_state = get_local_state(state);
local_state._child_eos = source_state == SourceState::FINISHED;
auto& _intermediate_block = local_state._intermediate_block;
auto& _expr_ctxs = local_state._expr_ctxs;
@@ -210,7 +210,7 @@ Status RepeatOperatorX::push(RuntimeState* state,
vectorized::Block* input_block
Status RepeatOperatorX::pull(doris::RuntimeState* state, vectorized::Block*
output_block,
SourceState& source_state) const {
- CREATE_LOCAL_STATE_RETURN_IF_ERROR(local_state);
+ auto& local_state = get_local_state(state);
SCOPED_TIMER(local_state.profile()->total_time_counter());
auto& _repeat_id_idx = local_state._repeat_id_idx;
auto& _child_block = *local_state._child_block;
diff --git a/be/src/pipeline/exec/result_file_sink_operator.cpp
b/be/src/pipeline/exec/result_file_sink_operator.cpp
index eeb0bab68a2..79380642863 100644
--- a/be/src/pipeline/exec/result_file_sink_operator.cpp
+++ b/be/src/pipeline/exec/result_file_sink_operator.cpp
@@ -263,7 +263,7 @@ void
ResultFileSinkLocalState::_handle_eof_channel(RuntimeState* state, ChannelP
Status ResultFileSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
in_block,
SourceState source_state) {
- CREATE_SINK_LOCAL_STATE_RETURN_IF_ERROR(local_state);
+ auto& local_state = get_local_state(state);
SCOPED_TIMER(local_state.profile()->total_time_counter());
COUNTER_UPDATE(local_state.rows_input_counter(),
(int64_t)in_block->rows());
return local_state.sink(state, in_block, source_state);
diff --git a/be/src/pipeline/exec/result_sink_operator.cpp
b/be/src/pipeline/exec/result_sink_operator.cpp
index 1ef338b509f..12c06794a95 100644
--- a/be/src/pipeline/exec/result_sink_operator.cpp
+++ b/be/src/pipeline/exec/result_sink_operator.cpp
@@ -138,7 +138,7 @@ Status ResultSinkOperatorX::open(RuntimeState* state) {
Status ResultSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block,
SourceState source_state) {
- CREATE_SINK_LOCAL_STATE_RETURN_IF_ERROR(local_state);
+ auto& local_state = get_local_state(state);
SCOPED_TIMER(local_state.profile()->total_time_counter());
if (_fetch_option.use_two_phase_fetch && block->rows() > 0) {
RETURN_IF_ERROR(_second_phase_fetch_data(state, block));
diff --git a/be/src/pipeline/exec/scan_operator.cpp
b/be/src/pipeline/exec/scan_operator.cpp
index 10289f63afe..07872f1b7cf 100644
--- a/be/src/pipeline/exec/scan_operator.cpp
+++ b/be/src/pipeline/exec/scan_operator.cpp
@@ -1329,7 +1329,7 @@ Status ScanOperatorX<LocalStateType>::open(RuntimeState*
state) {
template <typename LocalStateType>
Status ScanOperatorX<LocalStateType>::try_close(RuntimeState* state) {
- CREATE_LOCAL_STATE_RETURN_IF_ERROR(local_state);
+ auto& local_state = get_local_state(state);
if (local_state._scanner_ctx.get()) {
// mark this scanner ctx as should_stop to make sure scanners will not
be scheduled anymore
// TODO: there is a lock in `set_should_stop` may cause some slight
impact
@@ -1379,7 +1379,7 @@ bool
ScanOperatorX<LocalStateType>::runtime_filters_are_ready_or_timeout(
template <typename LocalStateType>
Status ScanOperatorX<LocalStateType>::get_block(RuntimeState* state,
vectorized::Block* block,
SourceState& source_state) {
- CREATE_LOCAL_STATE_RETURN_IF_ERROR(local_state);
+ auto& local_state = get_local_state(state);
SCOPED_TIMER(local_state._get_next_timer);
SCOPED_TIMER(local_state.profile()->total_time_counter());
// in inverted index apply logic, in order to optimize query performance,
diff --git a/be/src/pipeline/exec/scan_operator.h
b/be/src/pipeline/exec/scan_operator.h
index 83724e3df5d..4355efed8a6 100644
--- a/be/src/pipeline/exec/scan_operator.h
+++ b/be/src/pipeline/exec/scan_operator.h
@@ -436,6 +436,7 @@ public:
int64_t get_push_down_count() const { return _push_down_count; }
using OperatorX<LocalStateType>::id;
using OperatorX<LocalStateType>::operator_id;
+ using OperatorX<LocalStateType>::get_local_state;
protected:
using LocalState = LocalStateType;
diff --git a/be/src/pipeline/exec/schema_scan_operator.cpp
b/be/src/pipeline/exec/schema_scan_operator.cpp
index 165885e63a3..430315b9987 100644
--- a/be/src/pipeline/exec/schema_scan_operator.cpp
+++ b/be/src/pipeline/exec/schema_scan_operator.cpp
@@ -211,7 +211,7 @@ Status SchemaScanOperatorX::prepare(RuntimeState* state) {
Status SchemaScanOperatorX::get_block(RuntimeState* state, vectorized::Block*
block,
SourceState& source_state) {
- CREATE_LOCAL_STATE_RETURN_IF_ERROR(local_state);
+ auto& local_state = get_local_state(state);
SCOPED_TIMER(local_state.profile()->total_time_counter());
RETURN_IF_CANCELLED(state);
bool schema_eos = false;
diff --git a/be/src/pipeline/exec/select_operator.h
b/be/src/pipeline/exec/select_operator.h
index 93e2ee184ca..6c998c8ab3c 100644
--- a/be/src/pipeline/exec/select_operator.h
+++ b/be/src/pipeline/exec/select_operator.h
@@ -60,7 +60,7 @@ public:
: StreamingOperatorX<SelectLocalState>(pool, tnode, operator_id,
descs) {}
Status pull(RuntimeState* state, vectorized::Block* block, SourceState&
source_state) override {
- CREATE_LOCAL_STATE_RETURN_IF_ERROR(local_state);
+ auto& local_state = get_local_state(state);
SCOPED_TIMER(local_state.profile()->total_time_counter());
RETURN_IF_CANCELLED(state);
RETURN_IF_ERROR(vectorized::VExprContext::filter_block(local_state._conjuncts,
block,
diff --git a/be/src/pipeline/exec/set_probe_sink_operator.cpp
b/be/src/pipeline/exec/set_probe_sink_operator.cpp
index 31e99d6e5e7..81f0c986b74 100644
--- a/be/src/pipeline/exec/set_probe_sink_operator.cpp
+++ b/be/src/pipeline/exec/set_probe_sink_operator.cpp
@@ -105,7 +105,7 @@ template <bool is_intersect>
Status SetProbeSinkOperatorX<is_intersect>::sink(RuntimeState* state,
vectorized::Block* in_block,
SourceState source_state) {
RETURN_IF_CANCELLED(state);
- CREATE_SINK_LOCAL_STATE_RETURN_IF_ERROR(local_state);
+ auto& local_state = get_local_state(state);
SCOPED_TIMER(local_state.profile()->total_time_counter());
COUNTER_UPDATE(local_state.rows_input_counter(),
(int64_t)in_block->rows());
diff --git a/be/src/pipeline/exec/set_probe_sink_operator.h
b/be/src/pipeline/exec/set_probe_sink_operator.h
index d31c822d62d..bb5eccdbdc5 100644
--- a/be/src/pipeline/exec/set_probe_sink_operator.h
+++ b/be/src/pipeline/exec/set_probe_sink_operator.h
@@ -100,6 +100,7 @@ class SetProbeSinkOperatorX final : public
DataSinkOperatorX<SetProbeSinkLocalSt
public:
using Base = DataSinkOperatorX<SetProbeSinkLocalState<is_intersect>>;
using DataSinkOperatorXBase::operator_id;
+ using Base::get_local_state;
using typename Base::LocalState;
friend class SetProbeSinkLocalState<is_intersect>;
diff --git a/be/src/pipeline/exec/set_sink_operator.cpp
b/be/src/pipeline/exec/set_sink_operator.cpp
index 6725deffa14..07205e6c8a2 100644
--- a/be/src/pipeline/exec/set_sink_operator.cpp
+++ b/be/src/pipeline/exec/set_sink_operator.cpp
@@ -54,7 +54,7 @@ Status SetSinkOperatorX<is_intersect>::sink(RuntimeState*
state, vectorized::Blo
SourceState source_state) {
constexpr static auto BUILD_BLOCK_MAX_SIZE = 4 * 1024UL * 1024UL * 1024UL;
RETURN_IF_CANCELLED(state);
- CREATE_SINK_LOCAL_STATE_RETURN_IF_ERROR(local_state);
+ auto& local_state = get_local_state(state);
SCOPED_TIMER(local_state.profile()->total_time_counter());
COUNTER_UPDATE(local_state.rows_input_counter(),
(int64_t)in_block->rows());
diff --git a/be/src/pipeline/exec/set_sink_operator.h
b/be/src/pipeline/exec/set_sink_operator.h
index a475807d196..a1b9d8b7079 100644
--- a/be/src/pipeline/exec/set_sink_operator.h
+++ b/be/src/pipeline/exec/set_sink_operator.h
@@ -93,6 +93,7 @@ class SetSinkOperatorX final : public
DataSinkOperatorX<SetSinkLocalState<is_int
public:
using Base = DataSinkOperatorX<SetSinkLocalState<is_intersect>>;
using DataSinkOperatorXBase::operator_id;
+ using Base::get_local_state;
using typename Base::LocalState;
friend class SetSinkLocalState<is_intersect>;
diff --git a/be/src/pipeline/exec/set_source_operator.cpp
b/be/src/pipeline/exec/set_source_operator.cpp
index 6baf084e94a..31d372a002a 100644
--- a/be/src/pipeline/exec/set_source_operator.cpp
+++ b/be/src/pipeline/exec/set_source_operator.cpp
@@ -89,7 +89,7 @@ template <bool is_intersect>
Status SetSourceOperatorX<is_intersect>::get_block(RuntimeState* state,
vectorized::Block* block,
SourceState& source_state) {
RETURN_IF_CANCELLED(state);
- CREATE_LOCAL_STATE_RETURN_IF_ERROR(local_state);
+ auto& local_state = get_local_state(state);
SCOPED_TIMER(local_state.profile()->total_time_counter());
_create_mutable_cols(local_state, block);
auto st = std::visit(
diff --git a/be/src/pipeline/exec/set_source_operator.h
b/be/src/pipeline/exec/set_source_operator.h
index e8cbce8a7d3..de81bbe4ec0 100644
--- a/be/src/pipeline/exec/set_source_operator.h
+++ b/be/src/pipeline/exec/set_source_operator.h
@@ -80,6 +80,7 @@ public:
using Base = OperatorX<SetSourceLocalState<is_intersect>>;
// for non-delay tempalte instantiation
using OperatorXBase::operator_id;
+ using Base::get_local_state;
using typename Base::LocalState;
SetSourceOperatorX(ObjectPool* pool, const TPlanNode& tnode, int
operator_id,
diff --git a/be/src/pipeline/exec/sort_sink_operator.cpp
b/be/src/pipeline/exec/sort_sink_operator.cpp
index 0184ceab663..cc41fbc4bff 100644
--- a/be/src/pipeline/exec/sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/sort_sink_operator.cpp
@@ -138,7 +138,7 @@ Status SortSinkOperatorX::open(RuntimeState* state) {
Status SortSinkOperatorX::sink(doris::RuntimeState* state, vectorized::Block*
in_block,
SourceState source_state) {
- CREATE_SINK_LOCAL_STATE_RETURN_IF_ERROR(local_state);
+ auto& local_state = get_local_state(state);
SCOPED_TIMER(local_state.profile()->total_time_counter());
COUNTER_UPDATE(local_state.rows_input_counter(),
(int64_t)in_block->rows());
if (in_block->rows() > 0) {
diff --git a/be/src/pipeline/exec/sort_source_operator.cpp
b/be/src/pipeline/exec/sort_source_operator.cpp
index 8b189b87a67..6732094272a 100644
--- a/be/src/pipeline/exec/sort_source_operator.cpp
+++ b/be/src/pipeline/exec/sort_source_operator.cpp
@@ -34,7 +34,7 @@ SortSourceOperatorX::SortSourceOperatorX(ObjectPool* pool,
const TPlanNode& tnod
Status SortSourceOperatorX::get_block(RuntimeState* state, vectorized::Block*
block,
SourceState& source_state) {
- CREATE_LOCAL_STATE_RETURN_IF_ERROR(local_state);
+ auto& local_state = get_local_state(state);
SCOPED_TIMER(local_state.profile()->total_time_counter());
bool eos = false;
RETURN_IF_ERROR_OR_CATCH_EXCEPTION(
diff --git a/be/src/pipeline/exec/streaming_aggregation_sink_operator.cpp
b/be/src/pipeline/exec/streaming_aggregation_sink_operator.cpp
index 3022d542e8b..3d246ec9d9c 100644
--- a/be/src/pipeline/exec/streaming_aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/streaming_aggregation_sink_operator.cpp
@@ -353,7 +353,7 @@ Status StreamingAggSinkOperatorX::init(const TPlanNode&
tnode, RuntimeState* sta
Status StreamingAggSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
in_block,
SourceState source_state) {
- CREATE_SINK_LOCAL_STATE_RETURN_IF_ERROR(local_state);
+ auto& local_state = get_local_state(state);
SCOPED_TIMER(local_state.profile()->total_time_counter());
COUNTER_UPDATE(local_state.rows_input_counter(),
(int64_t)in_block->rows());
local_state._shared_state->input_num_rows += in_block->rows();
diff --git a/be/src/pipeline/exec/streaming_aggregation_source_operator.cpp
b/be/src/pipeline/exec/streaming_aggregation_source_operator.cpp
index 0db8ef6f6bc..45d2e1902f6 100644
--- a/be/src/pipeline/exec/streaming_aggregation_source_operator.cpp
+++ b/be/src/pipeline/exec/streaming_aggregation_source_operator.cpp
@@ -80,7 +80,7 @@
StreamingAggSourceOperatorX::StreamingAggSourceOperatorX(ObjectPool* pool, const
Status StreamingAggSourceOperatorX::get_block(RuntimeState* state,
vectorized::Block* block,
SourceState& source_state) {
- CREATE_LOCAL_STATE_RETURN_IF_ERROR(local_state);
+ auto& local_state = get_local_state(state);
SCOPED_TIMER(local_state.profile()->total_time_counter());
if (!local_state._shared_state->data_queue->data_exhausted()) {
std::unique_ptr<vectorized::Block> agg_block;
diff --git a/be/src/pipeline/exec/table_function_operator.h
b/be/src/pipeline/exec/table_function_operator.h
index b71cc52ef23..0d292c324fa 100644
--- a/be/src/pipeline/exec/table_function_operator.h
+++ b/be/src/pipeline/exec/table_function_operator.h
@@ -98,7 +98,7 @@ public:
Status push(RuntimeState* state, vectorized::Block* input_block,
SourceState source_state) const override {
- CREATE_LOCAL_STATE_RETURN_IF_ERROR(local_state);
+ auto& local_state = get_local_state(state);
if (input_block->rows() == 0) {
return Status::OK();
}
@@ -112,7 +112,7 @@ public:
Status pull(RuntimeState* state, vectorized::Block* output_block,
SourceState& source_state) const override {
- CREATE_LOCAL_STATE_RETURN_IF_ERROR(local_state);
+ auto& local_state = get_local_state(state);
RETURN_IF_ERROR(local_state.get_expanded_block(state, output_block,
source_state));
local_state.reached_limit(output_block, source_state);
return Status::OK();
diff --git a/be/src/pipeline/exec/union_sink_operator.cpp
b/be/src/pipeline/exec/union_sink_operator.cpp
index 4472468e694..9b07fb6a835 100644
--- a/be/src/pipeline/exec/union_sink_operator.cpp
+++ b/be/src/pipeline/exec/union_sink_operator.cpp
@@ -145,7 +145,7 @@ Status UnionSinkOperatorX::open(RuntimeState* state) {
Status UnionSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
in_block,
SourceState source_state) {
- CREATE_SINK_LOCAL_STATE_RETURN_IF_ERROR(local_state);
+ auto& local_state = get_local_state(state);
SCOPED_TIMER(local_state.profile()->total_time_counter());
COUNTER_UPDATE(local_state.rows_input_counter(),
(int64_t)in_block->rows());
if (local_state._output_block == nullptr) {
diff --git a/be/src/pipeline/exec/union_sink_operator.h
b/be/src/pipeline/exec/union_sink_operator.h
index 8648979658f..f4891ce1c5d 100644
--- a/be/src/pipeline/exec/union_sink_operator.h
+++ b/be/src/pipeline/exec/union_sink_operator.h
@@ -152,7 +152,7 @@ private:
Status materialize_block(RuntimeState* state, vectorized::Block*
src_block, int child_idx,
vectorized::Block* res_block) {
- CREATE_SINK_LOCAL_STATE_RETURN_IF_ERROR(local_state);
+ auto& local_state = get_local_state(state);
const auto& child_exprs = local_state._child_expr;
vectorized::ColumnsWithTypeAndName colunms;
for (size_t i = 0; i < child_exprs.size(); ++i) {
diff --git a/be/src/pipeline/exec/union_source_operator.cpp
b/be/src/pipeline/exec/union_source_operator.cpp
index d486c3d7755..1f27ab920c8 100644
--- a/be/src/pipeline/exec/union_source_operator.cpp
+++ b/be/src/pipeline/exec/union_source_operator.cpp
@@ -151,7 +151,7 @@ std::shared_ptr<UnionSharedState>
UnionSourceLocalState::create_shared_state() {
Status UnionSourceOperatorX::get_block(RuntimeState* state, vectorized::Block*
block,
SourceState& source_state) {
- CREATE_LOCAL_STATE_RETURN_IF_ERROR(local_state);
+ auto& local_state = get_local_state(state);
SCOPED_TIMER(local_state.profile()->total_time_counter());
if (local_state._need_read_for_const_expr) {
if (has_more_const(state)) {
diff --git a/be/src/pipeline/pipeline_x/operator.h
b/be/src/pipeline/pipeline_x/operator.h
index b7a437cf6a3..b4f5adf573e 100644
--- a/be/src/pipeline/pipeline_x/operator.h
+++ b/be/src/pipeline/pipeline_x/operator.h
@@ -294,6 +294,9 @@ public:
Status setup_local_state(RuntimeState* state, LocalStateInfo& info)
override;
using LocalState = LocalStateType;
+ [[nodiscard]] LocalState& get_local_state(RuntimeState* state) const {
+ return state->get_local_state(operator_id())->template cast<LocalState>();
+ }
};
template <typename DependencyType = FakeDependency>
@@ -536,6 +539,9 @@ public:
void get_dependency(std::vector<DependencySPtr>& dependency) override;
using LocalState = LocalStateType;
+ [[nodiscard]] LocalState& get_local_state(RuntimeState* state) const {
+ return state->get_sink_local_state(operator_id())->template cast<LocalState>();
+ }
};
template <typename DependencyType = FakeDependency>
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
index 7a2d837d69f..9f5d08c2b57 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
@@ -96,13 +96,21 @@ Status PipelineXTask::prepare(RuntimeState* state, const
TPipelineInstanceParams
Status PipelineXTask::extract_dependencies() {
for (auto op : _operators) {
- auto* local_state = _state->get_local_state(op->operator_id());
+ auto result = _state->get_local_state_result(op->operator_id());
+ if (!result) {
+ return result.error();
+ }
+ auto* local_state = result.value();
auto* dep = local_state->dependency();
DCHECK(dep != nullptr);
_read_dependencies.push_back(dep);
}
{
- auto* local_state = _state->get_sink_local_state(_sink->operator_id());
+ auto result = _state->get_sink_local_state_result(_sink->operator_id());
+ if (!result) {
+ return result.error();
+ }
+ auto* local_state = result.value();
auto* dep = local_state->dependency();
DCHECK(dep != nullptr);
_write_dependencies = dep;
diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp
index 8535e2dcc1a..7a24b86621b 100644
--- a/be/src/runtime/runtime_state.cpp
+++ b/be/src/runtime/runtime_state.cpp
@@ -438,6 +438,17 @@ doris::pipeline::PipelineXLocalStateBase*
RuntimeState::get_local_state(int id)
return _op_id_to_local_state[id].get();
}
+Result<RuntimeState::LocalState*> RuntimeState::get_local_state_result(int id) {
+ if (id >= _op_id_to_local_state.size()) {
+ return ResultError(Status::InternalError("get_local_state out of range size:{} , id:{}",
+ _op_id_to_local_state.size(), id));
+ }
+ if (!_op_id_to_local_state[id]) {
+ return ResultError(Status::InternalError("get_local_state id:{} is null", id));
+ }
+ return _op_id_to_local_state[id].get();
+};
+
void RuntimeState::emplace_sink_local_state(
int id, std::unique_ptr<doris::pipeline::PipelineXSinkLocalStateBase>
state) {
_op_id_to_sink_local_state[id] = std::move(state);
@@ -447,6 +458,18 @@ doris::pipeline::PipelineXSinkLocalStateBase*
RuntimeState::get_sink_local_state
return _op_id_to_sink_local_state[id].get();
}
+Result<RuntimeState::SinkLocalState*> RuntimeState::get_sink_local_state_result(int id) {
+ if (id >= _op_id_to_sink_local_state.size()) {
+ return ResultError(
+ Status::InternalError("_op_id_to_sink_local_state out of range size:{} , id:{}",
+ _op_id_to_sink_local_state.size(), id));
+ }
+ if (!_op_id_to_sink_local_state[id]) {
+ return ResultError(Status::InternalError("_op_id_to_sink_local_state id:{} is null", id));
+ }
+ return _op_id_to_sink_local_state[id].get();
+}
+
bool RuntimeState::enable_page_cache() const {
return !config::disable_storage_page_cache &&
(_query_options.__isset.enable_page_cache &&
_query_options.enable_page_cache);
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index f555b54ba64..5dda1201cea 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -454,15 +454,19 @@ public:
_query_options.enable_delete_sub_predicate_v2;
}
- void emplace_local_state(int id,
- std::unique_ptr<doris::pipeline::PipelineXLocalStateBase> state);
+ using LocalState = doris::pipeline::PipelineXLocalStateBase;
+ using SinkLocalState = doris::pipeline::PipelineXSinkLocalStateBase;
+ // get result can return an error message, and we will only call it during the prepare.
+ void emplace_local_state(int id, std::unique_ptr<LocalState> state);
- doris::pipeline::PipelineXLocalStateBase* get_local_state(int id);
+ LocalState* get_local_state(int id);
+ Result<LocalState*> get_local_state_result(int id);
- void emplace_sink_local_state(
- int id, std::unique_ptr<doris::pipeline::PipelineXSinkLocalStateBase> state);
+ void emplace_sink_local_state(int id, std::unique_ptr<SinkLocalState> state);
- doris::pipeline::PipelineXSinkLocalStateBase* get_sink_local_state(int id);
+ SinkLocalState* get_sink_local_state(int id);
+
+ Result<SinkLocalState*> get_sink_local_state_result(int id);
void resize_op_id_to_local_state(int size);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]