MAPREDUCE-7022. Fast fail rogue jobs based on task scratch dir size. Contributed by Johan Gustavsson
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a37e7f0a Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a37e7f0a Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a37e7f0a Branch: refs/heads/YARN-1011 Commit: a37e7f0ad8b68c7ed16c242bedf62f4cde48d6fd Parents: 1b0f265 Author: Jason Lowe <jl...@apache.org> Authored: Fri Jan 26 14:36:45 2018 -0600 Committer: Jason Lowe <jl...@apache.org> Committed: Fri Jan 26 14:36:45 2018 -0600 ---------------------------------------------------------------------- .../hadoop/mapred/LocalContainerLauncher.java | 2 +- .../hadoop/mapred/TaskAttemptListenerImpl.java | 7 +- .../org/apache/hadoop/mapred/YarnChild.java | 4 +- .../v2/app/job/event/TaskAttemptFailEvent.java | 53 ++++++++++++ .../app/job/event/TaskTAttemptFailedEvent.java | 39 +++++++++ .../v2/app/job/impl/TaskAttemptImpl.java | 40 ++++++--- .../mapreduce/v2/app/job/impl/TaskImpl.java | 6 +- .../hadoop/mapreduce/v2/app/TestFail.java | 7 +- .../hadoop/mapreduce/v2/app/TestRecovery.java | 7 +- .../mapreduce/v2/app/job/impl/TestJobImpl.java | 5 +- .../v2/app/job/impl/TestTaskAttempt.java | 9 +- .../mapreduce/v2/app/job/impl/TestTaskImpl.java | 42 ++++----- .../apache/hadoop/mapred/LocalJobRunner.java | 4 +- .../java/org/apache/hadoop/mapred/MapTask.java | 3 +- .../java/org/apache/hadoop/mapred/Task.java | 87 ++++++++++++++++++- .../hadoop/mapred/TaskUmbilicalProtocol.java | 12 ++- .../apache/hadoop/mapreduce/MRJobConfig.java | 14 +++ .../src/main/resources/mapred-default.xml | 22 +++++ .../hadoop/mapred/TestTaskProgressReporter.java | 90 +++++++++++++++++++- .../mapreduce/v2/hs/TestJobHistoryParsing.java | 9 +- .../apache/hadoop/mapred/TestMapProgress.java | 4 +- .../apache/hadoop/mapred/TestTaskCommit.java | 2 +- 22 files changed, 397 insertions(+), 71 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a37e7f0a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java index 6f9cc34..fed500a 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java @@ -510,7 +510,7 @@ public class LocalContainerLauncher extends AbstractService implements String cause = (tCause == null) ? 
throwable.getMessage() : StringUtils .stringifyException(tCause); - umbilical.fatalError(classicAttemptID, cause); + umbilical.fatalError(classicAttemptID, cause, false); } throw new RuntimeException(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/a37e7f0a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java index 556c90c..b155af22 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java @@ -48,6 +48,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.Task; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptDiagnosticsUpdateEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType; +import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptFailEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent; import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler; @@ -281,7 +282,7 @@ public class TaskAttemptListenerImpl extends CompositeService } @Override - public void fatalError(TaskAttemptID taskAttemptID, String msg) + public void fatalError(TaskAttemptID taskAttemptID, String msg, boolean fastFail) throws 
IOException { // This happens only in Child and in the Task. LOG.error("Task: " + taskAttemptID + " - exited : " + msg); @@ -294,7 +295,7 @@ public class TaskAttemptListenerImpl extends CompositeService preemptionPolicy.handleFailedContainer(attemptID); context.getEventHandler().handle( - new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_FAILMSG)); + new TaskAttemptFailEvent(attemptID, fastFail)); } @Override @@ -312,7 +313,7 @@ public class TaskAttemptListenerImpl extends CompositeService preemptionPolicy.handleFailedContainer(attemptID); context.getEventHandler().handle( - new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_FAILMSG)); + new TaskAttemptFailEvent(attemptID)); } @Override http://git-wip-us.apache.org/repos/asf/hadoop/blob/a37e7f0a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java index 7ae7a1e..bd40e54 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/YarnChild.java @@ -206,7 +206,7 @@ class YarnChild { if (taskid != null) { if (!ShutdownHookManager.get().isShutdownInProgress()) { umbilical.fatalError(taskid, - StringUtils.stringifyException(exception)); + StringUtils.stringifyException(exception), false); } } } catch (Throwable throwable) { @@ -218,7 +218,7 @@ class YarnChild { String cause = tCause == null ? 
throwable.getMessage() : StringUtils .stringifyException(tCause); - umbilical.fatalError(taskid, cause); + umbilical.fatalError(taskid, cause, false); } } } finally { http://git-wip-us.apache.org/repos/asf/hadoop/blob/a37e7f0a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptFailEvent.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptFailEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptFailEvent.java new file mode 100644 index 0000000..6ea1d15 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptFailEvent.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.mapreduce.v2.app.job.event; + +import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; + +public class TaskAttemptFailEvent extends TaskAttemptEvent { + private boolean fastFail; + + /** + * Create a new TaskAttemptFailEvent, with task fastFail disabled. + * + * @param id the id of the task attempt + */ + public TaskAttemptFailEvent(TaskAttemptId id) { + this(id, false); + } + + /** + * Create a new TaskAttemptFailEvent. + * + * @param id the id of the task attempt + * @param fastFail should the task fastFail or not. + */ + public TaskAttemptFailEvent(TaskAttemptId id, boolean fastFail) { + super(id, TaskAttemptEventType.TA_FAILMSG); + this.fastFail = fastFail; + } + + /** + * Check if task should fast fail or retry + * @return boolean value where true indicates the task should not retry + */ + public boolean isFastFail() { + return fastFail; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/a37e7f0a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskTAttemptFailedEvent.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskTAttemptFailedEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskTAttemptFailedEvent.java new file mode 100644 index 0000000..30392ac --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskTAttemptFailedEvent.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapreduce.v2.app.job.event; + +import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; + +public class TaskTAttemptFailedEvent extends TaskTAttemptEvent { + + private boolean fastFail; + + public TaskTAttemptFailedEvent(TaskAttemptId id) { + this(id, false); + } + + public TaskTAttemptFailedEvent(TaskAttemptId id, boolean fastFail) { + super(id, TaskEventType.T_ATTEMPT_FAILED); + this.fastFail = fastFail; + } + + public boolean isFastFail() { + return fastFail; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/a37e7f0a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java index 431128b..6632f27 100755 --- 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java @@ -94,6 +94,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerLaunched import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptDiagnosticsUpdateEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType; +import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptFailEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptKillEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptRecoverEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent; @@ -101,6 +102,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptTooManyFetchFailureEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptEvent; +import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptFailedEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptKilledEvent; import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher; import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent; @@ -194,6 +196,7 @@ public abstract class TaskAttemptImpl implements private Locality locality; private Avataar avataar; private boolean rescheduleNextAttempt = false; + private boolean failFast = false; private static final CleanupContainerTransition CLEANUP_CONTAINER_TRANSITION = new CleanupContainerTransition(); @@ -1412,6 +1415,14 @@ public abstract class TaskAttemptImpl implements public void 
setAvataar(Avataar avataar) { this.avataar = avataar; } + + public void setTaskFailFast(boolean failFast) { + this.failFast = failFast; + } + + public boolean isTaskFailFast() { + return failFast; + } @SuppressWarnings("unchecked") public TaskAttemptStateInternal recover(TaskAttemptInfo taInfo, @@ -1921,9 +1932,12 @@ public abstract class TaskAttemptImpl implements switch(finalState) { case FAILED: - taskAttempt.eventHandler.handle(new TaskTAttemptEvent( - taskAttempt.attemptId, - TaskEventType.T_ATTEMPT_FAILED)); + boolean fastFail = false; + if (event instanceof TaskAttemptFailEvent) { + fastFail = ((TaskAttemptFailEvent) event).isFastFail(); + } + taskAttempt.eventHandler.handle(new TaskTAttemptFailedEvent( + taskAttempt.attemptId, fastFail)); break; case KILLED: taskAttempt.eventHandler.handle(new TaskTAttemptKilledEvent( @@ -2041,13 +2055,16 @@ public abstract class TaskAttemptImpl implements private static class FailedTransition implements SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> { + + @SuppressWarnings("unchecked") @Override public void transition(TaskAttemptImpl taskAttempt, TaskAttemptEvent event) { // set the finish time taskAttempt.setFinishTime(); - notifyTaskAttemptFailed(taskAttempt); + + notifyTaskAttemptFailed(taskAttempt, taskAttempt.isTaskFailFast()); } } @@ -2154,8 +2171,8 @@ public abstract class TaskAttemptImpl implements LOG.debug("Not generating HistoryFinish event since start event not " + "generated for taskAttempt: " + taskAttempt.getID()); } - taskAttempt.eventHandler.handle(new TaskTAttemptEvent( - taskAttempt.attemptId, TaskEventType.T_ATTEMPT_FAILED)); + taskAttempt.eventHandler.handle(new TaskTAttemptFailedEvent( + taskAttempt.attemptId)); } } @@ -2332,6 +2349,8 @@ public abstract class TaskAttemptImpl implements if (event instanceof TaskAttemptKillEvent) { taskAttempt.setRescheduleNextAttempt( ((TaskAttemptKillEvent)event).getRescheduleAttempt()); + } else if (event instanceof TaskAttemptFailEvent) { + 
taskAttempt.setTaskFailFast(((TaskAttemptFailEvent)event).isFastFail()); } } } @@ -2400,12 +2419,13 @@ public abstract class TaskAttemptImpl implements // register it to finishing state taskAttempt.appContext.getTaskAttemptFinishingMonitor().register( taskAttempt.attemptId); - notifyTaskAttemptFailed(taskAttempt); + notifyTaskAttemptFailed(taskAttempt, false); } } @SuppressWarnings("unchecked") - private static void notifyTaskAttemptFailed(TaskAttemptImpl taskAttempt) { + private static void notifyTaskAttemptFailed(TaskAttemptImpl taskAttempt, + boolean fastFail) { if (taskAttempt.getLaunchTime() == 0) { sendJHStartEventForAssignedFailTask(taskAttempt); } @@ -2419,8 +2439,8 @@ public abstract class TaskAttemptImpl implements taskAttempt.eventHandler.handle(new JobHistoryEvent( taskAttempt.attemptId.getTaskId().getJobId(), tauce)); - taskAttempt.eventHandler.handle(new TaskTAttemptEvent( - taskAttempt.attemptId, TaskEventType.T_ATTEMPT_FAILED)); + taskAttempt.eventHandler.handle(new TaskTAttemptFailedEvent( + taskAttempt.attemptId, fastFail)); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/a37e7f0a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java index 086d4d5..ce3b3cc 100755 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java @@ 
-74,6 +74,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskRecoverEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptEvent; +import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptFailedEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptKilledEvent; import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics; import org.apache.hadoop.mapreduce.v2.app.rm.ContainerFailedEvent; @@ -1054,7 +1055,7 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> { @Override public TaskStateInternal transition(TaskImpl task, TaskEvent event) { - TaskTAttemptEvent castEvent = (TaskTAttemptEvent) event; + TaskTAttemptFailedEvent castEvent = (TaskTAttemptFailedEvent) event; TaskAttemptId taskAttemptId = castEvent.getTaskAttemptID(); task.failedAttempts.add(taskAttemptId); if (taskAttemptId.equals(task.commitAttempt)) { @@ -1068,7 +1069,8 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> { } task.finishedAttempts.add(taskAttemptId); - if (task.failedAttempts.size() < task.maxAttempts) { + if (!castEvent.isFastFail() + && task.failedAttempts.size() < task.maxAttempts) { task.handleTaskAttemptCompletion( taskAttemptId, TaskAttemptCompletionEventStatus.FAILED); http://git-wip-us.apache.org/repos/asf/hadoop/blob/a37e7f0a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java index 4d3f6f4..a2f0aba 100644 --- 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java @@ -23,6 +23,7 @@ import java.net.InetSocketAddress; import java.util.Iterator; import java.util.Map; +import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptFailEvent; import org.junit.Assert; import org.apache.hadoop.conf.Configuration; @@ -288,8 +289,7 @@ public class TestFail { if (attemptID.getTaskId().getId() == 0) {//check if it is first task // send the Fail event getContext().getEventHandler().handle( - new TaskAttemptEvent(attemptID, - TaskAttemptEventType.TA_FAILMSG)); + new TaskAttemptFailEvent(attemptID)); } else { getContext().getEventHandler().handle( new TaskAttemptEvent(attemptID, @@ -310,8 +310,7 @@ public class TestFail { //check if it is first task's first attempt // send the Fail event getContext().getEventHandler().handle( - new TaskAttemptEvent(attemptID, - TaskAttemptEventType.TA_FAILMSG)); + new TaskAttemptFailEvent(attemptID)); } else { getContext().getEventHandler().handle( new TaskAttemptEvent(attemptID, http://git-wip-us.apache.org/repos/asf/hadoop/blob/a37e7f0a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java index 893c4a0..b2807c1 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java 
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java @@ -38,6 +38,8 @@ import java.util.List; import java.util.Map; import java.util.concurrent.TimeoutException; + +import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptFailEvent; import org.junit.Assert; import org.apache.hadoop.conf.Configuration; @@ -167,9 +169,8 @@ public class TestRecovery { /////////// Play some games with the TaskAttempts of the first task ////// //send the fail signal to the 1st map task attempt app.getContext().getEventHandler().handle( - new TaskAttemptEvent( - task1Attempt1.getID(), - TaskAttemptEventType.TA_FAILMSG)); + new TaskAttemptFailEvent( + task1Attempt1.getID())); app.waitForState(task1Attempt1, TaskAttemptState.FAILED); http://git-wip-us.apache.org/repos/asf/hadoop/blob/a37e7f0a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java index 1827ce4..8592b20 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java @@ -81,7 +81,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent; 
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType; -import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptEvent; +import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptFailedEvent; import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl.InitTransition; import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics; import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler; @@ -437,8 +437,7 @@ public class TestJobImpl { TaskImpl task = (TaskImpl) t; task.handle(new TaskEvent(task.getID(), TaskEventType.T_SCHEDULE)); for(TaskAttempt ta: task.getAttempts().values()) { - task.handle(new TaskTAttemptEvent(ta.getID(), - TaskEventType.T_ATTEMPT_FAILED)); + task.handle(new TaskTAttemptFailedEvent(ta.getID())); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/a37e7f0a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java index fe5d95d..43571a9 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java @@ -39,6 +39,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.CopyOnWriteArrayList; +import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptFailEvent; import org.junit.After; import org.junit.Assert; import org.junit.BeforeClass; @@ 
-499,7 +500,7 @@ public class TestTaskAttempt{ new TaskAttemptDiagnosticsUpdateEvent(attemptID, "Test Diagnostic Event")); getContext().getEventHandler().handle( - new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_FAILMSG)); + new TaskAttemptFailEvent(attemptID)); } protected EventHandler<JobHistoryEvent> createJobHistoryHandler( @@ -1357,8 +1358,7 @@ public class TestTaskAttempt{ MockEventHandler eventHandler = new MockEventHandler(); TaskAttemptImpl taImpl = createTaskAttemptImpl(eventHandler); - taImpl.handle(new TaskAttemptEvent(taImpl.getID(), - TaskAttemptEventType.TA_FAILMSG)); + taImpl.handle(new TaskAttemptFailEvent(taImpl.getID())); assertEquals("Task attempt is not in FAILED state", taImpl.getState(), TaskAttemptState.FAILED); @@ -1484,8 +1484,7 @@ public class TestTaskAttempt{ MockEventHandler eventHandler = new MockEventHandler(); TaskAttemptImpl taImpl = createTaskAttemptImpl(eventHandler); - taImpl.handle(new TaskAttemptEvent(taImpl.getID(), - TaskAttemptEventType.TA_FAILMSG)); + taImpl.handle(new TaskAttemptFailEvent(taImpl.getID())); assertEquals("Task attempt is not in RUNNING state", taImpl.getState(), TaskAttemptState.FAILED); http://git-wip-us.apache.org/repos/asf/hadoop/blob/a37e7f0a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java index 62d4cc0..1225c43 100755 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java +++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java @@ -53,6 +53,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptEvent; +import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptFailedEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptKilledEvent; import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics; import org.apache.hadoop.security.Credentials; @@ -345,8 +346,7 @@ public class TestTaskImpl { } private void failRunningTaskAttempt(TaskAttemptId attemptId) { - mockTask.handle(new TaskTAttemptEvent(attemptId, - TaskEventType.T_ATTEMPT_FAILED)); + mockTask.handle(new TaskTAttemptFailedEvent(attemptId)); assertTaskRunningState(); } @@ -612,11 +612,16 @@ public class TestTaskImpl { // The task should now have succeeded assertTaskSucceededState(); - + // Now complete the first task attempt, after the second has succeeded - mockTask.handle(new TaskTAttemptEvent(taskAttempts.get(0).getAttemptId(), - firstAttemptFinishEvent)); - + if (firstAttemptFinishEvent.equals(TaskEventType.T_ATTEMPT_FAILED)) { + mockTask.handle(new TaskTAttemptFailedEvent(taskAttempts + .get(0).getAttemptId())); + } else { + mockTask.handle(new TaskTAttemptEvent(taskAttempts.get(0).getAttemptId(), + firstAttemptFinishEvent)); + } + // The task should still be in the succeeded state assertTaskSucceededState(); @@ -668,8 +673,8 @@ public class TestTaskImpl { assertEquals(2, taskAttempts.size()); // speculative attempt retroactively fails from fetch failures - mockTask.handle(new TaskTAttemptEvent(taskAttempts.get(1).getAttemptId(), - TaskEventType.T_ATTEMPT_FAILED)); + mockTask.handle(new TaskTAttemptFailedEvent( + 
taskAttempts.get(1).getAttemptId())); assertTaskScheduledState(); assertEquals(3, taskAttempts.size()); @@ -683,8 +688,8 @@ public class TestTaskImpl { assertEquals(2, taskAttempts.size()); // speculative attempt retroactively fails from fetch failures - mockTask.handle(new TaskTAttemptEvent(taskAttempts.get(1).getAttemptId(), - TaskEventType.T_ATTEMPT_FAILED)); + mockTask.handle(new TaskTAttemptFailedEvent( + taskAttempts.get(1).getAttemptId())); assertTaskScheduledState(); assertEquals(3, taskAttempts.size()); @@ -698,8 +703,8 @@ public class TestTaskImpl { assertEquals(2, taskAttempts.size()); // speculative attempt retroactively fails from fetch failures - mockTask.handle(new TaskTAttemptEvent(taskAttempts.get(1).getAttemptId(), - TaskEventType.T_ATTEMPT_FAILED)); + mockTask.handle(new TaskTAttemptFailedEvent( + taskAttempts.get(1).getAttemptId())); assertTaskScheduledState(); assertEquals(3, taskAttempts.size()); @@ -734,8 +739,8 @@ public class TestTaskImpl { // have the first attempt fail, verify task failed due to no retries MockTaskAttemptImpl taskAttempt = taskAttempts.get(0); taskAttempt.setState(TaskAttemptState.FAILED); - mockTask.handle(new TaskTAttemptEvent(taskAttempt.getAttemptId(), - TaskEventType.T_ATTEMPT_FAILED)); + mockTask.handle(new TaskTAttemptFailedEvent( + taskAttempt.getAttemptId())); assertEquals(TaskState.FAILED, mockTask.getState()); // verify task can no longer be killed @@ -757,8 +762,7 @@ public class TestTaskImpl { TaskEventType.T_ATTEMPT_COMMIT_PENDING)); assertEquals(TaskState.FAILED, mockTask.getState()); taskAttempt.setState(TaskAttemptState.FAILED); - mockTask.handle(new TaskTAttemptEvent(taskAttempt.getAttemptId(), - TaskEventType.T_ATTEMPT_FAILED)); + mockTask.handle(new TaskTAttemptFailedEvent(taskAttempt.getAttemptId())); assertEquals(TaskState.FAILED, mockTask.getState()); taskAttempt = taskAttempts.get(2); taskAttempt.setState(TaskAttemptState.SUCCEEDED); @@ -808,8 +812,7 @@ public class TestTaskImpl { // max attempts 
is 4 MockTaskAttemptImpl taskAttempt = taskAttempts.get(0); taskAttempt.setState(TaskAttemptState.FAILED); - mockTask.handle(new TaskTAttemptEvent(taskAttempt.getAttemptId(), - TaskEventType.T_ATTEMPT_FAILED)); + mockTask.handle(new TaskTAttemptFailedEvent(taskAttempt.getAttemptId())); assertEquals(TaskState.RUNNING, mockTask.getState()); // verify a new attempt(#3) added because the speculative attempt(#2) @@ -829,8 +832,7 @@ public class TestTaskImpl { // hasn't reach the max attempts which is 4 MockTaskAttemptImpl taskAttempt1 = taskAttempts.get(1); taskAttempt1.setState(TaskAttemptState.FAILED); - mockTask.handle(new TaskTAttemptEvent(taskAttempt1.getAttemptId(), - TaskEventType.T_ATTEMPT_FAILED)); + mockTask.handle(new TaskTAttemptFailedEvent(taskAttempt1.getAttemptId())); assertEquals(TaskState.RUNNING, mockTask.getState()); // verify there's no new attempt added because of the running attempt(#3) http://git-wip-us.apache.org/repos/asf/hadoop/blob/a37e7f0a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java index c9dff6a..5e7a250 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java @@ -729,9 +729,9 @@ public class LocalJobRunner implements ClientProtocol { LOG.error("shuffleError: "+ message + "from task: " + taskId); } - public synchronized void fatalError(TaskAttemptID 
taskId, String msg) + public synchronized void fatalError(TaskAttemptID taskId, String msg, boolean fastFail) throws IOException { - LOG.error("Fatal: "+ msg + "from task: " + taskId); + LOG.error("Fatal: "+ msg + " from task: " + taskId + " fast fail: " + fastFail); } @Override http://git-wip-us.apache.org/repos/asf/hadoop/blob/a37e7f0a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java index 27c8976..ab7cba5 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java @@ -1568,7 +1568,8 @@ public class MapTask extends Task { if (lspillException instanceof Error) { final String logMsg = "Task " + getTaskID() + " failed : " + StringUtils.stringifyException(lspillException); - mapTask.reportFatalError(getTaskID(), lspillException, logMsg); + mapTask.reportFatalError(getTaskID(), lspillException, logMsg, + false); } throw new IOException("Spill failed", lspillException); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/a37e7f0a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java index 730f4ee..87c9e16 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java @@ -40,6 +40,7 @@ import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.LocalDirAllocator; import org.apache.hadoop.fs.LocalFileSystem; import org.apache.hadoop.fs.Path; @@ -354,7 +355,7 @@ abstract public class Task implements Writable, Configurable { * Report a fatal error to the parent (task) tracker. */ protected void reportFatalError(TaskAttemptID id, Throwable throwable, - String logMsg) { + String logMsg, boolean fastFail) { LOG.error(logMsg); if (ShutdownHookManager.get().isShutdownInProgress()) { @@ -366,7 +367,7 @@ abstract public class Task implements Writable, Configurable { ? StringUtils.stringifyException(throwable) : StringUtils.stringifyException(tCause); try { - umbilical.fatalError(id, cause); + umbilical.fatalError(id, cause, fastFail); } catch (IOException ioe) { LOG.error("Failed to contact the tasktracker", ioe); System.exit(-1); @@ -652,6 +653,8 @@ abstract public class Task implements Writable, Configurable { private Thread pingThread = null; private boolean done = true; private Object lock = new Object(); + private volatile String diskLimitCheckStatus = null; + private Thread diskLimitCheckThread = null; /** * flag that indicates whether progress update needs to be sent to parent. 
@@ -749,6 +752,65 @@ abstract public class Task implements Writable, Configurable { } /** + * disk limit checker, runs in separate thread when activated. + */ + public class DiskLimitCheck implements Runnable { + private LocalFileSystem localFS; + private long fsLimit; + private long checkInterval; + private String[] localDirs; + private boolean killOnLimitExceeded; + + public DiskLimitCheck(JobConf conf) throws IOException { + this.localFS = FileSystem.getLocal(conf); + this.fsLimit = conf.getLong(MRJobConfig.JOB_SINGLE_DISK_LIMIT_BYTES, + MRJobConfig.DEFAULT_JOB_SINGLE_DISK_LIMIT_BYTES); + this.localDirs = conf.getLocalDirs(); + this.checkInterval = conf.getLong( + MRJobConfig.JOB_SINGLE_DISK_LIMIT_CHECK_INTERVAL_MS, + MRJobConfig.DEFAULT_JOB_SINGLE_DISK_LIMIT_CHECK_INTERVAL_MS); + this.killOnLimitExceeded = conf.getBoolean( + MRJobConfig.JOB_SINGLE_DISK_LIMIT_KILL_LIMIT_EXCEED, + MRJobConfig.DEFAULT_JOB_SINGLE_DISK_LIMIT_KILL_LIMIT_EXCEED); + } + + @Override + public void run() { + while (!taskDone.get()) { + try { + long localWritesSize = 0L; + String largestWorkDir = null; + for (String local : localDirs) { + long size = FileUtil.getDU(localFS.pathToFile(new Path(local))); + if (localWritesSize < size) { + localWritesSize = size; + largestWorkDir = local; + } + } + if (localWritesSize > fsLimit) { + String localStatus = + "too much data in local scratch dir=" + + largestWorkDir + + ". current size is " + + localWritesSize + + " the limit is " + fsLimit; + if (killOnLimitExceeded) { + LOG.error(localStatus); + diskLimitCheckStatus = localStatus; + } else { + LOG.warn(localStatus); + } + break; + } + Thread.sleep(checkInterval); + } catch (Exception e) { + LOG.error(e.getMessage(), e); + } + } + } + } + + /** * check the counters to see whether the task has exceeded any configured * limits. 
* @throws TaskLimitException @@ -773,6 +835,9 @@ abstract public class Task implements Writable, Configurable { " the limit is " + limit); } } + if (diskLimitCheckStatus != null) { + throw new TaskLimitException(diskLimitCheckStatus); + } } /** @@ -851,7 +916,7 @@ abstract public class Task implements Writable, Configurable { StringUtils.stringifyException(e); LOG.error(errMsg); try { - umbilical.fatalError(taskId, errMsg); + umbilical.fatalError(taskId, errMsg, true); } catch (IOException ioe) { LOG.error("Failed to update failure diagnosis", ioe); } @@ -884,6 +949,22 @@ abstract public class Task implements Writable, Configurable { pingThread.setDaemon(true); pingThread.start(); } + startDiskLimitCheckerThreadIfNeeded(); + } + public void startDiskLimitCheckerThreadIfNeeded() { + if (diskLimitCheckThread == null && conf.getLong( + MRJobConfig.JOB_SINGLE_DISK_LIMIT_BYTES, + MRJobConfig.DEFAULT_JOB_SINGLE_DISK_LIMIT_BYTES) >= 0) { + try { + diskLimitCheckThread = new Thread(new DiskLimitCheck(conf), + "disk limit check thread"); + diskLimitCheckThread.setDaemon(true); + diskLimitCheckThread.start(); + } catch (IOException e) { + LOG.error("Issues starting disk monitor thread: " + + e.getMessage(), e); + } + } } public void stopCommunicationThread() throws InterruptedException { if (pingThread != null) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/a37e7f0a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java index c3678d6..041ab39 100644 --- 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java @@ -68,9 +68,10 @@ public interface TaskUmbilicalProtocol extends VersionedProtocol { * Version 18 Added numRequiredSlots to TaskStatus for MAPREDUCE-516 * Version 19 Added fatalError for child to communicate fatal errors to TT * Version 20 Added methods to manage checkpoints + * Version 21 Added fastFail parameter to fatalError * */ - public static final long versionID = 20L; + public static final long versionID = 21L; /** * Called when a child task process starts, to get its task. @@ -140,8 +141,13 @@ public interface TaskUmbilicalProtocol extends VersionedProtocol { /** Report that the task encounted a local filesystem error.*/ void fsError(TaskAttemptID taskId, String message) throws IOException; - /** Report that the task encounted a fatal error.*/ - void fatalError(TaskAttemptID taskId, String message) throws IOException; + /** + * Report that the task encounted a fatal error. + * @param taskId task's id + * @param message fail message + * @param fastFail flag to enable fast fail for task + */ + void fatalError(TaskAttemptID taskId, String message, boolean fastFail) throws IOException; /** Called by a reduce task to get the map output locations for finished maps. * Returns an update centered around the map-task-completion-events. 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a37e7f0a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java index 6acf1bc..ca18bfe 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java @@ -52,6 +52,20 @@ public interface MRJobConfig { public static final String TASK_CLEANUP_NEEDED = "mapreduce.job.committer.task.cleanup.needed"; + public static final String JOB_SINGLE_DISK_LIMIT_BYTES = + "mapreduce.job.local-fs.single-disk-limit.bytes"; + // negative values disable the limit + public static final long DEFAULT_JOB_SINGLE_DISK_LIMIT_BYTES = -1; + + public static final String JOB_SINGLE_DISK_LIMIT_KILL_LIMIT_EXCEED = + "mapreduce.job.local-fs.single-disk-limit.check.kill-limit-exceed"; + // setting to false only logs the kill + public static final boolean DEFAULT_JOB_SINGLE_DISK_LIMIT_KILL_LIMIT_EXCEED = true; + + public static final String JOB_SINGLE_DISK_LIMIT_CHECK_INTERVAL_MS = + "mapreduce.job.local-fs.single-disk-limit.check.interval-ms"; + public static final long DEFAULT_JOB_SINGLE_DISK_LIMIT_CHECK_INTERVAL_MS = 5000; + public static final String TASK_LOCAL_WRITE_LIMIT_BYTES = "mapreduce.task.local-fs.write-limit.bytes"; // negative values disable the limit 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a37e7f0a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml index 62f3dfa..72f509c 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml @@ -63,6 +63,28 @@ </property> <property> + <name>mapreduce.job.local-fs.single-disk-limit.bytes</name> + <value>-1</value> + <description>Enables an in-task monitor thread that watches how much data the + job has written to each local scratch directory. If any single directory grows + beyond this many bytes, the task fails fast. The limit applies per disk; a negative value disables the check.</description> +</property> + +<property> + <name>mapreduce.job.local-fs.single-disk-limit.check.interval-ms</name> + <value>5000</value> + <description>Interval, in milliseconds, between two consecutive disk limit checks.</description> +</property> + +<property> + <name>mapreduce.job.local-fs.single-disk-limit.check.kill-limit-exceed</name> + <value>true</value> + <description>Controls what happens when mapreduce.job.local-fs.single-disk-limit.bytes + is exceeded. If true, the task is failed fast; if false, the violation is only + logged in the container logs and the task keeps running.</description> +</property> + +<property> <name>mapreduce.job.maps</name> <value>2</value> <description>The default number of map tasks per job.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a37e7f0a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestTaskProgressReporter.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestTaskProgressReporter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestTaskProgressReporter.java index 18442d6..e5ff64e 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestTaskProgressReporter.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestTaskProgressReporter.java @@ -18,15 +18,19 @@ package org.apache.hadoop.mapred; +import java.io.File; import java.io.IOException; import java.util.Random; +import org.apache.commons.io.FileUtils; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.LocalFileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.ipc.ProtocolSignature; import org.apache.hadoop.mapred.SortedRanges.Range; +import org.apache.hadoop.mapreduce.MRConfig; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.checkpoint.TaskCheckpointID; import org.apache.hadoop.util.ExitUtil; @@ -43,6 +47,11 @@ public class TestTaskProgressReporter { private FakeUmbilical fakeUmbilical = new FakeUmbilical(); + private static final String TEST_DIR = + System.getProperty("test.build.data", + System.getProperty("java.io.tmpdir")) + "/" + + TestTaskProgressReporter.class.getName(); + private static class DummyTask extends Task { @Override public void run(JobConf job, TaskUmbilicalProtocol 
umbilical) @@ -53,6 +62,11 @@ public class TestTaskProgressReporter { public boolean isMapTask() { return true; } + + @Override + public boolean isCommitRequired() { + return false; + } } private static class FakeUmbilical implements TaskUmbilicalProtocol { @@ -118,7 +132,7 @@ public class TestTaskProgressReporter { } @Override - public void fatalError(TaskAttemptID taskId, String message) + public void fatalError(TaskAttemptID taskId, String message, boolean fastFail) throws IOException { } @@ -163,6 +177,78 @@ public class TestTaskProgressReporter { } } + @Test(timeout=60000) + public void testScratchDirSize() throws Exception { + String tmpPath = TEST_DIR + "/testBytesWrittenLimit-tmpFile-" + + new Random(System.currentTimeMillis()).nextInt(); + File data = new File(tmpPath + "/out"); + File testDir = new File(tmpPath); + testDir.mkdirs(); + testDir.deleteOnExit(); + JobConf conf = new JobConf(); + conf.setStrings(MRConfig.LOCAL_DIR, "file://" + tmpPath); + conf.setLong(MRJobConfig.JOB_SINGLE_DISK_LIMIT_BYTES, 1024L); + conf.setBoolean(MRJobConfig.JOB_SINGLE_DISK_LIMIT_KILL_LIMIT_EXCEED, + true); + getBaseConfAndWriteToFile(-1, data); + testScratchDirLimit(false, conf); + data.delete(); + getBaseConfAndWriteToFile(100, data); + testScratchDirLimit(false, conf); + data.delete(); + getBaseConfAndWriteToFile(1536, data); + testScratchDirLimit(true, conf); + conf.setBoolean(MRJobConfig.JOB_SINGLE_DISK_LIMIT_KILL_LIMIT_EXCEED, + false); + testScratchDirLimit(false, conf); + conf.setBoolean(MRJobConfig.JOB_SINGLE_DISK_LIMIT_KILL_LIMIT_EXCEED, + true); + conf.setLong(MRJobConfig.JOB_SINGLE_DISK_LIMIT_BYTES, -1L); + testScratchDirLimit(false, conf); + data.delete(); + FileUtil.fullyDelete(testDir); + } + + private void getBaseConfAndWriteToFile(int size, File data) + throws IOException { + if (size > 0) { + byte[] b = new byte[size]; + for (int i = 0; i < size; i++) { + b[i] = 1; + } + FileUtils.writeByteArrayToFile(data, b); + } + } + + public void 
testScratchDirLimit(boolean fastFail, JobConf conf) + throws Exception { + ExitUtil.disableSystemExit(); + threadExited = false; + Thread.UncaughtExceptionHandler h = new Thread.UncaughtExceptionHandler() { + public void uncaughtException(Thread th, Throwable ex) { + if (ex instanceof ExitUtil.ExitException) { + threadExited = true; + th.interrupt(); + } + } + }; + Task task = new DummyTask(); + task.setConf(conf); + DummyTaskReporter reporter = new DummyTaskReporter(task); + reporter.startDiskLimitCheckerThreadIfNeeded(); + Thread t = new Thread(reporter); + t.setUncaughtExceptionHandler(h); + reporter.setProgressFlag(); + t.start(); + while (!reporter.taskLimitIsChecked) { + Thread.yield(); + } + task.done(fakeUmbilical, reporter); + reporter.resetDoneFlag(); + t.join(1000L); + Assert.assertEquals(fastFail, threadExited); + } + @Test (timeout=10000) public void testTaskProgress() throws Exception { JobConf job = new JobConf(); @@ -214,7 +300,7 @@ public class TestTaskProgressReporter { conf.getLong(MRJobConfig.TASK_PROGRESS_REPORT_INTERVAL, 0); conf.setLong(MRJobConfig.TASK_LOCAL_WRITE_LIMIT_BYTES, limit); LocalFileSystem localFS = FileSystem.getLocal(conf); - Path tmpPath = new Path("/tmp/testBytesWrittenLimit-tmpFile-" + Path tmpPath = new Path(TEST_DIR + "/testBytesWrittenLimit-tmpFile-" + new Random(System.currentTimeMillis()).nextInt()); FSDataOutputStream out = localFS.create(tmpPath, true); out.write(new byte[LOCAL_BYTES_WRITTEN]); http://git-wip-us.apache.org/repos/asf/hadoop/blob/a37e7f0a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java index 83e35fe..7b70f98 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java @@ -36,6 +36,7 @@ import java.util.Map; import java.util.StringTokenizer; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptFailEvent; import org.junit.Assert; import org.apache.hadoop.conf.Configuration; @@ -712,7 +713,7 @@ public class TestJobHistoryParsing { protected void attemptLaunched(TaskAttemptId attemptID) { if (attemptID.getTaskId().getId() == 0 && attemptID.getId() == 0) { getContext().getEventHandler().handle( - new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_FAILMSG)); + new TaskAttemptFailEvent(attemptID)); } else { getContext().getEventHandler().handle( new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_DONE)); @@ -732,7 +733,7 @@ public class TestJobHistoryParsing { protected void attemptLaunched(TaskAttemptId attemptID) { if (attemptID.getTaskId().getId() == 0) { getContext().getEventHandler().handle( - new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_FAILMSG)); + new TaskAttemptFailEvent(attemptID)); } else { getContext().getEventHandler().handle( new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_DONE)); @@ -760,10 +761,10 @@ public class TestJobHistoryParsing { new TaskEvent(attemptID.getTaskId(), TaskEventType.T_KILL)); } else if (taskType == TaskType.MAP && taskId == 1) { getContext().getEventHandler().handle( - new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_FAILMSG)); + new TaskAttemptFailEvent(attemptID)); } else if (taskType == TaskType.REDUCE && taskId == 
0) { getContext().getEventHandler().handle( - new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_FAILMSG)); + new TaskAttemptFailEvent(attemptID)); } else if (taskType == TaskType.REDUCE && taskId == 1) { getContext().getEventHandler().handle( new TaskEvent(attemptID.getTaskId(), TaskEventType.T_KILL)); http://git-wip-us.apache.org/repos/asf/hadoop/blob/a37e7f0a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMapProgress.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMapProgress.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMapProgress.java index f364c18..9b6ebda 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMapProgress.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMapProgress.java @@ -91,8 +91,8 @@ public class TestMapProgress { LOG.info("Task " + taskId + " reporting shuffle error: " + message); } - public void fatalError(TaskAttemptID taskId, String msg) throws IOException { - LOG.info("Task " + taskId + " reporting fatal error: " + msg); + public void fatalError(TaskAttemptID taskId, String msg, boolean fastFail) throws IOException { + LOG.info("Task " + taskId + " reporting fatal error: " + msg + " fast fail: " + fastFail); } public JvmTask getTask(JvmContext context) throws IOException { http://git-wip-us.apache.org/repos/asf/hadoop/blob/a37e7f0a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestTaskCommit.java 
---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestTaskCommit.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestTaskCommit.java index bed545e..a534cfa 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestTaskCommit.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestTaskCommit.java @@ -124,7 +124,7 @@ public class TestTaskCommit extends HadoopTestCase { } @Override - public void fatalError(TaskAttemptID taskId, String message) + public void fatalError(TaskAttemptID taskId, String message, boolean fastFail) throws IOException { } @Override --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org