YARN-8418. App local logs could be leaked if log aggregation fails to initialize
for the app. (Bibin A Chundatt via wangda)

Change-Id: I29a23ca4b219b48c92e7975cd44cddb8b0e04104


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/4b540bbf
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/4b540bbf
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/4b540bbf

Branch: refs/heads/YARN-7402
Commit: 4b540bbfcf02d828052999215c6135603d98f5db
Parents: 8aa93a5
Author: Wangda Tan <wan...@apache.org>
Authored: Tue Jul 31 12:07:51 2018 -0700
Committer: Wangda Tan <wan...@apache.org>
Committed: Tue Jul 31 12:08:00 2018 -0700

----------------------------------------------------------------------
 .../LogAggregationFileController.java           |  7 ++
 .../nodemanager/NodeStatusUpdaterImpl.java      |  1 +
 .../containermanager/ContainerManager.java      |  1 +
 .../containermanager/ContainerManagerImpl.java  | 13 ++-
 .../logaggregation/AppLogAggregator.java        |  8 ++
 .../logaggregation/AppLogAggregatorImpl.java    | 15 ++++
 .../logaggregation/LogAggregationService.java   | 83 ++++++++++++++++----
 .../containermanager/loghandler/LogHandler.java |  7 ++
 .../loghandler/NonAggregatingLogHandler.java    |  9 +++
 .../loghandler/event/LogHandlerEventType.java   |  4 +-
 .../event/LogHandlerTokenUpdatedEvent.java      | 26 ++++++
 .../nodemanager/DummyContainerManager.java      |  7 ++
 .../TestLogAggregationService.java              | 34 +++++---
 13 files changed, 187 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b540bbf/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java
index b047b1c..6b3c9a4 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java
@@ -43,11 +43,14 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.SecretManager;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
 import org.apache.hadoop.yarn.webapp.View.ViewContext;
@@ -365,6 +368,10 @@ public abstract class LogAggregationFileController {
         }
       });
     } catch (Exception e) {
+      if (e instanceof RemoteException) {
+        throw new YarnRuntimeException(((RemoteException) e)
+            .unwrapRemoteException(SecretManager.InvalidToken.class));
+      }
       throw new YarnRuntimeException(e);
     }
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b540bbf/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
index 8154723..faf7adb 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
@@ -1135,6 +1135,7 @@ public class NodeStatusUpdaterImpl extends 
AbstractService implements
             if (systemCredentials != null && !systemCredentials.isEmpty()) {
               ((NMContext) context).setSystemCrendentialsForApps(
                   parseCredentials(systemCredentials));
+              context.getContainerManager().handleCredentialUpdate();
             }
             List<org.apache.hadoop.yarn.api.records.Container>
                 containersToUpdate = response.getContainersToUpdate();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b540bbf/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManager.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManager.java
index 2aeb245..356c2e0 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManager.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManager.java
@@ -44,4 +44,5 @@ public interface ContainerManager extends 
ServiceStateChangeListener,
 
   ContainerScheduler getContainerScheduler();
 
+  void handleCredentialUpdate();
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b540bbf/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
index ce240bc..8b35258 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
@@ -21,6 +21,7 @@ package 
org.apache.hadoop.yarn.server.nodemanager.containermanager;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.protobuf.ByteString;
 import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.UpdateContainerTokenEvent;
+import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerTokenUpdatedEvent;
 import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.ContainerSchedulerEvent;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -170,7 +171,6 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -214,6 +214,7 @@ public class ContainerManagerImpl extends CompositeService 
implements
   protected final AsyncDispatcher dispatcher;
 
   private final DeletionService deletionService;
+  private LogHandler logHandler;
   private boolean serviceStopped = false;
   private final ReadLock readLock;
   private final WriteLock writeLock;
@@ -292,7 +293,7 @@ public class ContainerManagerImpl extends CompositeService 
implements
   @Override
   public void serviceInit(Configuration conf) throws Exception {
 
-    LogHandler logHandler =
+    logHandler =
       createLogHandler(conf, this.context, this.deletionService);
     addIfService(logHandler);
     dispatcher.register(LogHandlerEventType.class, logHandler);
@@ -1904,4 +1905,12 @@ public class ContainerManagerImpl extends 
CompositeService implements
   public ContainerScheduler getContainerScheduler() {
     return this.containerScheduler;
   }
+
+  @Override
+  public void handleCredentialUpdate() {
+    Set<ApplicationId> invalidApps = logHandler.getInvalidTokenApps();
+    if (!invalidApps.isEmpty()) {
+      dispatcher.getEventHandler().handle(new LogHandlerTokenUpdatedEvent());
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b540bbf/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregator.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregator.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregator.java
index 0178699..93436fa 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregator.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregator.java
@@ -18,6 +18,8 @@
 
 package 
org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation;
 
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.server.api.ContainerLogContext;
 
 public interface AppLogAggregator extends Runnable {
@@ -29,4 +31,10 @@ public interface AppLogAggregator extends Runnable {
   void finishLogAggregation();
 
   void disableLogAggregation();
+
+  void enableLogAggregation();
+
+  boolean isAggregationEnabled();
+
+  UserGroupInformation updateCredentials(Credentials cred);
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b540bbf/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
index 6630ba6..04503ef 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
@@ -561,6 +561,16 @@ public class AppLogAggregatorImpl implements 
AppLogAggregator {
     this.logAggregationDisabled = true;
   }
 
+  @Override
+  public void enableLogAggregation() {
+    this.logAggregationDisabled = false;
+  }
+
+  @Override
+  public boolean isAggregationEnabled() {
+    return !logAggregationDisabled;
+  }
+
   @Private
   @VisibleForTesting
   // This is only used for testing.
@@ -643,6 +653,11 @@ public class AppLogAggregatorImpl implements 
AppLogAggregator {
     return this.userUgi;
   }
 
+  public UserGroupInformation updateCredentials(Credentials cred) {
+    this.userUgi.addCredentials(cred);
+    return userUgi;
+  }
+
   @Private
   @VisibleForTesting
   public int getLogAggregationTimes() {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b540bbf/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
index dcc165f..d8db967 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
@@ -20,10 +20,14 @@ package 
org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregatio
 
 import java.io.IOException;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.security.token.SecretManager;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -58,6 +62,7 @@ import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.eve
 import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerContainerFinishedEvent;
 import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerEvent;
 
+
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
@@ -83,6 +88,9 @@ public class LogAggregationService extends AbstractService 
implements
 
   private final ConcurrentMap<ApplicationId, AppLogAggregator> 
appLogAggregators;
 
+  // Holds applications whose aggregation is disabled due to an invalid token
+  private final Set<ApplicationId> invalidTokenApps;
+
   @VisibleForTesting
   ExecutorService threadPool;
   
@@ -95,6 +103,7 @@ public class LogAggregationService extends AbstractService 
implements
     this.dirsHandler = dirsHandler;
     this.appLogAggregators =
         new ConcurrentHashMap<ApplicationId, AppLogAggregator>();
+    this.invalidTokenApps = ConcurrentHashMap.newKeySet();
   }
 
   protected void serviceInit(Configuration conf) throws Exception {
@@ -224,8 +233,8 @@ public class LogAggregationService extends AbstractService 
implements
       userUgi.addCredentials(credentials);
     }
 
-    LogAggregationFileController logAggregationFileController
-        = getLogAggregationFileController(getConfig());
+    LogAggregationFileController logAggregationFileController =
+        getLogAggregationFileController(getConfig());
     logAggregationFileController.verifyAndCreateRemoteLogDir();
     // New application
     final AppLogAggregator appLogAggregator =
@@ -245,14 +254,16 @@ public class LogAggregationService extends 
AbstractService implements
       logAggregationFileController.createAppDir(user, appId, userUgi);
     } catch (Exception e) {
       appLogAggregator.disableLogAggregation();
+
+      // add to disabled aggregators if due to InvalidToken
+      if (e.getCause() instanceof SecretManager.InvalidToken) {
+        invalidTokenApps.add(appId);
+      }
       if (!(e instanceof YarnRuntimeException)) {
         appDirException = new YarnRuntimeException(e);
       } else {
         appDirException = (YarnRuntimeException)e;
       }
-      appLogAggregators.remove(appId);
-      closeFileSystems(userUgi);
-      throw appDirException;
     }
 
     // TODO Get the user configuration for the list of containers that need log
@@ -270,6 +281,10 @@ public class LogAggregationService extends AbstractService 
implements
       }
     };
     this.threadPool.execute(aggregatorWrapper);
+
+    if (appDirException != null) {
+      throw appDirException;
+    }
   }
 
   protected void closeFileSystems(final UserGroupInformation userUgi) {
@@ -307,17 +322,20 @@ public class LogAggregationService extends 
AbstractService implements
 
     // App is complete. Finish up any containers' pending log aggregation and
     // close the application specific logFile.
-
-    AppLogAggregator aggregator = this.appLogAggregators.get(appId);
-    if (aggregator == null) {
-      LOG.warn("Log aggregation is not initialized for " + appId
-          + ", did it fail to start?");
-      this.dispatcher.getEventHandler().handle(
-          new ApplicationEvent(appId,
-              ApplicationEventType.APPLICATION_LOG_HANDLING_FAILED));
-      return;
+    try {
+      AppLogAggregator aggregator = this.appLogAggregators.get(appId);
+      if (aggregator == null) {
+        LOG.warn("Log aggregation is not initialized for " + appId
+            + ", did it fail to start?");
+        this.dispatcher.getEventHandler().handle(new ApplicationEvent(appId,
+            ApplicationEventType.APPLICATION_LOG_HANDLING_FAILED));
+        return;
+      }
+      aggregator.finishLogAggregation();
+    } finally {
+      // Remove invalid Token Apps
+      invalidTokenApps.remove(appId);
     }
-    aggregator.finishLogAggregation();
   }
 
   @Override
@@ -344,12 +362,47 @@ public class LogAggregationService extends 
AbstractService implements
             (LogHandlerAppFinishedEvent) event;
         stopApp(appFinishedEvent.getApplicationId());
         break;
+      case LOG_AGG_TOKEN_UPDATE:
+        checkAndEnableAppAggregators();
+        break;
       default:
         ; // Ignore
     }
 
   }
 
+  private void checkAndEnableAppAggregators() {
+    for (ApplicationId appId : invalidTokenApps) {
+      try {
+        AppLogAggregator aggregator = appLogAggregators.get(appId);
+        if (aggregator != null) {
+          Credentials credentials =
+              context.getSystemCredentialsForApps().get(appId);
+          if (credentials != null) {
+            // Create the app dir again with the updated credentials
+            LogAggregationFileController logAggregationFileController =
+                getLogAggregationFileController(getConfig());
+            UserGroupInformation userUgi =
+                aggregator.updateCredentials(credentials);
+            logAggregationFileController
+                .createAppDir(userUgi.getShortUserName(), appId, userUgi);
+            aggregator.enableLogAggregation();
+          }
+          invalidTokenApps.remove(appId);
+          LOG.info("LogAggregation enabled for application {}", appId);
+        }
+      } catch (Exception e) {
+        //Ignore exception
+        LOG.warn("Enable aggregators failed {}", appId);
+      }
+    }
+  }
+
+  @Override
+  public Set<ApplicationId> getInvalidTokenApps() {
+    return invalidTokenApps;
+  }
+
   @VisibleForTesting
   public ConcurrentMap<ApplicationId, AppLogAggregator> getAppLogAggregators() 
{
     return this.appLogAggregators;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b540bbf/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/LogHandler.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/LogHandler.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/LogHandler.java
index 6eb3fb4..459fdf4 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/LogHandler.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/LogHandler.java
@@ -18,9 +18,16 @@
 
 package org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler;
 
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.event.EventHandler;
 import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerEvent;
 
+
+
+import java.util.Set;
+
 public interface LogHandler extends EventHandler<LogHandlerEvent> {
   public void handle(LogHandlerEvent event);
+
+  public Set<ApplicationId> getInvalidTokenApps();
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b540bbf/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java
index 9c43dde..d66aa12 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java
@@ -19,8 +19,12 @@ package 
org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.ThreadFactory;
@@ -204,6 +208,11 @@ public class NonAggregatingLogHandler extends 
AbstractService implements
     }
   }
 
+  @Override
+  public Set<ApplicationId> getInvalidTokenApps() {
+    return Collections.emptySet();
+  }
+
   ScheduledThreadPoolExecutor createScheduledThreadPoolExecutor(
       Configuration conf) {
     ThreadFactory tf =

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b540bbf/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/event/LogHandlerEventType.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/event/LogHandlerEventType.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/event/LogHandlerEventType.java
index 684d6b2..ec477c2 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/event/LogHandlerEventType.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/event/LogHandlerEventType.java
@@ -19,5 +19,7 @@
 package 
org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event;
 
 public enum LogHandlerEventType {
-  APPLICATION_STARTED, CONTAINER_FINISHED, APPLICATION_FINISHED
+  APPLICATION_STARTED,
+  CONTAINER_FINISHED,
+  APPLICATION_FINISHED, LOG_AGG_TOKEN_UPDATE
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b540bbf/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/event/LogHandlerTokenUpdatedEvent.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/event/LogHandlerTokenUpdatedEvent.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/event/LogHandlerTokenUpdatedEvent.java
new file mode 100644
index 0000000..772a463
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/event/LogHandlerTokenUpdatedEvent.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package 
org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event;
+
+public class LogHandlerTokenUpdatedEvent extends LogHandlerEvent {
+
+  public LogHandlerTokenUpdatedEvent() {
+    super(LogHandlerEventType.LOG_AGG_TOKEN_UPDATE);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b540bbf/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java
index b5cb43b..feabeb1 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java
@@ -24,6 +24,8 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.Collection;
+import java.util.Collections;
+import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -187,6 +189,11 @@ public class DummyContainerManager extends 
ContainerManagerImpl {
             // Ignore
           }
       }
+
+      @Override
+      public Set<ApplicationId> getInvalidTokenApps() {
+        return Collections.emptySet();
+      }
     };
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4b540bbf/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
index 6268ad9..8b2e3cc 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
@@ -73,6 +73,7 @@ import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.SecretManager;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
@@ -128,6 +129,7 @@ import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.Tes
 import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppFinishedEvent;
 import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent;
 import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerContainerFinishedEvent;
+import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerTokenUpdatedEvent;
 import 
org.apache.hadoop.yarn.server.nodemanager.executor.DeletionAsUserContext;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.ConverterUtils;
@@ -823,7 +825,8 @@ public class TestLogAggregationService extends 
BaseContainerManagerTest {
         .getFileControllerForWrite();
     LogAggregationFileController spyLogAggregationFileFormat =
         spy(logAggregationFileFormat);
-    Exception e = new RuntimeException("KABOOM!");
+    Exception e =
+        new YarnRuntimeException(new SecretManager.InvalidToken("KABOOM!"));
     doThrow(e).when(spyLogAggregationFileFormat)
         .createAppDir(any(String.class), any(ApplicationId.class),
             any(UserGroupInformation.class));
@@ -862,29 +865,40 @@ public class TestLogAggregationService extends 
BaseContainerManagerTest {
     };
     checkEvents(appEventHandler, expectedEvents, false,
         "getType", "getApplicationID", "getDiagnostic");
-
+    Assert.assertEquals(logAggregationService.getInvalidTokenApps().size(), 1);
     // verify trying to collect logs for containers/apps we don't know about
     // doesn't blow up and tear down the NM
     logAggregationService.handle(new LogHandlerContainerFinishedEvent(
         BuilderUtils.newContainerId(4, 1, 1, 1),
         ContainerType.APPLICATION_MASTER, 0));
     dispatcher.await();
+
+    AppLogAggregator appAgg =
+        logAggregationService.getAppLogAggregators().get(appId);
+    Assert.assertFalse("Aggregation should be disabled",
+        appAgg.isAggregationEnabled());
+
+    // Enabled aggregation
+    logAggregationService.handle(new LogHandlerTokenUpdatedEvent());
+    dispatcher.await();
+
+    appAgg =
+        logAggregationService.getAppLogAggregators().get(appId);
+    Assert.assertFalse("Aggregation should be enabled",
+        appAgg.isAggregationEnabled());
+
+    // Check disabled apps are cleared
+    Assert.assertEquals(0, logAggregationService.getInvalidTokenApps().size());
+
     logAggregationService.handle(new LogHandlerAppFinishedEvent(
         BuilderUtils.newApplicationId(1, 5)));
     dispatcher.await();
 
     logAggregationService.stop();
     assertEquals(0, logAggregationService.getNumAggregators());
-    // local log dir shouldn't be deleted given log aggregation cannot
-    // continue due to aggregated log dir creation failure on remoteFS.
-    FileDeletionTask deletionTask = new FileDeletionTask(spyDelSrvc, user,
-        null, null);
-    verify(spyDelSrvc, never()).delete(deletionTask);
+    verify(spyDelSrvc).delete(any(FileDeletionTask.class));
     verify(logAggregationService).closeFileSystems(
         any(UserGroupInformation.class));
-    // make sure local log dir is not deleted in case log aggregation
-    // service cannot be initiated.
-    assertTrue(appLogDir.exists());
   }
 
   private void writeContainerLogs(File appLogDir, ContainerId containerId,


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to