Repository: hadoop Updated Branches: refs/heads/trunk 0c4af0f99 -> cb26cd4be
MAPREDUCE-6489. Fail fast rogue tasks that write too much to local disk. Contributed by Maysam Yabandeh Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/cb26cd4b Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/cb26cd4b Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/cb26cd4b Branch: refs/heads/trunk Commit: cb26cd4bee8ab75b304ebad6dc7c77523d0e9ce5 Parents: 0c4af0f Author: Jason Lowe <[email protected]> Authored: Wed Oct 21 14:01:23 2015 +0000 Committer: Jason Lowe <[email protected]> Committed: Wed Oct 21 14:01:23 2015 +0000 ---------------------------------------------------------------------- hadoop-mapreduce-project/CHANGES.txt | 3 + .../java/org/apache/hadoop/mapred/Task.java | 70 ++++++++++++++--- .../apache/hadoop/mapreduce/MRJobConfig.java | 5 ++ .../src/main/resources/mapred-default.xml | 12 +++ .../hadoop/mapred/TestTaskProgressReporter.java | 80 ++++++++++++++++++++ 5 files changed, 161 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/cb26cd4b/hadoop-mapreduce-project/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index cb28fce..62807af 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -399,6 +399,9 @@ Release 2.8.0 - UNRELEASED MAPREDUCE-6479. Add missing mapred job command options in mapreduce document. (nijel via aajisaka) + MAPREDUCE-6489. Fail fast rogue tasks that write too much to local disk + (Maysam Yabandeh via jlowe) + OPTIMIZATIONS MAPREDUCE-6376. 
Add avro binary support for jhist files (Ray Chiang via http://git-wip-us.apache.org/repos/asf/hadoop/blob/cb26cd4b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java index 673f183..3c58a67 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java @@ -43,6 +43,7 @@ import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocalDirAllocator; +import org.apache.hadoop.fs.LocalFileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.FileSystem.Statistics; import org.apache.hadoop.io.BytesWritable; @@ -64,6 +65,7 @@ import org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer; import org.apache.hadoop.mapreduce.task.ReduceContextImpl; import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree; import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.util.ExitUtil; import org.apache.hadoop.util.Progress; import org.apache.hadoop.util.Progressable; import org.apache.hadoop.util.ReflectionUtils; @@ -730,11 +732,49 @@ abstract public class Task implements Writable, Configurable { } else { return split; } - } - /** - * The communication thread handles communication with the parent (Task Tracker). - * It sends progress updates if progress has been made or if the task needs to - * let the parent know that it's alive. 
It also pings the parent to see if it's alive. + } + + /** + * exception thrown when the task exceeds some configured limits. + */ + public class TaskLimitException extends IOException { + public TaskLimitException(String str) { + super(str); + } + } + + /** + * check the counters to see whether the task has exceeded any configured + * limits. + * @throws TaskLimitException + */ + protected void checkTaskLimits() throws TaskLimitException { + // check the limit for writing to local file system + long limit = conf.getLong(MRJobConfig.TASK_LOCAL_WRITE_LIMIT_BYTES, + MRJobConfig.DEFAULT_TASK_LOCAL_WRITE_LIMIT_BYTES); + if (limit >= 0) { + Counters.Counter localWritesCounter = null; + try { + LocalFileSystem localFS = FileSystem.getLocal(conf); + localWritesCounter = counters.findCounter(localFS.getScheme(), + FileSystemCounter.BYTES_WRITTEN); + } catch (IOException e) { + LOG.warn("Could not get LocalFileSystem BYTES_WRITTEN counter"); + } + if (localWritesCounter != null + && localWritesCounter.getCounter() > limit) { + throw new TaskLimitException("too much write to local file system." + + " current value is " + localWritesCounter.getCounter() + + " the limit is " + limit); + } + } + } + + /** + * The communication thread handles communication with the parent (Task + * Tracker). It sends progress updates if progress has been made or if + * the task needs to let the parent know that it's alive. It also pings + * the parent to see if it's alive. 
*/ public void run() { final int MAX_RETRIES = 3; @@ -765,8 +805,9 @@ abstract public class Task implements Writable, Configurable { if (sendProgress) { // we need to send progress update updateCounters(); + checkTaskLimits(); taskStatus.statusUpdate(taskProgress.get(), - taskProgress.toString(), + taskProgress.toString(), counters); amFeedback = umbilical.statusUpdate(taskId, taskStatus); taskFound = amFeedback.getTaskFound(); @@ -797,10 +838,21 @@ abstract public class Task implements Writable, Configurable { mustPreempt.get() + " given " + amFeedback.getPreemption() + " for "+ taskId + " task status: " +taskStatus.getPhase()); } - sendProgress = resetProgressFlag(); + sendProgress = resetProgressFlag(); remainingRetries = MAX_RETRIES; - } - catch (Throwable t) { + } catch (TaskLimitException e) { + String errMsg = "Task exceeded the limits: " + + StringUtils.stringifyException(e); + LOG.fatal(errMsg); + try { + umbilical.fatalError(taskId, errMsg); + } catch (IOException ioe) { + LOG.fatal("Failed to update failure diagnosis", ioe); + } + LOG.fatal("Killing " + taskId); + resetDoneFlag(); + ExitUtil.terminate(69); + } catch (Throwable t) { LOG.info("Communication exception: " + StringUtils.stringifyException(t)); remainingRetries -=1; if (remainingRetries == 0) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/cb26cd4b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java index e321817..3d1e841 100644 --- 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java @@ -52,6 +52,11 @@ public interface MRJobConfig { public static final String TASK_CLEANUP_NEEDED = "mapreduce.job.committer.task.cleanup.needed"; + public static final String TASK_LOCAL_WRITE_LIMIT_BYTES = + "mapreduce.task.local-fs.write-limit.bytes"; + // negative values disable the limit + public static final long DEFAULT_TASK_LOCAL_WRITE_LIMIT_BYTES = -1; + public static final String TASK_PROGRESS_REPORT_INTERVAL = "mapreduce.task.progress-report.interval"; /** The number of milliseconds between progress reports. */ http://git-wip-us.apache.org/repos/asf/hadoop/blob/cb26cd4b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml index 909f5c5..6ece048 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml @@ -1791,4 +1791,16 @@ default is 128</description> </property> +<property> + <name>mapreduce.task.local-fs.write-limit.bytes</name> + <value>-1</value> + <description>Limit on the number of bytes written to the local file system by each task. + This limit only applies to writes that go through the Hadoop filesystem APIs + within the task process (i.e.: writes that will update the local filesystem's + BYTES_WRITTEN counter). 
It does not cover other writes such as logging, + sideband writes from subprocesses (e.g.: streaming jobs), etc. + Negative values disable the limit. + default is -1</description> +</property> + </configuration> http://git-wip-us.apache.org/repos/asf/hadoop/blob/cb26cd4b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestTaskProgressReporter.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestTaskProgressReporter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestTaskProgressReporter.java index 0bceb87..18442d6 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestTaskProgressReporter.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestTaskProgressReporter.java @@ -19,16 +19,28 @@ package org.apache.hadoop.mapred; import java.io.IOException; +import java.util.Random; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocalFileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.ipc.ProtocolSignature; import org.apache.hadoop.mapred.SortedRanges.Range; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.checkpoint.TaskCheckpointID; +import org.apache.hadoop.util.ExitUtil; import org.junit.Assert; import org.junit.Test; public class TestTaskProgressReporter { private static int statusUpdateTimes = 0; + + // set to true if the thread has exited with ExitUtil.terminate + volatile boolean threadExited = false; + + final static int LOCAL_BYTES_WRITTEN = 1024; + private FakeUmbilical fakeUmbilical = new
FakeUmbilical(); private static class DummyTask extends Task { @@ -133,13 +145,22 @@ public class TestTaskProgressReporter { } private class DummyTaskReporter extends Task.TaskReporter { + volatile boolean taskLimitIsChecked = false; + public DummyTaskReporter(Task task) { task.super(task.getProgress(), fakeUmbilical); } + @Override public void setProgress(float progress) { super.setProgress(progress); } + + @Override + protected void checkTaskLimits() throws TaskLimitException { + taskLimitIsChecked = true; + super.checkTaskLimits(); + } } @Test (timeout=10000) @@ -157,4 +178,63 @@ public class TestTaskProgressReporter { t.join(); Assert.assertEquals(statusUpdateTimes, 2); } + + @Test(timeout=10000) + public void testBytesWrittenRespectingLimit() throws Exception { + // add 1024 to the limit to account for writes not controlled by the test + testBytesWrittenLimit(LOCAL_BYTES_WRITTEN + 1024, false); + } + + @Test(timeout=10000) + public void testBytesWrittenExceedingLimit() throws Exception { + testBytesWrittenLimit(LOCAL_BYTES_WRITTEN - 1, true); + } + + /** + * This is to test the limit on BYTES_WRITTEN. The test is limited in that + * the check is done only once at the first loop of TaskReporter#run. + * @param limit the limit on BYTES_WRITTEN in local file system + * @param failFast should the task fail fast with such limit? 
+ * @throws Exception + */ + public void testBytesWrittenLimit(long limit, boolean failFast) + throws Exception { + ExitUtil.disableSystemExit(); + threadExited = false; + Thread.UncaughtExceptionHandler h = new Thread.UncaughtExceptionHandler() { + public void uncaughtException(Thread th, Throwable ex) { + System.out.println("Uncaught exception: " + ex); + if (ex instanceof ExitUtil.ExitException) { + threadExited = true; + } + } + }; + JobConf conf = new JobConf(); + // To disable task reporter sleeping + conf.getLong(MRJobConfig.TASK_PROGRESS_REPORT_INTERVAL, 0); + conf.setLong(MRJobConfig.TASK_LOCAL_WRITE_LIMIT_BYTES, limit); + LocalFileSystem localFS = FileSystem.getLocal(conf); + Path tmpPath = new Path("/tmp/testBytesWrittenLimit-tmpFile-" + + new Random(System.currentTimeMillis()).nextInt()); + FSDataOutputStream out = localFS.create(tmpPath, true); + out.write(new byte[LOCAL_BYTES_WRITTEN]); + out.close(); + + Task task = new DummyTask(); + task.setConf(conf); + DummyTaskReporter reporter = new DummyTaskReporter(task); + Thread t = new Thread(reporter); + t.setUncaughtExceptionHandler(h); + reporter.setProgressFlag(); + + t.start(); + while (!reporter.taskLimitIsChecked) { + Thread.yield(); + } + + task.setTaskDone(); + reporter.resetDoneFlag(); + t.join(); + Assert.assertEquals(failFast, threadExited); + } } \ No newline at end of file
