Author: sseth
Date: Fri Aug 24 18:56:09 2012
New Revision: 1377047

URL: http://svn.apache.org/viewvc?rev=1377047&view=rev
Log:
MAPREDUCE-4581. TaskHeartbeatHandler should extend HeartbeatHandlerBase. (Contributed by Tsuyoshi OZAWA)
Added: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/CHANGES.txt.MR-3902 Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/TaskHeartbeatHandler.java Added: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/CHANGES.txt.MR-3902 URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/CHANGES.txt.MR-3902?rev=1377047&view=auto ============================================================================== --- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/CHANGES.txt.MR-3902 (added) +++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/CHANGES.txt.MR-3902 Fri Aug 24 18:56:09 2012 @@ -0,0 +1,2 @@ +Branch MR-3902 + MAPREDUCE-4581. TaskHeartbeatHandler should extend HeartbeatHandlerBase (Tsuyoshi OZAWA via sseth) Modified: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/TaskHeartbeatHandler.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/TaskHeartbeatHandler.java?rev=1377047&r1=1377046&r2=1377047&view=diff ============================================================================== --- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/TaskHeartbeatHandler.java (original) +++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/TaskHeartbeatHandler.java Fri Aug 24 18:56:09 2012 @@ -18,22 +18,14 @@ package org.apache.hadoop.mapreduce.v2.app2; -import java.util.Iterator; -import java.util.Map; -import 
java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.MRJobConfig; +import org.apache.hadoop.mapreduce.jobhistory.HeartbeatHandlerBase; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptDiagnosticsUpdateEvent; import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEvent; import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventType; -import org.apache.hadoop.yarn.Clock; -import org.apache.hadoop.yarn.event.EventHandler; -import org.apache.hadoop.yarn.service.AbstractService; /** @@ -42,153 +34,36 @@ import org.apache.hadoop.yarn.service.Ab * not hear from it for a long time. * */ -@SuppressWarnings({"unchecked", "rawtypes"}) -public class TaskHeartbeatHandler extends AbstractService { - - // TODO XXX: Extend HeartbeatHandlerBase - - private static class ReportTime { - private long lastPing; - private long lastProgress; - - public ReportTime(long time) { - setLastProgress(time); - } - - public synchronized void setLastPing(long time) { - lastPing = time; - } - - public synchronized void setLastProgress(long time) { - lastProgress = time; - lastPing = time; - } - - public synchronized long getLastPing() { - return lastPing; - } - - public synchronized long getLastProgress() { - return lastProgress; - } - } - - private static final Log LOG = LogFactory.getLog(TaskHeartbeatHandler.class); - private static final int PING_TIMEOUT = 5 * 60 * 1000; - - //thread which runs periodically to see the last time since a heartbeat is - //received from a task. - private Thread lostTaskCheckerThread; - private volatile boolean stopped; - private int taskTimeOut = 5 * 60 * 1000;// 5 mins - private int taskTimeOutCheckInterval = 30 * 1000; // 30 seconds. 
- - private final EventHandler eventHandler; - private final Clock clock; - private final AppContext context; - - private ConcurrentMap<TaskAttemptId, ReportTime> runningAttempts; +@SuppressWarnings({"unchecked"}) +public class TaskHeartbeatHandler extends HeartbeatHandlerBase<TaskAttemptId> { - public TaskHeartbeatHandler(AppContext context) { - this(context, 16); - } - - public TaskHeartbeatHandler(AppContext context, - int numThreads) { - super("TaskHeartbeatHandler"); - this.eventHandler = context.getEventHandler(); - this.clock = context.getClock(); - this.context = context; - numThreads = numThreads <= 0 ? 1 : numThreads; - runningAttempts = - new ConcurrentHashMap<TaskAttemptId, ReportTime>(16, 0.75f, numThreads); + public TaskHeartbeatHandler(AppContext context, int numThreads) { + super(context, numThreads, "TaskHeartbeatHandler"); } @Override - public void init(Configuration conf) { - super.init(conf); - taskTimeOut = conf.getInt(MRJobConfig.TASK_TIMEOUT, 5 * 60 * 1000); - taskTimeOutCheckInterval = - conf.getInt(MRJobConfig.TASK_TIMEOUT_CHECK_INTERVAL_MS, 30 * 1000); + protected int getConfiguredTimeout(Configuration conf) { + return conf.getInt(MRJobConfig.TASK_TIMEOUT, 5 * 60 * 1000); } @Override - public void start() { - lostTaskCheckerThread = new Thread(new PingChecker()); - lostTaskCheckerThread.setName("TaskHeartbeatHandler PingChecker"); - lostTaskCheckerThread.start(); - super.start(); + protected int getConfiguredTimeoutCheckInterval(Configuration conf) { + return conf.getInt(MRJobConfig.TASK_TIMEOUT_CHECK_INTERVAL_MS, 30 * 1000); } @Override - public void stop() { - stopped = true; - lostTaskCheckerThread.interrupt(); - super.stop(); - } - - public void progressing(TaskAttemptId attemptID) { - //only put for the registered attempts - //TODO throw an exception if the task isn't registered. 
- ReportTime time = runningAttempts.get(attemptID); - if(time != null) { - time.setLastProgress(clock.getTime()); - } - } - - public void pinged(TaskAttemptId attemptID) { - //only put for the registered attempts - //TODO throw an exception if the task isn't registered. - ReportTime time = runningAttempts.get(attemptID); - if(time != null) { - time.setLastPing(clock.getTime()); - } - } - - public void register(TaskAttemptId attemptID) { - runningAttempts.put(attemptID, new ReportTime(clock.getTime())); + public boolean hasTimedOut( + org.apache.hadoop.mapreduce.jobhistory.HeartbeatHandlerBase.ReportTime report, + long currentTime) { + return (timeOut > 0) && (currentTime > report.getLastPing() + timeOut); } - public void unregister(TaskAttemptId attemptID) { - runningAttempts.remove(attemptID); - } - - private class PingChecker implements Runnable { - - @Override - public void run() { - while (!stopped && !Thread.currentThread().isInterrupted()) { - Iterator<Map.Entry<TaskAttemptId, ReportTime>> iterator = - runningAttempts.entrySet().iterator(); - - // avoid calculating current time everytime in loop - long currentTime = clock.getTime(); - - while (iterator.hasNext()) { - Map.Entry<TaskAttemptId, ReportTime> entry = iterator.next(); - boolean taskTimedOut = (taskTimeOut > 0) && - (currentTime > (entry.getValue().getLastProgress() + taskTimeOut)); - boolean pingTimedOut = - (currentTime > (entry.getValue().getLastPing() + PING_TIMEOUT)); - - if(taskTimedOut || pingTimedOut) { - // task is lost, remove from the list and raise lost event - iterator.remove(); - eventHandler.handle(new TaskAttemptDiagnosticsUpdateEvent(entry - .getKey(), "AttemptID:" + entry.getKey().toString() - + " Timed out after " + taskTimeOut / 1000 + " secs")); - eventHandler.handle(new TaskAttemptEvent(entry.getKey(), - TaskAttemptEventType.TA_TIMED_OUT)); - } - } - try { - Thread.sleep(taskTimeOutCheckInterval); - } catch (InterruptedException e) { - LOG.info("TaskHeartbeatHandler thread 
interrupted"); - break; - } - } - } + @Override + public void handleTimeOut(TaskAttemptId attemptId) { + eventHandler.handle(new TaskAttemptDiagnosticsUpdateEvent(attemptId, + "AttemptID:" + attemptId.toString() + + " Timed out after " + timeOut / 1000 + " secs")); + eventHandler.handle(new TaskAttemptEvent(attemptId, + TaskAttemptEventType.TA_TIMED_OUT)); } - -} +} \ No newline at end of file