Author: cnauroth Date: Tue Apr 22 18:22:26 2014 New Revision: 1589236 URL: http://svn.apache.org/r1589236 Log: MAPREDUCE-5827. Merging change r1589223 from trunk to branch-2.
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecutionWithMRApp.java Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt?rev=1589236&r1=1589235&r2=1589236&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt (original) +++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt Tue Apr 22 18:22:26 2014 @@ -49,6 +49,9 @@ Release 2.5.0 - UNRELEASED MAPREDUCE-5642. TestMiniMRChildTask fails on Windows. (Chuan Liu via cnauroth) + MAPREDUCE-5827. TestSpeculativeExecutionWithMRApp fails. + (Zhijie Shen via cnauroth) + Release 2.4.1 - UNRELEASED INCOMPATIBLE CHANGES Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecutionWithMRApp.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecutionWithMRApp.java?rev=1589236&r1=1589235&r2=1589236&view=diff ============================================================================== --- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecutionWithMRApp.java (original) +++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecutionWithMRApp.java Tue Apr 22 18:22:26 2014 @@ -40,22 +40,26 @@ import org.apache.hadoop.mapreduce.v2.ap import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus; +import org.apache.hadoop.service.Service; +import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.SystemClock; import org.junit.Test; +import com.google.common.base.Supplier; + @SuppressWarnings({ "unchecked", "rawtypes" }) public class TestSpeculativeExecutionWithMRApp { private static final int NUM_MAPPERS = 5; private static final int NUM_REDUCERS = 0; - @Test(timeout = 60000) + @Test public void testSpeculateSuccessfulWithoutUpdateEvents() throws Exception { Clock actualClock = new SystemClock(); - ControlledClock clock = new ControlledClock(actualClock); + final ControlledClock clock = new ControlledClock(actualClock); clock.setTime(System.currentTimeMillis()); MRApp app = @@ -88,7 +92,7 @@ public class TestSpeculativeExecutionWit Random generator = new Random(); Object[] taskValues = tasks.values().toArray(); - Task taskToBeSpeculated = + final Task taskToBeSpeculated = (Task) taskValues[generator.nextInt(taskValues.length)]; // Other than one random task, finish every other task. @@ -105,30 +109,28 @@ public class TestSpeculativeExecutionWit } } - int maxTimeWait = 10; - boolean successfullySpeculated = false; - TaskAttempt[] ta = null; - while (maxTimeWait > 0 && !successfullySpeculated) { - if (taskToBeSpeculated.getAttempts().size() != 2) { - Thread.sleep(1000); - clock.setTime(System.currentTimeMillis() + 20000); - } else { - successfullySpeculated = true; - // finish 1st TA, 2nd will be killed - ta = makeFirstAttemptWin(appEventHandler, taskToBeSpeculated); + GenericTestUtils.waitFor(new Supplier<Boolean>() { + @Override + public Boolean get() { + if (taskToBeSpeculated.getAttempts().size() != 2) { + clock.setTime(System.currentTimeMillis() + 1000); + return false; + } else { + return true; + } } - maxTimeWait--; - } - Assert - .assertTrue("Couldn't speculate successfully", successfullySpeculated); + }, 1000, 60000); + // finish 1st TA, 2nd will be killed + TaskAttempt[] ta = makeFirstAttemptWin(appEventHandler, taskToBeSpeculated); verifySpeculationMessage(app, ta); + app.waitForState(Service.STATE.STOPPED); } - @Test(timeout = 60000) + @Test public void testSepculateSuccessfulWithUpdateEvents() throws Exception { Clock actualClock = new SystemClock(); - ControlledClock clock = new ControlledClock(actualClock); + final ControlledClock clock = new ControlledClock(actualClock); clock.setTime(System.currentTimeMillis()); MRApp app = @@ -200,21 +202,21 @@ public class TestSpeculativeExecutionWit } } - int maxTimeWait = 5; - boolean successfullySpeculated = false; - TaskAttempt[] ta = null; - while (maxTimeWait > 0 && !successfullySpeculated) { - if (speculatedTask.getAttempts().size() != 2) { - Thread.sleep(1000); - } else { - successfullySpeculated = true; - ta = makeFirstAttemptWin(appEventHandler, speculatedTask); + final Task speculatedTaskConst = speculatedTask; + GenericTestUtils.waitFor(new Supplier<Boolean>() { + @Override + public Boolean get() { + if (speculatedTaskConst.getAttempts().size() != 2) { + clock.setTime(System.currentTimeMillis() + 1000); + return false; + } else { + return true; + } } - maxTimeWait--; - } - Assert - .assertTrue("Couldn't speculate successfully", successfullySpeculated); + }, 1000, 60000); + TaskAttempt[] ta = makeFirstAttemptWin(appEventHandler, speculatedTask); verifySpeculationMessage(app, ta); + app.waitForState(Service.STATE.STOPPED); } private static TaskAttempt[] makeFirstAttemptWin( @@ -234,15 +236,7 @@ public class TestSpeculativeExecutionWit private static void verifySpeculationMessage(MRApp app, TaskAttempt[] ta) throws Exception { app.waitForState(ta[0], TaskAttemptState.SUCCEEDED); - app.waitForState(ta[1], TaskAttemptState.KILLED); - boolean foundSpecMsg = false; - for (String msg : ta[1].getDiagnostics()) { - if (msg.contains("Speculation")) { - foundSpecMsg = true; - break; - } - } - Assert.assertTrue("No speculation diagnostics!", foundSpecMsg); + // The speculative attempt may be not killed before the MR job succeeds. } private TaskAttemptStatus createTaskAttemptStatus(TaskAttemptId id,