[ 
https://issues.apache.org/jira/browse/FLINK-7118?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16091629#comment-16091629
 ] 

ASF GitHub Bot commented on FLINK-7118:
---------------------------------------

Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4285#discussion_r127992635
  
    --- Diff: 
flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/HadoopOutputFormatBase.java
 ---
    @@ -126,18 +128,18 @@ public void open(int taskNumber, int numTasks) throws 
IOException {
                        this.jobConf.setInt("mapreduce.task.partition", 
taskNumber + 1);
     
                        try {
    -                           this.context = 
HadoopUtils.instantiateTaskAttemptContext(this.jobConf, taskAttemptID);
    +                           this.context = new 
TaskAttemptContextImpl(this.jobConf, taskAttemptID);
                        } catch (Exception e) {
    -                           throw new RuntimeException(e);
    +                           throw new IOException("Could not create 
instance of TaskAttemptContext.", e);
                        }
     
                        this.outputCommitter = 
this.jobConf.getOutputCommitter();
     
                        JobContext jobContext;
                        try {
    -                           jobContext = 
HadoopUtils.instantiateJobContext(this.jobConf, new JobID());
    +                           jobContext = new JobContextImpl(this.jobConf, 
new JobID());
                        } catch (Exception e) {
    -                           throw new RuntimeException(e);
    +                           throw new IOException("Could not create 
instance of JobContext.", e);
    --- End diff --
    
    Same as above...


> Remove hadoop1.x code in HadoopUtils
> ------------------------------------
>
>                 Key: FLINK-7118
>                 URL: https://issues.apache.org/jira/browse/FLINK-7118
>             Project: Flink
>          Issue Type: Improvement
>          Components: Java API
>            Reporter: mingleizhang
>            Assignee: mingleizhang
>
> Since Flink no longer supports Hadoop 1.x, we should remove this code. The 
> code below resides in {{org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils}}
>       
> {code:java}
> public static JobContext instantiateJobContext(Configuration configuration, 
> JobID jobId) throws Exception {
>               try {
>                       Class<?> clazz;
>                       // for Hadoop 1.xx
>                       if(JobContext.class.isInterface()) {
>                               clazz = 
> Class.forName("org.apache.hadoop.mapreduce.task.JobContextImpl", true, 
> Thread.currentThread().getContextClassLoader());
>                       }
>                       // for Hadoop 2.xx
>                       else {
>                               clazz = 
> Class.forName("org.apache.hadoop.mapreduce.JobContext", true, 
> Thread.currentThread().getContextClassLoader());
>                       }
>                       Constructor<?> constructor = 
> clazz.getConstructor(Configuration.class, JobID.class);
>                       JobContext context = (JobContext) 
> constructor.newInstance(configuration, jobId);
>                       
>                       return context;
>               } catch(Exception e) {
>                       throw new Exception("Could not create instance of 
> JobContext.");
>               }
>       }
> {code}
> And 
> {code:java}
>       public static TaskAttemptContext 
> instantiateTaskAttemptContext(Configuration configuration,  TaskAttemptID 
> taskAttemptID) throws Exception {
>               try {
>                       Class<?> clazz;
>                       // for Hadoop 1.xx
>                       if(JobContext.class.isInterface()) {
>                               clazz = 
> Class.forName("org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl");
>                       }
>                       // for Hadoop 2.xx
>                       else {
>                               clazz = 
> Class.forName("org.apache.hadoop.mapreduce.TaskAttemptContext");
>                       }
>                       Constructor<?> constructor = 
> clazz.getConstructor(Configuration.class, TaskAttemptID.class);
>                       TaskAttemptContext context = (TaskAttemptContext) 
> constructor.newInstance(configuration, taskAttemptID);
>                       
>                       return context;
>               } catch(Exception e) {
>                       throw new Exception("Could not create instance of 
> TaskAttemptContext.");
>               }
>       }
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to