Repository: incubator-reef
Updated Branches:
  refs/heads/master 6c8a46083 -> 2d0abfddd


[REEF-621] Set relaxLocality flag to ResourceRequestEvent objects

Currently, the relax locality flag is not being set when requesting
resources.  Therefore, YarnResourceRequestHandler always sets it to
true.  This change adds the flag when constructing the
ResourceRequestEvent object.  In order to be backwards compatible, by
default the flag will be set to true.  If the user defined rack names,
and if none of those racks is the ANY modifier *, we will set it to
false.

JIRA:
  [REEF-621](https://issues.apache.org/jira/browse/REEF-621)

Pull Request:
  Closes #436


Project: http://git-wip-us.apache.org/repos/asf/incubator-reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-reef/commit/2d0abfdd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-reef/tree/2d0abfdd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-reef/diff/2d0abfdd

Branch: refs/heads/master
Commit: 2d0abfddd4744d8c2fb7883393e2a6fcc701fb70
Parents: 6c8a460
Author: Ignacio Cano <[email protected]>
Authored: Fri Aug 28 16:27:19 2015 -0700
Committer: Markus Weimer <[email protected]>
Committed: Mon Aug 31 09:45:16 2015 -0700

----------------------------------------------------------------------
 .../common/driver/EvaluatorRequestorImpl.java   | 16 ++++++++
 .../reef/runtime/common/utils/Constants.java    | 43 ++++++++++++++++++++
 .../loading/api/DataLoadingRequestBuilder.java  |  5 ++-
 .../impl/DistributedDataSetPartition.java       |  6 ---
 ...iDataCenterEvaluatorToPartitionStrategy.java | 13 +++---
 .../runtime/local/driver/ContainerManager.java  | 13 +++---
 6 files changed, 74 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2d0abfdd/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/EvaluatorRequestorImpl.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/EvaluatorRequestorImpl.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/EvaluatorRequestorImpl.java
index ef3294a..78e4d28 100644
--- 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/EvaluatorRequestorImpl.java
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/EvaluatorRequestorImpl.java
@@ -24,6 +24,7 @@ import org.apache.reef.driver.evaluator.EvaluatorRequestor;
 import org.apache.reef.runtime.common.driver.api.ResourceRequestEvent;
 import org.apache.reef.runtime.common.driver.api.ResourceRequestEventImpl;
 import org.apache.reef.runtime.common.driver.api.ResourceRequestHandler;
+import org.apache.reef.runtime.common.utils.Constants;
 import org.apache.reef.util.logging.LoggingScope;
 import org.apache.reef.util.logging.LoggingScopeFactory;
 
@@ -77,6 +78,20 @@ public final class EvaluatorRequestorImpl implements 
EvaluatorRequestor {
       throw new IllegalArgumentException("Rack names cannot be null");
     }
 
+    // for backwards compatibility, we will always set the relax locality flag
+    // to true unless the user configured racks, in which case we will check 
for
+    // the ANY modifier (*), if not there, then we won't relax the locality
+    boolean relaxLocality = true;
+    if (!req.getRackNames().isEmpty()) {
+      for (final String rackName : req.getRackNames()) {
+        if (Constants.ANY_RACK.equals(rackName)) {
+          relaxLocality = true;
+          break;
+        }
+        relaxLocality = false;
+      }
+    }
+
     try (LoggingScope ls = 
loggingScopeFactory.evaluatorSubmit(req.getNumber())) {
       final ResourceRequestEvent request = ResourceRequestEventImpl
           .newBuilder()
@@ -85,6 +100,7 @@ public final class EvaluatorRequestorImpl implements 
EvaluatorRequestor {
           .setMemorySize(req.getMegaBytes())
           .addNodeNames(req.getNodeNames())
           .addRackNames(req.getRackNames())
+          .setRelaxLocality(relaxLocality)
           .build();
       this.resourceRequestHandler.onNext(request);
     }

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2d0abfdd/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/utils/Constants.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/utils/Constants.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/utils/Constants.java
new file mode 100644
index 0000000..15fb7a0
--- /dev/null
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/utils/Constants.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.common.utils;
+
+/**
+ * Constants used by different REEF modules.
+ */
+public final class Constants {
+
+  /**
+   * Any modifier. Used as a wildcard to specify that evaluators can be
+   * allocated in any rack.
+   */
+  public static final String ANY_RACK = "*";
+
+  /**
+   * Rack path separator. Used to separate the fully qualified rack name of an
+   * evaluator, e.g. /dc1/room1/rack1
+   */
+  public static final String RACK_PATH_SEPARATOR = "/";
+
+  /**
+   * Empty private constructor to prohibit instantiation of utility class.
+   */
+  private Constants() {
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2d0abfdd/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/api/DataLoadingRequestBuilder.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/api/DataLoadingRequestBuilder.java
 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/api/DataLoadingRequestBuilder.java
index d835aac..82a4b85 100644
--- 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/api/DataLoadingRequestBuilder.java
+++ 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/api/DataLoadingRequestBuilder.java
@@ -30,6 +30,7 @@ import 
org.apache.reef.io.data.loading.impl.DistributedDataSetPartition;
 import org.apache.reef.io.data.loading.impl.InputFormatLoadingService;
 import org.apache.reef.io.data.loading.impl.JobConfExternalConstructor;
 import 
org.apache.reef.io.data.loading.impl.MultiDataCenterEvaluatorToPartitionStrategy;
+import org.apache.reef.runtime.common.utils.Constants;
 import org.apache.reef.tang.Configuration;
 import org.apache.reef.tang.JavaConfigurationBuilder;
 import org.apache.reef.tang.Tang;
@@ -52,7 +53,7 @@ public final class DataLoadingRequestBuilder
   // constant used in several places.
   private static final int UNINITIALIZED = -1;
   private int numberOfDesiredSplits = UNINITIALIZED;
-  private List<EvaluatorRequest> computeRequests = new ArrayList<>();
+  private final List<EvaluatorRequest> computeRequests = new ArrayList<>();
   private final List<EvaluatorRequest> dataRequests = new ArrayList<>();
   private boolean inMemory = false;
   private boolean renewFailedEvaluators = true;
@@ -207,7 +208,7 @@ public final class DataLoadingRequestBuilder
       dds.addPartition(DistributedDataSetPartition
           .newBuilder()
           .setPath(inputPath)
-          .setLocation(DistributedDataSetPartition.LOAD_INTO_ANY_LOCATION)
+          .setLocation(Constants.ANY_RACK)
           .setDesiredSplits(
               numberOfDesiredSplits > 0 ? numberOfDesiredSplits : Integer
                   
.valueOf(NumberOfDesiredSplits.DEFAULT_DESIRED_SPLITS)).build());

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2d0abfdd/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/DistributedDataSetPartition.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/DistributedDataSetPartition.java
 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/DistributedDataSetPartition.java
index 7cc52c1..eb17615 100644
--- 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/DistributedDataSetPartition.java
+++ 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/DistributedDataSetPartition.java
@@ -32,12 +32,6 @@ import org.apache.reef.annotations.Unstable;
 public final class DistributedDataSetPartition {
 
   /**
-   * Constant to specify that the data partition could be loaded into any
-   * location.
-   */
-  public static final String LOAD_INTO_ANY_LOCATION = "/*";
-
-  /**
    * The path of the distributed data set partition. If we use HDFS, it will 
be the
    * hdfs path.
    */

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2d0abfdd/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/MultiDataCenterEvaluatorToPartitionStrategy.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/MultiDataCenterEvaluatorToPartitionStrategy.java
 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/MultiDataCenterEvaluatorToPartitionStrategy.java
index 9a17626..ce21a8b 100644
--- 
a/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/MultiDataCenterEvaluatorToPartitionStrategy.java
+++ 
b/lang/java/reef-io/src/main/java/org/apache/reef/io/data/loading/impl/MultiDataCenterEvaluatorToPartitionStrategy.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.mapred.InputSplit;
 import org.apache.reef.annotations.Unstable;
 import org.apache.reef.annotations.audience.DriverSide;
 import org.apache.reef.driver.catalog.NodeDescriptor;
+import org.apache.reef.runtime.common.utils.Constants;
 import org.apache.reef.tang.annotations.Parameter;
 
 import java.util.Collections;
@@ -47,8 +48,6 @@ import javax.inject.Inject;
 public final class MultiDataCenterEvaluatorToPartitionStrategy extends 
AbstractEvaluatorToPartitionStrategy {
   private static final Logger LOG = 
Logger.getLogger(MultiDataCenterEvaluatorToPartitionStrategy.class.getName());
 
-  private static final String PATH_SEPARATOR = "/";
-  private static final String ANY = "*";
   /**
    * Sorted set in reverse order, to keep track of the locations from most to
    * least specific. For example: [/dc1/room1, /dc1].
@@ -137,15 +136,15 @@ public final class 
MultiDataCenterEvaluatorToPartitionStrategy extends AbstractE
   private String normalize(final String location) {
     String loc = location;
     // should start with a separator
-    if (!loc.startsWith(PATH_SEPARATOR)) {
-      loc = PATH_SEPARATOR + loc;
+    if (!loc.startsWith(Constants.RACK_PATH_SEPARATOR)) {
+      loc = Constants.RACK_PATH_SEPARATOR + loc;
     }
     // if it is just /*, return /
-    if (loc.equals(PATH_SEPARATOR + ANY)) {
-      return PATH_SEPARATOR;
+    if (loc.equals(Constants.RACK_PATH_SEPARATOR + Constants.ANY_RACK)) {
+      return Constants.RACK_PATH_SEPARATOR;
     }
     // remove the ending ANY or path separator
-    while (loc.endsWith(ANY) || loc.endsWith(PATH_SEPARATOR)) {
+    while (loc.endsWith(Constants.ANY_RACK) || 
loc.endsWith(Constants.RACK_PATH_SEPARATOR)) {
       loc = loc.substring(0, loc.length() - 1);
     }
     return loc;

http://git-wip-us.apache.org/repos/asf/incubator-reef/blob/2d0abfdd/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ContainerManager.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ContainerManager.java
 
b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ContainerManager.java
index a31c0e0..96978da 100644
--- 
a/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ContainerManager.java
+++ 
b/lang/java/reef-runtime-local/src/main/java/org/apache/reef/runtime/local/driver/ContainerManager.java
@@ -28,6 +28,7 @@ import 
org.apache.reef.runtime.common.driver.api.RuntimeParameters;
 import 
org.apache.reef.runtime.common.driver.resourcemanager.NodeDescriptorEvent;
 import 
org.apache.reef.runtime.common.driver.resourcemanager.NodeDescriptorEventImpl;
 import org.apache.reef.runtime.common.files.REEFFileNames;
+import org.apache.reef.runtime.common.utils.Constants;
 import org.apache.reef.runtime.common.utils.RemoteManager;
 import org.apache.reef.runtime.local.client.parameters.DefaultMemorySize;
 import org.apache.reef.runtime.local.client.parameters.DefaultNumberOfCores;
@@ -66,8 +67,6 @@ final class ContainerManager implements AutoCloseable {
   private static final Logger LOG = 
Logger.getLogger(ContainerManager.class.getName());
 
   private static final Collection<String> DEFAULT_RACKS = 
Arrays.asList(RackNames.DEFAULT_RACK_NAME);
-  private static final String PATH_SEPARATOR = "/";
-  private static final String ANY = "*";
 
   /**
    * Map from containerID -> Container.
@@ -164,15 +163,15 @@ final class ContainerManager implements AutoCloseable {
       String rackName = it.next().trim();
       Validate.notEmpty(rackName, "Rack names cannot be empty");
       // should start with a separator
-      if (!rackName.startsWith(PATH_SEPARATOR)) {
-        rackName = PATH_SEPARATOR + rackName;
+      if (!rackName.startsWith(Constants.RACK_PATH_SEPARATOR)) {
+        rackName = Constants.RACK_PATH_SEPARATOR + rackName;
       }
       // remove the ending separator
-      if (rackName.endsWith(PATH_SEPARATOR)) {
+      if (rackName.endsWith(Constants.RACK_PATH_SEPARATOR)) {
         rackName = rackName.substring(0, rackName.length() - 1);
       }
       if (validateEnd) {
-        Validate.isTrue(!rackName.endsWith(ANY));
+        Validate.isTrue(!rackName.endsWith(Constants.ANY_RACK));
       }
       normalizedRackNames.add(rackName);
     }
@@ -263,7 +262,7 @@ final class ContainerManager implements AutoCloseable {
     for (final String rackName : normalized) {
       // if it does not end with the any modifier,
       // then we should do an exact match
-      if (!rackName.endsWith(ANY)) {
+      if (!rackName.endsWith(Constants.ANY_RACK)) {
         if (freeNodesPerRack.containsKey(rackName)
             && freeNodesPerRack.get(rackName).size() > 0) {
           return Optional.of(rackName);

Reply via email to