Author: vinodkv Date: Thu Aug 22 23:18:20 2013 New Revision: 1516663 URL: http://svn.apache.org/r1516663 Log: MAPREDUCE-5476. Changed MR AM recovery code to clean up the staging directory only after unregistering from the RM. Contributed by Jian He. svn merge --ignore-ancestry -c 1516660 ../../trunk/
Modified: hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/CHANGES.txt hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java Modified: hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/CHANGES.txt?rev=1516663&r1=1516662&r2=1516663&view=diff ============================================================================== --- hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/CHANGES.txt (original) +++ hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/CHANGES.txt Thu Aug 22 23:18:20 2013 @@ -61,6 +61,9 @@ Release 2.1.1-beta - UNRELEASED MAPREDUCE-5468. Fix MR AM recovery for map-only jobs. (vinodkv via acmurthy) + MAPREDUCE-5476. Changed MR AM recovery code to cleanup staging-directory + only after unregistering from the RM. 
(Jian He via vinodkv) + Release 2.1.0-beta - 2013-08-22 INCOMPATIBLE CHANGES Modified: hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java?rev=1516663&r1=1516662&r2=1516663&view=diff ============================================================================== --- hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java (original) +++ hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java Thu Aug 22 23:18:20 2013 @@ -325,18 +325,23 @@ public class MRAppMaster extends Composi dispatcher.register(org.apache.hadoop.mapreduce.jobhistory.EventType.class, eater); } - + + if (copyHistory) { + // Now that there's a FINISHING state for application on RM to give AMs + // plenty of time to clean up after unregister it's safe to clean staging + // directory after unregistering with RM. So, we start the staging-dir + // cleaner BEFORE the ContainerAllocator so that on shut-down, + // ContainerAllocator unregisters first and then the staging-dir cleaner + // deletes staging directory. 
+ addService(createStagingDirCleaningService()); + } + // service to allocate containers from RM (if non-uber) or to fake it (uber) containerAllocator = createContainerAllocator(null, context); addIfService(containerAllocator); dispatcher.register(ContainerAllocator.EventType.class, containerAllocator); if (copyHistory) { - // Add the staging directory cleaner before the history server but after - // the container allocator so the staging directory is cleaned after - // the history has been flushed but before unregistering with the RM. - addService(createStagingDirCleaningService()); - // Add the JobHistoryEventHandler last so that it is properly stopped first. // This will guarantee that all history-events are flushed before AM goes // ahead with shutdown. @@ -344,7 +349,6 @@ public class MRAppMaster extends Composi // component creates a JobHistoryEvent in the meanwhile, it will be just be // queued inside the JobHistoryEventHandler addIfService(historyService); - JobHistoryCopyService cpHist = new JobHistoryCopyService(appAttemptID, dispatcher.getEventHandler()); @@ -396,6 +400,14 @@ public class MRAppMaster extends Composi dispatcher.register(Speculator.EventType.class, speculatorEventDispatcher); + // Now that there's a FINISHING state for application on RM to give AMs + // plenty of time to clean up after unregister it's safe to clean staging + // directory after unregistering with RM. So, we start the staging-dir + // cleaner BEFORE the ContainerAllocator so that on shut-down, + // ContainerAllocator unregisters first and then the staging-dir cleaner + // deletes staging directory. 
+ addService(createStagingDirCleaningService()); + // service to allocate containers from RM (if non-uber) or to fake it (uber) addIfService(containerAllocator); dispatcher.register(ContainerAllocator.EventType.class, containerAllocator); @@ -405,11 +417,6 @@ public class MRAppMaster extends Composi addIfService(containerLauncher); dispatcher.register(ContainerLauncher.EventType.class, containerLauncher); - // Add the staging directory cleaner before the history server but after - // the container allocator so the staging directory is cleaned after - // the history has been flushed but before unregistering with the RM. - addService(createStagingDirCleaningService()); - // Add the JobHistoryEventHandler last so that it is properly stopped first. // This will guarantee that all history-events are flushed before AM goes // ahead with shutdown. Modified: hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java?rev=1516663&r1=1516662&r2=1516663&view=diff ============================================================================== --- hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java (original) +++ hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java Thu Aug 22 23:18:20 2013 @@ -36,6 +36,8 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.JobID; import org.apache.hadoop.mapreduce.MRJobConfig; import 
org.apache.hadoop.mapreduce.TypeConverter; +import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent; +import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler; import org.apache.hadoop.mapreduce.v2.api.records.JobId; import org.apache.hadoop.mapreduce.v2.api.records.JobState; import org.apache.hadoop.mapreduce.v2.app.client.ClientService; @@ -54,6 +56,7 @@ import org.apache.hadoop.service.Service import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; @@ -279,14 +282,17 @@ import org.junit.Test; } private final class MRAppTestCleanup extends MRApp { - boolean stoppedContainerAllocator; - boolean cleanedBeforeContainerAllocatorStopped; - + int stagingDirCleanedup; + int ContainerAllocatorStopped; + int JobHistoryEventHandlerStopped; + int numStops; public MRAppTestCleanup(int maps, int reduces, boolean autoComplete, String testName, boolean cleanOnStart) { super(maps, reduces, autoComplete, testName, cleanOnStart); - stoppedContainerAllocator = false; - cleanedBeforeContainerAllocatorStopped = false; + stagingDirCleanedup = 0; + ContainerAllocatorStopped = 0; + JobHistoryEventHandlerStopped = 0; + numStops = 0; } @Override @@ -313,6 +319,26 @@ import org.junit.Test; } @Override + protected EventHandler<JobHistoryEvent> createJobHistoryHandler( + AppContext context) { + return new TestJobHistoryEventHandler(context, getStartCount()); + } + + private class TestJobHistoryEventHandler extends JobHistoryEventHandler { + + public TestJobHistoryEventHandler(AppContext context, int startCount) { + super(context, startCount); + } + + @Override + public void serviceStop() throws Exception { + numStops++; 
+ JobHistoryEventHandlerStopped = numStops; + super.serviceStop(); + } + } + + @Override protected ContainerAllocator createContainerAllocator( ClientService clientService, AppContext context) { return new TestCleanupContainerAllocator(); @@ -334,7 +360,8 @@ import org.junit.Test; @Override protected void serviceStop() throws Exception { - stoppedContainerAllocator = true; + numStops++; + ContainerAllocatorStopped = numStops; super.serviceStop(); } } @@ -346,7 +373,8 @@ import org.junit.Test; @Override public void cleanupStagingDir() throws IOException { - cleanedBeforeContainerAllocatorStopped = !stoppedContainerAllocator; + numStops++; + stagingDirCleanedup = numStops; } @Override @@ -377,11 +405,15 @@ import org.junit.Test; app.verifyCompleted(); int waitTime = 20 * 1000; - while (waitTime > 0 && !app.cleanedBeforeContainerAllocatorStopped) { + while (waitTime > 0 && app.numStops < 3 ) { Thread.sleep(100); waitTime -= 100; } - Assert.assertTrue("Staging directory not cleaned before notifying RM", - app.cleanedBeforeContainerAllocatorStopped); + + // assert JobHistoryEventHandlerStopped first, then + // ContainerAllocatorStopped, and then stagingDirCleanedup + Assert.assertEquals(1, app.JobHistoryEventHandlerStopped); + Assert.assertEquals(2, app.ContainerAllocatorStopped); + Assert.assertEquals(3, app.stagingDirCleanedup); } }