Repository: reef
Updated Branches:
  refs/heads/master 8d0bad248 -> 24bfea49d


[REEF-2011] Enable node label expressions in evaluator requestor.

  This add the notion of node label expressions to resource manager
  requests in both Java and C#.

JIRA:
  [REEF-2011](https://issues.apache.org/jira/browse/REEF-2011)

Pull Request:
  Closes #1451


Project: http://git-wip-us.apache.org/repos/asf/reef/repo
Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/24bfea49
Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/24bfea49
Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/24bfea49

Branch: refs/heads/master
Commit: 24bfea49dbf5c956033c214bddcb6c8ff5cd2a57
Parents: 8d0bad2
Author: Tyson Condie <[email protected]>
Authored: Wed Apr 25 16:33:07 2018 -0700
Committer: Sergiy Matusevych <[email protected]>
Committed: Thu Apr 26 15:48:10 2018 -0700

----------------------------------------------------------------------
 .../EvaluatorRequestorClr2Java.cpp              |  5 ++--
 .../Evaluator/EvaluatorRequest.cs               | 20 ++++++++------
 .../Evaluator/EvaluatorRequestBuilder.cs        | 16 ++++++++++-
 .../Evaluator/IEvaluatorRequest.cs              |  6 ++++
 .../javabridge/EvaluatorRequestorBridge.java    |  4 ++-
 .../reef/driver/evaluator/EvaluatorRequest.java | 29 ++++++++++++++++++--
 .../common/driver/EvaluatorRequestorImpl.java   |  1 +
 .../common/driver/api/ResourceRequestEvent.java |  5 ++++
 .../driver/api/ResourceRequestEventImpl.java    | 19 +++++++++++++
 .../yarn/driver/YarnContainerManager.java       |  3 ++
 .../yarn/driver/YarnResourceRequestHandler.java |  6 +++-
 11 files changed, 98 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/reef/blob/24bfea49/lang/cs/Org.Apache.REEF.Bridge/EvaluatorRequestorClr2Java.cpp
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Bridge/EvaluatorRequestorClr2Java.cpp 
b/lang/cs/Org.Apache.REEF.Bridge/EvaluatorRequestorClr2Java.cpp
index 90fe5d4..4dc9bd1 100644
--- a/lang/cs/Org.Apache.REEF.Bridge/EvaluatorRequestorClr2Java.cpp
+++ b/lang/cs/Org.Apache.REEF.Bridge/EvaluatorRequestorClr2Java.cpp
@@ -53,7 +53,7 @@ namespace Org {
               
ManagedLog::LOGGER->LogStart("EvaluatorRequestorClr2Java::Submit");
               JNIEnv *env = RetrieveEnv(_jvm);
               jclass jclassEvaluatorRequestor = 
env->GetObjectClass(_jobjectEvaluatorRequestor);
-              jmethodID jmidSubmit = 
env->GetMethodID(jclassEvaluatorRequestor, "submit", 
"(IIIZLjava/lang/String;Ljava/lang/String;Ljava/util/ArrayList;)V");
+              jmethodID jmidSubmit = 
env->GetMethodID(jclassEvaluatorRequestor, "submit", 
"(IIIZLjava/lang/String;Ljava/lang/String;Ljava/util/ArrayList;Ljava/lang/String;)V");
 
               if (jmidSubmit == NULL) {
                 fprintf(stdout, " jmidSubmit is NULL\n");
@@ -69,7 +69,8 @@ namespace Org {
                 request->RelaxLocality,
                 JavaStringFromManagedString(env, request->Rack),
                 JavaStringFromManagedString(env, request->RuntimeName),
-                JavaArrayListFromManagedList(env, request->NodeNames));
+                JavaArrayListFromManagedList(env, request->NodeNames),
+                               JavaStringFromManagedString(env, 
request->NodeLabelExpression));
               
ManagedLog::LOGGER->LogStop("EvaluatorRequestorClr2Java::Submit");
             }
 

http://git-wip-us.apache.org/repos/asf/reef/blob/24bfea49/lang/cs/Org.Apache.REEF.Driver/Evaluator/EvaluatorRequest.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Driver/Evaluator/EvaluatorRequest.cs 
b/lang/cs/Org.Apache.REEF.Driver/Evaluator/EvaluatorRequest.cs
index 2c8365d..f4e827f 100644
--- a/lang/cs/Org.Apache.REEF.Driver/Evaluator/EvaluatorRequest.cs
+++ b/lang/cs/Org.Apache.REEF.Driver/Evaluator/EvaluatorRequest.cs
@@ -29,43 +29,43 @@ namespace Org.Apache.REEF.Driver.Evaluator
     internal class EvaluatorRequest : IEvaluatorRequest
     {
         internal EvaluatorRequest()
-            : this(0, 0, 1, string.Empty, Guid.NewGuid().ToString("N"), 
string.Empty, Enumerable.Empty<string>().ToList(), true)
+            : this(0, 0, 1, string.Empty, Guid.NewGuid().ToString("N"), 
string.Empty, Enumerable.Empty<string>().ToList(), true, String.Empty)
         {
         }
 
         internal EvaluatorRequest(int number, int megaBytes)
-            : this(number, megaBytes, 1, string.Empty, 
Guid.NewGuid().ToString("N"), string.Empty, 
Enumerable.Empty<string>().ToList(), true)
+            : this(number, megaBytes, 1, string.Empty, 
Guid.NewGuid().ToString("N"), string.Empty, 
Enumerable.Empty<string>().ToList(), true, string.Empty)
         {
         }
 
         internal EvaluatorRequest(int number, int megaBytes, int core)
-            : this(number, megaBytes, core, string.Empty, 
Guid.NewGuid().ToString("N"), string.Empty, 
Enumerable.Empty<string>().ToList(), true)
+            : this(number, megaBytes, core, string.Empty, 
Guid.NewGuid().ToString("N"), string.Empty, 
Enumerable.Empty<string>().ToList(), true, string.Empty)
         {
         }
 
         internal EvaluatorRequest(int number, int megaBytes, string rack)
-            : this(number, megaBytes, 1, rack, Guid.NewGuid().ToString("N"), 
string.Empty, Enumerable.Empty<string>().ToList(), true)
+            : this(number, megaBytes, 1, rack, Guid.NewGuid().ToString("N"), 
string.Empty, Enumerable.Empty<string>().ToList(), true, String.Empty)
         {
         }
 
         internal EvaluatorRequest(int number, int megaBytes, int core, string 
rack)
-            : this(number, megaBytes, core, rack, 
Guid.NewGuid().ToString("N"), string.Empty, 
Enumerable.Empty<string>().ToList(), true)
+            : this(number, megaBytes, core, rack, 
Guid.NewGuid().ToString("N"), string.Empty, 
Enumerable.Empty<string>().ToList(), true, string.Empty)
         {
         }
 
         internal EvaluatorRequest(int number, int megaBytes, int core, string 
rack, string evaluatorBatchId, ICollection<string> nodeNames)
-            : this(number, megaBytes, core, rack, evaluatorBatchId, 
string.Empty, nodeNames, true)
+            : this(number, megaBytes, core, rack, evaluatorBatchId, 
string.Empty, nodeNames, true, string.Empty)
 
         {
         }
 
         internal EvaluatorRequest(int number, int megaBytes, int core, string 
rack, string evaluatorBatchId, ICollection<string> nodeNames, bool 
relaxLocality)
-           : this(number, megaBytes, core, rack, evaluatorBatchId, 
string.Empty, nodeNames, relaxLocality)
+           : this(number, megaBytes, core, rack, evaluatorBatchId, 
string.Empty, nodeNames, relaxLocality, string.Empty)
 
         {
         }
 
-        internal EvaluatorRequest(int number, int megaBytes, int core, string 
rack, string evaluatorBatchId, string runtimeName, ICollection<string> 
nodeNames, bool relaxLocality)
+        internal EvaluatorRequest(int number, int megaBytes, int core, string 
rack, string evaluatorBatchId, string runtimeName, ICollection<string> 
nodeNames, bool relaxLocality, string nodeLabelExpression)
         {
             Number = number;
             MemoryMegaBytes = megaBytes;
@@ -75,6 +75,7 @@ namespace Org.Apache.REEF.Driver.Evaluator
             RuntimeName = runtimeName;
             NodeNames = nodeNames;
             RelaxLocality = relaxLocality;
+            NodeLabelExpression = NodeLabelExpression;
         }
 
         [DataMember]
@@ -101,6 +102,9 @@ namespace Org.Apache.REEF.Driver.Evaluator
         [DataMember]
         public bool RelaxLocality { get; private set; }
 
+        [DataMember]
+        public string NodeLabelExpression { get; private set; }
+
         internal static EvaluatorRequestBuilder NewBuilder()
         {
             return new EvaluatorRequestBuilder();

http://git-wip-us.apache.org/repos/asf/reef/blob/24bfea49/lang/cs/Org.Apache.REEF.Driver/Evaluator/EvaluatorRequestBuilder.cs
----------------------------------------------------------------------
diff --git 
a/lang/cs/Org.Apache.REEF.Driver/Evaluator/EvaluatorRequestBuilder.cs 
b/lang/cs/Org.Apache.REEF.Driver/Evaluator/EvaluatorRequestBuilder.cs
index 6ccd1fb..3554bb6 100644
--- a/lang/cs/Org.Apache.REEF.Driver/Evaluator/EvaluatorRequestBuilder.cs
+++ b/lang/cs/Org.Apache.REEF.Driver/Evaluator/EvaluatorRequestBuilder.cs
@@ -29,6 +29,7 @@ namespace Org.Apache.REEF.Driver.Evaluator
         private string _runtimeName;
         private ICollection<string> _nodeNames;
         private bool _relaxLocality;
+        private string _nodeLabelExpression;
 
         internal EvaluatorRequestBuilder(IEvaluatorRequest request)
         {
@@ -40,6 +41,7 @@ namespace Org.Apache.REEF.Driver.Evaluator
             _runtimeName = request.RuntimeName;
             _nodeNames = request.NodeNames;
             _relaxLocality = request.RelaxLocality;
+            _nodeLabelExpression = request.NodeLabelExpression;
         }
 
         internal EvaluatorRequestBuilder()
@@ -52,6 +54,7 @@ namespace Org.Apache.REEF.Driver.Evaluator
             _runtimeName = string.Empty;
             _nodeNames = Enumerable.Empty<string>().ToList();
             _relaxLocality = true;
+            _nodeLabelExpression = string.Empty;
         }
 
         public int Number { get; private set; }
@@ -158,12 +161,23 @@ namespace Org.Apache.REEF.Driver.Evaluator
         }
 
         /// <summary>
+        /// Set the node label expression.
+        /// </summary>
+        /// <param name="nodeLabelExpression">describing a desired node 
type.</param>
+        /// <returns></returns>
+        public EvaluatorRequestBuilder SetNodeLabelExpression(string 
nodeLabelExpression)
+        {
+            _nodeLabelExpression = nodeLabelExpression;
+            return this;
+        }
+
+        /// <summary>
         /// Build the EvaluatorRequest.
         /// </summary>
         /// <returns></returns>
         public IEvaluatorRequest Build()
         {
-            return new EvaluatorRequest(Number, MegaBytes, VirtualCore, rack: 
_rackName, evaluatorBatchId: _evaluatorBatchId, runtimeName: _runtimeName, 
nodeNames: _nodeNames, relaxLocality: _relaxLocality);
+            return new EvaluatorRequest(Number, MegaBytes, VirtualCore, rack: 
_rackName, evaluatorBatchId: _evaluatorBatchId, runtimeName: _runtimeName, 
nodeNames: _nodeNames, relaxLocality: _relaxLocality, nodeLabelExpression: 
_nodeLabelExpression);
         }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/reef/blob/24bfea49/lang/cs/Org.Apache.REEF.Driver/Evaluator/IEvaluatorRequest.cs
----------------------------------------------------------------------
diff --git a/lang/cs/Org.Apache.REEF.Driver/Evaluator/IEvaluatorRequest.cs 
b/lang/cs/Org.Apache.REEF.Driver/Evaluator/IEvaluatorRequest.cs
index 357ffe3..0ed0545 100644
--- a/lang/cs/Org.Apache.REEF.Driver/Evaluator/IEvaluatorRequest.cs
+++ b/lang/cs/Org.Apache.REEF.Driver/Evaluator/IEvaluatorRequest.cs
@@ -73,5 +73,11 @@ namespace Org.Apache.REEF.Driver.Evaluator
         /// the corresponding any-level request should have locality 
relaxation set to false.
         /// </summary>
         bool RelaxLocality { get; }
+
+        /// <summary>
+        /// For specifying a node label expression that can be used by the 
resource manager
+        /// to aquire certain container types.
+        /// </summary>
+        string NodeLabelExpression { get; }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/reef/blob/24bfea49/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/EvaluatorRequestorBridge.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/EvaluatorRequestorBridge.java
 
b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/EvaluatorRequestorBridge.java
index 518537d..2327846 100644
--- 
a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/EvaluatorRequestorBridge.java
+++ 
b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/EvaluatorRequestorBridge.java
@@ -69,7 +69,8 @@ public final class EvaluatorRequestorBridge extends 
NativeBridge {
                      final boolean relaxLocality,
                      final String rack,
                      final String runtimeName,
-                     final ArrayList<String> nodeNames) {
+                     final ArrayList<String> nodeNames,
+                     final String nodeLabelExpression) {
     if (this.isBlocked) {
       throw new RuntimeException("Cannot request additional Evaluator, this is 
probably because " +
           "the Driver has crashed and restarted, and cannot ask for new 
container due to YARN-2433.");
@@ -89,6 +90,7 @@ public final class EvaluatorRequestorBridge extends 
NativeBridge {
           .setRuntimeName(runtimeName)
           .setRelaxLocality(relaxLocality)
           .addNodeNames(nodeNames)
+          .setNodeLabelExpression(nodeLabelExpression)
           .build();
 
       LOG.log(Level.FINE, "submitting evaluator request {0}", request);

http://git-wip-us.apache.org/repos/asf/reef/blob/24bfea49/lang/java/reef-common/src/main/java/org/apache/reef/driver/evaluator/EvaluatorRequest.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/evaluator/EvaluatorRequest.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/evaluator/EvaluatorRequest.java
index 4c48a24..b539a9b 100644
--- 
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/evaluator/EvaluatorRequest.java
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/driver/evaluator/EvaluatorRequest.java
@@ -41,6 +41,7 @@ public final class EvaluatorRequest {
   private final List<String> rackNames;
   private final String runtimeName;
   private final boolean relaxLocality;
+  private final String nodeLabelExpression;
 
   EvaluatorRequest(final int number,
                    final int megaBytes,
@@ -56,7 +57,7 @@ public final class EvaluatorRequest {
                    final List<String> nodeNames,
                    final List<String> rackNames,
                    final String runtimeName) {
-    this(number, megaBytes, cores, nodeNames, rackNames, runtimeName, true);
+    this(number, megaBytes, cores, nodeNames, rackNames, runtimeName, true, 
null);
   }
 
 
@@ -66,7 +67,8 @@ public final class EvaluatorRequest {
                    final List<String> nodeNames,
                    final List<String> rackNames,
                    final String runtimeName,
-                   final boolean relaxLocality) {
+                   final boolean relaxLocality,
+                   final String nodeLabelExpression) {
     this.number = number;
     this.megaBytes = megaBytes;
     this.cores = cores;
@@ -74,6 +76,7 @@ public final class EvaluatorRequest {
     this.rackNames = rackNames;
     this.runtimeName = runtimeName;
     this.relaxLocality = relaxLocality;
+    this.nodeLabelExpression = nodeLabelExpression;
   }
 
   /**
@@ -158,6 +161,13 @@ public final class EvaluatorRequest {
     return relaxLocality;
   }
 
+  /**
+   * Node label expression.
+   * @return string expression
+   */
+  public String getNodeLabelExpression() {
+    return nodeLabelExpression;
+  }
 
   /**
    * {@link EvaluatorRequest}s are build using this Builder.
@@ -171,6 +181,7 @@ public final class EvaluatorRequest {
     private final List<String> rackNames = new ArrayList<>();
     private String runtimeName = "";
     private boolean relaxLocality = true; //if not set, default to true
+    private String nodeLabelExpression = null;
 
     @Private
     public Builder() {
@@ -194,6 +205,7 @@ public final class EvaluatorRequest {
       for (final String rackName : request.getRackNames()) {
         addRackName(rackName);
       }
+      setNodeLabelExpression(request.getNodeLabelExpression());
     }
 
     /**
@@ -300,12 +312,23 @@ public final class EvaluatorRequest {
     }
 
     /**
+     * A string expression that describes the node type being requested.
+     * @param nodeLabelExpr describing node type
+     * @return this Builder.
+     */
+    public T setNodeLabelExpression(final String nodeLabelExpr) {
+      this.nodeLabelExpression = nodeLabelExpr;
+      return (T) this;
+    }
+
+    /**
      * Builds the {@link EvaluatorRequest}.
      */
     @Override
     public EvaluatorRequest build() {
       return new EvaluatorRequest(this.n, this.megaBytes, this.cores, 
this.nodeNames,
-                                  this.rackNames, this.runtimeName, 
this.relaxLocality);
+          this.rackNames, this.runtimeName, this.relaxLocality,
+          this.nodeLabelExpression);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/reef/blob/24bfea49/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/EvaluatorRequestorImpl.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/EvaluatorRequestorImpl.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/EvaluatorRequestorImpl.java
index 08b385d..a007805 100644
--- 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/EvaluatorRequestorImpl.java
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/EvaluatorRequestorImpl.java
@@ -109,6 +109,7 @@ public final class EvaluatorRequestorImpl implements 
EvaluatorRequestor {
           .addRackNames(req.getRackNames())
           .setRelaxLocality(relaxLocality)
           .setRuntimeName(req.getRuntimeName())
+          .setNodeLabelExpression(req.getNodeLabelExpression())
           .build();
       this.resourceRequestHandler.onNext(request);
     }

http://git-wip-us.apache.org/repos/asf/reef/blob/24bfea49/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/api/ResourceRequestEvent.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/api/ResourceRequestEvent.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/api/ResourceRequestEvent.java
index b8ce952..735662f 100644
--- 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/api/ResourceRequestEvent.java
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/api/ResourceRequestEvent.java
@@ -73,6 +73,11 @@ public interface ResourceRequestEvent {
   Optional<Boolean> getRelaxLocality();
 
   /**
+   * @return label expression that can further describe the desired resources.
+   */
+  Optional<String> getNodeLabelExpression();
+
+  /**
    * @return The runtime name
    */
   String getRuntimeName();

http://git-wip-us.apache.org/repos/asf/reef/blob/24bfea49/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/api/ResourceRequestEventImpl.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/api/ResourceRequestEventImpl.java
 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/api/ResourceRequestEventImpl.java
index f385087..4b43265 100644
--- 
a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/api/ResourceRequestEventImpl.java
+++ 
b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/api/ResourceRequestEventImpl.java
@@ -36,6 +36,7 @@ public final class ResourceRequestEventImpl implements 
ResourceRequestEvent {
   private final Optional<Integer> priority;
   private final Optional<Integer> virtualCores;
   private final Optional<Boolean> relaxLocality;
+  private final Optional<String> nodeLabelExpression;
   private final String runtimeName;
 
   private ResourceRequestEventImpl(final Builder builder) {
@@ -46,6 +47,7 @@ public final class ResourceRequestEventImpl implements 
ResourceRequestEvent {
     this.priority = Optional.ofNullable(builder.priority);
     this.virtualCores = Optional.ofNullable(builder.virtualCores);
     this.relaxLocality = Optional.ofNullable(builder.relaxLocality);
+    this.nodeLabelExpression = 
Optional.ofNullable(builder.nodeLabelExpression);
     this.runtimeName = builder.runtimeName == null ? "" : builder.runtimeName;
   }
 
@@ -89,6 +91,11 @@ public final class ResourceRequestEventImpl implements 
ResourceRequestEvent {
     return runtimeName;
   }
 
+  @Override
+  public Optional<String> getNodeLabelExpression() {
+    return nodeLabelExpression;
+  }
+
   public static Builder newBuilder() {
     return new Builder();
   }
@@ -105,6 +112,7 @@ public final class ResourceRequestEventImpl implements 
ResourceRequestEvent {
     private Integer virtualCores;
     private Boolean relaxLocality;
     private String runtimeName;
+    private String nodeLabelExpression;
 
     /**
      * Create a builder from an existing ResourceRequestEvent.
@@ -118,6 +126,9 @@ public final class ResourceRequestEventImpl implements 
ResourceRequestEvent {
       this.virtualCores = resourceRequestEvent.getVirtualCores().orElse(null);
       this.relaxLocality = 
resourceRequestEvent.getRelaxLocality().orElse(null);
       this.runtimeName = resourceRequestEvent.getRuntimeName();
+      this.nodeLabelExpression = resourceRequestEvent
+          .getNodeLabelExpression()
+          .orElse(null);
       return this;
     }
 
@@ -209,6 +220,14 @@ public final class ResourceRequestEventImpl implements 
ResourceRequestEvent {
       return this;
     }
 
+    /**
+     * @see ResourceRequestEvent#getNodeLabelExpression()
+     */
+    public Builder setNodeLabelExpression(final String nodeLabelExpression) {
+      this.nodeLabelExpression = nodeLabelExpression;
+      return this;
+    }
+
     @Override
     public ResourceRequestEvent build() {
       return new ResourceRequestEventImpl(this);

http://git-wip-us.apache.org/repos/asf/reef/blob/24bfea49/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerManager.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerManager.java
 
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerManager.java
index de8d0b1..299af5b 100644
--- 
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerManager.java
+++ 
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerManager.java
@@ -562,8 +562,11 @@ final class YarnContainerManager implements 
AMRMClientAsync.CallbackHandler, NMC
   }
 
   private boolean isSameKindOfRequest(final AMRMClient.ContainerRequest r1, 
final AMRMClient.ContainerRequest r2) {
+    final boolean nodeLabelExpressionIsEqual = r1.getNodeLabelExpression() == 
r2.getNodeLabelExpression() ||
+        (r1.getNodeLabelExpression() != null && 
r1.getNodeLabelExpression().equals(r2.getNodeLabelExpression()));
     return r1.getPriority().compareTo(r2.getPriority()) == 0
         && r1.getCapability().compareTo(r2.getCapability()) == 0
+        && nodeLabelExpressionIsEqual
         && r1.getRelaxLocality() == r2.getRelaxLocality()
         && ListUtils.isEqualList(r1.getNodes(), r2.getNodes())
         && ListUtils.isEqualList(r1.getRacks(), r2.getRacks());

http://git-wip-us.apache.org/repos/asf/reef/blob/24bfea49/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnResourceRequestHandler.java
----------------------------------------------------------------------
diff --git 
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnResourceRequestHandler.java
 
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnResourceRequestHandler.java
index baab946..8c6f367 100644
--- 
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnResourceRequestHandler.java
+++ 
b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnResourceRequestHandler.java
@@ -18,6 +18,7 @@
  */
 package org.apache.reef.runtime.yarn.driver;
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.client.api.AMRMClient;
@@ -64,12 +65,15 @@ public final class YarnResourceRequestHandler implements 
ResourceRequestHandler
     final Priority pri = getPriority(resourceRequestEvent);
     final Resource resource = getResource(resourceRequestEvent);
     final boolean relaxLocality = 
resourceRequestEvent.getRelaxLocality().orElse(true);
+    final String nodeLabelExpression = 
resourceRequestEvent.getNodeLabelExpression().orElse("");
 
     final AMRMClient.ContainerRequest[] containerRequests =
         new 
AMRMClient.ContainerRequest[resourceRequestEvent.getResourceCount()];
 
     for (int i = 0; i < resourceRequestEvent.getResourceCount(); i++) {
-      containerRequests[i] = new AMRMClient.ContainerRequest(resource, nodes, 
racks, pri, relaxLocality);
+      containerRequests[i] =
+          new AMRMClient.ContainerRequest(resource, nodes, racks, pri, 
relaxLocality,
+              StringUtils.isEmpty(nodeLabelExpression) ? null : 
nodeLabelExpression);
     }
     this.yarnContainerRequestHandler.onContainerRequest(containerRequests);
   }

Reply via email to