[
https://issues.apache.org/jira/browse/MAPREDUCE-5969?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14095019#comment-14095019
]
zhihai xu commented on MAPREDUCE-5969:
--------------------------------------
[~kasha] - I checked the MR2 (trunk/branch-2) source code; the implementation is
totally different from MR1 (branch-1).
MR2 (trunk/branch-2) uses LocalizedResource to manage the cache size. A
LocalizedResource is created in LocalResourcesTrackerImpl after it receives a
ContainerLocalizationRequestEvent (triggered by ContainerInitEvent), which
requests the LocalResources from the ContainerLaunchContext
(container.launchContext). The ContainerLaunchContext is created in
TaskAttemptImpl.java (createContainerLaunchContext) and YARNRunner.java
(createApplicationSubmissionContext).
The LocalResource entries in the ContainerLaunchContext are created by
{code}MRApps.setupDistributedCache(conf, localResources){code}.
So MR2 (trunk/branch-2) doesn't have this issue.
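As a rough sketch of that submission-side setup (simplified types and names, not the real YARN/MRApps API): each cache URI becomes exactly one entry in the launch context's localResources map, keyed by its link name:

```java
import java.util.HashMap;
import java.util.Map;

// Schematic of how a localResources map gets filled, in the spirit of
// MRApps.setupDistributedCache: one entry per cache URI, keyed by link name.
// The String values here are simplified stand-ins for YARN's LocalResource.
public class LaunchContextSketch {
  static Map<String, String> setupDistributedCache(String[] cacheUris) {
    Map<String, String> localResources = new HashMap<>();
    for (String uri : cacheUris) {
      // link name = last path component, e.g. "wordcount.jar"
      String linkName = uri.substring(uri.lastIndexOf('/') + 1);
      localResources.put(linkName, uri);
    }
    return localResources;
  }

  public static void main(String[] args) {
    Map<String, String> rsrcs = setupDistributedCache(new String[] {
        "hdfs://host:8022/tmp/zxu/WordCount.java",
        "hdfs://host:8022/tmp/zxu/wordcount.jar"});
    System.out.println(rsrcs.size()); // 2
  }
}
```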
The following is the size calculation in LocalizedResource.java after a
ResourceLocalizedEvent is received:
{code}
private static class FetchSuccessTransition extends ResourceTransition {
  @Override
  public void transition(LocalizedResource rsrc, ResourceEvent event) {
    ResourceLocalizedEvent locEvent = (ResourceLocalizedEvent) event;
    rsrc.localPath =
        Path.getPathWithoutSchemeAndAuthority(locEvent.getLocation());
    rsrc.size = locEvent.getSize();
    for (ContainerId container : rsrc.ref) {
      rsrc.dispatcher.getEventHandler().handle(
          new ContainerResourceLocalizedEvent(
              container, rsrc.rsrc, rsrc.localPath));
    }
  }
}
{code}
The size carried by the ResourceLocalizedEvent is set in the following code
(ResourceLocalizationService.java).
For a public resource:
{code}
publicRsrc.handle(new ResourceLocalizedEvent(key, local,
    FileUtil.getDU(new File(local.toUri()))));
{code}
For a private resource:
{code}
getLocalResourcesTracker(req.getVisibility(), user, applicationId)
    .handle(new ResourceLocalizedEvent(req,
        ConverterUtils.getPathFromYarnURL(stat.getLocalPath()),
        stat.getLocalSize()));
{code}
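So a public resource's size is measured on the local copy after download. As a rough stand-alone sketch of what a getDU-style recursive size walk does (simplified; not Hadoop's actual FileUtil implementation):

```java
import java.io.File;

// Stand-alone sketch of a FileUtil.getDU-style recursive size calculation:
// a plain file contributes its length, a directory contributes the sum of
// its children. Simplified, not the actual Hadoop code.
public class DiskUsageSketch {
  public static long getDU(File f) {
    if (!f.isDirectory()) {
      return f.length();          // plain file: just its length
    }
    long size = 0;
    File[] children = f.listFiles();
    if (children != null) {
      for (File child : children) {
        size += getDU(child);     // recurse into subdirectories
      }
    }
    return size;
  }

  public static void main(String[] args) throws Exception {
    File tmp = File.createTempFile("du-sketch", ".bin");
    tmp.deleteOnExit();
    System.out.println(getDU(tmp) == tmp.length()); // true
  }
}
```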
The cache cleanup happens in the following code:
{code}
// from ResourceLocalizationService.java
private void handleCacheCleanup(LocalizationEvent event) {
  ResourceRetentionSet retain =
      new ResourceRetentionSet(delService, cacheTargetSize);
  retain.addResources(publicRsrc);
  LOG.debug("Resource cleanup (public) " + retain);
  for (LocalResourcesTracker t : privateRsrc.values()) {
    retain.addResources(t);
    LOG.debug("Resource cleanup " + t.getUser() + ":" + retain);
  }
  //TODO Check if appRsrcs should also be added to the retention set.
}

// from ResourceRetentionSet.java
public void addResources(LocalResourcesTracker newTracker) {
  for (LocalizedResource resource : newTracker) {
    currentSize += resource.getSize();
    if (resource.getRefCount() > 0) {
      // always retain resources in use
      continue;
    }
    retain.put(resource, newTracker);
  }
  for (Iterator<Map.Entry<LocalizedResource,LocalResourcesTracker>> i =
         retain.entrySet().iterator();
       currentSize - delSize > targetSize && i.hasNext();) {
    Map.Entry<LocalizedResource,LocalResourcesTracker> rsrc = i.next();
    LocalizedResource resource = rsrc.getKey();
    LocalResourcesTracker tracker = rsrc.getValue();
    if (tracker.remove(resource, delService)) {
      delSize += resource.getSize();
      i.remove();
    }
  }
}
{code}
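To make the retention logic above concrete, here is a toy model (illustrative names, not the Hadoop classes): every resource counts toward currentSize, only unreferenced resources are eligible for deletion, and deletion stops as soon as the remaining size fits under targetSize:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Toy model of ResourceRetentionSet: resources with refCount > 0 are always
// retained; unreferenced ones are "deleted" until the cache fits targetSize.
public class RetentionDemo {
  static long retain(long targetSize, long[] sizes, int[] refCounts) {
    long currentSize = 0;
    long delSize = 0;
    List<Integer> evictable = new ArrayList<>();
    for (int i = 0; i < sizes.length; i++) {
      currentSize += sizes[i];
      if (refCounts[i] == 0) {
        evictable.add(i);               // resources in use are always retained
      }
    }
    for (Iterator<Integer> it = evictable.iterator();
         currentSize - delSize > targetSize && it.hasNext();) {
      delSize += sizes[it.next()];      // "delete" this resource from disk
    }
    return currentSize - delSize;       // bytes left in the cache
  }

  public static void main(String[] args) {
    // Three 4-byte resources; the middle one is still referenced by a container.
    System.out.println(retain(5, new long[]{4, 4, 4}, new int[]{0, 1, 0})); // 4
  }
}
```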
Also, only one copy of LocalizedResource is saved per LocalResourceRequest in
publicRsrc or privateRsrc, so each resource's size is counted at most once.
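To illustrate that single-copy bookkeeping (a plain map stands in for the tracker here; this is not the real LocalResourcesTrackerImpl): repeated requests for the same resource reuse the existing entry, so its size enters the total only once:

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of single-copy resource tracking: requests are keyed by the
// resource, so a resource requested by several containers still contributes
// one size entry to the total.
public class SingleCopyDemo {
  static long totalSize(String[] requests, Map<String, Long> sizeOf) {
    Map<String, Long> tracked = new HashMap<>();
    for (String req : requests) {
      tracked.putIfAbsent(req, sizeOf.get(req)); // repeat request reuses entry
    }
    long total = 0;
    for (long s : tracked.values()) {
      total += s;
    }
    return total;
  }

  public static void main(String[] args) {
    Map<String, Long> sizes =
        Map.of("wordcount.jar", 3865L, "WordCount.java", 2395L);
    // Two containers request wordcount.jar; it is still counted once.
    System.out.println(totalSize(
        new String[]{"wordcount.jar", "WordCount.java", "wordcount.jar"},
        sizes)); // 6260
  }
}
```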
Therefore this issue should only happen in MR1 (branch-1).
> Private non-Archive Files' size add twice in Distributed Cache directory size
> calculation.
> ------------------------------------------------------------------------------------------
>
> Key: MAPREDUCE-5969
> URL: https://issues.apache.org/jira/browse/MAPREDUCE-5969
> Project: Hadoop Map/Reduce
> Issue Type: Bug
> Components: mrv1
> Reporter: zhihai xu
> Assignee: zhihai xu
> Attachments: MAPREDUCE-5969.branch1.patch
>
>
> Private non-Archive files' sizes are added twice in the Distributed Cache
> directory size calculation. The private non-Archive files list is passed in
> via the "-files" command line option. The Distributed Cache directory size is
> used to check whether the total cache file size exceeds the cache size limit;
> the default limit is 10G.
> I added logging in addCacheInfoUpdate and setSize in
> TrackerDistributedCacheManager.java.
> I used the following command to test:
> hadoop jar ./wordcount.jar org.apache.hadoop.examples.WordCount -files
> hdfs://host:8022/tmp/zxu/WordCount.java,hdfs://host:8022/tmp/zxu/wordcount.jar
> /tmp/zxu/test_in/ /tmp/zxu/test_out
> which adds two files into the distributed cache: WordCount.java and
> wordcount.jar. WordCount.java is 2395 bytes and wordcount.jar is 3865 bytes,
> so the total should be 6260 bytes.
> The log shows these file sizes were added twice: once before downloading to
> the local node and a second time after downloading, so the total file count
> becomes 4 instead of 2:
> addCacheInfoUpdate size: 6260 num: 2 baseDir: /mapred/local
> addCacheInfoUpdate size: 8683 num: 3 baseDir: /mapred/local
> addCacheInfoUpdate size: 12588 num: 4 baseDir: /mapred/local
> In the code, for a private non-Archive file, the first time the file size is
> added is in getLocalCache:
> {code}
> if (!isArchive) {
>   //for private archives, the lengths come over RPC from the
>   //JobLocalizer since the JobLocalizer is the one who expands
>   //archives and gets the total length
>   lcacheStatus.size = fileStatus.getLen();
>   LOG.info("getLocalCache:" + localizedPath + " size = "
>       + lcacheStatus.size);
>   // Increase the size and sub directory count of the cache
>   // from baseDirSize and baseDirNumberSubDir.
>   baseDirManager.addCacheInfoUpdate(lcacheStatus);
> }
> {code}
> The second time the file size is added is in setSize:
> {code}
> synchronized (status) {
>   status.size = size;
>   baseDirManager.addCacheInfoUpdate(status);
> }
> {code}
> The fix is to not add the file size again for private non-Archive files after
> download (downloadCacheObject).
--
This message was sent by Atlassian JIRA
(v6.2#6252)