Repository: incubator-slider Updated Branches: refs/heads/develop 714a335df -> ec07bdc12
SLIDER-439 RM never fulfilled Slider AM's container request after NM died on a node where HRegionServer was running Project: http://git-wip-us.apache.org/repos/asf/incubator-slider/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-slider/commit/34b909a8 Tree: http://git-wip-us.apache.org/repos/asf/incubator-slider/tree/34b909a8 Diff: http://git-wip-us.apache.org/repos/asf/incubator-slider/diff/34b909a8 Branch: refs/heads/develop Commit: 34b909a8edb551b6e9aa7f5ab2b3f6bd04f1b7c5 Parents: 6da903d Author: Gour Saha <gs...@hortonworks.com> Authored: Wed Oct 22 21:19:52 2014 -0700 Committer: Gour Saha <gs...@hortonworks.com> Committed: Wed Oct 22 21:19:52 2014 -0700 ---------------------------------------------------------------------- .../server/appmaster/SliderAppMaster.java | 5 +- .../slider/server/appmaster/state/AppState.java | 5 ++ .../server/appmaster/state/RoleHistory.java | 45 +++++++++++++++- .../TestRoleHistoryContainerEvents.groovy | 55 ++++++++++++++++++++ 4 files changed, 108 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/34b909a8/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java index 06d3597..b3c4b4c 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/SliderAppMaster.java @@ -1514,7 +1514,10 @@ public class SliderAppMaster extends AbstractSliderLaunchedService */ @Override //AMRMClientAsync public void onNodesUpdated(List<NodeReport> updatedNodes) { - LOG_YARN.info("Nodes updated"); + LOG_YARN.info("onNodesUpdated({})", updatedNodes.size()); + log.info("Updated nodes {}", updatedNodes); + // Check if any nodes are lost or revived and update state accordingly + appState.onNodesUpdated(updatedNodes); } /** http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/34b909a8/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java index 706b0d2..db119bd 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/AppState.java @@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl; import org.apache.hadoop.yarn.client.api.AMRMClient; @@ -1242,6 +1243,10 @@ public class AppState { } } + public synchronized void onNodesUpdated(List<NodeReport> updatedNodes) { + roleHistory.onNodesUpdated(updatedNodes); + } + /** * Is a role short lived by the threshold set for this application * @param instance instance http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/34b909a8/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHistory.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHistory.java b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHistory.java index dca7384..9aca32f 100644 --- a/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHistory.java +++ b/slider-core/src/main/java/org/apache/slider/server/appmaster/state/RoleHistory.java @@ -22,6 +22,7 @@ import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.client.api.AMRMClient; import org.apache.slider.common.tools.SliderUtils; @@ -88,6 +89,14 @@ public class RoleHistory { */ private Map<Integer, LinkedList<NodeInstance>> availableNodes; + /** + * Track the failed nodes. Currently used to make wiser decision of container + * ask with/without locality. Has other potential uses as well. + */ + private Map<String, Object> failedNodes = new HashMap<String, Object>(); + // dummy to be used in maps for faster lookup where we don't care about values + private final Object DUMMY_VALUE = new Object(); + public RoleHistory(List<ProviderRole> providerRoles) throws BadConfigException { this.providerRoles = providerRoles; @@ -660,6 +669,28 @@ public class RoleHistory { } /** + * Update failedNodes and nodemap based on the node state + * + * @param updatedNodes list of updated nodes + */ + public synchronized void onNodesUpdated(List<NodeReport> updatedNodes) { + for (NodeReport updatedNode : updatedNodes) { + String hostname = updatedNode.getNodeId() == null ? null : updatedNode + .getNodeId().getHost(); + if (hostname == null) { + continue; + } + if (updatedNode.getNodeState() != null + && updatedNode.getNodeState().isUnusable()) { + failedNodes.put(hostname, DUMMY_VALUE); + nodemap.remove(hostname); + } else { + failedNodes.remove(hostname); + } + } + } + + /** * A container release request was issued * @param container container submitted */ @@ -710,7 +741,11 @@ public class RoleHistory { available = false; } else { available = nodeEntry.containerCompleted(wasReleased); - maybeQueueNodeForWork(container, nodeEntry, available); + boolean isFailedNode = failedNodes.containsKey(RoleHistoryUtils + .hostnameOf(container)); + if (!isFailedNode) { + maybeQueueNodeForWork(container, nodeEntry, available); + } } touch(); return available; @@ -775,5 +810,13 @@ public class RoleHistory { return outstandingRequests.listOutstandingRequests(); } + /** + * Get a clone of the failedNodes + * + * @return the list + */ + public List<String> cloneFailedNodes() { + return new ArrayList<String>(failedNodes.keySet()); + } } http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/34b909a8/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryContainerEvents.groovy ---------------------------------------------------------------------- diff --git a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryContainerEvents.groovy b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryContainerEvents.groovy index 340e72d..dbb70fa 100644 --- a/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryContainerEvents.groovy +++ b/slider-core/src/test/groovy/org/apache/slider/server/appmaster/model/history/TestRoleHistoryContainerEvents.groovy @@ -18,9 +18,14 @@ package org.apache.slider.server.appmaster.model.history +import java.util.List; + import groovy.transform.CompileStatic import groovy.util.logging.Slf4j import org.apache.hadoop.yarn.api.records.Container +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.NodeReport; +import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.Priority import org.apache.hadoop.yarn.api.records.Resource import org.apache.hadoop.yarn.client.api.AMRMClient @@ -365,4 +370,54 @@ class TestRoleHistoryContainerEvents extends BaseMockAppStateTest { MockContainer c2 = (MockContainer) sortedContainers[1] assert c2.priority.getPriority() == 1 } + + @Test + public void testNodeUpdated() throws Throwable { + describe("fail a node") + + int role = 0 + ProviderRole provRole = new ProviderRole(roleName, role) + RoleStatus roleStatus = new RoleStatus(provRole) + AMRMClient.ContainerRequest request = + roleHistory.requestNode(roleStatus, resource); + + String hostname = request.getNodes()[0] + assert hostname == age3Active0.hostname + + // build a container + MockContainer container = factory.newContainer() + container.nodeId = new MockNodeId(hostname, 0) + container.priority = request.getPriority() + roleHistory.onContainerAssigned(container); + + NodeMap nodemap = roleHistory.cloneNodemap(); + NodeInstance allocated = nodemap.get(hostname) + NodeEntry roleEntry = allocated.get(role) + assert roleEntry.starting == 1 + assert !roleEntry.available + RoleInstance ri = new RoleInstance(container); + // start it + roleHistory.onContainerStartSubmitted(container, ri) + roleHistory.onContainerStarted(container) + + int startSize = nodemap.size() + + // now send a list of updated (failed) nodes event + List<NodeReport> nodesUpdated = new ArrayList<NodeReport>(); + NodeId nodeId = NodeId.newInstance(hostname, 0) + NodeReport nodeReport = NodeReport.newInstance(nodeId, NodeState.LOST, null, null, null, null, 1, null, 0) + nodesUpdated.add(nodeReport) + roleHistory.onNodesUpdated(nodesUpdated) + + nodemap = roleHistory.cloneNodemap() + int endSize = nodemap.size() + if (startSize == 0) { + assert endSize == 0 + } else { + assert startSize - endSize == 1 + } + assert nodemap.get(hostname) == null + List<String> failedNodes = roleHistory.cloneFailedNodes() + assert failedNodes.contains(hostname) + } }