Author: bobby
Date: Wed Sep  5 19:44:51 2012
New Revision: 1381322

URL: http://svn.apache.org/viewvc?rev=1381322&view=rev
Log:
svn merge -c 1381317 FIXES: YARN-68. NodeManager will refuse to shutdown 
indefinitely due to container log aggregation (daryn via bobby)

Modified:
    hadoop/common/branches/branch-0.23.3/hadoop-yarn-project/CHANGES.txt
    hadoop/common/branches/branch-0.23.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregator.java
    hadoop/common/branches/branch-0.23.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
    hadoop/common/branches/branch-0.23.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
    hadoop/common/branches/branch-0.23.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java

Modified: hadoop/common/branches/branch-0.23.3/hadoop-yarn-project/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23.3/hadoop-yarn-project/CHANGES.txt?rev=1381322&r1=1381321&r2=1381322&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23.3/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.23.3/hadoop-yarn-project/CHANGES.txt Wed Sep  5 19:44:51 2012
@@ -68,3 +68,6 @@ Release 0.23.3 - Unreleased
 
     YARN-60. Fixed a bug in ResourceManager which causes all NMs to get NPEs and
     thus causes all containers to be rejected. (vinodkv)
+
+    YARN-68. NodeManager will refuse to shutdown indefinitely due to container
+    log aggregation (daryn via bobby)

Modified: 
hadoop/common/branches/branch-0.23.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregator.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregator.java?rev=1381322&r1=1381321&r2=1381322&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregator.java (original)
+++ hadoop/common/branches/branch-0.23.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregator.java Wed Sep  5 19:44:51 2012
@@ -26,7 +26,4 @@ public interface AppLogAggregator extend
       boolean wasContainerSuccessful);
 
   void finishLogAggregation();
-
-  void join();
-
 }

Modified: 
hadoop/common/branches/branch-0.23.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java?rev=1381322&r1=1381321&r2=1381322&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java (original)
+++ hadoop/common/branches/branch-0.23.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java Wed Sep  5 19:44:51 2012
@@ -137,6 +137,9 @@ public class AppLogAggregatorImpl implem
     try {
       doAppLogAggregation();
     } finally {
+      if (!this.appAggregationFinished.get()) {
+        LOG.warn("Aggregation did not complete for application " + appId);
+      }
       this.appAggregationFinished.set(true);
     }
   }
@@ -155,6 +158,7 @@ public class AppLogAggregatorImpl implem
         }
       } catch (InterruptedException e) {
         LOG.warn("PendingContainers queue is interrupted");
+        this.appFinishing.set(true);
       }
     }
 
@@ -197,6 +201,7 @@ public class AppLogAggregatorImpl implem
     this.dispatcher.getEventHandler().handle(
         new ApplicationEvent(this.appId,
             ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED));
+    this.appAggregationFinished.set(true);    
   }
 
   private Path getRemoteNodeTmpLogFileForApp() {
@@ -250,21 +255,4 @@ public class AppLogAggregatorImpl implem
     LOG.info("Application just finished : " + this.applicationId);
     this.appFinishing.set(true);
   }
-
-  @Override
-  public void join() {
-    // Aggregation service is finishing
-    this.finishLogAggregation();
-
-    while (!this.appAggregationFinished.get()) {
-      LOG.info("Waiting for aggregation to complete for "
-          + this.applicationId);
-      try {
-        Thread.sleep(THREAD_SLEEP_TIME);
-      } catch (InterruptedException e) {
-        LOG.warn("Join interrupted. Some logs may not have been aggregated!!");
-        break;
-      }
-    }
-  }
 }

Modified: 
hadoop/common/branches/branch-0.23.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java?rev=1381322&r1=1381321&r2=1381322&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java (original)
+++ hadoop/common/branches/branch-0.23.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java Wed Sep  5 19:44:51 2012
@@ -25,6 +25,7 @@ import java.util.concurrent.ConcurrentHa
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -35,8 +36,6 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
@@ -137,11 +136,33 @@ public class LogAggregationService exten
   @Override
   public synchronized void stop() {
     LOG.info(this.getName() + " waiting for pending aggregation during exit");
-    for (AppLogAggregator appLogAggregator : this.appLogAggregators.values()) {
-      appLogAggregator.join();
-    }
+    stopAggregators();
     super.stop();
   }
+   
+  private void stopAggregators() {
+    threadPool.shutdown();
+    // politely ask to finish
+    for (AppLogAggregator aggregator : appLogAggregators.values()) {
+      aggregator.finishLogAggregation();
+    }
+    while (!threadPool.isTerminated()) { // wait for all threads to finish
+      for (ApplicationId appId : appLogAggregators.keySet()) {
+        LOG.info("Waiting for aggregation to complete for " + appId);
+      }
+      try {
+        if (!threadPool.awaitTermination(30, TimeUnit.SECONDS)) {
+          threadPool.shutdownNow(); // send interrupt to hurry them along
+        }
+      } catch (InterruptedException e) {
+        LOG.warn("Aggregation stop interrupted!");
+        break;
+      }
+    }
+    for (ApplicationId appId : appLogAggregators.keySet()) {
+      LOG.warn("Some logs may not have been aggregated for " + appId);
+    }
+  }
   
   private void verifyAndCreateRemoteLogDir(Configuration conf) {
     // Checking the existance of the TLD
@@ -293,10 +314,7 @@ public class LogAggregationService exten
     final UserGroupInformation userUgi =
         UserGroupInformation.createRemoteUser(user);
     if (credentials != null) {
-      for (Token<? extends TokenIdentifier> token : credentials
-          .getAllTokens()) {
-        userUgi.addToken(token);
-      }
+      userUgi.addCredentials(credentials);
     }
 
     // New application
@@ -312,9 +330,13 @@ public class LogAggregationService exten
     try {
       // Create the app dir
       createAppDir(user, appId, userUgi);
-    } catch (YarnException e) {
+    } catch (Exception e) {
+      appLogAggregators.remove(appId);
       closeFileSystems(userUgi);
-      throw e;
+      if (!(e instanceof YarnException)) {
+        e = new YarnException(e);
+      }
+      throw (YarnException)e;
     }
 
 

Modified: 
hadoop/common/branches/branch-0.23.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java?rev=1381322&r1=1381321&r2=1381322&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java (original)
+++ hadoop/common/branches/branch-0.23.3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java Wed Sep  5 19:44:51 2012
@@ -157,14 +157,18 @@ public class TestLogAggregationService e
         application1));
 
     logAggregationService.stop();
+    assertEquals(0, logAggregationService.getNumAggregators());
     // ensure filesystems were closed
     verify(logAggregationService).closeFileSystems(
         any(UserGroupInformation.class));
     
+    delSrvc.stop();
+    
     String containerIdStr = ConverterUtils.toString(container11);
     File containerLogDir = new File(app1LogDir, containerIdStr);
     for (String fileType : new String[] { "stdout", "stderr", "syslog" }) {
-      Assert.assertFalse(new File(containerLogDir, fileType).exists());
+      File f = new File(containerLogDir, fileType);
+      Assert.assertFalse("check "+f, f.exists());
     }
 
     Assert.assertFalse(app1LogDir.exists());
@@ -222,6 +226,7 @@ public class TestLogAggregationService e
         application1));
 
     logAggregationService.stop();
+    assertEquals(0, logAggregationService.getNumAggregators());
 
     Assert.assertFalse(new File(logAggregationService
         .getRemoteNodeLogFileForApp(application1, this.user).toUri().getPath())
@@ -356,6 +361,7 @@ public class TestLogAggregationService e
         application1));
 
     logAggregationService.stop();
+    assertEquals(0, logAggregationService.getNumAggregators());
 
     verifyContainerLogs(logAggregationService, application1,
         new ContainerId[] { container11, container12 });
@@ -454,7 +460,8 @@ public class TestLogAggregationService e
     
     ApplicationId appId = BuilderUtils.newApplicationId(
         System.currentTimeMillis(), (int)Math.random());
-    doThrow(new YarnException("KABOOM!"))
+    Exception e = new RuntimeException("KABOOM!");
+    doThrow(e)
       .when(logAggregationService).createAppDir(any(String.class),
           any(ApplicationId.class), any(UserGroupInformation.class));
     logAggregationService.handle(new LogHandlerAppStartedEvent(appId,
@@ -463,7 +470,8 @@ public class TestLogAggregationService e
     
     dispatcher.await();
     ApplicationEvent expectedEvents[] = new ApplicationEvent[]{
-        new ApplicationFinishEvent(appId, "Application failed to init aggregation: KABOOM!")
+        new ApplicationFinishEvent(appId,
+            "Application failed to init aggregation: "+e)
     };
     checkEvents(appEventHandler, expectedEvents, false,
         "getType", "getApplicationID", "getDiagnostic");
@@ -479,6 +487,9 @@ public class TestLogAggregationService e
     logAggregationService.handle(new LogHandlerAppFinishedEvent(
         BuilderUtils.newApplicationId(1, 5)));
     dispatcher.await();
+
+    logAggregationService.stop();
+    assertEquals(0, logAggregationService.getNumAggregators());
   }
 
   private void writeContainerLogs(File appLogDir, ContainerId containerId)
@@ -690,6 +701,7 @@ public class TestLogAggregationService e
             ContainerLogsRetentionPolicy.ALL_CONTAINERS, this.acls));
 
     logAggregationService.stop();
+    assertEquals(0, logAggregationService.getNumAggregators());
   }
 
   @Test


Reply via email to