[ 
https://issues.apache.org/jira/browse/FLINK-1438?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann updated FLINK-1438:
---------------------------------
    Description: 
Jobs with custom InputSplits fail with a ClassCastException such as 
{{org.apache.flink.examples.java.misc.CustomSplitTestJob$TestFileInputSplit 
cannot be cast to 
org.apache.flink.examples.java.misc.CustomSplitTestJob$TestFileInputSplit}} if 
executed on a local setup. 

This issue is probably related to different ClassLoaders used by the JobManager 
when InputSplits are generated and when they are handed to the InputFormat by 
the TaskManager. Moving the class of the custom InputSplit into the {{./lib}} 
folder and removing it from the job's jar makes the job work.
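
The symptom above (a class that cannot be cast to a class of the same name) is 
what the JVM reports when one class file has been defined by two different 
class loaders. The following standalone sketch does not use Flink at all and 
only illustrates that effect. The names {{ClassLoaderCastDemo}}, {{Split}} and 
{{IsolatedLoader}} are made up for this illustration. It assumes Java 9 or 
later and that the compiled class file is reachable as a classpath resource. 
It does not show where in Flink the two loaders come from.

```java
import java.io.IOException;
import java.io.InputStream;

public final class ClassLoaderCastDemo {

    /** Hypothetical stand-in for a custom InputSplit class shipped with a job. */
    public static class Split {
    }

    /** Loader without an application parent, so it defines its own copy of Split. */
    static final class IsolatedLoader extends ClassLoader {
        IsolatedLoader() {
            super(null); // delegate only to the bootstrap loader
        }

        Class<?> define(String name, byte[] bytes) {
            return defineClass(name, bytes, 0, bytes.length);
        }
    }

    /** Creates a Split instance whose class was defined by a second class loader. */
    public static Object newSplitFromIsolatedLoader() throws Exception {
        String name = Split.class.getName();
        String resource = name.replace('.', '/') + ".class";
        try (InputStream in = Split.class.getClassLoader().getResourceAsStream(resource)) {
            if (in == null) {
                throw new IOException("class file not found on the classpath: " + resource);
            }
            Class<?> copy = new IsolatedLoader().define(name, in.readAllBytes());
            return copy.getDeclaredConstructor().newInstance();
        }
    }

    /** Returns true if casting the object to Split throws a ClassCastException. */
    public static boolean castFails(Object candidate) {
        try {
            Split.class.cast(candidate);
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) throws Exception {
        Object foreign = newSplitFromIsolatedLoader();
        boolean sameName = foreign.getClass().getName().equals(Split.class.getName());
        System.out.println("same name:  " + sameName);
        System.out.println("same class: " + (foreign.getClass() == Split.class));
        System.out.println("cast fails: " + castFails(foreign));
    }
}
```

In this sketch the two classes have equal names but are different {{Class}} 
objects, so the cast is rejected. That would be consistent with the workaround 
described above, where the class is only available from one place.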

To reproduce the bug, run the following job on a local setup. 

{code}
public class CustomSplitTestJob {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Read from the custom InputFormat, which hands out custom InputSplits.
        DataSet<String> x = env.createInput(new TestFileInputFormat());
        x.print();

        env.execute();
    }

    // Minimal InputFormat whose only purpose is to produce a custom split type.
    public static class TestFileInputFormat implements InputFormat<String, TestFileInputSplit> {

        @Override
        public void configure(Configuration parameters) {
        }

        @Override
        public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException {
            return null;
        }

        @Override
        public TestFileInputSplit[] createInputSplits(int minNumSplits) throws IOException {
            // A single custom split.
            return new TestFileInputSplit[]{new TestFileInputSplit()};
        }

        @Override
        public InputSplitAssigner getInputSplitAssigner(TestFileInputSplit[] inputSplits) {
            return new LocatableInputSplitAssigner(inputSplits);
        }

        @Override
        public void open(TestFileInputSplit split) throws IOException {
        }

        @Override
        public boolean reachedEnd() throws IOException {
            return false;
        }

        @Override
        public String nextRecord(String reuse) throws IOException {
            return null;
        }

        @Override
        public void close() throws IOException {
        }
    }

    // Custom InputSplit defined in the job itself.
    public static class TestFileInputSplit extends FileInputSplit {
    }
}
{code}

The same problem occurs in distributed mode, except that Akka aborts the 
transmission of the input split with an uninformative {{invalid type code: 00}} 
error.


> ClassCastException for Custom InputSplit in local mode and invalid type code 
> in distributed mode
> ------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-1438
>                 URL: https://issues.apache.org/jira/browse/FLINK-1438
>             Project: Flink
>          Issue Type: Bug
>          Components: JobManager
>    Affects Versions: 0.8
>            Reporter: Fabian Hueske
>            Priority: Minor



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
