Repository: hadoop
Updated Branches:
  refs/heads/branch-2 9d40eead8 -> c31e3ba92


YARN-3990. AsyncDispatcher may be overloaded with RMAppNodeUpdateEvent when Node 
is connected/disconnected. Contributed by Bibin A Chundatt
(cherry picked from commit 32e490b6c035487e99df30ce80366446fe09bd6c)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/c31e3ba9
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/c31e3ba9
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/c31e3ba9

Branch: refs/heads/branch-2
Commit: c31e3ba92132f232bd56b257f3854ffe430fbab9
Parents: 9d40eea
Author: Jason Lowe <jl...@apache.org>
Authored: Fri Jul 31 17:37:24 2015 +0000
Committer: Jason Lowe <jl...@apache.org>
Committed: Fri Jul 31 17:38:49 2015 +0000

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |   3 +
 .../resourcemanager/NodesListManager.java       |  28 ++--
 .../rmapp/TestNodesListManager.java             | 162 +++++++++++++++++++
 3 files changed, 181 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/c31e3ba9/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 0bc4332..79a3f1b 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -708,6 +708,9 @@ Release 2.7.2 - UNRELEASED
     YARN-3925. ContainerLogsUtils#getContainerLogFile fails to read container
     log files from full disks. (zhihai xu via jlowe)
 
+    YARN-3990. AsyncDispatcher may overloaded with RMAppNodeUpdateEvent when
+    Node is connected/disconnected (Bibin A Chundatt via jlowe)
+
 Release 2.7.1 - 2015-07-06 
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c31e3ba9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java
index 1ad74bf..b9c76fb 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java
@@ -178,12 +178,14 @@ public class NodesListManager extends AbstractService 
implements
       LOG.debug(eventNode + " reported unusable");
       unusableRMNodesConcurrentSet.add(eventNode);
       for(RMApp app: rmContext.getRMApps().values()) {
-        this.rmContext
-            .getDispatcher()
-            .getEventHandler()
-            .handle(
-                new RMAppNodeUpdateEvent(app.getApplicationId(), eventNode,
-                    RMAppNodeUpdateType.NODE_UNUSABLE));
+        if (!app.isAppFinalStateStored()) {
+          this.rmContext
+              .getDispatcher()
+              .getEventHandler()
+              .handle(
+                  new RMAppNodeUpdateEvent(app.getApplicationId(), eventNode,
+                      RMAppNodeUpdateType.NODE_UNUSABLE));
+        }
       }
       break;
     case NODE_USABLE:
@@ -192,12 +194,14 @@ public class NodesListManager extends AbstractService 
implements
         unusableRMNodesConcurrentSet.remove(eventNode);
       }
       for (RMApp app : rmContext.getRMApps().values()) {
-        this.rmContext
-            .getDispatcher()
-            .getEventHandler()
-            .handle(
-                new RMAppNodeUpdateEvent(app.getApplicationId(), eventNode,
-                    RMAppNodeUpdateType.NODE_USABLE));
+        if (!app.isAppFinalStateStored()) {
+          this.rmContext
+              .getDispatcher()
+              .getEventHandler()
+              .handle(
+                  new RMAppNodeUpdateEvent(app.getApplicationId(), eventNode,
+                      RMAppNodeUpdateType.NODE_USABLE));
+        }
       }
       break;
     default:

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c31e3ba9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestNodesListManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestNodesListManager.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestNodesListManager.java
new file mode 100644
index 0000000..5330976
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestNodesListManager.java
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
+
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.spy;
+
+import java.util.ArrayList;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.AbstractEvent;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.NodesListManager;
+import org.apache.hadoop.yarn.server.resourcemanager.NodesListManagerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.NodesListManagerEventType;
+import 
org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import 
org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.log4j.Level;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.ArgumentMatcher;
+
+public class TestNodesListManager {
+  // App ids of every RMAppNodeUpdateEvent seen by the matcher in getDispatcher()
+  ArrayList<ApplicationId> applist = new ArrayList<ApplicationId>();
+
+  @Test(timeout = 300000)
+  public void testNodeUsableEvent() throws Exception {
+    Logger rootLogger = LogManager.getRootLogger();
+    rootLogger.setLevel(Level.DEBUG);
+    final Dispatcher dispatcher = getDispatcher();
+    YarnConfiguration conf = new YarnConfiguration();
+    MockRM rm = new MockRM(conf) {
+      @Override
+      protected Dispatcher createDispatcher() {
+        return dispatcher; // inject the recording dispatcher built above
+      }
+    };
+    rm.start();
+    MockNM nm1 = rm.registerNode("h1:1234", 28000);
+    NodesListManager nodesListManager = rm.getNodesListManager();
+    Resource clusterResource = Resource.newInstance(28000, 8);
+    RMNode rmnode = MockNodes.newNodeInfo(1, clusterResource);
+
+    // App that is submitted, killed, and awaited until it reaches KILLED
+    RMApp killrmApp = rm.submitApp(200);
+    rm.killApp(killrmApp.getApplicationId());
+    rm.waitForState(killrmApp.getApplicationId(), RMAppState.KILLED);
+
+    // App whose attempt registers, unregisters and is awaited until FINISHED
+    RMApp finshrmApp = rm.submitApp(2000);
+    nm1.nodeHeartbeat(true);
+    RMAppAttempt attempt = finshrmApp.getCurrentAppAttempt();
+    MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId());
+    am.registerAppAttempt();
+    am.unregisterAppAttempt();
+    nm1.nodeHeartbeat(attempt.getAppAttemptId(), 1, ContainerState.COMPLETE);
+    am.waitForState(RMAppAttemptState.FINISHED);
+
+    // App that is only submitted; the assertions below expect events for it
+    RMApp subrmApp = rm.submitApp(200);
+
+    // Fire NODE_USABLE: expect an event for subrmApp only, none for the others
+    nodesListManager.handle(new NodesListManagerEvent(
+        NodesListManagerEventType.NODE_USABLE, rmnode));
+    if (applist.size() > 0) {
+      Assert.assertTrue(
+          "Event based on running app expected " + subrmApp.getApplicationId(),
+          applist.contains(subrmApp.getApplicationId()));
+      Assert.assertFalse(
+          "Event based on finish app not expected "
+              + finshrmApp.getApplicationId(),
+          applist.contains(finshrmApp.getApplicationId()));
+      Assert.assertFalse(
+          "Event based on killed app not expected "
+              + killrmApp.getApplicationId(),
+          applist.contains(killrmApp.getApplicationId()));
+    } else {
+      Assert.fail("Events received should have beeen more than 1");
+    }
+    applist.clear(); // start from an empty record for the next event type
+
+    // Fire NODE_UNUSABLE: same expectations as for NODE_USABLE above
+    nodesListManager.handle(new NodesListManagerEvent(
+        NodesListManagerEventType.NODE_UNUSABLE, rmnode));
+    if (applist.size() > 0) {
+      Assert.assertTrue(
+          "Event based on running app expected " + subrmApp.getApplicationId(),
+          applist.contains(subrmApp.getApplicationId()));
+      Assert.assertFalse(
+          "Event based on finish app not expected "
+              + finshrmApp.getApplicationId(),
+          applist.contains(finshrmApp.getApplicationId()));
+      Assert.assertFalse(
+          "Event based on killed app not expected "
+              + killrmApp.getApplicationId(),
+          applist.contains(killrmApp.getApplicationId()));
+    } else {
+      Assert.fail("Events received should have beeen more than 1");
+    }
+
+  }
+
+  /*
+   * AsyncDispatcher with a spied handler; records RMAppNodeUpdateEvent app ids
+   */
+  private Dispatcher getDispatcher() {
+    Dispatcher dispatcher = new AsyncDispatcher() {
+      @SuppressWarnings({ "rawtypes", "unchecked" })
+      @Override
+      public EventHandler getEventHandler() {
+
+        class EventArgMatcher extends ArgumentMatcher<AbstractEvent> {
+          @Override
+          public boolean matches(Object argument) {
+            if (argument instanceof RMAppNodeUpdateEvent) {
+              ApplicationId appid =
+                  ((RMAppNodeUpdateEvent) argument).getApplicationId();
+              applist.add(appid); // side effect: remember which app got an event
+            }
+            return false; // never a match; the matcher is used only to record ids
+          }
+        }
+
+        EventHandler handler = spy(super.getEventHandler());
+        doNothing().when(handler).handle(argThat(new EventArgMatcher()));
+        return handler;
+      }
+    };
+    return dispatcher;
+  }
+
+}

Reply via email to