[jira] [Commented] (FLINK-6606) Create checkpoint hook with user classloader
[ https://issues.apache.org/jira/browse/FLINK-6606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16017393#comment-16017393 ] ASF GitHub Bot commented on FLINK-6606: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/3933 > Create checkpoint hook with user classloader > > > Key: FLINK-6606 > URL: https://issues.apache.org/jira/browse/FLINK-6606 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Reporter: Eron Wright >Assignee: Eron Wright >Priority: Blocker > Fix For: 1.3.0 > > > Flink should set the thread's classloader when calling the checkpoint hook > factory's `create` method. Without that, the hook is likely to fail during > initialization (e.g. using ServiceLoader). -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-6606) Create checkpoint hook with user classloader
[ https://issues.apache.org/jira/browse/FLINK-6606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16017347#comment-16017347 ] ASF GitHub Bot commented on FLINK-6606: --- Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/3933 Yes, that is true. Thanks for your work and the clarification @EronWright. Merging this PR.
[jira] [Commented] (FLINK-6606) Create checkpoint hook with user classloader
[ https://issues.apache.org/jira/browse/FLINK-6606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16017047#comment-16017047 ] ASF GitHub Bot commented on FLINK-6606: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/3933#discussion_r117427223
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooks.java ---
@@ -267,6 +269,112 @@ else if (!allowUnmatchedState) {
     }
     //
+    // hook management
+    //
+
+    /**
+     * Wraps a hook such that the user-code classloader is applied when the hook is invoked.
+     * @param hook the hook to wrap
+     * @param userClassLoader the classloader to use
+     */
+    public static MasterTriggerRestoreHook wrapHook(MasterTriggerRestoreHook hook, ClassLoader userClassLoader) {
+        return new WrappedMasterHook(hook, userClassLoader);
+    }
+
+    @VisibleForTesting
+    static class WrappedMasterHook implements MasterTriggerRestoreHook {
+
+        private final MasterTriggerRestoreHook hook;
+        private final ClassLoader userClassLoader;
+
+        WrappedMasterHook(MasterTriggerRestoreHook hook, ClassLoader userClassLoader) {
+            this.hook = hook;
+            this.userClassLoader = userClassLoader;
+        }
+
+        @Override
+        public String getIdentifier() {
+            Thread thread = Thread.currentThread();
+            ClassLoader originalClassLoader = thread.getContextClassLoader();
+            thread.setContextClassLoader(userClassLoader);
--- End diff --
True.
[jira] [Commented] (FLINK-6606) Create checkpoint hook with user classloader
[ https://issues.apache.org/jira/browse/FLINK-6606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16017026#comment-16017026 ] ASF GitHub Bot commented on FLINK-6606: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/3933#discussion_r117421730
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooks.java ---
@@ -267,6 +269,112 @@ else if (!allowUnmatchedState) {
[...]
+            thread.setContextClassLoader(userClassLoader);
--- End diff --
True, but I guess if `close` is idempotent, then it doesn't hurt.
[jira] [Commented] (FLINK-6606) Create checkpoint hook with user classloader
[ https://issues.apache.org/jira/browse/FLINK-6606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16016578#comment-16016578 ] ASF GitHub Bot commented on FLINK-6606: --- Github user EronWright commented on a diff in the pull request: https://github.com/apache/flink/pull/3933#discussion_r117370390
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooks.java ---
@@ -267,6 +269,112 @@ else if (!allowUnmatchedState) {
[...]
+            thread.setContextClassLoader(userClassLoader);
--- End diff --
I disagree with that approach due to lack of symmetry. The try..finally idiom is typically written as:
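For reference, the conventional shape of that idiom performs the "acquire" step immediately before `try` and the matching "release" in `finally`. A minimal sketch in plain Java; `ContextClassLoaderScope.callWith` is a hypothetical helper, not Flink code:

```java
import java.util.function.Supplier;

/** Hypothetical helper illustrating the symmetric try..finally idiom for the TCCL. */
final class ContextClassLoaderScope {
    private ContextClassLoaderScope() {}

    /** Runs the action with the given thread context classloader, then restores the previous one. */
    static <T> T callWith(ClassLoader loader, Supplier<T> action) {
        Thread thread = Thread.currentThread();
        ClassLoader original = thread.getContextClassLoader();
        thread.setContextClassLoader(loader);          // acquire, just before try
        try {
            return action.get();
        } finally {
            thread.setContextClassLoader(original);    // release, runs even if the action throws
        }
    }
}
```

Each `set` is paired with exactly one restore, and the restore only runs once the `set` has succeeded.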
[jira] [Commented] (FLINK-6606) Create checkpoint hook with user classloader
[ https://issues.apache.org/jira/browse/FLINK-6606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16016577#comment-16016577 ] ASF GitHub Bot commented on FLINK-6606: --- Github user EronWright commented on the issue: https://github.com/apache/flink/pull/3933 Originally I pursued the path suggested by @tillrohrmann of setting the TCCL at the sites where the hook is invoked, but found that the classloader wasn't readily available there. I found it more expedient to create a wrapper that eagerly captures the classloader.
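A minimal sketch of the wrapper approach, assuming a simplified hook interface: the user classloader is captured once, when the wrapper is built, and every delegated call runs with it as the TCCL. `CheckpointHook` and `ClassLoaderWrappedHook` are hypothetical stand-ins for `MasterTriggerRestoreHook` and `WrappedMasterHook`; the real interface's methods and signatures differ.

```java
/** Hypothetical, simplified stand-in for MasterTriggerRestoreHook. */
interface CheckpointHook {
    String getIdentifier();
    void triggerCheckpoint(long checkpointId);
}

/** Delegates to a hook with the eagerly captured user classloader installed as the TCCL. */
final class ClassLoaderWrappedHook implements CheckpointHook {
    private final CheckpointHook hook;
    private final ClassLoader userClassLoader; // captured once, at wrap time

    ClassLoaderWrappedHook(CheckpointHook hook, ClassLoader userClassLoader) {
        this.hook = hook;
        this.userClassLoader = userClassLoader;
    }

    @Override
    public String getIdentifier() {
        Thread thread = Thread.currentThread();
        ClassLoader original = thread.getContextClassLoader();
        thread.setContextClassLoader(userClassLoader);
        try {
            return hook.getIdentifier();
        } finally {
            thread.setContextClassLoader(original);
        }
    }

    @Override
    public void triggerCheckpoint(long checkpointId) {
        Thread thread = Thread.currentThread();
        ClassLoader original = thread.getContextClassLoader();
        thread.setContextClassLoader(userClassLoader);
        try {
            hook.triggerCheckpoint(checkpointId);
        } finally {
            thread.setContextClassLoader(original);
        }
    }
}
```

Because the loader travels with the wrapper, code that later invokes the hook does not need access to the classloader at all.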
[jira] [Commented] (FLINK-6606) Create checkpoint hook with user classloader
[ https://issues.apache.org/jira/browse/FLINK-6606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16016575#comment-16016575 ] ASF GitHub Bot commented on FLINK-6606: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/3933#discussion_r117370134
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooks.java ---
@@ -267,6 +269,112 @@ else if (!allowUnmatchedState) {
[...]
+            thread.setContextClassLoader(userClassLoader);
--- End diff --
Maybe we can move the `setContextClassLoader` into the `try` body, just to be on the safe side, so that whatever happens we reset the original class loader.
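The suggested variant moves the `setContextClassLoader` call inside the `try` block. A minimal sketch; `TcclInsideTry` is a hypothetical name used only for illustration:

```java
import java.util.function.Supplier;

/** Hypothetical helper: the TCCL is switched inside the try block. */
final class TcclInsideTry {
    private TcclInsideTry() {}

    static <T> T callWith(ClassLoader loader, Supplier<T> action) {
        Thread thread = Thread.currentThread();
        ClassLoader original = thread.getContextClassLoader();
        try {
            // If this call itself threw, finally would reinstall a loader that was never replaced.
            thread.setContextClassLoader(loader);
            return action.get();
        } finally {
            thread.setContextClassLoader(original);
        }
    }
}
```

In normal execution both placements behave the same. They differ only if `setContextClassLoader` itself throws (its Javadoc allows a `SecurityException` when the thread may not change its context classloader). In that case the `finally` block restores a loader that is still in place, a harmless no-op, which is the idempotence argument made later in the thread.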
[jira] [Commented] (FLINK-6606) Create checkpoint hook with user classloader
[ https://issues.apache.org/jira/browse/FLINK-6606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16016573#comment-16016573 ] ASF GitHub Bot commented on FLINK-6606: --- Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/3933 I agree with @EronWright that it would be nice to offer all user code a similar environment. This will become especially important if a `MasterTriggerRestoreHook` which is contained in the system class loader tries to load user code classes dynamically. I'm wondering whether to follow the approach of `InputFormatVertex` and `ExecutionJobVertex` and set the TCCL explicitly when calling the hooks. This could happen in `MasterHooks#triggerMasterHooks`, for example. The advantage would be that we don't have to reset the TCCL for each hook. On the other hand, it carries the risk that we forget to set the TCCL when calling the hooks from somewhere else in the future.
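A minimal sketch of the call-site alternative, in the spirit of setting the TCCL inside `MasterHooks#triggerMasterHooks`: the loader is switched once around the whole batch of hooks instead of once per call. `CallSiteTccl.triggerAll` is hypothetical, and each hook is modeled as a `LongFunction<String>` that receives the checkpoint ID, purely for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongFunction;

/** Hypothetical call-site variant: one TCCL switch for the whole batch of hooks. */
final class CallSiteTccl {
    private CallSiteTccl() {}

    static List<String> triggerAll(List<LongFunction<String>> hooks, long checkpointId, ClassLoader userClassLoader) {
        Thread thread = Thread.currentThread();
        ClassLoader original = thread.getContextClassLoader();
        thread.setContextClassLoader(userClassLoader);
        try {
            List<String> results = new ArrayList<>();
            for (LongFunction<String> hook : hooks) {
                results.add(hook.apply(checkpointId)); // every hook sees the user loader
            }
            return results;
        } finally {
            thread.setContextClassLoader(original);
        }
    }
}
```

The trade-off described above is visible here: there is one switch per batch, but every other place that invokes hooks must remember to do the same.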
[jira] [Commented] (FLINK-6606) Create checkpoint hook with user classloader
[ https://issues.apache.org/jira/browse/FLINK-6606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16016039#comment-16016039 ] ASF GitHub Bot commented on FLINK-6606: --- Github user rmetzger commented on the issue: https://github.com/apache/flink/pull/3933 Let's see what Till says. In my opinion, both approaches are fine. What you are proposing makes things on Flink's side a bit more complicated, but the behavior is consistent with other places where user code is executed. Till's proposal (i.e. get the CL via the class) keeps our code straightforward, but users have to know about it.
[jira] [Commented] (FLINK-6606) Create checkpoint hook with user classloader
[ https://issues.apache.org/jira/browse/FLINK-6606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16016010#comment-16016010 ] ASF GitHub Bot commented on FLINK-6606: --- Github user EronWright commented on the issue: https://github.com/apache/flink/pull/3933 That's the sort of workaround that Android apps are forced to use. Why not continue the practice of setting the TCCL in transitions from system to user code?
[jira] [Commented] (FLINK-6606) Create checkpoint hook with user classloader
[ https://issues.apache.org/jira/browse/FLINK-6606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16016007#comment-16016007 ] ASF GitHub Bot commented on FLINK-6606: --- Github user rmetzger commented on the issue: https://github.com/apache/flink/pull/3933 Till and I had an offline discussion about this. What you can do in your user code to set the TCCL properly is the following:
```java
Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
```
In user code, `this` refers to an instance of a class loaded through the user-code classloader, so `this.getClass().getClassLoader()` is the CL you need to set for the thread.
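A minimal sketch of that user-side workaround, assuming the hook class is defined by the user-code classloader. `SelfLoadingHook` and `doWork` are hypothetical. Unlike the one-liner, the sketch also restores the previous TCCL so that the change does not leak into the calling Flink thread.

```java
/** Hypothetical user-code hook that installs its own defining classloader as the TCCL. */
class SelfLoadingHook {

    /** Returns the TCCL observed while the hook's work runs. */
    ClassLoader doWork() {
        Thread thread = Thread.currentThread();
        ClassLoader original = thread.getContextClassLoader();
        // Assumption: this class was loaded by the user-code classloader.
        thread.setContextClassLoader(this.getClass().getClassLoader());
        try {
            // TCCL-dependent initialization (e.g. ServiceLoader.load(Class)) would go here.
            return thread.getContextClassLoader();
        } finally {
            thread.setContextClassLoader(original);
        }
    }
}
```

The cost, as noted in the replies, is that every hook author has to know to do this.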
[jira] [Commented] (FLINK-6606) Create checkpoint hook with user classloader
[ https://issues.apache.org/jira/browse/FLINK-6606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16015963#comment-16015963 ] ASF GitHub Bot commented on FLINK-6606: --- Github user EronWright commented on the issue: https://github.com/apache/flink/pull/3933 @tillrohrmann the issue is with code that uses the thread's context classloader (TCCL), that may execute in `Factory.create` or later in the hook methods. For example, the Pravega connector uses grpc, which uses the TCCL during initialization (see [ManagedChannelProvider](https://github.com/grpc/grpc-java/blob/v1.3.0/core/src/main/java/io/grpc/ManagedChannelProvider.java#L132)). Looking at other areas in Flink where usercode is called, I see that the TCCL is consistently set. For example, see [InputFormatVertex](https://github.com/apache/flink/blob/2c68085f658873c2d5836fbad6b82be76a79f0f9/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/InputFormatVertex.java#L81), [ExecutionJobVertex](https://github.com/apache/flink/blob/7ad489d87281b74c53d3b1a0dd97e56b7a8ef303/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java#L229), and [Task](https://github.com/apache/flink/blob/6181302f1ab741b86af357e4513f5952a5fc1531/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java#L698).
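To make the failure mode concrete: `ServiceLoader.load(Class)` uses the current thread's context classloader, so a library that calls it only sees what the TCCL can see. The self-contained sketch below mimics such a library with `Class.forName(name, false, tccl)`. `TcclLookupDemo` is hypothetical, and a parentless `URLClassLoader` with no URLs stands in for a TCCL that cannot see user classes.

```java
import java.net.URL;
import java.net.URLClassLoader;

/** Hypothetical demo of a library-style lookup that depends on the TCCL. */
final class TcclLookupDemo {
    private TcclLookupDemo() {}

    /** Resolves a class by name through the thread context classloader, as TCCL-based libraries do. */
    static boolean resolveViaTccl(String className) {
        try {
            Class.forName(className, false, Thread.currentThread().getContextClassLoader());
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    /** Runs the lookup with the given TCCL, restoring the previous one afterwards. */
    static boolean resolveWith(ClassLoader tccl, String className) {
        Thread thread = Thread.currentThread();
        ClassLoader original = thread.getContextClassLoader();
        thread.setContextClassLoader(tccl);
        try {
            return resolveViaTccl(className);
        } finally {
            thread.setContextClassLoader(original);
        }
    }

    /** No URLs and the bootstrap loader as parent: only JDK classes are visible. */
    static ClassLoader isolatedLoader() {
        return new URLClassLoader(new URL[0], null);
    }
}
```

With the isolated loader as TCCL the lookup of `TcclLookupDemo` itself fails, while it succeeds once the class's own defining loader is installed, which mirrors why the hook needs the user classloader set.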
[jira] [Commented] (FLINK-6606) Create checkpoint hook with user classloader
[ https://issues.apache.org/jira/browse/FLINK-6606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16015711#comment-16015711 ] ASF GitHub Bot commented on FLINK-6606: --- Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/3933 Thanks for your contribution @EronWright. I'm not quite sure whether I understand which problem we are trying to solve here. I think by deserializing the `MasterTriggerRestoreHook.Factories` with the user code class loader in `ExecutionGraphBuilder.java:253`, we support user code hooks. Given that the `Factory` is a user defined class, then it should get the user code class loader set as its `ClassLoader`. Thus, it should also be able to load a user defined `MasterTriggerRestoreHook` class. And the latter can only be a user defined class if the factory is user defined. But I might be overlooking something here. Maybe you can give me some more details about the PR.
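A minimal sketch of the mechanism described here: an `ObjectInputStream` whose `resolveClass` uses a supplied classloader, so the deserialized factory's class is resolved through that loader. `UserCodeObjectInputStream`, `DemoFactory`, and `DeserializationDemo` are hypothetical names; Flink has its own utilities for this. Class resolution is the only thing affected, and the TCCL is left untouched, which is the gap the PR addresses.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;
import java.io.UncheckedIOException;

/** Hypothetical stream that resolves classes through an explicitly supplied classloader. */
class UserCodeObjectInputStream extends ObjectInputStream {
    private final ClassLoader classLoader;

    UserCodeObjectInputStream(InputStream in, ClassLoader classLoader) throws IOException {
        super(in);
        this.classLoader = classLoader;
    }

    @Override
    protected Class<?> resolveClass(ObjectStreamClass desc) throws IOException, ClassNotFoundException {
        // Sketch only: primitive type names (e.g. for int.class) are not handled here.
        return Class.forName(desc.getName(), false, classLoader);
    }
}

/** Hypothetical serializable factory standing in for a user-defined hook factory. */
class DemoFactory implements Serializable {
    private static final long serialVersionUID = 1L;
    final String name;

    DemoFactory(String name) {
        this.name = name;
    }
}

final class DeserializationDemo {
    private DeserializationDemo() {}

    static byte[] serialize(Serializable value) {
        try (ByteArrayOutputStream bytes = new ByteArrayOutputStream();
             ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static Object deserialize(byte[] data, ClassLoader loader) {
        try (ObjectInputStream in = new UserCodeObjectInputStream(new ByteArrayInputStream(data), loader)) {
            return in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```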
[jira] [Commented] (FLINK-6606) Create checkpoint hook with user classloader
[ https://issues.apache.org/jira/browse/FLINK-6606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16015695#comment-16015695 ] ASF GitHub Bot commented on FLINK-6606: --- Github user rmetzger commented on the issue: https://github.com/apache/flink/pull/3933 I haven't worked on this part of the code before, but I believe this code is +1 to merge.
[jira] [Commented] (FLINK-6606) Create checkpoint hook with user classloader
[ https://issues.apache.org/jira/browse/FLINK-6606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16014378#comment-16014378 ] ASF GitHub Bot commented on FLINK-6606: --- GitHub user EronWright opened a pull request: https://github.com/apache/flink/pull/3933
[FLINK-6606] Create checkpoint hook with user classloader
- wrap calls to MasterTriggerRestoreHook (and its factory) such that the user classloader is applied
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/EronWright/flink FLINK-6606
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/3933.patch
To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:
This closes #3933
commit fbf904a60a1e252944e1cbc7ad60c5d95ae28ec2
Author: Wright, Eron
Date: 2017-05-17T16:46:13Z
FLINK-6606 - wrap calls to MasterTriggerRestoreHook (and its factory) such that the user classloader is applied
[jira] [Commented] (FLINK-6606) Create checkpoint hook with user classloader
[ https://issues.apache.org/jira/browse/FLINK-6606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16013670#comment-16013670 ] Eron Wright commented on FLINK-6606: - I have a fix ready and will open a PR soon. The fix wraps all calls to the hook such that the thread's context classloader is set to the user classloader.