[
https://issues.apache.org/jira/browse/FLINK-39547?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Sergey Nuyanzin resolved FLINK-39547.
-------------------------------------
Fix Version/s: 2.3.0, 2.2.1, 2.4.0
Assignee: Piotr Przybylski
Resolution: Fixed
> User-provided Scala library breaks table-planner
> ------------------------------------------------
>
> Key: FLINK-39547
> URL: https://issues.apache.org/jira/browse/FLINK-39547
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Affects Versions: 2.3.0, 2.2.1
> Reporter: Piotr Przybylski
> Assignee: Piotr Przybylski
> Priority: Major
> Labels: pull-request-available
> Fix For: 2.3.0, 2.2.1, 2.4.0
>
>
> Since the fix for FLINK-39150, using Scala 2.13 in a job that uses joins
> causes the Flink job to freeze and never transition to any final state:
> {code:java}
> 21:13:29.984 [flink-pekko.actor.default-dispatcher-5] INFO o.a.f.r.taskexecutor.TaskExecutor - Cannot find task to fail for execution 1e221474986e5a478966544bc847a1de_207cc3c51ef07cf91939b74a6274a728_0_0 with exception:
> java.util.concurrent.ExecutionException: Boxed Error
> at scala.concurrent.impl.Promise$.resolver(Promise.scala:97)
> at scala.concurrent.impl.Promise$.scala$concurrent$impl$Promise$$resolveTry(Promise.scala:89)
> at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:294)
> at org.apache.pekko.pattern.PromiseActorRef.$bang(AskSupport.scala:624)
> at org.apache.pekko.actor.ActorRef.tell(ActorRef.scala:141)
> at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcInvocation(PekkoRpcActor.java:326)
> at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:229)
> at org.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor.handleRpcMessage(FencedPekkoRpcActor.java:88)
> at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:174)
> at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33)
> at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29)
> at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
> at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
> at org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
> at org.apache.pekko.actor.Actor.aroundReceive(Actor.scala:547)
> at org.apache.pekko.actor.Actor.aroundReceive$(Actor.scala:545)
> at org.apache.pekko.actor.AbstractActor.aroundReceive(AbstractActor.scala:229)
> at org.apache.pekko.actor.ActorCell.receiveMessage(ActorCell.scala:590)
> at org.apache.pekko.actor.ActorCell.invoke(ActorCell.scala:557)
> at org.apache.pekko.dispatch.Mailbox.processMailbox(Mailbox.scala:272)
> at org.apache.pekko.dispatch.Mailbox.run(Mailbox.scala:233)
> at org.apache.pekko.dispatch.Mailbox.exec(Mailbox.scala:245)
> at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373)
> at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182)
> at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655)
> at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622)
> at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165)
> Caused by: java.lang.NoSuchMethodError: 'scala.collection.GenTraversable scala.collection.mutable.LinkedHashSet$.apply(scala.collection.Seq)'
> at org.apache.flink.table.planner.codegen.CodeGeneratorContext.<init>(CodeGeneratorContext.scala:68)
> at org.apache.flink.table.planner.codegen.CodeGeneratorContext.<init>(CodeGeneratorContext.scala:61)
> at org.apache.flink.table.planner.plan.utils.HashJoinOperatorUtil.generateOperatorFactory(HashJoinOperatorUtil.java:68)
> at org.apache.flink.table.planner.adaptive.AdaptiveJoinOperatorGenerator.genOperatorFactory(AdaptiveJoinOperatorGenerator.java:94)
> at org.apache.flink.table.runtime.operators.join.adaptive.AdaptiveJoinOperatorFactory.genOperatorFactory(AdaptiveJoinOperatorFactory.java:124)
> at org.apache.flink.table.runtime.strategy.PostProcessAdaptiveJoinStrategy.tryOptimizeAdaptiveJoin(PostProcessAdaptiveJoinStrategy.java:87)
> at org.apache.flink.table.runtime.strategy.BaseAdaptiveJoinOperatorOptimizationStrategy.visitDownstreamAdaptiveJoinNode(BaseAdaptiveJoinOperatorOptimizationStrategy.java:90)
> at org.apache.flink.table.runtime.strategy.PostProcessAdaptiveJoinStrategy.onOperatorsFinished(PostProcessAdaptiveJoinStrategy.java:52)
> at org.apache.flink.runtime.scheduler.adaptivebatch.StreamGraphOptimizer.onOperatorsFinished(StreamGraphOptimizer.java:72)
> at org.apache.flink.runtime.scheduler.adaptivebatch.DefaultAdaptiveExecutionHandler.tryOptimizeStreamGraph(DefaultAdaptiveExecutionHandler.java:118)
> at org.apache.flink.runtime.scheduler.adaptivebatch.DefaultAdaptiveExecutionHandler.handleJobEvent(DefaultAdaptiveExecutionHandler.java:82)
> at org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchScheduler.lambda$notifyJobVertexFinishedIfPossible$17(AdaptiveBatchScheduler.java:660)
> at java.base/java.util.Optional.ifPresent(Optional.java:178)
> at org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchScheduler.notifyJobVertexFinishedIfPossible(AdaptiveBatchScheduler.java:658)
> at org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchScheduler.onTaskFinished(AdaptiveBatchScheduler.java:406)
> at org.apache.flink.runtime.scheduler.SchedulerBase.onTaskExecutionStateUpdate(SchedulerBase.java:833)
> at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:813)
> at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:83)
> at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:532)
> at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
> at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.base/java.lang.reflect.Method.invoke(Method.java:569)
> at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRpcInvocation$1(PekkoRpcActor.java:318)
> at org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
> at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcInvocation(PekkoRpcActor.java:316)
> ... 24 common frames omitted
> {code}
> This is caused by the change in class loading order used when deserializing a
> {{AdaptiveJoinGenerator}}. Previously all classes had to be provided by
> {{plannerModule.getSubmoduleClassLoader}} (hence the original bug -
> FLINK-39150), but the fix gave priority to {{userClassLoader}}, with
> {{SubmoduleClassLoader}} used as a fallback. I guess that this also allows
> user code to effectively override Flink planner dependencies, which may cause
> other kinds of errors.
> I believe this can be easily resolved by reversing {{ClassLoader}} order:
> using {{SubmoduleClassLoader}} first, and falling back to {{userClassLoader}}
> only for missing classes. This will preserve the old Flink 2.0 behavior and
> make user-provided classes available when {{Serializer}} instances are
> deserialized.
> I have a PR with a proposed test and fix
> ([https://github.com/apache/flink/pull/28030]) - if my approach is correct,
> then please assign it to me so that it can be quickly reviewed and merged. Or
> somebody more knowledgeable about Flink can take over.
> If possible, I'd really like this to be fixed for 2.2.1 and 2.3.0, since the
> regression was introduced after 2.2.0 was released and neither 2.2.1 nor
> 2.3.0 has been released yet.
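
The class loading order proposed in the description (planner classes first, user classes only as a fallback) can be sketched in plain Java as below. This is a minimal illustration under stated assumptions, not the code from the pull request: {{FallbackClassLoader}}, the {{stub}} helper, and the class names {{scala.Lib}} and {{user.Serializer}} are all hypothetical, and JDK classes stand in for the two conflicting copies of a library.

```java
import java.util.Map;

/**
 * Hypothetical sketch (not the code from the pull request): resolves classes
 * from a primary loader first and asks a fallback loader only when the
 * primary cannot find the class.
 */
class FallbackClassLoader extends ClassLoader {
    private final ClassLoader primary;
    private final ClassLoader fallback;

    FallbackClassLoader(ClassLoader primary, ClassLoader fallback) {
        super(null); // no parent; delegation is done explicitly in loadClass
        this.primary = primary;
        this.fallback = fallback;
    }

    @Override
    protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
        try {
            return primary.loadClass(name);
        } catch (ClassNotFoundException notInPrimary) {
            // Only classes missing from the primary loader reach the fallback.
            return fallback.loadClass(name);
        }
    }

    /** Demo helper: a loader that knows only the given names (stand-in for a jar). */
    static ClassLoader stub(Map<String, Class<?>> known) {
        return new ClassLoader(null) {
            @Override
            protected Class<?> findClass(String name) throws ClassNotFoundException {
                Class<?> c = known.get(name);
                if (c == null) {
                    throw new ClassNotFoundException(name);
                }
                return c;
            }
        };
    }

    public static void main(String[] args) throws Exception {
        // Both loaders offer "scala.Lib"; only the user loader has "user.Serializer".
        ClassLoader planner = stub(Map.of("scala.Lib", String.class));
        ClassLoader user = stub(Map.of("scala.Lib", Integer.class, "user.Serializer", Long.class));

        ClassLoader loader = new FallbackClassLoader(planner, user);
        System.out.println(loader.loadClass("scala.Lib"));       // planner's copy wins
        System.out.println(loader.loadClass("user.Serializer")); // found only via the fallback
    }
}
```

Swapping the constructor arguments reproduces the order described as problematic above: the user's copy of the conflicting class would then shadow the planner's.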