Repository: hadoop
Updated Branches:
  refs/heads/trunk ad9441122 -> 81effb7dc


YARN-4325. Nodemanager log handlers fail to send finished/failed events in some 
cases. Contributed by Junping Du


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/81effb7d
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/81effb7d
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/81effb7d

Branch: refs/heads/trunk
Commit: 81effb7dcde2b31423438d6f1b8b8204d4ca05b3
Parents: ad94411
Author: Jason Lowe <jl...@apache.org>
Authored: Mon May 16 15:40:23 2016 +0000
Committer: Jason Lowe <jl...@apache.org>
Committed: Mon May 16 15:40:23 2016 +0000

----------------------------------------------------------------------
 .../application/ApplicationImpl.java            |  8 +-
 .../logaggregation/AppLogAggregatorImpl.java    |  4 +
 .../logaggregation/LogAggregationService.java   |  5 +-
 .../loghandler/NonAggregatingLogHandler.java    |  4 +
 .../TestContainerManagerRecovery.java           | 84 ++++++++++++++++++++
 .../TestNonAggregatingLogHandler.java           | 58 +++++++++++++-
 6 files changed, 155 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/81effb7d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
index fbc8453..efa258a 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
@@ -207,18 +207,18 @@ public class ApplicationImpl implements Application {
                   ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED,
                   ApplicationEventType.APPLICATION_INITED,
                   ApplicationEventType.FINISH_APPLICATION))
-           
+
            // Transitions from FINISHED state
            .addTransition(ApplicationState.FINISHED,
                ApplicationState.FINISHED,
-               ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED,
+               EnumSet.of(
+                   ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED,
+                   ApplicationEventType.APPLICATION_LOG_HANDLING_FAILED),
                new AppLogsAggregatedTransition())
            .addTransition(ApplicationState.FINISHED, ApplicationState.FINISHED,
                EnumSet.of(
                   ApplicationEventType.APPLICATION_LOG_HANDLING_INITED,
-                  ApplicationEventType.APPLICATION_LOG_HANDLING_FAILED,
                   ApplicationEventType.FINISH_APPLICATION))
-               
            // create the topology tables
            .installTopology();
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/81effb7d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
index fed4a3b..32b0934 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
@@ -501,6 +501,7 @@ public class AppLogAggregatorImpl implements 
AppLogAggregator {
     }
   }
 
+  @SuppressWarnings("unchecked")
   @Override
   public void run() {
     try {
@@ -513,6 +514,9 @@ public class AppLogAggregatorImpl implements 
AppLogAggregator {
     } finally {
       if (!this.appAggregationFinished.get()) {
         LOG.warn("Aggregation did not complete for application " + appId);
+        this.dispatcher.getEventHandler().handle(
+            new ApplicationEvent(this.appId,
+                ApplicationEventType.APPLICATION_LOG_HANDLING_FAILED));
       }
       this.appAggregationFinished.set(true);
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/81effb7d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
index d46f7a3..189e7ff 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
@@ -416,7 +416,6 @@ public class LogAggregationService extends AbstractService 
implements
 
     // A container is complete. Put this containers' logs up for aggregation if
     // this containers' logs are needed.
-
     AppLogAggregator aggregator = this.appLogAggregators.get(
         containerId.getApplicationAttemptId().getApplicationId());
     if (aggregator == null) {
@@ -436,6 +435,7 @@ public class LogAggregationService extends AbstractService 
implements
         new ContainerLogContext(containerId, containerType, exitCode));
   }
 
+  @SuppressWarnings("unchecked")
   private void stopApp(ApplicationId appId) {
 
     // App is complete. Finish up any containers' pending log aggregation and
@@ -445,6 +445,9 @@ public class LogAggregationService extends AbstractService 
implements
     if (aggregator == null) {
       LOG.warn("Log aggregation is not initialized for " + appId
           + ", did it fail to start?");
+      this.dispatcher.getEventHandler().handle(
+          new ApplicationEvent(appId,
+              ApplicationEventType.APPLICATION_LOG_HANDLING_FAILED));
       return;
     }
     aggregator.finishLogAggregation();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/81effb7d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java
index d42a4e7..2901743 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java
@@ -171,6 +171,10 @@ public class NonAggregatingLogHandler extends 
AbstractService implements
         String user = appOwners.remove(appId);
         if (user == null) {
           LOG.error("Unable to locate user for " + appId);
+          // send LOG_HANDLING_FAILED out
+          NonAggregatingLogHandler.this.dispatcher.getEventHandler().handle(
+              new ApplicationEvent(appId,
+                  ApplicationEventType.APPLICATION_LOG_HANDLING_FAILED));
           break;
         }
         LogDeleterRunnable logDeleter = new LogDeleterRunnable(user, appId);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/81effb7d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
index ccdfd64..61477a7 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
@@ -295,6 +295,90 @@ public class TestContainerManagerRecovery extends 
BaseContainerManagerTest {
   }
 
   @Test
+  public void testNMRecoveryForAppFinishedWithLogAggregationFailure()
+      throws Exception {
+    // Verifies that an application which reaches FINISHED and then receives
+    // APPLICATION_LOG_HANDLING_FAILED is not recovered into the NM context
+    // after a NodeManager restart.
+    conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, true);
+    conf.setBoolean(YarnConfiguration.NM_RECOVERY_SUPERVISED, true);
+
+    NMStateStoreService stateStore = new NMMemoryStateStoreService();
+    stateStore.init(conf);
+    stateStore.start();
+    Context context = createContext(conf, stateStore);
+    ContainerManagerImpl cm = createContainerManager(context);
+    cm.init(conf);
+    cm.start();
+
+    // add an application by starting a container
+    ApplicationId appId = ApplicationId.newInstance(0, 1);
+    ApplicationAttemptId attemptId =
+        ApplicationAttemptId.newInstance(appId, 1);
+    ContainerId cid = ContainerId.newContainerId(attemptId, 1);
+    Map<String, LocalResource> localResources = Collections.emptyMap();
+    Map<String, String> containerEnv = Collections.emptyMap();
+    List<String> containerCmds = Collections.emptyList();
+    Map<String, ByteBuffer> serviceData = Collections.emptyMap();
+
+    ContainerLaunchContext clc = ContainerLaunchContext.newInstance(
+        localResources, containerEnv, containerCmds, serviceData,
+        null, null);
+
+    StartContainersResponse startResponse = startContainer(context, cm, cid,
+        clc, null);
+    assertTrue(startResponse.getFailedRequests().isEmpty());
+    assertEquals(1, context.getApplications().size());
+    Application app = context.getApplications().get(appId);
+    assertNotNull(app);
+    waitForAppState(app, ApplicationState.INITING);
+
+    // simulate application completion
+    List<ApplicationId> finishedApps = new ArrayList<ApplicationId>();
+    finishedApps.add(appId);
+    cm.handle(new CMgrCompletedAppsEvent(finishedApps,
+        CMgrCompletedAppsEvent.Reason.BY_RESOURCEMANAGER));
+    waitForAppState(app, ApplicationState.APPLICATION_RESOURCES_CLEANINGUP);
+
+    app.handle(new ApplicationEvent(app.getAppId(),
+        ApplicationEventType.APPLICATION_RESOURCES_CLEANEDUP));
+    assertEquals(ApplicationState.FINISHED, app.getApplicationState());
+    // application is still in NM context.
+    assertEquals(1, context.getApplications().size());
+
+    // restart and verify the app is still present in the NM context.
+    cm.stop();
+    context = createContext(conf, stateStore);
+    cm = createContainerManager(context);
+    cm.init(conf);
+    cm.start();
+    assertEquals(1, context.getApplications().size());
+    app = context.getApplications().get(appId);
+    assertNotNull(app);
+
+    // FINISH_APP events are no longer saved in the NM state store, so
+    // simulate one by resending it.
+    cm.handle(new CMgrCompletedAppsEvent(finishedApps,
+        CMgrCompletedAppsEvent.Reason.BY_RESOURCEMANAGER));
+    waitForAppState(app, ApplicationState.APPLICATION_RESOURCES_CLEANINGUP);
+    // TODO: figure out why an additional APPLICATION_RESOURCES_CLEANEDUP
+    // event is needed here.
+    app.handle(new ApplicationEvent(app.getAppId(),
+        ApplicationEventType.APPLICATION_RESOURCES_CLEANEDUP));
+    assertEquals(ApplicationState.FINISHED, app.getApplicationState());
+
+    // simulate a log aggregation failure.
+    app.handle(new ApplicationEvent(app.getAppId(),
+        ApplicationEventType.APPLICATION_LOG_HANDLING_FAILED));
+
+    // restart and verify app is no longer present after recovery
+    cm.stop();
+    context = createContext(conf, stateStore);
+    cm = createContainerManager(context);
+    cm.init(conf);
+    cm.start();
+    assertTrue(context.getApplications().isEmpty());
+    cm.stop();
+  }
+
+  @Test
   public void testContainerResizeRecovery() throws Exception {
     conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, true);
     conf.setBoolean(YarnConfiguration.NM_RECOVERY_SUPERVISED, true);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/81effb7d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/TestNonAggregatingLogHandler.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/TestNonAggregatingLogHandler.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/TestNonAggregatingLogHandler.java
index 46d06da..ec3757e 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/TestNonAggregatingLogHandler.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/TestNonAggregatingLogHandler.java
@@ -17,6 +17,9 @@
  */
 package org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler;
 
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
+
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyLong;
 import static org.mockito.Matchers.eq;
@@ -84,7 +87,7 @@ public class TestNonAggregatingLogHandler {
   DeletionService mockDelService;
   Configuration conf;
   DrainDispatcher dispatcher;
-  EventHandler<ApplicationEvent> appEventHandler;
+  private ApplicationEventHandler appEventHandler;
   String user = "testuser";
   ApplicationId appId;
   ApplicationAttemptId appAttemptId;
@@ -97,7 +100,7 @@ public class TestNonAggregatingLogHandler {
     mockDelService = mock(DeletionService.class);
     conf = new YarnConfiguration();
     dispatcher = createDispatcher(conf);
-    appEventHandler = mock(EventHandler.class);
+    appEventHandler = new ApplicationEventHandler();
     dispatcher.register(ApplicationEventType.class, appEventHandler);
     appId = BuilderUtils.newApplicationId(1234, 1);
     appAttemptId = BuilderUtils.newApplicationAttemptId(appId, 1);
@@ -345,6 +348,9 @@ public class TestNonAggregatingLogHandler {
 
     dirsHandler.init(conf);
 
+    appEventHandler.resetLogHandlingEvent();
+    assertFalse(appEventHandler.receiveLogHandlingFinishEvent());
+
     NMStateStoreService stateStore = new NMMemoryStateStoreService();
     stateStore.init(conf);
     stateStore.start();
@@ -377,8 +383,21 @@ public class TestNonAggregatingLogHandler {
     logHandler.start();
     verify(logHandler.mockSched, never()).schedule(any(Runnable.class),
         anyLong(), any(TimeUnit.class));
+
+    // wait events get drained.
+    this.dispatcher.await();
+    assertTrue(appEventHandler.receiveLogHandlingFinishEvent());
+
+    appEventHandler.resetLogHandlingEvent();
+    assertFalse(appEventHandler.receiveLogHandlingFailedEvent());
+    // send an app finish event against a removed app
+    logHandler.handle(new LogHandlerAppFinishedEvent(appId));
+    this.dispatcher.await();
+    // verify to receive a log failed event.
+    assertTrue(appEventHandler.receiveLogHandlingFailedEvent());
+    assertFalse(appEventHandler.receiveLogHandlingFinishEvent());
     logHandler.close();
-   }
+  }
 
   /**
    * Function to run a log handler with directories failing the getFileStatus
@@ -536,4 +555,37 @@ public class TestNonAggregatingLogHandler {
     }
     return dirs;
   }
+
+  class ApplicationEventHandler implements EventHandler<ApplicationEvent> {
+
+    private boolean logHandlingFinished = false;
+    private boolean logHandlingFailed = false;
+
+    @Override
+    public void handle(ApplicationEvent event) {
+      switch (event.getType()) {
+      case APPLICATION_LOG_HANDLING_FINISHED:
+        logHandlingFinished = true;
+        break;
+      case APPLICATION_LOG_HANDLING_FAILED:
+        logHandlingFailed = true;
+      default:
+        // do nothing.
+      }
+    }
+
+    public boolean receiveLogHandlingFinishEvent() {
+      return logHandlingFinished;
+    }
+
+    public boolean receiveLogHandlingFailedEvent() {
+      return logHandlingFailed;
+    }
+
+    public void resetLogHandlingEvent() {
+      logHandlingFinished = false;
+      logHandlingFailed = false;
+    }
+  }
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to