This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 5d3f0a267a8 [opt](scan) unify the local and remote scan bytes stats
for all scanners for 2.1 (#45167)
5d3f0a267a8 is described below
commit 5d3f0a267a89a5954312e17831832a20e6d0dd4d
Author: Mingyu Chen (Rayner) <[email protected]>
AuthorDate: Mon Dec 9 22:19:19 2024 -0800
[opt](scan) unify the local and remote scan bytes stats for all scanners
for 2.1 (#45167)
pick part of #40493
TODO: not working with s3 reader
---
.../schema_scanner/schema_backend_active_tasks.cpp | 4 ++-
be/src/io/cache/block/block_file_segment.cpp | 4 +--
be/src/io/cache/block/block_file_segment.h | 2 +-
.../io/cache/block/cached_remote_file_reader.cpp | 9 ++---
be/src/io/cache/block/cached_remote_file_reader.h | 5 +--
be/src/io/fs/broker_file_reader.cpp | 5 ++-
be/src/io/fs/broker_file_reader.h | 2 --
be/src/io/fs/file_reader.h | 2 +-
be/src/io/fs/hdfs_file_reader.cpp | 10 ++++--
be/src/io/fs/hdfs_file_reader.h | 1 -
be/src/io/fs/local_file_reader.cpp | 5 ++-
be/src/io/fs/s3_file_reader.cpp | 8 +++--
be/src/io/fs/s3_file_reader.h | 1 -
be/src/io/io_common.h | 42 ++++++++++++++++++++++
be/src/runtime/query_statistics.cpp | 8 +++++
be/src/runtime/query_statistics.h | 13 +++++++
be/src/runtime/runtime_query_statistics_mgr.cpp | 37 ++++++++++++-------
be/src/vec/exec/format/orc/vorc_reader.h | 1 -
be/src/vec/exec/scan/new_olap_scanner.cpp | 16 ++++++++-
be/src/vec/exec/scan/new_olap_scanner.h | 1 +
be/src/vec/exec/scan/vfile_scanner.cpp | 15 ++++++++
be/src/vec/exec/scan/vfile_scanner.h | 2 ++
be/src/vec/exec/scan/vscanner.cpp | 17 ++++-----
be/src/vec/exec/scan/vscanner.h | 11 +++++-
be/test/io/cache/file_block_cache_test.cpp | 14 ++++----
.../org/apache/doris/catalog/InternalSchema.java | 6 ++++
.../java/org/apache/doris/catalog/SchemaTable.java | 2 ++
.../java/org/apache/doris/plugin/AuditEvent.java | 14 ++++++++
.../org/apache/doris/plugin/audit/AuditLoader.java | 4 +++
.../java/org/apache/doris/qe/AuditLogHelper.java | 6 +++-
.../WorkloadRuntimeStatusMgr.java | 5 +++
gensrc/proto/data.proto | 2 ++
gensrc/thrift/FrontendService.thrift | 2 ++
33 files changed, 222 insertions(+), 54 deletions(-)
diff --git a/be/src/exec/schema_scanner/schema_backend_active_tasks.cpp
b/be/src/exec/schema_scanner/schema_backend_active_tasks.cpp
index 74e95f42032..a67b2600d23 100644
--- a/be/src/exec/schema_scanner/schema_backend_active_tasks.cpp
+++ b/be/src/exec/schema_scanner/schema_backend_active_tasks.cpp
@@ -34,6 +34,8 @@ std::vector<SchemaScanner::ColumnDesc>
SchemaBackendActiveTasksScanner::_s_tbls_
{"TASK_CPU_TIME_MS", TYPE_BIGINT, sizeof(int64_t), false},
{"SCAN_ROWS", TYPE_BIGINT, sizeof(int64_t), false},
{"SCAN_BYTES", TYPE_BIGINT, sizeof(int64_t), false},
+ {"LOCAL_SCAN_BYTES", TYPE_BIGINT, sizeof(int64_t), false},
+ {"REMOTE_SCAN_BYTES", TYPE_BIGINT, sizeof(int64_t), false},
{"BE_PEAK_MEMORY_BYTES", TYPE_BIGINT, sizeof(int64_t), false},
{"CURRENT_USED_MEMORY_BYTES", TYPE_BIGINT, sizeof(int64_t), false},
{"SHUFFLE_SEND_BYTES", TYPE_BIGINT, sizeof(int64_t), false},
@@ -93,4 +95,4 @@ Status
SchemaBackendActiveTasksScanner::get_next_block_internal(vectorized::Bloc
return Status::OK();
}
-} // namespace doris
\ No newline at end of file
+} // namespace doris
diff --git a/be/src/io/cache/block/block_file_segment.cpp
b/be/src/io/cache/block/block_file_segment.cpp
index 564ac9d776f..1d6d425593d 100644
--- a/be/src/io/cache/block/block_file_segment.cpp
+++ b/be/src/io/cache/block/block_file_segment.cpp
@@ -179,7 +179,7 @@ std::string FileBlock::get_path_in_local_cache() const {
return _cache->get_path_in_local_cache(key(), offset(), _cache_type);
}
-Status FileBlock::read_at(Slice buffer, size_t read_offset) {
+Status FileBlock::read_at(Slice buffer, size_t read_offset, const IOContext*
io_ctx) {
Status st = Status::OK();
std::shared_ptr<FileReader> reader;
if (!(reader = _cache_reader.lock())) {
@@ -192,7 +192,7 @@ Status FileBlock::read_at(Slice buffer, size_t read_offset)
{
}
}
size_t bytes_reads = buffer.size;
- RETURN_IF_ERROR(reader->read_at(read_offset, buffer, &bytes_reads));
+ RETURN_IF_ERROR(reader->read_at(read_offset, buffer, &bytes_reads,
io_ctx));
DCHECK(bytes_reads == buffer.size);
return st;
}
diff --git a/be/src/io/cache/block/block_file_segment.h
b/be/src/io/cache/block/block_file_segment.h
index 67f4fc17a0d..d1e341d42c6 100644
--- a/be/src/io/cache/block/block_file_segment.h
+++ b/be/src/io/cache/block/block_file_segment.h
@@ -110,7 +110,7 @@ public:
Status append(Slice data);
// read data from cache file
- Status read_at(Slice buffer, size_t read_offset);
+ Status read_at(Slice buffer, size_t read_offset, const IOContext* io_ctx);
// finish write, release the file writer
Status finalize_write();
diff --git a/be/src/io/cache/block/cached_remote_file_reader.cpp
b/be/src/io/cache/block/cached_remote_file_reader.cpp
index bbd7516dfaa..f8fda4b028e 100644
--- a/be/src/io/cache/block/cached_remote_file_reader.cpp
+++ b/be/src/io/cache/block/cached_remote_file_reader.cpp
@@ -112,7 +112,6 @@ Status CachedRemoteFileReader::_read_from_cache(size_t
offset, Slice result, siz
RETURN_IF_ERROR(_remote_file_reader->read_at(offset, result,
bytes_read, io_ctx));
DorisMetrics::instance()->s3_bytes_read_total->increment(*bytes_read);
if (io_ctx->file_cache_stats) {
- stats.bytes_read += bytes_req;
_update_state(stats, io_ctx->file_cache_stats);
}
return Status::OK();
@@ -142,7 +141,6 @@ Status CachedRemoteFileReader::_read_from_cache(size_t
offset, Slice result, siz
break;
}
}
- stats.bytes_read += bytes_req;
size_t empty_start = 0;
size_t empty_end = 0;
if (!empty_segments.empty()) {
@@ -224,8 +222,9 @@ Status CachedRemoteFileReader::_read_from_cache(size_t
offset, Slice result, siz
size_t file_offset = current_offset - left;
{
SCOPED_RAW_TIMER(&stats.local_read_timer);
- RETURN_IF_ERROR(segment->read_at(
- Slice(result.data + (current_offset - offset), read_size),
file_offset));
+ RETURN_IF_ERROR(
+ segment->read_at(Slice(result.data + (current_offset -
offset), read_size),
+ file_offset, io_ctx));
}
*bytes_read += read_size;
current_offset = right + 1;
@@ -280,10 +279,8 @@ void CachedRemoteFileReader::_update_state(const
ReadStatistics& read_stats,
}
if (read_stats.hit_cache) {
statis->num_local_io_total++;
- statis->bytes_read_from_local += read_stats.bytes_read;
} else {
statis->num_remote_io_total++;
- statis->bytes_read_from_remote += read_stats.bytes_read;
}
statis->remote_io_timer += read_stats.remote_read_timer;
statis->local_io_timer += read_stats.local_read_timer;
diff --git a/be/src/io/cache/block/cached_remote_file_reader.h
b/be/src/io/cache/block/cached_remote_file_reader.h
index e4e280d95f6..af33e5d6f15 100644
--- a/be/src/io/cache/block/cached_remote_file_reader.h
+++ b/be/src/io/cache/block/cached_remote_file_reader.h
@@ -66,10 +66,11 @@ private:
IFileCache::Key _cache_key;
CloudFileCachePtr _cache;
+ // Used to record read/write timer and cache related metrics.
+ // These metrics will finally be saved in FileCacheStatistics.
struct ReadStatistics {
bool hit_cache = true;
bool skip_cache = false;
- int64_t bytes_read = 0;
int64_t bytes_write_into_file_cache = 0;
int64_t remote_read_timer = 0;
int64_t local_read_timer = 0;
@@ -82,4 +83,4 @@ private:
};
} // namespace io
-} // namespace doris
\ No newline at end of file
+} // namespace doris
diff --git a/be/src/io/fs/broker_file_reader.cpp
b/be/src/io/fs/broker_file_reader.cpp
index 4d370cfb4d8..bb0eac47e38 100644
--- a/be/src/io/fs/broker_file_reader.cpp
+++ b/be/src/io/fs/broker_file_reader.cpp
@@ -62,7 +62,7 @@ Status BrokerFileReader::close() {
}
Status BrokerFileReader::read_at_impl(size_t offset, Slice result, size_t*
bytes_read,
- const IOContext* /*io_ctx*/) {
+ const IOContext* io_ctx) {
DCHECK(!closed());
size_t bytes_req = result.size;
char* to = result.data;
@@ -76,6 +76,9 @@ Status BrokerFileReader::read_at_impl(size_t offset, Slice
result, size_t* bytes
*bytes_read = data.size();
memcpy(to, data.data(), *bytes_read);
+ if (io_ctx && io_ctx->file_cache_stats) {
+ io_ctx->file_cache_stats->bytes_read_from_remote += bytes_req;
+ }
return Status::OK();
}
diff --git a/be/src/io/fs/broker_file_reader.h
b/be/src/io/fs/broker_file_reader.h
index 7acdcbcc0d5..43a46c62331 100644
--- a/be/src/io/fs/broker_file_reader.h
+++ b/be/src/io/fs/broker_file_reader.h
@@ -34,8 +34,6 @@
namespace doris::io {
-struct IOContext;
-
class BrokerFileReader : public FileReader {
public:
BrokerFileReader(const TNetworkAddress& broker_addr, Path path, size_t
file_size, TBrokerFD fd,
diff --git a/be/src/io/fs/file_reader.h b/be/src/io/fs/file_reader.h
index 03828ef28dd..b41df4426ce 100644
--- a/be/src/io/fs/file_reader.h
+++ b/be/src/io/fs/file_reader.h
@@ -24,6 +24,7 @@
#include "common/status.h"
#include "io/fs/path.h"
+#include "io/io_common.h"
#include "util/profile_collector.h"
#include "util/slice.h"
@@ -32,7 +33,6 @@ namespace doris {
namespace io {
class FileSystem;
-struct IOContext;
enum class FileCachePolicy : uint8_t {
NO_CACHE,
diff --git a/be/src/io/fs/hdfs_file_reader.cpp
b/be/src/io/fs/hdfs_file_reader.cpp
index 263276768bc..ac75e2e722b 100644
--- a/be/src/io/fs/hdfs_file_reader.cpp
+++ b/be/src/io/fs/hdfs_file_reader.cpp
@@ -84,7 +84,7 @@ Status HdfsFileReader::close() {
#ifdef USE_HADOOP_HDFS
Status HdfsFileReader::read_at_impl(size_t offset, Slice result, size_t*
bytes_read,
- const IOContext* /*io_ctx*/) {
+ const IOContext* io_ctx) {
DCHECK(!closed());
if (offset > _handle->file_size()) {
return Status::IOError("offset exceeds file size(offset: {}, file
size: {}, path: {})",
@@ -121,6 +121,9 @@ Status HdfsFileReader::read_at_impl(size_t offset, Slice
result, size_t* bytes_r
has_read += loop_read;
}
*bytes_read = has_read;
+ if (io_ctx && io_ctx->file_cache_stats) {
+ io_ctx->file_cache_stats->bytes_read_from_remote += bytes_req;
+ }
return Status::OK();
}
@@ -128,7 +131,7 @@ Status HdfsFileReader::read_at_impl(size_t offset, Slice
result, size_t* bytes_r
// The hedged read only support hdfsPread().
// TODO: rethink here to see if there are some difference between hdfsPread()
and hdfsRead()
Status HdfsFileReader::read_at_impl(size_t offset, Slice result, size_t*
bytes_read,
- const IOContext* /*io_ctx*/) {
+ const IOContext* io_ctx) {
DCHECK(!closed());
if (offset > _handle->file_size()) {
return Status::IOError("offset exceeds file size(offset: {}, file
size: {}, path: {})",
@@ -177,6 +180,9 @@ Status HdfsFileReader::read_at_impl(size_t offset, Slice
result, size_t* bytes_r
has_read += loop_read;
}
*bytes_read = has_read;
+ if (io_ctx && io_ctx->file_cache_stats) {
+ io_ctx->file_cache_stats->bytes_read_from_remote += bytes_req;
+ }
return Status::OK();
}
#endif
diff --git a/be/src/io/fs/hdfs_file_reader.h b/be/src/io/fs/hdfs_file_reader.h
index 6204859e600..0f4a3f14019 100644
--- a/be/src/io/fs/hdfs_file_reader.h
+++ b/be/src/io/fs/hdfs_file_reader.h
@@ -34,7 +34,6 @@
namespace doris {
namespace io {
-struct IOContext;
class HdfsFileReader : public FileReader {
public:
diff --git a/be/src/io/fs/local_file_reader.cpp
b/be/src/io/fs/local_file_reader.cpp
index 93953eeddd9..c7abf2ad047 100644
--- a/be/src/io/fs/local_file_reader.cpp
+++ b/be/src/io/fs/local_file_reader.cpp
@@ -118,7 +118,7 @@ Status LocalFileReader::close() {
}
Status LocalFileReader::read_at_impl(size_t offset, Slice result, size_t*
bytes_read,
- const IOContext* /*io_ctx*/) {
+ const IOContext* io_ctx) {
DCHECK(!closed());
if (offset > _file_size) {
return Status::InternalError(
@@ -148,6 +148,9 @@ Status LocalFileReader::read_at_impl(size_t offset, Slice
result, size_t* bytes_
*bytes_read += res;
}
}
+ if (io_ctx && io_ctx->file_cache_stats) {
+ io_ctx->file_cache_stats->bytes_read_from_local += *bytes_read;
+ }
DorisMetrics::instance()->local_bytes_read_total->increment(*bytes_read);
return Status::OK();
}
diff --git a/be/src/io/fs/s3_file_reader.cpp b/be/src/io/fs/s3_file_reader.cpp
index 005257c1312..2d97252319d 100644
--- a/be/src/io/fs/s3_file_reader.cpp
+++ b/be/src/io/fs/s3_file_reader.cpp
@@ -42,7 +42,6 @@
namespace doris {
namespace io {
-struct IOContext;
bvar::Adder<uint64_t> s3_file_reader_read_counter("s3_file_reader", "read_at");
bvar::Adder<uint64_t> s3_file_reader_total("s3_file_reader", "total_num");
bvar::Adder<uint64_t> s3_bytes_read_total("s3_file_reader", "bytes_read");
@@ -86,7 +85,7 @@ Status S3FileReader::close() {
}
Status S3FileReader::read_at_impl(size_t offset, Slice result, size_t*
bytes_read,
- const IOContext* /*io_ctx*/) {
+ const IOContext* io_ctx) {
DCHECK(!closed());
if (offset > _file_size) {
return Status::InternalError(
@@ -154,6 +153,11 @@ Status S3FileReader::read_at_impl(size_t offset, Slice
result, size_t* bytes_rea
LOG(INFO) << fmt::format("read s3 file {} succeed after {} times
with {} ms sleeping",
_path.native(), retry_count,
total_sleep_time);
}
+ // ATTN: Do not enable the block below; it may cause stack-use-after-scope.
+ // It will be refactored in the future.
+ // if (io_ctx && io_ctx->file_cache_stats) {
+ // io_ctx->file_cache_stats->bytes_read_from_remote += bytes_req;
+ // }
return Status::OK();
}
return Status::InternalError("failed to read from s3, exceeded maximum
retries");
diff --git a/be/src/io/fs/s3_file_reader.h b/be/src/io/fs/s3_file_reader.h
index c9e9656fe58..50a19712674 100644
--- a/be/src/io/fs/s3_file_reader.h
+++ b/be/src/io/fs/s3_file_reader.h
@@ -35,7 +35,6 @@ namespace doris {
class RuntimeProfile;
namespace io {
-struct IOContext;
class S3FileReader final : public FileReader {
public:
diff --git a/be/src/io/io_common.h b/be/src/io/io_common.h
index 80a594473dc..3bd92dc7c1a 100644
--- a/be/src/io/io_common.h
+++ b/be/src/io/io_common.h
@@ -19,6 +19,8 @@
#include <gen_cpp/Types_types.h>
+#include <sstream>
+
namespace doris {
enum class ReaderType : uint8_t {
@@ -45,6 +47,38 @@ struct FileCacheStatistics {
int64_t write_cache_io_timer = 0;
int64_t bytes_write_into_cache = 0;
int64_t num_skip_cache_io_total = 0;
+
+ // Accumulates every counter and timer of `other` into this object.
+ // Each field is added exactly once, so merged totals are not inflated
+ // (write_cache_io_timer was previously added twice).
+ void update(const FileCacheStatistics& other) {
+     num_local_io_total += other.num_local_io_total;
+     num_remote_io_total += other.num_remote_io_total;
+     local_io_timer += other.local_io_timer;
+     bytes_read_from_local += other.bytes_read_from_local;
+     bytes_read_from_remote += other.bytes_read_from_remote;
+     remote_io_timer += other.remote_io_timer;
+     write_cache_io_timer += other.write_cache_io_timer;
+     bytes_write_into_cache += other.bytes_write_into_cache;
+     num_skip_cache_io_total += other.num_skip_cache_io_total;
+ }
+
+ void reset() {
+ num_local_io_total = 0;
+ num_remote_io_total = 0;
+ local_io_timer = 0;
+ bytes_read_from_local = 0;
+ bytes_read_from_remote = 0;
+ remote_io_timer = 0;
+ write_cache_io_timer = 0;
+ bytes_write_into_cache = 0;
+ num_skip_cache_io_total = 0;
+ }
+
+ std::string debug_string() const {
+ std::stringstream ss;
+ ss << "bytes_read_from_local: " << bytes_read_from_local
+ << ", bytes_read_from_remote: " << bytes_read_from_remote;
+ return ss.str();
+ }
};
struct IOContext {
@@ -60,6 +94,14 @@ struct IOContext {
int64_t expiration_time = 0;
const TUniqueId* query_id = nullptr; // Ref
FileCacheStatistics* file_cache_stats = nullptr; // Ref
+
+ std::string debug_string() const {
+ if (file_cache_stats != nullptr) {
+ return file_cache_stats->debug_string();
+ } else {
+ return "no file cache stats";
+ }
+ }
};
} // namespace io
diff --git a/be/src/runtime/query_statistics.cpp
b/be/src/runtime/query_statistics.cpp
index de950704180..4f87da1196b 100644
--- a/be/src/runtime/query_statistics.cpp
+++ b/be/src/runtime/query_statistics.cpp
@@ -32,6 +32,8 @@ void QueryStatistics::merge(const QueryStatistics& other) {
cpu_nanos += other.cpu_nanos.load(std::memory_order_relaxed);
shuffle_send_bytes +=
other.shuffle_send_bytes.load(std::memory_order_relaxed);
shuffle_send_rows +=
other.shuffle_send_rows.load(std::memory_order_relaxed);
+ _scan_bytes_from_local_storage += other._scan_bytes_from_local_storage;
+ _scan_bytes_from_remote_storage += other._scan_bytes_from_remote_storage;
int64_t other_peak_mem =
other.max_peak_memory_bytes.load(std::memory_order_relaxed);
if (other_peak_mem > this->max_peak_memory_bytes) {
@@ -51,6 +53,8 @@ void QueryStatistics::to_pb(PQueryStatistics* statistics) {
statistics->set_cpu_ms(cpu_nanos / NANOS_PER_MILLIS);
statistics->set_returned_rows(returned_rows);
statistics->set_max_peak_memory_bytes(max_peak_memory_bytes);
+
statistics->set_scan_bytes_from_remote_storage(_scan_bytes_from_remote_storage);
+
statistics->set_scan_bytes_from_local_storage(_scan_bytes_from_local_storage);
}
void QueryStatistics::to_thrift(TQueryStatistics* statistics) const {
@@ -64,12 +68,16 @@ void QueryStatistics::to_thrift(TQueryStatistics*
statistics) const {
current_used_memory_bytes.load(std::memory_order_relaxed));
statistics->__set_shuffle_send_bytes(shuffle_send_bytes.load(std::memory_order_relaxed));
statistics->__set_shuffle_send_rows(shuffle_send_rows.load(std::memory_order_relaxed));
+
statistics->__set_scan_bytes_from_remote_storage(_scan_bytes_from_remote_storage);
+
statistics->__set_scan_bytes_from_local_storage(_scan_bytes_from_local_storage);
}
void QueryStatistics::from_pb(const PQueryStatistics& statistics) {
scan_rows = statistics.scan_rows();
scan_bytes = statistics.scan_bytes();
cpu_nanos = statistics.cpu_ms() * NANOS_PER_MILLIS;
+ _scan_bytes_from_local_storage =
statistics.scan_bytes_from_local_storage();
+ _scan_bytes_from_remote_storage =
statistics.scan_bytes_from_remote_storage();
}
void QueryStatistics::merge(QueryStatisticsRecvr* recvr) {
diff --git a/be/src/runtime/query_statistics.h
b/be/src/runtime/query_statistics.h
index a9f6e192ec0..fcfbf48bb18 100644
--- a/be/src/runtime/query_statistics.h
+++ b/be/src/runtime/query_statistics.h
@@ -44,6 +44,8 @@ public:
: scan_rows(0),
scan_bytes(0),
cpu_nanos(0),
+ _scan_bytes_from_local_storage(0),
+ _scan_bytes_from_remote_storage(0),
returned_rows(0),
max_peak_memory_bytes(0),
current_used_memory_bytes(0),
@@ -65,6 +67,13 @@ public:
this->cpu_nanos.fetch_add(delta_cpu_time, std::memory_order_relaxed);
}
+ void add_scan_bytes_from_local_storage(int64_t
scan_bytes_from_local_storage) {
+ _scan_bytes_from_local_storage += scan_bytes_from_local_storage;
+ }
+ void add_scan_bytes_from_remote_storage(int64_t
scan_bytes_from_remote_storage) {
+ _scan_bytes_from_remote_storage += scan_bytes_from_remote_storage;
+ }
+
void add_shuffle_send_bytes(int64_t delta_bytes) {
this->shuffle_send_bytes.fetch_add(delta_bytes,
std::memory_order_relaxed);
}
@@ -95,6 +104,8 @@ public:
cpu_nanos.store(0, std::memory_order_relaxed);
shuffle_send_bytes.store(0, std::memory_order_relaxed);
shuffle_send_rows.store(0, std::memory_order_relaxed);
+ _scan_bytes_from_local_storage.store(0, std::memory_order_relaxed);
+ _scan_bytes_from_remote_storage.store(0, std::memory_order_relaxed);
returned_rows = 0;
max_peak_memory_bytes.store(0, std::memory_order_relaxed);
@@ -120,6 +131,8 @@ private:
std::atomic<int64_t> scan_rows;
std::atomic<int64_t> scan_bytes;
std::atomic<int64_t> cpu_nanos;
+ std::atomic<int64_t> _scan_bytes_from_local_storage;
+ std::atomic<int64_t> _scan_bytes_from_remote_storage;
// number rows returned by query.
// only set once by result sink when closing.
int64_t returned_rows;
diff --git a/be/src/runtime/runtime_query_statistics_mgr.cpp
b/be/src/runtime/runtime_query_statistics_mgr.cpp
index 1d9bb34d09d..104a22fb8b9 100644
--- a/be/src/runtime/runtime_query_statistics_mgr.cpp
+++ b/be/src/runtime/runtime_query_statistics_mgr.cpp
@@ -225,28 +225,41 @@ void
RuntimeQueryStatisticsMgr::get_active_be_tasks_block(vectorized::Block* blo
int64_t be_id = ExecEnv::GetInstance()->master_info()->backend_id;
// block's schema come from
SchemaBackendActiveTasksScanner::_s_tbls_columns
+ // Before 2.1.7, the "backend_active_tasks" table had 12 columns.
+ // After 2.1.8, 2 new columns were added.
+ // Check the column count here to stay compatible with versions before 2.1.7.
+ bool need_local_and_remote_bytes = (block->columns() > 12);
for (auto& [query_id, qs_ctx_ptr] : _query_statistics_ctx_map) {
+ int col_idx = 0;
TQueryStatistics tqs;
qs_ctx_ptr->collect_query_statistics(&tqs);
- SchemaScannerHelper::insert_int64_value(0, be_id, block);
- SchemaScannerHelper::insert_string_value(1,
qs_ctx_ptr->_fe_addr.hostname, block);
- SchemaScannerHelper::insert_string_value(2, query_id, block);
+ SchemaScannerHelper::insert_int64_value(col_idx++, be_id, block);
+ SchemaScannerHelper::insert_string_value(col_idx++,
qs_ctx_ptr->_fe_addr.hostname, block);
+ SchemaScannerHelper::insert_string_value(col_idx++, query_id, block);
int64_t task_time = qs_ctx_ptr->_is_query_finished
? qs_ctx_ptr->_query_finish_time -
qs_ctx_ptr->_query_start_time
: MonotonicMillis() -
qs_ctx_ptr->_query_start_time;
- SchemaScannerHelper::insert_int64_value(3, task_time, block);
- SchemaScannerHelper::insert_int64_value(4, tqs.cpu_ms, block);
- SchemaScannerHelper::insert_int64_value(5, tqs.scan_rows, block);
- SchemaScannerHelper::insert_int64_value(6, tqs.scan_bytes, block);
- SchemaScannerHelper::insert_int64_value(7, tqs.max_peak_memory_bytes,
block);
- SchemaScannerHelper::insert_int64_value(8,
tqs.current_used_memory_bytes, block);
- SchemaScannerHelper::insert_int64_value(9, tqs.shuffle_send_bytes,
block);
- SchemaScannerHelper::insert_int64_value(10, tqs.shuffle_send_rows,
block);
+ SchemaScannerHelper::insert_int64_value(col_idx++, task_time, block);
+ SchemaScannerHelper::insert_int64_value(col_idx++, tqs.cpu_ms, block);
+ SchemaScannerHelper::insert_int64_value(col_idx++, tqs.scan_rows,
block);
+ SchemaScannerHelper::insert_int64_value(col_idx++, tqs.scan_bytes,
block);
+
+ if (need_local_and_remote_bytes) {
+ SchemaScannerHelper::insert_int64_value(col_idx++,
tqs.scan_bytes_from_local_storage,
+ block);
+ SchemaScannerHelper::insert_int64_value(col_idx++,
tqs.scan_bytes_from_remote_storage,
+ block);
+ }
+
+ SchemaScannerHelper::insert_int64_value(col_idx++,
tqs.max_peak_memory_bytes, block);
+ SchemaScannerHelper::insert_int64_value(col_idx++,
tqs.current_used_memory_bytes, block);
+ SchemaScannerHelper::insert_int64_value(col_idx++,
tqs.shuffle_send_bytes, block);
+ SchemaScannerHelper::insert_int64_value(col_idx++,
tqs.shuffle_send_rows, block);
std::stringstream ss;
ss << qs_ctx_ptr->_query_type;
- SchemaScannerHelper::insert_string_value(11, ss.str(), block);
+ SchemaScannerHelper::insert_string_value(col_idx++, ss.str(), block);
}
}
diff --git a/be/src/vec/exec/format/orc/vorc_reader.h
b/be/src/vec/exec/format/orc/vorc_reader.h
index 0807f4949e5..b286b714ad9 100644
--- a/be/src/vec/exec/format/orc/vorc_reader.h
+++ b/be/src/vec/exec/format/orc/vorc_reader.h
@@ -667,7 +667,6 @@ public:
io::FileReaderSPtr& get_inner_reader() { return _inner_reader; }
protected:
- void _collect_profile_at_runtime() override {};
void _collect_profile_before_close() override;
private:
diff --git a/be/src/vec/exec/scan/new_olap_scanner.cpp
b/be/src/vec/exec/scan/new_olap_scanner.cpp
index aaf6fbdf3b4..22360200d79 100644
--- a/be/src/vec/exec/scan/new_olap_scanner.cpp
+++ b/be/src/vec/exec/scan/new_olap_scanner.cpp
@@ -656,7 +656,6 @@ void NewOlapScanner::_collect_profile_before_close() {
COUNTER_UPDATE(Parent->_total_segment_counter, stats.total_segment_number);
// Update counters for NewOlapScanner
- // Update counters from tablet reader's stats
auto& stats = _tablet_reader->stats();
if (_parent) {
@@ -678,4 +677,19 @@ void NewOlapScanner::_collect_profile_before_close() {
tablet->query_scan_count->increment(1);
}
+void NewOlapScanner::_update_bytes_and_rows_read() {
+ VScanner::_update_bytes_and_rows_read();
+ if (_query_statistics) {
+ auto& stats = _tablet_reader->stats();
+ int64_t delta_local = stats.file_cache_stats.bytes_read_from_local -
_bytes_read_from_local;
+ int64_t delta_remote =
+ stats.file_cache_stats.bytes_read_from_remote -
_bytes_read_from_remote;
+ _query_statistics->add_scan_bytes_from_local_storage(delta_local);
+ _query_statistics->add_scan_bytes_from_remote_storage(delta_remote);
+ _query_statistics->add_scan_bytes(delta_local + delta_remote);
+ _bytes_read_from_local = stats.file_cache_stats.bytes_read_from_local;
+ _bytes_read_from_remote =
stats.file_cache_stats.bytes_read_from_remote;
+ }
+}
+
} // namespace doris::vectorized
diff --git a/be/src/vec/exec/scan/new_olap_scanner.h
b/be/src/vec/exec/scan/new_olap_scanner.h
index cdadf8f7f49..90d871734c3 100644
--- a/be/src/vec/exec/scan/new_olap_scanner.h
+++ b/be/src/vec/exec/scan/new_olap_scanner.h
@@ -80,6 +80,7 @@ public:
protected:
Status _get_block_impl(RuntimeState* state, Block* block, bool* eos)
override;
void _collect_profile_before_close() override;
+ void _update_bytes_and_rows_read() override;
private:
void _update_realtime_counters();
diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp
b/be/src/vec/exec/scan/vfile_scanner.cpp
index e3899d96982..331b49b2082 100644
--- a/be/src/vec/exec/scan/vfile_scanner.cpp
+++ b/be/src/vec/exec/scan/vfile_scanner.cpp
@@ -1236,4 +1236,19 @@ void VFileScanner::_collect_profile_before_close() {
}
}
+void VFileScanner::_update_bytes_and_rows_read() {
+ VScanner::_update_bytes_and_rows_read();
+ if (_query_statistics && _io_ctx.get() && _io_ctx->file_cache_stats) {
+ int64_t delta_local =
+ _io_ctx->file_cache_stats->bytes_read_from_local -
_bytes_read_from_local;
+ int64_t delta_remote =
+ _io_ctx->file_cache_stats->bytes_read_from_remote -
_bytes_read_from_remote;
+ _query_statistics->add_scan_bytes_from_local_storage(delta_local);
+ _query_statistics->add_scan_bytes_from_remote_storage(delta_remote);
+ _query_statistics->add_scan_bytes(delta_local + delta_remote);
+ _bytes_read_from_local =
_io_ctx->file_cache_stats->bytes_read_from_local;
+ _bytes_read_from_remote =
_io_ctx->file_cache_stats->bytes_read_from_remote;
+ }
+}
+
} // namespace doris::vectorized
diff --git a/be/src/vec/exec/scan/vfile_scanner.h
b/be/src/vec/exec/scan/vfile_scanner.h
index cf1ea97f21b..1c6d903a87f 100644
--- a/be/src/vec/exec/scan/vfile_scanner.h
+++ b/be/src/vec/exec/scan/vfile_scanner.h
@@ -97,6 +97,8 @@ protected:
void _collect_profile_before_close() override;
+ void _update_bytes_and_rows_read() override;
+
protected:
const TFileScanRangeParams* _params = nullptr;
std::shared_ptr<vectorized::SplitSourceConnector> _split_source;
diff --git a/be/src/vec/exec/scan/vscanner.cpp
b/be/src/vec/exec/scan/vscanner.cpp
index 97a2ba8207a..58511e890d6 100644
--- a/be/src/vec/exec/scan/vscanner.cpp
+++ b/be/src/vec/exec/scan/vscanner.cpp
@@ -118,8 +118,7 @@ Status VScanner::get_block(RuntimeState* state, Block*
block, bool* eof) {
}
}
- int64_t old_scan_rows = _num_rows_read;
- int64_t old_scan_bytes = _num_byte_read;
+ _prev_num_rows_read = _num_rows_read;
{
do {
// if step 2 filter all rows of block, and block will be reused to
get next rows,
@@ -138,7 +137,6 @@ Status VScanner::get_block(RuntimeState* state, Block*
block, bool* eof) {
break;
}
_num_rows_read += block->rows();
- _num_byte_read += block->allocated_bytes();
}
// 2. Filter the output block finally.
@@ -153,10 +151,7 @@ Status VScanner::get_block(RuntimeState* state, Block*
block, bool* eof) {
_num_rows_read < rows_read_threshold);
}
- if (_query_statistics) {
- _query_statistics->add_scan_rows(_num_rows_read - old_scan_rows);
- _query_statistics->add_scan_bytes(_num_byte_read - old_scan_bytes);
- }
+ _update_bytes_and_rows_read();
if (state->is_cancelled()) {
// TODO: Should return the specific ErrorStatus instead of just
Cancelled.
@@ -281,7 +276,6 @@ void VScanner::_collect_profile_before_close() {
if (_parent) {
COUNTER_UPDATE(_parent->_scan_cpu_timer, _scan_cpu_timer);
COUNTER_UPDATE(_parent->_rows_read_counter, _num_rows_read);
- COUNTER_UPDATE(_parent->_byte_read_counter, _num_byte_read);
} else {
COUNTER_UPDATE(_local_state->_scan_cpu_timer, _scan_cpu_timer);
COUNTER_UPDATE(_local_state->_rows_read_counter, _num_rows_read);
@@ -301,4 +295,11 @@ void VScanner::update_scan_cpu_timer() {
}
}
+void VScanner::_update_bytes_and_rows_read() {
+ if (_query_statistics) {
+ _query_statistics->add_scan_rows(_num_rows_read - _prev_num_rows_read);
+ _prev_num_rows_read = _num_rows_read;
+ }
+}
+
} // namespace doris::vectorized
diff --git a/be/src/vec/exec/scan/vscanner.h b/be/src/vec/exec/scan/vscanner.h
index e85c6082ca6..03604621f05 100644
--- a/be/src/vec/exec/scan/vscanner.h
+++ b/be/src/vec/exec/scan/vscanner.h
@@ -135,6 +135,10 @@ public:
void update_scan_cpu_timer();
+ // update the bytes and rows read at each round in query statistics.
+ // so that we can get runtime statistics for each query.
+ virtual void _update_bytes_and_rows_read();
+
RuntimeState* runtime_state() { return _state; }
bool is_open() { return _is_open; }
@@ -214,7 +218,12 @@ protected:
// num of rows read from scanner
int64_t _num_rows_read = 0;
- int64_t _num_byte_read = 0;
+ // save the current _num_rows_read before next round,
+ // so that we can get delta rows between each round.
+ int64_t _prev_num_rows_read = 0;
+ // bytes read from local and remote fs
+ int64_t _bytes_read_from_local = 0;
+ int64_t _bytes_read_from_remote = 0;
// num of rows return from scanner, after filter block
int64_t _num_rows_return = 0;
diff --git a/be/test/io/cache/file_block_cache_test.cpp
b/be/test/io/cache/file_block_cache_test.cpp
index 20f97aae6ad..092a343c1a5 100644
--- a/be/test/io/cache/file_block_cache_test.cpp
+++ b/be/test/io/cache/file_block_cache_test.cpp
@@ -852,7 +852,7 @@ TEST(LRUFileCache, fd_cache_remove) {
assert_range(2, segments[0], io::FileBlock::Range(0, 8),
io::FileBlock::State::DOWNLOADING);
download(segments[0]);
std::unique_ptr<char[]> buffer = std::make_unique<char[]>(9);
- static_cast<void>(segments[0]->read_at(Slice(buffer.get(), 9), 0));
+ static_cast<void>(segments[0]->read_at(Slice(buffer.get(), 9), 0,
nullptr));
EXPECT_TRUE(io::IFileCache::contains_file_reader(std::make_pair(key,
0)));
}
{
@@ -864,7 +864,7 @@ TEST(LRUFileCache, fd_cache_remove) {
assert_range(2, segments[0], io::FileBlock::Range(9, 9),
io::FileBlock::State::DOWNLOADING);
download(segments[0]);
std::unique_ptr<char[]> buffer = std::make_unique<char[]>(1);
- static_cast<void>(segments[0]->read_at(Slice(buffer.get(), 1), 0));
+ static_cast<void>(segments[0]->read_at(Slice(buffer.get(), 1), 0,
nullptr));
EXPECT_TRUE(io::IFileCache::contains_file_reader(std::make_pair(key,
9)));
}
{
@@ -877,7 +877,7 @@ TEST(LRUFileCache, fd_cache_remove) {
io::FileBlock::State::DOWNLOADING);
download(segments[0]);
std::unique_ptr<char[]> buffer = std::make_unique<char[]>(5);
- static_cast<void>(segments[0]->read_at(Slice(buffer.get(), 5), 0));
+ static_cast<void>(segments[0]->read_at(Slice(buffer.get(), 5), 0,
nullptr));
EXPECT_TRUE(io::IFileCache::contains_file_reader(std::make_pair(key,
10)));
}
{
@@ -890,7 +890,7 @@ TEST(LRUFileCache, fd_cache_remove) {
io::FileBlock::State::DOWNLOADING);
download(segments[0]);
std::unique_ptr<char[]> buffer = std::make_unique<char[]>(10);
- static_cast<void>(segments[0]->read_at(Slice(buffer.get(), 10), 0));
+ static_cast<void>(segments[0]->read_at(Slice(buffer.get(), 10), 0,
nullptr));
EXPECT_TRUE(io::IFileCache::contains_file_reader(std::make_pair(key,
15)));
}
EXPECT_FALSE(io::IFileCache::contains_file_reader(std::make_pair(key, 0)));
@@ -933,7 +933,7 @@ TEST(LRUFileCache, fd_cache_evict) {
assert_range(2, segments[0], io::FileBlock::Range(0, 8),
io::FileBlock::State::DOWNLOADING);
download(segments[0]);
std::unique_ptr<char[]> buffer = std::make_unique<char[]>(9);
- static_cast<void>(segments[0]->read_at(Slice(buffer.get(), 9), 0));
+ static_cast<void>(segments[0]->read_at(Slice(buffer.get(), 9), 0,
nullptr));
EXPECT_TRUE(io::IFileCache::contains_file_reader(std::make_pair(key,
0)));
}
{
@@ -945,7 +945,7 @@ TEST(LRUFileCache, fd_cache_evict) {
assert_range(2, segments[0], io::FileBlock::Range(9, 9),
io::FileBlock::State::DOWNLOADING);
download(segments[0]);
std::unique_ptr<char[]> buffer = std::make_unique<char[]>(1);
- static_cast<void>(segments[0]->read_at(Slice(buffer.get(), 1), 0));
+ static_cast<void>(segments[0]->read_at(Slice(buffer.get(), 1), 0,
nullptr));
EXPECT_TRUE(io::IFileCache::contains_file_reader(std::make_pair(key,
9)));
}
{
@@ -958,7 +958,7 @@ TEST(LRUFileCache, fd_cache_evict) {
io::FileBlock::State::DOWNLOADING);
download(segments[0]);
std::unique_ptr<char[]> buffer = std::make_unique<char[]>(5);
- static_cast<void>(segments[0]->read_at(Slice(buffer.get(), 5), 0));
+ static_cast<void>(segments[0]->read_at(Slice(buffer.get(), 5), 0,
nullptr));
EXPECT_TRUE(io::IFileCache::contains_file_reader(std::make_pair(key,
10)));
}
EXPECT_FALSE(io::IFileCache::contains_file_reader(std::make_pair(key, 0)));
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchema.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchema.java
index afe1f9af2da..cf827efbbdb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchema.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/InternalSchema.java
@@ -82,6 +82,12 @@ public class InternalSchema {
AUDIT_SCHEMA.add(new ColumnDef("return_rows",
TypeDef.create(PrimitiveType.BIGINT), true));
AUDIT_SCHEMA.add(new ColumnDef("shuffle_send_rows",
TypeDef.create(PrimitiveType.BIGINT), true));
AUDIT_SCHEMA.add(new ColumnDef("shuffle_send_bytes",
TypeDef.create(PrimitiveType.BIGINT), true));
+ AUDIT_SCHEMA
+ .add(new ColumnDef("scan_bytes_from_local_storage",
TypeDef.create(PrimitiveType.BIGINT),
+ true));
+ AUDIT_SCHEMA
+ .add(new ColumnDef("scan_bytes_from_remote_storage",
TypeDef.create(PrimitiveType.BIGINT),
+ true));
AUDIT_SCHEMA.add(new ColumnDef("stmt_id",
TypeDef.create(PrimitiveType.BIGINT), true));
AUDIT_SCHEMA.add(new ColumnDef("is_query",
TypeDef.create(PrimitiveType.TINYINT), true));
AUDIT_SCHEMA.add(new ColumnDef("is_nereids",
TypeDef.create(PrimitiveType.TINYINT), true));
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java
index 785343d78cb..8f12300faea 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java
@@ -455,6 +455,8 @@ public class SchemaTable extends Table {
.column("TASK_CPU_TIME_MS",
ScalarType.createType(PrimitiveType.BIGINT))
.column("SCAN_ROWS",
ScalarType.createType(PrimitiveType.BIGINT))
.column("SCAN_BYTES",
ScalarType.createType(PrimitiveType.BIGINT))
+ .column("LOCAL_SCAN_BYTES",
ScalarType.createType(PrimitiveType.BIGINT))
+ .column("REMOTE_SCAN_BYTES",
ScalarType.createType(PrimitiveType.BIGINT))
.column("BE_PEAK_MEMORY_BYTES",
ScalarType.createType(PrimitiveType.BIGINT))
.column("CURRENT_USED_MEMORY_BYTES",
ScalarType.createType(PrimitiveType.BIGINT))
.column("SHUFFLE_SEND_BYTES",
ScalarType.createType(PrimitiveType.BIGINT))
diff --git a/fe/fe-core/src/main/java/org/apache/doris/plugin/AuditEvent.java b/fe/fe-core/src/main/java/org/apache/doris/plugin/AuditEvent.java
index 7d64b600d8a..5e9dd3f4d4c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/plugin/AuditEvent.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/plugin/AuditEvent.java
@@ -105,6 +105,10 @@ public class AuditEvent {
// note: newly added fields should be always before fuzzyVariables
@AuditField(value = "FuzzyVariables")
public String fuzzyVariables = "";
+ @AuditField(value = "scanBytesFromLocalStorage")
+ public long scanBytesFromLocalStorage = -1;
+ @AuditField(value = "scanBytesFromRemoteStorage")
+ public long scanBytesFromRemoteStorage = -1;
public long pushToAuditLogQueueTime;
@@ -249,6 +253,16 @@ public class AuditEvent {
return this;
}
+ public AuditEventBuilder setScanBytesFromLocalStorage(long
scanBytesFromLocalStorage) {
+ auditEvent.scanBytesFromLocalStorage = scanBytesFromLocalStorage;
+ return this;
+ }
+
+ public AuditEventBuilder setScanBytesFromRemoteStorage(long
scanBytesFromRemoteStorage) {
+ auditEvent.scanBytesFromRemoteStorage = scanBytesFromRemoteStorage;
+ return this;
+ }
+
public AuditEvent build() {
return this.auditEvent;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLoader.java b/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLoader.java
index 55dbba9805e..6aac0364bc0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLoader.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/plugin/audit/AuditLoader.java
@@ -151,6 +151,10 @@ public class AuditLoader extends Plugin implements AuditPlugin {
logBuffer.append(event.queryTime).append("\t");
logBuffer.append(event.scanBytes).append("\t");
logBuffer.append(event.scanRows).append("\t");
+ logBuffer.append(event.shuffleSendBytes).append("\t");
+ logBuffer.append(event.shuffleSendRows).append("\t");
+ logBuffer.append(event.scanBytesFromLocalStorage).append("\t");
+ logBuffer.append(event.scanBytesFromRemoteStorage).append("\t");
logBuffer.append(event.returnRows).append("\t");
logBuffer.append(event.shuffleSendRows).append("\t");
logBuffer.append(event.shuffleSendBytes).append("\t");
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java b/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java
index 031a01b32d2..e73ddd7aa86 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/AuditLogHelper.java
@@ -226,7 +226,11 @@ public class AuditLogHelper {
auditEventBuilder.setSqlDigest(sqlDigest);
}
}
- auditEventBuilder.setIsQuery(true);
+ auditEventBuilder.setIsQuery(true)
+ .setScanBytesFromLocalStorage(
+ statistics == null ? 0 :
statistics.getScanBytesFromLocalStorage())
+ .setScanBytesFromRemoteStorage(
+ statistics == null ? 0 :
statistics.getScanBytesFromRemoteStorage());
} else {
auditEventBuilder.setIsQuery(false);
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java
index e27cb4e0df2..ce0703f23a8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadschedpolicy/WorkloadRuntimeStatusMgr.java
@@ -84,6 +84,8 @@ public class WorkloadRuntimeStatusMgr extends MasterDaemon {
auditEvent.cpuTimeMs = queryStats.cpu_ms;
auditEvent.shuffleSendBytes =
queryStats.shuffle_send_bytes;
auditEvent.shuffleSendRows = queryStats.shuffle_send_rows;
+ auditEvent.scanBytesFromLocalStorage =
queryStats.scan_bytes_from_local_storage;
+ auditEvent.scanBytesFromRemoteStorage =
queryStats.scan_bytes_from_remote_storage;
}
boolean ret =
Env.getCurrentAuditEventProcessor().handleAuditEvent(auditEvent, true);
if (!ret) {
@@ -222,6 +224,8 @@ public class WorkloadRuntimeStatusMgr extends MasterDaemon {
if (dst.max_peak_memory_bytes < src.max_peak_memory_bytes) {
dst.max_peak_memory_bytes = src.max_peak_memory_bytes;
}
+ dst.scan_bytes_from_local_storage += src.scan_bytes_from_local_storage;
+ dst.scan_bytes_from_remote_storage +=
src.scan_bytes_from_remote_storage;
}
private void queryAuditEventLogWriteLock() {
@@ -232,3 +236,4 @@ public class WorkloadRuntimeStatusMgr extends MasterDaemon {
queryAuditEventLock.writeLock().unlock();
}
}
+
diff --git a/gensrc/proto/data.proto b/gensrc/proto/data.proto
index e9ced523912..755a3a042db 100644
--- a/gensrc/proto/data.proto
+++ b/gensrc/proto/data.proto
@@ -35,6 +35,8 @@ message PQueryStatistics {
optional int64 cpu_ms = 4;
optional int64 max_peak_memory_bytes = 5;
repeated PNodeStatistics nodes_statistics = 6;
+ optional int64 scan_bytes_from_local_storage = 7;
+ optional int64 scan_bytes_from_remote_storage = 8;
}
message PRowBatch {
diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift
index 4522fd08f68..638edf1a7bb 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -412,6 +412,8 @@ struct TQueryStatistics {
7: optional i64 workload_group_id
8: optional i64 shuffle_send_bytes
9: optional i64 shuffle_send_rows
+ 10: optional i64 scan_bytes_from_local_storage
+ 11: optional i64 scan_bytes_from_remote_storage
}
struct TReportWorkloadRuntimeStatusParams {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]