Repository: hadoop Updated Branches: refs/heads/branch-2 9d40eead8 -> c31e3ba92
YARN-3990. AsyncDispatcher may overloaded with RMAppNodeUpdateEvent when Node is connected/disconnected. Contributed by Bibin A Chundatt (cherry picked from commit 32e490b6c035487e99df30ce80366446fe09bd6c) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/c31e3ba9 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/c31e3ba9 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/c31e3ba9 Branch: refs/heads/branch-2 Commit: c31e3ba92132f232bd56b257f3854ffe430fbab9 Parents: 9d40eea Author: Jason Lowe <jl...@apache.org> Authored: Fri Jul 31 17:37:24 2015 +0000 Committer: Jason Lowe <jl...@apache.org> Committed: Fri Jul 31 17:38:49 2015 +0000 ---------------------------------------------------------------------- hadoop-yarn-project/CHANGES.txt | 3 + .../resourcemanager/NodesListManager.java | 28 ++-- .../rmapp/TestNodesListManager.java | 162 +++++++++++++++++++ 3 files changed, 181 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/c31e3ba9/hadoop-yarn-project/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 0bc4332..79a3f1b 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -708,6 +708,9 @@ Release 2.7.2 - UNRELEASED YARN-3925. ContainerLogsUtils#getContainerLogFile fails to read container log files from full disks. (zhihai xu via jlowe) + YARN-3990. AsyncDispatcher may overloaded with RMAppNodeUpdateEvent when + Node is connected/disconnected (Bibin A Chundatt via jlowe) + Release 2.7.1 - 2015-07-06 INCOMPATIBLE CHANGES http://git-wip-us.apache.org/repos/asf/hadoop/blob/c31e3ba9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java index 1ad74bf..b9c76fb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/NodesListManager.java @@ -178,12 +178,14 @@ public class NodesListManager extends AbstractService implements LOG.debug(eventNode + " reported unusable"); unusableRMNodesConcurrentSet.add(eventNode); for(RMApp app: rmContext.getRMApps().values()) { - this.rmContext - .getDispatcher() - .getEventHandler() - .handle( - new RMAppNodeUpdateEvent(app.getApplicationId(), eventNode, - RMAppNodeUpdateType.NODE_UNUSABLE)); + if (!app.isAppFinalStateStored()) { + this.rmContext + .getDispatcher() + .getEventHandler() + .handle( + new RMAppNodeUpdateEvent(app.getApplicationId(), eventNode, + RMAppNodeUpdateType.NODE_UNUSABLE)); + } } break; case NODE_USABLE: @@ -192,12 +194,14 @@ public class NodesListManager extends AbstractService implements unusableRMNodesConcurrentSet.remove(eventNode); } for (RMApp app : rmContext.getRMApps().values()) { - this.rmContext - .getDispatcher() - .getEventHandler() - .handle( - new RMAppNodeUpdateEvent(app.getApplicationId(), eventNode, - RMAppNodeUpdateType.NODE_USABLE)); + if (!app.isAppFinalStateStored()) { + this.rmContext + .getDispatcher() + .getEventHandler() + .handle( + new RMAppNodeUpdateEvent(app.getApplicationId(), eventNode, + RMAppNodeUpdateType.NODE_USABLE)); + } } break; default: http://git-wip-us.apache.org/repos/asf/hadoop/blob/c31e3ba9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestNodesListManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestNodesListManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestNodesListManager.java new file mode 100644 index 0000000..5330976 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestNodesListManager.java @@ -0,0 +1,162 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.rmapp; + +import static org.mockito.Matchers.argThat; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.spy; + +import java.util.ArrayList; + +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.AbstractEvent; +import org.apache.hadoop.yarn.event.AsyncDispatcher; +import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.server.resourcemanager.MockAM; +import org.apache.hadoop.yarn.server.resourcemanager.MockNM; +import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; +import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.NodesListManager; +import org.apache.hadoop.yarn.server.resourcemanager.NodesListManagerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.NodesListManagerEventType; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.log4j.Level; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.junit.Assert; +import org.junit.Test; +import org.mockito.ArgumentMatcher; + +public class TestNodesListManager { + // To hold list of application for which event was received + ArrayList<ApplicationId> applist = new ArrayList<ApplicationId>(); + + @Test(timeout = 300000) + public void testNodeUsableEvent() throws Exception { + Logger rootLogger = LogManager.getRootLogger(); + rootLogger.setLevel(Level.DEBUG); + final Dispatcher dispatcher = getDispatcher(); + YarnConfiguration conf = new YarnConfiguration(); + MockRM rm = new MockRM(conf) { + @Override + protected Dispatcher createDispatcher() { + return dispatcher; + } + }; + rm.start(); + MockNM nm1 = rm.registerNode("h1:1234", 28000); + NodesListManager nodesListManager = rm.getNodesListManager(); + Resource clusterResource = Resource.newInstance(28000, 8); + RMNode rmnode = MockNodes.newNodeInfo(1, clusterResource); + + // Create killing APP + RMApp killrmApp = rm.submitApp(200); + rm.killApp(killrmApp.getApplicationId()); + rm.waitForState(killrmApp.getApplicationId(), RMAppState.KILLED); + + // Create finish APP + RMApp finshrmApp = rm.submitApp(2000); + nm1.nodeHeartbeat(true); + RMAppAttempt attempt = finshrmApp.getCurrentAppAttempt(); + MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId()); + am.registerAppAttempt(); + am.unregisterAppAttempt(); + nm1.nodeHeartbeat(attempt.getAppAttemptId(), 1, ContainerState.COMPLETE); + am.waitForState(RMAppAttemptState.FINISHED); + + // Create submitted App + RMApp subrmApp = rm.submitApp(200); + + // Fire Event for NODE_USABLE + nodesListManager.handle(new NodesListManagerEvent( + NodesListManagerEventType.NODE_USABLE, rmnode)); + if (applist.size() > 0) { + Assert.assertTrue( + "Event based on running app expected " + subrmApp.getApplicationId(), + applist.contains(subrmApp.getApplicationId())); + Assert.assertFalse( + "Event based on finish app not expected " + + finshrmApp.getApplicationId(), + applist.contains(finshrmApp.getApplicationId())); + Assert.assertFalse( + "Event based on killed app not expected " + + killrmApp.getApplicationId(), + applist.contains(killrmApp.getApplicationId())); + } else { + Assert.fail("Events received should have beeen more than 1"); + } + applist.clear(); + + // Fire Event for NODE_UNUSABLE + nodesListManager.handle(new NodesListManagerEvent( + NodesListManagerEventType.NODE_UNUSABLE, rmnode)); + if (applist.size() > 0) { + Assert.assertTrue( + "Event based on running app expected " + subrmApp.getApplicationId(), + applist.contains(subrmApp.getApplicationId())); + Assert.assertFalse( + "Event based on finish app not expected " + + finshrmApp.getApplicationId(), + applist.contains(finshrmApp.getApplicationId())); + Assert.assertFalse( + "Event based on killed app not expected " + + killrmApp.getApplicationId(), + applist.contains(killrmApp.getApplicationId())); + } else { + Assert.fail("Events received should have beeen more than 1"); + } + + } + + /* + * Create dispatcher object + */ + private Dispatcher getDispatcher() { + Dispatcher dispatcher = new AsyncDispatcher() { + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Override + public EventHandler getEventHandler() { + + class EventArgMatcher extends ArgumentMatcher<AbstractEvent> { + @Override + public boolean matches(Object argument) { + if (argument instanceof RMAppNodeUpdateEvent) { + ApplicationId appid = + ((RMAppNodeUpdateEvent) argument).getApplicationId(); + applist.add(appid); + } + return false; + } + } + + EventHandler handler = spy(super.getEventHandler()); + doNothing().when(handler).handle(argThat(new EventArgMatcher())); + return handler; + } + }; + return dispatcher; + } + +}