Author: kasha
Date: Tue Aug  5 06:31:39 2014
New Revision: 1615868

URL: http://svn.apache.org/r1615868
Log:
MAPREDUCE-5968. Work directory is not deleted when downloadCacheObject throws 
IOException. (Zhihai Xu va kasha)

Modified:
    hadoop/common/branches/branch-1/CHANGES.txt
    
hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java
    
hadoop/common/branches/branch-1/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java

Modified: hadoop/common/branches/branch-1/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/CHANGES.txt?rev=1615868&r1=1615867&r2=1615868&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/CHANGES.txt (original)
+++ hadoop/common/branches/branch-1/CHANGES.txt Tue Aug  5 06:31:39 2014
@@ -233,6 +233,9 @@ Release 1.3.0 - unreleased
     MAPREDUCE-5966. MR1 FairScheduler use of custom weight adjuster is not 
     thread safe for comparisons. (Anubhav Dhoot via kasha)
 
+    MAPREDUCE-5968. Work directory is not deleted when downloadCacheObject 
+    throws IOException. (Zhihai Xu va kasha)
+
 Release 1.2.2 - unreleased
 
   INCOMPATIBLE CHANGES

Modified: 
hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java?rev=1615868&r1=1615867&r2=1615868&view=diff
==============================================================================
--- 
hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java
 (original)
+++ 
hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/filecache/TrackerDistributedCacheManager.java
 Tue Aug  5 06:31:39 2014
@@ -69,6 +69,7 @@ import org.apache.hadoop.mapreduce.secur
  * interface.</b>
  */
 public class TrackerDistributedCacheManager {
+  public static final String WORK_DIR_FIX = "-work-";
   // cacheID to cacheStatus mapping
   private LinkedHashMap<String, CacheStatus> cachedArchives = 
     new LinkedHashMap<String, CacheStatus>();
@@ -372,7 +373,7 @@ public class TrackerDistributedCacheMana
   }
 
   private static Path createRandomPath(Path base) throws IOException {
-    return new Path(base.toString() + "-work-" + random.nextLong());
+    return new Path(base.toString() + WORK_DIR_FIX + random.nextLong());
   }
 
   /**
@@ -427,39 +428,44 @@ public class TrackerDistributedCacheMana
     if (!localFs.mkdirs(workDir, permission)) {
       throw new IOException("Mkdirs failed to create directory " + workDir);
     }
-    Path workFile = new Path(workDir, parchive.getName());
-    sourceFs.copyToLocalFile(sourcePath, workFile);
-    localFs.setPermission(workFile, permission);
-    if (isArchive) {
-      String tmpArchive = workFile.getName().toLowerCase();
-      File srcFile = new File(workFile.toString());
-      File destDir = new File(workDir.toString());
-      LOG.info(String.format("Extracting %s to %s",
-               srcFile.toString(), destDir.toString()));
-      if (tmpArchive.endsWith(".jar")) {
-        RunJar.unJar(srcFile, destDir);
-      } else if (tmpArchive.endsWith(".zip")) {
-        FileUtil.unZip(srcFile, destDir);
-      } else if (isTarFile(tmpArchive)) {
-        FileUtil.unTar(srcFile, destDir);
-      } else {
-        LOG.warn(String.format(
-            "Cache file %s specified as archive, but not valid extension.",
-            srcFile.toString()));
-        // else will not do anyhting
-        // and copy the file into the dir as it is
-      }
-      FileUtil.chmod(destDir.toString(), "ugo+rx", true);
-    }
-    // promote the output to the final location
-    if (!localFs.rename(workDir, finalDir)) {
-      localFs.delete(workDir, true);
-      if (!localFs.exists(finalDir)) {
-        throw new IOException("Failed to promote distributed cache object " +
-                              workDir + " to " + finalDir);
+    try {
+      Path workFile = new Path(workDir, parchive.getName());
+      sourceFs.copyToLocalFile(sourcePath, workFile);
+      localFs.setPermission(workFile, permission);
+      if (isArchive) {
+        String tmpArchive = workFile.getName().toLowerCase();
+        File srcFile = new File(workFile.toString());
+        File destDir = new File(workDir.toString());
+        LOG.info(String.format("Extracting %s to %s",
+                 srcFile.toString(), destDir.toString()));
+        if (tmpArchive.endsWith(".jar")) {
+          RunJar.unJar(srcFile, destDir);
+        } else if (tmpArchive.endsWith(".zip")) {
+          FileUtil.unZip(srcFile, destDir);
+        } else if (isTarFile(tmpArchive)) {
+          FileUtil.unTar(srcFile, destDir);
+        } else {
+          LOG.warn(String.format(
+              "Cache file %s specified as archive, but not valid extension.",
+              srcFile.toString()));
+          // else will not do anyhting
+          // and copy the file into the dir as it is
+        }
+        FileUtil.chmod(destDir.toString(), "ugo+rx", true);
+      }
+      // promote the output to the final location
+      if (!localFs.rename(workDir, finalDir)) {
+        if (!localFs.exists(finalDir)) {
+          throw new IOException("Failed to promote distributed cache object " +
+                                workDir + " to " + finalDir);
+        }
+        // someone else promoted first
+        return 0;
+      }
+    } finally {
+      if (localFs.exists(workDir)) {
+        localFs.delete(workDir, true);
       }
-      // someone else promoted first
-      return 0;
     }
 
     LOG.info(String.format("Cached %s as %s",

Modified: 
hadoop/common/branches/branch-1/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java?rev=1615868&r1=1615867&r2=1615868&view=diff
==============================================================================
--- 
hadoop/common/branches/branch-1/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java
 (original)
+++ 
hadoop/common/branches/branch-1/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java
 Tue Aug  5 06:31:39 2014
@@ -50,6 +50,7 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RawLocalFileSystem;
 import org.apache.hadoop.fs.permission.FsAction;
@@ -57,6 +58,7 @@ import org.apache.hadoop.fs.permission.F
 import org.apache.hadoop.filecache.TaskDistributedCacheManager;
 import org.apache.hadoop.filecache.TrackerDistributedCacheManager;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.ReflectionUtils;
 
 public class TestTrackerDistributedCacheManager extends TestCase {
@@ -1239,4 +1241,48 @@ public class TestTrackerDistributedCache
     assertNull(taskDistributedCacheManager);
   }
 
+  public void testRemoveWorkDirInDownloadCacheObject() throws Exception {
+    if (!canRun()) {
+      return;
+    }
+    // This is to test the workDir is removed, when IOException happened
+    // use TestFileSystem to generate an IOException,
+    // then verify whether the workDir is deleted.
+    FsPermission filePerm = FsPermission.createImmutable((short)0755);
+    Configuration myConf = new Configuration(conf);
+    myConf.setClass("fs.test.impl", TestFileSystem.class, FileSystem.class);
+    Path testDir = new Path(TEST_ROOT_DIR, "testDir");
+    Path destination = new Path(testDir.toString(),
+        "downloadCacheObjectTestDir");
+    Path fileToCache = new Path("test:///" + destination.toUri().getPath());
+    try {
+      TrackerDistributedCacheManager.downloadCacheObject(myConf,
+          fileToCache.toUri(), destination, 0L, false, filePerm);
+      fail("did not throw an exception");
+    } catch (IOException e) {
+      GenericTestUtils.assertExceptionContains(
+          "Force an IOException for test", e);
+    }
+    File TEST_ROOT = new File(TEST_ROOT_DIR);
+    String workDir = destination.getParent().toString() +
+        TrackerDistributedCacheManager.WORK_DIR_FIX;
+    for (File f : TEST_ROOT.listFiles()) {
+      assertFalse(f.toString().contains(workDir));
+    }
+  }
+
+  static class TestFileSystem extends LocalFileSystem {
+    public TestFileSystem() {
+      super();
+    }
+    @Override
+    public void copyToLocalFile(boolean delSrc, Path src, Path dst)
+        throws IOException {
+      throw new IOException("Force an IOException for test");
+    }
+    @Override
+    public FileStatus getFileStatus(Path f) throws IOException {
+        return new FileStatus();
+    }
+  }
 }


Reply via email to