This is an automated email from the ASF dual-hosted git repository.
kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new 7a52cbb796a [bug](udaf) fix memory leak in the java udaf (#32630)
(#32702)
7a52cbb796a is described below
commit 7a52cbb796aa904eff4536dbdaaac1cd5cecb776
Author: zhangstar333 <[email protected]>
AuthorDate: Fri Mar 22 21:18:39 2024 +0800
[bug](udaf) fix memory leak in the java udaf (#32630) (#32702)
---
.../aggregate_function_java_udaf.h | 40 +++++++++++++++-------
1 file changed, 27 insertions(+), 13 deletions(-)
diff --git a/be/src/vec/aggregate_functions/aggregate_function_java_udaf.h
b/be/src/vec/aggregate_functions/aggregate_function_java_udaf.h
index 05bda448077..720e143d4a5 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_java_udaf.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_java_udaf.h
@@ -58,21 +58,28 @@ public:
AggregateJavaUdafData() = default;
AggregateJavaUdafData(int64_t num_args) { argument_size = num_args; }
- ~AggregateJavaUdafData() {
+ ~AggregateJavaUdafData() = default;
+
+ Status close_and_delete_object() {
JNIEnv* env = nullptr;
- //use defer as the jni call maybe have error, then directly return
without delete
- Defer defer([&, this] {
- std::string temp;
- temp.swap(serialize_data);
+ Defer defer {[&]() {
if (env != nullptr) {
env->DeleteGlobalRef(executor_cl);
env->DeleteGlobalRef(executor_obj);
}
- });
- Status status;
- RETURN_IF_STATUS_ERROR(status, JniUtil::GetJNIEnv(&env));
+ }};
+ Status st = JniUtil::GetJNIEnv(&env);
+ if (!st.ok()) {
+ LOG(WARNING) << "Failed to get JNIEnv";
+ return st;
+ }
env->CallNonvirtualVoidMethod(executor_obj, executor_cl,
executor_close_id);
- RETURN_IF_STATUS_ERROR(status, JniUtil::GetJniExceptionMsg(env));
+ st = JniUtil::GetJniExceptionMsg(env);
+ if (!st.ok()) {
+ LOG(WARNING) << "Failed to close JAVA UDAF: " << st.to_string();
+ return st;
+ }
+ return Status::OK();
}
Status init_udaf(const TFunction& fn, const std::string& local_location) {
@@ -521,8 +528,8 @@ public:
}
void create(AggregateDataPtr __restrict place) const override {
+ new (place) Data(argument_types.size());
if (_first_created) {
- new (place) Data(argument_types.size());
Status status = Status::OK();
SAFE_CREATE(RETURN_IF_STATUS_ERROR(status,
this->data(place).init_udaf(_fn, _local_location)),
@@ -532,16 +539,24 @@ public:
});
_first_created = false;
_exec_place = place;
+ if (UNLIKELY(!status.ok())) {
+ throw doris::Exception(ErrorCode::INTERNAL_ERROR,
status.to_string());
+ }
}
}
// To avoid multiple times JNI call, Here will destroy all data at once
void destroy(AggregateDataPtr __restrict place) const noexcept override {
if (place == _exec_place) {
- this->data(_exec_place).destroy();
- this->data(_exec_place).~Data();
+ Status status = Status::OK();
+ status = this->data(_exec_place).destroy();
+ status = this->data(_exec_place).close_and_delete_object();
_first_created = true;
+ if (UNLIKELY(!status.ok())) {
+ LOG(WARNING) << "Failed to destroy function: " <<
status.to_string();
+ }
}
+ this->data(place).~Data();
}
String get_name() const override { return _fn.name.function_name; }
@@ -625,7 +640,6 @@ public:
// so it's can't call ~Data, only to change _destory_deserialize flag.
void deserialize(AggregateDataPtr __restrict place, BufferReadable& buf,
Arena*) const override {
- new (place) Data(argument_types.size());
this->data(place).read(buf);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]