Author: jlowe Date: Fri Sep 28 22:03:59 2012 New Revision: 1391677 URL: http://svn.apache.org/viewvc?rev=1391677&view=rev Log: svn merge -c 1391671 FIXES: MAPREDUCE-4691. Historyserver can report "Unknown job" after RM says job has completed. Contributed by Robert Joseph Evans.
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt?rev=1391677&r1=1391676&r2=1391677&view=diff ============================================================================== --- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt (original) +++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt Fri Sep 28 22:03:59 2012 @@ -35,6 +35,9 @@ Release 0.23.4 - UNRELEASED MAPREDUCE-4646. Fixed MR framework to send diagnostic information correctly to clients in case of failed jobs also. (Jason Lowe via vinodkv) + MAPREDUCE-4691. Historyserver can report "Unknown job" after RM says job + has completed (Robert Joseph Evans via jlowe) + Release 0.23.3 - UNRELEASED INCOMPATIBLE CHANGES Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java?rev=1391677&r1=1391676&r2=1391677&view=diff ============================================================================== --- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java (original) +++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java Fri Sep 28 22:03:59 2012 @@ -23,14 +23,14 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; -import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; -import java.util.Map; import java.util.Set; import java.util.SortedMap; import java.util.TreeMap; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadFactory; @@ -77,7 +77,7 @@ public class HistoryFileManager extends private static enum HistoryInfoState { IN_INTERMEDIATE, IN_DONE, DELETED, MOVE_FAILED }; - + private static String DONE_BEFORE_SERIAL_TAIL = JobHistoryUtils .doneSubdirsBeforeSerialTail(); @@ -199,6 +199,29 @@ public class HistoryFileManager extends } } + /** + * This class represents a user dir in the intermediate done directory. This + * is mostly for locking purposes. + */ + private class UserLogDir { + long modTime = 0; + + public synchronized void scanIfNeeded(FileStatus fs) { + long newModTime = fs.getModificationTime(); + if (modTime != newModTime) { + Path p = fs.getPath(); + try { + scanIntermediateDirectory(p); + //If scanning fails, we will scan again. We assume the failure is + // temporary. + modTime = newModTime; + } catch (IOException e) { + LOG.error("Error while trying to scan the directory " + p, e); + } + } + } + } + public class HistoryFileInfo { private Path historyFile; private Path confFile; @@ -352,7 +375,8 @@ public class HistoryFileManager extends * Maintains a mapping between intermediate user directories and the last * known modification time. */ - private Map<String, Long> userDirModificationTimeMap = new HashMap<String, Long>(); + private ConcurrentMap<String, UserLogDir> userDirModificationTimeMap = + new ConcurrentHashMap<String, UserLogDir>(); private JobACLsManager aclsMgr; @@ -586,23 +610,15 @@ public class HistoryFileManager extends for (FileStatus userDir : userDirList) { String name = userDir.getPath().getName(); - long newModificationTime = userDir.getModificationTime(); - boolean shouldScan = false; - synchronized (userDirModificationTimeMap) { - if (!userDirModificationTimeMap.containsKey(name) - || newModificationTime > userDirModificationTimeMap.get(name)) { - shouldScan = true; - userDirModificationTimeMap.put(name, newModificationTime); - } - } - if (shouldScan) { - try { - scanIntermediateDirectory(userDir.getPath()); - } catch (IOException e) { - LOG.error("Error while trying to scan the directory " - + userDir.getPath(), e); + UserLogDir dir = userDirModificationTimeMap.get(name); + if(dir == null) { + dir = new UserLogDir(); + UserLogDir old = userDirModificationTimeMap.putIfAbsent(name, dir); + if(old != null) { + dir = old; } } + dir.scanIfNeeded(userDir); } }