[ 
https://issues.apache.org/jira/browse/MAPREDUCE-5969?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14095019#comment-14095019
 ] 

zhihai xu commented on MAPREDUCE-5969:
--------------------------------------

[~kasha] - I checked the MR2 (trunk/branch-2) source code; its implementation is completely different from MR1 (branch-1).
MR2 (trunk/branch-2) uses LocalizedResource to manage the cache size. A LocalizedResource is created in LocalResourcesTrackerImpl when it receives a ContainerLocalizationRequestEvent (ContainerInitEvent) requesting a LocalResource from the ContainerLaunchContext (container.launchContext). The ContainerLaunchContext is created in TaskAttemptImpl.java (createContainerLaunchContext) and YARNRunner.java (createApplicationSubmissionContext).
The LocalResources in the ContainerLaunchContext are created by {{MRApps.setupDistributedCache(conf, localResources)}}.
So MR2 (trunk/branch-2) doesn't have this issue.

LocalizedResource.java records the size when it receives a ResourceLocalizedEvent:
{code}
  private static class FetchSuccessTransition extends ResourceTransition {
    @Override
    public void transition(LocalizedResource rsrc, ResourceEvent event) {
      ResourceLocalizedEvent locEvent = (ResourceLocalizedEvent) event;
      rsrc.localPath =
          Path.getPathWithoutSchemeAndAuthority(locEvent.getLocation());
      rsrc.size = locEvent.getSize();
      for (ContainerId container : rsrc.ref) {
        rsrc.dispatcher.getEventHandler().handle(
            new ContainerResourceLocalizedEvent(
              container, rsrc.rsrc, rsrc.localPath));
      }
    }
  }
{code}

ResourceLocalizationService.java computes the size carried by the ResourceLocalizedEvent.
For public resources:
{code}
              publicRsrc.handle(new ResourceLocalizedEvent(key, local, FileUtil
                .getDU(new File(local.toUri()))));
{code}
For private resources:
{code}
            getLocalResourcesTracker(req.getVisibility(), user, applicationId)
              .handle(
                new ResourceLocalizedEvent(req, ConverterUtils
                  .getPathFromYarnURL(stat.getLocalPath()),
                  stat.getLocalSize()));
{code}
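FileUtil.getDU is Hadoop's own helper. The code below is not its source. It is a minimal sketch of a hypothetical recursive disk-usage function (`DiskUsageSketch.diskUsage`) that illustrates the kind of measurement the public-resource path records: the summed length of every regular file under the localized path. Not following symlinks is an assumption of this sketch.

{code:java}
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class DiskUsageSketch {
    // Hypothetical recursive "du": returns the summed length of all regular
    // files under f (directories themselves contribute 0 bytes).
    public static long diskUsage(File f) {
        if (!f.exists()) {
            return 0L;                       // missing path: nothing to count
        }
        if (!f.isDirectory()) {
            return f.length();               // plain file: its own length
        }
        long total = 0L;
        File[] children = f.listFiles();
        if (children != null) {              // null if the directory cannot be listed
            for (File child : children) {
                // Assumption of this sketch: symbolic links are not followed.
                if (!Files.isSymbolicLink(child.toPath())) {
                    total += diskUsage(child);
                }
            }
        }
        return total;
    }

    public static void main(String[] args) throws IOException {
        // Recreate the two file sizes from the report in a temporary directory.
        Path dir = Files.createTempDirectory("du-sketch");
        Files.write(dir.resolve("WordCount.java"), new byte[2395]);
        Path sub = Files.createDirectory(dir.resolve("sub"));
        Files.write(sub.resolve("wordcount.jar"), new byte[3865]);
        System.out.println(diskUsage(dir.toFile())); // prints 6260
    }
}
{code}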

Cache cleanup happens in the following code:
{code}
  // from ResourceLocalizationService.java
  private void handleCacheCleanup(LocalizationEvent event) {
    ResourceRetentionSet retain =
      new ResourceRetentionSet(delService, cacheTargetSize);
    retain.addResources(publicRsrc);
    LOG.debug("Resource cleanup (public) " + retain);
    for (LocalResourcesTracker t : privateRsrc.values()) {
      retain.addResources(t);
      LOG.debug("Resource cleanup " + t.getUser() + ":" + retain);
    }
    //TODO Check if appRsrcs should also be added to the retention set.
  }

  // from ResourceRetentionSet.java
  public void addResources(LocalResourcesTracker newTracker) {
    for (LocalizedResource resource : newTracker) {
      currentSize += resource.getSize();
      if (resource.getRefCount() > 0) {
        // always retain resources in use
        continue;
      }
      retain.put(resource, newTracker);
    }
    for (Iterator<Map.Entry<LocalizedResource,LocalResourcesTracker>> i =
           retain.entrySet().iterator();
         currentSize - delSize > targetSize && i.hasNext();) {
      Map.Entry<LocalizedResource,LocalResourcesTracker> rsrc = i.next();
      LocalizedResource resource = rsrc.getKey();
      LocalResourcesTracker tracker = rsrc.getValue();
      if (tracker.remove(resource, delService)) {
        delSize += resource.getSize();
        i.remove();
      }
    }
  }
{code}
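The retention logic above can be illustrated with a minimal, self-contained sketch. `RetentionSketch` and its nested `Resource` class are hypothetical simplifications, and a tracker is modeled as a plain mutable list. Every resource counts toward the running total. Resources still in use are never deleted. Unreferenced ones are removed until the remaining size drops to the target. For simplicity, candidates are visited in insertion order; the quoted snippet does not show how `retain` is ordered.

{code:java}
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RetentionSketch {
    // Hypothetical, simplified stand-in for LocalizedResource.
    public static final class Resource {
        public final String name;
        public final long size;
        public final int refCount;

        public Resource(String name, long size, int refCount) {
            this.name = name;
            this.size = size;
            this.refCount = refCount;
        }
    }

    private final long targetSize;
    private long currentSize = 0;   // bytes seen across all trackers so far
    private long delSize = 0;       // bytes deleted so far
    // Deletion candidates, visited in insertion order (a simplification).
    private final Map<Resource, List<Resource>> retain = new LinkedHashMap<>();

    public RetentionSketch(long targetSize) {
        this.targetSize = targetSize;
    }

    // A tracker is modeled as a mutable list of resources.
    public void addResources(List<Resource> tracker) {
        for (Resource r : tracker) {
            currentSize += r.size;            // every resource counts toward the total
            if (r.refCount > 0) {
                continue;                     // resources in use are always retained
            }
            retain.put(r, tracker);           // unreferenced: eligible for deletion
        }
        Iterator<Map.Entry<Resource, List<Resource>>> it = retain.entrySet().iterator();
        while (currentSize - delSize > targetSize && it.hasNext()) {
            Map.Entry<Resource, List<Resource>> e = it.next();
            if (e.getValue().remove(e.getKey())) {   // drop it from its tracker
                delSize += e.getKey().size;
                it.remove();
            }
        }
    }

    public long freedBytes() {
        return delSize;
    }

    public static void main(String[] args) {
        List<Resource> tracker = new ArrayList<>(List.of(
                new Resource("a", 4, 0),
                new Resource("b", 3, 1),      // still referenced by a container
                new Resource("c", 5, 0)));
        RetentionSketch set = new RetentionSketch(6);
        set.addResources(tracker);
        // 12 bytes > 6: deleting "a" leaves 8 (still > 6), deleting "c" leaves 3.
        System.out.println(tracker.size() + " left, freed " + set.freedBytes()); // prints "1 left, freed 9"
    }
}
{code}

Because each resource is summed exactly once per cleanup pass, the total reflects real disk usage only if every resource appears in exactly one tracker.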

For each LocalResourceRequest, only one LocalizedResource is saved in publicRsrc or privateRsrc, so each resource's size is counted only once.
So this issue should only happen in MR1 (branch-1).

> Private non-Archive Files' size add twice in Distributed Cache directory size 
> calculation.
> ------------------------------------------------------------------------------------------
>
>                 Key: MAPREDUCE-5969
>                 URL: https://issues.apache.org/jira/browse/MAPREDUCE-5969
>             Project: Hadoop Map/Reduce
>          Issue Type: Bug
>          Components: mrv1
>            Reporter: zhihai xu
>            Assignee: zhihai xu
>         Attachments: MAPREDUCE-5969.branch1.patch
>
>
> The sizes of private non-archive files are added twice in the Distributed Cache directory size calculation. The list of private non-archive files is passed in by the "-files" command-line option. The Distributed Cache directory size is used to check whether the total size of the cache files exceeds the cache size limit; the default limit is 10G.
> I added logging to addCacheInfoUpdate and setSize in TrackerDistributedCacheManager.java.
> I used the following command to test:
> hadoop jar ./wordcount.jar org.apache.hadoop.examples.WordCount -files hdfs://host:8022/tmp/zxu/WordCount.java,hdfs://host:8022/tmp/zxu/wordcount.jar /tmp/zxu/test_in/ /tmp/zxu/test_out
> This adds two files to the distributed cache: WordCount.java and wordcount.jar.
> WordCount.java is 2395 bytes and wordcount.jar is 3865 bytes, so the total should be 6260 bytes.
> The log shows that these file sizes are added twice, once before the files are downloaded to the local node and again after the download, so the file count becomes 4 instead of 2:
> addCacheInfoUpdate size: 6260 num: 2 baseDir: /mapred/local
> addCacheInfoUpdate size: 8683 num: 3 baseDir: /mapred/local
> addCacheInfoUpdate size: 12588 num: 4 baseDir: /mapred/local
> In the code, the size of a private non-archive file is first added in getLocalCache:
> {code}
>             if (!isArchive) {
>               //for private archives, the lengths come over RPC from the 
>               //JobLocalizer since the JobLocalizer is the one who expands
>               //archives and gets the total length
>               lcacheStatus.size = fileStatus.getLen();
>               LOG.info("getLocalCache:" + localizedPath + " size = "
>                   + lcacheStatus.size);
>               // Increase the size and sub directory count of the cache
>               // from baseDirSize and baseDirNumberSubDir.
>               baseDirManager.addCacheInfoUpdate(lcacheStatus);
>             }
> {code}
> The size is added a second time in setSize:
> {code}
>       synchronized (status) {
>         status.size = size;
>         baseDirManager.addCacheInfoUpdate(status);
>       }
> {code}
> The fix is to stop adding the file size for private non-archive files again after the download (downloadCacheObject).
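The log arithmetic and the proposed fix can be replayed with a minimal Java sketch that models only private non-archive files. `BaseDirStats` and `replay` are hypothetical names, not classes from TrackerDistributedCacheManager. The second-pass sizes 2423 and 3905 are simply the differences between consecutive log lines (8683 - 6260 and 12588 - 8683), not values taken from the code.

{code:java}
public class DoubleCountSketch {
    // Hypothetical stand-in for the base-directory bookkeeping that
    // addCacheInfoUpdate performs: total bytes plus number of entries.
    public static final class BaseDirStats {
        public long size;
        public int numSubDirs;

        void addCacheInfoUpdate(long bytes) {
            size += bytes;
            numSubDirs++;
        }
    }

    // Replays the accounting for private non-archive files.
    // fixApplied == false: size added in getLocalCache AND again in setSize.
    // fixApplied == true:  setSize no longer calls addCacheInfoUpdate for
    //                      non-archive files, so each file is counted once.
    public static BaseDirStats replay(boolean fixApplied,
                                      long[] fileLengths, long[] reportedSizes) {
        BaseDirStats stats = new BaseDirStats();
        for (long len : fileLengths) {
            stats.addCacheInfoUpdate(len);            // getLocalCache, !isArchive branch
        }
        if (!fixApplied) {
            for (long reported : reportedSizes) {
                stats.addCacheInfoUpdate(reported);   // setSize after downloadCacheObject
            }
        }
        return stats;
    }

    public static void main(String[] args) {
        long[] lengths = {2395, 3865};    // WordCount.java, wordcount.jar
        long[] reported = {2423, 3905};   // deltas between consecutive log lines
        BaseDirStats before = replay(false, lengths, reported);
        BaseDirStats after = replay(true, lengths, reported);
        System.out.println(before.size + " bytes, " + before.numSubDirs + " entries"); // 12588 bytes, 4 entries
        System.out.println(after.size + " bytes, " + after.numSubDirs + " entries");   // 6260 bytes, 2 entries
    }
}
{code}

Without the fix, the totals match the last log line (12588 bytes, 4 entries). With the fix, they match the expected 6260 bytes for 2 files.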



--
This message was sent by Atlassian JIRA
(v6.2#6252)
