[ http://issues.apache.org/jira/browse/HADOOP-288?page=all ]

Mahadev konar updated HADOOP-288:
---------------------------------

    Attachment: caching.patch
                test.zip
                test.jar

I have attached two other files to the patch which are small .jar and .zip 
files needed for the junit tests.

Caching and job.jars:

Two parts to the patch:
1) Unjarring job.jar once for a job
2) Archiving archives/files locally 

1) Unjarring of job.jar
Currently the job.jar is unjarred for each task. This patch makes the framework 
do the unjarring only once for the job. The current working directory for each 
task if the same directory where the job is unjarred once. 
So the directory structure now looks like:

tasktracker/jobcache/jod_id/workdir -- the dir where the job is unjarred once
----------------------------/job_id/task_id/task_specific_job.xml

The current working dir for each task is the workdir.

2) Archiving of files- 

i) Each job can ask for a set of archives/files to be localized. The api for 
that is 
 jobconf.setCacheArchives(comma seperated list of archives)
 or 
 jobconf.setCacheFiles(comma seperated list of files).
 The comma seperated list can be specified as absolute path to files/caches 
(eg. /user/mahadev/test.jar) if they are in the same dfs as the mapred is 
running on or else they can be specified using urls as in copyfiles ( 
dfs://hostname:port/path_to_cache )
 There are two apis provided so that users who do not want their archives to be 
unarchived by the framework or just want to localize a file should use the 
second api.

ii) These archives/files should be present in the specified DFS for localizing.
    The user makes sure that these archives are present in the DFS before he 
submits the job else an error will be thrown that these archives are not 
present in DFS.

iii) Localization happens across jobs. So each cache archive/file has a key and 
the key is the url of the cache (in case of absolute path its the absolute 
path) 

iv) Whenever a job is started, the first tasks for these jobs will localize the 
archives. 
    The archives are stored in 
mapred.local/tasktracker/archives/hostname_of_dfs/dfs path of the archive.
    So an archive called /user/mahadev/k.zip  on a dfs running on machine1 
would be unarchived in  
    dir =  mapred.local/tasktracker/archives/machine1/user/mahadev/k.zip/
    This dir contains the unarchived contents of k.zip.
    If it is just a file (/user/mahadev/test.txt and not an archive, then it is 
stored in a directory called 
    mapred.local/tasktracker/archives/machine1/user/mahadev/test.txt/test.txt
    the local directory name contains test.txt directory just to make it 
similar to the archive structure.

   if no dfs://hostname:port is specified (eg : 
setcachefiles(/user/mahadev/test.txt)), in that case it is stored in 
  
mapred.local/tasktracker/archives/hostname_of_dfs_usedby_mapred/user/mahadev/test.txt

v) The archives are localized only once and checked for each task if they are 
fresh and need to be refresed or not.
   This is done using md5 checksum of the .crc files for the archives.
   
   Steps:
    a) When a job is submitted, the md5 checksums of the required 
archives/files in dfs are calculated and are written into the 
       jobconf.
    b) when a task is executing, it matches this md5 to the md5 of the 
localized cache (stored in memory after it has been localized). If they match 
its fine to go ahead with this archive.
       If it does not match then the md5 of the .crc of the file in dfs is 
calculated. If this does not match then the archives have been changed since 
the job has been submitted, so the tasks fail with this error. If they do match 
then the cache is refreshed again. 

    c) Two jobs can use the same archives in parallel, but if the second job 
updates the same archive and tries using the updated archive, then it will fail.
    
vi) How to get the localized cache paths
   An api in the jobconf called jobconf.getLocallizedCacheArchives gives a 
comma seperated list of localized path of the archives in the same order they 
had been asked to be localized.
  Also, you can use names for archives. So you could do something like:
 setcachearchives(x=somearchive)
and in the maps/reduces do conf.getNamedCache(x) and it will return the 
localized path of the cache named x.
vii) Restrictions: 
  Currently only *.zip and *.jar are only supported for archives. 

viii) Also, caching across tasktracker going up and down is not supported. So a 
tasktracker would lose all caching information once it goes down. The caching 
information can be reconstructed when the task tracker comes up but the support 
is not available in this patch.

ix) When are the caches deleted?
    A soft limit on the cache directory is a configuration parameter in the 
hadoop-default set to 10GB. So whenever the cache directory size goes beyond 
this size the framework will try deleting local caches that are not being used.





> RFC: Efficient file caching
> ---------------------------
>
>                 Key: HADOOP-288
>                 URL: http://issues.apache.org/jira/browse/HADOOP-288
>             Project: Hadoop
>          Issue Type: Bug
>            Reporter: Michel Tourn
>         Assigned To: Mahadev konar
>         Attachments: caching.patch, test.jar, test.zip
>
>
> RFC: Efficient file caching 
> (on Hadoop Task nodes, for benefit of MapReduce Tasks)
> ------------------------------------------------------
> We will start implementing this soon. Please provide feedback and 
> improvements to this plan.
> The header "Options:" indicates places where simple choices must be made.
> Problem:
> -------
> o MapReduce tasks require access to additional out-of-band data 
> ("dictionaries")
> This out-of-band data is:
> o in addition to the map/reduce inputs.
> o large (1GB+)
> o broadcast (same data is required on all the Task nodes)
> o changes "infrequently", in particular:
> oo it is always constant for all the Tasks in a Job. 
> oo it is often constant for a month at a time 
> oo it may be shared across team members
> o sometimes used by pure-Java MapReduce programs
> o sometimes used by non-Java MapReduce programs (using Hadoop-Streaming)
> o (future) used by programs that use HDFS and Task-trackers but not MapReduce.
> Existing Solutions to the problem:
> ---------------------------------
> These solutions are not good enough. The present proposal is to do Sol 1 with 
> caching.
> Sol 1: Pure Hadoop: package the out-of-band data in the MapReduce Job jar 
> file.
> Sol 2: Non  Hadoop: for each task node run rsync from single source for data.
> Sol 3: Non  Hadoop: use BitTorrent, etc.
> Sol.1 is correct but slow for many reasons:
>  The Job submitter must recreate a large jar(tar) file for every Job.
>   (The jar contains both changing programs and stable dictionaries)
>  The large Jar file must be propagated from the client to HDFS with 
>  a large replication factor. 
>  At the beginning of every Task, the Task tracker gets the job jar from HDFS 
>  and unjars it in the working directory. This can dominate task execution 
> time.
>  
> Sol.2 has nice properties but also some problems.
>  It does not scale well with large clusters (many concurrent rsync read 
> requests i.e. single-source broadcast)
>  It assumes that Hadoop users can upload data using rsync to the cluster 
> nodes. As a policy, this is not allowed.
>  It requires rsync.
>  
> Sol.3 alleviates the rsync scalability problems but 
>       It is a dependency on an external system. 
>       We want something simpler and more tightly integrated with Hadoop.
>       
> Staging (uploading) out-of-band data:
> ------------------------------------
> The out-of-band data will often originate on the local filesystem of a user 
> machine 
>  (i.e. a MapReduce job submitter)
> Nevertheless it makes sense to use HDFS to store the original out-of-band 
> data because:
> o HDFS has (wide) replication. This enables scalable broadcast later.
> o HDFS is an available channel to move data from clients to all task machines.
> o HDFS is convenient as a shared location among Hadoop team members.
> Accessing (downloading) out-of-band data:
> ----------------------------------------
> The non-Java MapReduce programs do not have or want[1] APIs for HDFS.
> Instead these programs just want to access out-of-band data as 
>  local files at predefined paths.
> ([1] Existing programs should be reusable with no changes. 
>  This is often possible bec. communication is over stdin/stdout.)
> Job's jar file as a special case:
> --------------------------------
> One use case is to allow users to make the job jar itself cachable.
> This is only useful in cases where NOTHING changes when a job is resubmitted
>  (no MapRed code changes and no changes in shipped data)
> This situation might occur with an 'extractor' job (gets data from an 
> external source: like Nutch crawler)
> Currently the Hadoop mapred-jar mechanism works in this way:
>  the job jar data is unjarred in the "working directory" of the Task 
>  the jar contains both MapRed java code (added to classpath)
> Cache synchronization:
> ---------------------
> The efficient implementation of the out-of-band data distribution
> is mostly a cache synchronization problem.
> A list of the various aspects where choices must be made follows.
> Cache key:
> ---------
> How do you test that the cached copy is out-of-date?
> Options: 
> 1. the archive/file timestamp 
> 2. the MD5 of the archive/file content
> Comparing source and destination Timestamps is problematic bec. it assumes 
> synchronized clocks.
> Also there is no last-modif metadata in HDFS (for good reasons, like 
> scalability of metadata ops)
> Timestamps stored with the source ('last-propagate-time') do 
>  not require synchronized clocks, only locally monotonic time. 
> (and the worse which can happen at daylight-savings switch is a missed update 
> or an extra-update)
> The cache code could store a copy of the local timestamp 
> in the same way that it caches the value of the content hash along with the 
> source data.
>  
> Cachable unit:
> -------------
> Options: individual files or archives or both.
> Note:
> At the API level, directories will be processed recursively 
> (and the local FS directories will parallel HDFS directories)
> So bulk operations are always possible using directories.
> The question here is whether to handle archives as an additional bulk 
> mechanism.
> Archives are special because:
> o unarchiving occurs transparently as part of the cache sync
> o The cache key is computed on the archive and preserved although 
>   the archive itself is not preserved.
> Supported archive format will be: tar (maybe tgz or compressed jar)
> Archive detection test: by filename extension ".tar" or ".jar"
> Suppose we don't handle archives as special files:
> Pros:
>  o less code, no discussion about which archive formats are supported
>  o fine for large dictionary files. And when files are not large, user may as 
> well
>    put them in the Job jar as usual.
>  o user code could always check and unarchive specific cached files
>    (as a side-effect of MapRed task initialization)
> Cons:
>  o handling small files may be inefficient 
>   (multiple HDFS operations, multiple hash computation, 
>    one 'metadata' hash file along with each small file)
>  o It will not be possible to handle the Job's jar file as a special case of 
> caching 
> Cache isolation: 
> ---------------
> In some cases it may be a problem if the cached HDFS files are updated while 
> a Job is in progress:
> The file may become unavailable for a short period of time and some tasks 
> fail.
> The file may change (atomically) and different tasks use a different version.
> This isolation problem is not addressed in this proposal.
> Standard solutions to the isolation problem are:
> o Assume that Jobs and interfering cache updates won't occur concurrently.
> o Put a version number in the HDFS file paths and refer to a hard-coded 
> version in the Job code.
> o Before running the MapRed job, run a non-distributed application that tests
>   what is the latest available version of the out-of-band data. 
>   Then make this version available to the MapRed job.
>   Two ways to do this. 
>   o either set a job property just-in-time:
>     addCachePathPair("/mydata/v1234/", "localcache/mydata_latest"); 
>     (see Job Configuration for meaning of this)
>   o or publish the decision as an HDFS file containing the version.
>     then rely on user code to read the version, and manually populate the 
> cache:
>     Cache.syncCache("/hdfs/path/fileordir", "relative/local/fileordir");
>     (see MapReduce API for meaning of this)
> Cache synchronization stages:
> ----------------------------
> There are two stages: Client-to-HDFS and HDFS-to-TaskTracker
> o Client-to-HDFS stage.
> Options: A simple option is to not do anything here, i.e. rely on the user.
> This is a reasonable option given previous remarks on the role of HDFS:
>  HDFS is a staging/publishing area and a natural shared location.
> In particular this means that the system need not track 
> where the client files come from.
> o HDFS-to-TaskTracker:
> Client-to-HDFS synchronization (if done at all) should happen before this.
> Then HDFS-to-TaskTracker synchronization must happen right before 
> the data is needed on a node.
> MapReduce cache API:
> -------------------
> Options:
> 1. No change in MapReduce framework code:
> require the user to put this logic in map() (or reduce) function:
>  in MyMapper constructor (or in map() on first record) user is asked to add:
>  
>     Cache.syncCache("/hdfs/path/fileordir", "relative/local/fileordir");
>     Cache.syncCache("..."); //etc.
>   
> -----
> 2. Put this logic in MapReduce framework and use Job properties to
>    communicate the list of pairs (hdfs path; local path)
>  
> Directories are processed recursively.
> If archives are treated specially then they are unarchived on destination.
>  
> MapReduce Job Configuration:
> ---------------------------
> Options:
> with No change in MapReduce framework code (see above)
>  no special Job configuration: 
>    it is up to the MapRed writer to configure and run the cache operations.
> ---
> with Logic in MapReduce framework (see above)
>  some simple Job configuration
> JobConf.addCachePathPair(String, String)
> JobConf.addCachePathPair("/hdfs/path/fileordir", "relative/local/fileordir");

-- 
This message is automatically generated by JIRA.
-
If you think it was sent incorrectly contact one of the administrators: 
http://issues.apache.org/jira/secure/Administrators.jspa
-
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

Reply via email to