Hi Jun Tan,

1. Distributed Cache usage:
// Setting up the cache for the application

1. Copy the requisite files to the FileSystem:

$ bin/hadoop fs -copyFromLocal lookup.dat /myapp/lookup.dat
$ bin/hadoop fs -copyFromLocal map.zip /myapp/map.zip
$ bin/hadoop fs -copyFromLocal mylib.jar /myapp/mylib.jar
$ bin/hadoop fs -copyFromLocal mytar.tar /myapp/mytar.tar
$ bin/hadoop fs -copyFromLocal mytgz.tgz /myapp/mytgz.tgz
$ bin/hadoop fs -copyFromLocal mytargz.tar.gz /myapp/mytargz.tar.gz

2. Set up the application's JobConf:

JobConf job = new JobConf();
DistributedCache.addCacheFile(new URI("/myapp/lookup.dat#lookup.dat"), job);
DistributedCache.addCacheArchive(new URI("/myapp/map.zip"), job);
DistributedCache.addFileToClassPath(new Path("/myapp/mylib.jar"), job);
DistributedCache.addCacheArchive(new URI("/myapp/mytar.tar"), job);
DistributedCache.addCacheArchive(new URI("/myapp/mytgz.tgz"), job);
DistributedCache.addCacheArchive(new URI("/myapp/mytargz.tar.gz"), job);

3. Use the cached files in the Mapper
<http://hadoop.apache.org/common/docs/r0.20.1/api/org/apache/hadoop/mapred/Mapper.html>
or Reducer
<http://hadoop.apache.org/common/docs/r0.20.1/api/org/apache/hadoop/mapred/Reducer.html>:

public static class MapClass extends MapReduceBase
        implements Mapper<K, V, K, V> {

    private Path[] localArchives;
    private Path[] localFiles;

    public void configure(JobConf job) {
        // Get the cached archives/files
        localArchives = DistributedCache.getLocalCacheArchives(job);
        localFiles = DistributedCache.getLocalCacheFiles(job);
    }

    public void map(K key, V value,
                    OutputCollector<K, V> output, Reporter reporter)
            throws IOException {
        // Use data from the cached archives/files here
        // ...
        // ...
        output.collect(k, v);
    }
}

2. Without the distributed cache: in simple terms, if you are interested I can
help you with the code.

2011/9/23 谭军 <tanjun_2...@163.com>

> Hi Swathi.V.,
> I think my code below would work:
>
>     Configuration conf1 = new Configuration();
>     Job job1 = new Job(conf1, "Retrieval1");
>     job1.setJarByClass(Retrieval.class);
>     job1.addCacheFile(new URI(args[0]));    // problem here
>     conf1.set("keyNodeFile", args[0]);      // try to set the key node
> file path and read it in mapper1
>     job1.setOutputKeyClass(Text.class);
>     job1.setOutputValueClass(Text.class);
>     job1.setMapperClass(RetrievalMapper.class);
>     job1.setReducerClass(RetrievalReducer.class);
>     FileInputFormat.addInputPath(job1, new Path(args[1]));
>     String out = args[2] + System.nanoTime();
>
>     FileOutputFormat.setOutputPath(job1, new Path(out));
>     job1.waitForCompletion(true);
>
>     Configuration conf2 = new Configuration();
>     Job job2 = new Job(conf2, "Retrieval2");
>     job2.setJarByClass(Retrieval.class);
>     conf2.set("newKeyNodeFile", out);    // try to set the new key node
> file path and read it in mapper2
>     DistributedCache.addCacheFile(new URI(out));    // problem here
>     job2.setOutputKeyClass(Text.class);
>     job2.setOutputValueClass(Text.class);
>     job2.setMapperClass(RetrievalMapper2.class);
>     job2.setReducerClass(RetrievalReducer2.class);
>     FileInputFormat.addInputPath(job2, new Path(args[1]));
>     FileOutputFormat.setOutputPath(job2, new Path(args[2]));
>     System.exit(job2.waitForCompletion(true) ? 0 : 1);
>
> But a NullPointerException was thrown when I tried to get the file through
> the distributed cache.
> How do I use a distributed cache file with the new APIs?
> I also tried to pass the file path by setting global parameters, but that
> failed as well.
> How can I read the "args[0]" file in mapper1 and the intermediate file in
> mapper2 using the new APIs?
> Thanks!
>
> --
>
> Regards!
>
> Jun Tan
> At 2011-09-23 19:06:50, "Swathi V" <swat...@zinniasystems.com> wrote:
>
> Hi Jun Tan,
>
> Yes, I use the 0.21.0 version, so I have used those classes. The Hadoop
> Definitive Guide has job-dependency examples for 0.20.x.
>
> Thank you,
>
> 2011/9/23 谭军 <tanjun_2...@163.com>
>
>> Swathi.V.,
>> ControlledJob cannot be resolved in my Eclipse.
>> My Hadoop version is 0.20.2.
>> Is ControlledJob only resolvable in Hadoop 0.21.0 (+)?
>> Or do I need certain plugins?
>> Thanks
>>
>> --
>>
>> Regards!
>>
>> Jun Tan
>>
>> At 2011-09-22 00:56:54, "Swathi V" <swat...@zinniasystems.com> wrote:
>>
>> Hi,
>>
>> This code might help you:
>>
>> // JobDependancies.java snippet
>>
>> Configuration conf = new Configuration();
>> Job job1 = new Job(conf, "job1");
>> job1.setJarByClass(JobDependancies.class);
>> job1.setMapperClass(WordMapper.class);
>> job1.setReducerClass(WordReducer.class);
>> job1.setOutputKeyClass(Text.class);
>> job1.setOutputValueClass(IntWritable.class);
>> FileInputFormat.addInputPath(job1, new Path(args[0]));
>> String out = args[1] + System.nanoTime();
>> FileOutputFormat.setOutputPath(job1, new Path(out));
>>
>> Configuration conf2 = new Configuration();
>> Job job2 = new Job(conf2, "job2");
>> job2.setJarByClass(JobDependancies.class);
>> job2.setOutputKeyClass(IntWritable.class);
>> job2.setOutputValueClass(Text.class);
>> job2.setMapperClass(SortWordMapper.class);
>> job2.setReducerClass(Reducer.class);
>> FileInputFormat.addInputPath(job2, new Path(out + "/part-r-00000"));
>> FileOutputFormat.setOutputPath(job2, new Path(args[1]));
>>
>> ControlledJob controlledJob1 = new ControlledJob(job1.getConfiguration());
>> ControlledJob controlledJob2 = new ControlledJob(job2.getConfiguration());
>> controlledJob2.addDependingJob(controlledJob1);
>> JobControl jobControl = new JobControl("control");
>>
>> jobControl.addJob(controlledJob1);
>> jobControl.addJob(controlledJob2);
>>
>> Thread thread = new Thread(jobControl);
>> thread.start();
>> while (!jobControl.allFinished()) {
>>     try {
>>         Thread.sleep(10000);
>>     } catch (InterruptedException e) {
>>         e.printStackTrace();
>>     }
>> }
>> jobControl.stop();
>>
>> The wordcount output of job1 is given to the sort job, job2.
>> Irrespective of the mappers and reducers, the above is the way to chain
>> multiple jobs.
>>
>> 2011/9/21 谭军 <tanjun_2...@163.com>
>>
>>> Hi,
>>> I want to run 2 MR jobs sequentially.
>>> The first job writes an intermediate result to a temp file.
>>> The second job reads the result from that temp file, not from the
>>> FileInputPath.
>>> I tried, but a FileNotFoundException was reported.
>>> Then I checked the datanodes; the temp file had been created.
>>> The first job executed correctly.
>>> Why can't the second job find the file? The file was created before the
>>> second job was executed.
>>> Thanks!
>>>
>>> --
>>>
>>> Regards!
>>>
>>> Jun Tan
>>
>> --
>> Regards,
>> Swathi.V.
>
> --
> Regards,
> Swathi.V.

--
Regards,
Swathi.V.