[ 
https://issues.apache.org/jira/browse/FLINK-3633?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15204008#comment-15204008
 ] 

ASF GitHub Bot commented on FLINK-3633:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1818#discussion_r56803957
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java ---
    @@ -109,22 +113,22 @@
     
        private RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration;
        
    -   private long taskCancellationIntervalMillis = ConfigConstants.DEFAULT_TASK_CANCELLATION_INTERVAL_MILLIS;
    +   private long taskCancellationIntervalMillis = -1;
     
        // Serializers and types registered with Kryo and the PojoSerializer
        // we store them in linked maps/sets to ensure they are registered in order in all kryo instances.
     
    -   private final LinkedHashMap<Class<?>, SerializableSerializer<?>> registeredTypesWithKryoSerializers = new LinkedHashMap<>();
    +   private final UserCodeValue<LinkedHashMap<Class<?>, SerializableSerializer<?>>> registeredTypesWithKryoSerializers = new UserCodeValue<>(new LinkedHashMap<Class<?>, SerializableSerializer<?>>());
     
    -   private final LinkedHashMap<Class<?>, Class<? extends Serializer<?>>> registeredTypesWithKryoSerializerClasses = new LinkedHashMap<>();
    +   private final UserCodeValue<LinkedHashMap<Class<?>, Class<? extends Serializer<?>>>> registeredTypesWithKryoSerializerClasses = new UserCodeValue<>(new LinkedHashMap<Class<?>, Class<? extends Serializer<?>>>());
     
    -   private final LinkedHashMap<Class<?>, SerializableSerializer<?>> defaultKryoSerializers = new LinkedHashMap<>();
    +   private final UserCodeValue<LinkedHashMap<Class<?>, SerializableSerializer<?>>> defaultKryoSerializers = new UserCodeValue<>(new LinkedHashMap<Class<?>, SerializableSerializer<?>>());
     
    -   private final LinkedHashMap<Class<?>, Class<? extends Serializer<?>>> defaultKryoSerializerClasses = new LinkedHashMap<>();
    +   private final UserCodeValue<LinkedHashMap<Class<?>, Class<? extends Serializer<?>>>> defaultKryoSerializerClasses = new UserCodeValue<>(new LinkedHashMap<Class<?>, Class<? extends Serializer<?>>>());
     
    -   private final LinkedHashSet<Class<?>> registeredKryoTypes = new LinkedHashSet<>();
    +   private final UserCodeValue<LinkedHashSet<Class<?>>> registeredKryoTypes = new UserCodeValue<>(new LinkedHashSet<Class<?>>());
     
    -   private final LinkedHashSet<Class<?>> registeredPojoTypes = new LinkedHashSet<>();
    +   private final UserCodeValue<LinkedHashSet<Class<?>>> registeredPojoTypes = new UserCodeValue<>(new LinkedHashSet<Class<?>>());
    --- End diff --
    
    That is true, I will fix it. Personally, I think this `GlobalJobParameters` 
is a huge crime. It basically allows you to store anything in the 
`ExecutionConfig`. Sometimes I'm overcome with the feeling that this 
`ExecutionConfig` is simply a dumping ground for all kinds of parameters which 
are somehow needed somewhere.


> Job submission silently fails when using user code types
> --------------------------------------------------------
>
>                 Key: FLINK-3633
>                 URL: https://issues.apache.org/jira/browse/FLINK-3633
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: 1.1.0
>            Reporter: Till Rohrmann
>            Assignee: Till Rohrmann
>            Priority: Blocker
>             Fix For: 1.1.0
>
>
> With the changes introduced by FLINK-3327, it is no longer possible to run 
> remote Flink jobs which work on user code types. The reason is that the 
> {{ExecutionConfig}} is now stored directly in the {{JobGraph}}, which is sent 
> as an Akka message to the {{JobManager}}. By default, user code types are 
> automatically detected and registered in the {{ExecutionConfig}}. 
> Deserializing a {{JobGraph}} whose {{ExecutionConfig}} contains user code 
> classes therefore requires the user code class loader. However, Akka 
> does not have access to it and uses the system class loader. As a result, 
> Akka silently discards the {{SubmitJob}} message, which cannot be deserialized 
> because of a {{ClassNotFoundException}}.
> I propose not to send the {{ExecutionConfig}} explicitly with the 
> {{JobGraph}} and, thus, to partially revert to the behavior before FLINK-3327. 
> Previously, the {{ExecutionConfig}} was serialized into the job configuration 
> and deserialized on the {{TaskManager}} using the proper user code class loader.
> To reproduce the problem, you can submit the following job to a 
> remote cluster.
> {code}
> import org.apache.flink.api.common.functions.MapFunction;
> import org.apache.flink.api.java.DataSet;
> import org.apache.flink.api.java.ExecutionEnvironment;
>
> public class Job {
>       public static class CustomType {
>               private final int value;
>               public CustomType(int value) {
>                       this.value = value;
>               }
>               @Override
>               public String toString() {
>                       return "CustomType(" + value + ")";
>               }
>       }
>       public static void main(String[] args) throws Exception {
>               // Address, Port and PathToJar are placeholders for the remote cluster.
>               ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(Address, Port, PathToJar);
>               env.getConfig().disableAutoTypeRegistration();
>               DataSet<Integer> input = env.fromElements(1,2,3,4,5);
>               DataSet<CustomType> customTypes = input.map(new MapFunction<Integer, CustomType>() {
>                       @Override
>                       public CustomType map(Integer integer) throws Exception {
>                               return new CustomType(integer);
>                       }
>               });
>               customTypes.print();
>       }
> }
> {code}
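
The class loader problem described above can be reproduced without Flink or
Akka. The following is a minimal sketch in plain Java, assuming standard Java
serialization. All names in it (UserCodeSerializationSketch,
ClassLoaderObjectInputStream, LazyValue, loaderWithoutUserCode) are
hypothetical and are not taken from the pull request. In particular, LazyValue
only illustrates one plausible reading of the UserCodeValue wrapper in the
diff; the actual implementation may differ.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;

// Hypothetical sketch; none of these names are taken from the Flink code base.
class UserCodeSerializationSketch {

    /** Stand-in for a user code type that only the user code class loader knows. */
    public static class CustomType implements Serializable {
        private static final long serialVersionUID = 1L;
        public final int value;

        public CustomType(int value) {
            this.value = value;
        }
    }

    /** ObjectInputStream that resolves classes against an explicitly given class loader. */
    static class ClassLoaderObjectInputStream extends ObjectInputStream {
        private final ClassLoader classLoader;

        ClassLoaderObjectInputStream(InputStream in, ClassLoader classLoader) throws IOException {
            super(in);
            this.classLoader = classLoader;
        }

        @Override
        protected Class<?> resolveClass(ObjectStreamClass desc)
                throws IOException, ClassNotFoundException {
            // Does not handle primitive class objects such as int.class; enough for this sketch.
            return Class.forName(desc.getName(), false, classLoader);
        }
    }

    /** Keeps a value as bytes so that the wrapper itself deserializes without user classes. */
    public static class LazyValue<T> implements Serializable {
        private static final long serialVersionUID = 1L;
        private final byte[] bytes;

        public LazyValue(T value) throws IOException {
            this.bytes = serialize(value);
        }

        @SuppressWarnings("unchecked")
        public T get(ClassLoader userCodeClassLoader) throws IOException, ClassNotFoundException {
            return (T) deserialize(bytes, userCodeClassLoader);
        }
    }

    static byte[] serialize(Object value) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(value);
        }
        return buffer.toByteArray();
    }

    static Object deserialize(byte[] bytes, ClassLoader classLoader)
            throws IOException, ClassNotFoundException {
        try (ObjectInputStream in =
                new ClassLoaderObjectInputStream(new ByteArrayInputStream(bytes), classLoader)) {
            return in.readObject();
        }
    }

    /** Simulates the class loader on the receiving side: it cannot see CustomType. */
    static ClassLoader loaderWithoutUserCode() {
        return new ClassLoader(UserCodeSerializationSketch.class.getClassLoader()) {
            @Override
            protected Class<?> loadClass(String name, boolean resolve)
                    throws ClassNotFoundException {
                if (name.endsWith("CustomType")) {
                    throw new ClassNotFoundException(name);
                }
                return super.loadClass(name, resolve);
            }
        };
    }

    @SuppressWarnings("unchecked")
    public static void main(String[] args) throws Exception {
        ClassLoader userLoader = UserCodeSerializationSketch.class.getClassLoader();
        ClassLoader systemLike = loaderWithoutUserCode();

        // Plain value: the receiving side needs the user class and fails.
        byte[] plain = serialize(new CustomType(42));
        try {
            deserialize(plain, systemLike);
        } catch (ClassNotFoundException e) {
            System.out.println("plain value failed: " + e.getMessage());
        }

        // Wrapped value: the wrapper arrives intact and is unpacked with the user code loader.
        byte[] wrapped = serialize(new LazyValue<>(new CustomType(42)));
        LazyValue<CustomType> received = (LazyValue<CustomType>) deserialize(wrapped, systemLike);
        System.out.println("wrapped value: " + received.get(userLoader).value);
    }
}
```

Running main first reports the ClassNotFoundException for the plain value and
then prints the unpacked value 42. The point of the wrapper is that the outer
object graph contains only a byte array, so the receiver can deserialize it
with any class loader and defer the user classes until the proper loader is
available.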



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
