This is an automated email from the ASF dual-hosted git repository.
jacktengg pushed a commit to branch spill_and_reserve
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/spill_and_reserve by this push:
new 93eca0c2ff0 improve spill stats
93eca0c2ff0 is described below
commit 93eca0c2ff0389d55a4da7da465d6d70ea5161fc
Author: jacktengg <[email protected]>
AuthorDate: Wed Nov 13 09:47:24 2024 +0800
improve spill stats
---
be/src/pipeline/exec/operator.h | 22 ----------------------
be/src/runtime/query_statistics.h | 10 ++++++----
be/src/vec/spill/spill_reader.cpp | 12 ++++++++++++
be/src/vec/spill/spill_reader.h | 10 ++++++++--
be/src/vec/spill/spill_stream.cpp | 10 +++++++---
be/src/vec/spill/spill_writer.cpp | 6 ++++++
be/src/vec/spill/spill_writer.h | 12 +++++++++---
7 files changed, 48 insertions(+), 34 deletions(-)
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index c0ef6d27af0..e7c668d2cdb 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -301,17 +301,6 @@ public:
return Status::OK();
}
- Status close(RuntimeState* state) override {
- if (Base::_query_statistics) {
- auto* write_file_bytes =
Base::profile()->get_counter("SpillWriteFileBytes");
- auto* read_file_bytes =
Base::profile()->get_counter("SpillReadFileBytes");
- Base::_query_statistics->add_spill_bytes(
- write_file_bytes ? write_file_bytes->value() : 0,
- read_file_bytes ? read_file_bytes->value() : 0);
- }
- return Base::close(state);
- }
-
void init_spill_write_counters() {
_spill_write_timer = ADD_TIMER_WITH_LEVEL(Base::profile(),
"SpillWriteTime", 1);
@@ -741,17 +730,6 @@ public:
return Status::OK();
}
- Status close(RuntimeState* state, Status exec_status) override {
- if (Base::_query_statistics) {
- auto* write_file_bytes =
Base::profile()->get_counter("SpillWriteFileBytes");
- auto* read_file_bytes =
Base::profile()->get_counter("SpillReadFileBytes");
- Base::_query_statistics->add_spill_bytes(
- write_file_bytes ? write_file_bytes->value() : 0,
- read_file_bytes ? read_file_bytes->value() : 0);
- }
- return Base::close(state, exec_status);
- }
-
std::vector<Dependency*> dependencies() const override {
auto dependencies = Base::dependencies();
return dependencies;
diff --git a/be/src/runtime/query_statistics.h b/be/src/runtime/query_statistics.h
index bffea2b1d2a..0b0174172ff 100644
--- a/be/src/runtime/query_statistics.h
+++ b/be/src/runtime/query_statistics.h
@@ -82,10 +82,12 @@ public:
current_used_memory_bytes = current_used_memory;
}
- void add_spill_bytes(int64_t spill_write_bytes_to_local_storage,
- int64_t spill_read_bytes_from_local_storage) {
- _spill_write_bytes_to_local_storage +=
spill_write_bytes_to_local_storage;
- _spill_read_bytes_from_local_storage +=
spill_read_bytes_from_local_storage;
+ void add_spill_write_bytes_to_local_storage(int64_t bytes) {
+ _spill_write_bytes_to_local_storage += bytes;
+ }
+
+ void add_spill_read_bytes_from_local_storage(int64_t bytes) {
+ _spill_read_bytes_from_local_storage += bytes;
}
void to_pb(PQueryStatistics* statistics);
diff --git a/be/src/vec/spill/spill_reader.cpp b/be/src/vec/spill/spill_reader.cpp
index 004de38f354..c947081fcaf 100644
--- a/be/src/vec/spill/spill_reader.cpp
+++ b/be/src/vec/spill/spill_reader.cpp
@@ -55,6 +55,9 @@ Status SpillReader::open() {
RETURN_IF_ERROR(file_reader_->read_at(file_size - sizeof(size_t), result,
&bytes_read));
DCHECK(bytes_read == 8); // max_sub_block_size, block count
COUNTER_UPDATE(_read_file_size, bytes_read);
+ if (_query_statistics) {
+ _query_statistics->add_spill_read_bytes_from_local_storage(bytes_read);
+ }
// read max sub block size
bytes_read = 0;
@@ -62,6 +65,9 @@ Status SpillReader::open() {
RETURN_IF_ERROR(file_reader_->read_at(file_size - sizeof(size_t) * 2,
result, &bytes_read));
DCHECK(bytes_read == 8); // max_sub_block_size, block count
COUNTER_UPDATE(_read_file_size, bytes_read);
+ if (_query_statistics) {
+ _query_statistics->add_spill_read_bytes_from_local_storage(bytes_read);
+ }
size_t buff_size = std::max(block_count_ * sizeof(size_t),
max_sub_block_size_);
try {
@@ -80,6 +86,9 @@ Status SpillReader::open() {
RETURN_IF_ERROR(file_reader_->read_at(read_offset, result, &bytes_read));
DCHECK(bytes_read == block_count_ * sizeof(size_t));
COUNTER_UPDATE(_read_file_size, bytes_read);
+ if (_query_statistics) {
+ _query_statistics->add_spill_read_bytes_from_local_storage(bytes_read);
+ }
block_start_offsets_.resize(block_count_ + 1);
for (size_t i = 0; i < block_count_; ++i) {
@@ -118,6 +127,9 @@ Status SpillReader::read(Block* block, bool* eos) {
if (bytes_read > 0) {
COUNTER_UPDATE(_read_file_size, bytes_read);
+ if (_query_statistics) {
+
_query_statistics->add_spill_read_bytes_from_local_storage(bytes_read);
+ }
COUNTER_UPDATE(_read_block_count, 1);
{
SCOPED_TIMER(_deserialize_timer);
diff --git a/be/src/vec/spill/spill_reader.h b/be/src/vec/spill/spill_reader.h
index 1096e529bf3..da975bdb605 100644
--- a/be/src/vec/spill/spill_reader.h
+++ b/be/src/vec/spill/spill_reader.h
@@ -25,6 +25,7 @@
#include "common/status.h"
#include "io/fs/file_reader_writer_fwd.h"
+#include "runtime/query_statistics.h"
#include "util/runtime_profile.h"
namespace doris::vectorized {
@@ -32,8 +33,11 @@ namespace doris::vectorized {
class Block;
class SpillReader {
public:
- SpillReader(int64_t stream_id, std::string file_path)
- : stream_id_(stream_id), file_path_(std::move(file_path)) {}
+ SpillReader(std::shared_ptr<doris::QueryStatistics> query_statistics,
int64_t stream_id,
+ std::string file_path)
+ : stream_id_(stream_id),
+ file_path_(std::move(file_path)),
+ _query_statistics(std::move(query_statistics)) {}
~SpillReader() { (void)close(); }
@@ -81,6 +85,8 @@ private:
RuntimeProfile::Counter* _read_file_size = nullptr;
RuntimeProfile::Counter* _read_rows_count = nullptr;
RuntimeProfile::Counter* _read_file_count = nullptr;
+
+ std::shared_ptr<doris::QueryStatistics> _query_statistics = nullptr;
};
using SpillReaderUPtr = std::unique_ptr<SpillReader>;
diff --git a/be/src/vec/spill/spill_stream.cpp b/be/src/vec/spill/spill_stream.cpp
index f14e5f60974..3e5b93a21d7 100644
--- a/be/src/vec/spill/spill_stream.cpp
+++ b/be/src/vec/spill/spill_stream.cpp
@@ -25,6 +25,7 @@
#include "io/fs/local_file_system.h"
#include "runtime/exec_env.h"
+#include "runtime/query_context.h"
#include "runtime/runtime_state.h"
#include "runtime/thread_context.h"
#include "util/debug_points.h"
@@ -95,11 +96,14 @@ void SpillStream::gc() {
}
Status SpillStream::prepare() {
- writer_ =
- std::make_unique<SpillWriter>(profile_, stream_id_, batch_rows_,
data_dir_, spill_dir_);
+ writer_ = std::make_unique<SpillWriter>(
+
state_->get_query_ctx()->get_mem_tracker()->get_query_statistics(), profile_,
+ stream_id_, batch_rows_, data_dir_, spill_dir_);
_set_write_counters(profile_);
- reader_ = std::make_unique<SpillReader>(stream_id_,
writer_->get_file_path());
+ reader_ = std::make_unique<SpillReader>(
+
state_->get_query_ctx()->get_mem_tracker()->get_query_statistics(), stream_id_,
+ writer_->get_file_path());
DBUG_EXECUTE_IF("fault_inject::spill_stream::prepare_spill", {
return Status::Error<INTERNAL_ERROR>("fault_inject spill_stream
prepare_spill failed");
diff --git a/be/src/vec/spill/spill_writer.cpp b/be/src/vec/spill/spill_writer.cpp
index f6e16518043..0af0cb59a7d 100644
--- a/be/src/vec/spill/spill_writer.cpp
+++ b/be/src/vec/spill/spill_writer.cpp
@@ -52,6 +52,9 @@ Status SpillWriter::close() {
total_written_bytes_ += meta_.size();
COUNTER_UPDATE(_write_file_total_size, meta_.size());
+ if (_query_statistics) {
+
_query_statistics->add_spill_write_bytes_to_local_storage(meta_.size());
+ }
if (_write_file_current_size) {
COUNTER_UPDATE(_write_file_current_size, meta_.size());
}
@@ -149,6 +152,9 @@ Status SpillWriter::_write_internal(const Block& block,
size_t& written_bytes) {
meta_.append((const char*)&total_written_bytes_,
sizeof(size_t));
COUNTER_UPDATE(_write_file_total_size, buff_size);
+ if (_query_statistics) {
+
_query_statistics->add_spill_write_bytes_to_local_storage(buff_size);
+ }
if (_write_file_current_size) {
COUNTER_UPDATE(_write_file_current_size, buff_size);
}
diff --git a/be/src/vec/spill/spill_writer.h b/be/src/vec/spill/spill_writer.h
index 05afebf5dcb..36a40e1e055 100644
--- a/be/src/vec/spill/spill_writer.h
+++ b/be/src/vec/spill/spill_writer.h
@@ -22,6 +22,7 @@
#include <string>
#include "io/fs/file_writer.h"
+#include "runtime/query_statistics.h"
#include "util/runtime_profile.h"
#include "vec/core/block.h"
namespace doris {
@@ -32,9 +33,12 @@ namespace vectorized {
class SpillDataDir;
class SpillWriter {
public:
- SpillWriter(RuntimeProfile* profile, int64_t id, size_t batch_size,
SpillDataDir* data_dir,
- const std::string& dir)
- : data_dir_(data_dir), stream_id_(id), batch_size_(batch_size) {
+ SpillWriter(std::shared_ptr<doris::QueryStatistics> query_statistics,
RuntimeProfile* profile,
+ int64_t id, size_t batch_size, SpillDataDir* data_dir, const
std::string& dir)
+ : data_dir_(data_dir),
+ stream_id_(id),
+ batch_size_(batch_size),
+ _query_statistics(std::move(query_statistics)) {
// Directory path format specified in SpillStreamManager::register_spill_stream:
// storage_root/spill/query_id/partitioned_hash_join-node_id-task_id-stream_id/0
file_path_ = dir + "/0";
@@ -92,6 +96,8 @@ private:
RuntimeProfile::Counter* _write_rows_counter = nullptr;
RuntimeProfile::Counter* _memory_used_counter = nullptr;
RuntimeProfile::HighWaterMarkCounter* _peak_memory_usage_counter = nullptr;
+
+ std::shared_ptr<doris::QueryStatistics> _query_statistics = nullptr;
};
using SpillWriterUPtr = std::unique_ptr<SpillWriter>;
} // namespace vectorized
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]