This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 6a6e10c1821 [fix](java udf) make executor class thread local (#25758)
6a6e10c1821 is described below

commit 6a6e10c1821321bd5c1a43c2fb12697aec5f7524
Author: Ashin Gau <[email protected]>
AuthorDate: Mon Oct 23 16:55:15 2023 +0800

    [fix](java udf) make executor class thread local (#25758)
---
 be/src/vec/functions/function_java_udf.cpp | 46 +++++++-----------------------
 be/src/vec/functions/function_java_udf.h   | 32 ++++++++-------------
 2 files changed, 22 insertions(+), 56 deletions(-)

diff --git a/be/src/vec/functions/function_java_udf.cpp 
b/be/src/vec/functions/function_java_udf.cpp
index ca69fbb0bf8..751bbdbe55a 100644
--- a/be/src/vec/functions/function_java_udf.cpp
+++ b/be/src/vec/functions/function_java_udf.cpp
@@ -17,31 +17,16 @@
 
 #include "vec/functions/function_java_udf.h"
 
-#include <glog/logging.h>
-
-#include <cstdint>
 #include <memory>
 #include <string>
-#include <utility>
 #include <vector>
 
-#include "gutil/strings/substitute.h"
 #include "jni.h"
-#include "jni_md.h"
 #include "runtime/user_function_cache.h"
 #include "util/jni-util.h"
 #include "vec/columns/column.h"
-#include "vec/columns/column_array.h"
-#include "vec/columns/column_map.h"
-#include "vec/columns/column_nullable.h"
-#include "vec/columns/column_string.h"
-#include "vec/columns/column_vector.h"
-#include "vec/columns/columns_number.h"
 #include "vec/common/assert_cast.h"
-#include "vec/common/string_ref.h"
 #include "vec/core/block.h"
-#include "vec/data_types/data_type_array.h"
-#include "vec/data_types/data_type_nullable.h"
 #include "vec/exec/jni_connector.h"
 
 const char* EXECUTOR_CLASS = "org/apache/doris/udf/UdfExecutor";
@@ -60,25 +45,9 @@ Status JavaFunctionCall::open(FunctionContext* context, 
FunctionContext::Functio
     if (env == nullptr) {
         return Status::InternalError("Failed to get/create JVM");
     }
-    if (scope == FunctionContext::FunctionStateScope::FRAGMENT_LOCAL) {
-        std::shared_ptr<JniEnv> jni_env = std::make_shared<JniEnv>();
-        RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, EXECUTOR_CLASS, 
&jni_env->executor_cl));
-        jni_env->executor_ctor_id =
-                env->GetMethodID(jni_env->executor_cl, "<init>", 
EXECUTOR_CTOR_SIGNATURE);
-        RETURN_ERROR_IF_EXC(env);
-        jni_env->executor_evaluate_id =
-                env->GetMethodID(jni_env->executor_cl, "evaluate", 
EXECUTOR_EVALUATE_SIGNATURE);
-        jni_env->executor_close_id =
-                env->GetMethodID(jni_env->executor_cl, "close", 
EXECUTOR_CLOSE_SIGNATURE);
-        RETURN_ERROR_IF_EXC(env);
-        context->set_function_state(FunctionContext::FRAGMENT_LOCAL, jni_env);
-    }
 
     if (scope == FunctionContext::FunctionStateScope::THREAD_LOCAL) {
-        JniEnv* jni_env = reinterpret_cast<JniEnv*>(
-                context->get_function_state(FunctionContext::FRAGMENT_LOCAL));
-        std::shared_ptr<JniContext> jni_ctx = std::make_shared<JniContext>(
-                _argument_types.size(), jni_env->executor_cl, 
jni_env->executor_close_id);
+        std::shared_ptr<JniContext> jni_ctx = std::make_shared<JniContext>();
         context->set_function_state(FunctionContext::THREAD_LOCAL, jni_ctx);
 
         // Add a scoped cleanup jni reference object. This cleans up local 
refs made below.
@@ -98,7 +67,14 @@ Status JavaFunctionCall::open(FunctionContext* context, 
FunctionContext::Functio
 
             RETURN_IF_ERROR(SerializeThriftMsg(env, &ctor_params, 
&ctor_params_bytes));
 
-            jni_ctx->executor = env->NewObject(jni_env->executor_cl, 
jni_env->executor_ctor_id,
+            RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, EXECUTOR_CLASS, 
&jni_ctx->executor_cl));
+            jni_ctx->executor_ctor_id =
+                    env->GetMethodID(jni_ctx->executor_cl, "<init>", 
EXECUTOR_CTOR_SIGNATURE);
+            jni_ctx->executor_evaluate_id =
+                    env->GetMethodID(jni_ctx->executor_cl, "evaluate", 
EXECUTOR_EVALUATE_SIGNATURE);
+            jni_ctx->executor_close_id =
+                    env->GetMethodID(jni_ctx->executor_cl, "close", 
EXECUTOR_CLOSE_SIGNATURE);
+            jni_ctx->executor = env->NewObject(jni_ctx->executor_cl, 
jni_ctx->executor_ctor_id,
                                                ctor_params_bytes);
 
             jbyte* pBytes = env->GetByteArrayElements(ctor_params_bytes, 
nullptr);
@@ -118,8 +94,6 @@ Status JavaFunctionCall::execute_impl(FunctionContext* 
context, Block& block,
     RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env));
     JniContext* jni_ctx = reinterpret_cast<JniContext*>(
             context->get_function_state(FunctionContext::THREAD_LOCAL));
-    JniEnv* jni_env =
-            
reinterpret_cast<JniEnv*>(context->get_function_state(FunctionContext::FRAGMENT_LOCAL));
 
     std::unique_ptr<long[]> input_table;
     RETURN_IF_ERROR(JniConnector::to_java_table(&block, num_rows, arguments, 
input_table));
@@ -136,7 +110,7 @@ Status JavaFunctionCall::execute_impl(FunctionContext* 
context, Block& block,
                                               {"required_fields", 
output_table_schema.first},
                                               {"columns_types", 
output_table_schema.second}};
     jobject output_map = JniUtil::convert_to_java_map(env, output_params);
-    long output_address = env->CallLongMethod(jni_ctx->executor, 
jni_env->executor_evaluate_id,
+    long output_address = env->CallLongMethod(jni_ctx->executor, 
jni_ctx->executor_evaluate_id,
                                               input_map, output_map);
     RETURN_IF_ERROR(JniUtil::GetJniExceptionMsg(env));
     env->DeleteLocalRef(input_map);
diff --git a/be/src/vec/functions/function_java_udf.h 
b/be/src/vec/functions/function_java_udf.h
index 425f05ccfd9..a30a93b355e 100644
--- a/be/src/vec/functions/function_java_udf.h
+++ b/be/src/vec/functions/function_java_udf.h
@@ -38,9 +38,8 @@
 #include "vec/core/types.h"
 #include "vec/data_types/data_type.h"
 #include "vec/functions/function.h"
-namespace doris {
 
-namespace vectorized {
+namespace doris::vectorized {
 
 class JavaUdfPreparedFunction : public PreparedFunctionImpl {
 public:
@@ -114,26 +113,18 @@ private:
     const DataTypes _argument_types;
     const DataTypePtr _return_type;
 
-    struct JniEnv {
-        /// Global class reference to the UdfExecutor Java class and related 
method IDs. Set in
-        /// Init(). These have the lifetime of the process (i.e. 
'executor_cl_' is never freed).
-        jclass executor_cl;
-        jmethodID executor_ctor_id;
-        jmethodID executor_evaluate_id;
-        jmethodID executor_close_id;
-    };
-
     struct JniContext {
         // Do not save parent directly, because parent is in VExpr, but jni 
context is in FunctionContext
         // The deconstruct sequence is not determined, it will core.
         // JniContext's lifecycle should same with function context, not 
related with expr
-        jclass executor_cl_;
-        jmethodID executor_close_id_;
+        jclass executor_cl;
+        jmethodID executor_ctor_id;
+        jmethodID executor_evaluate_id;
+        jmethodID executor_close_id;
         jobject executor = nullptr;
         bool is_closed = false;
 
-        JniContext(int64_t num_args, jclass executor_cl, jmethodID 
executor_close_id)
-                : executor_cl_(executor_cl), 
executor_close_id_(executor_close_id) {}
+        JniContext() = default;
 
         void close() {
             if (is_closed) {
@@ -146,15 +137,16 @@ private:
                 LOG(WARNING) << "errors while get jni env " << status;
                 return;
             }
-            env->CallNonvirtualVoidMethodA(executor, executor_cl_, 
executor_close_id_, NULL);
+            env->CallNonvirtualVoidMethodA(executor, executor_cl, 
executor_close_id, nullptr);
             env->DeleteGlobalRef(executor);
-            env->DeleteGlobalRef(executor_cl_);
+            env->DeleteGlobalRef(executor_cl);
             Status s = JniUtil::GetJniExceptionMsg(env);
-            if (!s.ok()) LOG(WARNING) << s;
+            if (!s.ok()) {
+                LOG(WARNING) << s;
+            }
             is_closed = true;
         }
     };
 };
 
-} // namespace vectorized
-} // namespace doris
+} // namespace doris::vectorized


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to