Hi Jun Tan, Yeah! Surely... Well, the code I gave is the new API.
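For completeness, here is a minimal sketch of the distributed-cache pattern against the new org.apache.hadoop.mapreduce API. This is an untested sketch assuming Hadoop 0.20.x; CacheExample, CacheMapper and lookup.dat are illustrative names only:

    import java.io.IOException;
    import java.net.URI;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.filecache.DistributedCache;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class CacheExample {

      public static class CacheMapper extends Mapper<Object, Text, Text, Text> {
        private Path[] localFiles;

        @Override
        protected void setup(Context context)
            throws IOException, InterruptedException {
          // In the new API the cached files are read back from the task's
          // Configuration, not from a JobConf as in the old API.
          localFiles = DistributedCache.getLocalCacheFiles(context.getConfiguration());
          // localFiles[0] now points at the local copy of lookup.dat
        }

        @Override
        protected void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
          // use the cached data here
          context.write(new Text("k"), value);
        }
      }

      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Attach the cache file to the Configuration the Job is built from,
        // before the job is submitted.
        DistributedCache.addCacheFile(new URI("/myapp/lookup.dat#lookup.dat"), conf);

        Job job = new Job(conf, "cache example");
        job.setJarByClass(CacheExample.class);
        job.setMapperClass(CacheMapper.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
    }

Two frequently reported causes of a NullPointerException with this pattern are adding the file to a different Configuration object than the one the Job was actually built from, and running in local (standalone) mode, where older releases do not localize cache files.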
2011/9/24 谭军 <tanjun_2...@163.com>

> Hi Swathi.V.,
> Thank you very much. It's very kind of you to do that.
> I think the code you gave is implemented in the old APIs. I made that work several days ago.
> What I can't do is make it work with the new APIs.
> I have just gotten started with MapReduce programming and have some problems with my code.
> When you get time we can talk online.
> Thanks!
>
> --
> Regards!
>
> Jun Tan
>
> At 2011-09-24 01:37:54, "Swathi V" <swat...@zinniasystems.com> wrote:
>
> Hi JunTan,
>
> 1. Distributed Cache in new API usage:
>
> // Setting up the cache for the application
>
> 1. Copy the requisite files to the FileSystem:
>
>   $ bin/hadoop fs -copyFromLocal lookup.dat /myapp/lookup.dat
>   $ bin/hadoop fs -copyFromLocal map.zip /myapp/map.zip
>   $ bin/hadoop fs -copyFromLocal mylib.jar /myapp/mylib.jar
>   $ bin/hadoop fs -copyFromLocal mytar.tar /myapp/mytar.tar
>   $ bin/hadoop fs -copyFromLocal mytgz.tgz /myapp/mytgz.tgz
>   $ bin/hadoop fs -copyFromLocal mytargz.tar.gz /myapp/mytargz.tar.gz
>
> 2. Set up the application's JobConf:
>
>   JobConf job = new JobConf();
>   DistributedCache.addCacheFile(new URI("/myapp/lookup.dat#lookup.dat"), job);
>   DistributedCache.addCacheArchive(new URI("/myapp/map.zip"), job);
>   DistributedCache.addFileToClassPath(new Path("/myapp/mylib.jar"), job);
>   DistributedCache.addCacheArchive(new URI("/myapp/mytar.tar"), job);
>   DistributedCache.addCacheArchive(new URI("/myapp/mytgz.tgz"), job);
>   DistributedCache.addCacheArchive(new URI("/myapp/mytargz.tar.gz"), job);
>
> 3. Use the cached files in the Mapper
>    <http://hadoop.apache.org/common/docs/r0.20.1/api/org/apache/hadoop/mapred/Mapper.html>
>    or Reducer
>    <http://hadoop.apache.org/common/docs/r0.20.1/api/org/apache/hadoop/mapred/Reducer.html>:
>
>   public static class MapClass extends MapReduceBase
>       implements Mapper<K, V, K, V> {
>
>     private Path[] localArchives;
>     private Path[] localFiles;
>
>     public void configure(JobConf job) {
>       // Get the cached archives/files
>       localArchives = DistributedCache.getLocalCacheArchives(job);
>       localFiles = DistributedCache.getLocalCacheFiles(job);
>     }
>
>     public void map(K key, V value,
>                     OutputCollector<K, V> output, Reporter reporter)
>         throws IOException {
>       // Use data from the cached archives/files here
>       // ...
>       output.collect(k, v);
>     }
>   }
>
> 2. Without the distributed cache: in simple terms, if you are interested I can help you with the code.
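For option 2 (no distributed cache), a minimal sketch is to pass the HDFS path of the side file through the Configuration and open it with FileSystem in setup(). The property name "keyNodeFile", the class names and the tagging logic below are illustrative only:

    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStreamReader;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class NoCacheExample {

      public static class KeyNodeMapper extends Mapper<Object, Text, Text, Text> {
        private String keyNodeData;

        @Override
        protected void setup(Context context) throws IOException {
          Configuration conf = context.getConfiguration();
          // Read the side file directly from HDFS using the path set by the driver.
          Path keyNodePath = new Path(conf.get("keyNodeFile"));
          FileSystem fs = FileSystem.get(conf);
          BufferedReader reader =
              new BufferedReader(new InputStreamReader(fs.open(keyNodePath)));
          StringBuilder sb = new StringBuilder();
          String line;
          while ((line = reader.readLine()) != null) {
            sb.append(line).append('\n');
          }
          reader.close();
          keyNodeData = sb.toString();
        }

        @Override
        protected void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
          // Illustrative use of the side data: tag each record by whether it
          // appears in the key-node file.
          String tag = keyNodeData.contains(value.toString()) ? "KEY_NODE" : "OTHER";
          context.write(new Text(tag), value);
        }
      }

      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Set the property before constructing the Job: new Job(conf, ...) copies
        // the Configuration, so later conf.set(...) calls are not seen by the tasks.
        conf.set("keyNodeFile", args[0]);

        Job job = new Job(conf, "Retrieval1");
        job.setJarByClass(NoCacheExample.class);
        job.setMapperClass(KeyNodeMapper.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(args[1]));
        FileOutputFormat.setOutputPath(job, new Path(args[2]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
    }

The ordering is the important part: because the Job constructor takes a copy of the Configuration, a property set on the original conf after the Job has been created is not visible in the mapper, and conf.get(...) then returns null there.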
>
> 2011/9/23 谭军 <tanjun_2...@163.com>
>
>> Hi Swathi.V.,
>> I think my code below would work:
>>
>>     Configuration conf1 = new Configuration();
>>     Job job1 = new Job(conf1, "Retrieval1");
>>     job1.setJarByClass(Retrieval.class);
>>     job1.addCacheFile(new URI(args[0]));   // problem here
>>     conf1.set("keyNodeFile", args[0]);     // try to set the key node file path and get it in mapper1
>>     job1.setOutputKeyClass(Text.class);
>>     job1.setOutputValueClass(Text.class);
>>     job1.setMapperClass(RetrievalMapper.class);
>>     job1.setReducerClass(RetrievalReducer.class);
>>     FileInputFormat.addInputPath(job1, new Path(args[1]));
>>     String out = args[2] + System.nanoTime();
>>     FileOutputFormat.setOutputPath(job1, new Path(out));
>>     job1.waitForCompletion(true);
>>
>>     Configuration conf2 = new Configuration();
>>     Job job2 = new Job(conf2, "Retrieval2");
>>     job2.setJarByClass(Retrieval.class);
>>     conf2.set("newKeyNodeFile", out);      // try to set the new key node file path and get it in mapper2
>>     DistributedCache.addCacheFile(new URI(out));   // problem here
>>     job2.setOutputKeyClass(Text.class);
>>     job2.setOutputValueClass(Text.class);
>>     job2.setMapperClass(RetrievalMapper2.class);
>>     job2.setReducerClass(RetrievalReducer2.class);
>>     FileInputFormat.addInputPath(job2, new Path(args[1]));
>>     FileOutputFormat.setOutputPath(job2, new Path(args[2]));
>>     System.exit(job2.waitForCompletion(true) ? 0 : 1);
>>
>> But a NullPointerException was reported when I tried to get the file through the distributed cache.
>> How do I use a distributed cache file with the new APIs?
>> I also tried to pass the file path by setting global parameters, but that failed as well.
>> How can I read the "args[0]" file in mapper1 and the intermediate file in mapper2 using the new APIs?
>> Thanks!
>>
>> --
>> Regards!
>>
>> Jun Tan
>>
>> At 2011-09-23 19:06:50, "Swathi V" <swat...@zinniasystems.com> wrote:
>>
>> Hi Jun Tan,
>>
>> Yes, I use the 0.21.0 version, so I have used those. The Hadoop Definitive Guide has job dependency examples for 0.20.x.
>>
>> Thank You,
>>
>> 2011/9/23 谭军 <tanjun_2...@163.com>
>>
>>> Swathi.V.,
>>> ControlledJob cannot be resolved in my Eclipse.
>>> My Hadoop version is 0.20.2.
>>> Can ControlledJob only be resolved in Hadoop 0.21.0 (or later)?
>>> Or do I need certain plugins?
>>> Thanks
>>>
>>> --
>>>
>>> Regards!
>>>
>>> Jun Tan
>>>
>>> At 2011-09-22 00:56:54, "Swathi V" <swat...@zinniasystems.com> wrote:
>>>
>>> Hi,
>>>
>>> This code might help you:
>>> // JobDependancies.java snippet
>>>
>>>     Configuration conf = new Configuration();
>>>     Job job1 = new Job(conf, "job1");
>>>     job1.setJarByClass(JobDependancies.class);
>>>     job1.setMapperClass(WordMapper.class);
>>>     job1.setReducerClass(WordReducer.class);
>>>     job1.setOutputKeyClass(Text.class);
>>>     job1.setOutputValueClass(IntWritable.class);
>>>     FileInputFormat.addInputPath(job1, new Path(args[0]));
>>>     String out = args[1] + System.nanoTime();
>>>     FileOutputFormat.setOutputPath(job1, new Path(out));
>>>
>>>     Configuration conf2 = new Configuration();
>>>     Job job2 = new Job(conf2, "job2");
>>>     job2.setJarByClass(JobDependancies.class);
>>>     job2.setOutputKeyClass(IntWritable.class);
>>>     job2.setOutputValueClass(Text.class);
>>>     job2.setMapperClass(SortWordMapper.class);
>>>     job2.setReducerClass(Reducer.class);
>>>     FileInputFormat.addInputPath(job2, new Path(out + "/part-r-00000"));
>>>     FileOutputFormat.setOutputPath(job2, new Path(args[1]));
>>>
>>>     ControlledJob controlledJob1 = new ControlledJob(job1.getConfiguration());
>>>     ControlledJob controlledJob2 = new ControlledJob(job2.getConfiguration());
>>>     controlledJob2.addDependingJob(controlledJob1);
>>>
>>>     JobControl jobControl = new JobControl("control");
>>>     jobControl.addJob(controlledJob1);
>>>     jobControl.addJob(controlledJob2);
>>>
>>>     Thread thread = new Thread(jobControl);
>>>     thread.start();
>>>     while (!jobControl.allFinished()) {
>>>       try {
>>>         Thread.sleep(10000);
>>>       } catch (InterruptedException e) {
>>>         e.printStackTrace();
>>>       }
>>>     }
>>>     jobControl.stop();
>>>   }
>>> }
>>>
>>> The wordcount output of job1 is given to the sort job, job2.
>>> Irrespective of the mappers and reducers involved, the above is the way to chain many jobs.
>>>
>>> 2011/9/21 谭军 <tanjun_2...@163.com>
>>>
>>>> Hi,
>>>> I want to run 2 MR jobs sequentially.
>>>> The first job produces an intermediate result in a temp file.
>>>> The second job reads the result from the temp file, which is not its FileInputPath.
>>>> I tried, but a FileNotFoundException was reported.
>>>> Then I checked the datanodes; the temp file had been created.
>>>> The first job was executed correctly.
>>>> Why can't the second job find the file? The file was created before the second job was executed.
>>>> Thanks!
>>>>
>>>> --
>>>>
>>>> Regards!
>>>>
>>>> Jun Tan
>>>
>>> --
>>> Regards,
>>> Swathi.V.
>>
>> --
>> Regards,
>> Swathi.V.
>
> --
> Regards,
> Swathi.V.

--
Regards,
Swathi.V.
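Since ControlledJob and JobControl from the new API only resolve on 0.21.0 and later, the same job1 -> job2 dependency can be expressed on 0.20.2 by running the jobs back to back and starting the second one only after the first has completed successfully. A rough sketch, where ChainedJobs and the intermediate path are illustrative and the real mapper, reducer and key/value classes still need to be filled in:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class ChainedJobs {
      public static void main(String[] args) throws Exception {
        // Temporary directory for job1's output (illustrative, mirrors the thread).
        String intermediate = args[1] + System.nanoTime();

        Configuration conf1 = new Configuration();
        Job job1 = new Job(conf1, "job1");
        job1.setJarByClass(ChainedJobs.class);
        // set your real mapper/reducer and key/value classes here
        FileInputFormat.addInputPath(job1, new Path(args[0]));
        FileOutputFormat.setOutputPath(job1, new Path(intermediate));

        // Block until job1 finishes and bail out if it failed, so job2 is never
        // started before its input directory exists.
        if (!job1.waitForCompletion(true)) {
          System.exit(1);
        }

        Configuration conf2 = new Configuration();
        Job job2 = new Job(conf2, "job2");
        job2.setJarByClass(ChainedJobs.class);
        // set your real mapper/reducer and key/value classes here
        // Read job1's whole output directory rather than a single part file.
        FileInputFormat.addInputPath(job2, new Path(intermediate));
        FileOutputFormat.setOutputPath(job2, new Path(args[1]));

        System.exit(job2.waitForCompletion(true) ? 0 : 1);
      }
    }

Pointing job2 at the whole intermediate directory, rather than a single part-r-00000 file, also keeps it working when the first job runs with more than one reducer.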