[ 
http://issues.apache.org/jira/browse/HADOOP-372?page=comments#action_12429787 ] 
            
Runping Qi commented on HADOOP-372:
-----------------------------------


I think it may be better to make InputPipeline as an interface:

public interface InputPipeline { 
  public Class createInputFormat(Path inputDir, JobConf conf); 
  public Class getMapper(Path inputDir, JobConf conf); 
  public int getRequestedNumMaps(Path inputDir, JobConf conf); 
} 

In order to customizing input processing pipeline, the user is expected to 
provide a class
that  implements the interface:

public class MyInputPipeline implements InputPipeline { 
  public MyInputPipeline(); 
  public Class getInputFormat(Path inputDir, JobConf conf); 
  public Class getMapper(Path inputDir, JobConf conf); 
  public int getRequestedNumMaps(Path inputDir, JobConf conf); 
} 

The class provides a argumentless constructor, and implements the logic to 
determine an InputFormat class and a mapper class that apply to the data under 
the 
specified input directory, and determine
the expected number of mappers for the data under the directory.

The access to this class is through two new methods of JobConf:

   public void setInputPipelineClass(Class cls); 
   public Class getInputPipelineClass(); 

To specify such a class, the user just simply call:
   myJob.setInputPipelineClass(MyInputPipeline.class)


The initTasks method of JobInProgress class can do something like the following:

       String jobFile = profile.getJobFile();

        JobConf jd = new JobConf(localJobFile);
        FileSystem fs = FileSystem.get(conf);
        String userPipelineClassName = jd.get("mapred.input.pipeline.class");
        InputPipeline  inputPipeline ;
        if (userPipelineClassName != null && localJarFile != null) {
          try {
            ClassLoader loader =
            new URLClassLoader(new URL[]{ 
localFs.pathToFile(localJarFile).toURL() });
            Class inputPipelineClass = Class.forName(userPipelineClassName , 
true, loader);
            inputPipeline  = (InputPipeline )inputPipelineClass.newInstance(); 
          } catch (Exception e) {
            throw new IOException(e.toString());
          }
        } else {
          inputPipeline = jd.getInputPipeline();
        }
        ArrayList allSplits;
        InputFormat inputFormat;
        InputFormat genericInputFormat;
        Class mapperClass;
        Class genericMapperClass;
        int numMapTasks;
        // code to get the generic input format, mapper class for the job
        
        Path[] dirs = job.getInputPaths();
        for (int i = 0; i < dirs.length; i++) {
                inputFormat = genericInputFormat;
                if (inputPipeline != null) {
                     Class inputFormatClass = 
inputPipeline.getInputFormat(dirs[i], jd);
                
                     if (inputFormatClass != null) {
                            inputFormat  = 
(InputFormat)inputFormatClass.newInstance();
                    } 
                    mapperClass = inputPipeline.getMapper(dirs[i], jd);
                    if (mapperClass == null) {
                          mapperClass = genericMapperClass;
                    }
                    int numMapTasks = 
inputPipeline.getRequestedNumMaps(dirs[i], jd);
                     
                }
                FileSplit[] splits = inputFormat.getSplits(fs, jd, numMapTasks);
                // add the new splits to allSplits
                ....
            }

        FileSplit[] splits = inputFormat.getSplits(fs, jd, numMapTasks);

> should allow to specify different inputformat classes for different input 
> dirs for Map/Reduce jobs
> --------------------------------------------------------------------------------------------------
>
>                 Key: HADOOP-372
>                 URL: http://issues.apache.org/jira/browse/HADOOP-372
>             Project: Hadoop
>          Issue Type: New Feature
>          Components: mapred
>    Affects Versions: 0.4.0
>         Environment: all
>            Reporter: Runping Qi
>         Assigned To: Owen O'Malley
>
> Right now, the user can specify multiple input directories for a map reduce 
> job. 
> However, the files under all the directories are assumed to be in the same 
> format, 
> with the same key/value classes. This proves to be  a serious limit in many 
> situations. 
> Here is an example. Suppose I have three simple tables: 
> one has URLs and their rank values (page ranks), 
> another has URLs and their classification values, 
> and the third one has the URL meta data such as crawl status, last crawl 
> time, etc. 
> Suppose now I need a job to generate a list of URLs to be crawled next. 
> The decision depends on the info in all the three tables.
> Right now, there is no easy way to accomplish this.
> However, this job can be done if the framework allows to specify different 
> inputformats for different input dirs.
> Suppose my three tables are in the following directory respectively: 
> rankTable, classificationTable. and metaDataTable. 
> If we extend JobConf class with the following method (as Owen suggested to 
> me):
>     addInputPath(aPath, anInputFormatClass, anInputKeyClass, 
> anInputValueClass)
> Then I can specify my job as follows:
>     addInputPath(rankTable, SequenceFileInputFormat.class, UTF8.class, 
> DoubleWritable.class)
>     addInputPath(classificationTable, TextInputFormat.class, UTF8,class, 
> UTF8.class)
>     addInputPath(metaDataTable, SequenceFileInputFormat.class, UTF8.class, 
> MyRecord.class)
> If an input directory is added through the current API, it will have the same 
> meaning as it is now. 
> Thus this extension will not affect any applications that do not need this 
> new feature.
> It is relatively easy for the M/R framework to create an appropriate record 
> reader for a map task based on the above information.
> And that is the only change needed for supporting this extension.

-- 
This message is automatically generated by JIRA.
-
If you think it was sent incorrectly contact one of the administrators: 
http://issues.apache.org/jira/secure/Administrators.jspa
-
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

Reply via email to