Author: kasha Date: Tue Aug 5 06:31:39 2014 New Revision: 1615868 URL: http://svn.apache.org/r1615868 Log: MAPREDUCE-5968. Work directory is not deleted when downloadCacheObject throws IOException. (Zhihai Xu va kasha)
Modified: hadoop/common/branches/branch-1/CHANGES.txt hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java hadoop/common/branches/branch-1/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java Modified: hadoop/common/branches/branch-1/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/CHANGES.txt?rev=1615868&r1=1615867&r2=1615868&view=diff ============================================================================== --- hadoop/common/branches/branch-1/CHANGES.txt (original) +++ hadoop/common/branches/branch-1/CHANGES.txt Tue Aug 5 06:31:39 2014 @@ -233,6 +233,9 @@ Release 1.3.0 - unreleased MAPREDUCE-5966. MR1 FairScheduler use of custom weight adjuster is not thread safe for comparisons. (Anubhav Dhoot via kasha) + MAPREDUCE-5968. Work directory is not deleted when downloadCacheObject + throws IOException. (Zhihai Xu va kasha) + Release 1.2.2 - unreleased INCOMPATIBLE CHANGES Modified: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java?rev=1615868&r1=1615867&r2=1615868&view=diff ============================================================================== --- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java (original) +++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java Tue Aug 5 06:31:39 2014 @@ -69,6 +69,7 @@ import org.apache.hadoop.mapreduce.secur * interface.</b> */ public class TrackerDistributedCacheManager { + public static final String WORK_DIR_FIX = "-work-"; // cacheID to cacheStatus mapping private LinkedHashMap<String, CacheStatus> cachedArchives = new LinkedHashMap<String, CacheStatus>(); @@ -372,7 +373,7 @@ public class TrackerDistributedCacheMana } private static Path createRandomPath(Path base) throws IOException { - return new Path(base.toString() + "-work-" + random.nextLong()); + return new Path(base.toString() + WORK_DIR_FIX + random.nextLong()); } /** @@ -427,39 +428,44 @@ public class TrackerDistributedCacheMana if (!localFs.mkdirs(workDir, permission)) { throw new IOException("Mkdirs failed to create directory " + workDir); } - Path workFile = new Path(workDir, parchive.getName()); - sourceFs.copyToLocalFile(sourcePath, workFile); - localFs.setPermission(workFile, permission); - if (isArchive) { - String tmpArchive = workFile.getName().toLowerCase(); - File srcFile = new File(workFile.toString()); - File destDir = new File(workDir.toString()); - LOG.info(String.format("Extracting %s to %s", - srcFile.toString(), destDir.toString())); - if (tmpArchive.endsWith(".jar")) { - RunJar.unJar(srcFile, destDir); - } else if (tmpArchive.endsWith(".zip")) { - FileUtil.unZip(srcFile, destDir); - } else if (isTarFile(tmpArchive)) { - FileUtil.unTar(srcFile, destDir); - } else { - LOG.warn(String.format( - "Cache file %s specified as archive, but not valid extension.", - srcFile.toString())); - // else will not do anyhting - // and copy the file into the dir as it is - } - FileUtil.chmod(destDir.toString(), "ugo+rx", true); - } - // promote the output to the final location - if (!localFs.rename(workDir, finalDir)) { - localFs.delete(workDir, true); - if (!localFs.exists(finalDir)) { - throw new IOException("Failed to promote distributed cache object " + - workDir + " to " + finalDir); + try { + Path workFile = new Path(workDir, parchive.getName()); + sourceFs.copyToLocalFile(sourcePath, workFile); + localFs.setPermission(workFile, permission); + if (isArchive) { + String tmpArchive = workFile.getName().toLowerCase(); + File srcFile = new File(workFile.toString()); + File destDir = new File(workDir.toString()); + LOG.info(String.format("Extracting %s to %s", + srcFile.toString(), destDir.toString())); + if (tmpArchive.endsWith(".jar")) { + RunJar.unJar(srcFile, destDir); + } else if (tmpArchive.endsWith(".zip")) { + FileUtil.unZip(srcFile, destDir); + } else if (isTarFile(tmpArchive)) { + FileUtil.unTar(srcFile, destDir); + } else { + LOG.warn(String.format( + "Cache file %s specified as archive, but not valid extension.", + srcFile.toString())); + // else will not do anyhting + // and copy the file into the dir as it is + } + FileUtil.chmod(destDir.toString(), "ugo+rx", true); + } + // promote the output to the final location + if (!localFs.rename(workDir, finalDir)) { + if (!localFs.exists(finalDir)) { + throw new IOException("Failed to promote distributed cache object " + + workDir + " to " + finalDir); + } + // someone else promoted first + return 0; + } + } finally { + if (localFs.exists(workDir)) { + localFs.delete(workDir, true); } - // someone else promoted first - return 0; } LOG.info(String.format("Cached %s as %s", Modified: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java?rev=1615868&r1=1615867&r2=1615868&view=diff ============================================================================== --- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java (original) +++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java Tue Aug 5 06:31:39 2014 @@ -50,6 +50,7 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.LocalDirAllocator; +import org.apache.hadoop.fs.LocalFileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RawLocalFileSystem; import org.apache.hadoop.fs.permission.FsAction; @@ -57,6 +58,7 @@ import org.apache.hadoop.fs.permission.F import org.apache.hadoop.filecache.TaskDistributedCacheManager; import org.apache.hadoop.filecache.TrackerDistributedCacheManager; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.util.ReflectionUtils; public class TestTrackerDistributedCacheManager extends TestCase { @@ -1239,4 +1241,48 @@ public class TestTrackerDistributedCache assertNull(taskDistributedCacheManager); } + public void testRemoveWorkDirInDownloadCacheObject() throws Exception { + if (!canRun()) { + return; + } + // This is to test the workDir is removed, when IOException happened + // use TestFileSystem to generate an IOException, + // then verify whether the workDir is deleted. + FsPermission filePerm = FsPermission.createImmutable((short)0755); + Configuration myConf = new Configuration(conf); + myConf.setClass("fs.test.impl", TestFileSystem.class, FileSystem.class); + Path testDir = new Path(TEST_ROOT_DIR, "testDir"); + Path destination = new Path(testDir.toString(), + "downloadCacheObjectTestDir"); + Path fileToCache = new Path("test:///" + destination.toUri().getPath()); + try { + TrackerDistributedCacheManager.downloadCacheObject(myConf, + fileToCache.toUri(), destination, 0L, false, filePerm); + fail("did not throw an exception"); + } catch (IOException e) { + GenericTestUtils.assertExceptionContains( + "Force an IOException for test", e); + } + File TEST_ROOT = new File(TEST_ROOT_DIR); + String workDir = destination.getParent().toString() + + TrackerDistributedCacheManager.WORK_DIR_FIX; + for (File f : TEST_ROOT.listFiles()) { + assertFalse(f.toString().contains(workDir)); + } + } + + static class TestFileSystem extends LocalFileSystem { + public TestFileSystem() { + super(); + } + @Override + public void copyToLocalFile(boolean delSrc, Path src, Path dst) + throws IOException { + throw new IOException("Force an IOException for test"); + } + @Override + public FileStatus getFileStatus(Path f) throws IOException { + return new FileStatus(); + } + } }