Repository: hadoop
Updated Branches:
  refs/heads/trunk 61a4c7fc9 -> 91baca145


MAPREDUCE-6275. Race condition in FileOutputCommitter v2 for user-specified 
task output subdirs. Contributed by Gera Shegalov and Siqi Li


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/91baca14
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/91baca14
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/91baca14

Branch: refs/heads/trunk
Commit: 91baca145a6c16fe13f455d150c05bd73179531b
Parents: 61a4c7f
Author: Jason Lowe <jl...@apache.org>
Authored: Thu Mar 19 21:39:21 2015 +0000
Committer: Jason Lowe <jl...@apache.org>
Committed: Thu Mar 19 21:39:21 2015 +0000

----------------------------------------------------------------------
 hadoop-mapreduce-project/CHANGES.txt            |   3 +
 .../lib/output/FileOutputCommitter.java         |  96 ++++++++------
 .../lib/output/TestFileOutputCommitter.java     | 132 ++++++++++++++++++-
 3 files changed, 186 insertions(+), 45 deletions(-)
----------------------------------------------------------------------
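
The race being fixed is a classic check-then-act: mergePaths() asked
fs.exists(to) and then acted on an answer that can go stale. Under the v2
algorithm, task attempts merge their output into the final output directory
concurrently at task commit, so two attempts writing under the same
user-specified subdirectory can both see the destination as missing and both
rename into it; the second rename then lands *inside* the directory the first
one created, nesting the output (SUB_DIR/SUB_DIR in the new test). A minimal
sketch of the pre-patch shape, with simplified names (not the committed code):

    import java.io.IOException;

    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    class MergeRaceSketch {
      // Pre-patch merge step, simplified: two concurrent task commits can
      // both observe exists(to) == false, and the losing rename then moves
      // 'from' inside the directory the winner just created.
      static void mergeDirRacy(FileSystem fs, FileStatus from, Path to)
          throws IOException {
        if (!fs.exists(to)) {     // answer is stale by the time rename runs
          if (!fs.rename(from.getPath(), to)) {
            throw new IOException("Failed to rename " + from + " to " + to);
          }
        }
      }
    }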


http://git-wip-us.apache.org/repos/asf/hadoop/blob/91baca14/hadoop-mapreduce-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index 7972c08..10c9389 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -463,6 +463,9 @@ Release 2.7.0 - UNRELEASED
     MAPREDUCE-6277. Job can post multiple history files if attempt loses
     connection to the RM (Chang Li via jlowe)
 
+    MAPREDUCE-6275. Race condition in FileOutputCommitter v2 for
+    user-specified task output subdirs (Gera Shegalov and Siqi Li via jlowe)
+
 Release 2.6.1 - UNRELEASED
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/91baca14/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java
index 28a8548..6e5d0a1 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java
@@ -348,44 +348,61 @@ public class FileOutputCommitter extends OutputCommitter {
    * @param to the path data is going to.
    * @throws IOException on any error
    */
-  private static void mergePaths(FileSystem fs, final FileStatus from,
-      final Path to)
-    throws IOException {
-     LOG.debug("Merging data from "+from+" to "+to);
-     if(from.isFile()) {
-       if(fs.exists(to)) {
-         if(!fs.delete(to, true)) {
-           throw new IOException("Failed to delete "+to);
-         }
-       }
+  private void mergePaths(FileSystem fs, final FileStatus from,
+      final Path to) throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Merging data from " + from + " to " + to);
+    }
+    FileStatus toStat;
+    try {
+      toStat = fs.getFileStatus(to);
+    } catch (FileNotFoundException fnfe) {
+      toStat = null;
+    }
+
+    if (from.isFile()) {
+      if (toStat != null) {
+        if (!fs.delete(to, true)) {
+          throw new IOException("Failed to delete " + to);
+        }
+      }
 
-       if(!fs.rename(from.getPath(), to)) {
-         throw new IOException("Failed to rename "+from+" to "+to);
-       }
-     } else if(from.isDirectory()) {
-       if(fs.exists(to)) {
-         FileStatus toStat = fs.getFileStatus(to);
-         if(!toStat.isDirectory()) {
-           if(!fs.delete(to, true)) {
-             throw new IOException("Failed to delete "+to);
-           }
-           if(!fs.rename(from.getPath(), to)) {
-             throw new IOException("Failed to rename "+from+" to "+to);
-           }
-         } else {
-           //It is a directory so merge everything in the directories
-           for(FileStatus subFrom: fs.listStatus(from.getPath())) {
-             Path subTo = new Path(to, subFrom.getPath().getName());
-             mergePaths(fs, subFrom, subTo);
-           }
-         }
-       } else {
-         //it does not exist just rename
-         if(!fs.rename(from.getPath(), to)) {
-           throw new IOException("Failed to rename "+from+" to "+to);
-         }
-       }
-     }
+      if (!fs.rename(from.getPath(), to)) {
+        throw new IOException("Failed to rename " + from + " to " + to);
+      }
+    } else if (from.isDirectory()) {
+      if (toStat != null) {
+        if (!toStat.isDirectory()) {
+          if (!fs.delete(to, true)) {
+            throw new IOException("Failed to delete " + to);
+          }
+          renameOrMerge(fs, from, to);
+        } else {
+          //It is a directory so merge everything in the directories
+          for (FileStatus subFrom : fs.listStatus(from.getPath())) {
+            Path subTo = new Path(to, subFrom.getPath().getName());
+            mergePaths(fs, subFrom, subTo);
+          }
+        }
+      } else {
+        renameOrMerge(fs, from, to);
+      }
+    }
+  }
+
+  private void renameOrMerge(FileSystem fs, FileStatus from, Path to)
+      throws IOException {
+    if (algorithmVersion == 1) {
+      if (!fs.rename(from.getPath(), to)) {
+        throw new IOException("Failed to rename " + from + " to " + to);
+      }
+    } else {
+      fs.mkdirs(to);
+      for (FileStatus subFrom : fs.listStatus(from.getPath())) {
+        Path subTo = new Path(to, subFrom.getPath().getName());
+        mergePaths(fs, subFrom, subTo);
+      }
+    }
   }
 
   @Override
@@ -546,8 +563,9 @@ public class FileOutputCommitter extends OutputCommitter {
       Path previousCommittedTaskPath = getCommittedTaskPath(
           previousAttempt, context);
      FileSystem fs = previousCommittedTaskPath.getFileSystem(context.getConfiguration());
-
-      LOG.debug("Trying to recover task from " + previousCommittedTaskPath);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Trying to recover task from " + previousCommittedTaskPath);
+      }
       if (algorithmVersion == 1) {
         if (fs.exists(previousCommittedTaskPath)) {
           Path committedTaskPath = getCommittedTaskPath(context);

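The fix replaces the exists() probe with a single getFileStatus() snapshot
(FileNotFoundException meaning "missing"), and renameOrMerge() keeps the
whole-directory rename only for algorithm version 1, where task output is
merged once at job commit. For version 2 it does an idempotent mkdirs() and
then merges child by child, so concurrent task commits converge on the same
destination directory instead of racing a rename. A minimal driver-side
sketch of selecting the algorithm; the output path here is a placeholder,
while the constant is the same one the new test uses:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

    public class CommitAlgorithmSketch {
      public static void main(String[] args) throws Exception {
        Job job = Job.getInstance();
        job.setOutputFormatClass(TextOutputFormat.class);
        FileOutputFormat.setOutputPath(job, new Path("/tmp/out")); // placeholder
        Configuration conf = job.getConfiguration();
        // 1 = merge at job commit (rename fast path); 2 = merge at task commit.
        conf.setInt(
            FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION, 2);
      }
    }
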
http://git-wip-us.apache.org/repos/asf/hadoop/blob/91baca14/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java
index 8f60300..0d4ab98 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java
@@ -22,9 +22,15 @@ import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.net.URI;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 
 import junit.framework.TestCase;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -39,6 +45,7 @@ import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.JobStatus;
 import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
@@ -47,13 +54,25 @@ import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
 
 @SuppressWarnings("unchecked")
 public class TestFileOutputCommitter extends TestCase {
-  private static Path outDir = new Path(System.getProperty("test.build.data",
-      "/tmp"), "output");
+  private static final Path outDir = new Path(
+      System.getProperty("test.build.data",
+          System.getProperty("java.io.tmpdir")),
+      TestFileOutputCommitter.class.getName());
+
+  private final static String SUB_DIR = "SUB_DIR";
+  private final static Path OUT_SUB_DIR = new Path(outDir, SUB_DIR);
+
+  private static final Log LOG =
+      LogFactory.getLog(TestFileOutputCommitter.class);
 
   // A random task attempt id for testing.
-  private static String attempt = "attempt_200707121733_0001_m_000000_0";
-  private static String partFile = "part-m-00000";
-  private static TaskAttemptID taskID = TaskAttemptID.forName(attempt);
+  private static final String attempt = "attempt_200707121733_0001_m_000000_0";
+  private static final String partFile = "part-m-00000";
+  private static final TaskAttemptID taskID = TaskAttemptID.forName(attempt);
+
+  private static final String attempt1 = "attempt_200707121733_0001_m_000001_0";
+  private static final TaskAttemptID taskID1 = TaskAttemptID.forName(attempt1);
+
   private Text key1 = new Text("key1");
   private Text key2 = new Text("key2");
   private Text val1 = new Text("val1");
@@ -229,7 +248,7 @@ public class TestFileOutputCommitter extends TestCase {
   }
 
   private void testCommitterInternal(int version) throws Exception {
-  Job job = Job.getInstance();
+    Job job = Job.getInstance();
     FileOutputFormat.setOutputPath(job, outDir);
     Configuration conf = job.getConfiguration();
     conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt);
@@ -441,6 +460,107 @@ public class TestFileOutputCommitter extends TestCase {
     testFailAbortInternal(2);
   }
 
+  static class RLFS extends RawLocalFileSystem {
+    private final ThreadLocal<Boolean> needNull = new ThreadLocal<Boolean>() {
+      @Override
+      protected Boolean initialValue() {
+        return true;
+      }
+    };
+
+    public RLFS() {
+    }
+
+    @Override
+    public FileStatus getFileStatus(Path f) throws IOException {
+      if (needNull.get() &&
+          OUT_SUB_DIR.toUri().getPath().equals(f.toUri().getPath())) {
+        needNull.set(false); // lie once per thread
+        return null;
+      }
+      return super.getFileStatus(f);
+    }
+  }
+
+  private void testConcurrentCommitTaskWithSubDir(int version)
+      throws Exception {
+    final Job job = Job.getInstance();
+    FileOutputFormat.setOutputPath(job, outDir);
+    final Configuration conf = job.getConfiguration();
+    conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt);
+    conf.setInt(FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION,
+        version);
+
+    conf.setClass("fs.file.impl", RLFS.class, FileSystem.class);
+    FileSystem.closeAll();
+
+    final JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
+    final FileOutputCommitter amCommitter =
+        new FileOutputCommitter(outDir, jContext);
+    amCommitter.setupJob(jContext);
+
+    final TaskAttemptContext[] taCtx = new TaskAttemptContextImpl[2];
+    taCtx[0] = new TaskAttemptContextImpl(conf, taskID);
+    taCtx[1] = new TaskAttemptContextImpl(conf, taskID1);
+
+    final TextOutputFormat[] tof = new TextOutputFormat[2];
+    for (int i = 0; i < tof.length; i++) {
+      tof[i] = new TextOutputFormat() {
+        @Override
+        public Path getDefaultWorkFile(TaskAttemptContext context,
+            String extension) throws IOException {
+          final FileOutputCommitter foc = (FileOutputCommitter)
+              getOutputCommitter(context);
+          return new Path(new Path(foc.getWorkPath(), SUB_DIR),
+              getUniqueFile(context, getOutputName(context), extension));
+        }
+      };
+    }
+
+    final ExecutorService executor = Executors.newFixedThreadPool(2);
+    try {
+      for (int i = 0; i < taCtx.length; i++) {
+        final int taskIdx = i;
+        executor.submit(new Callable<Void>() {
+          @Override
+          public Void call() throws IOException, InterruptedException {
+            final OutputCommitter outputCommitter =
+                tof[taskIdx].getOutputCommitter(taCtx[taskIdx]);
+            outputCommitter.setupTask(taCtx[taskIdx]);
+            final RecordWriter rw =
+                tof[taskIdx].getRecordWriter(taCtx[taskIdx]);
+            writeOutput(rw, taCtx[taskIdx]);
+            outputCommitter.commitTask(taCtx[taskIdx]);
+            return null;
+          }
+        });
+      }
+    } finally {
+      executor.shutdown();
+      while (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
+        LOG.info("Awaiting thread termination!");
+      }
+    }
+
+    amCommitter.commitJob(jContext);
+    final RawLocalFileSystem lfs = new RawLocalFileSystem();
+    lfs.setConf(conf);
+    assertFalse("Must not end up with sub_dir/sub_dir",
+        lfs.exists(new Path(OUT_SUB_DIR, SUB_DIR)));
+
+    // validate output
+    validateContent(OUT_SUB_DIR);
+    FileUtil.fullyDelete(new File(outDir.toString()));
+  }
+
+  public void testConcurrentCommitTaskWithSubDirV1() throws Exception {
+    testConcurrentCommitTaskWithSubDir(1);
+  }
+
+  public void testConcurrentCommitTaskWithSubDirV2() throws Exception {
+    testConcurrentCommitTaskWithSubDir(2);
+  }
+
   public static String slurp(File f) throws IOException {
     int len = (int) f.length();
     byte[] buf = new byte[len];

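The new test makes the race deterministic instead of probabilistic: the RLFS
test filesystem lies exactly once per thread, reporting the shared SUB_DIR
destination as missing, so both concurrently committing attempts take the
"destination does not exist" branch together; the assertions then check that
no nested SUB_DIR/SUB_DIR appears and that the merged content is intact. The
same fault-injection idiom, sketched standalone (FaultyFs and TARGET are
illustrative names; throwing FileNotFoundException is interchangeable here
with the test's returning null):

    import java.io.FileNotFoundException;
    import java.io.IOException;

    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.RawLocalFileSystem;

    class FaultyFs extends RawLocalFileSystem {
      private static final Path TARGET =
          new Path("/tmp/out/SUB_DIR"); // illustrative target path
      private final ThreadLocal<Boolean> lie = new ThreadLocal<Boolean>() {
        @Override
        protected Boolean initialValue() {
          return true;
        }
      };

      @Override
      public FileStatus getFileStatus(Path f) throws IOException {
        if (lie.get() && TARGET.toUri().getPath().equals(f.toUri().getPath())) {
          lie.set(false); // inject the miss only once per thread
          throw new FileNotFoundException("injected miss: " + f);
        }
        return super.getFileStatus(f);
      }
    }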