[ https://issues.apache.org/jira/browse/FLINK-3633?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15204529#comment-15204529 ]

ASF GitHub Bot commented on FLINK-3633:
---------------------------------------

Github user tillrohrmann commented on the pull request:

    https://github.com/apache/flink/pull/1818#issuecomment-199363606
  
    Due to the tight coupling of the ExecutionConfig and multiple Flink components
    (e.g. the PojoSerializer), the automatic serialization and manual deserialization
    of user code objects via the UserCodeValue class caused problems. In order to
    minimize the impact of the changes, I changed the serialization strategy to an
    explicit one. One has to call `ExecutionConfig.serializeUserCode` to store the
    user code objects in a SerializedValue object and null out the corresponding
    member fields. If that is not done, it is assumed that the ExecutionConfig itself
    is deserialized using a user code class loader. On the receiving side, one has to
    call `ExecutionConfig.deserializeConfig` and provide a class loader which knows
    the user code classes.


> Job submission silently fails when using user code types
> --------------------------------------------------------
>
>                 Key: FLINK-3633
>                 URL: https://issues.apache.org/jira/browse/FLINK-3633
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: 1.1.0
>            Reporter: Till Rohrmann
>            Assignee: Till Rohrmann
>            Priority: Blocker
>             Fix For: 1.1.0
>
>
> With the changes introduced by FLINK-3327, it is no longer possible to run
> remote Flink jobs which work on user code types. The reason is that the
> {{ExecutionConfig}} is now stored directly in the {{JobGraph}}, which is sent
> as an Akka message to the {{JobManager}}. By default, user code types are
> automatically detected and registered in the {{ExecutionConfig}}.
> Consequently, deserializing a {{JobGraph}} whose {{ExecutionConfig}} contains
> user code classes requires the user code class loader. However, Akka does not
> have access to it and uses the system class loader instead. As a result, Akka
> silently discards the {{SubmitJob}} message, which cannot be deserialized
> because of a {{ClassNotFoundException}}.
> I propose not to send the {{ExecutionConfig}} explicitly with the
> {{JobGraph}} and, thus, to partially revert the changes of FLINK-3327.
> Before, the {{ExecutionConfig}} was serialized into the job configuration and
> deserialized on the {{TaskManager}} using the proper user code class loader.
> To reproduce the problem, you can submit the following job to a remote
> cluster.
> {code}
> import org.apache.flink.api.common.functions.MapFunction;
> import org.apache.flink.api.java.DataSet;
> import org.apache.flink.api.java.ExecutionEnvironment;
>
> public class Job {
>       public static class CustomType {
>               private final int value;
>               public CustomType(int value) {
>                       this.value = value;
>               }
>               @Override
>               public String toString() {
>                       return "CustomType(" + value + ")";
>               }
>       }
>       public static void main(String[] args) throws Exception {
>               // Address and port of the remote cluster and path to the job jar,
>               // passed as program arguments
>               String address = args[0];
>               int port = Integer.parseInt(args[1]);
>               String pathToJar = args[2];
>               ExecutionEnvironment env =
>                       ExecutionEnvironment.createRemoteEnvironment(address, port, pathToJar);
>               env.getConfig().disableAutoTypeRegistration();
>               DataSet<Integer> input = env.fromElements(1, 2, 3, 4, 5);
>               DataSet<CustomType> customTypes = input.map(new MapFunction<Integer, CustomType>() {
>                       @Override
>                       public CustomType map(Integer integer) throws Exception {
>                               return new CustomType(integer);
>                       }
>               });
>               customTypes.print();
>       }
> }
> {code}
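The failure mode described in the issue (a message containing user classes deserialized with a class loader that does not know them) can be reproduced outside of Flink and Akka with plain Java serialization. This is a hypothetical, self-contained sketch, not Flink or Akka code: `WrongClassLoaderDemo`, its own `CustomType` copy and the helper methods are made up, and a class loader whose parent is the bootstrap loader stands in for a loader that lacks the user jar.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;

public class WrongClassLoaderDemo {

    /** Hypothetical "user code" type, analogous to CustomType in the job above. */
    static final class CustomType implements Serializable {
        private static final long serialVersionUID = 1L;
        final int value;

        CustomType(int value) {
            this.value = value;
        }
    }

    /** Resolves classes only through the given class loader. */
    static final class ClassLoaderObjectInputStream extends ObjectInputStream {
        private final ClassLoader classLoader;

        ClassLoaderObjectInputStream(InputStream in, ClassLoader classLoader) throws IOException {
            super(in);
            this.classLoader = classLoader;
        }

        @Override
        protected Class<?> resolveClass(ObjectStreamClass desc) throws IOException, ClassNotFoundException {
            return Class.forName(desc.getName(), false, classLoader);
        }
    }

    static byte[] serialize(Object obj) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        return bytes.toByteArray();
    }

    static Object deserialize(byte[] bytes, ClassLoader loader) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ClassLoaderObjectInputStream(new ByteArrayInputStream(bytes), loader)) {
            return in.readObject();
        }
    }

    /** Returns true if deserializing with the given loader fails with ClassNotFoundException. */
    static boolean failsWithClassNotFound(byte[] bytes, ClassLoader loader) throws IOException {
        try {
            deserialize(bytes, loader);
            return false;
        } catch (ClassNotFoundException e) {
            return true;
        }
    }

    public static void main(String[] args) throws Exception {
        byte[] bytes = serialize(new CustomType(42));
        // Parent is the bootstrap loader: it sees java.* classes, but not the application's classes.
        ClassLoader withoutUserCode = new ClassLoader(null) {};
        ClassLoader withUserCode = WrongClassLoaderDemo.class.getClassLoader();
        System.out.println(failsWithClassNotFound(bytes, withoutUserCode)); // true
        System.out.println(((CustomType) deserialize(bytes, withUserCode)).value); // 42
    }
}
```

In the issue, the same `ClassNotFoundException` occurs while Akka deserializes the {{SubmitJob}} message, but there it is swallowed, which is why the submission fails silently.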



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
