Re:Re: Found an issue when using Flink 1.19's AsyncScalarFunction

2024-04-15 Thread Xuyang
Thanks for driving this ;)




--

Best!
Xuyang




At 2024-04-16 10:47:56, "Xiaolong Wang" wrote:

Reported. JIRA link: https://issues.apache.org/jira/browse/FLINK-35117?filter=-2


On Tue, Apr 16, 2024 at 10:05 AM Xiaolong Wang wrote:

By adding `org.apache.commons.text` to the OWNER_CLASSPATH list, the issue can be resolved.


I'll create a JIRA about it.


On Mon, Apr 15, 2024 at 12:14 PM Xiaolong Wang wrote:

Sure, the AsyncScalarFunction code looks like this:



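import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.flink.table.functions.AsyncScalarFunction;
import org.apache.flink.table.functions.FunctionContext;

public class AsyncHashCodeFunction extends AsyncScalarFunction {
    private int factor = 0;
    // created in open() on the task side, so it is not part of the serialized function
    private transient ExecutorService executor;

    @Override
    public void open(FunctionContext context) throws Exception {
        // access the global "hashcode_factor" parameter
        // "12" would be the default value if the parameter does not exist
        factor = Integer.parseInt(context.getJobParameter("hashcode_factor", "12"));
        executor = Executors.newFixedThreadPool(
                Integer.parseInt(context.getJobParameter("in-flight-requests", "10")));
    }

    // the generic type of the future (Integer) is what Flink uses as the result type
    public final void eval(CompletableFuture<Integer> future, String s) {
        executor.submit(() -> {
            future.complete(s.hashCode() * factor);
        });
    }

    @Override
    public void close() throws Exception {
        // release the thread pool when the function is torn down
        if (executor != null) {
            executor.shutdown();
        }
    }
}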
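To look at the completion pattern used in `eval` on its own, here is a plain-JDK sketch with no Flink dependency: an executor thread completes a `CompletableFuture<Integer>` with `s.hashCode() * factor`. The class name `AsyncHashCodeDemo` and the fixed factor of 12 (standing in for the "hashcode_factor" default) are illustrative assumptions, and the `completeExceptionally` branch is an addition that the function above does not have.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class AsyncHashCodeDemo {
    // Hypothetical stand-in for the "hashcode_factor" job parameter (default "12").
    static final int FACTOR = 12;

    // Same shape as eval(): the result is produced on an executor thread and
    // handed back by completing the future supplied by the caller.
    static void eval(ExecutorService executor, CompletableFuture<Integer> future, String s) {
        executor.submit(() -> {
            try {
                future.complete(s.hashCode() * FACTOR);
            } catch (Exception e) {
                // Not in the original function: fail the future instead of leaving it pending.
                future.completeExceptionally(e);
            }
        });
    }

    static int hashTimesFactor(String s) {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            CompletableFuture<Integer> future = new CompletableFuture<>();
            eval(executor, future, s);
            return future.join(); // blocking is only for this demo
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(hashTimesFactor("flink"));
    }
}
```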

I tried to package `commons-text-1.10.0` in two different ways:
1. Into the user jar.
2. Into the classpath of the Flink image.

Both attempts failed.


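I checked the source code, and it seems that flink-table-planner-loader loads the planner through a `ComponentClassLoader` (a `URLClassLoader` subclass). Classes whose packages are not on its owner-first list are looked up only in the `flink-table-planner.jar` bundled inside `flink-table-planner-loader.jar`, so the user's dependencies are skipped.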
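The lookup decision described here can be modelled roughly as follows. This is a simplified sketch, not Flink's actual `ComponentClassLoader`: it assumes classes are matched against the owner-first list by package prefix, and the real class handles more cases than the two that matter for this error. The prefixes are copied from the `OWNER_CLASSPATH` excerpt in this thread, with the logging patterns omitted.

```java
import java.util.List;

public class PlannerLookupRoute {
    enum Route { OWNER_FIRST, COMPONENT_ONLY }

    // Owner-first package prefixes from the OWNER_CLASSPATH excerpt
    // (CoreOptions.PARENT_FIRST_LOGGING_PATTERNS omitted in this sketch).
    static final List<String> OWNER_FIRST = List.of(
            "org.codehaus.janino",
            "org.codehaus.commons",
            "org.apache.commons.lang3",
            "org.apache.commons.math3",
            "org.apache.hadoop");

    // Simplified routing decision: prefix match against the owner-first list.
    static Route route(String className) {
        if (OWNER_FIRST.stream().anyMatch(className::startsWith)) {
            // Delegated to the owner classloader first.
            return Route.OWNER_FIRST;
        }
        // Looked up only in the planner jar (plus JDK classes), so copies of
        // the class in the user jar or the image classpath are never consulted.
        return Route.COMPONENT_ONLY;
    }

    public static void main(String[] args) {
        for (String name : List.of(
                "org.apache.commons.lang3.StringUtils",
                "org.apache.commons.text.StringSubstitutor")) {
            System.out.println(name + " -> " + route(name));
        }
    }
}
```

Under these assumptions `StringSubstitutor` lands in the component-only branch, which is consistent with `loadClassFromComponentOnly` appearing in the stack trace.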


There is an exception list, `OWNER_CLASSPATH`, in the `PlannerModule` class that looks like this:
class PlannerModule {

    /**
     * The name of the table planner dependency jar, bundled with flink-table-planner-loader module
     * artifact.
     */
    static final String FLINK_TABLE_PLANNER_FAT_JAR = "flink-table-planner.jar";

    private static final String HINT_USAGE =
            "mvn clean package -pl flink-table/flink-table-planner,flink-table/flink-table-planner-loader -DskipTests";

    private static final String[] OWNER_CLASSPATH =
            Stream.concat(
                            Arrays.stream(CoreOptions.PARENT_FIRST_LOGGING_PATTERNS),
                            Stream.of(
                                    // These packages are shipped either by
                                    // flink-table-runtime or flink-dist itself
                                    "org.codehaus.janino",
                                    "org.codehaus.commons",
                                    "org.apache.commons.lang3",
                                    "org.apache.commons.math3",
                                    // with hive dialect, hadoop jar should be in classpath,
                                    // also, we should make it loaded by owner classloader,
                                    // otherwise, it'll throw class not found exception
                                    // when initialize HiveParser which requires hadoop
                                    "org.apache.hadoop"))
                    .toArray(String[]::new);

    // ...
}


I think adding `org.apache.commons.text` to this list would fix it.
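A minimal sketch of the proposed change, assuming the rest of `PlannerModule` stays the same. To keep it compilable without Flink, `CoreOptions.PARENT_FIRST_LOGGING_PATTERNS` is replaced by a hypothetical placeholder array; only the added `"org.apache.commons.text"` entry is the actual proposal. It also assumes commons-text is visible to the owner classloader in the first place.

```java
import java.util.Arrays;
import java.util.stream.Stream;

public class OwnerClasspathSketch {
    // Hypothetical placeholder for CoreOptions.PARENT_FIRST_LOGGING_PATTERNS.
    static final String[] LOGGING_PATTERNS = {"org.slf4j", "org.apache.logging"};

    static final String[] OWNER_CLASSPATH =
            Stream.concat(
                            Arrays.stream(LOGGING_PATTERNS),
                            Stream.of(
                                    "org.codehaus.janino",
                                    "org.codehaus.commons",
                                    "org.apache.commons.lang3",
                                    "org.apache.commons.math3",
                                    "org.apache.hadoop",
                                    // Proposed addition: commons-text classes such as
                                    // StringSubstitutor, needed by AsyncCodeGenerator.
                                    "org.apache.commons.text"))
                    .toArray(String[]::new);

    // Prefix match, mirroring how the owner-first list is assumed to be used.
    static boolean loadedByOwner(String className) {
        return Arrays.stream(OWNER_CLASSPATH).anyMatch(className::startsWith);
    }

    public static void main(String[] args) {
        System.out.println(loadedByOwner("org.apache.commons.text.StringSubstitutor"));
    }
}
```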


On Wed, Apr 10, 2024 at 2:06 PM Xuyang  wrote:

Hi, Wang.


Could you provide more details for this bug, such as minimal reproducible test code, pom dependencies, etc.?


Furthermore, can you try again to package the "commons-text" dependency, version "1.10.0", manually to check whether it works? If you can work around this bug this way, I think we should open a bug issue for it.




--

Best!
Xuyang




At 2024-04-09 18:11:27, "Xiaolong Wang"  wrote:


Found an issue when using Flink 1.19's AsyncScalarFunction

2024-04-09 Thread Xiaolong Wang
Hi,

I found a ClassNotFoundException when using Flink 1.19's AsyncScalarFunction.

Stack trace:

Caused by: java.lang.ClassNotFoundException: org.apache.commons.text.StringSubstitutor
    at java.net.URLClassLoader.findClass(Unknown Source) ~[?:?]
    at java.lang.ClassLoader.loadClass(Unknown Source) ~[?:?]
    at org.apache.flink.core.classloading.ComponentClassLoader.loadClassFromComponentOnly(ComponentClassLoader.java:150) ~[flink-dist-1.19.0.jar:1.19.0]
    at org.apache.flink.core.classloading.ComponentClassLoader.loadClass(ComponentClassLoader.java:113) ~[flink-dist-1.19.0.jar:1.19.0]
    at java.lang.ClassLoader.loadClass(Unknown Source) ~[?:?]
    at org.apache.flink.table.planner.codegen.AsyncCodeGenerator.generateProcessCode(AsyncCodeGenerator.java:173) ~[?:?]
    at org.apache.flink.table.planner.codegen.AsyncCodeGenerator.generateFunction(AsyncCodeGenerator.java:77) ~[?:?]
    at org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecAsyncCalc.getAsyncFunctionOperator(CommonExecAsyncCalc.java:146) ~[?:?]
    at org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecAsyncCalc.createAsyncOneInputTransformation(CommonExecAsyncCalc.java:126) ~[?:?]
    at org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecAsyncCalc.translateToPlanInternal(CommonExecAsyncCalc.java:89) ~[?:?]
    at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:168) ~[?:?]
    at org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:259) ~[?:?]
    at org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecCalc.translateToPlanInternal(CommonExecCalc.java:94) ~[?:?]
    at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:168) ~[?:?]
    at org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:259) ~[?:?]
    at org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.java:177) ~[?:?]
    at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:168) ~[?:?]
    at org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:85) ~[?:?]

Environment:

flink image: flink:1.19.0-scala_2.12-java11

Tried solutions:

I tried packaging the needed dependency, `commons-text-1.10.0.jar`, into both the user jar and the classpath, and the issue remained.
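One way to narrow down where the class goes missing is to ask a specific classloader whether it can resolve `org.apache.commons.text.StringSubstitutor`. The sketch below uses only `Class.forName(name, false, loader)`; which loader to pass (for example the context classloader inside a UDF's `open()`) is an assumption left to the caller. Note that a class visible from the user classloader can still be invisible to the planner's own classloader, so a positive result does not rule out this error.

```java
public class ClassVisibility {
    // Returns true if `loader` can resolve `className`, without initializing the class.
    static boolean isVisible(String className, ClassLoader loader) {
        try {
            Class.forName(className, false, loader);
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        ClassLoader loader = Thread.currentThread().getContextClassLoader();
        System.out.println(isVisible("org.apache.commons.text.StringSubstitutor", loader));
    }
}
```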

Could someone please help resolve this?