This is an automated email from the ASF dual-hosted git repository.
zhangstar333 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 16af223d0a0 [improve](udaf)support class cache for java-udaf (#47619)
16af223d0a0 is described below
commit 16af223d0a077fd2f052fa96815876fb03142b62
Author: zhangstar333 <[email protected]>
AuthorDate: Thu Feb 13 19:40:22 2025 +0800
[improve](udaf)support class cache for java-udaf (#47619)
### What problem does this PR solve?
Problem Summary:
In some user cases:
- the UDF jar package contains resource files that may be hundreds of MBs
in size,
so if every instance loads the jar, it can easily cause a BE JVM OOM.
- or some UDF jars perform an init operation that would otherwise run many
times, so users want all instances to share a single initialization.
Follow-up to https://github.com/apache/doris/pull/40404:
add class cache support for java-udaf.
---
.../apache/doris/common/jni/utils/ExpiringMap.java | 4 +
.../doris/common/jni/utils/UdfClassCache.java | 14 +-
.../java/org/apache/doris/udf/BaseExecutor.java | 105 +++++++++---
.../java/org/apache/doris/udf/UdafExecutor.java | 190 +++++++++------------
.../java/org/apache/doris/udf/UdfExecutor.java | 136 +++------------
.../apache/doris/analysis/CreateFunctionStmt.java | 4 +
.../trees/expressions/functions/udf/JavaUdaf.java | 12 +-
.../trees/expressions/functions/udf/JavaUdtf.java | 13 +-
.../javaudf_p0/test_javaudf_static_load_test.out | Bin 478 -> 714 bytes
.../org/apache/doris/udf/StaticIntTestUDAF.java | 65 +++++++
.../org/apache/doris/udf/StaticIntTestUDTF.java | 35 ++++
.../test_javaudf_static_load_test.groovy | 40 +++++
.../suites/javaudf_p0/test_javaudtf_decimal.groovy | 2 +-
13 files changed, 371 insertions(+), 249 deletions(-)
diff --git
a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/utils/ExpiringMap.java
b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/utils/ExpiringMap.java
index f08b50a0c42..7bb4b61344a 100644
---
a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/utils/ExpiringMap.java
+++
b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/utils/ExpiringMap.java
@@ -78,6 +78,10 @@ public class ExpiringMap<K, V> {
ttlMap.remove(key);
}
+ public int size() {
+ return map.size();
+ }
+
public void shutdown() {
scheduler.shutdown();
try {
diff --git
a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/utils/UdfClassCache.java
b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/utils/UdfClassCache.java
index 696ef4ed0bb..1062aa05582 100644
---
a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/utils/UdfClassCache.java
+++
b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/utils/UdfClassCache.java
@@ -20,6 +20,7 @@ package org.apache.doris.common.jni.utils;
import com.esotericsoftware.reflectasm.MethodAccess;
import java.lang.reflect.Method;
+import java.util.HashMap;
/**
* This class is used for caching the class of UDF.
@@ -28,16 +29,17 @@ public class UdfClassCache {
public Class<?> udfClass;
// the index of evaluate() method in the class
public MethodAccess methodAccess;
- public int evaluateIndex;
- // the method of evaluate() in udf
- public Method method;
- // the method of prepare() in udf
- public Method prepareMethod;
// the argument and return's JavaUdfDataType of evaluate() method.
public JavaUdfDataType[] argTypes;
- public JavaUdfDataType retType;
// the class type of the arguments in evaluate() method
public Class[] argClass;
// The return type class of evaluate() method
+ public JavaUdfDataType retType;
public Class retClass;
+
+ // all methods in the class for java-udf/ java-udaf
+ public HashMap<String, Method> allMethods;
+ // for java-udf index is evaluate method index
+ // for java-udaf index is add method index
+ public int methodIndex;
}
diff --git
a/fe/be-java-extensions/java-udf/src/main/java/org/apache/doris/udf/BaseExecutor.java
b/fe/be-java-extensions/java-udf/src/main/java/org/apache/doris/udf/BaseExecutor.java
index f97cbb602f1..3bc138b523c 100644
---
a/fe/be-java-extensions/java-udf/src/main/java/org/apache/doris/udf/BaseExecutor.java
+++
b/fe/be-java-extensions/java-udf/src/main/java/org/apache/doris/udf/BaseExecutor.java
@@ -19,9 +19,12 @@ package org.apache.doris.udf;
import org.apache.doris.catalog.ArrayType;
import org.apache.doris.catalog.Type;
+import org.apache.doris.common.classloader.ScannerLoader;
import org.apache.doris.common.exception.InternalException;
import org.apache.doris.common.exception.UdfRuntimeException;
import org.apache.doris.common.jni.utils.JavaUdfDataType;
+import org.apache.doris.common.jni.utils.UdfClassCache;
+import org.apache.doris.common.jni.utils.UdfUtils;
import org.apache.doris.common.jni.vec.ColumnValueConverter;
import org.apache.doris.common.jni.vec.VectorTable;
import org.apache.doris.thrift.TFunction;
@@ -29,12 +32,16 @@ import org.apache.doris.thrift.TJavaUdfExecutorCtorParams;
import org.apache.doris.thrift.TPrimitiveType;
import com.esotericsoftware.reflectasm.MethodAccess;
+import com.google.common.base.Strings;
import org.apache.log4j.Logger;
import org.apache.thrift.TDeserializer;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
+import java.io.FileNotFoundException;
import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.net.MalformedURLException;
import java.net.URLClassLoader;
import java.time.LocalDate;
import java.time.LocalDateTime;
@@ -44,26 +51,17 @@ import java.util.Map;
import java.util.Map.Entry;
public abstract class BaseExecutor {
- private static final Logger LOG = Logger.getLogger(BaseExecutor.class);
-
-
// Object to deserialize ctor params from BE.
protected static final TBinaryProtocol.Factory PROTOCOL_FACTORY = new
TBinaryProtocol.Factory();
-
+ private static final Logger LOG = Logger.getLogger(BaseExecutor.class);
protected Object udf;
// setup by init() and cleared by close()
protected URLClassLoader classLoader;
-
- // Return and argument types of the function inferred from the udf method
- // signature.
- // The JavaUdfDataType enum maps it to corresponding primitive type.
- protected JavaUdfDataType[] argTypes;
- protected JavaUdfDataType retType;
- protected Class[] argClass;
- protected MethodAccess methodAccess;
- protected VectorTable outputTable = null;
+ protected UdfClassCache objCache;
protected TFunction fn;
- protected Class retClass;
+ protected boolean isStaticLoad = false;
+ protected VectorTable outputTable = null;
+ String className;
/**
* Create a UdfExecutor, using parameters from a serialized thrift object.
Used
@@ -94,17 +92,79 @@ public abstract class BaseExecutor {
public String debugString() {
StringBuilder res = new StringBuilder();
- for (JavaUdfDataType type : argTypes) {
+ for (JavaUdfDataType type : objCache.argTypes) {
res.append(type.toString());
}
- res.append(" return type: ").append(retType.toString());
- res.append(" methodAccess: ").append(methodAccess.toString());
+ res.append(" return type: ").append(objCache.retType.toString());
+ res.append(" methodAccess: ").append(objCache.methodAccess.toString());
res.append(" fn.toString(): ").append(fn.toString());
return res.toString();
}
- protected abstract void init(TJavaUdfExecutorCtorParams request, String
jarPath,
- Type funcRetType, Type... parameterTypes) throws
UdfRuntimeException;
+ protected void init(TJavaUdfExecutorCtorParams request, String jarPath,
+ Type funcRetType, Type... parameterTypes) throws
UdfRuntimeException {
+ try {
+ isStaticLoad = request.getFn().isSetIsStaticLoad() &&
request.getFn().is_static_load;
+ long expirationTime = 360L; // default is 6 hours
+ if (request.getFn().isSetExpirationTime()) {
+ expirationTime = request.getFn().getExpirationTime();
+ }
+ objCache = getClassCache(jarPath, request.getFn().getSignature(),
expirationTime,
+ funcRetType, parameterTypes);
+ Constructor<?> ctor = objCache.udfClass.getConstructor();
+ udf = ctor.newInstance();
+ } catch (MalformedURLException e) {
+ throw new UdfRuntimeException("Unable to load jar.", e);
+ } catch (SecurityException e) {
+ throw new UdfRuntimeException("Unable to load function.", e);
+ } catch (ClassNotFoundException e) {
+ throw new UdfRuntimeException("Unable to find class.", e);
+ } catch (NoSuchMethodException e) {
+ throw new UdfRuntimeException(
+ "Unable to find constructor with no arguments.", e);
+ } catch (IllegalArgumentException e) {
+ throw new UdfRuntimeException(
+ "Unable to call UDF constructor with no arguments.", e);
+ } catch (Exception e) {
+ throw new UdfRuntimeException("Unable to call create UDF
instance.", e);
+ }
+ }
+
+
+ public UdfClassCache getClassCache(String jarPath, String signature, long
expirationTime,
+ Type funcRetType, Type... parameterTypes)
+ throws MalformedURLException, FileNotFoundException,
ClassNotFoundException, InternalException,
+ UdfRuntimeException {
+ UdfClassCache cache = null;
+ if (isStaticLoad) {
+ cache = ScannerLoader.getUdfClassLoader(signature);
+ }
+ if (cache == null) {
+ ClassLoader loader;
+ if (Strings.isNullOrEmpty(jarPath)) {
+ // if jarPath is empty, which means the UDF jar is located in
custom_lib
+ // and already be loaded when BE start.
+ // so here we use system class loader to load UDF class.
+ loader = ClassLoader.getSystemClassLoader();
+ } else {
+ ClassLoader parent = getClass().getClassLoader();
+ classLoader = UdfUtils.getClassLoader(jarPath, parent);
+ loader = classLoader;
+ }
+ cache = new UdfClassCache();
+ cache.allMethods = new HashMap<>();
+ cache.udfClass = Class.forName(className, true, loader);
+ cache.methodAccess = MethodAccess.get(cache.udfClass);
+ checkAndCacheUdfClass(cache, funcRetType, parameterTypes);
+ if (isStaticLoad) {
+ ScannerLoader.cacheClassLoader(signature, cache,
expirationTime);
+ }
+ }
+ return cache;
+ }
+
+ protected abstract void checkAndCacheUdfClass(UdfClassCache cache, Type
funcRetType, Type... parameterTypes)
+ throws InternalException, UdfRuntimeException;
/**
* Close the class loader we may have created.
@@ -127,7 +187,7 @@ public abstract class BaseExecutor {
// We are now un-usable (because the class loader has been
// closed), so null out method_ and classLoader_.
classLoader = null;
- methodAccess = null;
+ objCache.methodAccess = null;
}
protected ColumnValueConverter getInputConverter(TPrimitiveType
primitiveType, Class clz) {
@@ -311,7 +371,8 @@ public abstract class BaseExecutor {
for (int j = 0; j < numColumns; ++j) {
// For UDAF, we need to offset by 1 since first arg is state
int argIndex = isUdaf ? j + 1 : j;
- ColumnValueConverter converter =
getInputConverter(argTypes[j].getPrimitiveType(), argClass[argIndex]);
+ ColumnValueConverter converter =
getInputConverter(objCache.argTypes[j].getPrimitiveType(),
+ objCache.argClass[argIndex]);
if (converter != null) {
converters.put(j, converter);
}
@@ -320,6 +381,6 @@ public abstract class BaseExecutor {
}
protected ColumnValueConverter getOutputConverter() {
- return getOutputConverter(retType, retClass);
+ return getOutputConverter(objCache.retType, objCache.retClass);
}
}
diff --git
a/fe/be-java-extensions/java-udf/src/main/java/org/apache/doris/udf/UdafExecutor.java
b/fe/be-java-extensions/java-udf/src/main/java/org/apache/doris/udf/UdafExecutor.java
index 53c214303ce..3f3860ad53d 100644
---
a/fe/be-java-extensions/java-udf/src/main/java/org/apache/doris/udf/UdafExecutor.java
+++
b/fe/be-java-extensions/java-udf/src/main/java/org/apache/doris/udf/UdafExecutor.java
@@ -19,14 +19,15 @@ package org.apache.doris.udf;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.Pair;
+import org.apache.doris.common.exception.InternalException;
import org.apache.doris.common.exception.UdfRuntimeException;
import org.apache.doris.common.jni.utils.JavaUdfDataType;
import org.apache.doris.common.jni.utils.OffHeap;
+import org.apache.doris.common.jni.utils.UdfClassCache;
import org.apache.doris.common.jni.utils.UdfUtils;
import org.apache.doris.common.jni.vec.VectorTable;
import org.apache.doris.thrift.TJavaUdfExecutorCtorParams;
-import com.esotericsoftware.reflectasm.MethodAccess;
import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
import org.apache.log4j.Logger;
@@ -36,9 +37,7 @@ import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.lang.reflect.Array;
-import java.lang.reflect.Constructor;
import java.lang.reflect.Method;
-import java.net.MalformedURLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
@@ -59,9 +58,7 @@ public class UdafExecutor extends BaseExecutor {
private static final String UDAF_MERGE_FUNCTION = "merge";
private static final String UDAF_RESULT_FUNCTION = "getValue";
- private HashMap<String, Method> allMethods;
private HashMap<Long, Object> stateObjMap;
- private int addIndex;
/**
* Constructor to create an object.
@@ -75,8 +72,9 @@ public class UdafExecutor extends BaseExecutor {
*/
@Override
public void close() {
- super.close();
- allMethods = null;
+ if (!isStaticLoad) {
+ super.close();
+ }
stateObjMap = null;
}
@@ -99,7 +97,7 @@ public class UdafExecutor extends BaseExecutor {
public void addBatchSingle(int rowStart, int rowEnd, long placeAddr,
Object[][] inputs) throws UdfRuntimeException {
Long curPlace = placeAddr;
- Object[] inputArgs = new Object[argTypes.length + 1];
+ Object[] inputArgs = new Object[objCache.argTypes.length + 1];
Object state = stateObjMap.get(curPlace);
if (state != null) {
inputArgs[0] = state;
@@ -114,7 +112,7 @@ public class UdafExecutor extends BaseExecutor {
for (int j = 0; j < numColumns; ++j) {
inputArgs[j + 1] = inputs[j][i];
}
- methodAccess.invoke(udf, addIndex, inputArgs);
+ objCache.methodAccess.invoke(udf, objCache.methodIndex, inputArgs);
}
}
@@ -134,15 +132,15 @@ public class UdafExecutor extends BaseExecutor {
placeState[row - rowStart] = newState;
}
}
- //spilt into two for loop
+ // spilt into two for loop
- Object[] inputArgs = new Object[argTypes.length + 1];
+ Object[] inputArgs = new Object[objCache.argTypes.length + 1];
for (int row = 0; row < numRows; ++row) {
inputArgs[0] = placeState[row];
for (int j = 0; j < numColumns; ++j) {
inputArgs[j + 1] = inputs[j][row];
}
- methodAccess.invoke(udf, addIndex, inputArgs);
+ objCache.methodAccess.invoke(udf, objCache.methodIndex, inputArgs);
}
}
@@ -151,7 +149,7 @@ public class UdafExecutor extends BaseExecutor {
*/
public Object createAggState() throws UdfRuntimeException {
try {
- return allMethods.get(UDAF_CREATE_FUNCTION).invoke(udf, null);
+ return objCache.allMethods.get(UDAF_CREATE_FUNCTION).invoke(udf,
null);
} catch (Exception e) {
LOG.warn("invoke createAggState function meet some error: ", e);
throw new UdfRuntimeException("UDAF failed to create: ", e);
@@ -164,7 +162,7 @@ public class UdafExecutor extends BaseExecutor {
public void destroy() throws UdfRuntimeException {
try {
for (Object obj : stateObjMap.values()) {
- allMethods.get(UDAF_DESTROY_FUNCTION).invoke(udf, obj);
+ objCache.allMethods.get(UDAF_DESTROY_FUNCTION).invoke(udf,
obj);
}
stateObjMap.clear();
} catch (Exception e) {
@@ -182,7 +180,7 @@ public class UdafExecutor extends BaseExecutor {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
args[0] = stateObjMap.get(place);
args[1] = new DataOutputStream(baos);
- allMethods.get(UDAF_SERIALIZE_FUNCTION).invoke(udf, args);
+ objCache.allMethods.get(UDAF_SERIALIZE_FUNCTION).invoke(udf, args);
return baos.toByteArray();
} catch (Exception e) {
LOG.info("evaluate exception debug: " + debugString());
@@ -201,7 +199,7 @@ public class UdafExecutor extends BaseExecutor {
if (args[0] == null) {
return;
}
- allMethods.get(UDAF_RESET_FUNCTION).invoke(udf, args);
+ objCache.allMethods.get(UDAF_RESET_FUNCTION).invoke(udf, args);
} catch (Exception e) {
LOG.info("evaluate exception debug: " + debugString());
LOG.warn("invoke reset function meet some error: ", e);
@@ -219,7 +217,7 @@ public class UdafExecutor extends BaseExecutor {
ByteArrayInputStream bins = new ByteArrayInputStream(data);
args[0] = createAggState();
args[1] = new DataInputStream(bins);
- allMethods.get(UDAF_DESERIALIZE_FUNCTION).invoke(udf, args);
+ objCache.allMethods.get(UDAF_DESERIALIZE_FUNCTION).invoke(udf,
args);
args[1] = args[0];
Long curPlace = place;
Object state = stateObjMap.get(curPlace);
@@ -230,7 +228,7 @@ public class UdafExecutor extends BaseExecutor {
stateObjMap.put(curPlace, newState);
args[0] = newState;
}
- allMethods.get(UDAF_MERGE_FUNCTION).invoke(udf, args);
+ objCache.allMethods.get(UDAF_MERGE_FUNCTION).invoke(udf, args);
} catch (Exception e) {
LOG.info("evaluate exception debug: " + debugString());
LOG.warn("invoke merge function meet some error: ", e);
@@ -250,12 +248,12 @@ public class UdafExecutor extends BaseExecutor {
if (stateObjMap.get(place) == null) {
stateObjMap.put(place, createAggState());
}
- Object value = allMethods.get(UDAF_RESULT_FUNCTION).invoke(udf,
stateObjMap.get(place));
+ Object value =
objCache.allMethods.get(UDAF_RESULT_FUNCTION).invoke(udf,
stateObjMap.get(place));
// If the return type is primitive, we can't cast the array of
primitive type as array of Object,
// so we have to new its wrapped Object.
Object[] result = outputTable.getColumnType(0).isPrimitive()
? outputTable.getColumn(0).newObjectContainerArray(1)
- : (Object[]) Array.newInstance(retClass, 1);
+ : (Object[]) Array.newInstance(objCache.retClass, 1);
result[0] = value;
boolean isNullable =
Boolean.parseBoolean(outputParams.getOrDefault("is_nullable", "true"));
outputTable.appendData(0, result, getOutputConverter(),
isNullable);
@@ -267,109 +265,85 @@ public class UdafExecutor extends BaseExecutor {
}
}
- @Override
protected void init(TJavaUdfExecutorCtorParams request, String jarPath,
Type funcRetType,
Type... parameterTypes) throws UdfRuntimeException {
- String className = request.fn.aggregate_fn.symbol;
- allMethods = new HashMap<>();
+ className = fn.aggregate_fn.symbol;
+ super.init(request, jarPath, funcRetType, parameterTypes);
stateObjMap = new HashMap<>();
+ }
+ @Override
+ protected void checkAndCacheUdfClass(UdfClassCache cache, Type funcRetType,
+ Type... parameterTypes) throws InternalException,
UdfRuntimeException {
ArrayList<String> signatures = Lists.newArrayList();
- try {
- ClassLoader loader;
- if (jarPath != null) {
- ClassLoader parent = getClass().getClassLoader();
- classLoader = UdfUtils.getClassLoader(jarPath, parent);
- loader = classLoader;
- } else {
- // for test
- loader = ClassLoader.getSystemClassLoader();
- }
- Class<?> c = Class.forName(className, true, loader);
- methodAccess = MethodAccess.get(c);
- Constructor<?> ctor = c.getConstructor();
- udf = ctor.newInstance();
- Method[] methods = c.getDeclaredMethods();
- int idx = 0;
- for (idx = 0; idx < methods.length; ++idx) {
- signatures.add(methods[idx].toGenericString());
- switch (methods[idx].getName()) {
- case UDAF_DESTROY_FUNCTION:
- case UDAF_CREATE_FUNCTION:
- case UDAF_MERGE_FUNCTION:
- case UDAF_SERIALIZE_FUNCTION:
- case UDAF_RESET_FUNCTION:
- case UDAF_DESERIALIZE_FUNCTION: {
- allMethods.put(methods[idx].getName(), methods[idx]);
- break;
+ Class<?> c = cache.udfClass;
+ Method[] methods = c.getMethods();
+ int idx = 0;
+ for (idx = 0; idx < methods.length; ++idx) {
+ signatures.add(methods[idx].toGenericString());
+ switch (methods[idx].getName()) {
+ case UDAF_DESTROY_FUNCTION:
+ case UDAF_CREATE_FUNCTION:
+ case UDAF_MERGE_FUNCTION:
+ case UDAF_SERIALIZE_FUNCTION:
+ case UDAF_RESET_FUNCTION:
+ case UDAF_DESERIALIZE_FUNCTION: {
+ cache.allMethods.put(methods[idx].getName(), methods[idx]);
+ break;
+ }
+ case UDAF_RESULT_FUNCTION: {
+ cache.allMethods.put(methods[idx].getName(), methods[idx]);
+ Pair<Boolean, JavaUdfDataType> returnType =
UdfUtils.setReturnType(funcRetType,
+ methods[idx].getReturnType());
+ if (!returnType.first) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("result function set return
parameterTypes has error");
+ }
+ } else {
+ cache.retType = returnType.second;
+ cache.retClass = methods[idx].getReturnType();
}
- case UDAF_RESULT_FUNCTION: {
- allMethods.put(methods[idx].getName(), methods[idx]);
- Pair<Boolean, JavaUdfDataType> returnType =
UdfUtils.setReturnType(funcRetType,
- methods[idx].getReturnType());
- if (!returnType.first) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("result function set return
parameterTypes has error");
- }
- } else {
- retType = returnType.second;
- retClass = methods[idx].getReturnType();
+ break;
+ }
+ case UDAF_ADD_FUNCTION: {
+ cache.allMethods.put(methods[idx].getName(), methods[idx]);
+ cache.methodIndex =
cache.methodAccess.getIndex(UDAF_ADD_FUNCTION);
+ cache.argClass = methods[idx].getParameterTypes();
+ if (cache.argClass.length != parameterTypes.length + 1) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("add function parameterTypes length not
equal " + cache.argClass.length + " "
+ + parameterTypes.length + " " +
methods[idx].getName());
}
- break;
}
- case UDAF_ADD_FUNCTION: {
- allMethods.put(methods[idx].getName(), methods[idx]);
- addIndex = methodAccess.getIndex(UDAF_ADD_FUNCTION);
- argClass = methods[idx].getParameterTypes();
- if (argClass.length != parameterTypes.length + 1) {
+ if (!(parameterTypes.length == 0)) {
+ Pair<Boolean, JavaUdfDataType[]> inputType =
UdfUtils.setArgTypes(parameterTypes,
+ cache.argClass, true);
+ if (!inputType.first) {
if (LOG.isDebugEnabled()) {
- LOG.debug("add function parameterTypes length
not equal " + argClass.length + " "
- + parameterTypes.length + " " +
methods[idx].getName());
- }
- }
- if (!(parameterTypes.length == 0)) {
- Pair<Boolean, JavaUdfDataType[]> inputType =
UdfUtils.setArgTypes(parameterTypes,
- argClass, true);
- if (!inputType.first) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("add function set arg
parameterTypes has error");
- }
- } else {
- argTypes = inputType.second;
+ LOG.debug("add function set arg parameterTypes
has error");
}
} else {
- // Special case where the UDF doesn't take any
input args
- argTypes = new JavaUdfDataType[0];
+ cache.argTypes = inputType.second;
}
- break;
+ } else {
+ // Special case where the UDF doesn't take any input
args
+ cache.argTypes = new JavaUdfDataType[0];
}
- default:
- break;
+ break;
}
+ default:
+ break;
}
- if (idx == methods.length) {
- return;
- }
- StringBuilder sb = new StringBuilder();
- sb.append("Unable to find evaluate function with the correct
signature: ")
- .append(className)
- .append(".evaluate(")
- .append(Joiner.on(",
").join(parameterTypes)).append(")\n").append("UDF contains: \n ")
- .append(Joiner.on("\n ").join(signatures));
- throw new UdfRuntimeException(sb.toString());
-
- } catch (MalformedURLException e) {
- throw new UdfRuntimeException("Unable to load jar.", e);
- } catch (SecurityException e) {
- throw new UdfRuntimeException("Unable to load function.", e);
- } catch (ClassNotFoundException e) {
- throw new UdfRuntimeException("Unable to find class.", e);
- } catch (NoSuchMethodException e) {
- throw new UdfRuntimeException("Unable to find constructor with no
arguments.", e);
- } catch (IllegalArgumentException e) {
- throw new UdfRuntimeException("Unable to call UDAF constructor
with no arguments.", e);
- } catch (Exception e) {
- throw new UdfRuntimeException("Unable to call create UDAF
instance.", e);
}
+ if (idx == methods.length) {
+ return;
+ }
+ StringBuilder sb = new StringBuilder();
+ sb.append("Unable to find evaluate function with the correct
signature: ")
+ .append(className)
+ .append(".evaluate(")
+ .append(Joiner.on(",
").join(parameterTypes)).append(")\n").append("UDF contains: \n ")
+ .append(Joiner.on("\n ").join(signatures));
+ throw new UdfRuntimeException(sb.toString());
}
}
diff --git
a/fe/be-java-extensions/java-udf/src/main/java/org/apache/doris/udf/UdfExecutor.java
b/fe/be-java-extensions/java-udf/src/main/java/org/apache/doris/udf/UdfExecutor.java
index e24fc719ff5..a7f8f505967 100644
---
a/fe/be-java-extensions/java-udf/src/main/java/org/apache/doris/udf/UdfExecutor.java
+++
b/fe/be-java-extensions/java-udf/src/main/java/org/apache/doris/udf/UdfExecutor.java
@@ -19,7 +19,6 @@ package org.apache.doris.udf;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.Pair;
-import org.apache.doris.common.classloader.ScannerLoader;
import org.apache.doris.common.exception.InternalException;
import org.apache.doris.common.exception.UdfRuntimeException;
import org.apache.doris.common.jni.utils.JavaUdfDataType;
@@ -28,17 +27,13 @@ import org.apache.doris.common.jni.utils.UdfUtils;
import org.apache.doris.common.jni.vec.VectorTable;
import org.apache.doris.thrift.TJavaUdfExecutorCtorParams;
-import com.esotericsoftware.reflectasm.MethodAccess;
import com.google.common.base.Joiner;
-import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import org.apache.log4j.Logger;
-import java.io.FileNotFoundException;
import java.lang.reflect.Array;
-import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
-import java.net.MalformedURLException;
import java.util.ArrayList;
import java.util.Map;
@@ -47,13 +42,6 @@ public class UdfExecutor extends BaseExecutor {
private static final String UDF_PREPARE_FUNCTION_NAME = "prepare";
private static final String UDF_FUNCTION_NAME = "evaluate";
- // setup by init() and cleared by close()
- private Method method;
-
- private int evaluateIndex;
-
- private boolean isStaticLoad = false;
-
/**
* Create a UdfExecutor, using parameters from a serialized thrift object.
Used by
* the backend.
@@ -69,7 +57,6 @@ public class UdfExecutor extends BaseExecutor {
public void close() {
// We are now un-usable (because the class loader has been
// closed), so null out method_ and classLoader_.
- method = null;
if (!isStaticLoad) {
super.close();
} else if (outputTable != null) {
@@ -91,7 +78,7 @@ public class UdfExecutor extends BaseExecutor {
// so we have to new its wrapped Object.
Object[] result = outputTable.getColumnType(0).isPrimitive()
? outputTable.getColumn(0).newObjectContainerArray(numRows)
- : (Object[]) Array.newInstance(method.getReturnType(),
numRows);
+ : (Object[]) Array.newInstance(objCache.retClass, numRows);
Object[][] inputs =
inputTable.getMaterializedData(getInputConverters(numColumns, false));
Object[] parameters = new Object[numColumns];
for (int i = 0; i < numRows; ++i) {
@@ -99,7 +86,7 @@ public class UdfExecutor extends BaseExecutor {
int row = inputTable.isConstColumn(j) ? 0 : i;
parameters[j] = inputs[j][row];
}
- result[i] = methodAccess.invoke(udf, evaluateIndex,
parameters);
+ result[i] = objCache.methodAccess.invoke(udf,
objCache.methodIndex, parameters);
}
boolean isNullable =
Boolean.parseBoolean(outputParams.getOrDefault("is_nullable", "true"));
outputTable.appendData(0, result, getOutputConverter(),
isNullable);
@@ -110,10 +97,6 @@ public class UdfExecutor extends BaseExecutor {
}
}
- public Method getMethod() {
- return method;
- }
-
private Method findPrepareMethod(Method[] methods) {
for (Method method : methods) {
if (method.getName().equals(UDF_PREPARE_FUNCTION_NAME) &&
method.getReturnType().equals(void.class)
@@ -124,45 +107,32 @@ public class UdfExecutor extends BaseExecutor {
return null; // Method not found
}
- public UdfClassCache getClassCache(String className, String jarPath,
String signature, long expirationTime,
- Type funcRetType, Type... parameterTypes)
- throws MalformedURLException, FileNotFoundException,
ClassNotFoundException, InternalException,
- UdfRuntimeException {
- UdfClassCache cache = null;
- if (isStaticLoad) {
- cache = ScannerLoader.getUdfClassLoader(signature);
- }
- if (cache == null) {
- ClassLoader loader;
- if (Strings.isNullOrEmpty(jarPath)) {
- // if jarPath is empty, which means the UDF jar is located in
custom_lib
- // and already be loaded when BE start.
- // so here we use system class loader to load UDF class.
- loader = ClassLoader.getSystemClassLoader();
- } else {
- ClassLoader parent = getClass().getClassLoader();
- classLoader = UdfUtils.getClassLoader(jarPath, parent);
- loader = classLoader;
- }
- cache = new UdfClassCache();
- cache.udfClass = Class.forName(className, true, loader);
- cache.methodAccess = MethodAccess.get(cache.udfClass);
- checkAndCacheUdfClass(className, cache, funcRetType,
parameterTypes);
- if (isStaticLoad) {
- ScannerLoader.cacheClassLoader(signature, cache,
expirationTime);
+ // Preallocate the input objects that will be passed to the underlying UDF.
+ // These objects are allocated once and reused across calls to evaluate()
+ @Override
+ protected void init(TJavaUdfExecutorCtorParams request, String jarPath,
Type funcRetType,
+ Type... parameterTypes) throws UdfRuntimeException {
+ className = fn.scalar_fn.symbol;
+ super.init(request, jarPath, funcRetType, parameterTypes);
+ Method prepareMethod =
objCache.allMethods.get(UDF_PREPARE_FUNCTION_NAME);
+ if (prepareMethod != null) {
+ try {
+ prepareMethod.invoke(udf);
+ } catch (IllegalAccessException | IllegalArgumentException |
InvocationTargetException e) {
+ throw new UdfRuntimeException("Unable to call UDF prepare
function.", e);
}
}
- return cache;
}
- private void checkAndCacheUdfClass(String className, UdfClassCache cache,
Type funcRetType, Type... parameterTypes)
+ @Override
+ protected void checkAndCacheUdfClass(UdfClassCache cache, Type
funcRetType, Type... parameterTypes)
throws InternalException, UdfRuntimeException {
ArrayList<String> signatures = Lists.newArrayList();
Class<?> c = cache.udfClass;
Method[] methods = c.getMethods();
Method prepareMethod = findPrepareMethod(methods);
if (prepareMethod != null) {
- cache.prepareMethod = prepareMethod;
+ cache.allMethods.put(UDF_PREPARE_FUNCTION_NAME, prepareMethod);
}
for (Method m : methods) {
// By convention, the udf must contain the function "evaluate"
@@ -176,9 +146,10 @@ public class UdfExecutor extends BaseExecutor {
if (cache.argClass.length != parameterTypes.length) {
continue;
}
- cache.method = m;
- cache.evaluateIndex =
cache.methodAccess.getIndex(UDF_FUNCTION_NAME, cache.argClass);
+ cache.allMethods.put(UDF_FUNCTION_NAME, m);
+ cache.methodIndex = cache.methodAccess.getIndex(UDF_FUNCTION_NAME,
cache.argClass);
Pair<Boolean, JavaUdfDataType> returnType;
+ cache.retClass = m.getReturnType();
if (cache.argClass.length == 0 && parameterTypes.length == 0) {
// Special case where the UDF doesn't take any input args
returnType = UdfUtils.setReturnType(funcRetType,
m.getReturnType());
@@ -202,69 +173,18 @@ public class UdfExecutor extends BaseExecutor {
} else {
cache.argTypes = inputType.second;
}
- if (cache.method != null) {
- cache.retClass = cache.method.getReturnType();
- }
return;
}
StringBuilder sb = new StringBuilder();
sb.append("Unable to find evaluate function with the correct
signature: ")
- .append(className)
- .append(".evaluate(")
- .append(Joiner.on(", ").join(parameterTypes))
- .append(")\n")
- .append("UDF contains: \n ")
- .append(Joiner.on("\n ").join(signatures));
+ .append(className)
+ .append(".evaluate(")
+ .append(Joiner.on(", ").join(parameterTypes))
+ .append(")\n")
+ .append("UDF contains: \n ")
+ .append(Joiner.on("\n ").join(signatures));
throw new UdfRuntimeException(sb.toString());
}
-
- // Preallocate the input objects that will be passed to the underlying UDF.
- // These objects are allocated once and reused across calls to evaluate()
- @Override
- protected void init(TJavaUdfExecutorCtorParams request, String jarPath,
Type funcRetType,
- Type... parameterTypes) throws UdfRuntimeException {
- String className = request.fn.scalar_fn.symbol;
- try {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Loading UDF '" + className + "' from " + jarPath);
- }
- isStaticLoad = request.getFn().isSetIsStaticLoad() &&
request.getFn().is_static_load;
- long expirationTime = 360L; // default is 6 hours
- if (request.getFn().isSetExpirationTime()) {
- expirationTime = request.getFn().getExpirationTime();
- }
- UdfClassCache cache = getClassCache(className, jarPath,
request.getFn().getSignature(), expirationTime,
- funcRetType, parameterTypes);
- methodAccess = cache.methodAccess;
- Constructor<?> ctor = cache.udfClass.getConstructor();
- udf = ctor.newInstance();
- Method prepareMethod = cache.prepareMethod;
- if (prepareMethod != null) {
- prepareMethod.invoke(udf);
- }
-
- argClass = cache.argClass;
- method = cache.method;
- evaluateIndex = cache.evaluateIndex;
- retType = cache.retType;
- argTypes = cache.argTypes;
- retClass = cache.retClass;
- } catch (MalformedURLException e) {
- throw new UdfRuntimeException("Unable to load jar.", e);
- } catch (SecurityException e) {
- throw new UdfRuntimeException("Unable to load function.", e);
- } catch (ClassNotFoundException e) {
- throw new UdfRuntimeException("Unable to find class.", e);
- } catch (NoSuchMethodException e) {
- throw new UdfRuntimeException(
- "Unable to find constructor with no arguments.", e);
- } catch (IllegalArgumentException e) {
- throw new UdfRuntimeException(
- "Unable to call UDF constructor with no arguments.", e);
- } catch (Exception e) {
- throw new UdfRuntimeException("Unable to call create UDF
instance.", e);
- }
- }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateFunctionStmt.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateFunctionStmt.java
index 06cab70d7d9..7320255782d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateFunctionStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateFunctionStmt.java
@@ -357,6 +357,8 @@ public class CreateFunctionStmt extends DdlStmt implements
NotFallbackInParser {
location, symbol, null, null);
function.setChecksum(checksum);
function.setNullableMode(returnNullMode);
+ function.setStaticLoad(isStaticLoad);
+ function.setExpirationTime(expirationTime);
function.setUDTFunction(true);
// Todo: maybe in create tables function, need register two function,
one is
// normal and one is outer as those have different result when result
is NULL.
@@ -423,6 +425,8 @@ public class CreateFunctionStmt extends DdlStmt implements
NotFallbackInParser {
function.setBinaryType(binaryType);
function.setChecksum(checksum);
function.setNullableMode(returnNullMode);
+ function.setStaticLoad(isStaticLoad);
+ function.setExpirationTime(expirationTime);
}
private void analyzeUdf() throws AnalysisException {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/udf/JavaUdaf.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/udf/JavaUdaf.java
index 27cf22b73b2..85143d2aca6 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/udf/JavaUdaf.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/udf/JavaUdaf.java
@@ -62,6 +62,8 @@ public class JavaUdaf extends AggregateFunction implements
ExplicitlyCastableSig
private final String getValueFn;
private final String removeFn;
private final String checkSum;
+ private final boolean isStaticLoad;
+ private final long expirationTime;
/**
* Constructor of UDAF
@@ -72,7 +74,7 @@ public class JavaUdaf extends AggregateFunction implements
ExplicitlyCastableSig
String objectFile, String symbol,
String initFn, String updateFn, String mergeFn,
String serializeFn, String finalizeFn, String getValueFn, String
removeFn,
- boolean isDistinct, String checkSum, Expression... args) {
+ boolean isDistinct, String checkSum, boolean isStaticLoad, long
expirationTime, Expression... args) {
super(name, isDistinct, args);
this.dbName = dbName;
this.functionId = functionId;
@@ -90,6 +92,8 @@ public class JavaUdaf extends AggregateFunction implements
ExplicitlyCastableSig
this.getValueFn = getValueFn;
this.removeFn = removeFn;
this.checkSum = checkSum;
+ this.isStaticLoad = isStaticLoad;
+ this.expirationTime = expirationTime;
}
@Override
@@ -120,7 +124,7 @@ public class JavaUdaf extends AggregateFunction implements
ExplicitlyCastableSig
Preconditions.checkArgument(children.size() == this.children.size());
return new JavaUdaf(getName(), functionId, dbName, binaryType,
signature, intermediateType, nullableMode,
objectFile, symbol, initFn, updateFn, mergeFn, serializeFn,
finalizeFn, getValueFn, removeFn,
- isDistinct, checkSum, children.toArray(new Expression[0]));
+ isDistinct, checkSum, isStaticLoad, expirationTime,
children.toArray(new Expression[0]));
}
/**
@@ -162,6 +166,8 @@ public class JavaUdaf extends AggregateFunction implements
ExplicitlyCastableSig
aggregate.getRemoveFnSymbol(),
false,
aggregate.getChecksum(),
+ aggregate.isStaticLoad(),
+ aggregate.getExpirationTime(),
virtualSlots);
JavaUdafBuilder builder = new JavaUdafBuilder(udaf);
@@ -196,6 +202,8 @@ public class JavaUdaf extends AggregateFunction implements
ExplicitlyCastableSig
expr.setNullableMode(nullableMode);
expr.setChecksum(checkSum);
expr.setId(functionId);
+ expr.setStaticLoad(isStaticLoad);
+ expr.setExpirationTime(expirationTime);
return expr;
} catch (Exception e) {
throw new AnalysisException(e.getMessage(), e.getCause());
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/udf/JavaUdtf.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/udf/JavaUdtf.java
index 96ea02bf2b7..c90a8c343a3 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/udf/JavaUdtf.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/udf/JavaUdtf.java
@@ -56,6 +56,8 @@ public class JavaUdtf extends TableGeneratingFunction
implements ExplicitlyCasta
private final String prepareFn;
private final String closeFn;
private final String checkSum;
+ private final boolean isStaticLoad;
+ private final long expirationTime;
/**
* Constructor of UDTF
@@ -63,7 +65,7 @@ public class JavaUdtf extends TableGeneratingFunction
implements ExplicitlyCasta
public JavaUdtf(String name, long functionId, String dbName,
TFunctionBinaryType binaryType,
FunctionSignature signature,
NullableMode nullableMode, String objectFile, String symbol,
String prepareFn, String closeFn,
- String checkSum, Expression... args) {
+ String checkSum, boolean isStaticLoad, long expirationTime,
Expression... args) {
super(name, args);
this.dbName = dbName;
this.functionId = functionId;
@@ -75,6 +77,8 @@ public class JavaUdtf extends TableGeneratingFunction
implements ExplicitlyCasta
this.prepareFn = prepareFn;
this.closeFn = closeFn;
this.checkSum = checkSum;
+ this.isStaticLoad = isStaticLoad;
+ this.expirationTime = expirationTime;
}
/**
@@ -84,7 +88,8 @@ public class JavaUdtf extends TableGeneratingFunction
implements ExplicitlyCasta
public JavaUdtf withChildren(List<Expression> children) {
Preconditions.checkArgument(children.size() == this.children.size());
return new JavaUdtf(getName(), functionId, dbName, binaryType,
signature, nullableMode,
- objectFile, symbol, prepareFn, closeFn, checkSum,
children.toArray(new Expression[0]));
+ objectFile, symbol, prepareFn, closeFn, checkSum,
isStaticLoad, expirationTime,
+ children.toArray(new Expression[0]));
}
@Override
@@ -119,6 +124,8 @@ public class JavaUdtf extends TableGeneratingFunction
implements ExplicitlyCasta
expr.setNullableMode(nullableMode);
expr.setChecksum(checkSum);
expr.setId(functionId);
+ expr.setStaticLoad(isStaticLoad);
+ expr.setExpirationTime(expirationTime);
expr.setUDTFunction(true);
return expr;
} catch (Exception e) {
@@ -153,6 +160,8 @@ public class JavaUdtf extends TableGeneratingFunction
implements ExplicitlyCasta
scalar.getPrepareFnSymbol(),
scalar.getCloseFnSymbol(),
scalar.getChecksum(),
+ scalar.isStaticLoad(),
+ scalar.getExpirationTime(),
virtualSlots);
JavaUdtfBuilder builder = new JavaUdtfBuilder(udf);
diff --git a/regression-test/data/javaudf_p0/test_javaudf_static_load_test.out
b/regression-test/data/javaudf_p0/test_javaudf_static_load_test.out
index 9f57f52d091..097f84fd7a0 100644
Binary files
a/regression-test/data/javaudf_p0/test_javaudf_static_load_test.out and
b/regression-test/data/javaudf_p0/test_javaudf_static_load_test.out differ
diff --git
a/regression-test/java-udf-src/src/main/java/org/apache/doris/udf/StaticIntTestUDAF.java
b/regression-test/java-udf-src/src/main/java/org/apache/doris/udf/StaticIntTestUDAF.java
new file mode 100644
index 00000000000..dde6c1336d1
--- /dev/null
+++
b/regression-test/java-udf-src/src/main/java/org/apache/doris/udf/StaticIntTestUDAF.java
@@ -0,0 +1,65 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.udf;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+public class StaticIntTestUDAF { // regression-test UDAF: sums its inputs and folds a class-level call counter into getValue()
+ static {
+ System.out.println("static load should only print once
StaticIntTestUDAF");
+ }
+ private static int value = 0; // class-level counter, incremented once per getValue() call
+ public static class State { // per-aggregation accumulator
+ public long counter = 0; // running sum of the non-null values passed to add()
+ }
+
+ public State create() { // returns a fresh State with counter == 0
+ return new State();
+ }
+
+ public void destroy(State state) { // empty body: no cleanup is performed
+ }
+
+ public void reset(State state) { // clears the running sum
+ state.counter = 0;
+ }
+
+ public void add(State state, Integer val) { // accumulates one input value into state
+ if (val == null) return; // null inputs are skipped
+ state.counter += val;
+ }
+
+ public void serialize(State state, DataOutputStream out) throws
IOException {
+ out.writeLong(state.counter); // the only serialized field is counter, written as one long
+ }
+
+ public void deserialize(State state, DataInputStream in) throws
IOException {
+ state.counter = in.readLong(); // mirrors serialize(): reads back one long
+ }
+
+ public void merge(State state, State rhs) { // adds rhs.counter into state.counter
+ state.counter += rhs.counter;
+ }
+
+ public long getValue(State state) {
+ value = value + 1; // plain read-modify-write on the static field, no synchronization
+ System.out.println("getValue StaticIntTestUDAF " + value + " " +
state.counter);
+ return state.counter + value; // running sum plus the number of getValue() calls so far on this class
+ }
+}
diff --git
a/regression-test/java-udf-src/src/main/java/org/apache/doris/udf/StaticIntTestUDTF.java
b/regression-test/java-udf-src/src/main/java/org/apache/doris/udf/StaticIntTestUDTF.java
new file mode 100644
index 00000000000..460a4ac809d
--- /dev/null
+++
b/regression-test/java-udf-src/src/main/java/org/apache/doris/udf/StaticIntTestUDTF.java
@@ -0,0 +1,35 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.udf;
+
+import java.util.ArrayList;
+
+public class StaticIntTestUDTF { // regression-test UDTF: evaluate() output grows with a class-level call counter
+ static {
+ System.out.println("static load should only print once
StaticIntTestUDTF");
+ }
+ private static int value = 0; // class-level counter, incremented once per evaluate() call
+ public ArrayList<Integer> evaluate() {
+ ArrayList<Integer> result = new ArrayList<>();
+ value = value + 1; // plain read-modify-write on the static field, no synchronization
+ for (int i = 0; i < value; i++) { // appends `value` copies of `value`
+ result.add(value);
+ }
+ return result; // on the n-th call for this class: a list of n elements, each equal to n
+ }
+}
diff --git
a/regression-test/suites/javaudf_p0/test_javaudf_static_load_test.groovy
b/regression-test/suites/javaudf_p0/test_javaudf_static_load_test.groovy
index c816ec90292..1a98767fb51 100644
--- a/regression-test/suites/javaudf_p0/test_javaudf_static_load_test.groovy
+++ b/regression-test/suites/javaudf_p0/test_javaudf_static_load_test.groovy
@@ -66,6 +66,7 @@ suite("test_javaudf_static_load_test") {
"type"="JAVA_UDF"
); """
+ // the result of the following queries should be the accumulation
sql """set parallel_pipeline_task_num = 1; """
qt_select1 """ SELECT static_load_test(); """
qt_select2 """ SELECT static_load_test(); """
@@ -73,8 +74,47 @@ suite("test_javaudf_static_load_test") {
qt_select4 """ SELECT static_load_test(); """
qt_select5 """ SELECT static_load_test(); """
+
+ sql """ CREATE AGGREGATE FUNCTION static_load_test_udaf(int) RETURNS
BigInt PROPERTIES (
+ "file"="file://${jarPath}",
+ "symbol"="org.apache.doris.udf.StaticIntTestUDAF",
+ "always_nullable"="true",
+ "static_load"="true",
+ "expiration_time"="10",
+ "type"="JAVA_UDF"
+ ); """
+
+ // the result of the following queries should be the accumulation
+ // maybe we need to drop the function and test again; the result should be the
same,
+ // but the regression test copies the jar into custom_lib, where it is
loaded by BE when it starts,
+ // so it can't be unloaded
+ sql """set parallel_pipeline_task_num = 1; """
+ qt_select6 """ SELECT static_load_test_udaf(0); """
+ qt_select7 """ SELECT static_load_test_udaf(0); """
+ qt_select8 """ SELECT static_load_test_udaf(0); """
+ qt_select9 """ SELECT static_load_test_udaf(0); """
+ qt_select10 """ SELECT static_load_test_udaf(0); """
+
+ sql """ CREATE TABLES FUNCTION static_load_test_udtf() RETURNS
array<int> PROPERTIES (
+ "file"="file://${jarPath}",
+ "symbol"="org.apache.doris.udf.StaticIntTestUDTF",
+ "always_nullable"="true",
+ "static_load"="true",
+ "expiration_time"="10",
+ "type"="JAVA_UDF"
+ ); """
+
+ sql """set parallel_pipeline_task_num = 1; """
+ qt_select11 """ select k1, e1 from (select 1 k1) as t lateral view
static_load_test_udtf() tmp1 as e1; """
+ qt_select12 """ select k1, e1 from (select 1 k1) as t lateral view
static_load_test_udtf() tmp1 as e1; """
+ qt_select13 """ select k1, e1 from (select 1 k1) as t lateral view
static_load_test_udtf() tmp1 as e1; """
+ qt_select14 """ select k1, e1 from (select 1 k1) as t lateral view
static_load_test_udtf() tmp1 as e1; """
+ qt_select15 """ select k1, e1 from (select 1 k1) as t lateral view
static_load_test_udtf() tmp1 as e1; """
+
} finally {
try_sql("DROP FUNCTION IF EXISTS static_load_test();")
+ try_sql("DROP FUNCTION IF EXISTS static_load_test_udaf(int);")
+ try_sql("DROP FUNCTION IF EXISTS static_load_test_udtf();")
try_sql("DROP TABLE IF EXISTS ${tableName}")
}
}
diff --git a/regression-test/suites/javaudf_p0/test_javaudtf_decimal.groovy
b/regression-test/suites/javaudf_p0/test_javaudtf_decimal.groovy
index a8fa7e347af..0f16e10bef1 100644
--- a/regression-test/suites/javaudf_p0/test_javaudtf_decimal.groovy
+++ b/regression-test/suites/javaudf_p0/test_javaudtf_decimal.groovy
@@ -62,7 +62,7 @@ suite("test_javaudtf_decimal") {
qt_select2 """ SELECT user_id, cost_2, e1 FROM ${tableName} lateral
view udtf_decimal(cost_2) temp as e1 order by user_id; """
} finally {
- try_sql("DROP FUNCTION IF EXISTS udtf_decimal(decimal);")
+ try_sql("DROP FUNCTION IF EXISTS udtf_decimal(decimal(27,9));")
try_sql("DROP TABLE IF EXISTS ${tableName}")
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]