MAPREDUCE-7015. Possible race condition in JHS if the job is not loaded. Contributed by Peter Bacsko.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/cff9edd4 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/cff9edd4 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/cff9edd4 Branch: refs/heads/YARN-1011 Commit: cff9edd4b514bdcfe22cd49964e3707fb78ab876 Parents: 55c3277 Author: Jason Lowe <jl...@apache.org> Authored: Wed Jan 24 14:44:07 2018 -0600 Committer: Jason Lowe <jl...@apache.org> Committed: Wed Jan 24 14:44:07 2018 -0600 ---------------------------------------------------------------------- .../mapreduce/v2/hs/CachedHistoryStorage.java | 8 +++++- .../mapreduce/v2/hs/HistoryFileManager.java | 30 ++++++++++++++++---- .../hadoop/mapreduce/v2/hs/TestJobHistory.java | 26 +++++++++++++++++ 3 files changed, 57 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/cff9edd4/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CachedHistoryStorage.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CachedHistoryStorage.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CachedHistoryStorage.java index b001ae4..69f4831 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CachedHistoryStorage.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CachedHistoryStorage.java @@ -173,9 +173,14 @@ public class CachedHistoryStorage extends AbstractService implements HistoryFileInfo fileInfo; fileInfo = 
hsManager.getFileInfo(jobId); + if (fileInfo == null) { throw new HSFileRuntimeException("Unable to find job " + jobId); - } else if (fileInfo.isDeleted()) { + } + + fileInfo.waitUntilMoved(); + + if (fileInfo.isDeleted()) { throw new HSFileRuntimeException("Cannot load deleted job " + jobId); } else { return fileInfo.loadJob(); @@ -211,6 +216,7 @@ public class CachedHistoryStorage extends AbstractService implements for (HistoryFileInfo mi : hsManager.getAllFileInfo()) { if (mi != null) { JobId id = mi.getJobId(); + mi.waitUntilMoved(); result.put(id, new PartialJob(mi.getJobIndexInfo(), id)); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/cff9edd4/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java index b418db7..a07ca26 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java @@ -452,6 +452,8 @@ public class HistoryFileManager extends AbstractService { } catch (Throwable t) { LOG.error("Error while trying to move a job to done", t); this.state = HistoryInfoState.MOVE_FAILED; + } finally { + notifyAll(); } } @@ -485,12 +487,16 @@ public class HistoryFileManager extends AbstractService { } protected synchronized void delete() throws IOException { - if (LOG.isDebugEnabled()) { - LOG.debug("deleting " + historyFile + " and " + confFile); 
+ try { + if (LOG.isDebugEnabled()) { + LOG.debug("deleting " + historyFile + " and " + confFile); + } + state = HistoryInfoState.DELETED; + doneDirFc.delete(doneDirFc.makeQualified(historyFile), false); + doneDirFc.delete(doneDirFc.makeQualified(confFile), false); + } finally { + notifyAll(); } - state = HistoryInfoState.DELETED; - doneDirFc.delete(doneDirFc.makeQualified(historyFile), false); - doneDirFc.delete(doneDirFc.makeQualified(confFile), false); } public JobIndexInfo getJobIndexInfo() { @@ -517,6 +523,17 @@ public class HistoryFileManager extends AbstractService { jobIndexInfo.getNumMaps(); return (maxTasksForLoadedJob > 0) && (totalTasks > maxTasksForLoadedJob); } + + public synchronized void waitUntilMoved() { + while (isMovePending() && !didMoveFail()) { + try { + wait(); + } catch (InterruptedException e) { + LOG.warn("Waiting has been interrupted"); + throw new RuntimeException(e); + } + } + } } private SerialNumberIndex serialNumberIndex = null; @@ -956,6 +973,7 @@ public class HistoryFileManager extends AbstractService { if (LOG.isDebugEnabled()) { LOG.debug("Scheduling move to done of " +found); } + moveToDoneExecutor.execute(new Runnable() { @Override public void run() { @@ -1193,5 +1211,5 @@ public class HistoryFileManager extends AbstractService { @VisibleForTesting void setMaxHistoryAge(long newValue){ maxHistoryAge=newValue; - } + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/cff9edd4/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistory.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistory.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistory.java index 936c772..9f36477 100644 --- 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistory.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistory.java @@ -446,6 +446,32 @@ public class TestJobHistory { } @Test + public void testCachedStorageWaitsForFileMove() throws IOException { + HistoryFileManager historyManager = mock(HistoryFileManager.class); + jobHistory = spy(new JobHistory()); + doReturn(historyManager).when(jobHistory).createHistoryFileManager(); + + Configuration conf = new Configuration(); + jobHistory.init(conf); + jobHistory.start(); + + CachedHistoryStorage storage = spy((CachedHistoryStorage) jobHistory + .getHistoryStorage()); + + Job job = mock(Job.class); + JobId jobId = mock(JobId.class); + when(job.getID()).thenReturn(jobId); + when(job.getTotalMaps()).thenReturn(10); + when(job.getTotalReduces()).thenReturn(2); + HistoryFileInfo fileInfo = mock(HistoryFileInfo.class); + when(historyManager.getFileInfo(eq(jobId))).thenReturn(fileInfo); + when(fileInfo.loadJob()).thenReturn(job); + + storage.getFullJob(jobId); + verify(fileInfo).waitUntilMoved(); + } + + @Test public void testRefreshLoadedJobCacheUnSupportedOperation() { jobHistory = spy(new JobHistory()); HistoryStorage storage = new HistoryStorage() { --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org