Repository: hadoop Updated Branches: refs/heads/trunk cdee0a4f8 -> 47f711eeb
YARN-6629. NPE occurred when container allocation proposal is applied but its resource requests are removed before. (Tao Yang via wangda) Change-Id: I805880f90b3f6798ec96ed8e8e75755f390a9ad5 Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/47f711ee Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/47f711ee Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/47f711ee Branch: refs/heads/trunk Commit: 47f711eebca315804c80012eea5f31275ac25518 Parents: cdee0a4 Author: Wangda Tan <wan...@apache.org> Authored: Wed Mar 28 08:47:31 2018 -0700 Committer: Wangda Tan <wan...@apache.org> Committed: Wed Mar 28 11:07:45 2018 -0700 ---------------------------------------------------------------------- .../scheduler/capacity/CapacityScheduler.java | 4 +- .../scheduler/common/fica/FiCaSchedulerApp.java | 16 ++++-- .../capacity/TestCapacityScheduler.java | 53 ++++++++++++++++++++ 3 files changed, 68 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/47f711ee/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index daf0354..bf674a8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -2825,8 +2825,8 @@ public class CapacityScheduler extends // proposal might be outdated if AM failover just finished // and proposal queue was not be consumed in time if (app != null && attemptId.equals(app.getApplicationAttemptId())) { - if (app.accept(cluster, request, updatePending)) { - app.apply(cluster, request, updatePending); + if (app.accept(cluster, request, updatePending) + && app.apply(cluster, request, updatePending)) { LOG.info("Allocation proposal accepted"); isSuccess = true; } else{ http://git-wip-us.apache.org/repos/asf/hadoop/blob/47f711ee/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java index f3da0a3..32b2cad 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java @@ -489,7 +489,7 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt { return accepted; } - public void apply(Resource cluster, ResourceCommitRequest<FiCaSchedulerApp, + public boolean apply(Resource cluster, ResourceCommitRequest<FiCaSchedulerApp, FiCaSchedulerNode> request, boolean updatePending) { boolean reReservation = false; @@ -502,8 +502,16 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt { allocation = request.getFirstAllocatedOrReservedContainer(); SchedulerContainer<FiCaSchedulerApp, FiCaSchedulerNode> schedulerContainer = allocation.getAllocatedOrReservedContainer(); - RMContainer rmContainer = schedulerContainer.getRmContainer(); + // Required sanity check - AM can call 'allocate' to update resource + // request without locking the scheduler, hence we need to check + if (updatePending && + getOutstandingAsksCount(schedulerContainer.getSchedulerRequestKey()) + <= 0) { + return false; + } + + RMContainer rmContainer = schedulerContainer.getRmContainer(); reReservation = (!schedulerContainer.isAllocated()) && (rmContainer.getState() == RMContainerState.RESERVED); @@ -545,7 +553,8 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt { containerRequest); // If this is from a SchedulingRequest, set allocation tags. - if (containerRequest.getSchedulingRequest() != null) { + if (containerRequest != null + && containerRequest.getSchedulingRequest() != null) { ((RMContainerImpl) rmContainer).setAllocationTags( containerRequest.getSchedulingRequest().getAllocationTags()); } @@ -598,6 +607,7 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt { if (!reReservation) { getCSLeafQueue().apply(cluster, request); } + return true; } public boolean unreserve(SchedulerRequestKey schedulerKey, http://git-wip-us.apache.org/repos/asf/hadoop/blob/47f711ee/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java index 0fa5c15..1d2aadc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java @@ -134,6 +134,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; @@ -170,6 +171,8 @@ import com.google.common.base.Supplier; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Sets; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; public class TestCapacityScheduler extends CapacitySchedulerTestBase { private static final Log LOG = LogFactory.getLog(TestCapacityScheduler.class); @@ -4857,4 +4860,54 @@ public class TestCapacityScheduler extends CapacitySchedulerTestBase { } } } + + @Test (timeout = 60000) + public void testClearRequestsBeforeApplyTheProposal() + throws Exception { + // init RM & NMs & Nodes + final MockRM rm = new MockRM(new CapacitySchedulerConfiguration()); + rm.start(); + final MockNM nm = rm.registerNode("h1:1234", 200 * GB); + + // submit app + final RMApp app = rm.submitApp(200, "app", "user"); + MockRM.launchAndRegisterAM(app, rm, nm); + + // spy capacity scheduler to handle CapacityScheduler#apply + final Priority priority = Priority.newInstance(1); + final CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); + final CapacityScheduler spyCs = Mockito.spy(cs); + Mockito.doAnswer(new Answer<Object>() { + public Object answer(InvocationOnMock invocation) throws Exception { + // clear resource request before applying the proposal for container_2 + spyCs.allocate(app.getCurrentAppAttempt().getAppAttemptId(), + Arrays.asList(ResourceRequest.newInstance(priority, "*", + Resources.createResource(1 * GB), 0)), null, + Collections.<ContainerId>emptyList(), null, null, + NULL_UPDATE_REQUESTS); + // trigger real apply which can raise NPE before YARN-6629 + try { + FiCaSchedulerApp schedulerApp = cs.getApplicationAttempt( + app.getCurrentAppAttempt().getAppAttemptId()); + schedulerApp.apply((Resource) invocation.getArguments()[0], + (ResourceCommitRequest) invocation.getArguments()[1], + (Boolean) invocation.getArguments()[2]); + // the proposal of removed request should be rejected + Assert.assertEquals(1, schedulerApp.getLiveContainers().size()); + } catch (Throwable e) { + Assert.fail(); + } + return null; + } + }).when(spyCs).tryCommit(Mockito.any(Resource.class), + Mockito.any(ResourceCommitRequest.class), Mockito.anyBoolean()); + + // rm allocates container_2 to reproduce the process that can raise NPE + spyCs.allocate(app.getCurrentAppAttempt().getAppAttemptId(), + Arrays.asList(ResourceRequest.newInstance(priority, "*", + Resources.createResource(1 * GB), 1)), null, + Collections.<ContainerId>emptyList(), null, null, NULL_UPDATE_REQUESTS); + spyCs.handle(new NodeUpdateSchedulerEvent( + spyCs.getNode(nm.getNodeId()).getRMNode())); + } } --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org