[10/50] [abbrv] hadoop git commit: MAPREDUCE-6870. Add configuration for MR job to finish when all reducers are complete. (Peter Bacsko via Haibo Chen)

2017-08-16 Thread haibochen
MAPREDUCE-6870. Add configuration for MR job to finish when all reducers are 
complete. (Peter Bacsko via Haibo Chen)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a32e0138
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a32e0138
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a32e0138

Branch: refs/heads/YARN-1011
Commit: a32e0138fb63c92902e6613001f38a87c8a41321
Parents: 312e57b
Author: Haibo Chen 
Authored: Thu Aug 10 15:17:36 2017 -0700
Committer: Haibo Chen 
Committed: Thu Aug 10 15:17:36 2017 -0700

--
 .../mapreduce/v2/app/job/impl/JobImpl.java  |  35 -
 .../mapreduce/v2/app/job/impl/TestJobImpl.java  | 139 +++
 .../apache/hadoop/mapreduce/MRJobConfig.java|   6 +-
 .../src/main/resources/mapred-default.xml   |   8 ++
 4 files changed, 160 insertions(+), 28 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/hadoop/blob/a32e0138/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
--
diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
index 4d155d0..6880b6c 100644
--- 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
+++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
@@ -644,6 +644,8 @@ public class JobImpl implements 
org.apache.hadoop.mapreduce.v2.app.job.Job,
   private float reduceProgress;
   private float cleanupProgress;
   private boolean isUber = false;
+  private boolean finishJobWhenReducersDone;
+  private boolean completingJob = false;
 
   private Credentials jobCredentials;
   private Token jobToken;
@@ -717,6 +719,9 @@ public class JobImpl implements 
org.apache.hadoop.mapreduce.v2.app.job.Job,
 this.maxFetchFailuresNotifications = conf.getInt(
 MRJobConfig.MAX_FETCH_FAILURES_NOTIFICATIONS,
 MRJobConfig.DEFAULT_MAX_FETCH_FAILURES_NOTIFICATIONS);
+this.finishJobWhenReducersDone = conf.getBoolean(
+MRJobConfig.FINISH_JOB_WHEN_REDUCERS_DONE,
+MRJobConfig.DEFAULT_FINISH_JOB_WHEN_REDUCERS_DONE);
   }
 
   protected StateMachine 
getStateMachine() {
@@ -2021,7 +2026,9 @@ public class JobImpl implements 
org.apache.hadoop.mapreduce.v2.app.job.Job,
 TimeUnit.MILLISECONDS);
 return JobStateInternal.FAIL_WAIT;
   }
-  
+
+  checkReadyForCompletionWhenAllReducersDone(job);
+
   return job.checkReadyForCommit();
 }
 
@@ -2052,6 +2059,32 @@ public class JobImpl implements 
org.apache.hadoop.mapreduce.v2.app.job.Job,
   }
   job.metrics.killedTask(task);
 }
+
+   /** If finishJobWhenReducersDone is set and every reducer has completed,
+   sends T_KILL to each map task that is not yet finished, e.g. mappers
+   rescheduled after a node became UNHEALTHY. completingJob guards against
+   issuing the kills more than once. See MAPREDUCE-6870 for details. */
+private void checkReadyForCompletionWhenAllReducersDone(JobImpl job) {
+  if (job.finishJobWhenReducersDone) {
+int totalReduces = job.getTotalReduces();
+int completedReduces = job.getCompletedReduces();
+
+if (totalReduces > 0 && totalReduces == completedReduces
+&& !job.completingJob) {
+
+  for (TaskId mapTaskId : job.mapTasks) {
+MapTaskImpl task = (MapTaskImpl) job.tasks.get(mapTaskId);
+if (!task.isFinished()) {
+  LOG.info("Killing map task " + task.getID());
+  job.eventHandler.handle(
+  new TaskEvent(task.getID(), TaskEventType.T_KILL));
+}
+  }
+
+  job.completingJob = true;
+}
+  }
+}
   }
 
   // Transition class for handling jobs with no tasks

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a32e0138/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
--
diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
 

[13/50] [abbrv] hadoop git commit: MAPREDUCE-6870. Add configuration for MR job to finish when all reducers are complete. (Peter Bacsko via Haibo Chen)

2017-08-15 Thread asuresh
MAPREDUCE-6870. Add configuration for MR job to finish when all reducers are 
complete. (Peter Bacsko via Haibo Chen)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a32e0138
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a32e0138
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a32e0138

Branch: refs/heads/YARN-6592
Commit: a32e0138fb63c92902e6613001f38a87c8a41321
Parents: 312e57b
Author: Haibo Chen 
Authored: Thu Aug 10 15:17:36 2017 -0700
Committer: Haibo Chen 
Committed: Thu Aug 10 15:17:36 2017 -0700

--
 .../mapreduce/v2/app/job/impl/JobImpl.java  |  35 -
 .../mapreduce/v2/app/job/impl/TestJobImpl.java  | 139 +++
 .../apache/hadoop/mapreduce/MRJobConfig.java|   6 +-
 .../src/main/resources/mapred-default.xml   |   8 ++
 4 files changed, 160 insertions(+), 28 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/hadoop/blob/a32e0138/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
--
diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
index 4d155d0..6880b6c 100644
--- 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
+++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
@@ -644,6 +644,8 @@ public class JobImpl implements 
org.apache.hadoop.mapreduce.v2.app.job.Job,
   private float reduceProgress;
   private float cleanupProgress;
   private boolean isUber = false;
+  private boolean finishJobWhenReducersDone;
+  private boolean completingJob = false;
 
   private Credentials jobCredentials;
   private Token jobToken;
@@ -717,6 +719,9 @@ public class JobImpl implements 
org.apache.hadoop.mapreduce.v2.app.job.Job,
 this.maxFetchFailuresNotifications = conf.getInt(
 MRJobConfig.MAX_FETCH_FAILURES_NOTIFICATIONS,
 MRJobConfig.DEFAULT_MAX_FETCH_FAILURES_NOTIFICATIONS);
+this.finishJobWhenReducersDone = conf.getBoolean(
+MRJobConfig.FINISH_JOB_WHEN_REDUCERS_DONE,
+MRJobConfig.DEFAULT_FINISH_JOB_WHEN_REDUCERS_DONE);
   }
 
   protected StateMachine 
getStateMachine() {
@@ -2021,7 +2026,9 @@ public class JobImpl implements 
org.apache.hadoop.mapreduce.v2.app.job.Job,
 TimeUnit.MILLISECONDS);
 return JobStateInternal.FAIL_WAIT;
   }
-  
+
+  checkReadyForCompletionWhenAllReducersDone(job);
+
   return job.checkReadyForCommit();
 }
 
@@ -2052,6 +2059,32 @@ public class JobImpl implements 
org.apache.hadoop.mapreduce.v2.app.job.Job,
   }
   job.metrics.killedTask(task);
 }
+
+   /** If finishJobWhenReducersDone is set and every reducer has completed,
+   sends T_KILL to each map task that is not yet finished, e.g. mappers
+   rescheduled after a node became UNHEALTHY. completingJob guards against
+   issuing the kills more than once. See MAPREDUCE-6870 for details. */
+private void checkReadyForCompletionWhenAllReducersDone(JobImpl job) {
+  if (job.finishJobWhenReducersDone) {
+int totalReduces = job.getTotalReduces();
+int completedReduces = job.getCompletedReduces();
+
+if (totalReduces > 0 && totalReduces == completedReduces
+&& !job.completingJob) {
+
+  for (TaskId mapTaskId : job.mapTasks) {
+MapTaskImpl task = (MapTaskImpl) job.tasks.get(mapTaskId);
+if (!task.isFinished()) {
+  LOG.info("Killing map task " + task.getID());
+  job.eventHandler.handle(
+  new TaskEvent(task.getID(), TaskEventType.T_KILL));
+}
+  }
+
+  job.completingJob = true;
+}
+  }
+}
   }
 
   // Transition class for handling jobs with no tasks

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a32e0138/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
--
diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
 

[09/26] hadoop git commit: MAPREDUCE-6870. Add configuration for MR job to finish when all reducers are complete. (Peter Bacsko via Haibo Chen)

2017-08-14 Thread stevel
MAPREDUCE-6870. Add configuration for MR job to finish when all reducers are 
complete. (Peter Bacsko via Haibo Chen)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a32e0138
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a32e0138
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a32e0138

Branch: refs/heads/HADOOP-13345
Commit: a32e0138fb63c92902e6613001f38a87c8a41321
Parents: 312e57b
Author: Haibo Chen 
Authored: Thu Aug 10 15:17:36 2017 -0700
Committer: Haibo Chen 
Committed: Thu Aug 10 15:17:36 2017 -0700

--
 .../mapreduce/v2/app/job/impl/JobImpl.java  |  35 -
 .../mapreduce/v2/app/job/impl/TestJobImpl.java  | 139 +++
 .../apache/hadoop/mapreduce/MRJobConfig.java|   6 +-
 .../src/main/resources/mapred-default.xml   |   8 ++
 4 files changed, 160 insertions(+), 28 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/hadoop/blob/a32e0138/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
--
diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
index 4d155d0..6880b6c 100644
--- 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
+++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
@@ -644,6 +644,8 @@ public class JobImpl implements 
org.apache.hadoop.mapreduce.v2.app.job.Job,
   private float reduceProgress;
   private float cleanupProgress;
   private boolean isUber = false;
+  private boolean finishJobWhenReducersDone;
+  private boolean completingJob = false;
 
   private Credentials jobCredentials;
   private Token jobToken;
@@ -717,6 +719,9 @@ public class JobImpl implements 
org.apache.hadoop.mapreduce.v2.app.job.Job,
 this.maxFetchFailuresNotifications = conf.getInt(
 MRJobConfig.MAX_FETCH_FAILURES_NOTIFICATIONS,
 MRJobConfig.DEFAULT_MAX_FETCH_FAILURES_NOTIFICATIONS);
+this.finishJobWhenReducersDone = conf.getBoolean(
+MRJobConfig.FINISH_JOB_WHEN_REDUCERS_DONE,
+MRJobConfig.DEFAULT_FINISH_JOB_WHEN_REDUCERS_DONE);
   }
 
   protected StateMachine 
getStateMachine() {
@@ -2021,7 +2026,9 @@ public class JobImpl implements 
org.apache.hadoop.mapreduce.v2.app.job.Job,
 TimeUnit.MILLISECONDS);
 return JobStateInternal.FAIL_WAIT;
   }
-  
+
+  checkReadyForCompletionWhenAllReducersDone(job);
+
   return job.checkReadyForCommit();
 }
 
@@ -2052,6 +2059,32 @@ public class JobImpl implements 
org.apache.hadoop.mapreduce.v2.app.job.Job,
   }
   job.metrics.killedTask(task);
 }
+
+   /** If finishJobWhenReducersDone is set and every reducer has completed,
+   sends T_KILL to each map task that is not yet finished, e.g. mappers
+   rescheduled after a node became UNHEALTHY. completingJob guards against
+   issuing the kills more than once. See MAPREDUCE-6870 for details. */
+private void checkReadyForCompletionWhenAllReducersDone(JobImpl job) {
+  if (job.finishJobWhenReducersDone) {
+int totalReduces = job.getTotalReduces();
+int completedReduces = job.getCompletedReduces();
+
+if (totalReduces > 0 && totalReduces == completedReduces
+&& !job.completingJob) {
+
+  for (TaskId mapTaskId : job.mapTasks) {
+MapTaskImpl task = (MapTaskImpl) job.tasks.get(mapTaskId);
+if (!task.isFinished()) {
+  LOG.info("Killing map task " + task.getID());
+  job.eventHandler.handle(
+  new TaskEvent(task.getID(), TaskEventType.T_KILL));
+}
+  }
+
+  job.completingJob = true;
+}
+  }
+}
   }
 
   // Transition class for handling jobs with no tasks

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a32e0138/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
--
diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
 

[17/50] [abbrv] hadoop git commit: MAPREDUCE-6870. Add configuration for MR job to finish when all reducers are complete. (Peter Bacsko via Haibo Chen)

2017-08-12 Thread inigoiri
MAPREDUCE-6870. Add configuration for MR job to finish when all reducers are 
complete. (Peter Bacsko via Haibo Chen)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a32e0138
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a32e0138
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a32e0138

Branch: refs/heads/HDFS-10467
Commit: a32e0138fb63c92902e6613001f38a87c8a41321
Parents: 312e57b
Author: Haibo Chen 
Authored: Thu Aug 10 15:17:36 2017 -0700
Committer: Haibo Chen 
Committed: Thu Aug 10 15:17:36 2017 -0700

--
 .../mapreduce/v2/app/job/impl/JobImpl.java  |  35 -
 .../mapreduce/v2/app/job/impl/TestJobImpl.java  | 139 +++
 .../apache/hadoop/mapreduce/MRJobConfig.java|   6 +-
 .../src/main/resources/mapred-default.xml   |   8 ++
 4 files changed, 160 insertions(+), 28 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/hadoop/blob/a32e0138/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
--
diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
index 4d155d0..6880b6c 100644
--- 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
+++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
@@ -644,6 +644,8 @@ public class JobImpl implements 
org.apache.hadoop.mapreduce.v2.app.job.Job,
   private float reduceProgress;
   private float cleanupProgress;
   private boolean isUber = false;
+  private boolean finishJobWhenReducersDone;
+  private boolean completingJob = false;
 
   private Credentials jobCredentials;
   private Token jobToken;
@@ -717,6 +719,9 @@ public class JobImpl implements 
org.apache.hadoop.mapreduce.v2.app.job.Job,
 this.maxFetchFailuresNotifications = conf.getInt(
 MRJobConfig.MAX_FETCH_FAILURES_NOTIFICATIONS,
 MRJobConfig.DEFAULT_MAX_FETCH_FAILURES_NOTIFICATIONS);
+this.finishJobWhenReducersDone = conf.getBoolean(
+MRJobConfig.FINISH_JOB_WHEN_REDUCERS_DONE,
+MRJobConfig.DEFAULT_FINISH_JOB_WHEN_REDUCERS_DONE);
   }
 
   protected StateMachine 
getStateMachine() {
@@ -2021,7 +2026,9 @@ public class JobImpl implements 
org.apache.hadoop.mapreduce.v2.app.job.Job,
 TimeUnit.MILLISECONDS);
 return JobStateInternal.FAIL_WAIT;
   }
-  
+
+  checkReadyForCompletionWhenAllReducersDone(job);
+
   return job.checkReadyForCommit();
 }
 
@@ -2052,6 +2059,32 @@ public class JobImpl implements 
org.apache.hadoop.mapreduce.v2.app.job.Job,
   }
   job.metrics.killedTask(task);
 }
+
+   /** If finishJobWhenReducersDone is set and every reducer has completed,
+   sends T_KILL to each map task that is not yet finished, e.g. mappers
+   rescheduled after a node became UNHEALTHY. completingJob guards against
+   issuing the kills more than once. See MAPREDUCE-6870 for details. */
+private void checkReadyForCompletionWhenAllReducersDone(JobImpl job) {
+  if (job.finishJobWhenReducersDone) {
+int totalReduces = job.getTotalReduces();
+int completedReduces = job.getCompletedReduces();
+
+if (totalReduces > 0 && totalReduces == completedReduces
+&& !job.completingJob) {
+
+  for (TaskId mapTaskId : job.mapTasks) {
+MapTaskImpl task = (MapTaskImpl) job.tasks.get(mapTaskId);
+if (!task.isFinished()) {
+  LOG.info("Killing map task " + task.getID());
+  job.eventHandler.handle(
+  new TaskEvent(task.getID(), TaskEventType.T_KILL));
+}
+  }
+
+  job.completingJob = true;
+}
+  }
+}
   }
 
   // Transition class for handling jobs with no tasks

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a32e0138/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
--
diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
 

[44/50] [abbrv] hadoop git commit: MAPREDUCE-6870. Add configuration for MR job to finish when all reducers are complete. (Peter Bacsko via Haibo Chen)

2017-08-11 Thread wangda
MAPREDUCE-6870. Add configuration for MR job to finish when all reducers are 
complete. (Peter Bacsko via Haibo Chen)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a32e0138
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a32e0138
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a32e0138

Branch: refs/heads/YARN-5881
Commit: a32e0138fb63c92902e6613001f38a87c8a41321
Parents: 312e57b
Author: Haibo Chen 
Authored: Thu Aug 10 15:17:36 2017 -0700
Committer: Haibo Chen 
Committed: Thu Aug 10 15:17:36 2017 -0700

--
 .../mapreduce/v2/app/job/impl/JobImpl.java  |  35 -
 .../mapreduce/v2/app/job/impl/TestJobImpl.java  | 139 +++
 .../apache/hadoop/mapreduce/MRJobConfig.java|   6 +-
 .../src/main/resources/mapred-default.xml   |   8 ++
 4 files changed, 160 insertions(+), 28 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/hadoop/blob/a32e0138/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
--
diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
index 4d155d0..6880b6c 100644
--- 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
+++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
@@ -644,6 +644,8 @@ public class JobImpl implements 
org.apache.hadoop.mapreduce.v2.app.job.Job,
   private float reduceProgress;
   private float cleanupProgress;
   private boolean isUber = false;
+  private boolean finishJobWhenReducersDone;
+  private boolean completingJob = false;
 
   private Credentials jobCredentials;
   private Token jobToken;
@@ -717,6 +719,9 @@ public class JobImpl implements 
org.apache.hadoop.mapreduce.v2.app.job.Job,
 this.maxFetchFailuresNotifications = conf.getInt(
 MRJobConfig.MAX_FETCH_FAILURES_NOTIFICATIONS,
 MRJobConfig.DEFAULT_MAX_FETCH_FAILURES_NOTIFICATIONS);
+this.finishJobWhenReducersDone = conf.getBoolean(
+MRJobConfig.FINISH_JOB_WHEN_REDUCERS_DONE,
+MRJobConfig.DEFAULT_FINISH_JOB_WHEN_REDUCERS_DONE);
   }
 
   protected StateMachine 
getStateMachine() {
@@ -2021,7 +2026,9 @@ public class JobImpl implements 
org.apache.hadoop.mapreduce.v2.app.job.Job,
 TimeUnit.MILLISECONDS);
 return JobStateInternal.FAIL_WAIT;
   }
-  
+
+  checkReadyForCompletionWhenAllReducersDone(job);
+
   return job.checkReadyForCommit();
 }
 
@@ -2052,6 +2059,32 @@ public class JobImpl implements 
org.apache.hadoop.mapreduce.v2.app.job.Job,
   }
   job.metrics.killedTask(task);
 }
+
+   /** If finishJobWhenReducersDone is set and every reducer has completed,
+   sends T_KILL to each map task that is not yet finished, e.g. mappers
+   rescheduled after a node became UNHEALTHY. completingJob guards against
+   issuing the kills more than once. See MAPREDUCE-6870 for details. */
+private void checkReadyForCompletionWhenAllReducersDone(JobImpl job) {
+  if (job.finishJobWhenReducersDone) {
+int totalReduces = job.getTotalReduces();
+int completedReduces = job.getCompletedReduces();
+
+if (totalReduces > 0 && totalReduces == completedReduces
+&& !job.completingJob) {
+
+  for (TaskId mapTaskId : job.mapTasks) {
+MapTaskImpl task = (MapTaskImpl) job.tasks.get(mapTaskId);
+if (!task.isFinished()) {
+  LOG.info("Killing map task " + task.getID());
+  job.eventHandler.handle(
+  new TaskEvent(task.getID(), TaskEventType.T_KILL));
+}
+  }
+
+  job.completingJob = true;
+}
+  }
+}
   }
 
   // Transition class for handling jobs with no tasks

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a32e0138/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
--
diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
 

hadoop git commit: MAPREDUCE-6870. Add configuration for MR job to finish when all reducers are complete. (Peter Bacsko via Haibo Chen)

2017-08-10 Thread haibochen
Repository: hadoop
Updated Branches:
  refs/heads/trunk 312e57b95 -> a32e0138f


MAPREDUCE-6870. Add configuration for MR job to finish when all reducers are 
complete. (Peter Bacsko via Haibo Chen)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a32e0138
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a32e0138
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a32e0138

Branch: refs/heads/trunk
Commit: a32e0138fb63c92902e6613001f38a87c8a41321
Parents: 312e57b
Author: Haibo Chen 
Authored: Thu Aug 10 15:17:36 2017 -0700
Committer: Haibo Chen 
Committed: Thu Aug 10 15:17:36 2017 -0700

--
 .../mapreduce/v2/app/job/impl/JobImpl.java  |  35 -
 .../mapreduce/v2/app/job/impl/TestJobImpl.java  | 139 +++
 .../apache/hadoop/mapreduce/MRJobConfig.java|   6 +-
 .../src/main/resources/mapred-default.xml   |   8 ++
 4 files changed, 160 insertions(+), 28 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/hadoop/blob/a32e0138/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
--
diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
index 4d155d0..6880b6c 100644
--- 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
+++ 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
@@ -644,6 +644,8 @@ public class JobImpl implements 
org.apache.hadoop.mapreduce.v2.app.job.Job,
   private float reduceProgress;
   private float cleanupProgress;
   private boolean isUber = false;
+  private boolean finishJobWhenReducersDone;
+  private boolean completingJob = false;
 
   private Credentials jobCredentials;
   private Token jobToken;
@@ -717,6 +719,9 @@ public class JobImpl implements 
org.apache.hadoop.mapreduce.v2.app.job.Job,
 this.maxFetchFailuresNotifications = conf.getInt(
 MRJobConfig.MAX_FETCH_FAILURES_NOTIFICATIONS,
 MRJobConfig.DEFAULT_MAX_FETCH_FAILURES_NOTIFICATIONS);
+this.finishJobWhenReducersDone = conf.getBoolean(
+MRJobConfig.FINISH_JOB_WHEN_REDUCERS_DONE,
+MRJobConfig.DEFAULT_FINISH_JOB_WHEN_REDUCERS_DONE);
   }
 
   protected StateMachine 
getStateMachine() {
@@ -2021,7 +2026,9 @@ public class JobImpl implements 
org.apache.hadoop.mapreduce.v2.app.job.Job,
 TimeUnit.MILLISECONDS);
 return JobStateInternal.FAIL_WAIT;
   }
-  
+
+  checkReadyForCompletionWhenAllReducersDone(job);
+
   return job.checkReadyForCommit();
 }
 
@@ -2052,6 +2059,32 @@ public class JobImpl implements 
org.apache.hadoop.mapreduce.v2.app.job.Job,
   }
   job.metrics.killedTask(task);
 }
+
+   /** If finishJobWhenReducersDone is set and every reducer has completed,
+   sends T_KILL to each map task that is not yet finished, e.g. mappers
+   rescheduled after a node became UNHEALTHY. completingJob guards against
+   issuing the kills more than once. See MAPREDUCE-6870 for details. */
+private void checkReadyForCompletionWhenAllReducersDone(JobImpl job) {
+  if (job.finishJobWhenReducersDone) {
+int totalReduces = job.getTotalReduces();
+int completedReduces = job.getCompletedReduces();
+
+if (totalReduces > 0 && totalReduces == completedReduces
+&& !job.completingJob) {
+
+  for (TaskId mapTaskId : job.mapTasks) {
+MapTaskImpl task = (MapTaskImpl) job.tasks.get(mapTaskId);
+if (!task.isFinished()) {
+  LOG.info("Killing map task " + task.getID());
+  job.eventHandler.handle(
+  new TaskEvent(task.getID(), TaskEventType.T_KILL));
+}
+  }
+
+  job.completingJob = true;
+}
+  }
+}
   }
 
   // Transition class for handling jobs with no tasks

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a32e0138/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
--
diff --git