Repository: hadoop
Updated Branches:
  refs/heads/branch-2 77d147e0e -> d3dfed3fd


MAPREDUCE-6684. High contention on scanning of user directory under 
immediate_done in Job History Server. Contributed by Haibo Chen
(cherry picked from commit 5ffb54694b52657f3b7de4560474ab740734e1b2)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d3dfed3f
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d3dfed3f
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d3dfed3f

Branch: refs/heads/branch-2
Commit: d3dfed3fd04aa0e4c9b281e46582b6024f015747
Parents: 77d147e
Author: Jason Lowe <jl...@apache.org>
Authored: Tue May 10 16:03:49 2016 +0000
Committer: Jason Lowe <jl...@apache.org>
Committed: Tue May 10 16:06:26 2016 +0000

----------------------------------------------------------------------
 .../mapreduce/v2/hs/HistoryFileManager.java     |  37 ++-
 ...estUnnecessaryBlockingOnHistoryFileInfo.java | 323 +++++++++++++++++++
 2 files changed, 346 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
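
The patch attacks the contention in two ways: HistoryFileInfo's state field
becomes volatile and its read-only status checks (isMovePending, didMoveFail,
isDeleted) drop the synchronized keyword, so a status query no longer blocks
behind a thread that holds the instance lock while parsing a large history
file; and object construction is routed through overridable protected factory
methods so the new test can substitute instrumented variants. A minimal sketch
of the locking pattern, with illustrative names rather than the Hadoop source:

    // Status reads go through a volatile field instead of synchronized
    // getters, so a reader never blocks on a thread that holds the
    // instance lock inside loadJob().
    class FileInfoSketch {
      enum State { IN_INTERMEDIATE, MOVE_FAILED, IN_DONE, DELETED }

      // volatile guarantees visibility of state writes without the monitor
      private volatile State state = State.IN_INTERMEDIATE;

      // Unsynchronized read: returns immediately even while another
      // thread holds this object's lock in loadJob().
      boolean isMovePending() {
        return state == State.IN_INTERMEDIATE || state == State.MOVE_FAILED;
      }

      // Long-running work remains serialized per instance, as before.
      synchronized void loadJob() {
        // ... parse a potentially huge .jhist file ...
      }
    }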


http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3dfed3f/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java
index 1467db1..02da8e5 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java
@@ -346,7 +346,7 @@ public class HistoryFileManager extends AbstractService {
     private Path confFile;
     private Path summaryFile;
     private JobIndexInfo jobIndexInfo;
-    private HistoryInfoState state;
+    private volatile HistoryInfoState state;
 
     @VisibleForTesting
     protected HistoryFileInfo(Path historyFile, Path confFile,
@@ -360,20 +360,20 @@ public class HistoryFileManager extends AbstractService {
     }
 
     @VisibleForTesting
-    synchronized boolean isMovePending() {
+    boolean isMovePending() {
       return state == HistoryInfoState.IN_INTERMEDIATE
           || state == HistoryInfoState.MOVE_FAILED;
     }
 
     @VisibleForTesting
-    synchronized boolean didMoveFail() {
+    boolean didMoveFail() {
       return state == HistoryInfoState.MOVE_FAILED;
     }
-    
+
     /**
      * @return true if the files backed by this were deleted.
      */
-    public synchronized boolean isDeleted() {
+    public boolean isDeleted() {
       return state == HistoryInfoState.DELETED;
     }
 
@@ -566,13 +566,15 @@ public class HistoryFileManager extends AbstractService {
     int numMoveThreads = conf.getInt(
         JHAdminConfig.MR_HISTORY_MOVE_THREAD_COUNT,
         JHAdminConfig.DEFAULT_MR_HISTORY_MOVE_THREAD_COUNT);
+    moveToDoneExecutor = createMoveToDoneThreadPool(numMoveThreads);
+    super.serviceInit(conf);
+  }
+
+  protected ThreadPoolExecutor createMoveToDoneThreadPool(int numMoveThreads) {
     ThreadFactory tf = new ThreadFactoryBuilder().setNameFormat(
         "MoveIntermediateToDone Thread #%d").build();
-    moveToDoneExecutor = new HadoopThreadPoolExecutor(numMoveThreads,
-        numMoveThreads, 1, TimeUnit.HOURS,
-        new LinkedBlockingQueue<Runnable>(), tf);
-
-    super.serviceInit(conf);
+    return new HadoopThreadPoolExecutor(numMoveThreads, numMoveThreads,
+        1, TimeUnit.HOURS, new LinkedBlockingQueue<Runnable>(), tf);
   }
 
   @VisibleForTesting
@@ -708,6 +710,13 @@ public class HistoryFileManager extends AbstractService {
     }
   }
 
+  protected HistoryFileInfo createHistoryFileInfo(Path historyFile,
+      Path confFile, Path summaryFile, JobIndexInfo jobIndexInfo,
+      boolean isInDone) {
+    return new HistoryFileInfo(
+        historyFile, confFile, summaryFile, jobIndexInfo, isInDone);
+  }
+
   /**
    * Populates index data structures. Should only be called at initialization
    * times.
@@ -782,7 +791,7 @@ public class HistoryFileManager extends AbstractService {
           .getIntermediateConfFileName(jobIndexInfo.getJobId());
       String summaryFileName = JobHistoryUtils
           .getIntermediateSummaryFileName(jobIndexInfo.getJobId());
-      HistoryFileInfo fileInfo = new HistoryFileInfo(fs.getPath(), new Path(fs
+      HistoryFileInfo fileInfo = createHistoryFileInfo(fs.getPath(), new Path(fs
           .getPath().getParent(), confFileName), new Path(fs.getPath()
           .getParent(), summaryFileName), jobIndexInfo, true);
       jobListCache.addIfAbsent(fileInfo);
@@ -880,7 +889,7 @@ public class HistoryFileManager extends AbstractService {
           .getIntermediateConfFileName(jobIndexInfo.getJobId());
       String summaryFileName = JobHistoryUtils
           .getIntermediateSummaryFileName(jobIndexInfo.getJobId());
-      HistoryFileInfo fileInfo = new HistoryFileInfo(fs.getPath(), new Path(fs
+      HistoryFileInfo fileInfo = createHistoryFileInfo(fs.getPath(), new Path(fs
           .getPath().getParent(), confFileName), new Path(fs.getPath()
           .getParent(), summaryFileName), jobIndexInfo, false);
 
@@ -940,7 +949,7 @@ public class HistoryFileManager extends AbstractService {
             .getIntermediateConfFileName(jobIndexInfo.getJobId());
         String summaryFileName = JobHistoryUtils
             .getIntermediateSummaryFileName(jobIndexInfo.getJobId());
-        HistoryFileInfo fileInfo = new HistoryFileInfo(fs.getPath(), new Path(
+        HistoryFileInfo fileInfo = createHistoryFileInfo(fs.getPath(), new Path(
             fs.getPath().getParent(), confFileName), new Path(fs.getPath()
             .getParent(), summaryFileName), jobIndexInfo, true);
         return fileInfo;
@@ -1105,7 +1114,7 @@ public class HistoryFileManager extends AbstractService {
             String confFileName = JobHistoryUtils
                 .getIntermediateConfFileName(jobIndexInfo.getJobId());
 
-            fileInfo = new HistoryFileInfo(historyFile.getPath(), new Path(
+            fileInfo = createHistoryFileInfo(historyFile.getPath(), new Path(
                 historyFile.getPath().getParent(), confFileName), null,
                 jobIndexInfo, true);
           }
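
The seam that the new test exploits is the pair of protected factory methods
added above, createHistoryFileInfo() and createMoveToDoneThreadPool(). A
hypothetical, simplified sketch of the pattern (names are illustrative, not
the Hadoop source):

    // Production code calls the protected factory instead of `new`, so a
    // test subclass can return an instrumented object.
    class Manager {
      static class FileInfo {
        protected final String historyFile;
        FileInfo(String historyFile) { this.historyFile = historyFile; }
      }

      // Overridable seam used wherever a FileInfo is constructed.
      protected FileInfo createFileInfo(String historyFile) {
        return new FileInfo(historyFile);
      }
    }

    class TestManager extends Manager {
      @Override
      protected FileInfo createFileInfo(String historyFile) {
        // Return a stub that could, for example, block in its load path
        // to simulate parsing a huge history file.
        return new FileInfo(historyFile) { };
      }
    }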

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d3dfed3f/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestUnnecessaryBlockingOnHistoryFileInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestUnnecessaryBlockingOnHistoryFileInfo.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestUnnecessaryBlockingOnHistoryFileInfo.java
new file mode 100644
index 0000000..06045b5
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestUnnecessaryBlockingOnHistoryFileInfo.java
@@ -0,0 +1,323 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.hs;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.impl.pb.JobIdPBImpl;
+import org.apache.hadoop.mapreduce.v2.app.job.Job;
+import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
+import org.apache.hadoop.mapreduce.v2.jobhistory.JobIndexInfo;
+import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
+
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadPoolExecutor;
+
+import static org.mockito.Mockito.mock;
+
+/**
+ * The test in this class is created specifically to address the issue in
+ * MAPREDUCE-6684. In cases where there are two threads trying to load different
+ * jobs through the job history file manager, one thread could be blocked by the
+ * other while it is loading a huge job file, which is undesirable.
+ *
+ */
+public class TestUnnecessaryBlockingOnHistoryFileInfo {
+  /**
+   * The intermediate done directory that JHS scans for completed jobs.
+   */
+  private final static File INTERMEDIATE_DIR = new File("target",
+      TestUnnecessaryBlockingOnHistoryFileInfo.class.getName() +
+          "/intermediate");
+  /**
+   * A test user directory under intermediate done directory.
+   */
+  private final static File USER_DIR = new File(INTERMEDIATE_DIR, "test");
+
+  @BeforeClass
+  public static void setUp() throws IOException {
+    if(USER_DIR.exists()) {
+      FileUtils.cleanDirectory(USER_DIR);
+    }
+    USER_DIR.mkdirs();
+  }
+
+  @AfterClass
+  public static void cleanUp() throws IOException {
+    FileUtils.deleteDirectory(INTERMEDIATE_DIR);
+  }
+
+  /**
+   * This creates a test case in which two threads try to load two
+   * different jobs of the same user under the intermediate directory.
+   * One thread should not be blocked by the other thread that is loading
+   * huge job files (simulated by hanging forever while parsing the job
+   * files). The test fails with a timeout if one thread is blocked by
+   * the other while that thread holds the lock on its associated job
+   * files and hangs while parsing them.
+   */
+  @Test(timeout = 20000)
+  public void testTwoThreadsQueryingDifferentJobOfSameUser()
+      throws InterruptedException, IOException {
+    final Configuration config = new Configuration();
+    config.set(JHAdminConfig.MR_HISTORY_INTERMEDIATE_DONE_DIR,
+        INTERMEDIATE_DIR.getPath());
+    config.setLong(JHAdminConfig.MR_HISTORY_MAX_AGE_MS, Long.MAX_VALUE);
+
+    final JobId job1 = createJobId(0);
+    final JobId job2 = createJobId(1);
+    final HistoryFileManagerUnderContention historyFileManager =
+        createHistoryFileManager(config, job1, job2);
+
+    Thread webRequest1 = null;
+    Thread webRequest2 = null;
+    try {
+      /**
+       * create a dummy .jhist file for job1, and try to load/parse the job
+       * files in one child thread.
+       */
+      createJhistFile(job1);
+      webRequest1 = new Thread(
+          new Runnable() {
+            @Override
+            public void run() {
+              try {
+                HistoryFileManager.HistoryFileInfo historyFileInfo =
+                    historyFileManager.getFileInfo(job1);
+                historyFileInfo.loadJob();
+              } catch (IOException e) {
+                e.printStackTrace();
+              }
+            }
+          }
+      );
+      webRequest1.start();
+      historyFileManager.waitUntilIntermediateDirIsScanned(job1);
+
+      /**
+       * At this point, thread webRequest1 has finished scanning the
+       * intermediate directory and hangs while parsing the job files,
+       * holding the lock on the associated HistoryFileInfo object.
+       */
+
+      /**
+       * create a dummy .jhist file for job2 and try to load/parse the job files
+       * in the other child thread. Because job files are not moved from the
+       * intermediate directory to the done directory, thread webRequest2
+       * will also see the job history files for job1.
+       */
+      createJhistFile(job2);
+      webRequest2 = new Thread(
+          new Runnable() {
+            @Override
+            public void run() {
+              try {
+                HistoryFileManager.HistoryFileInfo historyFileInfo =
+                    historyFileManager.getFileInfo(job2);
+                historyFileInfo.loadJob();
+              } catch (IOException e) {
+                e.printStackTrace();
+              }
+            }
+          }
+      );
+      webRequest2.start();
+      historyFileManager.waitUntilIntermediateDirIsScanned(job2);
+
+      /**
+       * If execution reaches this point, thread webRequest2 did not try
+       * to acquire the lock of the HistoryFileInfo object associated with
+       * job1, which is held indefinitely by thread webRequest1 while it
+       * hangs parsing the job history files, so webRequest2 was able to
+       * proceed with parsing the job history files of job2.
+       */
+      Assert.assertTrue("Thread 2 is blocked while it is trying to " +
+          "load job2 by Thread 1 which is loading job1.",
+          webRequest2.getState() != Thread.State.BLOCKED);
+    } finally {
+      if(webRequest1 != null) {
+        webRequest1.interrupt();
+      }
+      if(webRequest2 != null) {
+        webRequest2.interrupt();
+      }
+    }
+  }
+
+  /**
+   * Create, initialize and start an instance of HistoryFileManager.
+   * @param config the configuration to initialize the HistoryFileManager
+   *               instance.
+   * @param jobIds the set of jobs expected to be loaded by HistoryFileManager.
+   */
+  private HistoryFileManagerUnderContention createHistoryFileManager(
+      Configuration config, JobId... jobIds) {
+    HistoryFileManagerUnderContention historyFileManager =
+        new HistoryFileManagerUnderContention(jobIds);
+    historyFileManager.init(config);
+    historyFileManager.start();
+    return historyFileManager;
+  }
+
+  /**
+   * Create, initialize and start an instance of CachedHistoryStorage.
+   * @param config the config to initialize the storage
+   * @param historyFileManager the HistoryFileManager to initialize the cache
+   */
+  private static CachedHistoryStorage createHistoryStorage(
+      Configuration config, HistoryFileManager historyFileManager) {
+    CachedHistoryStorage historyStorage = new CachedHistoryStorage();
+    historyStorage.setHistoryFileManager(historyFileManager);
+    historyStorage.init(config);
+    historyStorage.start();
+    return historyStorage;
+  }
+
+  private static JobId createJobId(int id) {
+    JobId jobId = new JobIdPBImpl();
+    jobId.setId(id);
+    jobId.setAppId(ApplicationIdPBImpl.newInstance(0, id));
+    return jobId;
+  }
+
+  /**
+   * Create a dummy .jhist file under the intermediate directory for the given
+   * job.
+   * @param jobId the id of the given job
+   * @return true if file is created successfully, false otherwise
+   */
+  private static boolean createJhistFile(JobId jobId) throws IOException {
+    StringBuilder fileName = new StringBuilder(jobId.toString());
+    long finishTime = System.currentTimeMillis();
+    fileName.append("-").append(finishTime - 1000)
+        .append("-").append("test")
+        .append("-").append(jobId.getId())
+        .append("-").append(finishTime)
+        .append(".jhist");
+    File jhistFile = new File(USER_DIR, fileName.toString());
+    return jhistFile.createNewFile();
+  }
+
+  /**
+   * A test implementation of HistoryFileManager that does not move files
+   * from the intermediate directory to the done directory and hangs
+   * while parsing job history files.
+   */
+  class HistoryFileManagerUnderContention extends HistoryFileManager {
+    /**
+     * A map from a job to a signal that indicates whether scanning of the
+     * intermediate directory has finished before the job files are parsed.
+     */
+    private Map<JobId, CountDownLatch> scanningDoneSignals = new HashMap<>();
+
+    /**
+     * A HistoryFileManager that expects to load the given jobs and hangs
+     * while parsing the job files. It performs no moving of files from
+     * the intermediate directory to the done directory.
+     * @param jobId the set of jobs expected to load and parse
+     */
+    public HistoryFileManagerUnderContention(JobId... jobId) {
+      for(JobId job: jobId) {
+        scanningDoneSignals.put(job, new CountDownLatch(1));
+      }
+    }
+
+    /**
+     * Wait until scanning of the intermediate directory has finished and
+     * loading of the given job has started.
+     */
+    public void waitUntilIntermediateDirIsScanned(JobId jobId)
+        throws InterruptedException {
+      if(scanningDoneSignals.containsKey(jobId)) {
+        scanningDoneSignals.get(jobId).await();
+      }
+    }
+
+    /**
+     * Create a HistoryFileInfo instance that hangs on parsing job files.
+     */
+    @Override
+    protected HistoryFileManager.HistoryFileInfo createHistoryFileInfo(
+        Path historyFile, Path confFile, Path summaryFile,
+        JobIndexInfo jobIndexInfo, boolean isInDone) {
+      return new HistoryFileInfo(historyFile, confFile, summaryFile,
+          jobIndexInfo, isInDone,
+          scanningDoneSignals.get(jobIndexInfo.getJobId()));
+    }
+
+    /**
+     * Create a dummy ThreadPoolExecutor that does not execute submitted tasks.
+     */
+    @Override
+    protected ThreadPoolExecutor createMoveToDoneThreadPool(
+        int numMoveThreads) {
+      return mock(ThreadPoolExecutor.class);
+    }
+
+    /**
+     * A HistoryFileInfo implementation that takes forever to parse the
+     * associated job files. This mimics the behavior of parsing huge job files.
+     */
+    class HistoryFileInfo extends HistoryFileManager.HistoryFileInfo {
+      /**
+       * A signal that indicates scanning of the intermediate directory is
+       * done while HistoryFileManager is in the process of loading this
+       * HistoryFileInfo instance.
+       */
+      private final CountDownLatch scanningDoneSignal;
+
+      HistoryFileInfo(Path historyFile, Path confFile, Path summaryFile,
+          JobIndexInfo jobIndexInfo, boolean isInDone,
+          CountDownLatch scanningDoneSignal) {
+        super(historyFile, confFile, summaryFile, jobIndexInfo, isInDone);
+        this.scanningDoneSignal = scanningDoneSignal;
+      }
+
+      /**
+       * A test implementation that takes forever to load a job in order to
+       * mimic what happens when job files of large size are parsed in JHS.
+       * Before loading, we signal that scanning of the intermediate directory
+       * is finished.
+       */
+      @Override
+      public synchronized Job loadJob() throws IOException {
+        if(scanningDoneSignal != null) {
+          scanningDoneSignal.countDown();
+        }
+        while(!Thread.currentThread().isInterrupted()) {
+          try {
+            Thread.sleep(5000);
+          } catch (InterruptedException e) {
+          }
+        }
+        return null;
+      }
+    }
+  }
+}
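
The test above coordinates its threads with one CountDownLatch per job: the
overridden loadJob() counts its latch down just before hanging, and the main
thread awaits the latch before asserting that the second worker is not
BLOCKED. A standalone sketch of that handshake, assuming only
java.util.concurrent:

    import java.util.concurrent.CountDownLatch;

    public class LatchHandshake {
      public static void main(String[] args) throws InterruptedException {
        final CountDownLatch scanned = new CountDownLatch(1);
        final Object lock = new Object();

        Thread worker = new Thread(new Runnable() {
          @Override
          public void run() {
            scanned.countDown();        // signal: about to start "parsing"
            synchronized (lock) {       // hold the lock while "parsing"
              try {
                Thread.sleep(Long.MAX_VALUE);
              } catch (InterruptedException ignored) {
              }
            }
          }
        });
        worker.start();

        scanned.await();                // proceed once the worker signals
        System.out.println("worker state: " + worker.getState());
        worker.interrupt();             // unblock the sleeping worker
        worker.join();
      }
    }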

