Repository: incubator-slider Updated Branches: refs/heads/develop 42b381901 -> 090d8a63b
SLIDER-1224 the outstanding request that has been escalated is failed to cleanup (kyungwan nam via billie) Project: http://git-wip-us.apache.org/repos/asf/incubator-slider/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-slider/commit/090d8a63 Tree: http://git-wip-us.apache.org/repos/asf/incubator-slider/tree/090d8a63 Diff: http://git-wip-us.apache.org/repos/asf/incubator-slider/diff/090d8a63 Branch: refs/heads/develop Commit: 090d8a63b4b8dd0b75882047174c7c884dc0b323 Parents: 42b3819 Author: Billie Rinaldi <bil...@apache.org> Authored: Fri May 12 09:55:24 2017 -0700 Committer: Billie Rinaldi <bil...@apache.org> Committed: Fri May 12 09:55:24 2017 -0700 ---------------------------------------------------------------------- .../state/OutstandingRequestTracker.java | 17 ++++++-- ...tRoleHistoryOutstandingRequestTracker.groovy | 46 ++++++++++++++++++++ 2 files changed, 60 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/090d8a63/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequestTracker.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequestTracker.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequestTracker.java index c16aa3c..e66976d 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequestTracker.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequestTracker.java @@ -176,7 +176,9 @@ public class OutstandingRequestTracker { if (request != null) { log.debug("Found open outstanding request for container: {}", request); request.completed(); - outcome = ContainerAllocationOutcome.Open; + outcome = request.mayEscalate() && request.isEscalated() + ? ContainerAllocationOutcome.Escalated + : ContainerAllocationOutcome.Open; } else { log.warn("No oustanding request found for container {}, outstanding queue has {} entries ", containerDetails, @@ -370,7 +372,9 @@ public class OutstandingRequestTracker { } List<AbstractRMOperation> operations = new ArrayList<>(); - for (OutstandingRequest outstandingRequest : placedRequests.values()) { + List<RoleHostnamePair> requestsToMove = new ArrayList<>(); + for (Map.Entry<RoleHostnamePair, OutstandingRequest> entry : placedRequests.entrySet()) { + OutstandingRequest outstandingRequest = entry.getValue(); synchronized (outstandingRequest) { // sync escalation check with operation so that nothing can happen to state // of the request during the escalation @@ -381,10 +385,17 @@ public class OutstandingRequestTracker { operations.add(cancel); AMRMClient.ContainerRequest escalated = outstandingRequest.escalate(); operations.add(new ContainerRequestOperation(escalated)); + requestsToMove.add(entry.getKey()); } } - } + for (RoleHostnamePair keys : requestsToMove) { + OutstandingRequest escalatedOutstandingRequest = placedRequests.get(keys); + log.info("move placedRequests to openRequests {}", escalatedOutstandingRequest); + openRequests.add(escalatedOutstandingRequest); + placedRequests.remove(keys); + } + return operations; } http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/090d8a63/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryOutstandingRequestTracker.groovy ---------------------------------------------------------------------- diff --git a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryOutstandingRequestTracker.groovy b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryOutstandingRequestTracker.groovy index 7be01ad..574ded5 100644 --- a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryOutstandingRequestTracker.groovy +++ b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryOutstandingRequestTracker.groovy @@ -143,6 +143,52 @@ class TestRoleHistoryOutstandingRequestTracker extends BaseMockAppStateTest { } @Test + public void testIssuedEscalatedRequest() throws Throwable { + def req1 = tracker.newRequest(host1, 0) + def resource = factory.newResource() + resource.virtualCores = 1 + resource.memory = 48; + def yarnRequest = req1.buildContainerRequest(resource, role0Status, 0) + assert tracker.listPlacedRequests().size() == 1 + assert tracker.listOpenRequests().size() == 0 + + tracker.escalateOutstandingRequests(role0Status.placementTimeoutSeconds * 1000) + assert !req1.isEscalated() + assert tracker.listPlacedRequests().size() == 1 + assert tracker.listOpenRequests().size() == 0 + + tracker.escalateOutstandingRequests(role0Status.placementTimeoutSeconds * 1000 + 1) + assert req1.isEscalated() + assert tracker.listPlacedRequests().size() == 0 + assert tracker.listOpenRequests().size() == 1 + + def c1 = factory.newContainer() + + def nodeId = factory.newNodeId() + c1.nodeId = nodeId + // if request was escalated, container can be allocated to another host + // by relaxed placement. + nodeId.host = "host9" + + def pri = ContainerPriority.buildPriority(0, false) + assert pri > 0 + c1.setPriority(new MockPriority(pri)) + + c1.setResource(resource) + + def issued = req1.issuedRequest + assert issued.capability == resource + assert issued.priority.priority == c1.getPriority().getPriority() + assert req1.resourceRequirementsMatch(resource) + + def allocation = tracker.onContainerAllocated(0, nodeId.host, c1) + assert tracker.listPlacedRequests().size() == 0 + assert tracker.listOpenRequests().size() == 0 + assert allocation.outcome == ContainerAllocationOutcome.Escalated; + assert allocation.origin.is(req1) + } + + @Test public void testResetEntries() throws Throwable { tracker.newRequest(host1, 0) tracker.newRequest(host2, 0)