Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java?rev=1429114&r1=1429113&r2=1429114&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java Fri Jan 4 20:35:56 2013 @@ -253,7 +253,26 @@ public class MRApps extends Apps { return jobFile.toString(); } - + public static Path getEndJobCommitSuccessFile(Configuration conf, String user, + JobId jobId) { + Path endCommitFile = new Path(MRApps.getStagingAreaDir(conf, user), + jobId.toString() + Path.SEPARATOR + "COMMIT_SUCCESS"); + return endCommitFile; + } + + public static Path getEndJobCommitFailureFile(Configuration conf, String user, + JobId jobId) { + Path endCommitFile = new Path(MRApps.getStagingAreaDir(conf, user), + jobId.toString() + Path.SEPARATOR + "COMMIT_FAIL"); + return endCommitFile; + } + + public static Path getStartJobCommitFile(Configuration conf, String user, + JobId jobId) { + Path startCommitFile = new Path(MRApps.getStagingAreaDir(conf, user), + jobId.toString() + Path.SEPARATOR + "COMMIT_STARTED"); + return startCommitFile; + } private static long[] parseTimeStamps(String[] strs) { if (null == strs) {
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/OutputCommitter.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/OutputCommitter.java?rev=1429114&r1=1429113&r2=1429114&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/OutputCommitter.java (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/OutputCommitter.java Fri Jan 4 20:35:56 2013 @@ -52,6 +52,14 @@ import org.apache.hadoop.classification. * Discard the task commit. * </li> * </ol> + * The methods in this class can be called from several different processes and + * from several different contexts. It is important to know which process and + * which context each is called from. Each method should be marked accordingly + * in its documentation. It is also important to note that not all methods are + * guaranteed to be called once and only once. If a method is not guaranteed to + * have this property the output committer needs to handle this appropriately. + * Also note it will only be in rare situations where they may be called + * multiple times for the same task. * * @see FileOutputCommitter * @see JobContext @@ -62,7 +70,9 @@ import org.apache.hadoop.classification. public abstract class OutputCommitter extends org.apache.hadoop.mapreduce.OutputCommitter { /** - * For the framework to setup the job output during initialization + * For the framework to setup the job output during initialization. This is + * called from the application master process for the entire job. 
This will be + * called multiple times, once per job attempt. * * @param jobContext Context of the job whose output is being written. * @throws IOException if temporary output could not be created @@ -70,7 +80,9 @@ public abstract class OutputCommitter public abstract void setupJob(JobContext jobContext) throws IOException; /** - * For cleaning up the job's output after job completion + * For cleaning up the job's output after job completion. This is called + * from the application master process for the entire job. This may be called + * multiple times. * * @param jobContext Context of the job whose output is being written. * @throws IOException @@ -82,7 +94,10 @@ public abstract class OutputCommitter /** * For committing job's output after successful job completion. Note that this - * is invoked for jobs with final runstate as SUCCESSFUL. + * is invoked for jobs with final runstate as SUCCESSFUL. This is called + * from the application master process for the entire job. This is guaranteed + * to only be called once. If it throws an exception the entire job will + * fail. * * @param jobContext Context of the job whose output is being written. * @throws IOException @@ -94,7 +109,8 @@ public abstract class OutputCommitter /** * For aborting an unsuccessful job's output. Note that this is invoked for * jobs with final runstate as {@link JobStatus#FAILED} or - * {@link JobStatus#KILLED} + * {@link JobStatus#KILLED}. This is called from the application + * master process for the entire job. This may be called multiple times. * * @param jobContext Context of the job whose output is being written. * @param status final runstate of the job @@ -106,7 +122,10 @@ public abstract class OutputCommitter } /** - * Sets up output for the task. + * Sets up output for the task. This is called from each individual task's + * process that will output to HDFS, and it is called just for that task. 
This + * may be called multiple times for the same task, but for different task + * attempts. * * @param taskContext Context of the task whose output is being written. * @throws IOException @@ -115,7 +134,9 @@ public abstract class OutputCommitter throws IOException; /** - * Check whether task needs a commit + * Check whether task needs a commit. This is called from each individual + * task's process that will output to HDFS, and it is called just for that + * task. * * @param taskContext * @return true/false @@ -125,9 +146,16 @@ public abstract class OutputCommitter throws IOException; /** - * To promote the task's temporary output to final output location - * - * The task's output is moved to the job's output directory. + * To promote the task's temporary output to final output location. + * If {@link #needsTaskCommit(TaskAttemptContext)} returns true and this + * task is the task that the AM determines finished first, this method + * is called to commit an individual task's output. This is to mark + * that task's output as complete, as {@link #commitJob(JobContext)} will + * also be called later on if the entire job finished successfully. This + * is called from a task's process. This may be called multiple times for the + * same task, but for different task attempts. It should be very rare for this to + * be called multiple times and requires odd networking failures to make this + * happen. In the future the Hadoop framework may eliminate this race. * * @param taskContext Context of the task whose output is being written. * @throws IOException if commit is not @@ -136,7 +164,9 @@ public abstract class OutputCommitter throws IOException; /** - * Discard the task output + * Discard the task output. This is called from a task's process to clean + * up a single task's output that has not yet been committed. This may be + * called multiple times for the same task, but for different task attempts.
* * @param taskContext * @throws IOException @@ -160,7 +190,8 @@ public abstract class OutputCommitter * The retry-count for the job will be passed via the * {@link MRConstants#APPLICATION_ATTEMPT_ID} key in * {@link TaskAttemptContext#getConfiguration()} for the - * <code>OutputCommitter</code>. + * <code>OutputCommitter</code>. This is called from the application master + * process, but it is called individually for each task. * * If an exception is thrown the task will be attempted again. * Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/OutputCommitter.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/OutputCommitter.java?rev=1429114&r1=1429113&r2=1429114&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/OutputCommitter.java (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/OutputCommitter.java Fri Jan 4 20:35:56 2013 @@ -54,7 +54,11 @@ import org.apache.hadoop.classification. * The methods in this class can be called from several different processes and * from several different contexts. It is important to know which process and * which context each is called from. Each method should be marked accordingly - * in its documentation. + * in its documentation. It is also important to note that not all methods are + * guaranteed to be called once and only once. If a method is not guaranteed to + * have this property the output committer needs to handle this appropriately. 
+ * Also note that only in rare situations may they be called + * multiple times for the same task. * * @see org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter * @see JobContext @@ -65,7 +69,8 @@ import org.apache.hadoop.classification. public abstract class OutputCommitter { /** * For the framework to setup the job output during initialization. This is - * called from the application master process for the entire job. + * called from the application master process for the entire job. This may be + * called multiple times, once per job attempt. * * @param jobContext Context of the job whose output is being written. * @throws IOException if temporary output could not be created @@ -74,7 +79,8 @@ public abstract class OutputCommitter { /** * For cleaning up the job's output after job completion. This is called - * from the application master process for the entire job. + * from the application master process for the entire job. This may be called + * multiple times. * * @param jobContext Context of the job whose output is being written. * @throws IOException @@ -87,7 +93,9 @@ public abstract class OutputCommitter { /** * For committing job's output after successful job completion. Note that this * is invoked for jobs with final runstate as SUCCESSFUL. This is called - * from the application master process for the entire job. + * from the application master process for the entire job. This is guaranteed + * to only be called once. If it throws an exception the entire job will + * fail. * * @param jobContext Context of the job whose output is being written. * @throws IOException @@ -101,7 +109,7 @@ public abstract class OutputCommitter { * For aborting an unsuccessful job's output. Note that this is invoked for * jobs with final runstate as {@link JobStatus.State#FAILED} or * {@link JobStatus.State#KILLED}. This is called from the application - * master process for the entire job.
This may be called multiple times. * * @param jobContext Context of the job whose output is being written. * @param state final runstate of the job @@ -114,7 +122,9 @@ public abstract class OutputCommitter { /** * Sets up output for the task. This is called from each individual task's - * process that will output to HDFS, and it is called just for that task. + * process that will output to HDFS, and it is called just for that task. This + * may be called multiple times for the same task, but for different task + * attempts. * * @param taskContext Context of the task whose output is being written. * @throws IOException @@ -141,7 +151,10 @@ public abstract class OutputCommitter { * is called to commit an individual task's output. This is to mark * that tasks output as complete, as {@link #commitJob(JobContext)} will * also be called later on if the entire job finished successfully. This - * is called from a task's process. + * is called from a task's process. This may be called multiple times for the + * same task, but for different task attempts. It should be very rare for this to + * be called multiple times and requires odd networking failures to make this + * happen. In the future the Hadoop framework may eliminate this race. * * @param taskContext Context of the task whose output is being written. * @throws IOException if commit is not successful. @@ -151,7 +164,8 @@ public abstract class OutputCommitter { /** * Discard the task output. This is called from a task's process to clean - * up a single task's output that can not yet been committed. + * up a single task's output that has not yet been committed. This may be + * called multiple times for the same task, but for different task attempts. * * @param taskContext * @throws IOException @@ -184,6 +198,9 @@ public abstract class OutputCommitter { * * If an exception is thrown the task will be attempted again. * + * This may be called multiple times for the same task, but from different + * application attempts.
+ * * @param taskContext Context of the task whose output is being recovered * @throws IOException */ Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/HistoryEventHandler.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/HistoryEventHandler.java?rev=1429114&view=auto ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/HistoryEventHandler.java (added) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/HistoryEventHandler.java Fri Jan 4 20:35:56 2013 @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.mapreduce.jobhistory; + +import java.io.IOException; + +public interface HistoryEventHandler { + + void handleEvent(HistoryEvent event) throws IOException; + +} Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java?rev=1429114&r1=1429113&r2=1429114&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java (original) +++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java Fri Jan 4 20:35:56 2013 @@ -54,7 +54,7 @@ import org.apache.hadoop.yarn.api.record */ @InterfaceAudience.Private @InterfaceStability.Unstable -public class JobHistoryParser { +public class JobHistoryParser implements HistoryEventHandler { private static final Log LOG = LogFactory.getLog(JobHistoryParser.class); @@ -94,6 +94,34 @@ public class JobHistoryParser { this.in = in; } + public synchronized void parse(HistoryEventHandler handler) + throws IOException { + parse(new EventReader(in), handler); + } + + /** + * Only used for unit tests. 
+ */ + @Private + public synchronized void parse(EventReader reader, HistoryEventHandler handler) + throws IOException { + int eventCtr = 0; + HistoryEvent event; + try { + while ((event = reader.getNextEvent()) != null) { + handler.handleEvent(event); + ++eventCtr; + } + } catch (IOException ioe) { + LOG.info("Caught exception parsing history file after " + eventCtr + + " events", ioe); + parseException = ioe; + } finally { + in.close(); + } + } + + /** * Parse the entire history file and populate the JobInfo object * The first invocation will populate the object, subsequent calls @@ -122,21 +150,7 @@ public class JobHistoryParser { } info = new JobInfo(); - - int eventCtr = 0; - HistoryEvent event; - try { - while ((event = reader.getNextEvent()) != null) { - handleEvent(event); - ++eventCtr; - } - } catch (IOException ioe) { - LOG.info("Caught exception parsing history file after " + eventCtr + - " events", ioe); - parseException = ioe; - } finally { - in.close(); - } + parse(reader, this); return info; } @@ -150,7 +164,8 @@ public class JobHistoryParser { return parseException; } - private void handleEvent(HistoryEvent event) { + @Override + public void handleEvent(HistoryEvent event) { EventType type = event.getEventType(); switch (type) { Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java?rev=1429114&r1=1429113&r2=1429114&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java (original) +++ 
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java Fri Jan 4 20:35:56 2013 @@ -667,6 +667,9 @@ public class HistoryFileManager extends } }); } + } else if (old != null && !old.isMovePending()) { + //This is a duplicate so just delete it + fileInfo.delete(); } } }