This is an automated email from the ASF dual-hosted git repository.
lihaopeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new af589c0b13b [memtracker](accuracy) should not account resuable buffer
to query memtracker (#33933)
af589c0b13b is described below
commit af589c0b13b0af8c3ea8d17753b820f9ee20aba2
Author: yiguolei <[email protected]>
AuthorDate: Mon Apr 22 00:10:03 2024 +0800
[memtracker](accuracy) should not account resuable buffer to query
memtracker (#33933)
Co-authored-by: yiguolei <[email protected]>
---
be/src/util/block_compression.cpp | 118 +++++++++++++++++++++-----------------
1 file changed, 65 insertions(+), 53 deletions(-)
diff --git a/be/src/util/block_compression.cpp
b/be/src/util/block_compression.cpp
index f3b1e781e7e..21f7e72f5d9 100644
--- a/be/src/util/block_compression.cpp
+++ b/be/src/util/block_compression.cpp
@@ -96,13 +96,20 @@ private:
ENABLE_FACTORY_CREATOR(Context);
public:
- Context() : ctx(nullptr) {}
+ Context() : ctx(nullptr) {
+ SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
+ ExecEnv::GetInstance()->block_compression_mem_tracker());
+ buffer = std::make_unique<faststring>();
+ }
LZ4_stream_t* ctx;
- faststring buffer;
+ std::unique_ptr<faststring> buffer;
~Context() {
+ SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
+ ExecEnv::GetInstance()->block_compression_mem_tracker());
if (ctx) {
LZ4_freeStream(ctx);
}
+ buffer.reset();
}
};
@@ -118,8 +125,6 @@ public:
}
Status compress(const Slice& input, faststring* output) override {
- SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
- ExecEnv::GetInstance()->block_compression_mem_tracker());
if (input.size > INT_MAX) {
return Status::InvalidArgument(
"LZ4 not support those case(input.size>INT_MAX), maybe you
should change "
@@ -144,8 +149,14 @@ public:
compressed_buf.size = max_len;
} else {
// reuse context buffer if max_len <=
MAX_COMPRESSION_BUFFER_FOR_REUSE
- context->buffer.resize(max_len);
- compressed_buf.data =
reinterpret_cast<char*>(context->buffer.data());
+ {
+ // context->buffer is resuable between queries, should
accouting to
+ // global tracker.
+ SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
+
ExecEnv::GetInstance()->block_compression_mem_tracker());
+ context->buffer->resize(max_len);
+ }
+ compressed_buf.data =
reinterpret_cast<char*>(context->buffer->data());
compressed_buf.size = max_len;
}
@@ -165,8 +176,6 @@ public:
}
Status decompress(const Slice& input, Slice* output) override {
- SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
- ExecEnv::GetInstance()->block_compression_mem_tracker());
auto decompressed_len =
LZ4_decompress_safe(input.data, output->data, input.size,
output->size);
if (decompressed_len < 0) {
@@ -218,8 +227,6 @@ public:
return &s_instance;
}
Status decompress(const Slice& input, Slice* output) override {
- SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
- ExecEnv::GetInstance()->block_compression_mem_tracker());
RETURN_IF_ERROR(Decompressor::create_decompressor(CompressType::LZ4BLOCK,
&_decompressor));
size_t input_bytes_read = 0;
size_t decompressed_len = 0;
@@ -245,13 +252,20 @@ private:
ENABLE_FACTORY_CREATOR(CContext);
public:
- CContext() : ctx(nullptr) {}
+ CContext() : ctx(nullptr) {
+ SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
+ ExecEnv::GetInstance()->block_compression_mem_tracker());
+ buffer = std::make_unique<faststring>();
+ }
LZ4F_compressionContext_t ctx;
- faststring buffer;
+ std::unique_ptr<faststring> buffer;
~CContext() {
+ SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
+ ExecEnv::GetInstance()->block_compression_mem_tracker());
if (ctx) {
LZ4F_freeCompressionContext(ctx);
}
+ buffer.reset();
}
};
class DContext {
@@ -301,8 +315,6 @@ public:
private:
Status _compress(const std::vector<Slice>& inputs, size_t
uncompressed_size,
faststring* output) {
- SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
- ExecEnv::GetInstance()->block_compression_mem_tracker());
std::unique_ptr<CContext> context;
RETURN_IF_ERROR(_acquire_compression_ctx(context));
bool compress_failed = false;
@@ -319,9 +331,13 @@ private:
compressed_buf.data = reinterpret_cast<char*>(output->data());
compressed_buf.size = max_len;
} else {
- // reuse context buffer if max_len <=
MAX_COMPRESSION_BUFFER_FOR_REUSE
- context->buffer.resize(max_len);
- compressed_buf.data =
reinterpret_cast<char*>(context->buffer.data());
+ {
+ SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
+
ExecEnv::GetInstance()->block_compression_mem_tracker());
+ // reuse context buffer if max_len <=
MAX_COMPRESSION_BUFFER_FOR_REUSE
+ context->buffer->resize(max_len);
+ }
+ compressed_buf.data =
reinterpret_cast<char*>(context->buffer->data());
compressed_buf.size = max_len;
}
@@ -361,8 +377,6 @@ private:
}
Status _decompress(const Slice& input, Slice* output) {
- SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
- ExecEnv::GetInstance()->block_compression_mem_tracker());
bool decompress_failed = false;
std::unique_ptr<DContext> context;
RETURN_IF_ERROR(_acquire_decompression_ctx(context));
@@ -472,13 +486,20 @@ private:
ENABLE_FACTORY_CREATOR(Context);
public:
- Context() : ctx(nullptr) {}
+ Context() : ctx(nullptr) {
+ SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
+ ExecEnv::GetInstance()->block_compression_mem_tracker());
+ buffer = std::make_unique<faststring>();
+ }
LZ4_streamHC_t* ctx;
- faststring buffer;
+ std::unique_ptr<faststring> buffer;
~Context() {
+ SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
+ ExecEnv::GetInstance()->block_compression_mem_tracker());
if (ctx) {
LZ4_freeStreamHC(ctx);
}
+ buffer.reset();
}
};
@@ -494,8 +515,6 @@ public:
}
Status compress(const Slice& input, faststring* output) override {
- SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
- ExecEnv::GetInstance()->block_compression_mem_tracker());
std::unique_ptr<Context> context;
RETURN_IF_ERROR(_acquire_compression_ctx(context));
bool compress_failed = false;
@@ -512,9 +531,13 @@ public:
compressed_buf.data = reinterpret_cast<char*>(output->data());
compressed_buf.size = max_len;
} else {
- // reuse context buffer if max_len <=
MAX_COMPRESSION_BUFFER_FOR_REUSE
- context->buffer.resize(max_len);
- compressed_buf.data =
reinterpret_cast<char*>(context->buffer.data());
+ {
+ SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
+
ExecEnv::GetInstance()->block_compression_mem_tracker());
+ // reuse context buffer if max_len <=
MAX_COMPRESSION_BUFFER_FOR_REUSE
+ context->buffer->resize(max_len);
+ }
+ compressed_buf.data =
reinterpret_cast<char*>(context->buffer->data());
compressed_buf.size = max_len;
}
@@ -533,8 +556,6 @@ public:
}
Status decompress(const Slice& input, Slice* output) override {
- SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
- ExecEnv::GetInstance()->block_compression_mem_tracker());
auto decompressed_len =
LZ4_decompress_safe(input.data, output->data, input.size,
output->size);
if (decompressed_len < 0) {
@@ -654,8 +675,6 @@ public:
~SnappyBlockCompression() override {}
Status compress(const Slice& input, faststring* output) override {
- SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
- ExecEnv::GetInstance()->block_compression_mem_tracker());
size_t max_len = max_compressed_len(input.size);
output->resize(max_len);
Slice s(*output);
@@ -666,8 +685,6 @@ public:
}
Status decompress(const Slice& input, Slice* output) override {
- SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
- ExecEnv::GetInstance()->block_compression_mem_tracker());
if (!snappy::RawUncompress(input.data, input.size, output->data)) {
return Status::InvalidArgument("Fail to do Snappy decompress");
}
@@ -699,8 +716,6 @@ public:
~ZlibBlockCompression() {}
Status compress(const Slice& input, faststring* output) override {
- SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
- ExecEnv::GetInstance()->block_compression_mem_tracker());
size_t max_len = max_compressed_len(input.size);
output->resize(max_len);
Slice s(*output);
@@ -715,8 +730,6 @@ public:
Status compress(const std::vector<Slice>& inputs, size_t uncompressed_size,
faststring* output) override {
- SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
- ExecEnv::GetInstance()->block_compression_mem_tracker());
size_t max_len = max_compressed_len(uncompressed_size);
output->resize(max_len);
@@ -757,8 +770,6 @@ public:
}
Status decompress(const Slice& input, Slice* output) override {
- SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
- ExecEnv::GetInstance()->block_compression_mem_tracker());
size_t input_size = input.size;
auto zres =
::uncompress2((Bytef*)output->data, &output->size,
(Bytef*)input.data, &input_size);
@@ -781,13 +792,20 @@ private:
ENABLE_FACTORY_CREATOR(CContext);
public:
- CContext() : ctx(nullptr) {}
+ CContext() : ctx(nullptr) {
+ SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
+ ExecEnv::GetInstance()->block_compression_mem_tracker());
+ buffer = std::make_unique<faststring>();
+ }
ZSTD_CCtx* ctx;
- faststring buffer;
+ std::unique_ptr<faststring> buffer;
~CContext() {
+ SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
+ ExecEnv::GetInstance()->block_compression_mem_tracker());
if (ctx) {
ZSTD_freeCCtx(ctx);
}
+ buffer.reset();
}
};
class DContext {
@@ -826,8 +844,6 @@ public:
//
https://github.com/facebook/zstd/blob/dev/examples/streaming_compression.c
Status compress(const std::vector<Slice>& inputs, size_t uncompressed_size,
faststring* output) override {
- SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
- ExecEnv::GetInstance()->block_compression_mem_tracker());
std::unique_ptr<CContext> context;
RETURN_IF_ERROR(_acquire_compression_ctx(context));
bool compress_failed = false;
@@ -845,9 +861,13 @@ public:
compressed_buf.data = reinterpret_cast<char*>(output->data());
compressed_buf.size = max_len;
} else {
- // reuse context buffer if max_len <=
MAX_COMPRESSION_BUFFER_FOR_REUSE
- context->buffer.resize(max_len);
- compressed_buf.data =
reinterpret_cast<char*>(context->buffer.data());
+ {
+ SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
+
ExecEnv::GetInstance()->block_compression_mem_tracker());
+ // reuse context buffer if max_len <=
MAX_COMPRESSION_BUFFER_FOR_REUSE
+ context->buffer->resize(max_len);
+ }
+ compressed_buf.data =
reinterpret_cast<char*>(context->buffer->data());
compressed_buf.size = max_len;
}
@@ -904,8 +924,6 @@ public:
}
Status decompress(const Slice& input, Slice* output) override {
- SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
- ExecEnv::GetInstance()->block_compression_mem_tracker());
std::unique_ptr<DContext> context;
bool decompress_failed = false;
RETURN_IF_ERROR(_acquire_decompression_ctx(context));
@@ -1001,8 +1019,6 @@ public:
~GzipBlockCompression() override = default;
Status decompress(const Slice& input, Slice* output) override {
- SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
- ExecEnv::GetInstance()->block_compression_mem_tracker());
z_stream z_strm = {};
z_strm.zalloc = Z_NULL;
z_strm.zfree = Z_NULL;
@@ -1084,8 +1100,6 @@ public:
~GzipBlockCompressionByLibdeflate() override = default;
Status decompress(const Slice& input, Slice* output) override {
- SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
- ExecEnv::GetInstance()->block_compression_mem_tracker());
if (input.empty()) {
output->size = 0;
return Status::OK();
@@ -1118,8 +1132,6 @@ public:
}
size_t max_compressed_len(size_t len) override { return 0; };
Status decompress(const Slice& input, Slice* output) override {
- SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
- ExecEnv::GetInstance()->block_compression_mem_tracker());
auto* input_ptr = input.data;
auto remain_input_size = input.size;
auto* output_ptr = output->data;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]