Author: mayank
Date: Tue Jul 15 22:43:07 2014
New Revision: 1610872
URL: http://svn.apache.org/r1610872
Log:
YARN-1408. Preemption caused Invalid State Event: ACQUIRED at KILLED and caused
a task timeout for 30 mins. (Sunil G via mayank)
Modified:
hadoop/common/branches/branch-2.5/hadoop-yarn-project/CHANGES.txt
hadoop/common/branches/branch-2.5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java
hadoop/common/branches/branch-2.5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
hadoop/common/branches/branch-2.5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
hadoop/common/branches/branch-2.5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
hadoop/common/branches/branch-2.5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
hadoop/common/branches/branch-2.5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
hadoop/common/branches/branch-2.5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
hadoop/common/branches/branch-2.5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java
hadoop/common/branches/branch-2.5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
hadoop/common/branches/branch-2.5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java
hadoop/common/branches/branch-2.5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
hadoop/common/branches/branch-2.5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
hadoop/common/branches/branch-2.5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
Modified: hadoop/common/branches/branch-2.5/hadoop-yarn-project/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.5/hadoop-yarn-project/CHANGES.txt?rev=1610872&r1=1610871&r2=1610872&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.5/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2.5/hadoop-yarn-project/CHANGES.txt Tue Jul
15 22:43:07 2014
@@ -200,6 +200,9 @@ Release 2.5.0 - UNRELEASED
YARN-2241. ZKRMStateStore: On startup, show nicer messages if znodes
already
exist. (Robert Kanter via kasha)
+ YARN-1408. Preemption caused Invalid State Event: ACQUIRED at KILLED and caused
+ a task timeout for 30 mins. (Sunil G via mayank)
+
OPTIMIZATIONS
BUG FIXES
Modified:
hadoop/common/branches/branch-2.5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java?rev=1610872&r1=1610871&r2=1610872&view=diff
==============================================================================
---
hadoop/common/branches/branch-2.5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java
(original)
+++
hadoop/common/branches/branch-2.5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java
Tue Jul 15 22:43:07 2014
@@ -18,6 +18,8 @@
package org.apache.hadoop.yarn.server.resourcemanager.rmcontainer;
+import java.util.List;
+
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -26,6 +28,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.event.EventHandler;
/**
@@ -73,5 +76,7 @@ public interface RMContainer extends Eve
ContainerReport createContainerReport();
boolean isAMContainer();
+
+ List<ResourceRequest> getResourceRequests();
}
Modified:
hadoop/common/branches/branch-2.5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java?rev=1610872&r1=1610871&r2=1610872&view=diff
==============================================================================
---
hadoop/common/branches/branch-2.5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
(original)
+++
hadoop/common/branches/branch-2.5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
Tue Jul 15 22:43:07 2014
@@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.rmcontainer;
import java.util.EnumSet;
+import java.util.List;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
@@ -34,6 +35,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
@@ -156,6 +158,7 @@ public class RMContainerImpl implements
private long finishTime;
private ContainerStatus finishedStatus;
private boolean isAMContainer;
+ private List<ResourceRequest> resourceRequests;
public RMContainerImpl(Container container,
ApplicationAttemptId appAttemptId, NodeId nodeId, String user,
@@ -178,7 +181,8 @@ public class RMContainerImpl implements
this.eventHandler = rmContext.getDispatcher().getEventHandler();
this.containerAllocationExpirer =
rmContext.getContainerAllocationExpirer();
this.isAMContainer = false;
-
+ this.resourceRequests = null;
+
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
this.readLock = lock.readLock();
this.writeLock = lock.writeLock();
@@ -309,6 +313,25 @@ public class RMContainerImpl implements
readLock.unlock();
}
}
+
+ @Override
+ public List<ResourceRequest> getResourceRequests() {
+ try {
+ readLock.lock();
+ return resourceRequests;
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ public void setResourceRequests(List<ResourceRequest> requests) {
+ try {
+ writeLock.lock();
+ this.resourceRequests = requests;
+ } finally {
+ writeLock.unlock();
+ }
+ }
@Override
public String toString() {
@@ -430,6 +453,9 @@ public class RMContainerImpl implements
@Override
public void transition(RMContainerImpl container, RMContainerEvent event) {
+ // Clear ResourceRequest stored in RMContainer
+ container.setResourceRequests(null);
+
// Register with containerAllocationExpirer.
container.containerAllocationExpirer.register(container.getContainerId());
Modified:
hadoop/common/branches/branch-2.5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java?rev=1610872&r1=1610871&r2=1610872&view=diff
==============================================================================
---
hadoop/common/branches/branch-2.5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
(original)
+++
hadoop/common/branches/branch-2.5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
Tue Jul 15 22:43:07 2014
@@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
@@ -275,6 +276,27 @@ public abstract class AbstractYarnSchedu
return rmContainer;
}
+ /**
+ * Recover resource requests from the RMContainer when a container is
+ * preempted before the AM has pulled it. If the container has already been
+ * pulled by the AM, the RMContainer holds no resource requests to recover.
+ * @param rmContainer the container being preempted
+ */
+ protected void recoverResourceRequestForContainer(RMContainer rmContainer) {
+ List<ResourceRequest> requests = rmContainer.getResourceRequests();
+
+ // Once the container has moved to ACQUIRED, the request list is null.
+ if (requests == null) {
+ return;
+ }
+ // Add resource request back to Scheduler.
+ SchedulerApplicationAttempt schedulerAttempt
+ = getCurrentAttemptForContainer(rmContainer.getContainerId());
+ if (schedulerAttempt != null) {
+ schedulerAttempt.recoverResourceRequests(requests);
+ }
+ }
+
public SchedulerNode getSchedulerNode(NodeId nodeId) {
return nodes.get(nodeId);
}
Modified:
hadoop/common/branches/branch-2.5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java?rev=1610872&r1=1610871&r2=1610872&view=diff
==============================================================================
---
hadoop/common/branches/branch-2.5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
(original)
+++
hadoop/common/branches/branch-2.5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
Tue Jul 15 22:43:07 2014
@@ -127,9 +127,10 @@ public class AppSchedulingInfo {
* by the application.
*
* @param requests resources to be acquired
+ * @param recoverPreemptedRequest recover Resource Request on preemption
*/
synchronized public void updateResourceRequests(
- List<ResourceRequest> requests) {
+ List<ResourceRequest> requests, boolean recoverPreemptedRequest) {
QueueMetrics metrics = queue.getMetrics();
// Update resource requests
@@ -163,8 +164,13 @@ public class AppSchedulingInfo {
asks = new HashMap<String, ResourceRequest>();
this.requests.put(priority, asks);
this.priorities.add(priority);
- } else if (updatePendingResources) {
- lastRequest = asks.get(resourceName);
+ }
+ lastRequest = asks.get(resourceName);
+
+ if (recoverPreemptedRequest && lastRequest != null) {
+ // Increment the number of containers by 1, as it is recovering a
+ // single container.
+ request.setNumContainers(lastRequest.getNumContainers() + 1);
}
asks.put(resourceName, request);
@@ -254,14 +260,16 @@ public class AppSchedulingInfo {
* @param container
* the containers allocated.
*/
- synchronized public void allocate(NodeType type, SchedulerNode node,
- Priority priority, ResourceRequest request, Container container) {
+ synchronized public List<ResourceRequest> allocate(NodeType type,
+ SchedulerNode node, Priority priority, ResourceRequest request,
+ Container container) {
+ List<ResourceRequest> resourceRequests = new ArrayList<ResourceRequest>();
if (type == NodeType.NODE_LOCAL) {
- allocateNodeLocal(node, priority, request, container);
+ allocateNodeLocal(node, priority, request, container, resourceRequests);
} else if (type == NodeType.RACK_LOCAL) {
- allocateRackLocal(node, priority, request, container);
+ allocateRackLocal(node, priority, request, container, resourceRequests);
} else {
- allocateOffSwitch(node, priority, request, container);
+ allocateOffSwitch(node, priority, request, container, resourceRequests);
}
QueueMetrics metrics = queue.getMetrics();
if (pending) {
@@ -279,6 +287,7 @@ public class AppSchedulingInfo {
+ " resource=" + request.getCapability());
}
metrics.allocateResources(user, 1, request.getCapability(), true);
+ return resourceRequests;
}
/**
@@ -288,9 +297,9 @@ public class AppSchedulingInfo {
* @param allocatedContainers
* resources allocated to the application
*/
- synchronized private void allocateNodeLocal(
- SchedulerNode node, Priority priority,
- ResourceRequest nodeLocalRequest, Container container) {
+ synchronized private void allocateNodeLocal(SchedulerNode node,
+ Priority priority, ResourceRequest nodeLocalRequest, Container container,
+ List<ResourceRequest> resourceRequests) {
// Update future requirements
nodeLocalRequest.setNumContainers(nodeLocalRequest.getNumContainers() - 1);
if (nodeLocalRequest.getNumContainers() == 0) {
@@ -304,7 +313,14 @@ public class AppSchedulingInfo {
this.requests.get(priority).remove(node.getRackName());
}
- decrementOutstanding(requests.get(priority).get(ResourceRequest.ANY));
+ ResourceRequest offRackRequest = requests.get(priority).get(
+ ResourceRequest.ANY);
+ decrementOutstanding(offRackRequest);
+
+ // Update cloned NodeLocal, RackLocal and OffRack requests for recovery
+ resourceRequests.add(cloneResourceRequest(nodeLocalRequest));
+ resourceRequests.add(cloneResourceRequest(rackLocalRequest));
+ resourceRequests.add(cloneResourceRequest(offRackRequest));
}
/**
@@ -314,16 +330,22 @@ public class AppSchedulingInfo {
* @param allocatedContainers
* resources allocated to the application
*/
- synchronized private void allocateRackLocal(
- SchedulerNode node, Priority priority,
- ResourceRequest rackLocalRequest, Container container) {
+ synchronized private void allocateRackLocal(SchedulerNode node,
+ Priority priority, ResourceRequest rackLocalRequest, Container container,
+ List<ResourceRequest> resourceRequests) {
// Update future requirements
rackLocalRequest.setNumContainers(rackLocalRequest.getNumContainers() - 1);
if (rackLocalRequest.getNumContainers() == 0) {
this.requests.get(priority).remove(node.getRackName());
}
- decrementOutstanding(requests.get(priority).get(ResourceRequest.ANY));
+ ResourceRequest offRackRequest = requests.get(priority).get(
+ ResourceRequest.ANY);
+ decrementOutstanding(offRackRequest);
+
+ // Update cloned RackLocal and OffRack requests for recovery
+ resourceRequests.add(cloneResourceRequest(rackLocalRequest));
+ resourceRequests.add(cloneResourceRequest(offRackRequest));
}
/**
@@ -333,11 +355,13 @@ public class AppSchedulingInfo {
* @param allocatedContainers
* resources allocated to the application
*/
- synchronized private void allocateOffSwitch(
- SchedulerNode node, Priority priority,
- ResourceRequest offSwitchRequest, Container container) {
+ synchronized private void allocateOffSwitch(SchedulerNode node,
+ Priority priority, ResourceRequest offSwitchRequest, Container container,
+ List<ResourceRequest> resourceRequests) {
// Update future requirements
decrementOutstanding(offSwitchRequest);
+ // Update cloned OffRack request for recovery
+ resourceRequests.add(cloneResourceRequest(offSwitchRequest));
}
synchronized private void decrementOutstanding(
@@ -436,4 +460,11 @@ public class AppSchedulingInfo {
metrics.allocateResources(user, 1, rmContainer.getAllocatedResource(),
false);
}
+
+ public ResourceRequest cloneResourceRequest(ResourceRequest request) {
+ ResourceRequest newRequest = ResourceRequest.newInstance(
+ request.getPriority(), request.getResourceName(),
+ request.getCapability(), 1, request.getRelaxLocality());
+ return newRequest;
+ }
}
Modified:
hadoop/common/branches/branch-2.5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java?rev=1610872&r1=1610871&r2=1610872&view=diff
==============================================================================
---
hadoop/common/branches/branch-2.5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
(original)
+++
hadoop/common/branches/branch-2.5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
Tue Jul 15 22:43:07 2014
@@ -241,7 +241,14 @@ public class SchedulerApplicationAttempt
public synchronized void updateResourceRequests(
List<ResourceRequest> requests) {
if (!isStopped) {
- appSchedulingInfo.updateResourceRequests(requests);
+ appSchedulingInfo.updateResourceRequests(requests, false);
+ }
+ }
+
+ public synchronized void recoverResourceRequests(
+ List<ResourceRequest> requests) {
+ if (!isStopped) {
+ appSchedulingInfo.updateResourceRequests(requests, true);
}
}
Modified:
hadoop/common/branches/branch-2.5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java?rev=1610872&r1=1610871&r2=1610872&view=diff
==============================================================================
---
hadoop/common/branches/branch-2.5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
(original)
+++
hadoop/common/branches/branch-2.5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
Tue Jul 15 22:43:07 2014
@@ -1089,6 +1089,7 @@ public class CapacityScheduler extends
if (LOG.isDebugEnabled()) {
LOG.debug("KILL_CONTAINER: container" + cont.toString());
}
+ recoverResourceRequestForContainer(cont);
completedContainer(cont, SchedulerUtils.createPreemptedContainerStatus(
cont.getContainerId(), SchedulerUtils.PREEMPTED_CONTAINER),
RMContainerEventType.KILL);
Modified:
hadoop/common/branches/branch-2.5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java?rev=1610872&r1=1610871&r2=1610872&view=diff
==============================================================================
---
hadoop/common/branches/branch-2.5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
(original)
+++
hadoop/common/branches/branch-2.5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
Tue Jul 15 22:43:07 2014
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.re
import java.util.Collections;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -77,6 +78,9 @@ public class FiCaSchedulerApp extends Sc
if (null == liveContainers.remove(rmContainer.getContainerId())) {
return false;
}
+
+ // Remove from the list of newly allocated containers if found
+ newlyAllocatedContainers.remove(rmContainer);
Container container = rmContainer.getContainer();
ContainerId containerId = container.getId();
@@ -129,8 +133,12 @@ public class FiCaSchedulerApp extends Sc
liveContainers.put(container.getId(), rmContainer);
// Update consumption and track allocations
- appSchedulingInfo.allocate(type, node, priority, request, container);
+ List<ResourceRequest> resourceRequestList = appSchedulingInfo.allocate(
+ type, node, priority, request, container);
Resources.addTo(currentConsumption, container.getResource());
+
+ // Update resource requests related to "request" and store in RMContainer
+ ((RMContainerImpl)rmContainer).setResourceRequests(resourceRequestList);
// Inform the container
rmContainer.handle(
Modified:
hadoop/common/branches/branch-2.5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java?rev=1610872&r1=1610871&r2=1610872&view=diff
==============================================================================
---
hadoop/common/branches/branch-2.5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java
(original)
+++
hadoop/common/branches/branch-2.5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java
Tue Jul 15 22:43:07 2014
@@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -82,6 +83,9 @@ public class FSSchedulerApp extends Sche
Container container = rmContainer.getContainer();
ContainerId containerId = container.getId();
+ // Remove from the list of newly allocated containers if found
+ newlyAllocatedContainers.remove(rmContainer);
+
// Inform the container
rmContainer.handle(
new RMContainerFinishedEvent(
@@ -281,9 +285,13 @@ public class FSSchedulerApp extends Sche
liveContainers.put(container.getId(), rmContainer);
// Update consumption and track allocations
- appSchedulingInfo.allocate(type, node, priority, request, container);
+ List<ResourceRequest> resourceRequestList = appSchedulingInfo.allocate(
+ type, node, priority, request, container);
Resources.addTo(currentConsumption, container.getResource());
+ // Update resource requests related to "request" and store in RMContainer
+ ((RMContainerImpl) rmContainer).setResourceRequests(resourceRequestList);
+
// Inform the container
rmContainer.handle(
new RMContainerEvent(container.getId(), RMContainerEventType.START));
Modified:
hadoop/common/branches/branch-2.5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java?rev=1610872&r1=1610871&r2=1610872&view=diff
==============================================================================
---
hadoop/common/branches/branch-2.5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
(original)
+++
hadoop/common/branches/branch-2.5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
Tue Jul 15 22:43:07 2014
@@ -408,7 +408,7 @@ public class FairScheduler extends
}
}
- private void warnOrKillContainer(RMContainer container) {
+ protected void warnOrKillContainer(RMContainer container) {
ApplicationAttemptId appAttemptId = container.getApplicationAttemptId();
FSSchedulerApp app = getSchedulerApp(appAttemptId);
FSLeafQueue queue = app.getQueue();
@@ -426,6 +426,7 @@ public class FairScheduler extends
SchedulerUtils.createPreemptedContainerStatus(
container.getContainerId(), SchedulerUtils.PREEMPTED_CONTAINER);
+ recoverResourceRequestForContainer(container);
// TODO: Not sure if this ever actually adds this to the list of
cleanup
// containers on the RMNode (see SchedulerNode.releaseContainer()).
completedContainer(container, status, RMContainerEventType.KILL);
Modified:
hadoop/common/branches/branch-2.5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java?rev=1610872&r1=1610871&r2=1610872&view=diff
==============================================================================
---
hadoop/common/branches/branch-2.5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java
(original)
+++
hadoop/common/branches/branch-2.5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java
Tue Jul 15 22:43:07 2014
@@ -26,6 +26,9 @@ import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import java.util.ArrayList;
+
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
@@ -36,17 +39,24 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import
org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import
org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
import
org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
import
org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
+import
org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.junit.Assert;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
@@ -204,4 +214,36 @@ public class TestRMContainerImpl {
assertEquals(RMContainerState.RUNNING, rmContainer.getState());
verify(writer, never()).containerFinished(any(RMContainer.class));
}
+
+ @Test
+ public void testExistenceOfResourceRequestInRMContainer() throws Exception {
+ Configuration conf = new Configuration();
+ MockRM rm1 = new MockRM(conf);
+ rm1.start();
+ MockNM nm1 = rm1.registerNode("unknownhost:1234", 8000);
+ RMApp app1 = rm1.submitApp(1024);
+ MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+ ResourceScheduler scheduler = rm1.getResourceScheduler();
+
+ // request a container.
+ am1.allocate("127.0.0.1", 1024, 1, new ArrayList<ContainerId>());
+ ContainerId containerId2 = ContainerId.newInstance(
+ am1.getApplicationAttemptId(), 2);
+ rm1.waitForState(nm1, containerId2, RMContainerState.ALLOCATED);
+
+ // Verify whether list of ResourceRequest is present in RMContainer
+ // while moving to ALLOCATED state
+ Assert.assertNotNull(scheduler.getRMContainer(containerId2)
+ .getResourceRequests());
+
+ // Allocate container
+ am1.allocate(new ArrayList<ResourceRequest>(), new
ArrayList<ContainerId>())
+ .getAllocatedContainers();
+ rm1.waitForState(nm1, containerId2, RMContainerState.ACQUIRED);
+
+ // After RMContainer moves to ACQUIRED state, the list of ResourceRequest
+ // is cleared (null)
+ Assert.assertNull(scheduler.getRMContainer(containerId2)
+ .getResourceRequests());
+ }
}
Modified:
hadoop/common/branches/branch-2.5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java?rev=1610872&r1=1610871&r2=1610872&view=diff
==============================================================================
---
hadoop/common/branches/branch-2.5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
(original)
+++
hadoop/common/branches/branch-2.5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
Tue Jul 15 22:43:07 2014
@@ -27,6 +27,7 @@ import static org.mockito.Mockito.when;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.security.PrivilegedAction;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
@@ -50,6 +51,7 @@ import org.apache.hadoop.yarn.api.protoc
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueInfo;
@@ -62,6 +64,7 @@ import org.apache.hadoop.yarn.exceptions
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.server.resourcemanager.Application;
+import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
@@ -74,6 +77,8 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import
org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import
org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import
org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import
org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import
org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
@@ -82,6 +87,7 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import
org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils;
import
org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import
org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import
org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
import
org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
import
org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
@@ -828,4 +834,65 @@ public class TestCapacityScheduler {
cs.stop();
}
+
+  /**
+   * Verifies that preempting (killing) a container which is still in the
+   * ALLOCATED state puts the resource requests it was allocated against
+   * (node-local, rack-local and ANY) back into the application attempt, so
+   * that a replacement container is allocated afterwards.
+   */
+  @Test(timeout = 30000)
+  public void testRecoverRequestAfterPreemption() throws Exception {
+    Configuration conf = new Configuration();
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+        ResourceScheduler.class);
+    MockRM rm1 = new MockRM(conf);
+    rm1.start();
+    // Always stop the RM so its services do not leak into later tests.
+    try {
+      MockNM nm1 = rm1.registerNode("127.0.0.1:1234", 8000);
+      RMApp app1 = rm1.submitApp(1024);
+      MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+      CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+
+      // request a container.
+      am1.allocate("127.0.0.1", 1024, 1, new ArrayList<ContainerId>());
+      ContainerId containerId1 = ContainerId.newInstance(
+          am1.getApplicationAttemptId(), 2);
+      rm1.waitForState(nm1, containerId1, RMContainerState.ALLOCATED);
+
+      RMContainer rmContainer = cs.getRMContainer(containerId1);
+      List<ResourceRequest> requests = rmContainer.getResourceRequests();
+      // While ALLOCATED, the container must still carry the node-local,
+      // rack-local and ANY requests it was allocated against.
+      Assert.assertNotNull(requests);
+      Assert.assertEquals(3, requests.size());
+      FiCaSchedulerApp app = cs.getApplicationAttempt(
+          am1.getApplicationAttemptId());
+
+      FiCaSchedulerNode node = cs.getNode(rmContainer.getAllocatedNode());
+      for (ResourceRequest request : requests) {
+        // Skip the OffRack and RackLocal resource requests.
+        if (request.getResourceName().equals(node.getRackName())
+            || request.getResourceName().equals(ResourceRequest.ANY)) {
+          continue;
+        }
+
+        // The node-local resource request has already been cleared from the
+        // app by the allocation.
+        Assert.assertNull(app.getResourceRequest(request.getPriority(),
+            request.getResourceName()));
+      }
+
+      // Call killContainer to preempt the container
+      cs.killContainer(rmContainer);
+
+      // Re-check the size so the loop below cannot pass vacuously.
+      Assert.assertEquals(3, requests.size());
+      for (ResourceRequest request : requests) {
+        // Each resource request must have been added back to the app while
+        // handling the preemption.
+        Assert.assertEquals(1, app.getResourceRequest(request.getPriority(),
+            request.getResourceName()).getNumContainers());
+      }
+
+      // New container will be allocated and will move to ALLOCATED state
+      ContainerId containerId2 = ContainerId.newInstance(
+          am1.getApplicationAttemptId(), 3);
+      rm1.waitForState(nm1, containerId2, RMContainerState.ALLOCATED);
+
+      // allocate container
+      List<Container> containers = am1.allocate(
+          new ArrayList<ResourceRequest>(), new ArrayList<ContainerId>())
+          .getAllocatedContainers();
+
+      // Now with the recovered ResourceRequest, a container is allocated for
+      // the AM.
+      Assert.assertEquals(1, containers.size());
+    } finally {
+      rm1.stop();
+    }
+  }
}
Modified:
hadoop/common/branches/branch-2.5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java?rev=1610872&r1=1610871&r2=1610872&view=diff
==============================================================================
---
hadoop/common/branches/branch-2.5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
(original)
+++
hadoop/common/branches/branch-2.5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java
Tue Jul 15 22:43:07 2014
@@ -17,6 +17,9 @@
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -32,6 +35,7 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
+import
org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import
org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import
org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
import
org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
@@ -162,6 +166,26 @@ public class FairSchedulerTestBase {
priority, 1, true);
createSchedulingRequestExistingApplication(request, attId);
}
+
+
+  /**
+   * Creates a new application attempt in {@code queueId} owned by
+   * {@code userId} and submits {@code ask} as its resource requests.
+   *
+   * <p>A mocked {@link RMApp}, whose current attempt is a mocked
+   * {@link RMAppAttempt}, is also registered in the RM context under the new
+   * application id, so lookups of the application there succeed.
+   *
+   * @param queueId queue to submit the application to
+   * @param userId submitting user
+   * @param ask resource requests passed to the scheduler's allocate call
+   * @return id of the newly created application attempt
+   */
+  protected ApplicationAttemptId createSchedulingRequest(String queueId,
+      String userId, List<ResourceRequest> ask) {
+    ApplicationAttemptId attemptId =
+        createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++);
+    ApplicationId appId = attemptId.getApplicationId();
+
+    scheduler.addApplication(appId, queueId, userId);
+    // testAclSubmitApplication gets its app rejected, in which case no app is
+    // added and there is nothing to attach an attempt to.
+    boolean accepted = scheduler.getSchedulerApplications().containsKey(appId);
+    if (accepted) {
+      scheduler.addApplicationAttempt(attemptId, false, true);
+    }
+    scheduler.allocate(attemptId, ask, new ArrayList<ContainerId>(), null,
+        null);
+
+    RMApp mockedApp = mock(RMApp.class);
+    RMAppAttempt mockedAttempt = mock(RMAppAttempt.class);
+    when(mockedApp.getCurrentAppAttempt()).thenReturn(mockedAttempt);
+    resourceManager.getRMContext().getRMApps().put(appId, mockedApp);
+    return attemptId;
+  }
protected void createSchedulingRequestExistingApplication(
int memory, int vcores, int priority, ApplicationAttemptId attId) {
Modified:
hadoop/common/branches/branch-2.5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java?rev=1610872&r1=1610871&r2=1610872&view=diff
==============================================================================
---
hadoop/common/branches/branch-2.5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
(original)
+++
hadoop/common/branches/branch-2.5/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
Tue Jul 15 22:43:07 2014
@@ -53,10 +53,13 @@ import org.apache.hadoop.yarn.MockApps;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import
org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
@@ -77,11 +80,13 @@ import org.apache.hadoop.yarn.server.res
import
org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
import
org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import
org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import
org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import
org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import
org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils;
import
org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
import
org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
@@ -2831,6 +2836,87 @@ public class TestFairScheduler extends F
}
}
}
+
+  /**
+   * Verifies that when the FairScheduler preempts a container, the resource
+   * requests it was allocated against (node-local, rack-local and ANY) are
+   * restored to the application, so that a replacement container is
+   * allocated on the next node heartbeat.
+   */
+  @Test(timeout=5000)
+  public void testRecoverRequestAfterPreemption() throws Exception {
+    conf.setLong(FairSchedulerConfiguration.WAIT_TIME_BEFORE_KILL, 10);
+
+    MockClock clock = new MockClock();
+    scheduler.setClock(clock);
+    scheduler.init(conf);
+    scheduler.start();
+    scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+    Priority priority = Priority.newInstance(20);
+    String host = "127.0.0.1";
+    int GB = 1024;
+
+    // Create a node and raise a NodeAdded event
+    RMNode node = MockNodes.newNodeInfo(1,
+        Resources.createResource(16 * 1024, 4), 0, host);
+    NodeAddedSchedulerEvent nodeEvent = new NodeAddedSchedulerEvent(node);
+    scheduler.handle(nodeEvent);
+
+    // Create node-local, rack-local and off-rack requests and place them in
+    // the ask
+    List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
+    ResourceRequest nodeLocalRequest = createResourceRequest(GB, 1, host,
+        priority.getPriority(), 1, true);
+    ResourceRequest rackLocalRequest = createResourceRequest(GB, 1,
+        node.getRackName(), priority.getPriority(), 1, true);
+    ResourceRequest offRackRequest = createResourceRequest(GB, 1,
+        ResourceRequest.ANY, priority.getPriority(), 1, true);
+    ask.add(nodeLocalRequest);
+    ask.add(rackLocalRequest);
+    ask.add(offRackRequest);
+
+    // Create Request and update
+    ApplicationAttemptId appAttemptId = createSchedulingRequest("queueA",
+        "user1", ask);
+    scheduler.update();
+
+    // Sufficient node check-ins to fully schedule containers
+    NodeUpdateSchedulerEvent nodeUpdate = new NodeUpdateSchedulerEvent(node);
+    scheduler.handle(nodeUpdate);
+
+    FSSchedulerApp app = scheduler.getSchedulerApp(appAttemptId);
+    assertEquals(1, app.getLiveContainers().size());
+
+    // The node-local ResourceRequest is gone once NodeUpdate is completed
+    Assert.assertNull(app.getResourceRequest(priority, host));
+
+    ContainerId containerId1 = ContainerId.newInstance(appAttemptId, 1);
+    RMContainer rmContainer = app.getRMContainer(containerId1);
+    // Fail clearly here rather than with an NPE inside the scheduler.
+    Assert.assertNotNull(rmContainer);
+
+    // Create a preempt event and register for preemption
+    scheduler.warnOrKillContainer(rmContainer);
+
+    // Wait for a few clock ticks
+    clock.tick(5);
+
+    // preempt now
+    scheduler.warnOrKillContainer(rmContainer);
+
+    List<ResourceRequest> requests = rmContainer.getResourceRequests();
+    // Once recovered, the resource requests are present again in the app
+    Assert.assertEquals(3, requests.size());
+    for (ResourceRequest request : requests) {
+      Assert.assertEquals(1,
+          app.getResourceRequest(priority, request.getResourceName())
+              .getNumContainers());
+    }
+
+    // Send node heartbeat
+    scheduler.update();
+    scheduler.handle(nodeUpdate);
+
+    List<Container> containers = scheduler.allocate(appAttemptId,
+        Collections.<ResourceRequest> emptyList(),
+        Collections.<ContainerId> emptyList(), null, null).getContainers();
+
+    // Now with the recovered ResourceRequest, a container is allocated for
+    // the AM.
+    Assert.assertEquals(1, containers.size());
+  }
@SuppressWarnings("resource")
@Test