Piotr Przybylski created FLINK-39547:
----------------------------------------

             Summary: User-provided Scala library breaks table-planner
                 Key: FLINK-39547
                 URL: https://issues.apache.org/jira/browse/FLINK-39547
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Planner
    Affects Versions: 2.1.1, 2.3.0
            Reporter: Piotr Przybylski


Since the fix for FLINK-39150, using Scala 2.13 in a job that uses joins 
causes the Flink job to freeze and never transition to any final state:
{code:java}
21:13:29.984 [flink-pekko.actor.default-dispatcher-5] INFO  o.a.f.r.taskexecutor.TaskExecutor - Cannot find task to fail for execution 1e221474986e5a478966544bc847a1de_207cc3c51ef07cf91939b74a6274a728_0_0 with exception:
java.util.concurrent.ExecutionException: Boxed Error
    at scala.concurrent.impl.Promise$.resolver(Promise.scala:97)
    at scala.concurrent.impl.Promise$.scala$concurrent$impl$Promise$$resolveTry(Promise.scala:89)
    at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:294)
    at org.apache.pekko.pattern.PromiseActorRef.$bang(AskSupport.scala:624)
    at org.apache.pekko.actor.ActorRef.tell(ActorRef.scala:141)
    at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcInvocation(PekkoRpcActor.java:326)
    at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:229)
    at org.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor.handleRpcMessage(FencedPekkoRpcActor.java:88)
    at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:174)
    at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33)
    at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29)
    at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
    at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
    at org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
    at org.apache.pekko.actor.Actor.aroundReceive(Actor.scala:547)
    at org.apache.pekko.actor.Actor.aroundReceive$(Actor.scala:545)
    at org.apache.pekko.actor.AbstractActor.aroundReceive(AbstractActor.scala:229)
    at org.apache.pekko.actor.ActorCell.receiveMessage(ActorCell.scala:590)
    at org.apache.pekko.actor.ActorCell.invoke(ActorCell.scala:557)
    at org.apache.pekko.dispatch.Mailbox.processMailbox(Mailbox.scala:272)
    at org.apache.pekko.dispatch.Mailbox.run(Mailbox.scala:233)
    at org.apache.pekko.dispatch.Mailbox.exec(Mailbox.scala:245)
    at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373)
    at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182)
    at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655)
    at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622)
    at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165)
Caused by: java.lang.NoSuchMethodError: 'scala.collection.GenTraversable scala.collection.mutable.LinkedHashSet$.apply(scala.collection.Seq)'
    at org.apache.flink.table.planner.codegen.CodeGeneratorContext.<init>(CodeGeneratorContext.scala:68)
    at org.apache.flink.table.planner.codegen.CodeGeneratorContext.<init>(CodeGeneratorContext.scala:61)
    at org.apache.flink.table.planner.plan.utils.HashJoinOperatorUtil.generateOperatorFactory(HashJoinOperatorUtil.java:68)
    at org.apache.flink.table.planner.adaptive.AdaptiveJoinOperatorGenerator.genOperatorFactory(AdaptiveJoinOperatorGenerator.java:94)
    at org.apache.flink.table.runtime.operators.join.adaptive.AdaptiveJoinOperatorFactory.genOperatorFactory(AdaptiveJoinOperatorFactory.java:124)
    at org.apache.flink.table.runtime.strategy.PostProcessAdaptiveJoinStrategy.tryOptimizeAdaptiveJoin(PostProcessAdaptiveJoinStrategy.java:87)
    at org.apache.flink.table.runtime.strategy.BaseAdaptiveJoinOperatorOptimizationStrategy.visitDownstreamAdaptiveJoinNode(BaseAdaptiveJoinOperatorOptimizationStrategy.java:90)
    at org.apache.flink.table.runtime.strategy.PostProcessAdaptiveJoinStrategy.onOperatorsFinished(PostProcessAdaptiveJoinStrategy.java:52)
    at org.apache.flink.runtime.scheduler.adaptivebatch.StreamGraphOptimizer.onOperatorsFinished(StreamGraphOptimizer.java:72)
    at org.apache.flink.runtime.scheduler.adaptivebatch.DefaultAdaptiveExecutionHandler.tryOptimizeStreamGraph(DefaultAdaptiveExecutionHandler.java:118)
    at org.apache.flink.runtime.scheduler.adaptivebatch.DefaultAdaptiveExecutionHandler.handleJobEvent(DefaultAdaptiveExecutionHandler.java:82)
    at org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchScheduler.lambda$notifyJobVertexFinishedIfPossible$17(AdaptiveBatchScheduler.java:660)
    at java.base/java.util.Optional.ifPresent(Optional.java:178)
    at org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchScheduler.notifyJobVertexFinishedIfPossible(AdaptiveBatchScheduler.java:658)
    at org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchScheduler.onTaskFinished(AdaptiveBatchScheduler.java:406)
    at org.apache.flink.runtime.scheduler.SchedulerBase.onTaskExecutionStateUpdate(SchedulerBase.java:833)
    at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:813)
    at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:83)
    at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:532)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:569)
    at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRpcInvocation$1(PekkoRpcActor.java:318)
    at org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
    at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcInvocation(PekkoRpcActor.java:316)
    ... 24 common frames omitted
{code}
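This is caused by the change in class loading order used when deserializing an 
{{AdaptiveJoinGenerator}}. Previously all classes had to be provided by 
{{plannerModule.getSubmoduleClassLoader}} (hence the original bug, 
FLINK-39150), but the fix gave priority to {{userClassLoader}}, with 
{{SubmoduleClassLoader}} used as a fallback. The {{NoSuchMethodError}} above is 
consistent with this: the missing {{LinkedHashSet.apply}} signature returns 
{{scala.collection.GenTraversable}}, a type that no longer exists in Scala 
2.13, so the planner code appears to have been linked against the user-provided 
Scala library instead of its own. I suspect that this ordering also allows user 
code to effectively override other Flink planner dependencies, which may cause 
other kinds of errors.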

I believe this can be easily resolved by reversing the {{ClassLoader}} order: 
using {{SubmoduleClassLoader}} first, and falling back to {{userClassLoader}} 
only for missing classes. This will preserve the old Flink 2.0 behavior and 
still make user-provided classes available when {{Serializer}} instances are 
deserialized.
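
To make the proposed lookup order concrete, here is a minimal sketch in plain Java. It is not the code from the linked PR: the class name {{PrimaryFirstClassLoader}} and its constructor parameters are hypothetical, with {{primary}} standing in for {{SubmoduleClassLoader}} and {{fallback}} for {{userClassLoader}}. It relies only on the standard parent-first delegation of {{java.lang.ClassLoader}}.

```java
import java.util.Objects;

/**
 * Hypothetical illustration of "planner first, user code as fallback".
 * Not part of Flink; names are assumptions made for this sketch.
 */
final class PrimaryFirstClassLoader extends ClassLoader {

    // Consulted only when the primary loader cannot find a class.
    private final ClassLoader fallback;

    PrimaryFirstClassLoader(ClassLoader primary, ClassLoader fallback) {
        // ClassLoader.loadClass() asks the parent first, so passing
        // `primary` as the parent gives it priority for every lookup.
        super(primary);
        this.fallback = Objects.requireNonNull(fallback, "fallback");
    }

    @Override
    protected Class<?> findClass(String name) throws ClassNotFoundException {
        // Reached only after the primary (parent) loader threw
        // ClassNotFoundException; delegate to the fallback loader.
        return fallback.loadClass(name);
    }
}
```

With this ordering, a class that exists in both loaders (for example a Scala collection class) always resolves from the primary loader, while classes known only to the fallback loader are still found.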

I linked my PR with a proposed test and a fix. If my approach is correct, then 
please assign the issue to me so that it can be quickly reviewed and merged. 
Otherwise, somebody more knowledgeable about Flink can take over.

If possible, I'd really like this to be fixed for 2.2.1 and 2.3.0, since the 
regression was introduced after 2.2.0 was released and neither 2.2.1 nor 2.3.0 
has been released yet.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
