This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 3838b6fbaeb [refine](pipelineX) refine some code in pipelineX (#27472)
3838b6fbaeb is described below
commit 3838b6fbaeb2034b0bebb04cf605951c23a2e5ed
Author: Mryange <[email protected]>
AuthorDate: Mon Nov 27 11:04:16 2023 +0800
[refine](pipelineX) refine some code in pipelineX (#27472)
---
be/src/pipeline/pipeline_x/operator.cpp | 5 --
be/src/pipeline/pipeline_x/operator.h | 1 -
.../pipeline_x/pipeline_x_fragment_context.cpp | 56 ++++++++++++----------
.../pipeline_x/pipeline_x_fragment_context.h | 12 +++--
be/src/pipeline/pipeline_x/pipeline_x_task.cpp | 20 ++++----
be/src/runtime/runtime_state.cpp | 11 +++--
be/src/runtime/runtime_state.h | 2 +-
be/src/vec/sink/writer/async_result_writer.cpp | 6 ---
be/src/vec/sink/writer/async_result_writer.h | 2 -
9 files changed, 54 insertions(+), 61 deletions(-)
diff --git a/be/src/pipeline/pipeline_x/operator.cpp
b/be/src/pipeline/pipeline_x/operator.cpp
index c0f3a6f029a..6f305d86efa 100644
--- a/be/src/pipeline/pipeline_x/operator.cpp
+++ b/be/src/pipeline/pipeline_x/operator.cpp
@@ -528,11 +528,6 @@ Status AsyncWriterSink<Writer, Parent>::sink(RuntimeState*
state, vectorized::Bl
return _writer->sink(block, source_state == SourceState::FINISHED);
}
-template <typename Writer, typename Parent>
-Dependency* AsyncWriterSink<Writer, Parent>::write_blocked_by(PipelineXTask*
task) {
- return _writer->write_blocked_by(task);
-}
-
template <typename Writer, typename Parent>
Status AsyncWriterSink<Writer, Parent>::close(RuntimeState* state, Status
exec_status) {
if (_closed) {
diff --git a/be/src/pipeline/pipeline_x/operator.h
b/be/src/pipeline/pipeline_x/operator.h
index 28f156a15a9..2dc71dec96a 100644
--- a/be/src/pipeline/pipeline_x/operator.h
+++ b/be/src/pipeline/pipeline_x/operator.h
@@ -640,7 +640,6 @@ public:
Status sink(RuntimeState* state, vectorized::Block* block, SourceState
source_state);
- Dependency* write_blocked_by(PipelineXTask* task);
Dependency* dependency() override { return _async_writer_dependency.get();
}
Status close(RuntimeState* state, Status exec_status) override;
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index 3d7b8ea42b9..6a40f86dede 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -261,7 +261,7 @@ Status
PipelineXFragmentContext::_create_data_sink(ObjectPool* pool, const TData
params.__isset.send_query_statistics_with_every_batch
? params.send_query_statistics_with_every_batch
: false;
- _sink.reset(new ExchangeSinkOperatorX(state, row_desc,
next_operator_id(),
+ _sink.reset(new ExchangeSinkOperatorX(state, row_desc,
next_sink_operator_id(),
thrift_sink.stream_sink,
params.destinations,
send_query_statistics_with_every_batch));
break;
@@ -272,18 +272,18 @@ Status
PipelineXFragmentContext::_create_data_sink(ObjectPool* pool, const TData
}
// TODO: figure out good buffer size based on size of output row
- _sink.reset(new ResultSinkOperatorX(next_operator_id(), row_desc,
output_exprs,
+ _sink.reset(new ResultSinkOperatorX(next_sink_operator_id(), row_desc,
output_exprs,
thrift_sink.result_sink));
break;
}
case TDataSinkType::OLAP_TABLE_SINK: {
if (state->query_options().enable_memtable_on_sink_node &&
!_has_inverted_index_or_partial_update(thrift_sink.olap_table_sink)) {
- _sink.reset(new OlapTableSinkV2OperatorX(pool, next_operator_id(),
row_desc,
+ _sink.reset(new OlapTableSinkV2OperatorX(pool,
next_sink_operator_id(), row_desc,
output_exprs, false));
} else {
- _sink.reset(new OlapTableSinkOperatorX(pool, next_operator_id(),
row_desc, output_exprs,
- false));
+ _sink.reset(new OlapTableSinkOperatorX(pool,
next_sink_operator_id(), row_desc,
+ output_exprs, false));
}
break;
}
@@ -292,7 +292,8 @@ Status
PipelineXFragmentContext::_create_data_sink(ObjectPool* pool, const TData
return Status::InternalError("Missing data jdbc sink.");
}
if (config::enable_java_support) {
- _sink.reset(new JdbcTableSinkOperatorX(row_desc,
next_operator_id(), output_exprs));
+ _sink.reset(
+ new JdbcTableSinkOperatorX(row_desc,
next_sink_operator_id(), output_exprs));
} else {
return Status::InternalError(
"Jdbc table sink is not enabled, you can change be config "
@@ -313,10 +314,12 @@ Status
PipelineXFragmentContext::_create_data_sink(ObjectPool* pool, const TData
// Result file sink is not the top sink
if (params.__isset.destinations && params.destinations.size() > 0) {
_sink.reset(new ResultFileSinkOperatorX(
- next_operator_id(), row_desc,
thrift_sink.result_file_sink, params.destinations,
- send_query_statistics_with_every_batch, output_exprs,
desc_tbl));
+ next_sink_operator_id(), row_desc,
thrift_sink.result_file_sink,
+ params.destinations,
send_query_statistics_with_every_batch, output_exprs,
+ desc_tbl));
} else {
- _sink.reset(new ResultFileSinkOperatorX(next_operator_id(),
row_desc, output_exprs));
+ _sink.reset(
+ new ResultFileSinkOperatorX(next_sink_operator_id(),
row_desc, output_exprs));
}
break;
}
@@ -324,7 +327,7 @@ Status
PipelineXFragmentContext::_create_data_sink(ObjectPool* pool, const TData
DCHECK(thrift_sink.__isset.multi_cast_stream_sink);
DCHECK_GT(thrift_sink.multi_cast_stream_sink.sinks.size(), 0);
// TODO: figure out good buffer size based on size of output row
- auto sink_id = next_operator_id();
+ auto sink_id = next_sink_operator_id();
auto sender_size = thrift_sink.multi_cast_stream_sink.sinks.size();
// one sink has multiple sources.
std::vector<int> sources;
@@ -359,7 +362,7 @@ Status
PipelineXFragmentContext::_create_data_sink(ObjectPool* pool, const TData
DataSinkOperatorXPtr sink_op;
sink_op.reset(new ExchangeSinkOperatorX(
- state, *_row_desc, next_operator_id(),
+ state, *_row_desc, next_sink_operator_id(),
thrift_sink.multi_cast_stream_sink.sinks[i],
thrift_sink.multi_cast_stream_sink.destinations[i],
false));
@@ -421,7 +424,7 @@ Status PipelineXFragmentContext::_build_pipeline_tasks(
_runtime_states[i]->set_desc_tbl(_desc_tbl);
_runtime_states[i]->set_per_fragment_instance_idx(local_params.sender_id);
_runtime_states[i]->set_num_per_fragment_instances(request.num_senders);
- _runtime_states[i]->resize_op_id_to_local_state(max_operator_id());
+ _runtime_states[i]->resize_op_id_to_local_state(max_operator_id(),
max_sink_operator_id());
_runtime_states[i]->set_load_stream_per_node(request.load_stream_per_node);
_runtime_states[i]->set_total_load_streams(request.total_load_streams);
_runtime_states[i]->set_num_local_sink(request.num_local_sink);
@@ -675,8 +678,8 @@ Status
PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
cur_pipe = add_pipeline();
_dag[downstream_pipeline_id].push_back(cur_pipe->id());
DataSinkOperatorXPtr sink;
- sink.reset(
- new DistinctStreamingAggSinkOperatorX(pool,
next_operator_id(), tnode, descs));
+ sink.reset(new DistinctStreamingAggSinkOperatorX(pool,
next_sink_operator_id(), tnode,
+ descs));
sink->set_dests_id({op->operator_id()});
RETURN_IF_ERROR(cur_pipe->set_sink(sink));
RETURN_IF_ERROR(cur_pipe->sink_x()->init(tnode,
_runtime_state.get()));
@@ -692,7 +695,7 @@ Status
PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
cur_pipe = add_pipeline();
_dag[downstream_pipeline_id].push_back(cur_pipe->id());
DataSinkOperatorXPtr sink;
- sink.reset(new StreamingAggSinkOperatorX(pool, next_operator_id(),
tnode, descs));
+ sink.reset(new StreamingAggSinkOperatorX(pool,
next_sink_operator_id(), tnode, descs));
sink->set_dests_id({op->operator_id()});
RETURN_IF_ERROR(cur_pipe->set_sink(sink));
RETURN_IF_ERROR(cur_pipe->sink_x()->init(tnode,
_runtime_state.get()));
@@ -708,7 +711,7 @@ Status
PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
_dag[downstream_pipeline_id].push_back(cur_pipe->id());
DataSinkOperatorXPtr sink;
- sink.reset(new AggSinkOperatorX<>(pool, next_operator_id(), tnode,
descs));
+ sink.reset(new AggSinkOperatorX<>(pool, next_sink_operator_id(),
tnode, descs));
sink->set_dests_id({op->operator_id()});
RETURN_IF_ERROR(cur_pipe->set_sink(sink));
RETURN_IF_ERROR(cur_pipe->sink_x()->init(tnode,
_runtime_state.get()));
@@ -733,7 +736,7 @@ Status
PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
_dag[downstream_pipeline_id].push_back(build_side_pipe->id());
DataSinkOperatorXPtr sink;
- sink.reset(new HashJoinBuildSinkOperatorX(pool, next_operator_id(),
tnode, descs));
+ sink.reset(new HashJoinBuildSinkOperatorX(pool,
next_sink_operator_id(), tnode, descs));
sink->set_dests_id({op->operator_id()});
RETURN_IF_ERROR(build_side_pipe->set_sink(sink));
RETURN_IF_ERROR(build_side_pipe->sink_x()->init(tnode,
_runtime_state.get()));
@@ -753,7 +756,8 @@ Status
PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
_dag[downstream_pipeline_id].push_back(build_side_pipe->id());
DataSinkOperatorXPtr sink;
- sink.reset(new NestedLoopJoinBuildSinkOperatorX(pool,
next_operator_id(), tnode, descs));
+ sink.reset(
+ new NestedLoopJoinBuildSinkOperatorX(pool,
next_sink_operator_id(), tnode, descs));
sink->set_dests_id({op->operator_id()});
RETURN_IF_ERROR(build_side_pipe->set_sink(sink));
RETURN_IF_ERROR(build_side_pipe->sink_x()->init(tnode,
_runtime_state.get()));
@@ -774,7 +778,7 @@ Status
PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
PipelinePtr build_side_pipe = add_pipeline();
_dag[downstream_pipeline_id].push_back(build_side_pipe->id());
DataSinkOperatorXPtr sink;
- sink.reset(new UnionSinkOperatorX(i, next_operator_id(), pool,
tnode, descs));
+ sink.reset(new UnionSinkOperatorX(i, next_sink_operator_id(),
pool, tnode, descs));
sink->set_dests_id({op->operator_id()});
RETURN_IF_ERROR(build_side_pipe->set_sink(sink));
RETURN_IF_ERROR(build_side_pipe->sink_x()->init(tnode,
_runtime_state.get()));
@@ -795,7 +799,7 @@ Status
PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
_dag[downstream_pipeline_id].push_back(cur_pipe->id());
DataSinkOperatorXPtr sink;
- sink.reset(new SortSinkOperatorX(pool, next_operator_id(), tnode,
descs));
+ sink.reset(new SortSinkOperatorX(pool, next_sink_operator_id(), tnode,
descs));
sink->set_dests_id({op->operator_id()});
RETURN_IF_ERROR(cur_pipe->set_sink(sink));
RETURN_IF_ERROR(cur_pipe->sink_x()->init(tnode, _runtime_state.get()));
@@ -813,7 +817,7 @@ Status
PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
_dag[downstream_pipeline_id].push_back(cur_pipe->id());
DataSinkOperatorXPtr sink;
- sink.reset(new PartitionSortSinkOperatorX(pool, next_operator_id(),
tnode, descs));
+ sink.reset(new PartitionSortSinkOperatorX(pool,
next_sink_operator_id(), tnode, descs));
sink->set_dests_id({op->operator_id()});
RETURN_IF_ERROR(cur_pipe->set_sink(sink));
RETURN_IF_ERROR(cur_pipe->sink_x()->init(tnode, _runtime_state.get()));
@@ -831,7 +835,7 @@ Status
PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
_dag[downstream_pipeline_id].push_back(cur_pipe->id());
DataSinkOperatorXPtr sink;
- sink.reset(new AnalyticSinkOperatorX(pool, next_operator_id(), tnode,
descs));
+ sink.reset(new AnalyticSinkOperatorX(pool, next_sink_operator_id(),
tnode, descs));
sink->set_dests_id({op->operator_id()});
RETURN_IF_ERROR(cur_pipe->set_sink(sink));
RETURN_IF_ERROR(cur_pipe->sink_x()->init(tnode, _runtime_state.get()));
@@ -912,11 +916,11 @@ Status
PipelineXFragmentContext::_build_operators_for_set_operation_node(
DataSinkOperatorXPtr sink;
if (child_id == 0) {
- sink.reset(new SetSinkOperatorX<is_intersect>(child_id,
next_operator_id(), pool, tnode,
- descs));
+ sink.reset(new SetSinkOperatorX<is_intersect>(child_id,
next_sink_operator_id(), pool,
+ tnode, descs));
} else {
- sink.reset(new SetProbeSinkOperatorX<is_intersect>(child_id,
next_operator_id(), pool,
- tnode, descs));
+ sink.reset(new SetProbeSinkOperatorX<is_intersect>(child_id,
next_sink_operator_id(),
+ pool, tnode,
descs));
}
sink->set_dests_id({op->operator_id()});
RETURN_IF_ERROR(probe_side_pipe->set_sink(sink));
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
index 23ff08fcb0a..f579265ab63 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
@@ -104,7 +104,7 @@ public:
RuntimeState* get_runtime_state(UniqueId fragment_instance_id) override {
std::lock_guard<std::mutex> l(_state_map_lock);
- if (_instance_id_to_runtime_state.count(fragment_instance_id) > 0) {
+ if (_instance_id_to_runtime_state.contains(fragment_instance_id)) {
return _instance_id_to_runtime_state[fragment_instance_id];
} else {
return _runtime_state.get();
@@ -115,6 +115,10 @@ public:
[[nodiscard]] int max_operator_id() const { return _operator_id; }
+ [[nodiscard]] int next_sink_operator_id() { return _sink_operator_id++; }
+
+ [[nodiscard]] int max_sink_operator_id() const { return _sink_operator_id;
}
+
std::string debug_string() override;
private:
@@ -203,11 +207,9 @@ private:
std::map<UniqueId, RuntimeState*> _instance_id_to_runtime_state;
std::mutex _state_map_lock;
- // We can guarantee that a plan node ID can correspond to an operator ID,
- // but some operators do not have a corresponding plan node ID.
- // We set these IDs as negative numbers, which are not visible to the user.
- int _operator_id = 0;
+ int _operator_id = 0;
+ int _sink_operator_id = 0;
std::map<PipelineId, std::shared_ptr<LocalExchangeSharedState>>
_op_id_to_le_state;
};
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
index aaf9c2a16f1..c96f34b213b 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
@@ -83,19 +83,15 @@ Status PipelineXTask::prepare(RuntimeState* state, const
TPipelineInstanceParams
std::vector<TScanRangeParams> no_scan_ranges;
auto scan_ranges = find_with_default(local_params.per_node_scan_ranges,
_operators.front()->node_id(),
no_scan_ranges);
-
+ auto* parent_profile = _parent_profile;
for (int op_idx = _operators.size() - 1; op_idx >= 0; op_idx--) {
- auto& deps =
get_upstream_dependency(_operators[op_idx]->operator_id());
- LocalStateInfo info {
- op_idx == _operators.size() - 1
- ? _parent_profile
- : state->get_local_state(_operators[op_idx +
1]->operator_id())->profile(),
- scan_ranges,
- deps,
- _local_exchange_state,
- _task_idx,
- _source_dependency[_operators[op_idx]->operator_id()]};
- RETURN_IF_ERROR(_operators[op_idx]->setup_local_state(state, info));
+ auto& op = _operators[op_idx];
+ auto& deps = get_upstream_dependency(op->operator_id());
+ LocalStateInfo info {parent_profile, scan_ranges,
+ deps, _local_exchange_state,
+ _task_idx,
_source_dependency[op->operator_id()]};
+ RETURN_IF_ERROR(op->setup_local_state(state, info));
+ parent_profile = state->get_local_state(op->operator_id())->profile();
}
_block = doris::vectorized::Block::create_unique();
diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp
index daf3e1bc75e..a8027d0d61b 100644
--- a/be/src/runtime/runtime_state.cpp
+++ b/be/src/runtime/runtime_state.cpp
@@ -23,6 +23,7 @@
#include <fmt/format.h>
#include <gen_cpp/PaloInternalService_types.h>
#include <gen_cpp/Types_types.h>
+#include <glog/logging.h>
#include <string>
@@ -424,13 +425,15 @@ int64_t RuntimeState::get_load_mem_limit() {
}
}
-void RuntimeState::resize_op_id_to_local_state(int size) {
- _op_id_to_local_state.resize(size);
- _op_id_to_sink_local_state.resize(size);
+void RuntimeState::resize_op_id_to_local_state(int operator_size, int
sink_size) {
+ _op_id_to_local_state.resize(operator_size);
+ _op_id_to_sink_local_state.resize(sink_size);
}
void RuntimeState::emplace_local_state(
int id, std::unique_ptr<doris::pipeline::PipelineXLocalStateBase>
state) {
+ DCHECK(id < _op_id_to_local_state.size());
+ DCHECK(!_op_id_to_local_state[id]);
_op_id_to_local_state[id] = std::move(state);
}
@@ -451,6 +454,8 @@ Result<RuntimeState::LocalState*>
RuntimeState::get_local_state_result(int id) {
void RuntimeState::emplace_sink_local_state(
int id, std::unique_ptr<doris::pipeline::PipelineXSinkLocalStateBase>
state) {
+ DCHECK(id < _op_id_to_sink_local_state.size());
+ DCHECK(!_op_id_to_sink_local_state[id]);
_op_id_to_sink_local_state[id] = std::move(state);
}
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index 835bd582894..8c7b3bed9fc 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -500,7 +500,7 @@ public:
Result<SinkLocalState*> get_sink_local_state_result(int id);
- void resize_op_id_to_local_state(int size);
+ void resize_op_id_to_local_state(int operator_size, int sink_size);
private:
Status create_error_log_file();
diff --git a/be/src/vec/sink/writer/async_result_writer.cpp
b/be/src/vec/sink/writer/async_result_writer.cpp
index 8edde60adb4..422dc2efef4 100644
--- a/be/src/vec/sink/writer/async_result_writer.cpp
+++ b/be/src/vec/sink/writer/async_result_writer.cpp
@@ -172,11 +172,5 @@ std::unique_ptr<Block>
AsyncResultWriter::_get_free_block(doris::vectorized::Blo
return b;
}
-pipeline::Dependency*
AsyncResultWriter::write_blocked_by(pipeline::PipelineXTask* task) {
- std::lock_guard l(_m);
- DCHECK(_dependency != nullptr);
- return _dependency->is_blocked_by(task);
-}
-
} // namespace vectorized
} // namespace doris
diff --git a/be/src/vec/sink/writer/async_result_writer.h
b/be/src/vec/sink/writer/async_result_writer.h
index 0a217b34e6b..e91ff1a0701 100644
--- a/be/src/vec/sink/writer/async_result_writer.h
+++ b/be/src/vec/sink/writer/async_result_writer.h
@@ -78,8 +78,6 @@ public:
return _data_queue_is_available() || _is_finished();
}
- pipeline::Dependency* write_blocked_by(pipeline::PipelineXTask* task);
-
[[nodiscard]] bool is_pending_finish() const { return
!_writer_thread_closed; }
void process_block(RuntimeState* state, RuntimeProfile* profile);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org