[ https://issues.apache.org/jira/browse/FLINK-3633?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15201634#comment-15201634 ]

ASF GitHub Bot commented on FLINK-3633:
---------------------------------------

GitHub user tillrohrmann opened a pull request:

    https://github.com/apache/flink/pull/1818

    [FLINK-3633] Fix user code de/serialization in ExecutionConfig

    FLINK-3327 moved the ExecutionConfig directly into the JobGraph. As a
    consequence, it was serialized and deserialized using the system class
    loader when a SubmitJob message was sent to the JobManager. This is
    problematic since the ExecutionConfig can contain user code classes, which
    require a user code class loader for deserialization. In order to
    circumvent the problem, a UserCodeValue class was introduced which
    automatically sends the wrapped value as a byte array. On the receiving
    side, the wrapped value has to be deserialized explicitly by providing a
    class loader.
    
    To test the feature, the ScalaShellITCase.testSubmissionOfExternalLibrary
    was adapted to register org.apache.flink.ml.math.Vector with the
    ExecutionConfig.
    
    This commit also re-introduces the removed ExecutionConfig.CONFIG_KEY key,
    so that version 1.1 does not break the API.
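The UserCodeValue idea can be illustrated with plain Java serialization. This is a minimal, hypothetical sketch, not the class from the pull request: the name `SerializedUserValue`, its methods and the use of `java.io` object streams are assumptions. The wrapper serializes its value eagerly, so the enclosing message only carries a `byte[]` that any class loader can read, and the receiver supplies the user code class loader explicitly when unwrapping.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;

/**
 * Hypothetical UserCodeValue-style wrapper: the value is serialized eagerly,
 * so only a byte[] travels inside the enclosing message.
 */
final class SerializedUserValue<T> implements Serializable {
    private static final long serialVersionUID = 1L;

    // Only the serialized form is stored; deserializing the wrapper itself
    // never needs the wrapped user code class.
    private final byte[] bytes;

    SerializedUserValue(T value) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
            out.writeObject(value);
        }
        this.bytes = bos.toByteArray();
    }

    /** Deserializes the wrapped value, resolving classes only via the given loader. */
    @SuppressWarnings("unchecked")
    T deserialize(ClassLoader userCodeLoader) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new LoaderAwareObjectInputStream(
                new ByteArrayInputStream(bytes), userCodeLoader)) {
            return (T) in.readObject();
        }
    }

    /** ObjectInputStream that resolves classes against an explicit class loader. */
    static final class LoaderAwareObjectInputStream extends ObjectInputStream {
        private final ClassLoader loader;

        LoaderAwareObjectInputStream(InputStream in, ClassLoader loader) throws IOException {
            super(in);
            this.loader = loader;
        }

        @Override
        protected Class<?> resolveClass(ObjectStreamClass desc) throws ClassNotFoundException {
            // Load without running static initializers. Descriptors of primitive
            // types (e.g. a serialized int.class) are not handled by this sketch.
            return Class.forName(desc.getName(), false, loader);
        }
    }
}
```

Because `resolveClass` consults only the supplied loader, a receiver that passes the user code class loader can resolve classes shipped in the job jar, while whatever transports the message only ever sees a byte array.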

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tillrohrmann/flink fixJobSubmission

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1818.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1818
    
----
commit 7abef1b685d76a5249ddeb0fcc4f1190b541ba57
Author: Till Rohrmann <trohrm...@apache.org>
Date:   2016-03-18T15:22:04Z

    [FLINK-3633] Fix user code de/serialization in ExecutionConfig
    
----


> Job submission silently fails when using user code types
> --------------------------------------------------------
>
>                 Key: FLINK-3633
>                 URL: https://issues.apache.org/jira/browse/FLINK-3633
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: 1.1.0
>            Reporter: Till Rohrmann
>            Assignee: Till Rohrmann
>            Priority: Blocker
>             Fix For: 1.1.0
>
>
> With the changes introduced by FLINK-3327, it is no longer possible to run
> remote Flink jobs which work on user code types. The reason is that the
> {{ExecutionConfig}} is now stored directly in the {{JobGraph}}, which is sent
> as an Akka message to the {{JobManager}}. By default, user code types are
> automatically detected and registered in the {{ExecutionConfig}}. Deserializing
> a {{JobGraph}} whose {{ExecutionConfig}} contains user code classes therefore
> requires the user code class loader. However, Akka does not have access to it
> and uses the system class loader instead. As a result, Akka silently discards
> the {{SubmitJob}} message, which cannot be deserialized because of a
> {{ClassNotFoundException}}.
> I propose not to send the {{ExecutionConfig}} explicitly with the
> {{JobGraph}} and, thus, to partially revert to the state before FLINK-3327.
> Before, the {{ExecutionConfig}} was serialized into the job configuration and
> deserialized on the {{TaskManager}} using the proper user code class loader.
> In order to reproduce the problem, you can submit the following job to a
> remote cluster.
> {code}
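> import org.apache.flink.api.common.functions.MapFunction;
> import org.apache.flink.api.java.DataSet;
> import org.apache.flink.api.java.ExecutionEnvironment;
>
> public class Job {
>     public static class CustomType {
>         private final int value;
>
>         public CustomType(int value) {
>             this.value = value;
>         }
>
>         @Override
>         public String toString() {
>             return "CustomType(" + value + ")";
>         }
>     }
>
>     public static void main(String[] args) throws Exception {
>         // Remote JobManager host, port and the path to the job jar
>         String address = args[0];
>         int port = Integer.parseInt(args[1]);
>         String pathToJar = args[2];
>
>         ExecutionEnvironment env =
>             ExecutionEnvironment.createRemoteEnvironment(address, port, pathToJar);
>         env.getConfig().disableAutoTypeRegistration();
>
>         DataSet<Integer> input = env.fromElements(1, 2, 3, 4, 5);
>         // Map each integer to the user code type CustomType
>         DataSet<CustomType> customTypes = input.map(new MapFunction<Integer, CustomType>() {
>             @Override
>             public CustomType map(Integer integer) throws Exception {
>                 return new CustomType(integer);
>             }
>         });
>         customTypes.print();
>     }
> }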
> {code}
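The failure mode described in the issue, a class that the deserializing class loader cannot see, can be reproduced without a cluster. The sketch below is hypothetical: `HidingClassLoader` stands in for a system class loader that lacks the user jar on its classpath, and `UserType` stands in for `CustomType`. Resolving classes through the hiding loader makes `readObject` throw `ClassNotFoundException`, whereas the application class loader succeeds.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;

final class ClassLoaderMismatchDemo {

    /** Stands in for a user code type such as CustomType (hypothetical). */
    static final class UserType implements Serializable {
        private static final long serialVersionUID = 1L;
        final int value;

        UserType(int value) {
            this.value = value;
        }
    }

    /** Hypothetical loader that refuses to load one class, mimicking a class
     *  loader that does not have the user jar on its classpath. */
    static final class HidingClassLoader extends ClassLoader {
        private final String hidden;

        HidingClassLoader(ClassLoader parent, String hidden) {
            super(parent);
            this.hidden = hidden;
        }

        @Override
        protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
            if (name.equals(hidden)) {
                throw new ClassNotFoundException(name);
            }
            return super.loadClass(name, resolve);
        }
    }

    static byte[] serialize(Object o) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
            out.writeObject(o);
        }
        return bos.toByteArray();
    }

    /** Deserializes, resolving every class through the given loader only. */
    static Object deserialize(byte[] bytes, ClassLoader loader)
            throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes)) {
            @Override
            protected Class<?> resolveClass(ObjectStreamClass desc) throws ClassNotFoundException {
                return Class.forName(desc.getName(), false, loader);
            }
        }) {
            return in.readObject();
        }
    }
}
```

In the actual bug, the exception was raised during Akka's deserialization of the {{SubmitJob}} message and the message was dropped, which is why the submission failed silently rather than reporting the missing class.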



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
