[ https://issues.apache.org/jira/browse/FLINK-15355?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17006836#comment-17006836 ]
Piotr Nowojski commented on FLINK-15355:
----------------------------------------
Adapting [my comment from the pull
request|https://github.com/apache/flink/pull/10678#pullrequestreview-336577210]:
I would be against the 4th option proposed by [~arvid heise]. I think it's really
important that all SPI classes be loaded parent-first, otherwise very bad things
can happen, for example if a user accidentally embeds a different version (1.9.2
vs 1.9.1) of org.apache.flink classes. How I understand the issue:
If you have an SPI class (like Foo) and you implement one of its methods (like Bar
Foo#bar() \{ return new Bar(); }), it's important that every object/class that
crosses the boundary between the plugin's implementation and the core system (like
Bar) originates from the core system's class loader. If it doesn't, and the two
versions of Bar are not binary compatible, you risk all kinds of dependency
convergence errors (deadlocks, livelocks, method-not-found errors). Additionally,
if this Bar is passed around, it can crash at any place and point in time, very far
away from where it was created. Using older plugin versions can also simply leave
you with unfixed bugs, for example if Bar in 1.9.2 has some bug fixed.
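To make the boundary concrete, here is a minimal sketch of the Foo/Bar example above (the class names are the hypothetical ones from this comment, not real Flink classes):
{code:java}
// Minimal sketch, assuming the hypothetical SPI names Foo and Bar used above.

// Defined in the core system and loaded by the parent (system) class loader.
interface Foo {
    Bar bar();
}

// Also part of the SPI surface, because instances of it cross the plugin/core boundary.
class Bar {
    public String version() {
        return "1.9.1";
    }
}

// Implemented inside a plugin and loaded by the plugin's own class loader.
class PluginFoo implements Foo {
    @Override
    public Bar bar() {
        // If the plugin bundles its own copy of Bar and the plugin class loader
        // resolves Bar child-first, this instance belongs to a different Bar class
        // than the one the core system was compiled against. Any later call on it
        // from the core can then fail (e.g. NoSuchMethodError / LinkageError),
        // potentially far away from this call site.
        return new Bar();
    }
}
{code}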
Disclaimer: I don't have that much experience with class loading in Java,
especially loading user classes. However, all of the systems I have seen that do
similar things (loading user classes, especially in a separate class loader)
strongly emphasise a "parent-first" pattern for all of the dependencies that are
on the boundary of the SPI:
https://github.com/prestodb/presto/blob/master/presto-main/src/main/java/com/facebook/presto/server/PluginClassLoader.java#L41
https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-nar-utils/src/main/java/org/apache/nifi/nar/NarClassLoader.java#L42
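For illustration, the pattern used by the class loaders linked above roughly looks like the following sketch (this is not Flink's actual plugin class loader, and the parent-first prefix list is purely illustrative):
{code:java}
// Rough sketch of the "parent-first for SPI packages, child-first otherwise"
// pattern used by the Presto and NiFi class loaders linked above.
import java.net.URL;
import java.net.URLClassLoader;

class SketchPluginClassLoader extends URLClassLoader {

    // Only classes on the SPI boundary are delegated to the parent first.
    private static final String[] PARENT_FIRST_PREFIXES = {"java.", "org.example.spi."};

    SketchPluginClassLoader(URL[] pluginJars, ClassLoader parent) {
        super(pluginJars, parent);
    }

    @Override
    protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
        synchronized (getClassLoadingLock(name)) {
            Class<?> c = findLoadedClass(name);
            if (c == null) {
                if (isParentFirst(name)) {
                    // SPI / boundary classes always come from the core system.
                    c = getParent().loadClass(name);
                } else {
                    try {
                        // Everything else is child-first: prefer the plugin's own copy.
                        c = findClass(name);
                    } catch (ClassNotFoundException e) {
                        c = getParent().loadClass(name);
                    }
                }
            }
            if (resolve) {
                resolveClass(c);
            }
            return c;
        }
    }

    private static boolean isParentFirst(String className) {
        for (String prefix : PARENT_FIRST_PREFIXES) {
            if (className.startsWith(prefix)) {
                return true;
            }
        }
        return false;
    }
}
{code}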
I would be more inclined towards something like [~banmoy]'s option 1. or 2.,
though we should probably choose 1. for the sake of backward compatibility with
non-plugin-based class loading ({{classloader.parent-first-patterns.default}}
is used in more places, not only in plugins).
*Why is {{org.apache.hadoop}} on this list in the first place?* For plugins I
think we should be able to drop it. Unless I'm missing something, I do not see a
reason why {{org.apache.hadoop}} should be loaded parent-first for any plugin -
that would somewhat defeat the purpose of any plugin using Hadoop. As I wrote
above, for plugins only SPI classes should be loaded parent-first, and
{{org.apache.hadoop}} should never be part of any SPI that we expose (unless I'm
missing something...).
If I'm not missing anything, I would vote for duplicating
{{classloader.parent-first-patterns.additional}} and
{{classloader.parent-first-patterns.default}} into something like
{{classloader.plugin-parent-first-patterns.*}}, and dropping not only
{{org.apache.hadoop}} but perhaps almost everything else from the plugin variant.
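A rough sketch of what the proposed split could look like from a user's perspective (the {{classloader.plugin-parent-first-patterns.*}} keys and the pattern values below are purely illustrative - they do not exist yet):
{code:java}
import org.apache.flink.configuration.Configuration;

public class PluginParentFirstProposalSketch {
    public static void main(String[] args) {
        Configuration conf = new Configuration();

        // Existing option, shared today by user-code and plugin class loading.
        conf.setString(
                "classloader.parent-first-patterns.additional",
                "org.example.shared.");

        // Proposed plugin-only counterparts (illustrative key names and values):
        // only SPI/boundary packages stay parent-first, while org.apache.hadoop
        // and most other default entries are dropped for plugins.
        conf.setString(
                "classloader.plugin-parent-first-patterns.default",
                "java.;org.apache.flink.;org.slf4j;org.apache.log4j;org.apache.logging;org.apache.commons.logging;ch.qos.logback");
        conf.setString("classloader.plugin-parent-first-patterns.additional", "");
    }
}
{code}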
> Nightly streaming file sink fails with unshaded hadoop
> ------------------------------------------------------
>
> Key: FLINK-15355
> URL: https://issues.apache.org/jira/browse/FLINK-15355
> Project: Flink
> Issue Type: Bug
> Components: FileSystems
> Affects Versions: 1.10.0, 1.11.0
> Reporter: Arvid Heise
> Assignee: PengFei Li
> Priority: Blocker
> Labels: pull-request-available
> Fix For: 1.10.0
>
> Time Spent: 1h 10m
> Remaining Estimate: 0h
>
> {code:java}
> org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
> at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
> at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
> at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
> at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
> at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
> at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
> at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
> Caused by: java.lang.RuntimeException: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
> at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:199)
> at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1751)
> at org.apache.flink.streaming.api.environment.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:94)
> at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:63)
> at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1628)
> at StreamingFileSinkProgram.main(StreamingFileSinkProgram.java:77)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
> ... 11 more
> Caused by: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
> at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
> at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
> at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1746)
> ... 20 more
> Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
> at org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$7(RestClusterClient.java:326)
> at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
> at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
> at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
> at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
> at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:274)
> at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
> at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
> at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
> at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561)
> at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:929)
> at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.runtime.rest.util.RestClientException: [Internal server error., <Exception on server side:
> org.apache.flink.runtime.client.JobSubmissionException: Failed to submit job.
> at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$internalSubmitJob$3(Dispatcher.java:336)
> at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822)
> at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)
> at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
> at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.RuntimeException: org.apache.flink.runtime.client.JobExecutionException: Could not set up JobManager
> at org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:36)
> at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
> ... 6 more
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not set up JobManager
> at org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.<init>(JobManagerRunnerImpl.java:152)
> at org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:84)
> at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$6(Dispatcher.java:379)
> at org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)
> ... 7 more
> Caused by: java.lang.NoSuchMethodError: org.apache.hadoop.conf.Configuration.getTimeDuration(Ljava/lang/String;Ljava/lang/String;Ljava/util/concurrent/TimeUnit;)J
> at org.apache.hadoop.fs.s3a.S3ARetryPolicy.<init>(S3ARetryPolicy.java:113)
> at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:257)
> at org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.create(AbstractS3FileSystemFactory.java:126)
> at org.apache.flink.core.fs.PluginFileSystemFactory.create(PluginFileSystemFactory.java:61)
> at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:441)
> at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:362)
> at org.apache.flink.core.fs.Path.getFileSystem(Path.java:298)
> at org.apache.flink.runtime.state.memory.MemoryBackendCheckpointStorage.<init>(MemoryBackendCheckpointStorage.java:85)
> at org.apache.flink.runtime.state.memory.MemoryStateBackend.createCheckpointStorage(MemoryStateBackend.java:295)
> at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:279)
> at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:205)
> at org.apache.flink.runtime.executiongraph.ExecutionGraph.enableCheckpointing(ExecutionGraph.java:486)
> at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:338)
> at org.apache.flink.runtime.scheduler.SchedulerBase.createExecutionGraph(SchedulerBase.java:245)
> at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:217)
> at org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:205)
> at org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:119)
> at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:105)
> at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:278)
> at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:266)
> at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:98)
> at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:40)
> at org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.<init>(JobManagerRunnerImpl.java:146)
> ... 10 more
> {code}