Author: bobby Date: Wed Sep 5 19:44:51 2012 New Revision: 1381322 URL: http://svn.apache.org/viewvc?rev=1381322&view=rev Log: svn merge -c 1381317 FIXES: YARN-68. NodeManager will refuse to shutdown indefinitely due to container log aggregation (daryn via bobby)
Modified: hadoop/common/branches/branch-0.23.3/hadoop-yarn-project/CHANGES.txt hadoop/common/branches/branch-0.23.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregator.java hadoop/common/branches/branch-0.23.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java hadoop/common/branches/branch-0.23.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java hadoop/common/branches/branch-0.23.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java Modified: hadoop/common/branches/branch-0.23.3/hadoop-yarn-project/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23.3/hadoop-yarn-project/CHANGES.txt?rev=1381322&r1=1381321&r2=1381322&view=diff ============================================================================== --- hadoop/common/branches/branch-0.23.3/hadoop-yarn-project/CHANGES.txt (original) +++ hadoop/common/branches/branch-0.23.3/hadoop-yarn-project/CHANGES.txt Wed Sep 5 19:44:51 2012 @@ -68,3 +68,6 @@ Release 0.23.3 - Unreleased YARN-60. Fixed a bug in ResourceManager which causes all NMs to get NPEs and thus causes all containers to be rejected. (vinodkv) + + YARN-68. NodeManager will refuse to shutdown indefinitely due to container + log aggregation (daryn via bobby) Modified: hadoop/common/branches/branch-0.23.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregator.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregator.java?rev=1381322&r1=1381321&r2=1381322&view=diff ============================================================================== --- hadoop/common/branches/branch-0.23.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregator.java (original) +++ hadoop/common/branches/branch-0.23.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregator.java Wed Sep 5 19:44:51 2012 @@ -26,7 +26,4 @@ public interface AppLogAggregator extend boolean wasContainerSuccessful); void finishLogAggregation(); - - void join(); - } Modified: hadoop/common/branches/branch-0.23.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java?rev=1381322&r1=1381321&r2=1381322&view=diff ============================================================================== --- hadoop/common/branches/branch-0.23.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java (original) +++ hadoop/common/branches/branch-0.23.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java Wed Sep 5 19:44:51 2012 @@ -137,6 +137,9 @@ public class AppLogAggregatorImpl implem try { doAppLogAggregation(); } finally { + if (!this.appAggregationFinished.get()) { + LOG.warn("Aggregation did not complete for application " + appId); + } this.appAggregationFinished.set(true); } } @@ -155,6 +158,7 @@ public class AppLogAggregatorImpl implem } } catch (InterruptedException e) { LOG.warn("PendingContainers queue is interrupted"); + this.appFinishing.set(true); } } @@ -197,6 +201,7 @@ public class AppLogAggregatorImpl implem this.dispatcher.getEventHandler().handle( new ApplicationEvent(this.appId, ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED)); + this.appAggregationFinished.set(true); } private Path getRemoteNodeTmpLogFileForApp() { @@ -250,21 +255,4 @@ public class AppLogAggregatorImpl implem LOG.info("Application just finished : " + this.applicationId); this.appFinishing.set(true); } - - @Override - public void join() { - // Aggregation service is finishing - this.finishLogAggregation(); - - while (!this.appAggregationFinished.get()) { - LOG.info("Waiting for aggregation to complete for " - + this.applicationId); - try { - Thread.sleep(THREAD_SLEEP_TIME); - } catch (InterruptedException e) { - LOG.warn("Join interrupted. Some logs may not have been aggregated!!"); - break; - } - } - } } Modified: hadoop/common/branches/branch-0.23.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java?rev=1381322&r1=1381321&r2=1381322&view=diff ============================================================================== --- hadoop/common/branches/branch-0.23.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java (original) +++ hadoop/common/branches/branch-0.23.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java Wed Sep 5 19:44:51 2012 @@ -25,6 +25,7 @@ import java.util.concurrent.ConcurrentHa import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -35,8 +36,6 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.yarn.YarnException; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationAccessType; @@ -137,11 +136,33 @@ public class LogAggregationService exten @Override public synchronized void stop() { LOG.info(this.getName() + " waiting for pending aggregation during exit"); - for (AppLogAggregator appLogAggregator : this.appLogAggregators.values()) { - appLogAggregator.join(); - } + stopAggregators(); super.stop(); } + + private void stopAggregators() { + threadPool.shutdown(); + // politely ask to finish + for (AppLogAggregator aggregator : appLogAggregators.values()) { + aggregator.finishLogAggregation(); + } + while (!threadPool.isTerminated()) { // wait for all threads to finish + for (ApplicationId appId : appLogAggregators.keySet()) { + LOG.info("Waiting for aggregation to complete for " + appId); + } + try { + if (!threadPool.awaitTermination(30, TimeUnit.SECONDS)) { + threadPool.shutdownNow(); // send interrupt to hurry them along + } + } catch (InterruptedException e) { + LOG.warn("Aggregation stop interrupted!"); + break; + } + } + for (ApplicationId appId : appLogAggregators.keySet()) { + LOG.warn("Some logs may not have been aggregated for " + appId); + } + } private void verifyAndCreateRemoteLogDir(Configuration conf) { // Checking the existance of the TLD @@ -293,10 +314,7 @@ public class LogAggregationService exten final UserGroupInformation userUgi = UserGroupInformation.createRemoteUser(user); if (credentials != null) { - for (Token<? extends TokenIdentifier> token : credentials - .getAllTokens()) { - userUgi.addToken(token); - } + userUgi.addCredentials(credentials); } // New application @@ -312,9 +330,13 @@ public class LogAggregationService exten try { // Create the app dir createAppDir(user, appId, userUgi); - } catch (YarnException e) { + } catch (Exception e) { + appLogAggregators.remove(appId); closeFileSystems(userUgi); - throw e; + if (!(e instanceof YarnException)) { + e = new YarnException(e); + } + throw (YarnException)e; } Modified: hadoop/common/branches/branch-0.23.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java?rev=1381322&r1=1381321&r2=1381322&view=diff ============================================================================== --- hadoop/common/branches/branch-0.23.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java (original) +++ hadoop/common/branches/branch-0.23.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java Wed Sep 5 19:44:51 2012 @@ -157,14 +157,18 @@ public class TestLogAggregationService e application1)); logAggregationService.stop(); + assertEquals(0, logAggregationService.getNumAggregators()); // ensure filesystems were closed verify(logAggregationService).closeFileSystems( any(UserGroupInformation.class)); + delSrvc.stop(); + String containerIdStr = ConverterUtils.toString(container11); File containerLogDir = new File(app1LogDir, containerIdStr); for (String fileType : new String[] { "stdout", "stderr", "syslog" }) { - Assert.assertFalse(new File(containerLogDir, fileType).exists()); + File f = new File(containerLogDir, fileType); + Assert.assertFalse("check "+f, f.exists()); } Assert.assertFalse(app1LogDir.exists()); @@ -222,6 +226,7 @@ public class TestLogAggregationService e application1)); logAggregationService.stop(); + assertEquals(0, logAggregationService.getNumAggregators()); Assert.assertFalse(new File(logAggregationService .getRemoteNodeLogFileForApp(application1, this.user).toUri().getPath()) @@ -356,6 +361,7 @@ public class TestLogAggregationService e application1)); logAggregationService.stop(); + assertEquals(0, logAggregationService.getNumAggregators()); verifyContainerLogs(logAggregationService, application1, new ContainerId[] { container11, container12 }); @@ -454,7 +460,8 @@ public class TestLogAggregationService e ApplicationId appId = BuilderUtils.newApplicationId( System.currentTimeMillis(), (int)Math.random()); - doThrow(new YarnException("KABOOM!")) + Exception e = new RuntimeException("KABOOM!"); + doThrow(e) .when(logAggregationService).createAppDir(any(String.class), any(ApplicationId.class), any(UserGroupInformation.class)); logAggregationService.handle(new LogHandlerAppStartedEvent(appId, @@ -463,7 +470,8 @@ public class TestLogAggregationService e dispatcher.await(); ApplicationEvent expectedEvents[] = new ApplicationEvent[]{ - new ApplicationFinishEvent(appId, "Application failed to init aggregation: KABOOM!") + new ApplicationFinishEvent(appId, + "Application failed to init aggregation: "+e) }; checkEvents(appEventHandler, expectedEvents, false, "getType", "getApplicationID", "getDiagnostic"); @@ -479,6 +487,9 @@ public class TestLogAggregationService e logAggregationService.handle(new LogHandlerAppFinishedEvent( BuilderUtils.newApplicationId(1, 5))); dispatcher.await(); + + logAggregationService.stop(); + assertEquals(0, logAggregationService.getNumAggregators()); } private void writeContainerLogs(File appLogDir, ContainerId containerId) @@ -690,6 +701,7 @@ public class TestLogAggregationService e ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls)); logAggregationService.stop(); + assertEquals(0, logAggregationService.getNumAggregators()); } @Test