Repository: incubator-reef Updated Branches: refs/heads/master 73ff56478 -> 7bef8e755
[REEF-589] Support node addition while REEF applications run If an administrator dynamically adds a new node to the cluster and subsequent allocations are made on that node, REEF crashes because it cannot find the node in its resource catalog. This change adds the node to the catalog when an allocation event is received for a node that is not already in the catalog. JIRA: [REEF-589](https://issues.apache.org/jira/browse/REEF-589) Pull Request: This closes #370 Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/7bef8e75 Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/7bef8e75 Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/7bef8e75 Branch: refs/heads/master Commit: 7bef8e75512998086b7fff54bec095e55a71c1cd Parents: 73ff564 Author: Ignacio Cano <[email protected]> Authored: Thu Aug 13 10:12:29 2015 -0700 Committer: Markus Weimer <[email protected]> Committed: Thu Aug 13 18:42:24 2015 -0700 ---------------------------------------------------------------------- .../driver/evaluator/EvaluatorManagerFactory.java | 18 ++++++++++++++++-- .../runtime/yarn/driver/YarnContainerManager.java | 15 +++++++++++++++ 2 files changed, 31 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7bef8e75/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManagerFactory.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManagerFactory.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManagerFactory.java index 2a0dd49..fb496f5 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManagerFactory.java +++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManagerFactory.java @@ -18,11 +18,15 @@ */ package org.apache.reef.runtime.common.driver.evaluator; +import org.apache.commons.lang3.Validate; import org.apache.reef.annotations.audience.DriverSide; import org.apache.reef.annotations.audience.Private; import org.apache.reef.driver.catalog.NodeDescriptor; import org.apache.reef.driver.catalog.ResourceCatalog; import org.apache.reef.driver.evaluator.EvaluatorProcessFactory; +import org.apache.reef.runtime.common.driver.catalog.ResourceCatalogImpl; +import org.apache.reef.runtime.common.driver.resourcemanager.NodeDescriptorEvent; +import org.apache.reef.runtime.common.driver.resourcemanager.NodeDescriptorEventImpl; import org.apache.reef.runtime.common.driver.resourcemanager.ResourceAllocationEvent; import org.apache.reef.runtime.common.driver.resourcemanager.ResourceStatusEvent; import org.apache.reef.tang.Injector; @@ -90,10 +94,20 @@ public final class EvaluatorManagerFactory { */ public EvaluatorManager getNewEvaluatorManagerForNewlyAllocatedEvaluator( final ResourceAllocationEvent resourceAllocationEvent) { - final NodeDescriptor nodeDescriptor = this.resourceCatalog.getNode(resourceAllocationEvent.getNodeId()); + NodeDescriptor nodeDescriptor = this.resourceCatalog.getNode(resourceAllocationEvent.getNodeId()); if (nodeDescriptor == null) { - throw new RuntimeException("Unknown resource: " + resourceAllocationEvent.getNodeId()); + final String nodeId = resourceAllocationEvent.getNodeId(); + LOG.log(Level.WARNING, "Node {} is not in our catalog, adding it", nodeId); + final String[] hostNameAndPort = nodeId.split(":"); + Validate.isTrue(hostNameAndPort.length == 2); + final NodeDescriptorEvent nodeDescriptorEvent = NodeDescriptorEventImpl.newBuilder().setIdentifier(nodeId) + .setHostName(hostNameAndPort[0]).setPort(Integer.parseInt(hostNameAndPort[1])) + .setMemorySize(resourceAllocationEvent.getResourceMemory()) + 
.setRackName(resourceAllocationEvent.getRackName().get()).build(); + // downcasting not to change the API + ((ResourceCatalogImpl) resourceCatalog).handle(nodeDescriptorEvent); + nodeDescriptor = this.resourceCatalog.getNode(nodeId); } final EvaluatorDescriptorImpl evaluatorDescriptor = new EvaluatorDescriptorImpl(nodeDescriptor, resourceAllocationEvent.getResourceMemory(), resourceAllocationEvent.getVirtualCores().get(), http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7bef8e75/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerManager.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerManager.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerManager.java index af460d8..1f51614 100644 --- a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerManager.java +++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerManager.java @@ -392,6 +392,20 @@ final class YarnContainerManager this.requestsAfterSentToRM.remove(); doHomogeneousRequests(); + // the rack name comes as part of the host name, e.g. 
+ // <rackName>-<hostNumber> + // we perform some checks just in case it doesn't + final String hostName = container.getNodeId().getHost(); + String rackName = null; + if (hostName != null) { + final String[] rackNameAndNumber = hostName.split("-"); + if (rackNameAndNumber.length == 2) { + rackName = rackNameAndNumber[0]; + } else { + LOG.log(Level.WARNING, "Could not get information from the rack name, should use the default"); + } + } + LOG.log(Level.FINEST, "Allocated Container: memory = {0}, core number = {1}", new Object[]{container.getResource().getMemory(), container.getResource().getVirtualCores()}); this.reefEventHandlers.onResourceAllocation(ResourceAllocationEventImpl.newBuilder() @@ -399,6 +413,7 @@ final class YarnContainerManager .setNodeId(container.getNodeId().toString()) .setResourceMemory(container.getResource().getMemory()) .setVirtualCores(container.getResource().getVirtualCores()) + .setRackName(rackName) .build()); this.updateRuntimeStatus(); } else {
