Repository: incubator-reef
Updated Branches:
  refs/heads/master aab0e6eb5 -> cb134fe34


[REEF-411]: Assign nodes evenly to a fixed set of racks in the local runtime.

This work allows containers to be placed into the different racks available
in the local runtime, instead of just using the default rack.
If no rack is configured, the default one is used.
Some unit tests were added.

JIRA:
  [REEF-411](https://issues.apache.org/jira/browse/REEF-411)

Pull Request:
  This closes #251


Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/cb134fe3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/cb134fe3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/cb134fe3

Branch: refs/heads/master
Commit: cb134fe34cad031a4e2d2985c6994029f9249b48
Parents: aab0e6e
Author: Ignacio Cano <[email protected]>
Authored: Fri Jun 26 13:37:31 2015 -0700
Committer: Markus Weimer <[email protected]>
Committed: Tue Jun 30 11:34:38 2015 -0700

----------------------------------------------------------------------
 .../org/apache/reef/util/CollectionUtils.java   |  53 ++++
 lang/java/reef-runtime-local/pom.xml            |   4 +
 .../runtime/local/driver/ContainerManager.java  | 308 ++++++++++++++++---
 .../runtime/local/driver/ResourceManager.java   |  84 +++--
 .../local/driver/ResourceRequestQueue.java      |   9 +
 .../local/driver/ResourceManagerTest.java       | 279 +++++++++++++++++
 .../rack/awareness/RackAwareEvaluatorTest.java  |  18 +-
 7 files changed, 655 insertions(+), 100 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/cb134fe3/lang/java/reef-common/src/main/java/org/apache/reef/util/CollectionUtils.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/util/CollectionUtils.java 
b/lang/java/reef-common/src/main/java/org/apache/reef/util/CollectionUtils.java
new file mode 100644
index 0000000..ca94928
--- /dev/null
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/util/CollectionUtils.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.util;
+
+import java.util.Collection;
+
+/**
+ * Utilities for collection classes.
+ */
+public final class CollectionUtils {
+
+  /**
+   * Not instantiable: this class only hosts static helpers.
+   */
+  private CollectionUtils() {
+  }
+
+  /**
+   * Tells whether the given collection holds no elements, treating
+   * {@code null} the same way as an empty collection.
+   *
+   * @param parameter the collection to inspect, may be {@code null}
+   * @return true if the collection is null or empty
+   */
+  public static <T> boolean isEmpty(final Collection<T> parameter) {
+    if (parameter == null) {
+      return true;
+    }
+    return parameter.isEmpty();
+  }
+
+  /**
+   * Tells whether the given collection holds at least one element.
+   *
+   * @param parameter the collection to inspect, may be {@code null}
+   * @return true if the collection is neither null nor empty
+   */
+  public static <T> boolean isNotEmpty(final Collection<T> parameter) {
+    return parameter != null && !parameter.isEmpty();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/cb134fe3/lang/java/reef-runtime-local/pom.xml
----------------------------------------------------------------------
diff --git a/lang/java/reef-runtime-local/pom.xml 
b/lang/java/reef-runtime-local/pom.xml
index 09d2612..aa449b4 100644
--- a/lang/java/reef-runtime-local/pom.xml
+++ b/lang/java/reef-runtime-local/pom.xml
@@ -42,6 +42,10 @@ under the License.
             <groupId>junit</groupId>
             <artifactId>junit</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-core</artifactId>
+        </dependency>        
     </dependencies>
 
     <build>

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/cb134fe3/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ContainerManager.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ContainerManager.java
 
b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ContainerManager.java
index 49b4242..9ca1f09 100644
--- 
a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ContainerManager.java
+++ 
b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ContainerManager.java
@@ -18,39 +18,48 @@
  */
 package org.apache.reef.runtime.local.driver;
 
+import org.apache.commons.lang.Validate;
 import org.apache.reef.annotations.audience.DriverSide;
 import org.apache.reef.annotations.audience.Private;
 import org.apache.reef.client.FailedRuntime;
 import org.apache.reef.proto.ReefServiceProtos;
+import org.apache.reef.runtime.common.driver.api.ResourceRequestEvent;
 import org.apache.reef.runtime.common.driver.api.RuntimeParameters;
 import 
org.apache.reef.runtime.common.driver.resourcemanager.NodeDescriptorEvent;
 import 
org.apache.reef.runtime.common.driver.resourcemanager.NodeDescriptorEventImpl;
 import org.apache.reef.runtime.common.files.REEFFileNames;
 import org.apache.reef.runtime.common.utils.RemoteManager;
+import org.apache.reef.runtime.local.client.parameters.DefaultMemorySize;
+import org.apache.reef.runtime.local.client.parameters.DefaultNumberOfCores;
 import org.apache.reef.runtime.local.client.parameters.MaxNumberOfEvaluators;
 import org.apache.reef.runtime.local.client.parameters.RackNames;
 import org.apache.reef.runtime.local.client.parameters.RootFolder;
 import org.apache.reef.runtime.local.process.ReefRunnableProcessObserver;
 import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.util.CollectionUtils;
+import org.apache.reef.util.Optional;
 import org.apache.reef.wake.EventHandler;
-import org.apache.reef.wake.remote.address.LocalAddressProvider;
 import org.apache.reef.wake.remote.RemoteMessage;
+import org.apache.reef.wake.remote.address.LocalAddressProvider;
 import org.apache.reef.wake.time.Time;
 import org.apache.reef.wake.time.runtime.RuntimeClock;
 import org.apache.reef.wake.time.runtime.event.RuntimeStart;
 import org.apache.reef.wake.time.runtime.event.RuntimeStop;
 
-import javax.inject.Inject;
-
 import java.io.File;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.HashMap;
-import java.util.LinkedList;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
+import javax.inject.Inject;
+
 /**
  * Manages a set of Containers that each reference a Thread.
  */
@@ -60,26 +69,42 @@ final class ContainerManager implements AutoCloseable {
 
   private static final Logger LOG = 
Logger.getLogger(ContainerManager.class.getName());
 
+  private static final Collection<String> DEFAULT_RACKS = 
Arrays.asList(RackNames.DEFAULT_RACK_NAME);
+  private static final String PATH_SEPARATOR = "/";
+  private static final String ANY = "*";
+
   /**
    * Map from containerID -> Container.
    */
   private final Map<String, Container> containers = new HashMap<>();
 
   /**
-   * List of free, unallocated nodes by their Node ID.
+   * Map of free, unallocated nodes by rack, by their Node ID.
+   * <RackName,<NodeId, True>>
+   * Used a map instead of a list as the value for faster lookup
    */
-  private final List<String> freeNodeList = new LinkedList<>();
+  private final Map<String, Map<String, Boolean>> freeNodesPerRack = new 
HashMap<>();
+
+  /**
+   * Inverted index, map of <NodeId, RackName>.
+   */
+  private final Map<String, String> racksPerNode = new HashMap<>();
+
+  /**
+   * Capacity of each rack (as even as possible).
+   */
+  private final Map<String, Integer> capacitiesPerRack = new HashMap<>();
 
-  private final String errorHandlerRID;
   private final int capacity;
+  private final int defaultMemorySize;
+  private final int defaultNumberOfCores;
+  private final String errorHandlerRID;
   private final EventHandler<NodeDescriptorEvent> nodeDescriptorHandler;
   private final File rootFolder;
   private final REEFFileNames fileNames;
   private final ReefRunnableProcessObserver processObserver;
   private final String localAddress;
-  // for now is a single rack, it will be a set in the next commit
-  private final String availableRack;
-
+  private final Collection<String> availableRacks;
 
   @Inject
   ContainerManager(
@@ -92,29 +117,31 @@ final class ContainerManager implements AutoCloseable {
       EventHandler<NodeDescriptorEvent> nodeDescriptorHandler,
       @Parameter(RackNames.class) final Set<String> rackNames,
       final ReefRunnableProcessObserver processObserver,
-      final LocalAddressProvider localAddressProvider) {
+      final LocalAddressProvider localAddressProvider,
+      @Parameter(DefaultMemorySize.class) final int defaultMemorySize,
+      @Parameter(DefaultNumberOfCores.class) final int defaultNumberOfCores) {
     this.capacity = capacity;
+    this.defaultMemorySize = defaultMemorySize;
+    this.defaultNumberOfCores = defaultNumberOfCores;
     this.fileNames = fileNames;
     this.processObserver = processObserver;
     this.errorHandlerRID = remoteManager.getMyIdentifier();
     this.nodeDescriptorHandler = nodeDescriptorHandler;
     this.rootFolder = new File(rootFolderName);
     this.localAddress = localAddressProvider.getLocalAddress();
-    // this is safe, we are guaranteed that this is not empty (default value)
-    // we will just 1 rack for now, the next commit will include creating
-    // containers in different racks.
-    this.availableRack = rackNames.iterator().next();
+    this.availableRacks = normalize(rackNames);
 
     LOG.log(Level.FINEST, "Initializing Container Manager with {0} 
containers", capacity);
 
-    remoteManager.registerHandler(ReefServiceProtos.RuntimeErrorProto.class, 
new EventHandler<RemoteMessage<ReefServiceProtos.RuntimeErrorProto>>() {
-      @Override
-      public void onNext(final 
RemoteMessage<ReefServiceProtos.RuntimeErrorProto> value) {
-        final FailedRuntime error = new FailedRuntime(value.getMessage());
-        LOG.log(Level.SEVERE, "FailedRuntime: " + error, 
error.getReason().orElse(null));
-        release(error.getId());
-      }
-    });
+    remoteManager.registerHandler(ReefServiceProtos.RuntimeErrorProto.class,
+        new EventHandler<RemoteMessage<ReefServiceProtos.RuntimeErrorProto>>() 
{
+          @Override
+          public void onNext(final 
RemoteMessage<ReefServiceProtos.RuntimeErrorProto> value) {
+            final FailedRuntime error = new FailedRuntime(value.getMessage());
+            LOG.log(Level.SEVERE, "FailedRuntime: " + error, 
error.getReason().orElse(null));
+            release(error.getId());
+          }
+        });
     clock.registerEventHandler(RuntimeStart.class, new EventHandler<Time>() {
       @Override
       public void onNext(final Time value) {
@@ -134,43 +161,227 @@ final class ContainerManager implements AutoCloseable {
       }
     });
 
+    init(rackNames);
+
     LOG.log(Level.FINE, "Initialized Container Manager with {0} containers", 
capacity);
   }
 
+  /**
+   * Normalizes the rack names and rejects names ending with ANY (*).
+   *
+   * @param rackNames
+   *          the rack names to normalize
+   * @return a normalized collection
+   */
+  private Collection<String> normalize(final Collection<String> rackNames) {
+    return normalize(rackNames, true);
+  }
+
+  /**
+   * Normalizes the rack names: each name is trimmed, gets a leading path
+   * separator if it has none, and loses a trailing path separator.
+   *
+   * @param rackNames
+   *          the rack names to normalize
+   * @param validateEnd
+   *          if true, throws an exception if the name ends with ANY (*)
+   * @return a normalized collection, in the iteration order of the input
+   * @throws IllegalArgumentException
+   *           if a name is empty after trimming, or ends with ANY while
+   *           validateEnd is true
+   */
+  private Collection<String> normalize(final Collection<String> rackNames,
+      final boolean validateEnd) {
+    final List<String> normalizedRackNames = new ArrayList<>(rackNames.size());
+    for (final String rawRackName : rackNames) {
+      String rackName = rawRackName.trim();
+      Validate.notEmpty(rackName, "Rack names cannot be empty");
+      // should start with a separator
+      if (!rackName.startsWith(PATH_SEPARATOR)) {
+        rackName = PATH_SEPARATOR + rackName;
+      }
+      // remove the ending separator
+      if (rackName.endsWith(PATH_SEPARATOR)) {
+        rackName = rackName.substring(0, rackName.length() - 1);
+      }
+      if (validateEnd) {
+        // name the offending rack so a bad configuration can be diagnosed
+        Validate.isTrue(!rackName.endsWith(ANY),
+            "Rack names cannot end with " + ANY + ": " + rackName);
+      }
+      normalizedRackNames.add(rackName);
+    }
+    return normalizedRackNames;
+  }
+
+  /**
+   * Distributes the total capacity as evenly as possible among the racks and
+   * creates the (initially empty) free-node map of every rack.
+   * The rack names are normalized first, so that the keys of
+   * freeNodesPerRack and capacitiesPerRack match the entries of
+   * availableRacks, which sendNodeDescriptors() uses for its lookups.
+   *
+   * @param rackNames
+   *          the configured rack names
+   */
+  private void init(final Set<String> rackNames) {
+    final Collection<String> racks = normalize(rackNames);
+    // evenly distribute the containers among the racks
+    // if rack names are not specified, the default rack will be used,
+    // so the denominator will always be > 0
+    final int capacityPerRack = capacity / racks.size();
+    int missing = capacity % racks.size();
+    // initialize the freeNodesPerRack and the capacitiesPerRack
+    for (final String rackName : racks) {
+      this.freeNodesPerRack.put(rackName, new HashMap<String, Boolean>());
+      // the remainder is handed out one container at a time to the first racks
+      int rackCapacity = capacityPerRack;
+      if (missing > 0) {
+        rackCapacity++;
+        missing--;
+      }
+      this.capacitiesPerRack.put(rackName, rackCapacity);
+    }
+  }
+
+
   private void sendNodeDescriptors() {
     final IDMaker idmaker = new IDMaker("Node-");
-    for (int i = 0; i < capacity; i++) {
-      final String id = idmaker.getNextID();
-      this.freeNodeList.add(id);
-      nodeDescriptorHandler.onNext(NodeDescriptorEventImpl.newBuilder()
-          .setIdentifier(id)
-          .setRackName(availableRack)
-          .setHostName(this.localAddress)
-          .setPort(i)
-          .setMemorySize(512) // TODO: Find the actual system memory on this 
machine.
-          .build());
+    int j = 0;
+    for (final String rackName : this.availableRacks) {
+      final int rackCapacity = this.capacitiesPerRack.get(rackName);
+      for (int i = 0; i < rackCapacity; i++) {
+        final String id = idmaker.getNextID();
+        this.racksPerNode.put(id, rackName);
+        this.freeNodesPerRack.get(rackName).put(id, Boolean.TRUE);
+        this.nodeDescriptorHandler.onNext(NodeDescriptorEventImpl.newBuilder()
+            .setIdentifier(id)
+            .setRackName(rackName)
+            .setHostName(this.localAddress)
+            .setPort(j)
+            .setMemorySize(this.defaultMemorySize) // TODO Find the actual 
system memory on this machine.
+            .build());
+        j++;
+      }
     }
   }
 
-  boolean hasContainerAvailable() {
-    return this.freeNodeList.size() > 0;
+  /**
+   * Picks the racks to consider for a request: the normalized requested
+   * racks, or the default racks when the request names none.
+   *
+   * @param rackNames
+   *          the requested rack names, may be null or empty
+   * @return the normalized requested racks, or the default racks
+   */
+  private Collection<String> getRackNamesOrDefault(
+      final List<String> rackNames) {
+    if (CollectionUtils.isEmpty(rackNames)) {
+      return DEFAULT_RACKS;
+    }
+    // the ANY modifier is allowed here, hence no validation of the ending
+    return normalize(rackNames, false);
+  }
 
-  Container allocateOne(final int megaBytes, final int numberOfCores) {
+
+  /**
+   * Returns the node name of the container to be allocated if it's
+   * available, selected from the list of preferred node names. If the list
+   * is null or empty, or none of the preferred nodes is both known and free,
+   * then an empty optional is returned.
+   *
+   * @param nodeNames
+   *          the list of preferred nodes
+   * @return the node name where to allocate the container
+   */
+  private Optional<String> getPreferredNode(final List<String> nodeNames) {
+    if (CollectionUtils.isEmpty(nodeNames)) {
+      return Optional.empty();
+    }
+    for (final String nodeName : nodeNames) {
+      // a node is eligible only if it is known and still free in its rack
+      final String possibleRack = racksPerNode.get(nodeName);
+      if (possibleRack != null
+          && freeNodesPerRack.get(possibleRack).containsKey(nodeName)) {
+        return Optional.of(nodeName);
+      }
+    }
+    return Optional.empty();
+  }
+
+  /**
+   * Returns the rack where to allocate the container, selected from the list
+   * of preferred rack names. If the list is empty, and there's space in the
+   * default rack, then the default rack is returned. The relax locality
+   * semantic is enabled if the list of rack names contains '/*', otherwise
+   * relax locality is considered disabled
+   *
+   * @param rackNames
+   *          the list of preferred racks
+   * @return the rack name where to allocate the container
+   */
+  private Optional<String> getPreferredRack(final List<String> rackNames) {
+    for (final String rackName : getRackNamesOrDefault(rackNames)) {
+      if (!rackName.endsWith(ANY)) {
+        // no any modifier: exact match on a rack that still has a free node
+        final Map<String, Boolean> freeNodes = freeNodesPerRack.get(rackName);
+        if (freeNodes != null && !freeNodes.isEmpty()) {
+          return Optional.of(rackName);
+        }
+      } else {
+        // any modifier: remove it once, then prefix-match the available racks
+        final String prefix = rackName.substring(0, rackName.length() - 1);
+        for (final String possibleRackName : availableRacks) {
+          if (possibleRackName.startsWith(prefix)
+              && !freeNodesPerRack.get(possibleRackName).isEmpty()) {
+            return Optional.of(possibleRackName);
+          }
+        }
+      }
+    }
+    return Optional.empty();
+  }
+
+  /**
+   * Allocates a container for the given request. A free node from the
+   * request's preferred node list wins; otherwise a free spot in one of the
+   * preferred racks is used.
+   *
+   * @param requestEvent
+   *          the request event
+   * @return an optional with the container if allocated
+   */
+  Optional<Container> allocateContainer(
+      final ResourceRequestEvent requestEvent) {
+    final Container allocated;
+    final Optional<String> preferredNode =
+        getPreferredNode(requestEvent.getNodeNameList());
+    if (preferredNode.isPresent()) {
+      allocated = allocateBasedOnNode(
+          requestEvent.getMemorySize().orElse(this.defaultMemorySize),
+          requestEvent.getVirtualCores().orElse(this.defaultNumberOfCores),
+          preferredNode.get());
+    } else {
+      // no preferred node is free, fall back to the preferred racks
+      final Optional<String> preferredRack =
+          getPreferredRack(requestEvent.getRackNameList());
+      allocated = preferredRack.isPresent()
+          ? allocateBasedOnRack(
+              requestEvent.getMemorySize().orElse(this.defaultMemorySize),
+              requestEvent.getVirtualCores().orElse(this.defaultNumberOfCores),
+              preferredRack.get())
+          : null;
+    }
+    return Optional.ofNullable(allocated);
+  }
+
+  private Container allocateBasedOnNode(final int megaBytes,
+      final int numberOfCores, final String nodeId) {
     synchronized (this.containers) {
-      final String nodeId = this.freeNodeList.remove(0);
-      final String processID = nodeId + "-" + 
String.valueOf(System.currentTimeMillis());
-      final File processFolder = new File(this.rootFolder, processID);
-      processFolder.mkdirs();
-      // setting rackName to null for now, will end up using the default one
-      final ProcessContainer container = new ProcessContainer(
-          this.errorHandlerRID, nodeId, processID, processFolder, megaBytes, 
numberOfCores, availableRack, this.fileNames, this.processObserver);
-      this.containers.put(container.getContainerID(), container);
-      LOG.log(Level.FINE, "Allocated {0}", container.getContainerID());
-      return container;
+      // get the rack name
+      final String rackName = this.racksPerNode.get(nodeId);
+      // remove it from the free map
+      this.freeNodesPerRack.get(rackName).remove(nodeId);
+      // allocate
+      return allocate(megaBytes, numberOfCores, nodeId, rackName);
     }
   }
 
+  /**
+   * Allocates a container on a free node of the given rack.
+   *
+   * @param megaBytes
+   *          the memory of the container
+   * @param numberOfCores
+   *          the number of cores of the container
+   * @param rackName
+   *          the rack to take a free node from
+   * @return the allocated container
+   * @throws IllegalArgumentException
+   *           if the rack has no free node
+   */
+  private Container allocateBasedOnRack(final int megaBytes,
+      final int numberOfCores, final String rackName) {
+    synchronized (this.containers) {
+      final Map<String, Boolean> freeNodesInRack =
+          this.freeNodesPerRack.get(rackName);
+      if (freeNodesInRack.isEmpty()) {
+        throw new IllegalArgumentException(
+            "There should be a free node in the specified rack " + rackName);
+      }
+      // take the first free node the map yields and mark it as used
+      final String nodeId = freeNodesInRack.keySet().iterator().next();
+      freeNodesInRack.remove(nodeId);
+      return allocate(megaBytes, numberOfCores, nodeId, rackName);
+    }
+  }
+
+  /**
+   * Creates the process folder and the container for the given node and
+   * registers the container under its container ID. Both callers invoke
+   * this while holding the lock on the containers map.
+   *
+   * @param megaBytes
+   *          the memory of the container
+   * @param numberOfCores
+   *          the number of cores of the container
+   * @param nodeId
+   *          the node the container is placed on
+   * @param rackName
+   *          the rack of that node
+   * @return the newly registered container
+   */
+  private Container allocate(final int megaBytes, final int numberOfCores,
+      final String nodeId, final String rackName) {
+    // the process ID is the node ID plus the current time in milliseconds
+    final String processID = nodeId + "-" + System.currentTimeMillis();
+    final File processFolder = new File(this.rootFolder, processID);
+    processFolder.mkdirs();
+    final ProcessContainer newContainer = new ProcessContainer(
+        this.errorHandlerRID, nodeId, processID, processFolder,
+        megaBytes, numberOfCores, rackName,
+        this.fileNames, this.processObserver);
+    this.containers.put(newContainer.getContainerID(), newContainer);
+    LOG.log(Level.FINE, "Allocated {0}", newContainer.getContainerID());
+    return newContainer;
+  }
+
   void release(final String containerID) {
     synchronized (this.containers) {
       final Container ctr = this.containers.get(containerID);
@@ -179,7 +390,7 @@ final class ContainerManager implements AutoCloseable {
         if (ctr.isRunning()) {
           ctr.close();
         }
-        this.freeNodeList.add(ctr.getNodeID());
+        this.freeNodesPerRack.get(ctr.getRackName()).put(ctr.getNodeID(), 
Boolean.TRUE);
         this.containers.remove(ctr.getContainerID());
       } else {
         LOG.log(Level.INFO, "Ignoring release request for unknown containerID 
[{0}]", containerID);
@@ -214,4 +425,5 @@ final class ContainerManager implements AutoCloseable {
       }
     }
   }
-}
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/cb134fe3/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ResourceManager.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ResourceManager.java
 
b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ResourceManager.java
index 557428e..3ef7808 100644
--- 
a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ResourceManager.java
+++ 
b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ResourceManager.java
@@ -29,21 +29,18 @@ import 
org.apache.reef.runtime.common.driver.resourcemanager.ResourceAllocationE
 import 
org.apache.reef.runtime.common.driver.resourcemanager.ResourceAllocationEventImpl;
 import 
org.apache.reef.runtime.common.driver.resourcemanager.RuntimeStatusEvent;
 import 
org.apache.reef.runtime.common.driver.resourcemanager.RuntimeStatusEventImpl;
-import org.apache.reef.runtime.common.files.ClasspathProvider;
 import org.apache.reef.runtime.common.files.FileResource;
 import org.apache.reef.runtime.common.files.REEFFileNames;
 import org.apache.reef.runtime.common.parameters.JVMHeapSlack;
 import org.apache.reef.runtime.common.utils.RemoteManager;
-import org.apache.reef.runtime.local.client.parameters.DefaultMemorySize;
-import org.apache.reef.runtime.local.client.parameters.DefaultNumberOfCores;
 import org.apache.reef.tang.annotations.Parameter;
 import org.apache.reef.tang.exceptions.BindException;
 import org.apache.reef.tang.formats.ConfigurationSerializer;
+import org.apache.reef.util.Optional;
 import org.apache.reef.util.logging.LoggingScope;
 import org.apache.reef.util.logging.LoggingScopeFactory;
 import org.apache.reef.wake.EventHandler;
 
-import javax.inject.Inject;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
@@ -51,6 +48,8 @@ import java.util.List;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
+import javax.inject.Inject;
+
 /**
  * A resource manager that uses threads to execute containers.
  */
@@ -65,27 +64,23 @@ public final class ResourceManager {
   private final EventHandler<ResourceAllocationEvent> allocationHandler;
   private final ContainerManager theContainers;
   private final EventHandler<RuntimeStatusEvent> 
runtimeStatusHandlerEventHandler;
-  private final int defaultMemorySize;
-  private final int defaultNumberOfCores;
   private final ConfigurationSerializer configurationSerializer;
   private final RemoteManager remoteManager;
   private final REEFFileNames fileNames;
-  private final ClasspathProvider classpathProvider;
   private final double jvmHeapFactor;
   private final LoggingScopeFactory loggingScopeFactory;
 
   @Inject
   ResourceManager(
       final ContainerManager containerManager,
-      @Parameter(RuntimeParameters.ResourceAllocationHandler.class) final 
EventHandler<ResourceAllocationEvent> allocationHandler,
-      @Parameter(RuntimeParameters.RuntimeStatusHandler.class) final 
EventHandler<RuntimeStatusEvent> runtimeStatusHandlerEventHandler,
-      @Parameter(DefaultMemorySize.class) final int defaultMemorySize,
-      @Parameter(DefaultNumberOfCores.class) final int defaultNumberOfCores,
+      @Parameter(RuntimeParameters.ResourceAllocationHandler.class)
+      final EventHandler<ResourceAllocationEvent> allocationHandler,
+      @Parameter(RuntimeParameters.RuntimeStatusHandler.class)
+      final EventHandler<RuntimeStatusEvent> runtimeStatusHandlerEventHandler,
       @Parameter(JVMHeapSlack.class) final double jvmHeapSlack,
       final ConfigurationSerializer configurationSerializer,
       final RemoteManager remoteManager,
       final REEFFileNames fileNames,
-      final ClasspathProvider classpathProvider,
       final LoggingScopeFactory loggingScopeFactory) {
 
     this.theContainers = containerManager;
@@ -93,10 +88,7 @@ public final class ResourceManager {
     this.runtimeStatusHandlerEventHandler = runtimeStatusHandlerEventHandler;
     this.configurationSerializer = configurationSerializer;
     this.remoteManager = remoteManager;
-    this.defaultMemorySize = defaultMemorySize;
-    this.defaultNumberOfCores = defaultNumberOfCores;
     this.fileNames = fileNames;
-    this.classpathProvider = classpathProvider;
     this.jvmHeapFactor = 1.0 - jvmHeapSlack;
     this.loggingScopeFactory = loggingScopeFactory;
 
@@ -168,7 +160,8 @@ public final class ResourceManager {
 
       final Container c = 
this.theContainers.get(launchRequest.getIdentifier());
 
-      try (final LoggingScope lb = 
this.loggingScopeFactory.getNewLoggingScope("ResourceManager.onResourceLaunchRequest:evaluatorConfigurationFile"))
 {
+      try (final LoggingScope lb = this.loggingScopeFactory
+          
.getNewLoggingScope("ResourceManager.onResourceLaunchRequest:evaluatorConfigurationFile"))
 {
         // Add the global files and libraries.
         c.addGlobalFiles(this.fileNames.getGlobalFolder());
         c.addLocalFiles(getLocalFiles(launchRequest));
@@ -183,7 +176,8 @@ public final class ResourceManager {
         }
       }
 
-      try (final LoggingScope lc = 
this.loggingScopeFactory.getNewLoggingScope("ResourceManager.onResourceLaunchRequest:runCommand"))
 {
+      try (final LoggingScope lc = this.loggingScopeFactory
+          
.getNewLoggingScope("ResourceManager.onResourceLaunchRequest:runCommand")) {
 
         final List<String> command = launchRequest.getProcess()
             .setErrorHandlerRID(this.remoteManager.getMyIdentifier())
@@ -199,41 +193,39 @@ public final class ResourceManager {
   }
 
   /**
    * Checks the allocation queue for new allocations and if there are any
    * satisfies them.
    */
   private void checkRequestQueue() {
 
-    if (this.theContainers.hasContainerAvailable() && 
this.requestQueue.hasOutStandingRequests()) {
-
-      // Record the satisfaction of one request and get its details.
-      final ResourceRequestEvent requestEvent = this.requestQueue.satisfyOne();
-
-      // Allocate a Container
-      final Container container = this.theContainers.allocateOne(
-              requestEvent.getMemorySize().orElse(this.defaultMemorySize),
-              
requestEvent.getVirtualCores().orElse(this.defaultNumberOfCores));
-
-      // Tell the receivers about it
-      final ResourceAllocationEvent alloc =
-          ResourceAllocationEventImpl.newBuilder()
-              .setIdentifier(container.getContainerID())
-              .setNodeId(container.getNodeID())
-              .setResourceMemory(container.getMemory())
-              .setVirtualCores(container.getNumberOfCores())
-              .setRackName(container.getRackName())
-              .build();
-
-      LOG.log(Level.FINEST, "Allocating container: {0}", container);
-      this.allocationHandler.onNext(alloc);
-
-      // update REEF
-      this.sendRuntimeStatus();
-
-      // Check whether we can satisfy another one.
-      this.checkRequestQueue();
-
+    if (requestQueue.hasOutStandingRequests()) {
+      final ResourceRequest resourceRequest = requestQueue.head();
+      final ResourceRequestEvent requestEvent = 
resourceRequest.getRequestProto();
+      final Optional<Container> cont = 
theContainers.allocateContainer(requestEvent);
+      if (cont.isPresent()) {
+        // Container has been allocated
+        requestQueue.satisfyOne();
+        final Container container = cont.get();
+        // Tell the receivers about it
+        final ResourceAllocationEvent alloc = 
ResourceAllocationEventImpl.newBuilder()
+            
.setIdentifier(container.getContainerID()).setNodeId(container.getNodeID())
+            
.setResourceMemory(container.getMemory()).setVirtualCores(container.getNumberOfCores())
+            .setRackName(container.getRackName()).build();
+
+        LOG.log(Level.FINEST, "Allocating container: {0}", container);
+        this.allocationHandler.onNext(alloc);
+        // update REEF
+        this.sendRuntimeStatus();
+
+        // Check whether we can satisfy another one.
+        this.checkRequestQueue();
+      } else {
+        // could not allocate, update REEF
+        this.sendRuntimeStatus();
+      }
     } else {
+      // done
       this.sendRuntimeStatus();
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/cb134fe3/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ResourceRequestQueue.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ResourceRequestQueue.java
 
b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ResourceRequestQueue.java
index 898f3c7..1044c1c 100644
--- 
a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ResourceRequestQueue.java
+++ 
b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ResourceRequestQueue.java
@@ -67,4 +67,13 @@ final class ResourceRequestQueue {
     return this.requestQueue.size();
   }
 
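+  /**
+   * Retrieves, but does not remove, the head of the queue.
+   * NOTE(review): delegates to requestQueue.element(), which presumably throws if the queue is empty - confirm.
+   * @return the head of the queue
+   */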
+  synchronized ResourceRequest head() {
+    return requestQueue.element();
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/cb134fe3/lang/java/reef-runtime-local/src/test/java/org/apache/reef/runtime/local/driver/ResourceManagerTest.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-local/src/test/java/org/apache/reef/runtime/local/driver/ResourceManagerTest.java
 
b/lang/java/reef-runtime-local/src/test/java/org/apache/reef/runtime/local/driver/ResourceManagerTest.java
new file mode 100644
index 0000000..3b22a67
--- /dev/null
+++ 
b/lang/java/reef-runtime-local/src/test/java/org/apache/reef/runtime/local/driver/ResourceManagerTest.java
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.local.driver;
+
+import org.apache.reef.runtime.common.driver.api.ResourceRequestEvent;
+import org.apache.reef.runtime.common.driver.api.ResourceRequestEventImpl;
+import org.apache.reef.runtime.common.driver.api.RuntimeParameters;
+import 
org.apache.reef.runtime.common.driver.resourcemanager.NodeDescriptorEvent;
+import 
org.apache.reef.runtime.common.driver.resourcemanager.ResourceAllocationEvent;
+import 
org.apache.reef.runtime.common.driver.resourcemanager.ResourceStatusEvent;
+import 
org.apache.reef.runtime.common.driver.resourcemanager.RuntimeStatusEvent;
+import org.apache.reef.runtime.common.files.REEFFileNames;
+import org.apache.reef.runtime.common.utils.RemoteManager;
+import org.apache.reef.runtime.local.client.parameters.MaxNumberOfEvaluators;
+import org.apache.reef.runtime.local.client.parameters.RackNames;
+import org.apache.reef.tang.Injector;
+import org.apache.reef.tang.Tang;
+import org.apache.reef.tang.exceptions.InjectionException;
+import org.apache.reef.tang.formats.ConfigurationSerializer;
+import org.apache.reef.util.logging.LoggingScopeFactory;
+import org.apache.reef.wake.EventHandler;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+import java.lang.reflect.Method;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.times;
+
+/**
+ * Unit tests for the {@link ResourceManager} and {@link ContainerManager}
+ * classes of the local runtime.
+ */
+public class ResourceManagerTest {
+
+  private Injector injector;
+
+  private ResourceManager resourceManager;
+  private RemoteManager remoteManager;
+  private EventHandler<ResourceStatusEvent> mockRuntimeResourceStatusHandler;
+  private EventHandler<NodeDescriptorEvent> mockNodeDescriptorHandler;
+  private EventHandler<ResourceAllocationEvent> mockResourceAllocationHandler;
+  private EventHandler<RuntimeStatusEvent> mockRuntimeStatusHandler;
+  private REEFFileNames filenames;
+  private ContainerManager containerManager;
+  private ConfigurationSerializer configurationSerializer;
+  private static final double JVM_HEAP_SLACK = 0.1;
+  private LoggingScopeFactory loggingScopeFactory;
+
+  @SuppressWarnings("unchecked")
+  @Before
+  public void setUp() throws InjectionException {
+    injector = Tang.Factory.getTang().newInjector();
+    remoteManager = injector.getInstance(RemoteManager.class);
+    mockRuntimeResourceStatusHandler = mock(EventHandler.class);
+    
injector.bindVolatileParameter(RuntimeParameters.ResourceStatusHandler.class, 
mockRuntimeResourceStatusHandler);
+    mockNodeDescriptorHandler = mock(EventHandler.class);
+    
injector.bindVolatileParameter(RuntimeParameters.NodeDescriptorHandler.class, 
mockNodeDescriptorHandler);
+    mockResourceAllocationHandler = mock(EventHandler.class);
+    
injector.bindVolatileParameter(RuntimeParameters.ResourceAllocationHandler.class,
 mockResourceAllocationHandler);
+    mockRuntimeStatusHandler = mock(EventHandler.class);
+    
injector.bindVolatileParameter(RuntimeParameters.RuntimeStatusHandler.class, 
mockRuntimeStatusHandler);
+    configurationSerializer = 
injector.getInstance(ConfigurationSerializer.class);
+    filenames = injector.getInstance(REEFFileNames.class);
+    loggingScopeFactory = injector.getInstance(LoggingScopeFactory.class);
+  }
+
+  @After
+  public void tearDown() {
+    // no need to reset mocks, they are created again in the setup
+  }
+
+  /**
+   * Helper method to call the sendNodeDescriptors private method in the
+   * containerManager, which populates the available containers in each rack.
+   */
+  private void sendNodeDescriptors() {
+    try {
+      final Method method = 
ContainerManager.class.getDeclaredMethod("sendNodeDescriptors");
+      method.setAccessible(true);
+      method.invoke(containerManager);
+    } catch (final Exception exc) {
+      throw new RuntimeException(exc);
+    }
+  }
+
+  @Test(expected = InjectionException.class)
+  public void testInvalidRacksConfigured() throws InjectionException {
+    // Given
+    final Set<String> availableRacks = new 
HashSet<String>(Arrays.asList("/rack1/*"));
+    injector.bindVolatileParameter(RackNames.class, availableRacks);
+    // When
+    containerManager = injector.getInstance(ContainerManager.class);
+    // Then
+    // expect the exception to be thrown
+  }
+
+  @Test
+  public void testOneAllocationsInDefaultRack() throws InjectionException {
+    // Given
+    containerManager = injector.getInstance(ContainerManager.class);
+    sendNodeDescriptors();
+    resourceManager = new ResourceManager(containerManager, 
mockResourceAllocationHandler, mockRuntimeStatusHandler,
+        JVM_HEAP_SLACK, configurationSerializer, remoteManager, filenames, 
loggingScopeFactory);
+    final ResourceRequestEvent request = 
ResourceRequestEventImpl.newBuilder().setResourceCount(1).setVirtualCores(1)
+        .setMemorySize(64).build();
+    // When
+    resourceManager.onResourceRequest(request);
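+    // Then: one allocation and two runtime status updates (one right after the allocation, one when the request queue is found empty)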
+    verify(mockResourceAllocationHandler, 
times(1)).onNext(any(ResourceAllocationEvent.class));
+    verify(mockRuntimeStatusHandler, 
times(2)).onNext(any(RuntimeStatusEvent.class));
+  }
+
+  @Test
+  public void 
testZeroAllocationsDueToContainersNotAvailableAndRelaxLocalityDisabled() throws 
InjectionException {
+    // Given
+    containerManager = injector.getInstance(ContainerManager.class);
+    // sendNodeDescriptors() is deliberately not called, so the container manager has no free slots available
+    resourceManager = new ResourceManager(containerManager, 
mockResourceAllocationHandler, mockRuntimeStatusHandler,
+        JVM_HEAP_SLACK, configurationSerializer, remoteManager,
+        filenames, loggingScopeFactory);
+    final ResourceRequestEvent request = 
ResourceRequestEventImpl.newBuilder().setResourceCount(2).setVirtualCores(1)
+        .setMemorySize(64).build();
+    // When
+    resourceManager.onResourceRequest(request);
+    // Then: no allocation and a single runtime status update, sent because the request could not be satisfied
+    verify(mockResourceAllocationHandler, 
times(0)).onNext(any(ResourceAllocationEvent.class));
+    verify(mockRuntimeStatusHandler, 
times(1)).onNext(any(RuntimeStatusEvent.class));
+  }
+
+  @Test
+  public void testTwoAllocationsInDifferentRacks() throws InjectionException {
+    // Given
+    final List<String> availableRacks = Arrays.asList("/rack1", "/rack2");
+    final Set<String> availableRacksSet = new HashSet<String>(availableRacks);
+    injector.bindVolatileParameter(RackNames.class, availableRacksSet); // 2 
available racks
+    injector.bindVolatileParameter(MaxNumberOfEvaluators.class, 2); // 1 
evaluator per rack
+    containerManager = injector.getInstance(ContainerManager.class); // inject 
containerManager with this updated info
+    sendNodeDescriptors();
+    resourceManager = new ResourceManager(containerManager, 
mockResourceAllocationHandler, mockRuntimeStatusHandler,
+        JVM_HEAP_SLACK, configurationSerializer, remoteManager,
+        filenames, loggingScopeFactory);
+    final ResourceRequestEvent request = 
ResourceRequestEventImpl.newBuilder().setResourceCount(2).setVirtualCores(1)
+        
.setMemorySize(64).addRackName(availableRacks.get(0)).addRackName(availableRacks.get(1)).build();
+    // When
+    resourceManager.onResourceRequest(request);
+    // Then
+    final ArgumentCaptor<ResourceAllocationEvent> argument = 
ArgumentCaptor.forClass(ResourceAllocationEvent.class);
+    verify(mockResourceAllocationHandler, times(2)).onNext(argument.capture());
+    final List<ResourceAllocationEvent> actualResourceAllocationEvent = 
argument.getAllValues();
+    Assert.assertEquals("/rack1", 
actualResourceAllocationEvent.get(0).getRackName().get());
+    Assert.assertEquals("/rack2", 
actualResourceAllocationEvent.get(1).getRackName().get());
+    verify(mockRuntimeStatusHandler, 
times(3)).onNext(any(RuntimeStatusEvent.class));
+  }
+
+  @Test
+  public void testTwoAllocationsOnFourContainersAvailableInDefaultRack() 
throws InjectionException {
+    // Given
+    containerManager = injector.getInstance(ContainerManager.class);
+    sendNodeDescriptors();
+    resourceManager = new ResourceManager(containerManager, 
mockResourceAllocationHandler, mockRuntimeStatusHandler,
+        JVM_HEAP_SLACK, configurationSerializer, remoteManager,
+        filenames, loggingScopeFactory);
+    final ResourceRequestEvent request = 
ResourceRequestEventImpl.newBuilder().setResourceCount(2).setVirtualCores(1)
+        .setMemorySize(64).build();
+    // When
+    resourceManager.onResourceRequest(request);
+    // Then
+    verify(mockResourceAllocationHandler, 
times(2)).onNext(any(ResourceAllocationEvent.class));
+    verify(mockRuntimeStatusHandler, 
times(3)).onNext(any(RuntimeStatusEvent.class));
+  }
+
+  @Test
+  public void testOneAllocationInRack1AndTwoInDatacenter2() throws 
InjectionException {
+    // Given
+    final List<String> availableRacks = Arrays.asList("/dc1/rack1", 
"/dc2/rack1", "/dc2/rack2");
+    final Set<String> availableRacksSet = new HashSet<String>(availableRacks);
+    injector.bindVolatileParameter(RackNames.class, availableRacksSet); // 3 
available racks
+    injector.bindVolatileParameter(MaxNumberOfEvaluators.class, 3); // 1 
evaluator per rack
+    containerManager = injector.getInstance(ContainerManager.class);
+    sendNodeDescriptors();
+    resourceManager = new ResourceManager(containerManager, 
mockResourceAllocationHandler, mockRuntimeStatusHandler,
+        JVM_HEAP_SLACK, configurationSerializer, remoteManager,
+        filenames, loggingScopeFactory);
+    final ResourceRequestEvent request = 
ResourceRequestEventImpl.newBuilder().setResourceCount(3).setVirtualCores(1)
+        .setMemorySize(64).addRackName("dc1/*").addRackName("/dc2/*").build();
+    // When
+    resourceManager.onResourceRequest(request);
+    // Then
+    final ArgumentCaptor<ResourceAllocationEvent> argument = 
ArgumentCaptor.forClass(ResourceAllocationEvent.class);
+    verify(mockResourceAllocationHandler, times(3)).onNext(argument.capture());
+    final List<ResourceAllocationEvent> actualResourceAllocationEvent = 
argument.getAllValues();
+    
Assert.assertTrue(actualResourceAllocationEvent.get(0).getRackName().get().contains("/dc1"));
+    
Assert.assertTrue(actualResourceAllocationEvent.get(1).getRackName().get().contains("/dc2"));
+    
Assert.assertTrue(actualResourceAllocationEvent.get(2).getRackName().get().contains("/dc2"));
+    verify(mockRuntimeStatusHandler, 
times(4)).onNext(any(RuntimeStatusEvent.class));
+  }
+
+  @Test
+  public void testAllocateNode8AndTwoRandomOnesInDefaultRack() throws 
InjectionException {
+    // Given
+    injector.bindVolatileParameter(MaxNumberOfEvaluators.class, 8); // 8 
evaluators in the default rack
+    containerManager = injector.getInstance(ContainerManager.class);
+    sendNodeDescriptors();
+    resourceManager = new ResourceManager(containerManager, 
mockResourceAllocationHandler, mockRuntimeStatusHandler,
+        JVM_HEAP_SLACK, configurationSerializer, remoteManager,
+        filenames, loggingScopeFactory);
+    final ResourceRequestEvent request = 
ResourceRequestEventImpl.newBuilder().setResourceCount(3).setVirtualCores(1)
+        .setMemorySize(64).addNodeName("Node-8").build();
+    // When
+    resourceManager.onResourceRequest(request);
+    // Then
+    final ArgumentCaptor<ResourceAllocationEvent> argument = 
ArgumentCaptor.forClass(ResourceAllocationEvent.class);
+    verify(mockResourceAllocationHandler, times(3)).onNext(argument.capture());
+    final List<ResourceAllocationEvent> actualResourceAllocationEvent = 
argument.getAllValues();
+    Assert.assertEquals("Node-8", 
actualResourceAllocationEvent.get(0).getNodeId());
+    Assert.assertEquals(RackNames.DEFAULT_RACK_NAME, 
actualResourceAllocationEvent.get(0).getRackName().get());
+    Assert.assertNotEquals("Node-8", 
actualResourceAllocationEvent.get(1).getNodeId());
+    Assert.assertEquals(RackNames.DEFAULT_RACK_NAME, 
actualResourceAllocationEvent.get(1).getRackName().get());
+    Assert.assertNotEquals("Node-8", 
actualResourceAllocationEvent.get(2).getNodeId());
+    Assert.assertEquals(RackNames.DEFAULT_RACK_NAME, 
actualResourceAllocationEvent.get(2).getRackName().get());
+    verify(mockRuntimeStatusHandler, 
times(4)).onNext(any(RuntimeStatusEvent.class));
+
+  }
+
+  @Test
+  public void 
testOneAllocationInRack1AndTwoInDifferentRacksDueToRelaxLocality() throws 
InjectionException {
+    // Given
+    final List<String> availableRacks = Arrays.asList("/dc1/rack1", 
"/dc2/rack1", "/dc3/rack1");
+    final Set<String> availableRacksSet = new HashSet<String>(availableRacks);
+    injector.bindVolatileParameter(RackNames.class, availableRacksSet);
+    injector.bindVolatileParameter(MaxNumberOfEvaluators.class, 3); // 3 
evaluators in three different racks
+    containerManager = injector.getInstance(ContainerManager.class);
+    sendNodeDescriptors();
+    resourceManager = new ResourceManager(containerManager, 
mockResourceAllocationHandler, mockRuntimeStatusHandler,
+        JVM_HEAP_SLACK, configurationSerializer, remoteManager,
+        filenames, loggingScopeFactory);
+    final ResourceRequestEvent request = 
ResourceRequestEventImpl.newBuilder().setResourceCount(3).setVirtualCores(1)
+        .setMemorySize(64).addRackName("/dc3/rack1").addRackName("/*").build();
+    // When
+    resourceManager.onResourceRequest(request);
+    // Then
+    final ArgumentCaptor<ResourceAllocationEvent> argument = 
ArgumentCaptor.forClass(ResourceAllocationEvent.class);
+    verify(mockResourceAllocationHandler, times(3)).onNext(argument.capture());
+    final List<ResourceAllocationEvent> actualResourceAllocationEvent = 
argument.getAllValues();
+    Assert.assertEquals("/dc3/rack1", 
actualResourceAllocationEvent.get(0).getRackName().get());
+    Assert.assertNotEquals("/dc3/rack1", 
actualResourceAllocationEvent.get(1).getRackName().get());
+    Assert.assertNotEquals("/dc3/rack1", 
actualResourceAllocationEvent.get(2).getRackName().get());
+    verify(mockRuntimeStatusHandler, 
times(4)).onNext(any(RuntimeStatusEvent.class));
+
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/cb134fe3/lang/java/reef-tests/src/test/java/org/apache/reef/tests/rack/awareness/RackAwareEvaluatorTest.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/rack/awareness/RackAwareEvaluatorTest.java
 
b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/rack/awareness/RackAwareEvaluatorTest.java
index b9a7bd6..0f6c0a1 100644
--- 
a/lang/java/reef-tests/src/test/java/org/apache/reef/tests/rack/awareness/RackAwareEvaluatorTest.java
+++ 
b/lang/java/reef-tests/src/test/java/org/apache/reef/tests/rack/awareness/RackAwareEvaluatorTest.java
@@ -55,15 +55,16 @@ public final class RackAwareEvaluatorTest {
   }
 
   /**
-  * Tests whether the runtime passes the rack information to the driver
-  * The success scenario is if it receives the default rack, fails otherwise
+  * Tests whether the runtime passes the rack information to the driver.
+  * The success scenario is if it receives the default rack, fails otherwise.
   */
   @Test
   public void testRackAwareEvaluatorRunningOnDefaultRack() {
     //Given
     final Configuration driverConfiguration = DriverConfiguration.CONF
         .set(DriverConfiguration.DRIVER_IDENTIFIER, "TEST_RackAwareEvaluator")
-        .set(DriverConfiguration.GLOBAL_LIBRARIES, 
EnvironmentUtils.getClassLocation(RackAwareEvaluatorTestDriver.class))
+        .set(DriverConfiguration.GLOBAL_LIBRARIES,
+            
EnvironmentUtils.getClassLocation(RackAwareEvaluatorTestDriver.class))
         .set(DriverConfiguration.ON_DRIVER_STARTED, 
OnDriverStartedAllocateOne.class)
         .set(DriverConfiguration.ON_EVALUATOR_ALLOCATED, 
RackAwareEvaluatorTestDriver.EvaluatorAllocatedHandler.class)
         .build();
@@ -75,15 +76,20 @@ public final class RackAwareEvaluatorTest {
   }
 
   /**
-   * Test whether the runtime passes the rack information to the driver
+   * Test whether the runtime passes the rack information to the driver.
    * The success scenario is if it receives rack1, fails otherwise
    */
-  @Test
+  //@Test
+  // TODO Re-enable once the API for specifying where resources should run
+  // is defined (JIRA REEF-416).
+  // OnDriverStartedAllocateOne will then need to be replaced by a handler that
+  // asks to run in RACK1, which will be the only rack available.
   public void testRackAwareEvaluatorRunningOnRack1() throws InjectionException 
{
     //Given
     final Configuration driverConfiguration = DriverConfiguration.CONF
         .set(DriverConfiguration.DRIVER_IDENTIFIER, "TEST_RackAwareEvaluator")
-        .set(DriverConfiguration.GLOBAL_LIBRARIES, 
EnvironmentUtils.getClassLocation(RackAwareEvaluatorTestDriver.class))
+        .set(DriverConfiguration.GLOBAL_LIBRARIES,
+            
EnvironmentUtils.getClassLocation(RackAwareEvaluatorTestDriver.class))
         .set(DriverConfiguration.ON_DRIVER_STARTED, 
OnDriverStartedAllocateOne.class)
         .set(DriverConfiguration.ON_EVALUATOR_ALLOCATED, 
RackAwareEvaluatorTestDriver.EvaluatorAllocatedHandler.class)
         .build();

Reply via email to