Repository: incubator-slider
Updated Branches:
  refs/heads/develop 42b381901 -> 090d8a63b


SLIDER-1224 an outstanding request that has been escalated fails to be
cleaned up (kyungwan nam via billie)


Project: http://git-wip-us.apache.org/repos/asf/incubator-slider/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-slider/commit/090d8a63
Tree: http://git-wip-us.apache.org/repos/asf/incubator-slider/tree/090d8a63
Diff: http://git-wip-us.apache.org/repos/asf/incubator-slider/diff/090d8a63

Branch: refs/heads/develop
Commit: 090d8a63b4b8dd0b75882047174c7c884dc0b323
Parents: 42b3819
Author: Billie Rinaldi <bil...@apache.org>
Authored: Fri May 12 09:55:24 2017 -0700
Committer: Billie Rinaldi <bil...@apache.org>
Committed: Fri May 12 09:55:24 2017 -0700

----------------------------------------------------------------------
 .../state/OutstandingRequestTracker.java        | 17 ++++++--
 ...tRoleHistoryOutstandingRequestTracker.groovy | 46 ++++++++++++++++++++
 2 files changed, 60 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/090d8a63/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequestTracker.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequestTracker.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequestTracker.java
index c16aa3c..e66976d 100644
--- a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequestTracker.java
+++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/OutstandingRequestTracker.java
@@ -176,7 +176,9 @@ public class OutstandingRequestTracker {
       if (request != null) {
         log.debug("Found open outstanding request for container: {}", request);
         request.completed();
-        outcome = ContainerAllocationOutcome.Open;
+        outcome = request.mayEscalate() && request.isEscalated()
+            ? ContainerAllocationOutcome.Escalated
+            : ContainerAllocationOutcome.Open;
       } else {
         log.warn("No oustanding request found for container {}, outstanding 
queue has {} entries ",
             containerDetails,
@@ -370,7 +372,9 @@ public class OutstandingRequestTracker {
     }
 
     List<AbstractRMOperation> operations = new ArrayList<>();
-    for (OutstandingRequest outstandingRequest : placedRequests.values()) {
+    List<RoleHostnamePair> requestsToMove = new ArrayList<>();
+    for (Map.Entry<RoleHostnamePair, OutstandingRequest> entry : 
placedRequests.entrySet()) {
+      OutstandingRequest outstandingRequest = entry.getValue();
       synchronized (outstandingRequest) {
         // sync escalation check with operation so that nothing can happen to 
state
         // of the request during the escalation
@@ -381,10 +385,17 @@ public class OutstandingRequestTracker {
           operations.add(cancel);
           AMRMClient.ContainerRequest escalated = 
outstandingRequest.escalate();
           operations.add(new ContainerRequestOperation(escalated));
+          requestsToMove.add(entry.getKey());
         }
       }
-      
     }
+    for (RoleHostnamePair keys : requestsToMove) {
+      OutstandingRequest escalatedOutstandingRequest = 
placedRequests.get(keys);
+      log.info("move placedRequests to openRequests {}", 
escalatedOutstandingRequest);
+      openRequests.add(escalatedOutstandingRequest);
+      placedRequests.remove(keys);
+    }
+
     return operations;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/090d8a63/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryOutstandingRequestTracker.groovy
----------------------------------------------------------------------
diff --git a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryOutstandingRequestTracker.groovy b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryOutstandingRequestTracker.groovy
index 7be01ad..574ded5 100644
--- a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryOutstandingRequestTracker.groovy
+++ b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryOutstandingRequestTracker.groovy
@@ -143,6 +143,52 @@ class TestRoleHistoryOutstandingRequestTracker extends BaseMockAppStateTest  {
   }
 
   @Test
+  public void testIssuedEscalatedRequest() throws Throwable {
+    def req1 = tracker.newRequest(host1, 0)
+    def resource = factory.newResource()
+    resource.virtualCores = 1
+    resource.memory = 48;
+    def yarnRequest = req1.buildContainerRequest(resource, role0Status, 0)
+    assert tracker.listPlacedRequests().size() == 1
+    assert tracker.listOpenRequests().size() == 0
+
+    tracker.escalateOutstandingRequests(role0Status.placementTimeoutSeconds * 
1000)
+    assert !req1.isEscalated()
+    assert tracker.listPlacedRequests().size() == 1
+    assert tracker.listOpenRequests().size() == 0
+
+    tracker.escalateOutstandingRequests(role0Status.placementTimeoutSeconds * 
1000 + 1)
+    assert req1.isEscalated()
+    assert tracker.listPlacedRequests().size() == 0
+    assert tracker.listOpenRequests().size() == 1
+
+    def c1 = factory.newContainer()
+
+    def nodeId = factory.newNodeId()
+    c1.nodeId = nodeId
+    // if request was escalated, container can be allocated to another host
+    // by relaxed placement.
+    nodeId.host = "host9"
+
+    def pri = ContainerPriority.buildPriority(0, false)
+    assert pri > 0
+    c1.setPriority(new MockPriority(pri))
+
+    c1.setResource(resource)
+
+    def issued = req1.issuedRequest
+    assert issued.capability == resource
+    assert issued.priority.priority == c1.getPriority().getPriority()
+    assert req1.resourceRequirementsMatch(resource)
+
+    def allocation = tracker.onContainerAllocated(0, nodeId.host, c1)
+    assert tracker.listPlacedRequests().size() == 0
+    assert tracker.listOpenRequests().size() == 0
+    assert allocation.outcome == ContainerAllocationOutcome.Escalated;
+    assert allocation.origin.is(req1)
+  }
+
+  @Test
   public void testResetEntries() throws Throwable {
     tracker.newRequest(host1, 0)
     tracker.newRequest(host2, 0)

Reply via email to