Hi

My code creates a new job named "job 1" which writes something to distributed 
cache (say a text file) and the job gets completed.
Just to manage expectations, you add files to the distributed cache_in the job 
driver_, and the framework makes them available to maps and reducers.

   Adding a file to the distributed cache in a mapper and expecting to have it 
available in other mappers/reducers or later jobs is not a supported use case.

If you want a mapper/reducer to send info back to the job driver, then use 
counters or write regular files (in the job output dir).

If you want to add a file to the distributed cache and have multiple jobs use 
it, then:

   Note that DistributedCache.add*(Path, Conf) append the file name to internal 
conf properties. (then when you submit the job, the framework will read those 
properties to find out what files need to be distributed, added to the 
classpath, etc.).
   So DistributedCache.addCacheFile(*f*, *conf1*) will write something into 
*conf1*. When you launch *job1* using *conf1*, *f* gets distributed. When you 
launch *job2* using *conf2*, then *job2* may not distribute file *f*, depending 
on how you build *conf2* (independent of *conf1*, or a copy/clone of *conf1*).

hth

Gabriel Balan

P.S. Btw, you don't have to copy the local file to hdfs using "IOUtils.copyBytes(in, 
out, 4096, true);"

Try  FileSystem.copyFromLocalFile 
<https://hadoop.apache.org/docs/r2.6.1/api/src-html/org/apache/hadoop/fs/FileSystem.html#line.1885>
 or "hadoop -jar foo.jar MyClass -file /home/siddharth/Desktop/data/bloom_filter"


On 6/13/2016 5:14 AM, Siddharth Dawar wrote:
Hi Jeff,

Thanks for your prompt reply. Actually my problem is as follows:

My code creates a new job named "job 1" which writes something to distributed 
cache (say a text file) and the job gets completed.

Now, I want to create some n number of jobs in while loop below, which reads the text file written 
by "job 1" from the distributed cache. So my question is, "*How to share content 
among multiple jobs using distributed cache*" ?

*Another part of the problem *is that I don't know how to get instance of 
running job from
JobClient.runJob(conf2);
*so that I can use job.addcachefiles(..) command/*
**


while (true)
{
JobConf  conf2   =  new  JobConf(getConf(),graphMining.class);

conf2.setJobName("sid");
conf2.setMapperClass(mapperMiner.class);
conf2.setReducerClass(reducerMiner.class);

conf2.setInputFormat(SequenceFileInputFormat.class);
conf2.setOutputFormat(SequenceFileOutputFormat.class);
conf2.setOutputValueClass(BytesWritable.class);

conf2.setMapOutputKeyClass(Text.class);
conf2.setMapOutputValueClass(MapWritable.class);
conf2.setOutputKeyClass(Text.class);

conf2.setNumMapTasks(Integer.parseInt(args[3]));
conf2.setNumReduceTasks(Integer.parseInt(args[4]));
FileInputFormat.addInputPath(conf2,  new  Path(input));
FileOutputFormat.setOutputPath(conf2,  new  Path(output));  }
RunningJob  job  =  JobClient.runJob(conf2); }



On Wed, Jun 8, 2016 at 3:50 AM, Guttadauro, Jeff <jeff.guttada...@here.com 
<mailto:jeff.guttada...@here.com>> wrote:

    Hi, Siddharth.

    I was also a bit frustrated at what I found to be scant documentation on 
how to use the distributed cache in Hadoop 2.  The DistributedCache class 
itself was deprecated in Hadoop 2, but there don’t appear to be very clear 
instructions on the alternative.  I think it’s actually much simpler to work 
with files on the distributed cache in Hadoop 2.  The new way is to add files 
to the cache (or cacheArchive) via the Job object:

    job.addCacheFile(/uriForYourFile/)

    job.addCacheArchive(/uriForYourArchive/);

    The cool part is that, if you set up your URI so that it has a 
“#/yourFileReference/” at the end, then Hadoop will set up a symbolic link 
named “/yourFileReference/” in your job’s working directory, which you can use 
to get at the file or archive.  So, it’s as if the file or archive is in the 
working directory.  That obviates the need to even work with the 
DistributedCache class in your Mapper or Reducer, since you can just work with 
the file (or path using nio) directly.

    Hope that helps.

    -Jeff

    *From:*Siddharth Dawar [mailto:siddharthdawa...@gmail.com 
<mailto:siddharthdawa...@gmail.com>]
    *Sent:* Tuesday, June 07, 2016 4:06 AM
    *To:* user@hadoop.apache.org <mailto:user@hadoop.apache.org>
    *Subject:* Accessing files in Hadoop 2.7.2 Distributed Cache

    Hi,

    I want to use the distributed cache to allow my mappers to access data in 
Hadoop 2.7.2. In main, I'm using the command

    String hdfs_path="hdfs://localhost:9000/bloomfilter";

    InputStream in = new BufferedInputStream(new 
FileInputStream("/home/siddharth/Desktop/data/bloom_filter"));

    Configuration conf = new Configuration();

    fs = FileSystem.get(java.net.URI.create(hdfs_path), conf);

    OutputStream out = fs.create(new Path(hdfs_path));

    //Copy file from local to HDFS

    IOUtils.copyBytes(in, out, 4096, true);

    System.out.println(hdfs_path + " copied to 
HDFS");DistributedCache.addCacheFile(new Path(hdfs_path).toUri(), conf2);

    DistributedCache.addCacheFile(new Path(hdfs_path).toUri(), conf2);

    The above code adds a file present on my local file system to HDFS and adds 
it to the distributed cache.

    However, in my mapper code, when I try to access the file stored in 
distributed cache, the Path[] P variable gets null value. d

    public void configure(JobConf conf)

                            {

                                    this.conf = conf;

                                    try {

                                           Path [] 
p=DistributedCache.getLocalCacheFiles(conf);

                                    } catch (IOException e) {

                                           // TODO Auto-generated catch block

                                           e.printStackTrace();

                                    }

                            }

    Even when I tried to access distributed cache from the following code

    in my mapper, the code returns the error that bloomfilter file doesn't exist

    strm = new DataInputStream(new FileInputStream("bloomfilter"));

    // Read into our Bloom filter.

    filter.readFields(strm);

    strm.close();

    However, I read somewhere that if we add a file to distributed cache, we 
can access it

    directly from its name.

    Can you please help me out ?



--
The statements and opinions expressed here are my own and do not necessarily 
represent those of Oracle Corporation.

Reply via email to