Thanks for driving this ;)
--
Best!
Xuyang
At 2024-04-16 10:47:56, "Xiaolong Wang" wrote:
Reported. JIRA link: https://issues.apache.org/jira/browse/FLINK-35117?filter=-2
On Tue, Apr 16, 2024 at 10:05 AM Xiaolong Wang
wrote:
By adding `org.apache.commons.text` to the OWNER_CLASSPATH list, the issue can
be resolved.
I'll create a JIRA about it.
On Mon, Apr 15, 2024 at 12:14 PM Xiaolong Wang
wrote:
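Sure, the AsyncScalarFunction's code looks like this:

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    import org.apache.flink.table.functions.AsyncScalarFunction;
    import org.apache.flink.table.functions.FunctionContext;

    public class AsyncHashCodeFunction extends AsyncScalarFunction {

        private int factor = 0;
        private ExecutorService executor;

        @Override
        public void open(FunctionContext context) throws Exception {
            // access the global "hashcode_factor" parameter
            // "12" would be the default value if the parameter does not exist
            factor = Integer.parseInt(context.getJobParameter("hashcode_factor", "12"));
            // size the thread pool from the "in-flight-requests" parameter (default "10")
            executor = Executors.newFixedThreadPool(
                    Integer.parseInt(context.getJobParameter("in-flight-requests", "10")));
        }

        public final void eval(CompletableFuture<Integer> future, String s) {
            // compute the result off the caller thread and complete the future with it
            executor.submit(() -> {
                future.complete(s.hashCode() * factor);
            });
        }
    }
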
I tried to package `commons-text-1.10` in different ways:
1. In the user jar.
2. Into the classpath of the Flink image.
They both fail.
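
For anyone who wants to check the same thing on their side, here is a minimal sketch that asks a given class loader whether it can see a class and, if so, where it was loaded from. It uses only the JDK; `ClassVisibility` and `locate` are hypothetical names for illustration, not Flink APIs. Which loader to pass in (for example the thread context class loader inside `open()`) is an assumption and depends on where the check runs.

```java
import java.security.CodeSource;

public class ClassVisibility {

    /** Returns where the given loader finds the class, or "NOT FOUND". */
    public static String locate(String className, ClassLoader loader) {
        try {
            // initialize=false: only resolve the class, do not run static initializers
            Class<?> clazz = Class.forName(className, false, loader);
            CodeSource source = clazz.getProtectionDomain().getCodeSource();
            // classes loaded by the bootstrap loader have no code source
            return source == null ? "bootstrap/JDK" : String.valueOf(source.getLocation());
        } catch (ClassNotFoundException | LinkageError e) {
            return "NOT FOUND";
        }
    }

    public static void main(String[] args) {
        String name = "org.apache.commons.text.StringSubstitutor";
        ClassLoader context = Thread.currentThread().getContextClassLoader();
        System.out.println("context loader: " + locate(name, context));
        System.out.println("system loader:  " + locate(name, ClassLoader.getSystemClassLoader()));
    }
}
```

If a loader reports a jar location but the planner still throws, the class is present but not visible to the loader that the planner code uses.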
I checked the source code, and it seems that flink-table-planner-loader uses
a URLClassLoader to load dependencies, so it first checks
`flink-table-planner-loader.jar` and skips the user's dependencies.
There is an exception list in the `PlannerModule` class, which looks like
this:

    class PlannerModule {

        /**
         * The name of the table planner dependency jar, bundled with flink-table-planner-loader module
         * artifact.
         */
        static final String FLINK_TABLE_PLANNER_FAT_JAR = "flink-table-planner.jar";

        private static final String HINT_USAGE =
                "mvn clean package -pl flink-table/flink-table-planner,flink-table/flink-table-planner-loader -DskipTests";

        private static final String[] OWNER_CLASSPATH =
                Stream.concat(
                                Arrays.stream(CoreOptions.PARENT_FIRST_LOGGING_PATTERNS),
                                Stream.of(
                                        // These packages are shipped either by
                                        // flink-table-runtime or flink-dist itself
                                        "org.codehaus.janino",
                                        "org.codehaus.commons",
                                        "org.apache.commons.lang3",
                                        "org.apache.commons.math3",
                                        // with hive dialect, hadoop jar should be in classpath,
                                        // also, we should make it loaded by owner classloader,
                                        // otherwise, it'll throw class not found exception
                                        // when initialize HiveParser which requires hadoop
                                        "org.apache.hadoop"))
                        .toArray(String[]::new);

I think adding `org.apache.commons.text` to this list would do.
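
To illustrate why the extra entry should help, here is a simplified sketch of the prefix check, under the assumption that the loader decides by package-name prefix whether a class may be resolved through the owner (parent) class loader. It is not Flink's actual implementation: `OwnerFirstSketch`, `isOwnerFirst`, and `withExtra` are hypothetical names, and the prefix array is a shortened copy of the list above without the logging patterns.

```java
import java.util.Arrays;

public class OwnerFirstSketch {

    // Shortened, illustrative copy of the owner-first prefixes quoted above.
    public static final String[] OWNER_PREFIXES = {
        "org.codehaus.janino",
        "org.codehaus.commons",
        "org.apache.commons.lang3",
        "org.apache.commons.math3",
        "org.apache.hadoop"
    };

    /** True if the class name starts with one of the owner-first prefixes. */
    public static boolean isOwnerFirst(String className, String[] prefixes) {
        return Arrays.stream(prefixes).anyMatch(className::startsWith);
    }

    /** Returns a copy of the prefix list with one extra entry appended. */
    public static String[] withExtra(String[] prefixes, String extra) {
        String[] result = Arrays.copyOf(prefixes, prefixes.length + 1);
        result[prefixes.length] = extra;
        return result;
    }

    public static void main(String[] args) {
        String missing = "org.apache.commons.text.StringSubstitutor";
        // no prefix matches, so the class would only be looked up inside the planner jar
        System.out.println(isOwnerFirst(missing, OWNER_PREFIXES)); // false
        String[] patched = withExtra(OWNER_PREFIXES, "org.apache.commons.text");
        // with the extra prefix the lookup may be delegated to the owner class loader
        System.out.println(isOwnerFirst(missing, patched)); // true
    }
}
```

This matches the stack trace, where the lookup goes through `loadClassFromComponentOnly` and never reaches the owner class loader.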
On Wed, Apr 10, 2024 at 2:06 PM Xuyang wrote:
Hi, Wang.
Could you provide more details about this bug, such as minimal reproducible test
code, pom dependencies, etc.?
Furthermore, can you try packaging the dependency "commons-text" with
version "1.10.0" manually to check
whether it works? If you can work around this bug this way, I think we should
open a bug issue for it.
--
Best!
Xuyang
At 2024-04-09 18:11:27, "Xiaolong Wang" wrote:
Hi,
I found a ClassNotFoundException when using Flink 1.19's AsyncScalarFunction.
Stack trace:
Caused by: java.lang.ClassNotFoundException: org.apache.commons.text.StringSubstitutor
    at java.net.URLClassLoader.findClass(Unknown Source) ~[?:?]
    at java.lang.ClassLoader.loadClass(Unknown Source) ~[?:?]
    at org.apache.flink.core.classloading.ComponentClassLoader.loadClassFromComponentOnly(ComponentClassLoader.java:150) ~[flink-dist-1.19.0.jar:1.19.0]
    at org.apache.flink.core.classloading.ComponentClassLoader.loadClass(ComponentClassLoader.java:113) ~[flink-dist-1.19.0.jar:1.19.0]
    at java.lang.ClassLoader.loadClass(Unknown Source) ~[?:?]
    at org.apache.flink.table.planner.codegen.AsyncCodeGenerator.generateProcessCode(AsyncCodeGenerator.java:173) ~[?:?]
    at org.apache.flink.table.planner.codegen.AsyncCodeGenerator.generateFunction(AsyncCodeGenerator.java:77) ~[?:?]
    at org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecAsyncCalc.getAsyncFunctionOperator(CommonExecAsyncCalc.java:146) ~[?:?]
    at org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecAsyncCalc.createAsyncOneInputTransformation(CommonExecAsyncCalc.java:126) ~[?:?]
    at org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecAsyncCalc.translateToPlanInternal(CommonExecAsyncCalc.java:89) ~[?:?]
    at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:168) ~[?:?]
    at org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:259) ~[?:?]
    at org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecCalc.translateToPlanInternal(Commo