[ 
https://issues.apache.org/jira/browse/FLINK-39547?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergey Nuyanzin resolved FLINK-39547.
-------------------------------------
    Fix Version/s: 2.3.0
                   2.2.1
                   2.4.0
         Assignee: Piotr Przybylski
       Resolution: Fixed

> User-provided Scala library breaks table-planner
> ------------------------------------------------
>
>                 Key: FLINK-39547
>                 URL: https://issues.apache.org/jira/browse/FLINK-39547
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 2.3.0, 2.2.1
>            Reporter: Piotr Przybylski
>            Assignee: Piotr Przybylski
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 2.3.0, 2.2.1, 2.4.0
>
>
> Since the fix for FLINK-39150, using Scala 2.13 in a job that uses joins
> causes the Flink job to freeze and never transition to any final state:
> {code:java}
> 21:13:29.984 [flink-pekko.actor.default-dispatcher-5] INFO  o.a.f.r.taskexecutor.TaskExecutor - Cannot find task to fail for execution 1e221474986e5a478966544bc847a1de_207cc3c51ef07cf91939b74a6274a728_0_0 with exception:
> java.util.concurrent.ExecutionException: Boxed Error
>     at scala.concurrent.impl.Promise$.resolver(Promise.scala:97)
>     at scala.concurrent.impl.Promise$.scala$concurrent$impl$Promise$$resolveTry(Promise.scala:89)
>     at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:294)
>     at org.apache.pekko.pattern.PromiseActorRef.$bang(AskSupport.scala:624)
>     at org.apache.pekko.actor.ActorRef.tell(ActorRef.scala:141)
>     at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcInvocation(PekkoRpcActor.java:326)
>     at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:229)
>     at org.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor.handleRpcMessage(FencedPekkoRpcActor.java:88)
>     at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:174)
>     at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33)
>     at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29)
>     at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
>     at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
>     at org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29)
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
>     at org.apache.pekko.actor.Actor.aroundReceive(Actor.scala:547)
>     at org.apache.pekko.actor.Actor.aroundReceive$(Actor.scala:545)
>     at org.apache.pekko.actor.AbstractActor.aroundReceive(AbstractActor.scala:229)
>     at org.apache.pekko.actor.ActorCell.receiveMessage(ActorCell.scala:590)
>     at org.apache.pekko.actor.ActorCell.invoke(ActorCell.scala:557)
>     at org.apache.pekko.dispatch.Mailbox.processMailbox(Mailbox.scala:272)
>     at org.apache.pekko.dispatch.Mailbox.run(Mailbox.scala:233)
>     at org.apache.pekko.dispatch.Mailbox.exec(Mailbox.scala:245)
>     at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373)
>     at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182)
>     at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655)
>     at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622)
>     at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165)
> Caused by: java.lang.NoSuchMethodError: 'scala.collection.GenTraversable scala.collection.mutable.LinkedHashSet$.apply(scala.collection.Seq)'
>     at org.apache.flink.table.planner.codegen.CodeGeneratorContext.<init>(CodeGeneratorContext.scala:68)
>     at org.apache.flink.table.planner.codegen.CodeGeneratorContext.<init>(CodeGeneratorContext.scala:61)
>     at org.apache.flink.table.planner.plan.utils.HashJoinOperatorUtil.generateOperatorFactory(HashJoinOperatorUtil.java:68)
>     at org.apache.flink.table.planner.adaptive.AdaptiveJoinOperatorGenerator.genOperatorFactory(AdaptiveJoinOperatorGenerator.java:94)
>     at org.apache.flink.table.runtime.operators.join.adaptive.AdaptiveJoinOperatorFactory.genOperatorFactory(AdaptiveJoinOperatorFactory.java:124)
>     at org.apache.flink.table.runtime.strategy.PostProcessAdaptiveJoinStrategy.tryOptimizeAdaptiveJoin(PostProcessAdaptiveJoinStrategy.java:87)
>     at org.apache.flink.table.runtime.strategy.BaseAdaptiveJoinOperatorOptimizationStrategy.visitDownstreamAdaptiveJoinNode(BaseAdaptiveJoinOperatorOptimizationStrategy.java:90)
>     at org.apache.flink.table.runtime.strategy.PostProcessAdaptiveJoinStrategy.onOperatorsFinished(PostProcessAdaptiveJoinStrategy.java:52)
>     at org.apache.flink.runtime.scheduler.adaptivebatch.StreamGraphOptimizer.onOperatorsFinished(StreamGraphOptimizer.java:72)
>     at org.apache.flink.runtime.scheduler.adaptivebatch.DefaultAdaptiveExecutionHandler.tryOptimizeStreamGraph(DefaultAdaptiveExecutionHandler.java:118)
>     at org.apache.flink.runtime.scheduler.adaptivebatch.DefaultAdaptiveExecutionHandler.handleJobEvent(DefaultAdaptiveExecutionHandler.java:82)
>     at org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchScheduler.lambda$notifyJobVertexFinishedIfPossible$17(AdaptiveBatchScheduler.java:660)
>     at java.base/java.util.Optional.ifPresent(Optional.java:178)
>     at org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchScheduler.notifyJobVertexFinishedIfPossible(AdaptiveBatchScheduler.java:658)
>     at org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchScheduler.onTaskFinished(AdaptiveBatchScheduler.java:406)
>     at org.apache.flink.runtime.scheduler.SchedulerBase.onTaskExecutionStateUpdate(SchedulerBase.java:833)
>     at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:813)
>     at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:83)
>     at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:532)
>     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
>     at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.base/java.lang.reflect.Method.invoke(Method.java:569)
>     at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRpcInvocation$1(PekkoRpcActor.java:318)
>     at org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
>     at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcInvocation(PekkoRpcActor.java:316)
>     ... 24 common frames omitted
> {code}
> This is caused by the change in the class loading order used when deserializing
> an {{AdaptiveJoinGenerator}}. Previously, all classes had to be provided by
> {{plannerModule.getSubmoduleClassLoader}} (hence the original bug,
> FLINK-39150), but the fix gave priority to {{userClassLoader}}, with
> {{SubmoduleClassLoader}} used only as a fallback. As a result, a Scala 2.13
> library shipped with the user code is loaded instead of the Scala version the
> planner was compiled against, which leads to the {{NoSuchMethodError}} above.
> This presumably also allows user code to override other Flink planner
> dependencies, which may cause other kinds of errors.
> I believe this can be resolved easily by reversing the {{ClassLoader}} order:
> using {{SubmoduleClassLoader}} first and falling back to {{userClassLoader}}
> only for classes it cannot find. This will preserve the old Flink 2.0 behavior
> while still making user-provided classes available when {{Serializer}}
> instances are deserialized.
> I have a PR with a proposed test and fix
> ([https://github.com/apache/flink/pull/28030]). If my approach is correct,
> please assign the issue to me so that the PR can be reviewed and merged
> quickly. Alternatively, somebody more knowledgeable about Flink can take over.
> If possible, I'd really like this to be fixed for 2.2.1 and 2.3.0, since the 
> regression was introduced after 2.2.0 was released and neither 2.2.1 nor 
> 2.3.0 has been released yet.
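
Where the loader order comes into play: assuming the {{AdaptiveJoinGenerator}} is restored with plain Java serialization, every class named in the serialized bytes is resolved through the {{ClassLoader}} handed to the deserializing stream, so the order of the loaders behind that stream decides where each class comes from. The following is a generic sketch of such a stream, not Flink's own deserialization utility; {{LoaderAwareObjectInputStream}} and {{LoaderAwareDemo}} are hypothetical names.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch: an ObjectInputStream that resolves every class named in
 * the serialized stream through one chosen ClassLoader.
 */
final class LoaderAwareObjectInputStream extends ObjectInputStream {
    private final ClassLoader loader;

    LoaderAwareObjectInputStream(InputStream in, ClassLoader loader) throws IOException {
        super(in);
        this.loader = loader;
    }

    @Override
    protected Class<?> resolveClass(ObjectStreamClass desc)
            throws IOException, ClassNotFoundException {
        try {
            // Look the class up through the chosen loader, without initializing it.
            return Class.forName(desc.getName(), false, loader);
        } catch (ClassNotFoundException e) {
            // Fall back to the default resolution, which also covers
            // primitive type names such as "int".
            return super.resolveClass(desc);
        }
    }
}

class LoaderAwareDemo {
    /** Serializes {@code value} and reads it back through {@code loader}. */
    static Object roundTrip(Object value, ClassLoader loader) throws Exception {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        try (ObjectInputStream in = new LoaderAwareObjectInputStream(
                new ByteArrayInputStream(bytes.toByteArray()), loader)) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        List<String> original = new ArrayList<>(List.of("a", "b"));
        Object copy = roundTrip(original, LoaderAwareDemo.class.getClassLoader());
        System.out.println(copy.equals(original)); // prints true
    }
}
```

In the scenario from this issue, the loader passed in would be a composition of the planner's {{SubmoduleClassLoader}} and the {{userClassLoader}}; which of the two is consulted first is exactly what changed with FLINK-39150.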

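The order proposed in the description (planner {{SubmoduleClassLoader}} first, {{userClassLoader}} only for classes the planner cannot find) can be sketched as a small delegating {{ClassLoader}}. This is a minimal illustration under assumptions, not the code from the PR: {{OrderedFallbackClassLoader}} is a hypothetical name, and in the demo an empty {{URLClassLoader}} with a bootstrap parent stands in for the planner loader while the application class path stands in for the user loader.

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;

/**
 * Hypothetical sketch: resolves every class through {@code primary} first and
 * consults {@code fallback} only if {@code primary} cannot find the class.
 * Here primary plays the planner's SubmoduleClassLoader and fallback the
 * userClassLoader.
 */
final class OrderedFallbackClassLoader extends ClassLoader {
    private final ClassLoader primary;
    private final ClassLoader fallback;

    OrderedFallbackClassLoader(ClassLoader primary, ClassLoader fallback) {
        // Parent is the bootstrap loader; loadClass below never delegates to it.
        super(null);
        this.primary = primary;
        this.fallback = fallback;
    }

    @Override
    protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
        try {
            // Classes the planner ships (e.g. its own Scala library) win.
            return primary.loadClass(name);
        } catch (ClassNotFoundException notInPrimary) {
            // Only classes missing from the planner come from user code.
            return fallback.loadClass(name);
        }
    }
}

class OrderedFallbackClassLoaderDemo {
    public static void main(String[] args) throws Exception {
        // Stand-in for the planner loader: sees only JDK (bootstrap) classes.
        ClassLoader plannerLike = new URLClassLoader(new URL[0], null);
        // Stand-in for the user loader: the application class path.
        ClassLoader userLike = OrderedFallbackClassLoaderDemo.class.getClassLoader();
        ClassLoader ordered = new OrderedFallbackClassLoader(plannerLike, userLike);

        // JDK class: found by the primary loader.
        System.out.println(ordered.loadClass("java.util.ArrayList") == ArrayList.class);
        // Application class: missing from the primary, so served by the fallback.
        System.out.println(ordered.loadClass(OrderedFallbackClassLoaderDemo.class.getName())
                == OrderedFallbackClassLoaderDemo.class);
    }
}
```

Resource lookups ({{getResource}}) are not routed through the two loaders in this sketch; a real implementation would likely need to apply the same order to them.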


--
This message was sent by Atlassian Jira
(v8.20.10#820010)
