This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 1e74161706a [fix](group concat) Fix be oom caused by group concat
(#42334)
1e74161706a is described below
commit 1e74161706a38650d0e57f8d27fee3b537579d80
Author: TengJianPing <[email protected]>
AuthorDate: Tue Oct 29 10:01:40 2024 +0800
[fix](group concat) Fix be oom caused by group concat (#42334)
## Proposed changes
Issue Number: close #xxx
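Memory allocated by `std::string` is not tracked by the BE memory tracker,
so a large group_concat state can grow unaccounted and cause the BE process
to run out of memory (OOM). This change stores the concatenated data in
`ColumnString::Chars` instead of `std::string`.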
---
.../aggregate_function_group_concat.h | 39 ++++++++++++++--------
be/src/vec/io/io_helper.h | 11 ++++--
2 files changed, 34 insertions(+), 16 deletions(-)
diff --git a/be/src/vec/aggregate_functions/aggregate_function_group_concat.h
b/be/src/vec/aggregate_functions/aggregate_function_group_concat.h
index a62ffb8da61..a0cac9ab780 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_group_concat.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_group_concat.h
@@ -43,20 +43,27 @@ class IColumn;
namespace doris::vectorized {
struct AggregateFunctionGroupConcatData {
- std::string data;
+ ColumnString::Chars data;
std::string separator;
bool inited = false;
void add(StringRef ref, StringRef sep) {
+ auto delta_size = ref.size;
if (!inited) {
- inited = true;
separator.assign(sep.data, sep.data + sep.size);
} else {
- data += separator;
+ delta_size += separator.size();
}
+ auto offset = data.size();
+ data.resize(data.size() + delta_size);
- data.resize(data.length() + ref.size);
- memcpy(data.data() + data.length() - ref.size, ref.data, ref.size);
+ if (!inited) {
+ inited = true;
+ } else {
+ memcpy(data.data() + offset, separator.data(), separator.size());
+ offset += separator.size();
+ }
+ memcpy(data.data() + offset, ref.data, ref.size);
}
void merge(const AggregateFunctionGroupConcatData& rhs) {
@@ -67,17 +74,23 @@ struct AggregateFunctionGroupConcatData {
if (!inited) {
inited = true;
separator = rhs.separator;
- data = rhs.data;
+ data.assign(rhs.data);
} else {
- data += separator;
- data += rhs.data;
+ auto offset = data.size();
+
+ auto delta_size = separator.size() + rhs.data.size();
+ data.resize(data.size() + delta_size);
+
+ memcpy(data.data() + offset, separator.data(), separator.size());
+ offset += separator.size();
+ memcpy(data.data() + offset, rhs.data.data(), rhs.data.size());
}
}
- const std::string& get() const { return data; }
+ StringRef get() const { return StringRef {data.data(), data.size()}; }
void write(BufferWritable& buf) const {
- write_binary(data, buf);
+ write_binary(StringRef {data.data(), data.size()}, buf);
write_binary(separator, buf);
write_binary(inited, buf);
}
@@ -89,7 +102,7 @@ struct AggregateFunctionGroupConcatData {
}
void reset() {
- data = "";
+ data.clear();
separator = "";
inited = false;
}
@@ -150,8 +163,8 @@ public:
}
void insert_result_into(ConstAggregateDataPtr __restrict place, IColumn&
to) const override {
- const std::string& result = this->data(place).get();
- assert_cast<ColumnString&>(to).insert_data(result.c_str(),
result.length());
+ const auto result = this->data(place).get();
+ assert_cast<ColumnString&>(to).insert_data(result.data, result.size);
}
};
diff --git a/be/src/vec/io/io_helper.h b/be/src/vec/io/io_helper.h
index d5ca522146a..221beeccbb3 100644
--- a/be/src/vec/io/io_helper.h
+++ b/be/src/vec/io/io_helper.h
@@ -22,6 +22,7 @@
#include <snappy/snappy.h>
#include <iostream>
+#include <type_traits>
#include "common/exception.h"
#include "util/binary_cast.hpp"
@@ -168,7 +169,9 @@ void read_float_binary(Type& x, BufferReadable& buf) {
read_pod_binary(x, buf);
}
-inline void read_string_binary(std::string& s, BufferReadable& buf,
+template <typename Type>
+ requires(std::is_same_v<Type, String> || std::is_same_v<Type,
PaddedPODArray<UInt8>>)
+inline void read_string_binary(Type& s, BufferReadable& buf,
size_t MAX_STRING_SIZE =
DEFAULT_MAX_STRING_SIZE) {
UInt64 size = 0;
read_var_uint(size, buf);
@@ -178,7 +181,7 @@ inline void read_string_binary(std::string& s,
BufferReadable& buf,
}
s.resize(size);
- buf.read(s.data(), size);
+ buf.read((char*)s.data(), size);
}
inline void read_string_binary(StringRef& s, BufferReadable& buf,
@@ -225,7 +228,9 @@ void read_vector_binary(std::vector<Type>& v,
BufferReadable& buf,
}
}
-inline void read_binary(String& x, BufferReadable& buf) {
+template <typename Type>
+ requires(std::is_same_v<Type, String> || std::is_same_v<Type,
PaddedPODArray<UInt8>>)
+inline void read_binary(Type& x, BufferReadable& buf) {
read_string_binary(x, buf);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]