Repository: incubator-reef
Updated Branches:
  refs/heads/master 73ff56478 -> 7bef8e755


[REEF-589] Support node addition while REEF applications run

If an admin dynamically adds a new node to the cluster while a REEF
application is running, and later allocations are made on that node,
REEF crashes because it cannot find that node in its resource catalog.

This change adds the node to the catalog when an allocation event is
received for a node that is not already in the catalog.

JIRA:
  [REEF-589](https://issues.apache.org/jira/browse/REEF-589)

Pull Request:
  This closes #370


Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/7bef8e75
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/7bef8e75
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/7bef8e75

Branch: refs/heads/master
Commit: 7bef8e75512998086b7fff54bec095e55a71c1cd
Parents: 73ff564
Author: Ignacio Cano <[email protected]>
Authored: Thu Aug 13 10:12:29 2015 -0700
Committer: Markus Weimer <[email protected]>
Committed: Thu Aug 13 18:42:24 2015 -0700

----------------------------------------------------------------------
 .../driver/evaluator/EvaluatorManagerFactory.java | 18 ++++++++++++++++--
 .../runtime/yarn/driver/YarnContainerManager.java | 15 +++++++++++++++
 2 files changed, 31 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7bef8e75/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManagerFactory.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManagerFactory.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManagerFactory.java
index 2a0dd49..fb496f5 100644
--- 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManagerFactory.java
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/evaluator/EvaluatorManagerFactory.java
@@ -18,11 +18,15 @@
  */
 package org.apache.reef.runtime.common.driver.evaluator;
 
+import org.apache.commons.lang3.Validate;
 import org.apache.reef.annotations.audience.DriverSide;
 import org.apache.reef.annotations.audience.Private;
 import org.apache.reef.driver.catalog.NodeDescriptor;
 import org.apache.reef.driver.catalog.ResourceCatalog;
 import org.apache.reef.driver.evaluator.EvaluatorProcessFactory;
+import org.apache.reef.runtime.common.driver.catalog.ResourceCatalogImpl;
+import 
org.apache.reef.runtime.common.driver.resourcemanager.NodeDescriptorEvent;
+import 
org.apache.reef.runtime.common.driver.resourcemanager.NodeDescriptorEventImpl;
 import 
org.apache.reef.runtime.common.driver.resourcemanager.ResourceAllocationEvent;
 import 
org.apache.reef.runtime.common.driver.resourcemanager.ResourceStatusEvent;
 import org.apache.reef.tang.Injector;
@@ -90,10 +94,20 @@ public final class EvaluatorManagerFactory {
    */
   public EvaluatorManager getNewEvaluatorManagerForNewlyAllocatedEvaluator(
       final ResourceAllocationEvent resourceAllocationEvent) {
-    final NodeDescriptor nodeDescriptor = 
this.resourceCatalog.getNode(resourceAllocationEvent.getNodeId());
+    NodeDescriptor nodeDescriptor = 
this.resourceCatalog.getNode(resourceAllocationEvent.getNodeId());
 
     if (nodeDescriptor == null) {
-      throw new RuntimeException("Unknown resource: " + 
resourceAllocationEvent.getNodeId());
+      final String nodeId = resourceAllocationEvent.getNodeId();
+      LOG.log(Level.WARNING, "Node {} is not in our catalog, adding it", 
nodeId);
+      final String[] hostNameAndPort = nodeId.split(":");
+      Validate.isTrue(hostNameAndPort.length == 2);
+      final NodeDescriptorEvent nodeDescriptorEvent = 
NodeDescriptorEventImpl.newBuilder().setIdentifier(nodeId)
+          
.setHostName(hostNameAndPort[0]).setPort(Integer.parseInt(hostNameAndPort[1]))
+          .setMemorySize(resourceAllocationEvent.getResourceMemory())
+          .setRackName(resourceAllocationEvent.getRackName().get()).build();
+      // downcasting not to change the API
+      ((ResourceCatalogImpl) resourceCatalog).handle(nodeDescriptorEvent);
+      nodeDescriptor = this.resourceCatalog.getNode(nodeId);
     }
     final EvaluatorDescriptorImpl evaluatorDescriptor = new 
EvaluatorDescriptorImpl(nodeDescriptor,
         resourceAllocationEvent.getResourceMemory(), 
resourceAllocationEvent.getVirtualCores().get(),

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/7bef8e75/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerManager.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerManager.java
 
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerManager.java
index af460d8..1f51614 100644
--- 
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerManager.java
+++ 
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerManager.java
@@ -392,6 +392,20 @@ final class YarnContainerManager
         this.requestsAfterSentToRM.remove();
         doHomogeneousRequests();
 
+        // the rack name comes as part of the host name, e.g.
+        // <rackName>-<hostNumber>
+        // we perform some checks just in case it doesn't
+        final String hostName = container.getNodeId().getHost();
+        String rackName = null;
+        if (hostName != null) {
+          final String[] rackNameAndNumber = hostName.split("-");
+          if (rackNameAndNumber.length == 2) {
+            rackName = rackNameAndNumber[0];
+          } else {
+            LOG.log(Level.WARNING, "Could not get information from the rack 
name, should use the default");
+          }
+        }
+
         LOG.log(Level.FINEST, "Allocated Container: memory = {0}, core number 
= {1}",
             new Object[]{container.getResource().getMemory(), 
container.getResource().getVirtualCores()});
         
this.reefEventHandlers.onResourceAllocation(ResourceAllocationEventImpl.newBuilder()
@@ -399,6 +413,7 @@ final class YarnContainerManager
             .setNodeId(container.getNodeId().toString())
             .setResourceMemory(container.getResource().getMemory())
             .setVirtualCores(container.getResource().getVirtualCores())
+            .setRackName(rackName)
             .build());
         this.updateRuntimeStatus();
       } else {

Reply via email to