[10/50] [abbrv] hadoop git commit: MAPREDUCE-6870. Add configuration for MR job to finish when all reducers are complete. (Peter Bacsko via Haibo Chen)
MAPREDUCE-6870. Add configuration for MR job to finish when all reducers are complete. (Peter Bacsko via Haibo Chen) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a32e0138 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a32e0138 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a32e0138 Branch: refs/heads/YARN-1011 Commit: a32e0138fb63c92902e6613001f38a87c8a41321 Parents: 312e57b Author: Haibo Chen Authored: Thu Aug 10 15:17:36 2017 -0700 Committer: Haibo Chen Committed: Thu Aug 10 15:17:36 2017 -0700 -- .../mapreduce/v2/app/job/impl/JobImpl.java | 35 - .../mapreduce/v2/app/job/impl/TestJobImpl.java | 139 +++ .../apache/hadoop/mapreduce/MRJobConfig.java| 6 +- .../src/main/resources/mapred-default.xml | 8 ++ 4 files changed, 160 insertions(+), 28 deletions(-) -- http://git-wip-us.apache.org/repos/asf/hadoop/blob/a32e0138/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java -- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java index 4d155d0..6880b6c 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java @@ -644,6 +644,8 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, private float reduceProgress; private float cleanupProgress; private boolean isUber = false; + private boolean finishJobWhenReducersDone; + private boolean completingJob = false; 
private Credentials jobCredentials; private Token jobToken; @@ -717,6 +719,9 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, this.maxFetchFailuresNotifications = conf.getInt( MRJobConfig.MAX_FETCH_FAILURES_NOTIFICATIONS, MRJobConfig.DEFAULT_MAX_FETCH_FAILURES_NOTIFICATIONS); +this.finishJobWhenReducersDone = conf.getBoolean( +MRJobConfig.FINISH_JOB_WHEN_REDUCERS_DONE, +MRJobConfig.DEFAULT_FINISH_JOB_WHEN_REDUCERS_DONE); } protected StateMachine getStateMachine() { @@ -2021,7 +2026,9 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, TimeUnit.MILLISECONDS); return JobStateInternal.FAIL_WAIT; } - + + checkReadyForCompletionWhenAllReducersDone(job); + return job.checkReadyForCommit(); } @@ -2052,6 +2059,32 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, } job.metrics.killedTask(task); } + + /** Improvement: if all reducers have finished, we check if we have + restarted mappers that are still running. This can happen in a + situation when a node becomes UNHEALTHY and mappers are rescheduled. 
+ See MAPREDUCE-6870 for details */ +private void checkReadyForCompletionWhenAllReducersDone(JobImpl job) { + if (job.finishJobWhenReducersDone) { +int totalReduces = job.getTotalReduces(); +int completedReduces = job.getCompletedReduces(); + +if (totalReduces > 0 && totalReduces == completedReduces +&& !job.completingJob) { + + for (TaskId mapTaskId : job.mapTasks) { +MapTaskImpl task = (MapTaskImpl) job.tasks.get(mapTaskId); +if (!task.isFinished()) { + LOG.info("Killing map task " + task.getID()); + job.eventHandler.handle( + new TaskEvent(task.getID(), TaskEventType.T_KILL)); +} + } + + job.completingJob = true; +} + } +} } // Transition class for handling jobs with no tasks http://git-wip-us.apache.org/repos/asf/hadoop/blob/a32e0138/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java -- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
[13/50] [abbrv] hadoop git commit: MAPREDUCE-6870. Add configuration for MR job to finish when all reducers are complete. (Peter Bacsko via Haibo Chen)
MAPREDUCE-6870. Add configuration for MR job to finish when all reducers are complete. (Peter Bacsko via Haibo Chen) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a32e0138 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a32e0138 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a32e0138 Branch: refs/heads/YARN-6592 Commit: a32e0138fb63c92902e6613001f38a87c8a41321 Parents: 312e57b Author: Haibo Chen Authored: Thu Aug 10 15:17:36 2017 -0700 Committer: Haibo Chen Committed: Thu Aug 10 15:17:36 2017 -0700 -- .../mapreduce/v2/app/job/impl/JobImpl.java | 35 - .../mapreduce/v2/app/job/impl/TestJobImpl.java | 139 +++ .../apache/hadoop/mapreduce/MRJobConfig.java| 6 +- .../src/main/resources/mapred-default.xml | 8 ++ 4 files changed, 160 insertions(+), 28 deletions(-) -- http://git-wip-us.apache.org/repos/asf/hadoop/blob/a32e0138/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java -- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java index 4d155d0..6880b6c 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java @@ -644,6 +644,8 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, private float reduceProgress; private float cleanupProgress; private boolean isUber = false; + private boolean finishJobWhenReducersDone; + private boolean completingJob = false; 
private Credentials jobCredentials; private Token jobToken; @@ -717,6 +719,9 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, this.maxFetchFailuresNotifications = conf.getInt( MRJobConfig.MAX_FETCH_FAILURES_NOTIFICATIONS, MRJobConfig.DEFAULT_MAX_FETCH_FAILURES_NOTIFICATIONS); +this.finishJobWhenReducersDone = conf.getBoolean( +MRJobConfig.FINISH_JOB_WHEN_REDUCERS_DONE, +MRJobConfig.DEFAULT_FINISH_JOB_WHEN_REDUCERS_DONE); } protected StateMachine getStateMachine() { @@ -2021,7 +2026,9 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, TimeUnit.MILLISECONDS); return JobStateInternal.FAIL_WAIT; } - + + checkReadyForCompletionWhenAllReducersDone(job); + return job.checkReadyForCommit(); } @@ -2052,6 +2059,32 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, } job.metrics.killedTask(task); } + + /** Improvement: if all reducers have finished, we check if we have + restarted mappers that are still running. This can happen in a + situation when a node becomes UNHEALTHY and mappers are rescheduled. 
+ See MAPREDUCE-6870 for details */ +private void checkReadyForCompletionWhenAllReducersDone(JobImpl job) { + if (job.finishJobWhenReducersDone) { +int totalReduces = job.getTotalReduces(); +int completedReduces = job.getCompletedReduces(); + +if (totalReduces > 0 && totalReduces == completedReduces +&& !job.completingJob) { + + for (TaskId mapTaskId : job.mapTasks) { +MapTaskImpl task = (MapTaskImpl) job.tasks.get(mapTaskId); +if (!task.isFinished()) { + LOG.info("Killing map task " + task.getID()); + job.eventHandler.handle( + new TaskEvent(task.getID(), TaskEventType.T_KILL)); +} + } + + job.completingJob = true; +} + } +} } // Transition class for handling jobs with no tasks http://git-wip-us.apache.org/repos/asf/hadoop/blob/a32e0138/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java -- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
[09/26] hadoop git commit: MAPREDUCE-6870. Add configuration for MR job to finish when all reducers are complete. (Peter Bacsko via Haibo Chen)
MAPREDUCE-6870. Add configuration for MR job to finish when all reducers are complete. (Peter Bacsko via Haibo Chen) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a32e0138 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a32e0138 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a32e0138 Branch: refs/heads/HADOOP-13345 Commit: a32e0138fb63c92902e6613001f38a87c8a41321 Parents: 312e57b Author: Haibo Chen Authored: Thu Aug 10 15:17:36 2017 -0700 Committer: Haibo Chen Committed: Thu Aug 10 15:17:36 2017 -0700 -- .../mapreduce/v2/app/job/impl/JobImpl.java | 35 - .../mapreduce/v2/app/job/impl/TestJobImpl.java | 139 +++ .../apache/hadoop/mapreduce/MRJobConfig.java| 6 +- .../src/main/resources/mapred-default.xml | 8 ++ 4 files changed, 160 insertions(+), 28 deletions(-) -- http://git-wip-us.apache.org/repos/asf/hadoop/blob/a32e0138/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java -- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java index 4d155d0..6880b6c 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java @@ -644,6 +644,8 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, private float reduceProgress; private float cleanupProgress; private boolean isUber = false; + private boolean finishJobWhenReducersDone; + private boolean completingJob = false; 
private Credentials jobCredentials; private Token jobToken; @@ -717,6 +719,9 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, this.maxFetchFailuresNotifications = conf.getInt( MRJobConfig.MAX_FETCH_FAILURES_NOTIFICATIONS, MRJobConfig.DEFAULT_MAX_FETCH_FAILURES_NOTIFICATIONS); +this.finishJobWhenReducersDone = conf.getBoolean( +MRJobConfig.FINISH_JOB_WHEN_REDUCERS_DONE, +MRJobConfig.DEFAULT_FINISH_JOB_WHEN_REDUCERS_DONE); } protected StateMachine getStateMachine() { @@ -2021,7 +2026,9 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, TimeUnit.MILLISECONDS); return JobStateInternal.FAIL_WAIT; } - + + checkReadyForCompletionWhenAllReducersDone(job); + return job.checkReadyForCommit(); } @@ -2052,6 +2059,32 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, } job.metrics.killedTask(task); } + + /** Improvement: if all reducers have finished, we check if we have + restarted mappers that are still running. This can happen in a + situation when a node becomes UNHEALTHY and mappers are rescheduled. 
+ See MAPREDUCE-6870 for details */ +private void checkReadyForCompletionWhenAllReducersDone(JobImpl job) { + if (job.finishJobWhenReducersDone) { +int totalReduces = job.getTotalReduces(); +int completedReduces = job.getCompletedReduces(); + +if (totalReduces > 0 && totalReduces == completedReduces +&& !job.completingJob) { + + for (TaskId mapTaskId : job.mapTasks) { +MapTaskImpl task = (MapTaskImpl) job.tasks.get(mapTaskId); +if (!task.isFinished()) { + LOG.info("Killing map task " + task.getID()); + job.eventHandler.handle( + new TaskEvent(task.getID(), TaskEventType.T_KILL)); +} + } + + job.completingJob = true; +} + } +} } // Transition class for handling jobs with no tasks http://git-wip-us.apache.org/repos/asf/hadoop/blob/a32e0138/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java -- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
[17/50] [abbrv] hadoop git commit: MAPREDUCE-6870. Add configuration for MR job to finish when all reducers are complete. (Peter Bacsko via Haibo Chen)
MAPREDUCE-6870. Add configuration for MR job to finish when all reducers are complete. (Peter Bacsko via Haibo Chen) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a32e0138 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a32e0138 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a32e0138 Branch: refs/heads/HDFS-10467 Commit: a32e0138fb63c92902e6613001f38a87c8a41321 Parents: 312e57b Author: Haibo Chen Authored: Thu Aug 10 15:17:36 2017 -0700 Committer: Haibo Chen Committed: Thu Aug 10 15:17:36 2017 -0700 -- .../mapreduce/v2/app/job/impl/JobImpl.java | 35 - .../mapreduce/v2/app/job/impl/TestJobImpl.java | 139 +++ .../apache/hadoop/mapreduce/MRJobConfig.java| 6 +- .../src/main/resources/mapred-default.xml | 8 ++ 4 files changed, 160 insertions(+), 28 deletions(-) -- http://git-wip-us.apache.org/repos/asf/hadoop/blob/a32e0138/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java -- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java index 4d155d0..6880b6c 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java @@ -644,6 +644,8 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, private float reduceProgress; private float cleanupProgress; private boolean isUber = false; + private boolean finishJobWhenReducersDone; + private boolean completingJob = false; 
private Credentials jobCredentials; private Token jobToken; @@ -717,6 +719,9 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, this.maxFetchFailuresNotifications = conf.getInt( MRJobConfig.MAX_FETCH_FAILURES_NOTIFICATIONS, MRJobConfig.DEFAULT_MAX_FETCH_FAILURES_NOTIFICATIONS); +this.finishJobWhenReducersDone = conf.getBoolean( +MRJobConfig.FINISH_JOB_WHEN_REDUCERS_DONE, +MRJobConfig.DEFAULT_FINISH_JOB_WHEN_REDUCERS_DONE); } protected StateMachine getStateMachine() { @@ -2021,7 +2026,9 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, TimeUnit.MILLISECONDS); return JobStateInternal.FAIL_WAIT; } - + + checkReadyForCompletionWhenAllReducersDone(job); + return job.checkReadyForCommit(); } @@ -2052,6 +2059,32 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, } job.metrics.killedTask(task); } + + /** Improvement: if all reducers have finished, we check if we have + restarted mappers that are still running. This can happen in a + situation when a node becomes UNHEALTHY and mappers are rescheduled. 
+ See MAPREDUCE-6870 for details */ +private void checkReadyForCompletionWhenAllReducersDone(JobImpl job) { + if (job.finishJobWhenReducersDone) { +int totalReduces = job.getTotalReduces(); +int completedReduces = job.getCompletedReduces(); + +if (totalReduces > 0 && totalReduces == completedReduces +&& !job.completingJob) { + + for (TaskId mapTaskId : job.mapTasks) { +MapTaskImpl task = (MapTaskImpl) job.tasks.get(mapTaskId); +if (!task.isFinished()) { + LOG.info("Killing map task " + task.getID()); + job.eventHandler.handle( + new TaskEvent(task.getID(), TaskEventType.T_KILL)); +} + } + + job.completingJob = true; +} + } +} } // Transition class for handling jobs with no tasks http://git-wip-us.apache.org/repos/asf/hadoop/blob/a32e0138/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java -- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
[44/50] [abbrv] hadoop git commit: MAPREDUCE-6870. Add configuration for MR job to finish when all reducers are complete. (Peter Bacsko via Haibo Chen)
MAPREDUCE-6870. Add configuration for MR job to finish when all reducers are complete. (Peter Bacsko via Haibo Chen) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a32e0138 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a32e0138 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a32e0138 Branch: refs/heads/YARN-5881 Commit: a32e0138fb63c92902e6613001f38a87c8a41321 Parents: 312e57b Author: Haibo Chen Authored: Thu Aug 10 15:17:36 2017 -0700 Committer: Haibo Chen Committed: Thu Aug 10 15:17:36 2017 -0700 -- .../mapreduce/v2/app/job/impl/JobImpl.java | 35 - .../mapreduce/v2/app/job/impl/TestJobImpl.java | 139 +++ .../apache/hadoop/mapreduce/MRJobConfig.java| 6 +- .../src/main/resources/mapred-default.xml | 8 ++ 4 files changed, 160 insertions(+), 28 deletions(-) -- http://git-wip-us.apache.org/repos/asf/hadoop/blob/a32e0138/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java -- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java index 4d155d0..6880b6c 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java @@ -644,6 +644,8 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, private float reduceProgress; private float cleanupProgress; private boolean isUber = false; + private boolean finishJobWhenReducersDone; + private boolean completingJob = false; 
private Credentials jobCredentials; private Token jobToken; @@ -717,6 +719,9 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, this.maxFetchFailuresNotifications = conf.getInt( MRJobConfig.MAX_FETCH_FAILURES_NOTIFICATIONS, MRJobConfig.DEFAULT_MAX_FETCH_FAILURES_NOTIFICATIONS); +this.finishJobWhenReducersDone = conf.getBoolean( +MRJobConfig.FINISH_JOB_WHEN_REDUCERS_DONE, +MRJobConfig.DEFAULT_FINISH_JOB_WHEN_REDUCERS_DONE); } protected StateMachine getStateMachine() { @@ -2021,7 +2026,9 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, TimeUnit.MILLISECONDS); return JobStateInternal.FAIL_WAIT; } - + + checkReadyForCompletionWhenAllReducersDone(job); + return job.checkReadyForCommit(); } @@ -2052,6 +2059,32 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, } job.metrics.killedTask(task); } + + /** Improvement: if all reducers have finished, we check if we have + restarted mappers that are still running. This can happen in a + situation when a node becomes UNHEALTHY and mappers are rescheduled. 
+ See MAPREDUCE-6870 for details */ +private void checkReadyForCompletionWhenAllReducersDone(JobImpl job) { + if (job.finishJobWhenReducersDone) { +int totalReduces = job.getTotalReduces(); +int completedReduces = job.getCompletedReduces(); + +if (totalReduces > 0 && totalReduces == completedReduces +&& !job.completingJob) { + + for (TaskId mapTaskId : job.mapTasks) { +MapTaskImpl task = (MapTaskImpl) job.tasks.get(mapTaskId); +if (!task.isFinished()) { + LOG.info("Killing map task " + task.getID()); + job.eventHandler.handle( + new TaskEvent(task.getID(), TaskEventType.T_KILL)); +} + } + + job.completingJob = true; +} + } +} } // Transition class for handling jobs with no tasks http://git-wip-us.apache.org/repos/asf/hadoop/blob/a32e0138/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java -- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
hadoop git commit: MAPREDUCE-6870. Add configuration for MR job to finish when all reducers are complete. (Peter Bacsko via Haibo Chen)
Repository: hadoop Updated Branches: refs/heads/trunk 312e57b95 -> a32e0138f MAPREDUCE-6870. Add configuration for MR job to finish when all reducers are complete. (Peter Bacsko via Haibo Chen) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a32e0138 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a32e0138 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a32e0138 Branch: refs/heads/trunk Commit: a32e0138fb63c92902e6613001f38a87c8a41321 Parents: 312e57b Author: Haibo Chen Authored: Thu Aug 10 15:17:36 2017 -0700 Committer: Haibo Chen Committed: Thu Aug 10 15:17:36 2017 -0700 -- .../mapreduce/v2/app/job/impl/JobImpl.java | 35 - .../mapreduce/v2/app/job/impl/TestJobImpl.java | 139 +++ .../apache/hadoop/mapreduce/MRJobConfig.java| 6 +- .../src/main/resources/mapred-default.xml | 8 ++ 4 files changed, 160 insertions(+), 28 deletions(-) -- http://git-wip-us.apache.org/repos/asf/hadoop/blob/a32e0138/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java -- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java index 4d155d0..6880b6c 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java @@ -644,6 +644,8 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, private float reduceProgress; private float cleanupProgress; private boolean isUber = false; + private 
boolean finishJobWhenReducersDone; + private boolean completingJob = false; private Credentials jobCredentials; private Token jobToken; @@ -717,6 +719,9 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, this.maxFetchFailuresNotifications = conf.getInt( MRJobConfig.MAX_FETCH_FAILURES_NOTIFICATIONS, MRJobConfig.DEFAULT_MAX_FETCH_FAILURES_NOTIFICATIONS); +this.finishJobWhenReducersDone = conf.getBoolean( +MRJobConfig.FINISH_JOB_WHEN_REDUCERS_DONE, +MRJobConfig.DEFAULT_FINISH_JOB_WHEN_REDUCERS_DONE); } protected StateMachine getStateMachine() { @@ -2021,7 +2026,9 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, TimeUnit.MILLISECONDS); return JobStateInternal.FAIL_WAIT; } - + + checkReadyForCompletionWhenAllReducersDone(job); + return job.checkReadyForCommit(); } @@ -2052,6 +2059,32 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, } job.metrics.killedTask(task); } + + /** Improvement: if all reducers have finished, we check if we have + restarted mappers that are still running. This can happen in a + situation when a node becomes UNHEALTHY and mappers are rescheduled. 
+ See MAPREDUCE-6870 for details */ +private void checkReadyForCompletionWhenAllReducersDone(JobImpl job) { + if (job.finishJobWhenReducersDone) { +int totalReduces = job.getTotalReduces(); +int completedReduces = job.getCompletedReduces(); + +if (totalReduces > 0 && totalReduces == completedReduces +&& !job.completingJob) { + + for (TaskId mapTaskId : job.mapTasks) { +MapTaskImpl task = (MapTaskImpl) job.tasks.get(mapTaskId); +if (!task.isFinished()) { + LOG.info("Killing map task " + task.getID()); + job.eventHandler.handle( + new TaskEvent(task.getID(), TaskEventType.T_KILL)); +} + } + + job.completingJob = true; +} + } +} } // Transition class for handling jobs with no tasks http://git-wip-us.apache.org/repos/asf/hadoop/blob/a32e0138/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java -- diff --git