This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 6a6e10c1821 [fix](java udf) make executor class thread local (#25758)
6a6e10c1821 is described below
commit 6a6e10c1821321bd5c1a43c2fb12697aec5f7524
Author: Ashin Gau <[email protected]>
AuthorDate: Mon Oct 23 16:55:15 2023 +0800
[fix](java udf) make executor class thread local (#25758)
---
be/src/vec/functions/function_java_udf.cpp | 46 +++++++-----------------------
be/src/vec/functions/function_java_udf.h | 32 ++++++++-------------
2 files changed, 22 insertions(+), 56 deletions(-)
diff --git a/be/src/vec/functions/function_java_udf.cpp b/be/src/vec/functions/function_java_udf.cpp
index ca69fbb0bf8..751bbdbe55a 100644
--- a/be/src/vec/functions/function_java_udf.cpp
+++ b/be/src/vec/functions/function_java_udf.cpp
@@ -17,31 +17,16 @@
#include "vec/functions/function_java_udf.h"
-#include <glog/logging.h>
-
-#include <cstdint>
#include <memory>
#include <string>
-#include <utility>
#include <vector>
-#include "gutil/strings/substitute.h"
#include "jni.h"
-#include "jni_md.h"
#include "runtime/user_function_cache.h"
#include "util/jni-util.h"
#include "vec/columns/column.h"
-#include "vec/columns/column_array.h"
-#include "vec/columns/column_map.h"
-#include "vec/columns/column_nullable.h"
-#include "vec/columns/column_string.h"
-#include "vec/columns/column_vector.h"
-#include "vec/columns/columns_number.h"
#include "vec/common/assert_cast.h"
-#include "vec/common/string_ref.h"
#include "vec/core/block.h"
-#include "vec/data_types/data_type_array.h"
-#include "vec/data_types/data_type_nullable.h"
#include "vec/exec/jni_connector.h"
const char* EXECUTOR_CLASS = "org/apache/doris/udf/UdfExecutor";
@@ -60,25 +45,9 @@ Status JavaFunctionCall::open(FunctionContext* context, FunctionContext::Functio
if (env == nullptr) {
return Status::InternalError("Failed to get/create JVM");
}
- if (scope == FunctionContext::FunctionStateScope::FRAGMENT_LOCAL) {
- std::shared_ptr<JniEnv> jni_env = std::make_shared<JniEnv>();
- RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, EXECUTOR_CLASS, &jni_env->executor_cl));
- jni_env->executor_ctor_id =
- env->GetMethodID(jni_env->executor_cl, "<init>", EXECUTOR_CTOR_SIGNATURE);
- RETURN_ERROR_IF_EXC(env);
- jni_env->executor_evaluate_id =
- env->GetMethodID(jni_env->executor_cl, "evaluate", EXECUTOR_EVALUATE_SIGNATURE);
- jni_env->executor_close_id =
- env->GetMethodID(jni_env->executor_cl, "close", EXECUTOR_CLOSE_SIGNATURE);
- RETURN_ERROR_IF_EXC(env);
- context->set_function_state(FunctionContext::FRAGMENT_LOCAL, jni_env);
- }
if (scope == FunctionContext::FunctionStateScope::THREAD_LOCAL) {
- JniEnv* jni_env = reinterpret_cast<JniEnv*>(
- context->get_function_state(FunctionContext::FRAGMENT_LOCAL));
- std::shared_ptr<JniContext> jni_ctx = std::make_shared<JniContext>(
- _argument_types.size(), jni_env->executor_cl, jni_env->executor_close_id);
+ std::shared_ptr<JniContext> jni_ctx = std::make_shared<JniContext>();
context->set_function_state(FunctionContext::THREAD_LOCAL, jni_ctx);
// Add a scoped cleanup jni reference object. This cleans up local refs made below.
@@ -98,7 +67,14 @@ Status JavaFunctionCall::open(FunctionContext* context, FunctionContext::Functio
RETURN_IF_ERROR(SerializeThriftMsg(env, &ctor_params, &ctor_params_bytes));
- jni_ctx->executor = env->NewObject(jni_env->executor_cl, jni_env->executor_ctor_id,
+ RETURN_IF_ERROR(JniUtil::GetGlobalClassRef(env, EXECUTOR_CLASS, &jni_ctx->executor_cl));
+ jni_ctx->executor_ctor_id =
+ env->GetMethodID(jni_ctx->executor_cl, "<init>", EXECUTOR_CTOR_SIGNATURE);
+ jni_ctx->executor_evaluate_id =
+ env->GetMethodID(jni_ctx->executor_cl, "evaluate", EXECUTOR_EVALUATE_SIGNATURE);
+ jni_ctx->executor_close_id =
+ env->GetMethodID(jni_ctx->executor_cl, "close", EXECUTOR_CLOSE_SIGNATURE);
+ jni_ctx->executor = env->NewObject(jni_ctx->executor_cl, jni_ctx->executor_ctor_id,
ctor_params_bytes);
jbyte* pBytes = env->GetByteArrayElements(ctor_params_bytes, nullptr);
@@ -118,8 +94,6 @@ Status JavaFunctionCall::execute_impl(FunctionContext* context, Block& block,
RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env));
JniContext* jni_ctx = reinterpret_cast<JniContext*>(
context->get_function_state(FunctionContext::THREAD_LOCAL));
- JniEnv* jni_env =
- reinterpret_cast<JniEnv*>(context->get_function_state(FunctionContext::FRAGMENT_LOCAL));
std::unique_ptr<long[]> input_table;
RETURN_IF_ERROR(JniConnector::to_java_table(&block, num_rows, arguments, input_table));
@@ -136,7 +110,7 @@ Status JavaFunctionCall::execute_impl(FunctionContext* context, Block& block,
{"required_fields", output_table_schema.first},
{"columns_types", output_table_schema.second}};
jobject output_map = JniUtil::convert_to_java_map(env, output_params);
- long output_address = env->CallLongMethod(jni_ctx->executor, jni_env->executor_evaluate_id,
+ long output_address = env->CallLongMethod(jni_ctx->executor, jni_ctx->executor_evaluate_id,
input_map, output_map);
RETURN_IF_ERROR(JniUtil::GetJniExceptionMsg(env));
env->DeleteLocalRef(input_map);
diff --git a/be/src/vec/functions/function_java_udf.h b/be/src/vec/functions/function_java_udf.h
index 425f05ccfd9..a30a93b355e 100644
--- a/be/src/vec/functions/function_java_udf.h
+++ b/be/src/vec/functions/function_java_udf.h
@@ -38,9 +38,8 @@
#include "vec/core/types.h"
#include "vec/data_types/data_type.h"
#include "vec/functions/function.h"
-namespace doris {
-namespace vectorized {
+namespace doris::vectorized {
class JavaUdfPreparedFunction : public PreparedFunctionImpl {
public:
@@ -114,26 +113,18 @@ private:
const DataTypes _argument_types;
const DataTypePtr _return_type;
- struct JniEnv {
- /// Global class reference to the UdfExecutor Java class and related method IDs. Set in
- /// Init(). These have the lifetime of the process (i.e. 'executor_cl_' is never freed).
- jclass executor_cl;
- jmethodID executor_ctor_id;
- jmethodID executor_evaluate_id;
- jmethodID executor_close_id;
- };
-
struct JniContext {
// Do not save parent directly, because parent is in VExpr, but jni context is in FunctionContext
// The deconstruct sequence is not determined, it will core.
// JniContext's lifecycle should same with function context, not related with expr
- jclass executor_cl_;
- jmethodID executor_close_id_;
+ jclass executor_cl;
+ jmethodID executor_ctor_id;
+ jmethodID executor_evaluate_id;
+ jmethodID executor_close_id;
jobject executor = nullptr;
bool is_closed = false;
- JniContext(int64_t num_args, jclass executor_cl, jmethodID executor_close_id)
- : executor_cl_(executor_cl), executor_close_id_(executor_close_id) {}
+ JniContext() = default;
void close() {
if (is_closed) {
@@ -146,15 +137,16 @@ private:
LOG(WARNING) << "errors while get jni env " << status;
return;
}
- env->CallNonvirtualVoidMethodA(executor, executor_cl_, executor_close_id_, NULL);
+ env->CallNonvirtualVoidMethodA(executor, executor_cl, executor_close_id, nullptr);
env->DeleteGlobalRef(executor);
- env->DeleteGlobalRef(executor_cl_);
+ env->DeleteGlobalRef(executor_cl);
Status s = JniUtil::GetJniExceptionMsg(env);
- if (!s.ok()) LOG(WARNING) << s;
+ if (!s.ok()) {
+ LOG(WARNING) << s;
+ }
is_closed = true;
}
};
};
-} // namespace vectorized
-} // namespace doris
+} // namespace doris::vectorized
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]