Repository: hadoop
Updated Branches:
  refs/heads/trunk cdee0a4f8 -> 47f711eeb


YARN-6629. NPE occurred when a container allocation proposal is applied after its 
resource requests have already been removed. (Tao Yang via wangda)

Change-Id: I805880f90b3f6798ec96ed8e8e75755f390a9ad5


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/47f711ee
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/47f711ee
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/47f711ee

Branch: refs/heads/trunk
Commit: 47f711eebca315804c80012eea5f31275ac25518
Parents: cdee0a4
Author: Wangda Tan <wan...@apache.org>
Authored: Wed Mar 28 08:47:31 2018 -0700
Committer: Wangda Tan <wan...@apache.org>
Committed: Wed Mar 28 11:07:45 2018 -0700

----------------------------------------------------------------------
 .../scheduler/capacity/CapacityScheduler.java   |  4 +-
 .../scheduler/common/fica/FiCaSchedulerApp.java | 16 ++++--
 .../capacity/TestCapacityScheduler.java         | 53 ++++++++++++++++++++
 3 files changed, 68 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/47f711ee/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index daf0354..bf674a8 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -2825,8 +2825,8 @@ public class CapacityScheduler extends
       // proposal might be outdated if AM failover just finished
       // and proposal queue was not be consumed in time
       if (app != null && attemptId.equals(app.getApplicationAttemptId())) {
-        if (app.accept(cluster, request, updatePending)) {
-          app.apply(cluster, request, updatePending);
+        if (app.accept(cluster, request, updatePending)
+            && app.apply(cluster, request, updatePending)) {
           LOG.info("Allocation proposal accepted");
           isSuccess = true;
         } else{

http://git-wip-us.apache.org/repos/asf/hadoop/blob/47f711ee/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
index f3da0a3..32b2cad 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
@@ -489,7 +489,7 @@ public class FiCaSchedulerApp extends 
SchedulerApplicationAttempt {
     return accepted;
   }
 
-  public void apply(Resource cluster, ResourceCommitRequest<FiCaSchedulerApp,
+  public boolean apply(Resource cluster, 
ResourceCommitRequest<FiCaSchedulerApp,
       FiCaSchedulerNode> request, boolean updatePending) {
     boolean reReservation = false;
 
@@ -502,8 +502,16 @@ public class FiCaSchedulerApp extends 
SchedulerApplicationAttempt {
             allocation = request.getFirstAllocatedOrReservedContainer();
         SchedulerContainer<FiCaSchedulerApp, FiCaSchedulerNode>
             schedulerContainer = allocation.getAllocatedOrReservedContainer();
-        RMContainer rmContainer = schedulerContainer.getRmContainer();
 
+        // Required sanity check - AM can call 'allocate' to update resource
+        // request without locking the scheduler, hence we need to check
+        if (updatePending &&
+            
getOutstandingAsksCount(schedulerContainer.getSchedulerRequestKey())
+                <= 0) {
+          return false;
+        }
+
+        RMContainer rmContainer = schedulerContainer.getRmContainer();
         reReservation =
             (!schedulerContainer.isAllocated()) && (rmContainer.getState()
                 == RMContainerState.RESERVED);
@@ -545,7 +553,8 @@ public class FiCaSchedulerApp extends 
SchedulerApplicationAttempt {
                 containerRequest);
 
             // If this is from a SchedulingRequest, set allocation tags.
-            if (containerRequest.getSchedulingRequest() != null) {
+            if (containerRequest != null
+                && containerRequest.getSchedulingRequest() != null) {
               ((RMContainerImpl) rmContainer).setAllocationTags(
                   containerRequest.getSchedulingRequest().getAllocationTags());
             }
@@ -598,6 +607,7 @@ public class FiCaSchedulerApp extends 
SchedulerApplicationAttempt {
     if (!reReservation) {
       getCSLeafQueue().apply(cluster, request);
     }
+    return true;
   }
 
   public boolean unreserve(SchedulerRequestKey schedulerKey,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/47f711ee/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
index 0fa5c15..1d2aadc 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
@@ -134,6 +134,7 @@ import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
+import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
 import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
@@ -170,6 +171,8 @@ import com.google.common.base.Supplier;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Sets;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 public class TestCapacityScheduler extends CapacitySchedulerTestBase {
   private static final Log LOG = 
LogFactory.getLog(TestCapacityScheduler.class);
@@ -4857,4 +4860,54 @@ public class TestCapacityScheduler extends 
CapacitySchedulerTestBase {
       }
     }
   }
+
+  @Test (timeout = 60000)
+  public void testClearRequestsBeforeApplyTheProposal()
+      throws Exception {
+    // start a MockRM and register a single 200 GB node
+    final MockRM rm = new MockRM(new CapacitySchedulerConfiguration());
+    rm.start();
+    final MockNM nm = rm.registerNode("h1:1234", 200 * GB);
+
+    // submit an app, then launch and register its AM on that node
+    final RMApp app = rm.submitApp(200, "app", "user");
+    MockRM.launchAndRegisterAM(app, rm, nm);
+
+    // spy the capacity scheduler so CapacityScheduler#tryCommit is intercepted
+    final Priority priority = Priority.newInstance(1);
+    final CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+    final CapacityScheduler spyCs = Mockito.spy(cs);
+    Mockito.doAnswer(new Answer<Object>() {
+      public Object answer(InvocationOnMock invocation) throws Exception {
+        // clear resource request before applying the proposal for container_2
+        spyCs.allocate(app.getCurrentAppAttempt().getAppAttemptId(),
+            Arrays.asList(ResourceRequest.newInstance(priority, "*",
+                Resources.createResource(1 * GB), 0)), null,
+            Collections.<ContainerId>emptyList(), null, null,
+            NULL_UPDATE_REQUESTS);
+        // call the real FiCaSchedulerApp#apply, which could NPE before YARN-6629
+        try {
+          FiCaSchedulerApp schedulerApp = cs.getApplicationAttempt(
+              app.getCurrentAppAttempt().getAppAttemptId());
+          schedulerApp.apply((Resource) invocation.getArguments()[0],
+              (ResourceCommitRequest) invocation.getArguments()[1],
+              (Boolean) invocation.getArguments()[2]);
+          // proposal for the removed request must be rejected: 1 live container
+          Assert.assertEquals(1, schedulerApp.getLiveContainers().size());
+        } catch (Throwable e) {
+          Assert.fail();
+        }
+        return null;
+      }
+    }).when(spyCs).tryCommit(Mockito.any(Resource.class),
+        Mockito.any(ResourceCommitRequest.class), Mockito.anyBoolean());
+
+    // request container_2, then send a node update to reproduce the NPE path
+    spyCs.allocate(app.getCurrentAppAttempt().getAppAttemptId(),
+        Arrays.asList(ResourceRequest.newInstance(priority, "*",
+            Resources.createResource(1 * GB), 1)), null,
+        Collections.<ContainerId>emptyList(), null, null, 
NULL_UPDATE_REQUESTS);
+    spyCs.handle(new NodeUpdateSchedulerEvent(
+        spyCs.getNode(nm.getNodeId()).getRMNode()));
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to