This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new d809bee46e3 [refactor](profilev2) add BlocksProduced RowsProduced
counter #27291
d809bee46e3 is described below
commit d809bee46e318c9cab7becdc0426e0cdce3e511d
Author: Mryange <[email protected]>
AuthorDate: Tue Nov 21 12:01:11 2023 +0800
[refactor](profilev2) add BlocksProduced RowsProduced counter #27291
---
be/src/pipeline/exec/exchange_sink_operator.cpp | 2 ++
be/src/pipeline/exec/exchange_sink_operator.h | 2 ++
be/src/pipeline/exec/result_sink_operator.cpp | 5 ++++-
be/src/pipeline/exec/result_sink_operator.h | 4 ++++
4 files changed, 12 insertions(+), 1 deletion(-)
diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp
b/be/src/pipeline/exec/exchange_sink_operator.cpp
index 10cc44b80ba..42ab71a4547 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -114,6 +114,7 @@ Status ExchangeSinkLocalState::init(RuntimeState* state,
LocalSinkStateInfo& inf
_split_block_distribute_by_channel_timer =
ADD_TIMER(_profile, "SplitBlockDistributeByChannelTime");
_blocks_sent_counter = ADD_COUNTER_WITH_LEVEL(_profile, "BlocksProduced",
TUnit::UNIT, 1);
+ _rows_sent_counter = ADD_COUNTER_WITH_LEVEL(_profile, "RowsProduced",
TUnit::UNIT, 1);
_overall_throughput = _profile->add_derived_counter(
"OverallThroughput", TUnit::BYTES_PER_SECOND,
std::bind<int64_t>(&RuntimeProfile::units_per_second,
_bytes_sent_counter,
@@ -309,6 +310,7 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state,
vectorized::Block* block
SourceState source_state) {
auto& local_state = get_local_state(state);
COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)block->rows());
+ COUNTER_UPDATE(local_state.rows_sent_counter(), (int64_t)block->rows());
SCOPED_TIMER(local_state.exec_time_counter());
local_state._peak_memory_usage_counter->set(_mem_tracker->peak_consumption());
bool all_receiver_eof = true;
diff --git a/be/src/pipeline/exec/exchange_sink_operator.h
b/be/src/pipeline/exec/exchange_sink_operator.h
index 1c7b6cbdaad..bc9c26e36ea 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.h
+++ b/be/src/pipeline/exec/exchange_sink_operator.h
@@ -146,6 +146,7 @@ public:
RuntimeProfile::Counter* brpc_wait_timer() { return _brpc_wait_timer; }
RuntimeProfile::Counter* blocks_sent_counter() { return
_blocks_sent_counter; }
+ RuntimeProfile::Counter* rows_sent_counter() { return _rows_sent_counter; }
RuntimeProfile::Counter* local_send_timer() { return _local_send_timer; }
RuntimeProfile::Counter* local_bytes_send_counter() { return
_local_bytes_send_counter; }
RuntimeProfile::Counter* local_sent_rows() { return _local_sent_rows; }
@@ -192,6 +193,7 @@ private:
RuntimeProfile::Counter* _split_block_hash_compute_timer = nullptr;
RuntimeProfile::Counter* _split_block_distribute_by_channel_timer =
nullptr;
RuntimeProfile::Counter* _blocks_sent_counter = nullptr;
+ RuntimeProfile::Counter* _rows_sent_counter = nullptr;
// Throughput per total time spent in sender
RuntimeProfile::Counter* _overall_throughput = nullptr;
// Used to counter send bytes under local data exchange
diff --git a/be/src/pipeline/exec/result_sink_operator.cpp
b/be/src/pipeline/exec/result_sink_operator.cpp
index 321642bdba0..e9a51d56ad1 100644
--- a/be/src/pipeline/exec/result_sink_operator.cpp
+++ b/be/src/pipeline/exec/result_sink_operator.cpp
@@ -64,7 +64,8 @@ Status ResultSinkLocalState::init(RuntimeState* state,
LocalSinkStateInfo& info)
state->execution_timeout()));
_result_sink_dependency =
ResultSinkDependency::create_shared(_parent->operator_id(),
_parent->node_id());
-
+ _blocks_sent_counter = ADD_COUNTER_WITH_LEVEL(_profile, "BlocksProduced",
TUnit::UNIT, 1);
+ _rows_sent_counter = ADD_COUNTER_WITH_LEVEL(_profile, "RowsProduced",
TUnit::UNIT, 1);
((PipBufferControlBlock*)_sender.get())->set_dependency(_result_sink_dependency);
return Status::OK();
}
@@ -131,6 +132,8 @@ Status ResultSinkOperatorX::sink(RuntimeState* state,
vectorized::Block* block,
SourceState source_state) {
auto& local_state = get_local_state(state);
SCOPED_TIMER(local_state.exec_time_counter());
+ COUNTER_UPDATE(local_state.rows_sent_counter(), (int64_t)block->rows());
+ COUNTER_UPDATE(local_state.blocks_sent_counter(), 1);
if (_fetch_option.use_two_phase_fetch && block->rows() > 0) {
RETURN_IF_ERROR(_second_phase_fetch_data(state, block));
}
diff --git a/be/src/pipeline/exec/result_sink_operator.h
b/be/src/pipeline/exec/result_sink_operator.h
index 9bda54e79b6..93ce397b840 100644
--- a/be/src/pipeline/exec/result_sink_operator.h
+++ b/be/src/pipeline/exec/result_sink_operator.h
@@ -62,6 +62,8 @@ public:
Status open(RuntimeState* state) override;
Status close(RuntimeState* state, Status exec_status) override;
WriteDependency* dependency() override { return
_result_sink_dependency.get(); }
+ RuntimeProfile::Counter* blocks_sent_counter() { return
_blocks_sent_counter; }
+ RuntimeProfile::Counter* rows_sent_counter() { return _rows_sent_counter; }
private:
friend class ResultSinkOperatorX;
@@ -71,6 +73,8 @@ private:
std::shared_ptr<BufferControlBlock> _sender;
std::shared_ptr<ResultWriter> _writer;
std::shared_ptr<ResultSinkDependency> _result_sink_dependency;
+ RuntimeProfile::Counter* _blocks_sent_counter = nullptr;
+ RuntimeProfile::Counter* _rows_sent_counter = nullptr;
};
class ResultSinkOperatorX final : public
DataSinkOperatorX<ResultSinkLocalState> {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]