Author: acmurthy
Date: Sun Sep 30 01:14:41 2012
New Revision: 1391933
URL: http://svn.apache.org/viewvc?rev=1391933&view=rev
Log:
Merge -c 1391671 from trunk to branch-2.0.2-alpha to fix MAPREDUCE-4691.
Historyserver can report 'Unknown job' after RM says job has completed.
Contributed by Robert Evans.
Modified:
hadoop/common/branches/branch-2.0.2-alpha/hadoop-mapreduce-project/CHANGES.txt
hadoop/common/branches/branch-2.0.2-alpha/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java
Modified:
hadoop/common/branches/branch-2.0.2-alpha/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.0.2-alpha/hadoop-mapreduce-project/CHANGES.txt?rev=1391933&r1=1391932&r2=1391933&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.0.2-alpha/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2.0.2-alpha/hadoop-mapreduce-project/CHANGES.txt Sun Sep 30 01:14:41 2012
@@ -396,6 +396,9 @@ Release 0.23.4 - UNRELEASED
MAPREDUCE-4647. We should only unjar jobjar if there is a lib directory
in it. (Robert Evans via tgraves)
+ MAPREDUCE-4691. Historyserver can report "Unknown job" after RM says job
+ has completed (Robert Joseph Evans via jlowe)
+
Release 0.23.3 - UNRELEASED
INCOMPATIBLE CHANGES
Modified:
hadoop/common/branches/branch-2.0.2-alpha/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.0.2-alpha/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java?rev=1391933&r1=1391932&r2=1391933&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.0.2-alpha/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java (original)
+++ hadoop/common/branches/branch-2.0.2-alpha/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java Sun Sep 30 01:14:41 2012
@@ -23,14 +23,14 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
-import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
@@ -77,7 +77,7 @@ public class HistoryFileManager extends
private static enum HistoryInfoState {
IN_INTERMEDIATE, IN_DONE, DELETED, MOVE_FAILED
};
-
+
private static String DONE_BEFORE_SERIAL_TAIL = JobHistoryUtils
.doneSubdirsBeforeSerialTail();
@@ -199,6 +199,29 @@ public class HistoryFileManager extends
}
}
+ /**
+ * This class represents a user dir in the intermediate done directory. This
+ * is mostly for locking purposes.
+ */
+ private class UserLogDir {
+ long modTime = 0;
+
+ public synchronized void scanIfNeeded(FileStatus fs) {
+ long newModTime = fs.getModificationTime();
+ if (modTime != newModTime) {
+ Path p = fs.getPath();
+ try {
+ scanIntermediateDirectory(p);
+ //If scanning fails, we will scan again. We assume the failure is
+ // temporary.
+ modTime = newModTime;
+ } catch (IOException e) {
+ LOG.error("Error while trying to scan the directory " + p, e);
+ }
+ }
+ }
+ }
+
public class HistoryFileInfo {
private Path historyFile;
private Path confFile;
@@ -352,7 +375,8 @@ public class HistoryFileManager extends
* Maintains a mapping between intermediate user directories and the last
* known modification time.
*/
- private Map<String, Long> userDirModificationTimeMap = new HashMap<String, Long>();
+ private ConcurrentMap<String, UserLogDir> userDirModificationTimeMap =
+ new ConcurrentHashMap<String, UserLogDir>();
private JobACLsManager aclsMgr;
@@ -584,23 +608,15 @@ public class HistoryFileManager extends
for (FileStatus userDir : userDirList) {
String name = userDir.getPath().getName();
- long newModificationTime = userDir.getModificationTime();
- boolean shouldScan = false;
- synchronized (userDirModificationTimeMap) {
- if (!userDirModificationTimeMap.containsKey(name)
- || newModificationTime > userDirModificationTimeMap.get(name)) {
- shouldScan = true;
- userDirModificationTimeMap.put(name, newModificationTime);
- }
- }
- if (shouldScan) {
- try {
- scanIntermediateDirectory(userDir.getPath());
- } catch (IOException e) {
- LOG.error("Error while trying to scan the directory "
- + userDir.getPath(), e);
+ UserLogDir dir = userDirModificationTimeMap.get(name);
+ if(dir == null) {
+ dir = new UserLogDir();
+ UserLogDir old = userDirModificationTimeMap.putIfAbsent(name, dir);
+ if(old != null) {
+ dir = old;
}
}
+ dir.scanIfNeeded(userDir);
}
}