This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new 7a52cbb796a [bug](udaf) fix memory leak in the java udaf (#32630) (#32702)
7a52cbb796a is described below

commit 7a52cbb796aa904eff4536dbdaaac1cd5cecb776
Author: zhangstar333 <[email protected]>
AuthorDate: Fri Mar 22 21:18:39 2024 +0800

    [bug](udaf) fix memory leak in the java udaf (#32630) (#32702)
---
 .../aggregate_function_java_udaf.h                 | 40 +++++++++++++++-------
 1 file changed, 27 insertions(+), 13 deletions(-)

diff --git a/be/src/vec/aggregate_functions/aggregate_function_java_udaf.h b/be/src/vec/aggregate_functions/aggregate_function_java_udaf.h
index 05bda448077..720e143d4a5 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_java_udaf.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_java_udaf.h
@@ -58,21 +58,28 @@ public:
     AggregateJavaUdafData() = default;
     AggregateJavaUdafData(int64_t num_args) { argument_size = num_args; }
 
-    ~AggregateJavaUdafData() {
+    ~AggregateJavaUdafData() = default;
+
+    Status close_and_delete_object() {
         JNIEnv* env = nullptr;
-        //use defer as the jni call maybe have error, then directly return without delete
-        Defer defer([&, this] {
-            std::string temp;
-            temp.swap(serialize_data);
+        Defer defer {[&]() {
             if (env != nullptr) {
                 env->DeleteGlobalRef(executor_cl);
                 env->DeleteGlobalRef(executor_obj);
             }
-        });
-        Status status;
-        RETURN_IF_STATUS_ERROR(status, JniUtil::GetJNIEnv(&env));
+        }};
+        Status st = JniUtil::GetJNIEnv(&env);
+        if (!st.ok()) {
+            LOG(WARNING) << "Failed to get JNIEnv";
+            return st;
+        }
         env->CallNonvirtualVoidMethod(executor_obj, executor_cl, executor_close_id);
-        RETURN_IF_STATUS_ERROR(status, JniUtil::GetJniExceptionMsg(env));
+        st = JniUtil::GetJniExceptionMsg(env);
+        if (!st.ok()) {
+            LOG(WARNING) << "Failed to close JAVA UDAF: " << st.to_string();
+            return st;
+        }
+        return Status::OK();
     }
 
     Status init_udaf(const TFunction& fn, const std::string& local_location) {
@@ -521,8 +528,8 @@ public:
     }
 
     void create(AggregateDataPtr __restrict place) const override {
+        new (place) Data(argument_types.size());
         if (_first_created) {
-            new (place) Data(argument_types.size());
             Status status = Status::OK();
             SAFE_CREATE(RETURN_IF_STATUS_ERROR(status,
                                                this->data(place).init_udaf(_fn, _local_location)),
@@ -532,16 +539,24 @@ public:
                         });
             _first_created = false;
             _exec_place = place;
+            if (UNLIKELY(!status.ok())) {
+                throw doris::Exception(ErrorCode::INTERNAL_ERROR, status.to_string());
+            }
         }
     }
 
     // To avoid multiple times JNI call, Here will destroy all data at once
     void destroy(AggregateDataPtr __restrict place) const noexcept override {
         if (place == _exec_place) {
-            this->data(_exec_place).destroy();
-            this->data(_exec_place).~Data();
+            Status status = Status::OK();
+            status = this->data(_exec_place).destroy();
+            status = this->data(_exec_place).close_and_delete_object();
             _first_created = true;
+            if (UNLIKELY(!status.ok())) {
+                LOG(WARNING) << "Failed to destroy function: " << status.to_string();
+            }
         }
+        this->data(place).~Data();
     }
 
     String get_name() const override { return _fn.name.function_name; }
@@ -625,7 +640,6 @@ public:
     // so it's can't call ~Data, only to change _destory_deserialize flag.
     void deserialize(AggregateDataPtr __restrict place, BufferReadable& buf,
                      Arena*) const override {
-        new (place) Data(argument_types.size());
         this->data(place).read(buf);
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to