Hi
My code creates a new job named "job 1" which writes something to distributed cache (say a text file) and the job gets completed.
Just to manage expectations, you add files to the distributed cache _in the job driver_, and the framework makes them available to maps and reducers.
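To make the driver-side point concrete, here is a toy model in plain Java. It uses no Hadoop classes: java.util.Properties stands in for the job Configuration, and the class name, the submit() helper and the "written-by-task" path are made up for illustration. The property key is, to my knowledge, the one Hadoop 2 uses for cache files. The sketch assumes that tasks work from a snapshot of the configuration taken at submission, so a registration made inside a task never reaches the driver or other tasks.

```java
import java.util.Properties;

/** Toy model only: Properties stands in for a Hadoop Configuration. */
public class CacheConfSketch {
    // Assumed key: Hadoop 2 keeps the list of cache file URIs under this property.
    public static final String CACHE_FILES = "mapreduce.job.cache.files";

    /** Mimics the append behaviour: add the URI to a comma-separated property. */
    public static void addCacheFile(String uri, Properties conf) {
        String current = conf.getProperty(CACHE_FILES);
        conf.setProperty(CACHE_FILES, current == null ? uri : current + "," + uri);
    }

    /** Hypothetical stand-in for job submission: tasks get a snapshot of the conf. */
    public static Properties submit(Properties conf) {
        Properties snapshot = new Properties();
        snapshot.putAll(conf);
        return snapshot;
    }

    public static void main(String[] args) {
        // Registration in the driver, before submission: visible to the tasks.
        Properties driverConf = new Properties();
        addCacheFile("hdfs://localhost:9000/bloomfilter", driverConf);
        Properties taskConf = submit(driverConf);

        // Registration from inside a task: changes only that task's own copy.
        addCacheFile("hdfs://localhost:9000/written-by-task", taskConf);

        System.out.println("driver sees: " + driverConf.getProperty(CACHE_FILES));
        System.out.println("task sees:   " + taskConf.getProperty(CACHE_FILES));
    }
}
```

The same picture applies to a second job: a configuration built as a copy of the first one carries the cache entry along, while one built from scratch does not.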
Adding a file to the distributed cache in a mapper and expecting to have it available in other mappers/reducers or later jobs is not a supported use case.

If you want a mapper/reducer to send info back to the job driver, use counters or write regular files (in the job output dir).

If you want to add a file to the distributed cache and have multiple jobs use it, note that DistributedCache.add*(Path, Conf) appends the file name to internal conf properties. (When you submit the job, the framework reads those properties to find out which files need to be distributed, added to the classpath, etc.)

So DistributedCache.addCacheFile(*f*, *conf1*) will write something into *conf1*. When you launch *job1* using *conf1*, *f* gets distributed. When you launch *job2* using *conf2*, *job2* may not distribute file *f*, depending on how you build *conf2* (independent of *conf1*, or a copy/clone of *conf1*).

hth
Gabriel Balan

P.S. Btw, you don't have to copy the local file to hdfs using "IOUtils.copyBytes(in, out, 4096, true);"
Try FileSystem.copyFromLocalFile <https://hadoop.apache.org/docs/r2.6.1/api/src-html/org/apache/hadoop/fs/FileSystem.html#line.1885>
or "hadoop jar foo.jar MyClass -files /home/siddharth/Desktop/data/bloom_filter" (the -files option is handled by GenericOptionsParser, so the driver has to run through ToolRunner).

On 6/13/2016 5:14 AM, Siddharth Dawar wrote:
Hi Jeff,

Thanks for your prompt reply. Actually my problem is as follows:

My code creates a new job named "job 1" which writes something to the distributed cache (say a text file) and the job gets completed.

Now, I want to create n jobs in the while loop below, which read the text file written by "job 1" from the distributed cache. So my question is, "*How to share content among multiple jobs using distributed cache*"?

*Another part of the problem* is that I don't know how to get an instance of the running job from JobClient.runJob(conf2), so that I can use the job.addCacheFile(..) command.

    while (true) {
        JobConf conf2 = new JobConf(getConf(), graphMining.class);
        conf2.setJobName("sid");
        conf2.setMapperClass(mapperMiner.class);
        conf2.setReducerClass(reducerMiner.class);
        conf2.setInputFormat(SequenceFileInputFormat.class);
        conf2.setOutputFormat(SequenceFileOutputFormat.class);
        conf2.setOutputValueClass(BytesWritable.class);
        conf2.setMapOutputKeyClass(Text.class);
        conf2.setMapOutputValueClass(MapWritable.class);
        conf2.setOutputKeyClass(Text.class);
        conf2.setNumMapTasks(Integer.parseInt(args[3]));
        conf2.setNumReduceTasks(Integer.parseInt(args[4]));
        FileInputFormat.addInputPath(conf2, new Path(input));
        FileOutputFormat.setOutputPath(conf2, new Path(output));

        // conf2 is only in scope inside the loop, so the job is launched here
        RunningJob job = JobClient.runJob(conf2);
    }

On Wed, Jun 8, 2016 at 3:50 AM, Guttadauro, Jeff wrote:

Hi, Siddharth.

I was also a bit frustrated at what I found to be scant documentation on how to use the distributed cache in Hadoop 2. The DistributedCache class itself was deprecated in Hadoop 2, but there don’t appear to be very clear instructions on the alternative. I think it’s actually much simpler to work with files on the distributed cache in Hadoop 2.
The new way is to add files to the cache (or cacheArchive) via the Job object:

    job.addCacheFile(uriForYourFile);
    job.addCacheArchive(uriForYourArchive);

The cool part is that, if you set up your URI so that it has a “#yourFileReference” at the end, then Hadoop will set up a symbolic link named “yourFileReference” in your job’s working directory, which you can use to get at the file or archive. So, it’s as if the file or archive is in the working directory. That obviates the need to even work with the DistributedCache class in your Mapper or Reducer, since you can just work with the file (or path using nio) directly.

Hope that helps.
-Jeff

From: Siddharth Dawar
Sent: Tuesday, June 07, 2016 4:06 AM
Subject: Accessing files in Hadoop 2.7.2 Distributed Cache

Hi,

I want to use the distributed cache to allow my mappers to access data in Hadoop 2.7.2. In main, I'm using the command

    String hdfs_path = "hdfs://localhost:9000/bloomfilter";
    InputStream in = new BufferedInputStream(new FileInputStream("/home/siddharth/Desktop/data/bloom_filter"));
    Configuration conf = new Configuration();
    fs = FileSystem.get(java.net.URI.create(hdfs_path), conf);
    OutputStream out = fs.create(new Path(hdfs_path));
    // Copy file from local to HDFS
    IOUtils.copyBytes(in, out, 4096, true);
    System.out.println(hdfs_path + " copied to HDFS");
    DistributedCache.addCacheFile(new Path(hdfs_path).toUri(), conf2);

The above code adds a file present on my local file system to HDFS and adds it to the distributed cache.

However, in my mapper code, when I try to access the file stored in the distributed cache, the Path[] p variable gets a null value.
    public void configure(JobConf conf) {
        this.conf = conf;
        try {
            Path[] p = DistributedCache.getLocalCacheFiles(conf);
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

Even when I tried to access the distributed cache from the following code in my mapper, the code returns the error that the bloomfilter file doesn't exist:

    strm = new DataInputStream(new FileInputStream("bloomfilter"));
    // Read into our Bloom filter.
    filter.readFields(strm);
    strm.close();

However, I read somewhere that if we add a file to the distributed cache, we can access it directly by its name.

Can you please help me out?
--
The statements and opinions expressed here are my own and do not necessarily represent those of Oracle Corporation.
