[ 
https://issues.apache.org/jira/browse/FLINK-15355?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17006836#comment-17006836
 ] 

Piotr Nowojski commented on FLINK-15355:
----------------------------------------

Adapting [my comment from the pull 
request|https://github.com/apache/flink/pull/10678#pullrequestreview-336577210]

I would be against the 4th option proposed by [~arvid heise]. I think it's 
really important for all SPI classes to be loaded parent first, otherwise very 
bad things can happen, for example if a user accidentally embeds a different 
version (1.9.2 vs 1.9.1) of the org.apache.flink classes. Here is how I 
understand the issue:

If you have an SPI class (like Foo) and you implement its method (like Bar 
Foo#bar() \{ return new Bar(); }), it's important that every object/class that 
crosses the boundary between the plugin's implementation and the core system 
(like Bar) originates from the core system's class loader. Otherwise, if the 
versions of Bar are not binary compatible, you risk all kinds of dependency 
convergence errors (deadlocks, livelocks, method not found). Additionally, if 
this Bar is passed around, it can crash at any place and at any point in time, 
very far away from the place where it was created. Also, a plugin that embeds 
an older version can simply carry unfixed bugs, for example if Bar had a bug 
that was fixed in 1.9.2.

Disclaimer: I don't have that much experience with class loading in Java, and 
especially with loading user classes. However, all of the systems I have seen 
that do similar things (loading user classes, especially in a separate class 
loader) strongly emphasise the "parent first" pattern for all of the 
dependencies that are on the boundary of the SPI.

https://github.com/prestodb/presto/blob/master/presto-main/src/main/java/com/facebook/presto/server/PluginClassLoader.java#L41
https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-nar-utils/src/main/java/org/apache/nifi/nar/NarClassLoader.java#L42
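
A minimal sketch of the "parent first on the SPI boundary" idea, to make the 
discussion concrete. This is not Flink's actual implementation: the class name 
PluginClassLoaderSketch, the method isParentFirst and the prefix list passed in 
are all hypothetical. Classes whose names match a parent-first prefix are 
always taken from the core class loader, everything else is looked up in the 
plugin's own jars first.

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.util.List;

/** Hypothetical sketch: child-first loading with a parent-first allow-list. */
class PluginClassLoaderSketch extends URLClassLoader {

    // Assumed to contain the SPI packages, e.g. "org.apache.flink."
    private final List<String> parentFirstPrefixes;

    PluginClassLoaderSketch(URL[] pluginJars, ClassLoader parent, List<String> parentFirstPrefixes) {
        super(pluginJars, parent);
        this.parentFirstPrefixes = parentFirstPrefixes;
    }

    /** True if the class must come from the core (parent) class loader. */
    boolean isParentFirst(String className) {
        for (String prefix : parentFirstPrefixes) {
            if (className.startsWith(prefix)) {
                return true;
            }
        }
        return false;
    }

    @Override
    protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
        synchronized (getClassLoadingLock(name)) {
            Class<?> c = findLoadedClass(name);
            if (c == null) {
                if (isParentFirst(name)) {
                    // Boundary classes: standard delegation, parent is asked first.
                    c = super.loadClass(name, false);
                } else {
                    try {
                        // Plugin-private dependencies: plugin jars are searched first.
                        c = findClass(name);
                    } catch (ClassNotFoundException e) {
                        // Not bundled with the plugin, fall back to the parent.
                        c = super.loadClass(name, false);
                    }
                }
            }
            if (resolve) {
                resolveClass(c);
            }
            return c;
        }
    }
}
```

With such a loader, a plugin that accidentally bundles its own copy of a class 
under a parent-first prefix would still see the core's version, while its own 
copies of classes outside those prefixes would stay isolated.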

I would be more inclined towards something like [~banmoy]'s option 1 or 2.

However, we can probably choose option 1 for the sake of backward compatibility 
with non-plugin-based class loading 
({{classloader.parent-first-patterns.default}} is used in more places, not only 
in plugins). 

*Why was {{org.apache.hadoop}} added to this list in the first place?* For 
plugins I think we should be able to drop it. Unless I'm missing something, I 
do not see a reason why {{org.apache.hadoop}} should be loaded parent first for 
any plugin. That would kind of defeat the purpose of any plugin using Hadoop. 
As I wrote above, for plugins only SPI classes should be loaded parent first, 
and {{org.apache.hadoop}} should never be a part of any SPI that we expose. 

If I'm not missing anything, I would vote for duplicating 
{{classloader.parent-first-patterns.additional}} and 
{{classloader.parent-first-patterns.default}} to something like 
{{classloader.plugin-parent-first-patterns.*}} and dropping not only 
{{org.apache.hadoop}} but maybe almost everything else from there.
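
Roughly what I mean, as a sketch only: derive the plugin pattern list from an 
existing semicolon-separated pattern string by dropping unwanted prefixes. The 
class PluginPatternsSketch and the method withoutPrefixes are hypothetical, and 
the pattern string in the usage note is illustrative, not the real default 
value.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper, not part of Flink. */
class PluginPatternsSketch {

    /**
     * Splits a semicolon-separated pattern string and drops every pattern
     * that starts with one of the given prefixes.
     */
    static List<String> withoutPrefixes(String patterns, List<String> droppedPrefixes) {
        List<String> result = new ArrayList<>();
        for (String raw : patterns.split(";")) {
            String pattern = raw.trim();
            if (pattern.isEmpty()) {
                continue;
            }
            boolean dropped = false;
            for (String prefix : droppedPrefixes) {
                if (pattern.startsWith(prefix)) {
                    dropped = true;
                    break;
                }
            }
            if (!dropped) {
                result.add(pattern);
            }
        }
        return result;
    }
}
```

For example, calling withoutPrefixes with the illustrative string 
"java.;org.apache.flink.;org.apache.hadoop." and the single prefix 
"org.apache.hadoop" would leave only the "java." and "org.apache.flink." 
patterns for plugins.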

> Nightly streaming file sink fails with unshaded hadoop
> ------------------------------------------------------
>
>                 Key: FLINK-15355
>                 URL: https://issues.apache.org/jira/browse/FLINK-15355
>             Project: Flink
>          Issue Type: Bug
>          Components: FileSystems
>    Affects Versions: 1.10.0, 1.11.0
>            Reporter: Arvid Heise
>            Assignee: PengFei Li
>            Priority: Blocker
>              Labels: pull-request-available
>             Fix For: 1.10.0
>
>          Time Spent: 1h 10m
>  Remaining Estimate: 0h
>
> {code:java}
> org.apache.flink.client.program.ProgramInvocationException: The main method 
> caused an error: java.util.concurrent.ExecutionException: 
> org.apache.flink.runtime.client.JobSubmissionException: Failed to submit 
> JobGraph.
>  at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
>  at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
>  at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
>  at 
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
>  at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
>  at 
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
>  at 
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
>  at java.security.AccessController.doPrivileged(Native Method)
>  at javax.security.auth.Subject.doAs(Subject.java:422)
>  at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
>  at 
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>  at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
> Caused by: java.lang.RuntimeException: 
> java.util.concurrent.ExecutionException: 
> org.apache.flink.runtime.client.JobSubmissionException: Failed to submit 
> JobGraph.
>  at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:199)
>  at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1751)
>  at 
> org.apache.flink.streaming.api.environment.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:94)
>  at 
> org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:63)
>  at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1628)
>  at StreamingFileSinkProgram.main(StreamingFileSinkProgram.java:77)
>  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>  at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>  at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  at java.lang.reflect.Method.invoke(Method.java:498)
>  at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
>  ... 11 more
> Caused by: java.util.concurrent.ExecutionException: 
> org.apache.flink.runtime.client.JobSubmissionException: Failed to submit 
> JobGraph.
>  at 
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>  at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
>  at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1746)
>  ... 20 more
> Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to 
> submit JobGraph.
>  at 
> org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$7(RestClusterClient.java:326)
>  at 
> java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
>  at 
> java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
>  at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>  at 
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
>  at 
> org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:274)
>  at 
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
>  at 
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
>  at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>  at 
> java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561)
>  at 
> java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:929)
>  at 
> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.runtime.rest.util.RestClientException: [Internal 
> server error., <Exception on server side:
> org.apache.flink.runtime.client.JobSubmissionException: Failed to submit job.
>  at 
> org.apache.flink.runtime.dispatcher.Dispatcher.lambda$internalSubmitJob$3(Dispatcher.java:336)
>  at 
> java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822)
>  at 
> java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)
>  at 
> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
>  at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
>  at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
>  at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>  at 
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>  at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>  at 
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.RuntimeException: 
> org.apache.flink.runtime.client.JobExecutionException: Could not set up 
> JobManager
>  at 
> org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:36)
>  at 
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
>  ... 6 more
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not 
> set up JobManager
>  at 
> org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.<init>(JobManagerRunnerImpl.java:152)
>  at 
> org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:84)
>  at 
> org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$6(Dispatcher.java:379)
>  at 
> org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)
>  ... 7 more
> Caused by: java.lang.NoSuchMethodError: 
> org.apache.hadoop.conf.Configuration.getTimeDuration(Ljava/lang/String;Ljava/lang/String;Ljava/util/concurrent/TimeUnit;)J
>  at org.apache.hadoop.fs.s3a.S3ARetryPolicy.<init>(S3ARetryPolicy.java:113)
>  at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:257)
>  at 
> org.apache.flink.fs.s3.common.AbstractS3FileSystemFactory.create(AbstractS3FileSystemFactory.java:126)
>  at 
> org.apache.flink.core.fs.PluginFileSystemFactory.create(PluginFileSystemFactory.java:61)
>  at 
> org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:441)
>  at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:362)
>  at org.apache.flink.core.fs.Path.getFileSystem(Path.java:298)
>  at 
> org.apache.flink.runtime.state.memory.MemoryBackendCheckpointStorage.<init>(MemoryBackendCheckpointStorage.java:85)
>  at 
> org.apache.flink.runtime.state.memory.MemoryStateBackend.createCheckpointStorage(MemoryStateBackend.java:295)
>  at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:279)
>  at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:205)
>  at 
> org.apache.flink.runtime.executiongraph.ExecutionGraph.enableCheckpointing(ExecutionGraph.java:486)
>  at 
> org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:338)
>  at 
> org.apache.flink.runtime.scheduler.SchedulerBase.createExecutionGraph(SchedulerBase.java:245)
>  at 
> org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:217)
>  at 
> org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:205)
>  at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:119)
>  at 
> org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:105)
>  at 
> org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:278)
>  at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:266)
>  at 
> org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:98)
>  at 
> org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:40)
>  at 
> org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.<init>(JobManagerRunnerImpl.java:146)
>  ... 10 more
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
