Hi,

We do something similar, but read the files without the FileSystem API:

Path[] cacheFiles = DistributedCache.getLocalCacheFiles(jobConf);

if (cacheFiles != null)
{
    for (Path cacheFile : cacheFiles)
    {
        // The cached copy is a plain local file, so java.io can open it directly
        FileInputStream fis = new FileInputStream(cacheFile.toString());
        // Logic to process on top of the input stream
        fis.close();
    }
}
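
For what it's worth, the same null-guarded loop can be tried outside Hadoop.
The sketch below is only an illustration, not our production code:
LocalCacheReader and readAllLines are hypothetical names, and the String[]
argument stands in for the Path[] returned by
DistributedCache.getLocalCacheFiles(jobConf), converted with toString(). It
assumes the cached files are UTF-8 text and uses try-with-resources so that
every stream gets closed.

```java
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Hadoop: reads local copies of cached files.
final class LocalCacheReader {

    private LocalCacheReader() {
    }

    /**
     * Returns every line of every file in localPaths, in order.
     * localPaths may be null (assumption: this mirrors a cache lookup that
     * returned nothing), in which case an empty list is returned.
     */
    static List<String> readAllLines(String[] localPaths) throws IOException {
        List<String> lines = new ArrayList<>();
        if (localPaths == null) {
            // Nothing was localized; avoid the NullPointerException.
            return lines;
        }
        for (String path : localPaths) {
            // try-with-resources closes the reader and the underlying stream.
            try (BufferedReader reader = new BufferedReader(
                    new InputStreamReader(new FileInputStream(path), StandardCharsets.UTF_8))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    lines.add(line);
                }
            }
        }
        return lines;
    }
}
```

In a mapper, the array would be built from the cache file paths, and a null
array simply yields an empty list instead of failing in configure().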

Thanks
Sudhan S

On Tue, Nov 8, 2011 at 2:30 PM, Uma Maheswara Rao G 72686 <
mahesw...@huawei.com> wrote:

> ----- Original Message -----
> From: Arko Provo Mukherjee <arkoprovomukher...@gmail.com>
> Date: Tuesday, November 8, 2011 1:26 pm
> Subject: Issues with Distributed Caching
> To: mapreduce-user@hadoop.apache.org
>
> > Hello,
> >
> > I am having the following problem with Distributed Caching.
> >
> > *In the driver class, I am doing the following:
> > (/home/arko/MyProgram/data is a directory created as an output of
> > another map-reduce)*
> >
> > *FileSystem fs = FileSystem.get(jobconf_seed);
> >
> > String init_path = "/home/arko/MyProgram/data";
> >
> > System.out.println("Caching files in " + init_path);
> >
> > FileStatus[] init_files = fs.listStatus(new Path(init_path));
> >
> > for ( int i = 0; i < init_files.length; i++ )  {
> >
> >   Path p = init_files[i].getPath();
> >   DistributedCache.addCacheFile ( p.toUri(), jobconf );
> > }*
> >
> I am not completely sure about this, but looking at the code: addCacheFile
> sets the files in mapred.cache.files, whereas getLocalCacheFiles tries to
> read the value of mapred.cache.localFiles. It looks like that value is
> coming back as null. Please check whether those values are being set
> correctly.
> > This is executing fine.
> >
> > *I have the following code in the configure method of the Map class:*
> >
> > *public void configure(JobConf job)  {
> >
> >   try  {
> >
> >       fs = FileSystem.getLocal(new Configuration());
> >       Path [] localFiles = DistributedCache.getLocalCacheFiles(job);
> >
> >       for ( Path p:localFiles )  {
> >
> >           BufferedReader file_reader = new BufferedReader(new
> > InputStreamReader(fs.open(p)));
> >
> >           String line = file_reader.readLine();
> >
> >           while ( line != null )  {
> >
> >               // Do something with the data
> >
> >               line = file_reader.readLine();
> >
> >           }
> >
> >       }
> >
> >   } catch (java.io.IOException e)  {
> >
> >       System.err.println("ERROR!! Cannot open filesystem from Map for reading!!");
> >       e.printStackTrace();
> >   }
> > }*
> >
> > This is giving me a java.lang.NullPointerException:
> > 11/11/08 01:36:17 INFO mapred.JobClient: Task Id :
> > attempt_201106271322_12775_m_000003_1, Status : FAILED
> > java.lang.NullPointerException
> > at Map.configure(Map.java:57)
> > at org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:58)
> > at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:83)
> > at org.apache.hadoop.mapred.MapRunner.configure(MapRunner.java:34)
> > at org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:58)
> > at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:83)
> > at org.apache.hadoop.mapred.MapTask.run(MapTask.java:328)
> > at org.apache.hadoop.mapred.Child.main(Child.java:155)
> >
> >
> > Am I doing this the wrong way? I followed a lot of links, and this
> > seems to be the way to go about it. Please help!
> >
> > Thanks a lot in advance!
> >
> > Warm regards
> > Arko
> >
>
