This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 97c2fe75d19 [feature](pipelineX) use expected<T, Status> in 
local_state (#25878)
97c2fe75d19 is described below

commit 97c2fe75d19f41524a3f3804eea5847bd671c65f
Author: Mryange <[email protected]>
AuthorDate: Wed Oct 25 15:23:17 2023 +0800

    [feature](pipelineX) use expected<T, Status> in local_state (#25878)
---
 be/src/common/status.h                             |  2 ++
 be/src/pipeline/exec/aggregation_sink_operator.cpp |  2 +-
 be/src/pipeline/exec/aggregation_sink_operator.h   |  1 +
 .../pipeline/exec/aggregation_source_operator.cpp  |  2 +-
 be/src/pipeline/exec/analytic_sink_operator.cpp    |  2 +-
 be/src/pipeline/exec/analytic_source_operator.cpp  |  2 +-
 be/src/pipeline/exec/assert_num_rows_operator.cpp  |  2 +-
 be/src/pipeline/exec/datagen_operator.cpp          |  2 +-
 ...istinct_streaming_aggregation_sink_operator.cpp |  2 +-
 ...tinct_streaming_aggregation_source_operator.cpp |  2 +-
 be/src/pipeline/exec/exchange_sink_operator.cpp    |  4 ++--
 be/src/pipeline/exec/exchange_source_operator.cpp  |  2 +-
 be/src/pipeline/exec/hashjoin_build_sink.cpp       |  2 +-
 be/src/pipeline/exec/hashjoin_probe_operator.cpp   |  4 ++--
 be/src/pipeline/exec/jdbc_table_sink_operator.cpp  |  2 +-
 be/src/pipeline/exec/multi_cast_data_stream_sink.h |  2 +-
 .../exec/multi_cast_data_stream_source.cpp         |  3 ++-
 .../exec/nested_loop_join_build_operator.cpp       |  2 +-
 .../exec/nested_loop_join_probe_operator.cpp       |  4 ++--
 be/src/pipeline/exec/olap_table_sink_operator.h    |  2 +-
 .../pipeline/exec/partition_sort_sink_operator.cpp |  2 +-
 .../exec/partition_sort_source_operator.cpp        |  2 +-
 be/src/pipeline/exec/repeat_operator.cpp           |  4 ++--
 be/src/pipeline/exec/result_file_sink_operator.cpp |  2 +-
 be/src/pipeline/exec/result_sink_operator.cpp      |  2 +-
 be/src/pipeline/exec/scan_operator.cpp             |  4 ++--
 be/src/pipeline/exec/scan_operator.h               |  1 +
 be/src/pipeline/exec/schema_scan_operator.cpp      |  2 +-
 be/src/pipeline/exec/select_operator.h             |  2 +-
 be/src/pipeline/exec/set_probe_sink_operator.cpp   |  2 +-
 be/src/pipeline/exec/set_probe_sink_operator.h     |  1 +
 be/src/pipeline/exec/set_sink_operator.cpp         |  2 +-
 be/src/pipeline/exec/set_sink_operator.h           |  1 +
 be/src/pipeline/exec/set_source_operator.cpp       |  2 +-
 be/src/pipeline/exec/set_source_operator.h         |  1 +
 be/src/pipeline/exec/sort_sink_operator.cpp        |  2 +-
 be/src/pipeline/exec/sort_source_operator.cpp      |  2 +-
 .../exec/streaming_aggregation_sink_operator.cpp   |  2 +-
 .../exec/streaming_aggregation_source_operator.cpp |  2 +-
 be/src/pipeline/exec/table_function_operator.h     |  4 ++--
 be/src/pipeline/exec/union_sink_operator.cpp       |  2 +-
 be/src/pipeline/exec/union_sink_operator.h         |  2 +-
 be/src/pipeline/exec/union_source_operator.cpp     |  2 +-
 be/src/pipeline/pipeline_x/operator.h              |  6 ++++++
 be/src/pipeline/pipeline_x/pipeline_x_task.cpp     | 12 +++++++++--
 be/src/runtime/runtime_state.cpp                   | 23 ++++++++++++++++++++++
 be/src/runtime/runtime_state.h                     | 16 +++++++++------
 47 files changed, 100 insertions(+), 51 deletions(-)

diff --git a/be/src/common/status.h b/be/src/common/status.h
index fa1d5dbfc8b..88977ce310d 100644
--- a/be/src/common/status.h
+++ b/be/src/common/status.h
@@ -596,6 +596,8 @@ inline std::string Status::to_string() const {
 template <typename T>
 using Result = expected<T, Status>;
 
+using ResultError = unexpected<Status>;
+
 #define RETURN_IF_ERROR_RESULT(stmt)                \
     do {                                            \
         Status _status_ = (stmt);                   \
diff --git a/be/src/pipeline/exec/aggregation_sink_operator.cpp 
b/be/src/pipeline/exec/aggregation_sink_operator.cpp
index 403343b76e6..93b2e2d968c 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_sink_operator.cpp
@@ -783,7 +783,7 @@ template <typename LocalStateType>
 Status AggSinkOperatorX<LocalStateType>::sink(doris::RuntimeState* state,
                                               vectorized::Block* in_block,
                                               SourceState source_state) {
-    CREATE_SINK_LOCAL_STATE_RETURN_IF_ERROR(local_state);
+    auto& local_state = get_local_state(state);
     SCOPED_TIMER(local_state.profile()->total_time_counter());
     COUNTER_UPDATE(local_state.rows_input_counter(), 
(int64_t)in_block->rows());
     local_state._shared_state->input_num_rows += in_block->rows();
diff --git a/be/src/pipeline/exec/aggregation_sink_operator.h 
b/be/src/pipeline/exec/aggregation_sink_operator.h
index bee007b3b47..9615228ca13 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/aggregation_sink_operator.h
@@ -330,6 +330,7 @@ public:
 
     using DataSinkOperatorX<LocalStateType>::id;
     using DataSinkOperatorX<LocalStateType>::operator_id;
+    using DataSinkOperatorX<LocalStateType>::get_local_state;
 
 protected:
     using LocalState = LocalStateType;
diff --git a/be/src/pipeline/exec/aggregation_source_operator.cpp 
b/be/src/pipeline/exec/aggregation_source_operator.cpp
index 69900fc8f27..482a263887b 100644
--- a/be/src/pipeline/exec/aggregation_source_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_source_operator.cpp
@@ -508,7 +508,7 @@ AggSourceOperatorX::AggSourceOperatorX(ObjectPool* pool, 
const TPlanNode& tnode,
 
 Status AggSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* 
block,
                                      SourceState& source_state) {
-    CREATE_LOCAL_STATE_RETURN_IF_ERROR(local_state);
+    auto& local_state = get_local_state(state);
     SCOPED_TIMER(local_state.profile()->total_time_counter());
     RETURN_IF_ERROR(local_state._executor.get_result(state, block, 
source_state));
     local_state.make_nullable_output_key(block);
diff --git a/be/src/pipeline/exec/analytic_sink_operator.cpp 
b/be/src/pipeline/exec/analytic_sink_operator.cpp
index 7260ae9d77f..7109fa9deab 100644
--- a/be/src/pipeline/exec/analytic_sink_operator.cpp
+++ b/be/src/pipeline/exec/analytic_sink_operator.cpp
@@ -133,7 +133,7 @@ Status AnalyticSinkOperatorX::open(RuntimeState* state) {
 
 Status AnalyticSinkOperatorX::sink(doris::RuntimeState* state, 
vectorized::Block* input_block,
                                    SourceState source_state) {
-    CREATE_SINK_LOCAL_STATE_RETURN_IF_ERROR(local_state);
+    auto& local_state = get_local_state(state);
     SCOPED_TIMER(local_state.profile()->total_time_counter());
     COUNTER_UPDATE(local_state.rows_input_counter(), 
(int64_t)input_block->rows());
     local_state._shared_state->input_eos = source_state == 
SourceState::FINISHED;
diff --git a/be/src/pipeline/exec/analytic_source_operator.cpp 
b/be/src/pipeline/exec/analytic_source_operator.cpp
index a139d76fdde..7c310db1237 100644
--- a/be/src/pipeline/exec/analytic_source_operator.cpp
+++ b/be/src/pipeline/exec/analytic_source_operator.cpp
@@ -378,7 +378,7 @@ Status AnalyticSourceOperatorX::init(const TPlanNode& 
tnode, RuntimeState* state
 
 Status AnalyticSourceOperatorX::get_block(RuntimeState* state, 
vectorized::Block* block,
                                           SourceState& source_state) {
-    CREATE_LOCAL_STATE_RETURN_IF_ERROR(local_state);
+    auto& local_state = get_local_state(state);
     SCOPED_TIMER(local_state.profile()->total_time_counter());
     if (local_state._shared_state->input_eos &&
         (local_state._output_block_index == 
local_state._shared_state->input_blocks.size() ||
diff --git a/be/src/pipeline/exec/assert_num_rows_operator.cpp 
b/be/src/pipeline/exec/assert_num_rows_operator.cpp
index b2b97fee42e..f58f2568cfc 100644
--- a/be/src/pipeline/exec/assert_num_rows_operator.cpp
+++ b/be/src/pipeline/exec/assert_num_rows_operator.cpp
@@ -37,7 +37,7 @@ AssertNumRowsOperatorX::AssertNumRowsOperatorX(ObjectPool* 
pool, const TPlanNode
 
 Status AssertNumRowsOperatorX::pull(doris::RuntimeState* state, 
vectorized::Block* block,
                                     SourceState& source_state) {
-    CREATE_LOCAL_STATE_RETURN_IF_ERROR(local_state);
+    auto& local_state = get_local_state(state);
     SCOPED_TIMER(local_state.profile()->total_time_counter());
     local_state.add_num_rows_returned(block->rows());
     int64_t num_rows_returned = local_state.num_rows_returned();
diff --git a/be/src/pipeline/exec/datagen_operator.cpp 
b/be/src/pipeline/exec/datagen_operator.cpp
index 240e2c44e2d..46a0dacb78e 100644
--- a/be/src/pipeline/exec/datagen_operator.cpp
+++ b/be/src/pipeline/exec/datagen_operator.cpp
@@ -80,7 +80,7 @@ Status DataGenSourceOperatorX::get_block(RuntimeState* state, 
vectorized::Block*
         return Status::InternalError("input is NULL pointer");
     }
     RETURN_IF_CANCELLED(state);
-    CREATE_LOCAL_STATE_RETURN_IF_ERROR(local_state);
+    auto& local_state = get_local_state(state);
     bool eos = false;
     Status res = local_state._table_func->get_next(state, block, &eos);
     source_state = eos ? SourceState::FINISHED : source_state;
diff --git 
a/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.cpp 
b/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.cpp
index fec9f7e3c65..ef2551124a3 100644
--- a/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.cpp
@@ -193,7 +193,7 @@ Status DistinctStreamingAggSinkOperatorX::init(const 
TPlanNode& tnode, RuntimeSt
 
 Status DistinctStreamingAggSinkOperatorX::sink(RuntimeState* state, 
vectorized::Block* in_block,
                                                SourceState source_state) {
-    CREATE_SINK_LOCAL_STATE_RETURN_IF_ERROR(local_state);
+    auto& local_state = get_local_state(state);
     SCOPED_TIMER(local_state.profile()->total_time_counter());
     COUNTER_UPDATE(local_state.rows_input_counter(), 
(int64_t)in_block->rows());
     local_state._shared_state->input_num_rows += in_block->rows();
diff --git 
a/be/src/pipeline/exec/distinct_streaming_aggregation_source_operator.cpp 
b/be/src/pipeline/exec/distinct_streaming_aggregation_source_operator.cpp
index 1abfb6510db..43456f10e3e 100644
--- a/be/src/pipeline/exec/distinct_streaming_aggregation_source_operator.cpp
+++ b/be/src/pipeline/exec/distinct_streaming_aggregation_source_operator.cpp
@@ -106,7 +106,7 @@ 
DistinctStreamingAggSourceOperatorX::DistinctStreamingAggSourceOperatorX(ObjectP
 
 Status DistinctStreamingAggSourceOperatorX::get_block(RuntimeState* state, 
vectorized::Block* block,
                                                       SourceState& 
source_state) {
-    CREATE_LOCAL_STATE_RETURN_IF_ERROR(local_state);
+    auto& local_state = get_local_state(state);
     SCOPED_TIMER(local_state.profile()->total_time_counter());
     std::unique_ptr<vectorized::Block> agg_block;
     
RETURN_IF_ERROR(local_state._shared_state->data_queue->get_block_from_queue(&agg_block));
diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp 
b/be/src/pipeline/exec/exchange_sink_operator.cpp
index 4528d3411d4..aa22f4cbd69 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -288,7 +288,7 @@ void 
ExchangeSinkOperatorX::_handle_eof_channel(RuntimeState* state, ChannelPtrT
 
 Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* 
block,
                                    SourceState source_state) {
-    CREATE_SINK_LOCAL_STATE_RETURN_IF_ERROR(local_state);
+    auto& local_state = get_local_state(state);
     COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)block->rows());
     SCOPED_TIMER(local_state.profile()->total_time_counter());
     
local_state._peak_memory_usage_counter->set(_mem_tracker->peak_consumption());
@@ -475,7 +475,7 @@ Status 
ExchangeSinkOperatorX::channel_add_rows(RuntimeState* state, Channels& ch
 }
 
 Status ExchangeSinkOperatorX::try_close(RuntimeState* state, Status 
exec_status) {
-    CREATE_SINK_LOCAL_STATE_RETURN_IF_ERROR(local_state);
+    auto& local_state = get_local_state(state);
     local_state._serializer.reset_block();
     Status final_st = Status::OK();
     Status final_status = exec_status;
diff --git a/be/src/pipeline/exec/exchange_source_operator.cpp 
b/be/src/pipeline/exec/exchange_source_operator.cpp
index 4a3260b51e9..362853fa18e 100644
--- a/be/src/pipeline/exec/exchange_source_operator.cpp
+++ b/be/src/pipeline/exec/exchange_source_operator.cpp
@@ -124,7 +124,7 @@ Status ExchangeSourceOperatorX::open(RuntimeState* state) {
 
 Status ExchangeSourceOperatorX::get_block(RuntimeState* state, 
vectorized::Block* block,
                                           SourceState& source_state) {
-    CREATE_LOCAL_STATE_RETURN_IF_ERROR(local_state);
+    auto& local_state = get_local_state(state);
     SCOPED_TIMER(local_state.profile()->total_time_counter());
     if (_is_merging && !local_state.is_ready) {
         RETURN_IF_ERROR(local_state.stream_recvr->create_merger(
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp 
b/be/src/pipeline/exec/hashjoin_build_sink.cpp
index e04229f7a29..1af98cd77aa 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.cpp
+++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp
@@ -401,7 +401,7 @@ Status HashJoinBuildSinkOperatorX::open(RuntimeState* 
state) {
 
 Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, 
vectorized::Block* in_block,
                                         SourceState source_state) {
-    CREATE_SINK_LOCAL_STATE_RETURN_IF_ERROR(local_state);
+    auto& local_state = get_local_state(state);
     SCOPED_TIMER(local_state.profile()->total_time_counter());
     COUNTER_UPDATE(local_state.rows_input_counter(), 
(int64_t)in_block->rows());
 
diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.cpp 
b/be/src/pipeline/exec/hashjoin_probe_operator.cpp
index 5fe5dcc7436..61f244c8dec 100644
--- a/be/src/pipeline/exec/hashjoin_probe_operator.cpp
+++ b/be/src/pipeline/exec/hashjoin_probe_operator.cpp
@@ -226,7 +226,7 @@ HashJoinProbeOperatorX::HashJoinProbeOperatorX(ObjectPool* 
pool, const TPlanNode
 
 Status HashJoinProbeOperatorX::pull(doris::RuntimeState* state, 
vectorized::Block* output_block,
                                     SourceState& source_state) const {
-    CREATE_LOCAL_STATE_RETURN_IF_ERROR(local_state);
+    auto& local_state = get_local_state(state);
     local_state.init_for_probe(state);
     SCOPED_TIMER(local_state._probe_timer);
     if (local_state._shared_state->short_circuit_for_probe) {
@@ -456,7 +456,7 @@ Status 
HashJoinProbeOperatorX::_do_evaluate(vectorized::Block& block,
 
 Status HashJoinProbeOperatorX::push(RuntimeState* state, vectorized::Block* 
input_block,
                                     SourceState source_state) const {
-    CREATE_LOCAL_STATE_RETURN_IF_ERROR(local_state);
+    auto& local_state = get_local_state(state);
     local_state.prepare_for_next();
     local_state._probe_eos = source_state == SourceState::FINISHED;
     if (input_block->rows() > 0) {
diff --git a/be/src/pipeline/exec/jdbc_table_sink_operator.cpp 
b/be/src/pipeline/exec/jdbc_table_sink_operator.cpp
index e68ce9500a0..139c6c434a8 100644
--- a/be/src/pipeline/exec/jdbc_table_sink_operator.cpp
+++ b/be/src/pipeline/exec/jdbc_table_sink_operator.cpp
@@ -57,7 +57,7 @@ Status JdbcTableSinkOperatorX::open(RuntimeState* state) {
 
 Status JdbcTableSinkOperatorX::sink(RuntimeState* state, vectorized::Block* 
block,
                                     SourceState source_state) {
-    CREATE_SINK_LOCAL_STATE_RETURN_IF_ERROR(local_state);
+    auto& local_state = get_local_state(state);
     SCOPED_TIMER(local_state.profile()->total_time_counter());
     RETURN_IF_ERROR(local_state.sink(state, block, source_state));
     return Status::OK();
diff --git a/be/src/pipeline/exec/multi_cast_data_stream_sink.h 
b/be/src/pipeline/exec/multi_cast_data_stream_sink.h
index c0e4edd3e30..83c5390a2fb 100644
--- a/be/src/pipeline/exec/multi_cast_data_stream_sink.h
+++ b/be/src/pipeline/exec/multi_cast_data_stream_sink.h
@@ -76,7 +76,7 @@ public:
 
     Status sink(RuntimeState* state, vectorized::Block* in_block,
                 SourceState source_state) override {
-        CREATE_SINK_LOCAL_STATE_RETURN_IF_ERROR(local_state);
+        auto& local_state = get_local_state(state);
         SCOPED_TIMER(local_state.profile()->total_time_counter());
         COUNTER_UPDATE(local_state.rows_input_counter(), 
(int64_t)in_block->rows());
         if (in_block->rows() > 0 || source_state == SourceState::FINISHED) {
diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp 
b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
index bbd66973f8d..1d3565c65b5 100644
--- a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
+++ b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
@@ -151,7 +151,8 @@ Status 
MultiCastDataStreamSourceLocalState::init(RuntimeState* state, LocalState
 Status MultiCastDataStreamerSourceOperatorX::get_block(RuntimeState* state,
                                                        vectorized::Block* 
block,
                                                        SourceState& 
source_state) {
-    CREATE_LOCAL_STATE_RETURN_IF_ERROR(local_state);
+    //auto& local_state = get_local_state(state);
+    auto& local_state = get_local_state(state);
     SCOPED_TIMER(local_state.profile()->total_time_counter());
     bool eos = false;
     vectorized::Block tmp_block;
diff --git a/be/src/pipeline/exec/nested_loop_join_build_operator.cpp 
b/be/src/pipeline/exec/nested_loop_join_build_operator.cpp
index e09d1041c0e..152bec5d7d6 100644
--- a/be/src/pipeline/exec/nested_loop_join_build_operator.cpp
+++ b/be/src/pipeline/exec/nested_loop_join_build_operator.cpp
@@ -92,7 +92,7 @@ Status NestedLoopJoinBuildSinkOperatorX::open(RuntimeState* 
state) {
 
 Status NestedLoopJoinBuildSinkOperatorX::sink(doris::RuntimeState* state, 
vectorized::Block* block,
                                               SourceState source_state) {
-    CREATE_SINK_LOCAL_STATE_RETURN_IF_ERROR(local_state);
+    auto& local_state = get_local_state(state);
     SCOPED_TIMER(local_state.profile()->total_time_counter());
     COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)block->rows());
     auto rows = block->rows();
diff --git a/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp 
b/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp
index 718d66decfb..ca51d0f2714 100644
--- a/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp
+++ b/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp
@@ -487,7 +487,7 @@ bool 
NestedLoopJoinProbeOperatorX::need_more_input_data(RuntimeState* state) con
 
 Status NestedLoopJoinProbeOperatorX::push(doris::RuntimeState* state, 
vectorized::Block* block,
                                           SourceState source_state) const {
-    CREATE_LOCAL_STATE_RETURN_IF_ERROR(local_state);
+    auto& local_state = get_local_state(state);
     COUNTER_UPDATE(local_state._probe_rows_counter, block->rows());
     local_state._cur_probe_row_visited_flags.resize(block->rows());
     std::fill(local_state._cur_probe_row_visited_flags.begin(),
@@ -513,7 +513,7 @@ Status 
NestedLoopJoinProbeOperatorX::push(doris::RuntimeState* state, vectorized
 
 Status NestedLoopJoinProbeOperatorX::pull(RuntimeState* state, 
vectorized::Block* block,
                                           SourceState& source_state) const {
-    CREATE_LOCAL_STATE_RETURN_IF_ERROR(local_state);
+    auto& local_state = get_local_state(state);
     if (_is_output_left_side_only) {
         
RETURN_IF_ERROR(local_state._build_output_block(local_state._child_block.get(), 
block));
         source_state =
diff --git a/be/src/pipeline/exec/olap_table_sink_operator.h 
b/be/src/pipeline/exec/olap_table_sink_operator.h
index ed7ac10cfb7..6c84f3ef523 100644
--- a/be/src/pipeline/exec/olap_table_sink_operator.h
+++ b/be/src/pipeline/exec/olap_table_sink_operator.h
@@ -94,7 +94,7 @@ public:
     }
     Status sink(RuntimeState* state, vectorized::Block* in_block,
                 SourceState source_state) override {
-        CREATE_SINK_LOCAL_STATE_RETURN_IF_ERROR(local_state);
+        auto& local_state = get_local_state(state);
         SCOPED_TIMER(local_state.profile()->total_time_counter());
         COUNTER_UPDATE(local_state.rows_input_counter(), 
(int64_t)in_block->rows());
         return local_state.sink(state, in_block, source_state);
diff --git a/be/src/pipeline/exec/partition_sort_sink_operator.cpp 
b/be/src/pipeline/exec/partition_sort_sink_operator.cpp
index 4379a088a35..5abbecbef89 100644
--- a/be/src/pipeline/exec/partition_sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/partition_sort_sink_operator.cpp
@@ -96,7 +96,7 @@ Status PartitionSortSinkOperatorX::open(RuntimeState* state) {
 
 Status PartitionSortSinkOperatorX::sink(RuntimeState* state, 
vectorized::Block* input_block,
                                         SourceState source_state) {
-    CREATE_SINK_LOCAL_STATE_RETURN_IF_ERROR(local_state);
+    auto& local_state = get_local_state(state);
     auto current_rows = input_block->rows();
     SCOPED_TIMER(local_state.profile()->total_time_counter());
     if (current_rows > 0) {
diff --git a/be/src/pipeline/exec/partition_sort_source_operator.cpp 
b/be/src/pipeline/exec/partition_sort_source_operator.cpp
index 9800efd8eaa..84a20a2f640 100644
--- a/be/src/pipeline/exec/partition_sort_source_operator.cpp
+++ b/be/src/pipeline/exec/partition_sort_source_operator.cpp
@@ -42,7 +42,7 @@ Status PartitionSortSourceLocalState::init(RuntimeState* 
state, LocalStateInfo&
 Status PartitionSortSourceOperatorX::get_block(RuntimeState* state, 
vectorized::Block* output_block,
                                                SourceState& source_state) {
     RETURN_IF_CANCELLED(state);
-    CREATE_LOCAL_STATE_RETURN_IF_ERROR(local_state);
+    auto& local_state = get_local_state(state);
     SCOPED_TIMER(local_state.profile()->total_time_counter());
     SCOPED_TIMER(local_state._get_next_timer);
     output_block->clear_column_data();
diff --git a/be/src/pipeline/exec/repeat_operator.cpp 
b/be/src/pipeline/exec/repeat_operator.cpp
index e804685bc45..c9e0a38ec4c 100644
--- a/be/src/pipeline/exec/repeat_operator.cpp
+++ b/be/src/pipeline/exec/repeat_operator.cpp
@@ -183,7 +183,7 @@ Status 
RepeatLocalState::get_repeated_block(vectorized::Block* child_block, int
 
 Status RepeatOperatorX::push(RuntimeState* state, vectorized::Block* 
input_block,
                              SourceState source_state) const {
-    CREATE_LOCAL_STATE_RETURN_IF_ERROR(local_state);
+    auto& local_state = get_local_state(state);
     local_state._child_eos = source_state == SourceState::FINISHED;
     auto& _intermediate_block = local_state._intermediate_block;
     auto& _expr_ctxs = local_state._expr_ctxs;
@@ -210,7 +210,7 @@ Status RepeatOperatorX::push(RuntimeState* state, 
vectorized::Block* input_block
 
 Status RepeatOperatorX::pull(doris::RuntimeState* state, vectorized::Block* 
output_block,
                              SourceState& source_state) const {
-    CREATE_LOCAL_STATE_RETURN_IF_ERROR(local_state);
+    auto& local_state = get_local_state(state);
     SCOPED_TIMER(local_state.profile()->total_time_counter());
     auto& _repeat_id_idx = local_state._repeat_id_idx;
     auto& _child_block = *local_state._child_block;
diff --git a/be/src/pipeline/exec/result_file_sink_operator.cpp 
b/be/src/pipeline/exec/result_file_sink_operator.cpp
index eeb0bab68a2..79380642863 100644
--- a/be/src/pipeline/exec/result_file_sink_operator.cpp
+++ b/be/src/pipeline/exec/result_file_sink_operator.cpp
@@ -263,7 +263,7 @@ void 
ResultFileSinkLocalState::_handle_eof_channel(RuntimeState* state, ChannelP
 
 Status ResultFileSinkOperatorX::sink(RuntimeState* state, vectorized::Block* 
in_block,
                                      SourceState source_state) {
-    CREATE_SINK_LOCAL_STATE_RETURN_IF_ERROR(local_state);
+    auto& local_state = get_local_state(state);
     SCOPED_TIMER(local_state.profile()->total_time_counter());
     COUNTER_UPDATE(local_state.rows_input_counter(), 
(int64_t)in_block->rows());
     return local_state.sink(state, in_block, source_state);
diff --git a/be/src/pipeline/exec/result_sink_operator.cpp 
b/be/src/pipeline/exec/result_sink_operator.cpp
index 1ef338b509f..12c06794a95 100644
--- a/be/src/pipeline/exec/result_sink_operator.cpp
+++ b/be/src/pipeline/exec/result_sink_operator.cpp
@@ -138,7 +138,7 @@ Status ResultSinkOperatorX::open(RuntimeState* state) {
 
 Status ResultSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block,
                                  SourceState source_state) {
-    CREATE_SINK_LOCAL_STATE_RETURN_IF_ERROR(local_state);
+    auto& local_state = get_local_state(state);
     SCOPED_TIMER(local_state.profile()->total_time_counter());
     if (_fetch_option.use_two_phase_fetch && block->rows() > 0) {
         RETURN_IF_ERROR(_second_phase_fetch_data(state, block));
diff --git a/be/src/pipeline/exec/scan_operator.cpp 
b/be/src/pipeline/exec/scan_operator.cpp
index 10289f63afe..07872f1b7cf 100644
--- a/be/src/pipeline/exec/scan_operator.cpp
+++ b/be/src/pipeline/exec/scan_operator.cpp
@@ -1329,7 +1329,7 @@ Status ScanOperatorX<LocalStateType>::open(RuntimeState* 
state) {
 
 template <typename LocalStateType>
 Status ScanOperatorX<LocalStateType>::try_close(RuntimeState* state) {
-    CREATE_LOCAL_STATE_RETURN_IF_ERROR(local_state);
+    auto& local_state = get_local_state(state);
     if (local_state._scanner_ctx.get()) {
         // mark this scanner ctx as should_stop to make sure scanners will not 
be scheduled anymore
         // TODO: there is a lock in `set_should_stop` may cause some slight 
impact
@@ -1379,7 +1379,7 @@ bool 
ScanOperatorX<LocalStateType>::runtime_filters_are_ready_or_timeout(
 template <typename LocalStateType>
 Status ScanOperatorX<LocalStateType>::get_block(RuntimeState* state, 
vectorized::Block* block,
                                                 SourceState& source_state) {
-    CREATE_LOCAL_STATE_RETURN_IF_ERROR(local_state);
+    auto& local_state = get_local_state(state);
     SCOPED_TIMER(local_state._get_next_timer);
     SCOPED_TIMER(local_state.profile()->total_time_counter());
     // in inverted index apply logic, in order to optimize query performance,
diff --git a/be/src/pipeline/exec/scan_operator.h 
b/be/src/pipeline/exec/scan_operator.h
index 83724e3df5d..4355efed8a6 100644
--- a/be/src/pipeline/exec/scan_operator.h
+++ b/be/src/pipeline/exec/scan_operator.h
@@ -436,6 +436,7 @@ public:
     int64_t get_push_down_count() const { return _push_down_count; }
     using OperatorX<LocalStateType>::id;
     using OperatorX<LocalStateType>::operator_id;
+    using OperatorX<LocalStateType>::get_local_state;
 
 protected:
     using LocalState = LocalStateType;
diff --git a/be/src/pipeline/exec/schema_scan_operator.cpp 
b/be/src/pipeline/exec/schema_scan_operator.cpp
index 165885e63a3..430315b9987 100644
--- a/be/src/pipeline/exec/schema_scan_operator.cpp
+++ b/be/src/pipeline/exec/schema_scan_operator.cpp
@@ -211,7 +211,7 @@ Status SchemaScanOperatorX::prepare(RuntimeState* state) {
 
 Status SchemaScanOperatorX::get_block(RuntimeState* state, vectorized::Block* 
block,
                                       SourceState& source_state) {
-    CREATE_LOCAL_STATE_RETURN_IF_ERROR(local_state);
+    auto& local_state = get_local_state(state);
     SCOPED_TIMER(local_state.profile()->total_time_counter());
     RETURN_IF_CANCELLED(state);
     bool schema_eos = false;
diff --git a/be/src/pipeline/exec/select_operator.h 
b/be/src/pipeline/exec/select_operator.h
index 93e2ee184ca..6c998c8ab3c 100644
--- a/be/src/pipeline/exec/select_operator.h
+++ b/be/src/pipeline/exec/select_operator.h
@@ -60,7 +60,7 @@ public:
             : StreamingOperatorX<SelectLocalState>(pool, tnode, operator_id, 
descs) {}
 
     Status pull(RuntimeState* state, vectorized::Block* block, SourceState& 
source_state) override {
-        CREATE_LOCAL_STATE_RETURN_IF_ERROR(local_state);
+        auto& local_state = get_local_state(state);
         SCOPED_TIMER(local_state.profile()->total_time_counter());
         RETURN_IF_CANCELLED(state);
         
RETURN_IF_ERROR(vectorized::VExprContext::filter_block(local_state._conjuncts, 
block,
diff --git a/be/src/pipeline/exec/set_probe_sink_operator.cpp 
b/be/src/pipeline/exec/set_probe_sink_operator.cpp
index 31e99d6e5e7..81f0c986b74 100644
--- a/be/src/pipeline/exec/set_probe_sink_operator.cpp
+++ b/be/src/pipeline/exec/set_probe_sink_operator.cpp
@@ -105,7 +105,7 @@ template <bool is_intersect>
 Status SetProbeSinkOperatorX<is_intersect>::sink(RuntimeState* state, 
vectorized::Block* in_block,
                                                  SourceState source_state) {
     RETURN_IF_CANCELLED(state);
-    CREATE_SINK_LOCAL_STATE_RETURN_IF_ERROR(local_state);
+    auto& local_state = get_local_state(state);
     SCOPED_TIMER(local_state.profile()->total_time_counter());
     COUNTER_UPDATE(local_state.rows_input_counter(), 
(int64_t)in_block->rows());
 
diff --git a/be/src/pipeline/exec/set_probe_sink_operator.h 
b/be/src/pipeline/exec/set_probe_sink_operator.h
index d31c822d62d..bb5eccdbdc5 100644
--- a/be/src/pipeline/exec/set_probe_sink_operator.h
+++ b/be/src/pipeline/exec/set_probe_sink_operator.h
@@ -100,6 +100,7 @@ class SetProbeSinkOperatorX final : public 
DataSinkOperatorX<SetProbeSinkLocalSt
 public:
     using Base = DataSinkOperatorX<SetProbeSinkLocalState<is_intersect>>;
     using DataSinkOperatorXBase::operator_id;
+    using Base::get_local_state;
     using typename Base::LocalState;
 
     friend class SetProbeSinkLocalState<is_intersect>;
diff --git a/be/src/pipeline/exec/set_sink_operator.cpp 
b/be/src/pipeline/exec/set_sink_operator.cpp
index 6725deffa14..07205e6c8a2 100644
--- a/be/src/pipeline/exec/set_sink_operator.cpp
+++ b/be/src/pipeline/exec/set_sink_operator.cpp
@@ -54,7 +54,7 @@ Status SetSinkOperatorX<is_intersect>::sink(RuntimeState* 
state, vectorized::Blo
                                             SourceState source_state) {
     constexpr static auto BUILD_BLOCK_MAX_SIZE = 4 * 1024UL * 1024UL * 1024UL;
     RETURN_IF_CANCELLED(state);
-    CREATE_SINK_LOCAL_STATE_RETURN_IF_ERROR(local_state);
+    auto& local_state = get_local_state(state);
 
     SCOPED_TIMER(local_state.profile()->total_time_counter());
     COUNTER_UPDATE(local_state.rows_input_counter(), 
(int64_t)in_block->rows());
diff --git a/be/src/pipeline/exec/set_sink_operator.h 
b/be/src/pipeline/exec/set_sink_operator.h
index a475807d196..a1b9d8b7079 100644
--- a/be/src/pipeline/exec/set_sink_operator.h
+++ b/be/src/pipeline/exec/set_sink_operator.h
@@ -93,6 +93,7 @@ class SetSinkOperatorX final : public 
DataSinkOperatorX<SetSinkLocalState<is_int
 public:
     using Base = DataSinkOperatorX<SetSinkLocalState<is_intersect>>;
     using DataSinkOperatorXBase::operator_id;
+    using Base::get_local_state;
     using typename Base::LocalState;
 
     friend class SetSinkLocalState<is_intersect>;
diff --git a/be/src/pipeline/exec/set_source_operator.cpp 
b/be/src/pipeline/exec/set_source_operator.cpp
index 6baf084e94a..31d372a002a 100644
--- a/be/src/pipeline/exec/set_source_operator.cpp
+++ b/be/src/pipeline/exec/set_source_operator.cpp
@@ -89,7 +89,7 @@ template <bool is_intersect>
 Status SetSourceOperatorX<is_intersect>::get_block(RuntimeState* state, 
vectorized::Block* block,
                                                    SourceState& source_state) {
     RETURN_IF_CANCELLED(state);
-    CREATE_LOCAL_STATE_RETURN_IF_ERROR(local_state);
+    auto& local_state = get_local_state(state);
     SCOPED_TIMER(local_state.profile()->total_time_counter());
     _create_mutable_cols(local_state, block);
     auto st = std::visit(
diff --git a/be/src/pipeline/exec/set_source_operator.h 
b/be/src/pipeline/exec/set_source_operator.h
index e8cbce8a7d3..de81bbe4ec0 100644
--- a/be/src/pipeline/exec/set_source_operator.h
+++ b/be/src/pipeline/exec/set_source_operator.h
@@ -80,6 +80,7 @@ public:
     using Base = OperatorX<SetSourceLocalState<is_intersect>>;
     // for non-delay tempalte instantiation
     using OperatorXBase::operator_id;
+    using Base::get_local_state;
     using typename Base::LocalState;
 
     SetSourceOperatorX(ObjectPool* pool, const TPlanNode& tnode, int 
operator_id,
diff --git a/be/src/pipeline/exec/sort_sink_operator.cpp 
b/be/src/pipeline/exec/sort_sink_operator.cpp
index 0184ceab663..cc41fbc4bff 100644
--- a/be/src/pipeline/exec/sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/sort_sink_operator.cpp
@@ -138,7 +138,7 @@ Status SortSinkOperatorX::open(RuntimeState* state) {
 
 Status SortSinkOperatorX::sink(doris::RuntimeState* state, vectorized::Block* 
in_block,
                                SourceState source_state) {
-    CREATE_SINK_LOCAL_STATE_RETURN_IF_ERROR(local_state);
+    auto& local_state = get_local_state(state);
     SCOPED_TIMER(local_state.profile()->total_time_counter());
     COUNTER_UPDATE(local_state.rows_input_counter(), 
(int64_t)in_block->rows());
     if (in_block->rows() > 0) {
diff --git a/be/src/pipeline/exec/sort_source_operator.cpp 
b/be/src/pipeline/exec/sort_source_operator.cpp
index 8b189b87a67..6732094272a 100644
--- a/be/src/pipeline/exec/sort_source_operator.cpp
+++ b/be/src/pipeline/exec/sort_source_operator.cpp
@@ -34,7 +34,7 @@ SortSourceOperatorX::SortSourceOperatorX(ObjectPool* pool, 
const TPlanNode& tnod
 
 Status SortSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* 
block,
                                       SourceState& source_state) {
-    CREATE_LOCAL_STATE_RETURN_IF_ERROR(local_state);
+    auto& local_state = get_local_state(state);
     SCOPED_TIMER(local_state.profile()->total_time_counter());
     bool eos = false;
     RETURN_IF_ERROR_OR_CATCH_EXCEPTION(
diff --git a/be/src/pipeline/exec/streaming_aggregation_sink_operator.cpp 
b/be/src/pipeline/exec/streaming_aggregation_sink_operator.cpp
index 3022d542e8b..3d246ec9d9c 100644
--- a/be/src/pipeline/exec/streaming_aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/streaming_aggregation_sink_operator.cpp
@@ -353,7 +353,7 @@ Status StreamingAggSinkOperatorX::init(const TPlanNode& 
tnode, RuntimeState* sta
 
 Status StreamingAggSinkOperatorX::sink(RuntimeState* state, vectorized::Block* 
in_block,
                                        SourceState source_state) {
-    CREATE_SINK_LOCAL_STATE_RETURN_IF_ERROR(local_state);
+    auto& local_state = get_local_state(state);
     SCOPED_TIMER(local_state.profile()->total_time_counter());
     COUNTER_UPDATE(local_state.rows_input_counter(), 
(int64_t)in_block->rows());
     local_state._shared_state->input_num_rows += in_block->rows();
diff --git a/be/src/pipeline/exec/streaming_aggregation_source_operator.cpp 
b/be/src/pipeline/exec/streaming_aggregation_source_operator.cpp
index 0db8ef6f6bc..45d2e1902f6 100644
--- a/be/src/pipeline/exec/streaming_aggregation_source_operator.cpp
+++ b/be/src/pipeline/exec/streaming_aggregation_source_operator.cpp
@@ -80,7 +80,7 @@ 
StreamingAggSourceOperatorX::StreamingAggSourceOperatorX(ObjectPool* pool, const
 
 Status StreamingAggSourceOperatorX::get_block(RuntimeState* state, 
vectorized::Block* block,
                                               SourceState& source_state) {
-    CREATE_LOCAL_STATE_RETURN_IF_ERROR(local_state);
+    auto& local_state = get_local_state(state);
     SCOPED_TIMER(local_state.profile()->total_time_counter());
     if (!local_state._shared_state->data_queue->data_exhausted()) {
         std::unique_ptr<vectorized::Block> agg_block;
diff --git a/be/src/pipeline/exec/table_function_operator.h 
b/be/src/pipeline/exec/table_function_operator.h
index b71cc52ef23..0d292c324fa 100644
--- a/be/src/pipeline/exec/table_function_operator.h
+++ b/be/src/pipeline/exec/table_function_operator.h
@@ -98,7 +98,7 @@ public:
 
     Status push(RuntimeState* state, vectorized::Block* input_block,
                 SourceState source_state) const override {
-        CREATE_LOCAL_STATE_RETURN_IF_ERROR(local_state);
+        auto& local_state = get_local_state(state);
         if (input_block->rows() == 0) {
             return Status::OK();
         }
@@ -112,7 +112,7 @@ public:
 
     Status pull(RuntimeState* state, vectorized::Block* output_block,
                 SourceState& source_state) const override {
-        CREATE_LOCAL_STATE_RETURN_IF_ERROR(local_state);
+        auto& local_state = get_local_state(state);
         RETURN_IF_ERROR(local_state.get_expanded_block(state, output_block, 
source_state));
         local_state.reached_limit(output_block, source_state);
         return Status::OK();
diff --git a/be/src/pipeline/exec/union_sink_operator.cpp 
b/be/src/pipeline/exec/union_sink_operator.cpp
index 4472468e694..9b07fb6a835 100644
--- a/be/src/pipeline/exec/union_sink_operator.cpp
+++ b/be/src/pipeline/exec/union_sink_operator.cpp
@@ -145,7 +145,7 @@ Status UnionSinkOperatorX::open(RuntimeState* state) {
 
 Status UnionSinkOperatorX::sink(RuntimeState* state, vectorized::Block* 
in_block,
                                 SourceState source_state) {
-    CREATE_SINK_LOCAL_STATE_RETURN_IF_ERROR(local_state);
+    auto& local_state = get_local_state(state);
     SCOPED_TIMER(local_state.profile()->total_time_counter());
     COUNTER_UPDATE(local_state.rows_input_counter(), 
(int64_t)in_block->rows());
     if (local_state._output_block == nullptr) {
diff --git a/be/src/pipeline/exec/union_sink_operator.h 
b/be/src/pipeline/exec/union_sink_operator.h
index 8648979658f..f4891ce1c5d 100644
--- a/be/src/pipeline/exec/union_sink_operator.h
+++ b/be/src/pipeline/exec/union_sink_operator.h
@@ -152,7 +152,7 @@ private:
 
     Status materialize_block(RuntimeState* state, vectorized::Block* 
src_block, int child_idx,
                              vectorized::Block* res_block) {
-        CREATE_SINK_LOCAL_STATE_RETURN_IF_ERROR(local_state);
+        auto& local_state = get_local_state(state);
         const auto& child_exprs = local_state._child_expr;
         vectorized::ColumnsWithTypeAndName colunms;
         for (size_t i = 0; i < child_exprs.size(); ++i) {
diff --git a/be/src/pipeline/exec/union_source_operator.cpp 
b/be/src/pipeline/exec/union_source_operator.cpp
index d486c3d7755..1f27ab920c8 100644
--- a/be/src/pipeline/exec/union_source_operator.cpp
+++ b/be/src/pipeline/exec/union_source_operator.cpp
@@ -151,7 +151,7 @@ std::shared_ptr<UnionSharedState> 
UnionSourceLocalState::create_shared_state() {
 
 Status UnionSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* 
block,
                                        SourceState& source_state) {
-    CREATE_LOCAL_STATE_RETURN_IF_ERROR(local_state);
+    auto& local_state = get_local_state(state);
     SCOPED_TIMER(local_state.profile()->total_time_counter());
     if (local_state._need_read_for_const_expr) {
         if (has_more_const(state)) {
diff --git a/be/src/pipeline/pipeline_x/operator.h 
b/be/src/pipeline/pipeline_x/operator.h
index b7a437cf6a3..b4f5adf573e 100644
--- a/be/src/pipeline/pipeline_x/operator.h
+++ b/be/src/pipeline/pipeline_x/operator.h
@@ -294,6 +294,9 @@ public:
 
     Status setup_local_state(RuntimeState* state, LocalStateInfo& info) 
override;
     using LocalState = LocalStateType;
+    [[nodiscard]] LocalState& get_local_state(RuntimeState* state) const {
+        return state->get_local_state(operator_id())->template 
cast<LocalState>();
+    }
 };
 
 template <typename DependencyType = FakeDependency>
@@ -536,6 +539,9 @@ public:
     void get_dependency(std::vector<DependencySPtr>& dependency) override;
 
     using LocalState = LocalStateType;
+    [[nodiscard]] LocalState& get_local_state(RuntimeState* state) const {
+        return state->get_sink_local_state(operator_id())->template 
cast<LocalState>();
+    }
 };
 
 template <typename DependencyType = FakeDependency>
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp 
b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
index 7a2d837d69f..9f5d08c2b57 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
@@ -96,13 +96,21 @@ Status PipelineXTask::prepare(RuntimeState* state, const 
TPipelineInstanceParams
 
 Status PipelineXTask::extract_dependencies() {
     for (auto op : _operators) {
-        auto* local_state = _state->get_local_state(op->operator_id());
+        auto result = _state->get_local_state_result(op->operator_id());
+        if (!result) {
+            return result.error();
+        }
+        auto* local_state = result.value();
         auto* dep = local_state->dependency();
         DCHECK(dep != nullptr);
         _read_dependencies.push_back(dep);
     }
     {
-        auto* local_state = _state->get_sink_local_state(_sink->operator_id());
+        auto result = 
_state->get_sink_local_state_result(_sink->operator_id());
+        if (!result) {
+            return result.error();
+        }
+        auto* local_state = result.value();
         auto* dep = local_state->dependency();
         DCHECK(dep != nullptr);
         _write_dependencies = dep;
diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp
index 8535e2dcc1a..7a24b86621b 100644
--- a/be/src/runtime/runtime_state.cpp
+++ b/be/src/runtime/runtime_state.cpp
@@ -438,6 +438,17 @@ doris::pipeline::PipelineXLocalStateBase* 
RuntimeState::get_local_state(int id)
     return _op_id_to_local_state[id].get();
 }
 
+Result<RuntimeState::LocalState*> RuntimeState::get_local_state_result(int id) 
{
+    if (id >= _op_id_to_local_state.size()) {
+        return ResultError(Status::InternalError("get_local_state out of range 
size:{} , id:{}",
+                                                 _op_id_to_local_state.size(), 
id));
+    }
+    if (!_op_id_to_local_state[id]) {
+        return ResultError(Status::InternalError("get_local_state id:{} is 
null", id));
+    }
+    return _op_id_to_local_state[id].get();
+};
+
 void RuntimeState::emplace_sink_local_state(
         int id, std::unique_ptr<doris::pipeline::PipelineXSinkLocalStateBase> 
state) {
     _op_id_to_sink_local_state[id] = std::move(state);
@@ -447,6 +458,18 @@ doris::pipeline::PipelineXSinkLocalStateBase* 
RuntimeState::get_sink_local_state
     return _op_id_to_sink_local_state[id].get();
 }
 
+Result<RuntimeState::SinkLocalState*> 
RuntimeState::get_sink_local_state_result(int id) {
+    if (id >= _op_id_to_sink_local_state.size()) {
+        return ResultError(
+                Status::InternalError("_op_id_to_sink_local_state out of range 
size:{} , id:{}",
+                                      _op_id_to_sink_local_state.size(), id));
+    }
+    if (!_op_id_to_sink_local_state[id]) {
+        return ResultError(Status::InternalError("_op_id_to_sink_local_state 
id:{} is null", id));
+    }
+    return _op_id_to_sink_local_state[id].get();
+}
+
 bool RuntimeState::enable_page_cache() const {
     return !config::disable_storage_page_cache &&
            (_query_options.__isset.enable_page_cache && 
_query_options.enable_page_cache);
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index f555b54ba64..5dda1201cea 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -454,15 +454,19 @@ public:
                _query_options.enable_delete_sub_predicate_v2;
     }
 
-    void emplace_local_state(int id,
-                             
std::unique_ptr<doris::pipeline::PipelineXLocalStateBase> state);
+    using LocalState = doris::pipeline::PipelineXLocalStateBase;
+    using SinkLocalState = doris::pipeline::PipelineXSinkLocalStateBase;
+    // get result can return an error message, and we will only call it during 
the prepare.
+    void emplace_local_state(int id, std::unique_ptr<LocalState> state);
 
-    doris::pipeline::PipelineXLocalStateBase* get_local_state(int id);
+    LocalState* get_local_state(int id);
+    Result<LocalState*> get_local_state_result(int id);
 
-    void emplace_sink_local_state(
-            int id, 
std::unique_ptr<doris::pipeline::PipelineXSinkLocalStateBase> state);
+    void emplace_sink_local_state(int id, std::unique_ptr<SinkLocalState> 
state);
 
-    doris::pipeline::PipelineXSinkLocalStateBase* get_sink_local_state(int id);
+    SinkLocalState* get_sink_local_state(int id);
+
+    Result<SinkLocalState*> get_sink_local_state_result(int id);
 
     void resize_op_id_to_local_state(int size);
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to