Author: jlowe
Date: Thu Nov  1 23:07:55 2012
New Revision: 1404825

URL: http://svn.apache.org/viewvc?rev=1404825&view=rev
Log:
svn merge -c 1404817 FIXES: MAPREDUCE-4729. job history UI not showing all job attempts. Contributed by Vinod Kumar Vavilapalli
Added:
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestAMInfos.java
      - copied unchanged from r1404817, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestAMInfos.java
Modified:
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt?rev=1404825&r1=1404824&r2=1404825&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt Thu Nov  1 23:07:55 2012
@@ -65,6 +65,9 @@ Release 0.23.5 - UNRELEASED
     MAPREDUCE-4746. The MR Application Master does not have a config to set
     environment variables (Rob Parker via bobby)
 
+    MAPREDUCE-4729. job history UI not showing all job attempts. (Vinod
+    Kumar Vavilapalli via jlowe)
+
 Release 0.23.4 - UNRELEASED
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java?rev=1404825&r1=1404824&r2=1404825&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java Thu Nov  1 23:07:55 2012
@@ -23,14 +23,17 @@ import java.io.IOException;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
+import org.apache.commons.io.IOUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -45,6 +48,9 @@ import org.apache.hadoop.mapreduce.Outpu
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.jobhistory.AMStartedEvent;
+import org.apache.hadoop.mapreduce.jobhistory.EventReader;
+import org.apache.hadoop.mapreduce.jobhistory.EventType;
+import org.apache.hadoop.mapreduce.jobhistory.HistoryEvent;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
@@ -89,6 +95,7 @@ import org.apache.hadoop.security.Creden
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.ShutdownHookManager;
+import org.apache.hadoop.util.StringInterner;
 import org.apache.hadoop.yarn.Clock;
 import org.apache.hadoop.yarn.ClusterInfo;
 import org.apache.hadoop.yarn.SystemClock;
@@ -826,16 +833,21 @@ public class MRAppMaster extends Composi
   @Override
   public void start() {
 
+    amInfos = new LinkedList<AMInfo>();
+
     // Pull completedTasks etc from recovery
    if (inRecovery) {
      completedTasksFromPreviousRun = recoveryServ.getCompletedTasks();
      amInfos = recoveryServ.getAMInfos();
+    } else {
+      // Get the amInfos anyway, irrespective of whether recovery is enabled
+      // or not, IF this is not the first AM generation
+      if (appAttemptID.getAttemptId() != 1) {
+        amInfos.addAll(readJustAMInfos());
+      }
     }
 
-    // / Create the AMInfo for the current AppMaster
-    if (amInfos == null) {
-      amInfos = new LinkedList<AMInfo>();
-    }
+    // Create an AMInfo for the current AM generation.
     AMInfo amInfo =
         MRBuilderUtils.newAMInfo(appAttemptID, startTime, containerID, nmHost,
             nmPort, nmHttpPort);
@@ -893,6 +905,51 @@ public class MRAppMaster extends Composi
     startJobs();
   }
 
+  private List<AMInfo> readJustAMInfos() {
+    List<AMInfo> amInfos = new ArrayList<AMInfo>();
+    FSDataInputStream inputStream = null;
+    try {
+      inputStream =
+          RecoveryService.getPreviousJobHistoryFileStream(getConfig(),
+            appAttemptID);
+      EventReader jobHistoryEventReader = new EventReader(inputStream);
+
+      // All AMInfos are contiguous. Track when the first AMStartedEvent
+      // appears.
+      boolean amStartedEventsBegan = false;
+
+      HistoryEvent event;
+      while ((event = jobHistoryEventReader.getNextEvent()) != null) {
+        if (event.getEventType() == EventType.AM_STARTED) {
+          if (!amStartedEventsBegan) {
+            // First AMStartedEvent.
+            amStartedEventsBegan = true;
+          }
+          AMStartedEvent amStartedEvent = (AMStartedEvent) event;
+          amInfos.add(MRBuilderUtils.newAMInfo(
+            amStartedEvent.getAppAttemptId(), amStartedEvent.getStartTime(),
+            amStartedEvent.getContainerId(),
+            StringInterner.weakIntern(amStartedEvent.getNodeManagerHost()),
+            amStartedEvent.getNodeManagerPort(),
+            amStartedEvent.getNodeManagerHttpPort()));
+        } else if (amStartedEventsBegan) {
+          // This means AMStartedEvents began and this event is a
+          // non-AMStarted event.
+          // No need to continue reading all the other events.
+          break;
+        }
+      }
+    } catch (IOException e) {
+      LOG.warn("Could not parse the old history file. "
+          + "Will not have old AMInfos ", e);
+    } finally {
+      if (inputStream != null) {
+        IOUtils.closeQuietly(inputStream);
+      }
+    }
+    return amInfos;
+  }
+
   /**
    * This can be overridden to instantiate multiple jobs and create a
    * workflow.
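[Illustrative sketch, not part of the commit: readJustAMInfos() above relies on all AMStartedEvents sitting together at the front of the job history event stream, which is why it can stop reading at the first non-AM-started event instead of parsing the whole file. The Event enum below is a hypothetical stand-in for HistoryEvent/EventType; the scan pattern is the same.]

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ContiguousAmScanSketch {
  // Hypothetical stand-in for org.apache.hadoop.mapreduce.jobhistory.EventType.
  enum Event { AM_STARTED, JOB_SUBMITTED, TASK_STARTED }

  public static void main(String[] args) {
    // Two AM generations followed by ordinary job events.
    List<Event> history = Arrays.asList(
        Event.AM_STARTED, Event.AM_STARTED,
        Event.JOB_SUBMITTED, Event.TASK_STARTED);

    List<Event> amStarts = new ArrayList<Event>();
    boolean amStartedEventsBegan = false;
    for (Event e : history) {
      if (e == Event.AM_STARTED) {
        amStartedEventsBegan = true;
        amStarts.add(e);
      } else if (amStartedEventsBegan) {
        // Past the contiguous AM_STARTED run; no need to read further.
        break;
      }
    }
    System.out.println("Previous AM attempts found: " + amStarts.size()); // 2
  }
}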
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java?rev=1404825&r1=1404824&r2=1404825&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java Thu Nov  1 23:07:55 2012
@@ -34,7 +34,6 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser;
@@ -178,26 +177,13 @@ public class RecoveryService extends Com
   }
 
   private void parse() throws IOException {
-    // TODO: parse history file based on startCount
-    String jobName =
-        TypeConverter.fromYarn(applicationAttemptId.getApplicationId()).toString();
-    String jobhistoryDir = JobHistoryUtils.getConfiguredHistoryStagingDirPrefix(getConfig());
-    FSDataInputStream in = null;
-    Path historyFile = null;
-    Path histDirPath = FileContext.getFileContext(getConfig()).makeQualified(
-        new Path(jobhistoryDir));
-    FileContext fc = FileContext.getFileContext(histDirPath.toUri(),
-        getConfig());
-    //read the previous history file
-    historyFile = fc.makeQualified(JobHistoryUtils.getStagingJobHistoryFile(
-        histDirPath, jobName, (applicationAttemptId.getAttemptId() - 1)));
-    LOG.info("History file is at " + historyFile);
-    in = fc.open(historyFile);
+    FSDataInputStream in =
+        getPreviousJobHistoryFileStream(getConfig(), applicationAttemptId);
     JobHistoryParser parser = new JobHistoryParser(in);
     jobInfo = parser.parse();
     Exception parseException = parser.getParseException();
     if (parseException != null) {
-      LOG.info("Got an error parsing job-history file " + historyFile +
+      LOG.info("Got an error parsing job-history file" +
           ", ignoring incomplete events.", parseException);
     }
     Map<org.apache.hadoop.mapreduce.TaskID, TaskInfo> taskInfos = jobInfo
@@ -213,6 +199,28 @@ public class RecoveryService extends Com
     LOG.info("Read completed tasks from history "
         + completedTasks.size());
   }
+
+  public static FSDataInputStream getPreviousJobHistoryFileStream(
+      Configuration conf, ApplicationAttemptId applicationAttemptId)
+      throws IOException {
+    FSDataInputStream in = null;
+    Path historyFile = null;
+    String jobName =
+        TypeConverter.fromYarn(applicationAttemptId.getApplicationId())
+          .toString();
+    String jobhistoryDir =
+        JobHistoryUtils.getConfiguredHistoryStagingDirPrefix(conf);
+    Path histDirPath =
+        FileContext.getFileContext(conf).makeQualified(new Path(jobhistoryDir));
+    FileContext fc = FileContext.getFileContext(histDirPath.toUri(), conf);
+    // read the previous history file
+    historyFile =
+        fc.makeQualified(JobHistoryUtils.getStagingJobHistoryFile(histDirPath,
+          jobName, (applicationAttemptId.getAttemptId() - 1)));
+    LOG.info("History file is at " + historyFile);
+    in = fc.open(historyFile);
+    return in;
+  }
 
   protected Dispatcher createRecoveryDispatcher() {
     return new RecoveryDispatcher();
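[Illustrative sketch, not part of the commit: a caller-side view of the extracted helper. It opens the history file of the previous attempt (attemptId - 1), so callers are expected to guard against the first AM generation, exactly as MRAppMaster.start() now does. The wrapper class below is hypothetical scaffolding; only getPreviousJobHistoryFileStream and the attempt-id guard mirror this commit.]

import java.io.IOException;

import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.mapreduce.v2.app.recover.RecoveryService;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;

public class PreviousHistorySketch {
  static void readPreviousHistory(Configuration conf,
      ApplicationAttemptId appAttemptID) {
    if (appAttemptID.getAttemptId() == 1) {
      return; // first AM generation: no previous history file exists
    }
    FSDataInputStream in = null;
    try {
      in = RecoveryService.getPreviousJobHistoryFileStream(conf, appAttemptID);
      // ... feed 'in' to an EventReader or JobHistoryParser here ...
    } catch (IOException e) {
      // Mirror readJustAMInfos(): a missing or unparsable file is non-fatal.
      System.err.println("Could not read previous job history: " + e);
    } finally {
      if (in != null) {
        IOUtils.closeQuietly(in);
      }
    }
  }
}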