This is an automated email from the ASF dual-hosted git repository.
kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new d9f502693e7 [feature](AuditLog) add scanRows scanBytes in auditlog #
25435 (#26268)
d9f502693e7 is described below
commit d9f502693e7c0caa4c888088cfa14927893ec119
Author: Mryange <[email protected]>
AuthorDate: Thu Nov 2 14:19:10 2023 +0800
[feature](AuditLog) add scanRows scanBytes in auditlog # 25435 (#26268)
---
be/src/exec/exec_node.cpp | 8 +++++++
be/src/exec/exec_node.h | 2 ++
be/src/pipeline/exec/exchange_sink_buffer.cpp | 8 +++++++
be/src/pipeline/exec/exchange_sink_buffer.h | 5 ++++
be/src/pipeline/exec/exchange_sink_operator.cpp | 1 +
be/src/pipeline/exec/operator.h | 21 ++++++++++++++++
be/src/pipeline/pipeline.h | 11 +++++++++
be/src/pipeline/pipeline_fragment_context.cpp | 2 ++
be/src/pipeline/pipeline_task.cpp | 27 +++++++++++++++++++++
be/src/pipeline/pipeline_task.h | 4 ++++
be/src/runtime/query_statistics.cpp | 32 +++++++++++++------------
be/src/runtime/query_statistics.h | 20 ++++++++++++----
be/src/vec/exec/scan/new_olap_scan_node.cpp | 9 +++++--
be/src/vec/exec/scan/new_olap_scan_node.h | 1 +
be/src/vec/exec/scan/new_olap_scanner.cpp | 3 ++-
be/src/vec/exec/scan/vscan_node.cpp | 3 ++-
be/src/vec/exec/scan/vscan_node.h | 1 +
be/src/vec/exec/scan/vscanner.cpp | 9 +++++--
be/src/vec/exec/scan/vscanner.h | 2 ++
be/src/vec/exec/vexchange_node.cpp | 6 ++++-
be/src/vec/exec/vexchange_node.h | 1 +
be/src/vec/runtime/vdata_stream_recvr.cpp | 6 +++++
be/src/vec/runtime/vdata_stream_recvr.h | 3 +++
be/src/vec/sink/vdata_stream_sender.cpp | 6 +++--
be/src/vec/sink/vdata_stream_sender.h | 3 +++
25 files changed, 166 insertions(+), 28 deletions(-)
diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp
index 56902e38239..0dc8df911b2 100644
--- a/be/src/exec/exec_node.cpp
+++ b/be/src/exec/exec_node.cpp
@@ -180,6 +180,14 @@ Status ExecNode::collect_query_statistics(QueryStatistics*
statistics) {
return Status::OK();
}
+// Per-sender overload of collect_query_statistics(): asks every child node to collect
+// into `statistics`, passing `sender_id` through unchanged. Returns the first error
+// reported by a child, otherwise OK.
+Status ExecNode::collect_query_statistics(QueryStatistics* statistics, int
sender_id) {
+ DCHECK(statistics != nullptr);
+ for (auto child_node : _children) {
+ RETURN_IF_ERROR(child_node->collect_query_statistics(statistics,
sender_id));
+ }
+ return Status::OK();
+}
+
+
void ExecNode::release_resource(doris::RuntimeState* state) {
if (!_is_resource_released) {
if (_rows_returned_counter != nullptr) {
diff --git a/be/src/exec/exec_node.h b/be/src/exec/exec_node.h
index ae7b40dc20e..d92f884204b 100644
--- a/be/src/exec/exec_node.h
+++ b/be/src/exec/exec_node.h
@@ -162,6 +162,8 @@ public:
// error.
[[nodiscard]] virtual Status collect_query_statistics(QueryStatistics*
statistics);
+ [[nodiscard]] virtual Status collect_query_statistics(QueryStatistics*
statistics,
+ int sender_id);
// close() will get called for every exec node, regardless of what else is
called and
// the status of these calls (i.e. prepare() may never have been called, or
// prepare()/open()/get_next() returned with an error).
diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp
b/be/src/pipeline/exec/exchange_sink_buffer.cpp
index 696e0643880..68d899e0c48 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.cpp
+++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp
@@ -190,6 +190,10 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
auto& brpc_request = _instance_to_request[id];
brpc_request->set_eos(request.eos);
brpc_request->set_packet_seq(_instance_to_seq[id]++);
+ if (_statistics && _statistics->collected()) {
+ auto statistic = brpc_request->mutable_query_statistics();
+ _statistics->to_pb(statistic);
+ }
if (request.block) {
brpc_request->set_allocated_block(request.block.get());
}
@@ -244,6 +248,10 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
if (request.block_holder->get_block()) {
brpc_request->set_allocated_block(request.block_holder->get_block());
}
+ if (_statistics && _statistics->collected()) {
+ auto statistic = brpc_request->mutable_query_statistics();
+ _statistics->to_pb(statistic);
+ }
auto* closure = request.channel->get_closure(id, request.eos,
request.block_holder);
ExchangeRpcContext rpc_ctx;
diff --git a/be/src/pipeline/exec/exchange_sink_buffer.h
b/be/src/pipeline/exec/exchange_sink_buffer.h
index c4636563108..7e30620cae8 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.h
+++ b/be/src/pipeline/exec/exchange_sink_buffer.h
@@ -33,6 +33,7 @@
#include "common/global_types.h"
#include "common/status.h"
+#include "runtime/query_statistics.h"
#include "runtime/runtime_state.h"
#include "service/backend_options.h"
@@ -173,6 +174,8 @@ public:
void set_rpc_time(InstanceLoId id, int64_t start_rpc_time, int64_t
receive_rpc_time);
void update_profile(RuntimeProfile* profile);
+ void set_query_statistics(QueryStatistics* statistics) { _statistics =
statistics; }
+
private:
phmap::flat_hash_map<InstanceLoId, std::unique_ptr<std::mutex>>
_instance_to_package_queue_mutex;
@@ -211,6 +214,8 @@ private:
inline bool _is_receiver_eof(InstanceLoId id);
void get_max_min_rpc_time(int64_t* max_time, int64_t* min_time);
int64_t get_sum_rpc_time();
+
+ QueryStatistics* _statistics = nullptr;
};
} // namespace pipeline
diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp
b/be/src/pipeline/exec/exchange_sink_operator.cpp
index dd0a35ce959..79517e8b041 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -67,6 +67,7 @@ Status ExchangeSinkOperator::prepare(RuntimeState* state) {
_sink_buffer = std::make_unique<ExchangeSinkBuffer>(id, _dest_node_id,
_sink->_sender_id,
_state->be_number(),
_context);
+ _sink_buffer->set_query_statistics(_sink->query_statistics());
RETURN_IF_ERROR(DataSinkOperator::prepare(state));
_sink->registe_channels(_sink_buffer.get());
return Status::OK();
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index d98b4f16bd4..4a68a95e366 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -163,6 +163,14 @@ public:
bool is_source() const;
+    // Collects query statistics from the wrapped node; the default implementation does nothing.
+    virtual Status collect_query_statistics(QueryStatistics* statistics) { return Status::OK(); }
+
+    // Per-sender variant, used by the root pipeline when the fragment has more than one
+    // instance; the default implementation does nothing.
+    virtual Status collect_query_statistics(QueryStatistics* statistics, int sender_id) {
+        return Status::OK();
+    }
+
+    // Receives the shared QueryStatistics object to report (PipelineTask passes its own
+    // to the sink operator); the default implementation ignores it.
+    virtual void set_query_statistics(std::shared_ptr<QueryStatistics>) {}
+
virtual Status init(const TDataSink& tsink) { return Status::OK(); }
// Prepare for running. (e.g. resource allocation, etc.)
@@ -299,6 +307,9 @@ public:
Status finalize(RuntimeState* state) override { return Status::OK(); }
[[nodiscard]] RuntimeProfile* get_runtime_profile() const override {
return _sink->profile(); }
+ void set_query_statistics(std::shared_ptr<QueryStatistics> statistics)
override {
+ _sink->set_query_statistics(statistics);
+ }
protected:
NodeType* _sink;
@@ -369,6 +380,16 @@ public:
return _node->runtime_profile();
}
+ Status collect_query_statistics(QueryStatistics* statistics) override {
+ RETURN_IF_ERROR(_node->collect_query_statistics(statistics));
+ return Status::OK();
+ }
+
+ Status collect_query_statistics(QueryStatistics* statistics, int
sender_id) override {
+ RETURN_IF_ERROR(_node->collect_query_statistics(statistics,
sender_id));
+ return Status::OK();
+ }
+
protected:
NodeType* _node;
bool _use_projection;
diff --git a/be/src/pipeline/pipeline.h b/be/src/pipeline/pipeline.h
index 759e4fce2c9..056c331dd0c 100644
--- a/be/src/pipeline/pipeline.h
+++ b/be/src/pipeline/pipeline.h
@@ -102,6 +102,15 @@ public:
RuntimeProfile* pipeline_profile() { return _pipeline_profile.get(); }
+ void set_is_root_pipeline() { _is_root_pipeline = true; }
+ bool is_root_pipeline() const { return _is_root_pipeline; }
+ void set_collect_query_statistics_with_every_batch() {
+ _collect_query_statistics_with_every_batch = true;
+ }
+ [[nodiscard]] bool collect_query_statistics_with_every_batch() const {
+ return _collect_query_statistics_with_every_batch;
+ }
+
private:
void _init_profile();
@@ -145,6 +154,8 @@ private:
*/
bool _always_can_read = false;
bool _always_can_write = false;
+ bool _is_root_pipeline = false;
+ bool _collect_query_statistics_with_every_batch = false;
};
} // namespace doris::pipeline
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp
b/be/src/pipeline/pipeline_fragment_context.cpp
index 87578fbc665..0ce125f1891 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -318,6 +318,8 @@ Status PipelineFragmentContext::prepare(const
doris::TPipelineFragmentParams& re
}
_root_pipeline = fragment_context->add_pipeline();
+ _root_pipeline->set_is_root_pipeline();
+ _root_pipeline->set_collect_query_statistics_with_every_batch();
RETURN_IF_ERROR(_build_pipelines(_root_plan, _root_pipeline));
if (_sink) {
RETURN_IF_ERROR(_create_sink(request.local_params[idx].sender_id,
diff --git a/be/src/pipeline/pipeline_task.cpp
b/be/src/pipeline/pipeline_task.cpp
index 2b428ac5f14..8adc2fd0783 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -29,6 +29,7 @@
#include "pipeline_fragment_context.h"
#include "runtime/descriptors.h"
#include "runtime/query_context.h"
+#include "runtime/query_statistics.h"
#include "runtime/thread_context.h"
#include "task_queue.h"
#include "util/defer_op.h"
@@ -60,6 +61,10 @@ PipelineTask::PipelineTask(PipelinePtr& pipeline, uint32_t
index, RuntimeState*
_fragment_context(fragment_context),
_parent_profile(parent_profile) {
_pipeline_task_watcher.start();
+ _query_statistics.reset(new QueryStatistics());
+ _sink->set_query_statistics(_query_statistics);
+ _collect_query_statistics_with_every_batch =
+ _pipeline->collect_query_statistics_with_every_batch();
}
void PipelineTask::_fresh_profile_counter() {
@@ -256,6 +261,10 @@ Status PipelineTask::execute(bool* eos) {
*eos = _data_state == SourceState::FINISHED;
if (_block->rows() != 0 || *eos) {
SCOPED_TIMER(_sink_timer);
+ if (_data_state == SourceState::FINISHED ||
+ _collect_query_statistics_with_every_batch) {
+ RETURN_IF_ERROR(_collect_query_statistics());
+ }
auto status = _sink->sink(_state, block, _data_state);
if (!status.is<ErrorCode::END_OF_FILE>()) {
RETURN_IF_ERROR(status);
@@ -282,6 +291,24 @@ Status PipelineTask::finalize() {
return _sink->finalize(_state);
}
+Status PipelineTask::_collect_query_statistics() {
+    // The ExecNode tree of a fragment is split into multiple pipelines; only the root
+    // pipeline needs to collect, since it walks the whole tree from _root.
+    if (!_pipeline->is_root_pipeline()) {
+        return Status::OK();
+    }
+    // Reset before re-collecting; clear() also marks the statistics as collected.
+    _query_statistics->clear();
+    // With a single instance everything can be collected; otherwise collection is
+    // restricted to entries keyed by this instance's per-fragment index.
+    if (_state->num_per_fragment_instances() == 1) {
+        RETURN_IF_ERROR(_root->collect_query_statistics(_query_statistics.get()));
+    } else {
+        RETURN_IF_ERROR(_root->collect_query_statistics(_query_statistics.get(),
+                                                        _state->per_fragment_instance_idx()));
+    }
+    return Status::OK();
+}
+
Status PipelineTask::try_close() {
if (_try_close_flag) {
return Status::OK();
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index 696b335f0e1..9fdf3c82e05 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -341,5 +341,9 @@ private:
int64_t _close_pipeline_time = 0;
RuntimeProfile::Counter* _pip_task_total_timer;
+
+ std::shared_ptr<QueryStatistics> _query_statistics;
+ Status _collect_query_statistics();
+ bool _collect_query_statistics_with_every_batch = false;
};
} // namespace doris::pipeline
diff --git a/be/src/runtime/query_statistics.cpp
b/be/src/runtime/query_statistics.cpp
index a6754215ace..22c18faa1e6 100644
--- a/be/src/runtime/query_statistics.cpp
+++ b/be/src/runtime/query_statistics.cpp
@@ -20,6 +20,8 @@
#include <gen_cpp/data.pb.h>
#include <glog/logging.h>
+#include <memory>
+
namespace doris {
void NodeStatistics::merge(const NodeStatistics& other) {
@@ -85,6 +87,13 @@ void QueryStatistics::merge(QueryStatisticsRecvr* recvr) {
recvr->merge(this);
}
+void QueryStatistics::merge(QueryStatisticsRecvr* recvr, int sender_id) {
+    // Both QueryStatisticsRecvr::insert overloads mutate recvr->_query_statistics (and the
+    // RPC path rewrites the stored entry via from_pb) while holding recvr->_lock, so the
+    // lookup and the merge must happen under the same lock.
+    std::lock_guard<SpinLock> l(recvr->_lock);
+    auto it = recvr->_query_statistics.find(sender_id);
+    if (it != recvr->_query_statistics.end()) {
+        merge(*it->second);
+    }
+}
+
void QueryStatistics::clearNodeStatistics() {
for (auto& pair : _nodes_statistics_map) {
delete pair.second;
@@ -98,24 +107,17 @@ QueryStatistics::~QueryStatistics() {
void QueryStatisticsRecvr::insert(const PQueryStatistics& statistics, int
sender_id) {
std::lock_guard<SpinLock> l(_lock);
- QueryStatistics* query_statistics = nullptr;
- auto iter = _query_statistics.find(sender_id);
- if (iter == _query_statistics.end()) {
- query_statistics = new QueryStatistics;
- _query_statistics[sender_id] = query_statistics;
- } else {
- query_statistics = iter->second;
+ if (!_query_statistics.contains(sender_id)) {
+ _query_statistics[sender_id] = std::make_shared<QueryStatistics>();
}
- query_statistics->from_pb(statistics);
+ _query_statistics[sender_id]->from_pb(statistics);
}
-QueryStatisticsRecvr::~QueryStatisticsRecvr() {
- // It is unnecessary to lock here, because the destructor will be
- // called alter DataStreamRecvr's close in ExchangeNode.
- for (auto& pair : _query_statistics) {
- delete pair.second;
- }
- _query_statistics.clear();
+void QueryStatisticsRecvr::insert(QueryStatisticsPtr statistics, int sender_id) {
+    // Local-exchange path: keep a shared pointer to the sender's statistics instead of
+    // copying them. A null or not-yet-collected object carries nothing to record.
+    if (statistics == nullptr || !statistics->collected()) {
+        return;
+    }
+    // The map is also written by the RPC overload under _lock, so the existence check
+    // must be done while holding the lock as well. The first pointer registered for a
+    // sender wins; try_emplace leaves an existing entry untouched.
+    std::lock_guard<SpinLock> l(_lock);
+    _query_statistics.try_emplace(sender_id, std::move(statistics));
}
} // namespace doris
diff --git a/be/src/runtime/query_statistics.h
b/be/src/runtime/query_statistics.h
index c4ace8f23e2..42c1457472f 100644
--- a/be/src/runtime/query_statistics.h
+++ b/be/src/runtime/query_statistics.h
@@ -20,6 +20,7 @@
#include <stdint.h>
#include <map>
+#include <memory>
#include <mutex>
#include <unordered_map>
#include <utility>
@@ -88,6 +89,7 @@ public:
void merge(QueryStatisticsRecvr* recvr);
+ void merge(QueryStatisticsRecvr* recvr, int sender_id);
// Get the maximum value from the peak memory collected by all node
statistics
int64_t calculate_max_peak_memory_bytes();
@@ -100,13 +102,18 @@ public:
returned_rows = 0;
max_peak_memory_bytes = 0;
clearNodeStatistics();
+ //clear() is used before collection, so calling "clear" is equivalent
to being collected.
+ set_collected();
}
void to_pb(PQueryStatistics* statistics);
void from_pb(const PQueryStatistics& statistics);
+ bool collected() const { return _collected; }
+ void set_collected() { _collected = true; }
private:
+ friend class QueryStatisticsRecvr;
int64_t scan_rows;
int64_t scan_bytes;
int64_t cpu_ms;
@@ -117,17 +124,22 @@ private:
// only set once by result sink when closing.
int64_t max_peak_memory_bytes;
// The statistics of the query on each backend.
- typedef std::unordered_map<int64_t, NodeStatistics*> NodeStatisticsMap;
+ using NodeStatisticsMap = std::unordered_map<int64_t, NodeStatistics*>;
NodeStatisticsMap _nodes_statistics_map;
+ bool _collected = false;
};
-
+using QueryStatisticsPtr = std::shared_ptr<QueryStatistics>;
// It is used for collecting sub plan query statistics in DataStreamRecvr.
class QueryStatisticsRecvr {
public:
- ~QueryStatisticsRecvr();
+ ~QueryStatisticsRecvr() = default;
+ // Transmitted via RPC, incurring serialization overhead.
void insert(const PQueryStatistics& statistics, int sender_id);
+ // using local_exchange for transmission, only need to hold a shared
pointer.
+ void insert(QueryStatisticsPtr statistics, int sender_id);
+
private:
friend class QueryStatistics;
@@ -138,7 +150,7 @@ private:
}
}
- std::map<int, QueryStatistics*> _query_statistics;
+ std::map<int, QueryStatisticsPtr> _query_statistics;
SpinLock _lock;
};
diff --git a/be/src/vec/exec/scan/new_olap_scan_node.cpp
b/be/src/vec/exec/scan/new_olap_scan_node.cpp
index 5aa179d3006..28035ce291d 100644
--- a/be/src/vec/exec/scan/new_olap_scan_node.cpp
+++ b/be/src/vec/exec/scan/new_olap_scan_node.cpp
@@ -82,13 +82,18 @@ NewOlapScanNode::NewOlapScanNode(ObjectPool* pool, const
TPlanNode& tnode,
Status NewOlapScanNode::collect_query_statistics(QueryStatistics* statistics) {
RETURN_IF_ERROR(ExecNode::collect_query_statistics(statistics));
if (!_is_pipeline_scan || _should_create_scanner) {
- statistics->add_scan_bytes(_read_compressed_counter->value());
- statistics->add_scan_rows(_raw_rows_counter->value());
+ statistics->add_scan_bytes(_byte_read_counter->value());
+ statistics->add_scan_rows(_rows_read_counter->value());
statistics->add_cpu_ms(_scan_cpu_timer->value() / NANOS_PER_MILLIS);
}
return Status::OK();
}
+// Per-sender overload: the sender id is ignored and this node reports exactly what the
+// single-argument overload reports, by delegating to it.
+Status NewOlapScanNode::collect_query_statistics(QueryStatistics* statistics,
int) {
+ RETURN_IF_ERROR(collect_query_statistics(statistics));
+ return Status::OK();
+}
+
Status NewOlapScanNode::prepare(RuntimeState* state) {
RETURN_IF_ERROR(VScanNode::prepare(state));
// if you want to add some profile in scan node, even it have not new
VScanner object
diff --git a/be/src/vec/exec/scan/new_olap_scan_node.h
b/be/src/vec/exec/scan/new_olap_scan_node.h
index 0725c37cf5e..cbbf3072872 100644
--- a/be/src/vec/exec/scan/new_olap_scan_node.h
+++ b/be/src/vec/exec/scan/new_olap_scan_node.h
@@ -64,6 +64,7 @@ public:
Status prepare(RuntimeState* state) override;
Status collect_query_statistics(QueryStatistics* statistics) override;
+ Status collect_query_statistics(QueryStatistics* statistics, int
sender_id) override;
void set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges)
override;
diff --git a/be/src/vec/exec/scan/new_olap_scanner.cpp
b/be/src/vec/exec/scan/new_olap_scanner.cpp
index ea4187e556a..03c14ddafe0 100644
--- a/be/src/vec/exec/scan/new_olap_scanner.cpp
+++ b/be/src/vec/exec/scan/new_olap_scanner.cpp
@@ -501,7 +501,8 @@ void NewOlapScanner::_update_realtime_counters() {
}
void NewOlapScanner::_update_counters_before_close() {
- if (!_state->enable_profile() || _has_updated_counter) {
+ // Please don't directly enable the profile here, we need to set
QueryStatistics using the counter inside.
+ if (_has_updated_counter) {
return;
}
_has_updated_counter = true;
diff --git a/be/src/vec/exec/scan/vscan_node.cpp
b/be/src/vec/exec/scan/vscan_node.cpp
index e0c954042d1..c79faf6d6fa 100644
--- a/be/src/vec/exec/scan/vscan_node.cpp
+++ b/be/src/vec/exec/scan/vscan_node.cpp
@@ -274,7 +274,8 @@ Status VScanNode::get_next(RuntimeState* state,
vectorized::Block* block, bool*
Status VScanNode::_init_profile() {
// 1. counters for scan node
- _rows_read_counter = ADD_COUNTER(_runtime_profile, "RowsRead",
TUnit::UNIT);
+ _rows_read_counter = ADD_COUNTER(_runtime_profile, "ScanRowsRead",
TUnit::UNIT);
+ _byte_read_counter = ADD_COUNTER(_runtime_profile, "ScanByteRead",
TUnit::BYTES);
_total_throughput_counter =
runtime_profile()->add_rate_counter("TotalReadThroughput",
_rows_read_counter);
_num_scanners = ADD_COUNTER(_runtime_profile, "NumScanners", TUnit::UNIT);
diff --git a/be/src/vec/exec/scan/vscan_node.h
b/be/src/vec/exec/scan/vscan_node.h
index 7600189a251..27c0667e8c6 100644
--- a/be/src/vec/exec/scan/vscan_node.h
+++ b/be/src/vec/exec/scan/vscan_node.h
@@ -317,6 +317,7 @@ protected:
// rows read from the scanner (including those discarded by (pre)filters)
RuntimeProfile::Counter* _rows_read_counter;
+ RuntimeProfile::Counter* _byte_read_counter;
// Wall based aggregate read throughput [rows/sec]
RuntimeProfile::Counter* _total_throughput_counter;
RuntimeProfile::Counter* _num_scanners;
diff --git a/be/src/vec/exec/scan/vscanner.cpp
b/be/src/vec/exec/scan/vscanner.cpp
index 19d5302286a..52cf88f260a 100644
--- a/be/src/vec/exec/scan/vscanner.cpp
+++ b/be/src/vec/exec/scan/vscanner.cpp
@@ -20,6 +20,7 @@
#include <glog/logging.h>
#include "common/config.h"
+#include "common/logging.h"
#include "runtime/descriptors.h"
#include "util/runtime_profile.h"
#include "vec/core/column_with_type_and_name.h"
@@ -79,6 +80,7 @@ Status VScanner::get_block(RuntimeState* state, Block* block,
bool* eof) {
break;
}
_num_rows_read += block->rows();
+ _num_byte_read += block->allocated_bytes();
}
// 2. Filter the output block finally.
@@ -159,9 +161,12 @@ Status VScanner::close(RuntimeState* state) {
}
void VScanner::_update_counters_before_close() {
- COUNTER_UPDATE(_parent->_scan_cpu_timer, _scan_cpu_timer);
+ if (_parent) {
+ COUNTER_UPDATE(_parent->_scan_cpu_timer, _scan_cpu_timer);
+ COUNTER_UPDATE(_parent->_rows_read_counter, _num_rows_read);
+ COUNTER_UPDATE(_parent->_byte_read_counter, _num_byte_read);
+ }
if (!_state->enable_profile() && !_is_load) return;
- COUNTER_UPDATE(_parent->_rows_read_counter, _num_rows_read);
// Update stats for load
_state->update_num_rows_load_filtered(_counter.num_rows_filtered);
_state->update_num_rows_load_unselected(_counter.num_rows_unselected);
diff --git a/be/src/vec/exec/scan/vscanner.h b/be/src/vec/exec/scan/vscanner.h
index 06b35465491..2ee39979563 100644
--- a/be/src/vec/exec/scan/vscanner.h
+++ b/be/src/vec/exec/scan/vscanner.h
@@ -187,6 +187,8 @@ protected:
// num of rows read from scanner
int64_t _num_rows_read = 0;
+ int64_t _num_byte_read = 0;
+
// num of rows return from scanner, after filter block
int64_t _num_rows_return = 0;
diff --git a/be/src/vec/exec/vexchange_node.cpp
b/be/src/vec/exec/vexchange_node.cpp
index 3d9a50ded23..797ff590ca9 100644
--- a/be/src/vec/exec/vexchange_node.cpp
+++ b/be/src/vec/exec/vexchange_node.cpp
@@ -147,7 +147,11 @@ Status
VExchangeNode::collect_query_statistics(QueryStatistics* statistics) {
statistics->merge(_sub_plan_query_statistics_recvr.get());
return Status::OK();
}
-
+Status VExchangeNode::collect_query_statistics(QueryStatistics* statistics, int sender_id) {
+    // Keep the per-sender recursion consistent with ExecNode's overload: children are
+    // asked for the same sender_id rather than for everything.
+    RETURN_IF_ERROR(ExecNode::collect_query_statistics(statistics, sender_id));
+    // Merge only the sub-plan statistics that were registered under `sender_id`.
+    statistics->merge(_sub_plan_query_statistics_recvr.get(), sender_id);
+    return Status::OK();
+}
+
Status VExchangeNode::close(RuntimeState* state) {
if (is_closed()) {
return Status::OK();
diff --git a/be/src/vec/exec/vexchange_node.h b/be/src/vec/exec/vexchange_node.h
index c4f083dda48..94302e84d9b 100644
--- a/be/src/vec/exec/vexchange_node.h
+++ b/be/src/vec/exec/vexchange_node.h
@@ -56,6 +56,7 @@ public:
Status get_next(RuntimeState* state, Block* row_batch, bool* eos) override;
void release_resource(RuntimeState* state) override;
Status collect_query_statistics(QueryStatistics* statistics) override;
+ Status collect_query_statistics(QueryStatistics* statistics, int
sender_id) override;
Status close(RuntimeState* state) override;
void set_num_senders(int num_senders) { _num_senders = num_senders; }
diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp
b/be/src/vec/runtime/vdata_stream_recvr.cpp
index 9cbc76693b9..cb2b9184430 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.cpp
+++ b/be/src/vec/runtime/vdata_stream_recvr.cpp
@@ -412,6 +412,12 @@ void VDataStreamRecvr::remove_sender(int sender_id, int
be_number) {
_sender_queues[use_sender_id]->decrement_senders(be_number);
}
+void VDataStreamRecvr::remove_sender(int sender_id, int be_number, QueryStatisticsPtr statistics) {
+    // Register the sender's statistics before signalling that the sender is done:
+    // decrement_senders() may let the consumer reach EOS, and the pipeline task collects
+    // query statistics as soon as its source finishes, so inserting afterwards could
+    // miss this sender.
+    _sub_plan_query_statistics_recvr->insert(statistics, sender_id);
+    // Reuse the two-argument overload for the queue bookkeeping.
+    remove_sender(sender_id, be_number);
+}
+
void VDataStreamRecvr::cancel_stream() {
for (int i = 0; i < _sender_queues.size(); ++i) {
_sender_queues[i]->cancel();
diff --git a/be/src/vec/runtime/vdata_stream_recvr.h
b/be/src/vec/runtime/vdata_stream_recvr.h
index 5e88aa8eb43..37fd282e117 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.h
+++ b/be/src/vec/runtime/vdata_stream_recvr.h
@@ -96,8 +96,11 @@ public:
// Indicate that a particular sender is done. Delegated to the appropriate
// sender queue. Called from DataStreamMgr.
+
void remove_sender(int sender_id, int be_number);
+ void remove_sender(int sender_id, int be_number, QueryStatisticsPtr
statistics);
+
void cancel_stream();
void close();
diff --git a/be/src/vec/sink/vdata_stream_sender.cpp
b/be/src/vec/sink/vdata_stream_sender.cpp
index ad473fcd775..42ceb04aea7 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -129,7 +129,8 @@ Status Channel::send_local_block(bool eos) {
COUNTER_UPDATE(_parent->_blocks_sent_counter, 1);
_local_recvr->add_block(&block, _parent->_sender_id, true);
if (eos) {
- _local_recvr->remove_sender(_parent->_sender_id, _be_number);
+ _local_recvr->remove_sender(_parent->_sender_id, _be_number,
+ _parent->query_statisticsPtr());
}
return Status::OK();
} else {
@@ -271,7 +272,8 @@ Status Channel::close_internal() {
SCOPED_CONSUME_MEM_TRACKER(_parent->_mem_tracker.get());
if (is_local()) {
if (_recvr_is_valid()) {
- _local_recvr->remove_sender(_parent->_sender_id, _be_number);
+ _local_recvr->remove_sender(_parent->_sender_id, _be_number,
+ _parent->query_statisticsPtr());
}
} else {
status = send_block((PBlock*)nullptr, true);
diff --git a/be/src/vec/sink/vdata_stream_sender.h
b/be/src/vec/sink/vdata_stream_sender.h
index 07f95c48e38..1b207d824c3 100644
--- a/be/src/vec/sink/vdata_stream_sender.h
+++ b/be/src/vec/sink/vdata_stream_sender.h
@@ -106,6 +106,9 @@ public:
const RowDescriptor& row_desc() { return _row_desc; }
+ QueryStatistics* query_statistics() { return _query_statistics.get(); }
+ QueryStatisticsPtr query_statisticsPtr() { return _query_statistics; }
+
protected:
friend class Channel;
friend class PipChannel;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]