Piotr Przybylski created FLINK-39547:
----------------------------------------
Summary: User-provided Scala library breaks table-planner
Key: FLINK-39547
URL: https://issues.apache.org/jira/browse/FLINK-39547
Project: Flink
Issue Type: Bug
Components: Table SQL / Planner
Affects Versions: 2.1.1, 2.3.0
Reporter: Piotr Przybylski
Since the fix for FLINK-39150, providing the Scala 2.13 library with a job that uses joins causes the job to hang and never transition to a final state:
{code:java}
21:13:29.984 [flink-pekko.actor.default-dispatcher-5] INFO o.a.f.r.taskexecutor.TaskExecutor - Cannot find task to fail for execution 1e221474986e5a478966544bc847a1de_207cc3c51ef07cf91939b74a6274a728_0_0 with exception:
java.util.concurrent.ExecutionException: Boxed Error
at scala.concurrent.impl.Promise$.resolver(Promise.scala:97)
at scala.concurrent.impl.Promise$.scala$concurrent$impl$Promise$$resolveTry(Promise.scala:89)
at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:294)
at org.apache.pekko.pattern.PromiseActorRef.$bang(AskSupport.scala:624)
at org.apache.pekko.actor.ActorRef.tell(ActorRef.scala:141)
at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcInvocation(PekkoRpcActor.java:326)
at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:229)
at org.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor.handleRpcMessage(FencedPekkoRpcActor.java:88)
at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:174)
at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33)
at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29)
at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
at org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
at org.apache.pekko.actor.Actor.aroundReceive(Actor.scala:547)
at org.apache.pekko.actor.Actor.aroundReceive$(Actor.scala:545)
at org.apache.pekko.actor.AbstractActor.aroundReceive(AbstractActor.scala:229)
at org.apache.pekko.actor.ActorCell.receiveMessage(ActorCell.scala:590)
at org.apache.pekko.actor.ActorCell.invoke(ActorCell.scala:557)
at org.apache.pekko.dispatch.Mailbox.processMailbox(Mailbox.scala:272)
at org.apache.pekko.dispatch.Mailbox.run(Mailbox.scala:233)
at org.apache.pekko.dispatch.Mailbox.exec(Mailbox.scala:245)
at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373)
at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182)
at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655)
at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622)
at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165)
Caused by: java.lang.NoSuchMethodError: 'scala.collection.GenTraversable scala.collection.mutable.LinkedHashSet$.apply(scala.collection.Seq)'
at org.apache.flink.table.planner.codegen.CodeGeneratorContext.<init>(CodeGeneratorContext.scala:68)
at org.apache.flink.table.planner.codegen.CodeGeneratorContext.<init>(CodeGeneratorContext.scala:61)
at org.apache.flink.table.planner.plan.utils.HashJoinOperatorUtil.generateOperatorFactory(HashJoinOperatorUtil.java:68)
at org.apache.flink.table.planner.adaptive.AdaptiveJoinOperatorGenerator.genOperatorFactory(AdaptiveJoinOperatorGenerator.java:94)
at org.apache.flink.table.runtime.operators.join.adaptive.AdaptiveJoinOperatorFactory.genOperatorFactory(AdaptiveJoinOperatorFactory.java:124)
at org.apache.flink.table.runtime.strategy.PostProcessAdaptiveJoinStrategy.tryOptimizeAdaptiveJoin(PostProcessAdaptiveJoinStrategy.java:87)
at org.apache.flink.table.runtime.strategy.BaseAdaptiveJoinOperatorOptimizationStrategy.visitDownstreamAdaptiveJoinNode(BaseAdaptiveJoinOperatorOptimizationStrategy.java:90)
at org.apache.flink.table.runtime.strategy.PostProcessAdaptiveJoinStrategy.onOperatorsFinished(PostProcessAdaptiveJoinStrategy.java:52)
at org.apache.flink.runtime.scheduler.adaptivebatch.StreamGraphOptimizer.onOperatorsFinished(StreamGraphOptimizer.java:72)
at org.apache.flink.runtime.scheduler.adaptivebatch.DefaultAdaptiveExecutionHandler.tryOptimizeStreamGraph(DefaultAdaptiveExecutionHandler.java:118)
at org.apache.flink.runtime.scheduler.adaptivebatch.DefaultAdaptiveExecutionHandler.handleJobEvent(DefaultAdaptiveExecutionHandler.java:82)
at org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchScheduler.lambda$notifyJobVertexFinishedIfPossible$17(AdaptiveBatchScheduler.java:660)
at java.base/java.util.Optional.ifPresent(Optional.java:178)
at org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchScheduler.notifyJobVertexFinishedIfPossible(AdaptiveBatchScheduler.java:658)
at org.apache.flink.runtime.scheduler.adaptivebatch.AdaptiveBatchScheduler.onTaskFinished(AdaptiveBatchScheduler.java:406)
at org.apache.flink.runtime.scheduler.SchedulerBase.onTaskExecutionStateUpdate(SchedulerBase.java:833)
at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:813)
at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:83)
at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:532)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:569)
at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRpcInvocation$1(PekkoRpcActor.java:318)
at org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcInvocation(PekkoRpcActor.java:316)
... 24 common frames omitted
{code}
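This is caused by the change in class-loading order used when deserializing an {{AdaptiveJoinGenerator}}. Previously, all classes had to be provided by {{plannerModule.getSubmoduleClassLoader}} (hence the original bug, FLINK-39150). The fix gave priority to {{userClassLoader}}, with {{SubmoduleClassLoader}} used only as a fallback. As a result, classes from the user jar now shadow the planner's own dependencies: the planner is compiled against Scala 2.12, where {{LinkedHashSet$.apply(Seq)}} returns {{scala.collection.GenTraversable}}, so when the user's Scala 2.13 classes win, the call fails with the {{NoSuchMethodError}} above. The same ordering presumably lets user code override other planner dependencies as well, which may cause other kinds of errors.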
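To check which copy of a class a given loader actually resolves (for example, whether {{scala.collection.mutable.LinkedHashSet$}} comes from the planner's Scala library or from the user jar), a small diagnostic along these lines may help. It is a sketch using only JDK APIs; {{ClassOrigin}} and its methods are hypothetical names, not part of Flink.

```java
import java.security.CodeSource;

/** Hypothetical diagnostic helper (not part of Flink): reports which loader and jar supplied a class. */
final class ClassOrigin {
    private ClassOrigin() {}

    /** Resolves className through the given loader (null means bootstrap) and describes its origin. */
    static String describe(String className, ClassLoader loader) throws ClassNotFoundException {
        // initialize=false: resolve the class without running its static initializers.
        Class<?> c = Class.forName(className, false, loader);
        return describe(c);
    }

    /** Describes the defining loader and code source (jar or directory) of an already loaded class. */
    static String describe(Class<?> c) {
        ClassLoader definingLoader = c.getClassLoader(); // null means the bootstrap loader
        CodeSource source = c.getProtectionDomain().getCodeSource(); // null for bootstrap classes
        return c.getName()
                + " defined by " + (definingLoader == null ? "bootstrap" : definingLoader.toString())
                + " from " + (source == null ? "<unknown>" : String.valueOf(source.getLocation()));
    }
}
```

For instance, calling {{ClassOrigin.describe("scala.collection.mutable.LinkedHashSet$", loader)}} with the loader used for deserialization would be expected, under the behavior described above, to report the user's Scala 2.13 jar rather than the planner's Scala library.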
I believe this can be easily resolved by reversing the {{ClassLoader}} order: use {{SubmoduleClassLoader}} first, and fall back to {{userClassLoader}} only for classes it cannot find. This restores the Flink 2.0 behavior for planner classes while still making user-provided classes available when {{Serializer}} instances are deserialized.
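A minimal sketch of the proposed order, assuming it can be expressed as a composite loader that tries the planner side first and the user side second. {{PlannerFirstClassLoader}} is a hypothetical name; its two constructor arguments merely stand in for {{SubmoduleClassLoader}} and {{userClassLoader}}, and the linked PR does not necessarily implement it this way.

```java
import java.util.Objects;

/**
 * Hypothetical sketch of the proposed lookup order (not a Flink class).
 * "primary" stands in for the planner's SubmoduleClassLoader,
 * "fallback" stands in for the userClassLoader.
 */
final class PlannerFirstClassLoader extends ClassLoader {
    private final ClassLoader primary;
    private final ClassLoader fallback;

    PlannerFirstClassLoader(ClassLoader primary, ClassLoader fallback) {
        // Null parent: ClassLoader.loadClass() consults only the bootstrap loader
        // (core JDK classes such as java.lang.String) before calling findClass().
        super(null);
        this.primary = Objects.requireNonNull(primary, "primary");
        this.fallback = Objects.requireNonNull(fallback, "fallback");
    }

    @Override
    protected Class<?> findClass(String name) throws ClassNotFoundException {
        try {
            // Planner-side classes, including the planner's own Scala library, win.
            return primary.loadClass(name);
        } catch (ClassNotFoundException notOnPlannerSide) {
            // Only classes the planner cannot see at all are taken from user code.
            return fallback.loadClass(name);
        }
    }
}
```

With this order, {{scala.collection.mutable.LinkedHashSet$}} would resolve to the planner's Scala 2.12 copy even if the user jar bundles Scala 2.13, while user classes referenced by a deserialized {{Serializer}} would still be found through the fallback. A complete implementation would presumably apply the same order to {{getResource}} and {{getResources}} as well.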
I have linked a PR with a proposed fix and a test. If my approach is correct, please assign the issue to me so that it can be reviewed and merged quickly; otherwise, somebody more familiar with Flink can take over.
If possible, I'd really like this to be fixed for 2.2.1 and 2.3.0, since the
regression was introduced after 2.2.0 was released and neither 2.2.1 nor 2.3.0
has been released yet.