[ 
https://issues.apache.org/jira/browse/FLINK-3633?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15201670#comment-15201670
 ] 

ASF GitHub Bot commented on FLINK-3633:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1818#discussion_r56676850
  
    --- Diff: docs/setup/config.md ---
    @@ -183,7 +184,7 @@ The following parameters configure Flink's JobManager 
and TaskManagers.
     - `akka.transport.threshold`: Threshold for the transport failure 
detector. Since Flink uses TCP, the detector is not necessary and, thus, the 
threshold is set to a high value (DEFAULT: **300**).
     - `akka.tcp.timeout`: Timeout for all outbound connections. If you should 
experience problems with connecting to a TaskManager due to a slow network, you 
should increase this value (DEFAULT: **akka.ask.timeout**).
     - `akka.throughput`: Number of messages that are processed in a batch 
before returning the thread to the pool. Low values denote a fair scheduling 
whereas high values can increase the performance at the cost of unfairness 
(DEFAULT: **15**).
    -- `akka.log.lifecycle.events`: Turns on the Akka's remote logging of 
events. Set this value to 'on' in case of debugging (DEFAULT: **off**).
    +- `akka.log.lifecycle.events`: Turns on the Akka's remote logging of 
events. Set this value to 'on' in case of debugging (DEFAULT: **false**).
    --- End diff --
    
    Yes, definitely. Thanks for spotting it :-)


> Job submission silently fails when using user code types
> --------------------------------------------------------
>
>                 Key: FLINK-3633
>                 URL: https://issues.apache.org/jira/browse/FLINK-3633
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: 1.1.0
>            Reporter: Till Rohrmann
>            Assignee: Till Rohrmann
>            Priority: Blocker
>             Fix For: 1.1.0
>
>
> With the changes introduced by FLINK-3327, it is no longer possible to run 
> remote Flink jobs which work on user code types. The reason is that now the 
> {{ExecutionConfig}} is directly stored in the {{JobGraph}} which is sent as 
> an Akka message to the {{JobManager}}. Per default, user code types are 
> automatically detected and registered in the {{ExecutionConfig}}. When 
> deserializing a {{JobGraph}} whose {{ExecutionConfig}} contains user code 
> classes the user code class loader is consequently required. However, Akka 
> does not have access to it and uses the system class loader. This causes that 
> Akka silently discards the {{SubmitJob}} message which cannot be deserialized 
> because of a {{ClassNotFoundException}}.
> I propose to not sent the {{ExecutionConfig}} explicitly with the 
> {{JobGraph}} and, thus, to partially revert the changes to before FLINK-3327. 
> Before, the {{ExectuionConfig}} was serialized into the job configuration and 
> deserialized on the {{TaskManager}} using the proper user code class loader.
> In order to reproduce the problem you can submit the following job to a 
> remote cluster.
> {code}
> public class Job {
>       public static class CustomType {
>               private final int value;
>               public CustomType(int value) {
>                       this.value = value;
>               }
>               @Override
>               public String toString() {
>                       return "CustomType(" + value + ")";
>               }
>       }
>       public static void main(String[] args) throws Exception {
>               ExecutionEnvironment env = 
> ExecutionEnvironment.createRemoteEnvironment(Address, Port, PathToJar);
>               env.getConfig().disableAutoTypeRegistration();
>               DataSet<Integer> input = env.fromElements(1,2,3,4,5);
>               DataSet<CustomType> customTypes = input.map(new 
> MapFunction<Integer, CustomType>() {
>                       @Override
>                       public CustomType map(Integer integer) throws Exception 
> {
>                               return new CustomType(integer);
>                       }
>               });
>               customTypes.print();
>       }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to