Repository: reef Updated Branches: refs/heads/master 8d0bad248 -> 24bfea49d
[REEF-2011] Enable node label expressions in evaluator requestor. This adds the notion of node label expressions to resource manager requests in both Java and C#. JIRA: [REEF-2011](https://issues.apache.org/jira/browse/REEF-2011) Pull Request: Closes #1451 Project: http://git-wip-us.apache.org/repos/asf/reef/repo Commit: http://git-wip-us.apache.org/repos/asf/reef/commit/24bfea49 Tree: http://git-wip-us.apache.org/repos/asf/reef/tree/24bfea49 Diff: http://git-wip-us.apache.org/repos/asf/reef/diff/24bfea49 Branch: refs/heads/master Commit: 24bfea49dbf5c956033c214bddcb6c8ff5cd2a57 Parents: 8d0bad2 Author: Tyson Condie <[email protected]> Authored: Wed Apr 25 16:33:07 2018 -0700 Committer: Sergiy Matusevych <[email protected]> Committed: Thu Apr 26 15:48:10 2018 -0700 ---------------------------------------------------------------------- .../EvaluatorRequestorClr2Java.cpp | 5 ++-- .../Evaluator/EvaluatorRequest.cs | 20 ++++++++------ .../Evaluator/EvaluatorRequestBuilder.cs | 16 ++++++++++- .../Evaluator/IEvaluatorRequest.cs | 6 ++++ .../javabridge/EvaluatorRequestorBridge.java | 4 ++- .../reef/driver/evaluator/EvaluatorRequest.java | 29 ++++++++++++++++++-- .../common/driver/EvaluatorRequestorImpl.java | 1 + .../common/driver/api/ResourceRequestEvent.java | 5 ++++ .../driver/api/ResourceRequestEventImpl.java | 19 +++++++++++++ .../yarn/driver/YarnContainerManager.java | 3 ++ .../yarn/driver/YarnResourceRequestHandler.java | 6 +++- 11 files changed, 98 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/reef/blob/24bfea49/lang/cs/Org.Apache.REEF.Bridge/EvaluatorRequestorClr2Java.cpp ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Bridge/EvaluatorRequestorClr2Java.cpp b/lang/cs/Org.Apache.REEF.Bridge/EvaluatorRequestorClr2Java.cpp index 90fe5d4..4dc9bd1 100644 --- 
a/lang/cs/Org.Apache.REEF.Bridge/EvaluatorRequestorClr2Java.cpp +++ b/lang/cs/Org.Apache.REEF.Bridge/EvaluatorRequestorClr2Java.cpp @@ -53,7 +53,7 @@ namespace Org { ManagedLog::LOGGER->LogStart("EvaluatorRequestorClr2Java::Submit"); JNIEnv *env = RetrieveEnv(_jvm); jclass jclassEvaluatorRequestor = env->GetObjectClass(_jobjectEvaluatorRequestor); - jmethodID jmidSubmit = env->GetMethodID(jclassEvaluatorRequestor, "submit", "(IIIZLjava/lang/String;Ljava/lang/String;Ljava/util/ArrayList;)V"); + jmethodID jmidSubmit = env->GetMethodID(jclassEvaluatorRequestor, "submit", "(IIIZLjava/lang/String;Ljava/lang/String;Ljava/util/ArrayList;Ljava/lang/String;)V"); if (jmidSubmit == NULL) { fprintf(stdout, " jmidSubmit is NULL\n"); @@ -69,7 +69,8 @@ namespace Org { request->RelaxLocality, JavaStringFromManagedString(env, request->Rack), JavaStringFromManagedString(env, request->RuntimeName), - JavaArrayListFromManagedList(env, request->NodeNames)); + JavaArrayListFromManagedList(env, request->NodeNames), + JavaStringFromManagedString(env, request->NodeLabelExpression)); ManagedLog::LOGGER->LogStop("EvaluatorRequestorClr2Java::Submit"); } http://git-wip-us.apache.org/repos/asf/reef/blob/24bfea49/lang/cs/Org.Apache.REEF.Driver/Evaluator/EvaluatorRequest.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Driver/Evaluator/EvaluatorRequest.cs b/lang/cs/Org.Apache.REEF.Driver/Evaluator/EvaluatorRequest.cs index 2c8365d..f4e827f 100644 --- a/lang/cs/Org.Apache.REEF.Driver/Evaluator/EvaluatorRequest.cs +++ b/lang/cs/Org.Apache.REEF.Driver/Evaluator/EvaluatorRequest.cs @@ -29,43 +29,43 @@ namespace Org.Apache.REEF.Driver.Evaluator internal class EvaluatorRequest : IEvaluatorRequest { internal EvaluatorRequest() - : this(0, 0, 1, string.Empty, Guid.NewGuid().ToString("N"), string.Empty, Enumerable.Empty<string>().ToList(), true) + : this(0, 0, 1, string.Empty, Guid.NewGuid().ToString("N"), string.Empty, 
Enumerable.Empty<string>().ToList(), true, String.Empty) { } internal EvaluatorRequest(int number, int megaBytes) - : this(number, megaBytes, 1, string.Empty, Guid.NewGuid().ToString("N"), string.Empty, Enumerable.Empty<string>().ToList(), true) + : this(number, megaBytes, 1, string.Empty, Guid.NewGuid().ToString("N"), string.Empty, Enumerable.Empty<string>().ToList(), true, string.Empty) { } internal EvaluatorRequest(int number, int megaBytes, int core) - : this(number, megaBytes, core, string.Empty, Guid.NewGuid().ToString("N"), string.Empty, Enumerable.Empty<string>().ToList(), true) + : this(number, megaBytes, core, string.Empty, Guid.NewGuid().ToString("N"), string.Empty, Enumerable.Empty<string>().ToList(), true, string.Empty) { } internal EvaluatorRequest(int number, int megaBytes, string rack) - : this(number, megaBytes, 1, rack, Guid.NewGuid().ToString("N"), string.Empty, Enumerable.Empty<string>().ToList(), true) + : this(number, megaBytes, 1, rack, Guid.NewGuid().ToString("N"), string.Empty, Enumerable.Empty<string>().ToList(), true, String.Empty) { } internal EvaluatorRequest(int number, int megaBytes, int core, string rack) - : this(number, megaBytes, core, rack, Guid.NewGuid().ToString("N"), string.Empty, Enumerable.Empty<string>().ToList(), true) + : this(number, megaBytes, core, rack, Guid.NewGuid().ToString("N"), string.Empty, Enumerable.Empty<string>().ToList(), true, string.Empty) { } internal EvaluatorRequest(int number, int megaBytes, int core, string rack, string evaluatorBatchId, ICollection<string> nodeNames) - : this(number, megaBytes, core, rack, evaluatorBatchId, string.Empty, nodeNames, true) + : this(number, megaBytes, core, rack, evaluatorBatchId, string.Empty, nodeNames, true, string.Empty) { } internal EvaluatorRequest(int number, int megaBytes, int core, string rack, string evaluatorBatchId, ICollection<string> nodeNames, bool relaxLocality) - : this(number, megaBytes, core, rack, evaluatorBatchId, string.Empty, nodeNames, 
relaxLocality) + : this(number, megaBytes, core, rack, evaluatorBatchId, string.Empty, nodeNames, relaxLocality, string.Empty) { } - internal EvaluatorRequest(int number, int megaBytes, int core, string rack, string evaluatorBatchId, string runtimeName, ICollection<string> nodeNames, bool relaxLocality) + internal EvaluatorRequest(int number, int megaBytes, int core, string rack, string evaluatorBatchId, string runtimeName, ICollection<string> nodeNames, bool relaxLocality, string nodeLabelExpression) { Number = number; MemoryMegaBytes = megaBytes; @@ -75,6 +75,7 @@ namespace Org.Apache.REEF.Driver.Evaluator RuntimeName = runtimeName; NodeNames = nodeNames; RelaxLocality = relaxLocality; + NodeLabelExpression = NodeLabelExpression; } [DataMember] @@ -101,6 +102,9 @@ namespace Org.Apache.REEF.Driver.Evaluator [DataMember] public bool RelaxLocality { get; private set; } + [DataMember] + public string NodeLabelExpression { get; private set; } + internal static EvaluatorRequestBuilder NewBuilder() { return new EvaluatorRequestBuilder(); http://git-wip-us.apache.org/repos/asf/reef/blob/24bfea49/lang/cs/Org.Apache.REEF.Driver/Evaluator/EvaluatorRequestBuilder.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Driver/Evaluator/EvaluatorRequestBuilder.cs b/lang/cs/Org.Apache.REEF.Driver/Evaluator/EvaluatorRequestBuilder.cs index 6ccd1fb..3554bb6 100644 --- a/lang/cs/Org.Apache.REEF.Driver/Evaluator/EvaluatorRequestBuilder.cs +++ b/lang/cs/Org.Apache.REEF.Driver/Evaluator/EvaluatorRequestBuilder.cs @@ -29,6 +29,7 @@ namespace Org.Apache.REEF.Driver.Evaluator private string _runtimeName; private ICollection<string> _nodeNames; private bool _relaxLocality; + private string _nodeLabelExpression; internal EvaluatorRequestBuilder(IEvaluatorRequest request) { @@ -40,6 +41,7 @@ namespace Org.Apache.REEF.Driver.Evaluator _runtimeName = request.RuntimeName; _nodeNames = request.NodeNames; _relaxLocality = 
request.RelaxLocality; + _nodeLabelExpression = request.NodeLabelExpression; } internal EvaluatorRequestBuilder() @@ -52,6 +54,7 @@ namespace Org.Apache.REEF.Driver.Evaluator _runtimeName = string.Empty; _nodeNames = Enumerable.Empty<string>().ToList(); _relaxLocality = true; + _nodeLabelExpression = string.Empty; } public int Number { get; private set; } @@ -158,12 +161,23 @@ namespace Org.Apache.REEF.Driver.Evaluator } /// <summary> + /// Set the node label expression. + /// </summary> + /// <param name="nodeLabelExpression">describing a desired node type.</param> + /// <returns></returns> + public EvaluatorRequestBuilder SetNodeLabelExpression(string nodeLabelExpression) + { + _nodeLabelExpression = nodeLabelExpression; + return this; + } + + /// <summary> /// Build the EvaluatorRequest. /// </summary> /// <returns></returns> public IEvaluatorRequest Build() { - return new EvaluatorRequest(Number, MegaBytes, VirtualCore, rack: _rackName, evaluatorBatchId: _evaluatorBatchId, runtimeName: _runtimeName, nodeNames: _nodeNames, relaxLocality: _relaxLocality); + return new EvaluatorRequest(Number, MegaBytes, VirtualCore, rack: _rackName, evaluatorBatchId: _evaluatorBatchId, runtimeName: _runtimeName, nodeNames: _nodeNames, relaxLocality: _relaxLocality, nodeLabelExpression: _nodeLabelExpression); } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/reef/blob/24bfea49/lang/cs/Org.Apache.REEF.Driver/Evaluator/IEvaluatorRequest.cs ---------------------------------------------------------------------- diff --git a/lang/cs/Org.Apache.REEF.Driver/Evaluator/IEvaluatorRequest.cs b/lang/cs/Org.Apache.REEF.Driver/Evaluator/IEvaluatorRequest.cs index 357ffe3..0ed0545 100644 --- a/lang/cs/Org.Apache.REEF.Driver/Evaluator/IEvaluatorRequest.cs +++ b/lang/cs/Org.Apache.REEF.Driver/Evaluator/IEvaluatorRequest.cs @@ -73,5 +73,11 @@ namespace Org.Apache.REEF.Driver.Evaluator /// the corresponding any-level request should have locality relaxation set to false. 
/// </summary> bool RelaxLocality { get; } + + /// <summary> + /// For specifying a node label expression that can be used by the resource manager + /// to acquire certain container types. + /// </summary> + string NodeLabelExpression { get; } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/reef/blob/24bfea49/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/EvaluatorRequestorBridge.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/EvaluatorRequestorBridge.java b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/EvaluatorRequestorBridge.java index 518537d..2327846 100644 --- a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/EvaluatorRequestorBridge.java +++ b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/EvaluatorRequestorBridge.java @@ -69,7 +69,8 @@ public final class EvaluatorRequestorBridge extends NativeBridge { final boolean relaxLocality, final String rack, final String runtimeName, - final ArrayList<String> nodeNames) { + final ArrayList<String> nodeNames, + final String nodeLabelExpression) { if (this.isBlocked) { throw new RuntimeException("Cannot request additional Evaluator, this is probably because " + "the Driver has crashed and restarted, and cannot ask for new container due to YARN-2433."); @@ -89,6 +90,7 @@ public final class EvaluatorRequestorBridge extends NativeBridge { .setRuntimeName(runtimeName) .setRelaxLocality(relaxLocality) .addNodeNames(nodeNames) + .setNodeLabelExpression(nodeLabelExpression) .build(); LOG.log(Level.FINE, "submitting evaluator request {0}", request); http://git-wip-us.apache.org/repos/asf/reef/blob/24bfea49/lang/java/reef-common/src/main/java/org/apache/reef/driver/evaluator/EvaluatorRequest.java ---------------------------------------------------------------------- diff --git 
a/lang/java/reef-common/src/main/java/org/apache/reef/driver/evaluator/EvaluatorRequest.java b/lang/java/reef-common/src/main/java/org/apache/reef/driver/evaluator/EvaluatorRequest.java index 4c48a24..b539a9b 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/driver/evaluator/EvaluatorRequest.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/driver/evaluator/EvaluatorRequest.java @@ -41,6 +41,7 @@ public final class EvaluatorRequest { private final List<String> rackNames; private final String runtimeName; private final boolean relaxLocality; + private final String nodeLabelExpression; EvaluatorRequest(final int number, final int megaBytes, @@ -56,7 +57,7 @@ public final class EvaluatorRequest { final List<String> nodeNames, final List<String> rackNames, final String runtimeName) { - this(number, megaBytes, cores, nodeNames, rackNames, runtimeName, true); + this(number, megaBytes, cores, nodeNames, rackNames, runtimeName, true, null); } @@ -66,7 +67,8 @@ public final class EvaluatorRequest { final List<String> nodeNames, final List<String> rackNames, final String runtimeName, - final boolean relaxLocality) { + final boolean relaxLocality, + final String nodeLabelExpression) { this.number = number; this.megaBytes = megaBytes; this.cores = cores; @@ -74,6 +76,7 @@ public final class EvaluatorRequest { this.rackNames = rackNames; this.runtimeName = runtimeName; this.relaxLocality = relaxLocality; + this.nodeLabelExpression = nodeLabelExpression; } /** @@ -158,6 +161,13 @@ public final class EvaluatorRequest { return relaxLocality; } + /** + * Node label expression. + * @return string expression + */ + public String getNodeLabelExpression() { + return nodeLabelExpression; + } /** * {@link EvaluatorRequest}s are build using this Builder. 
@@ -171,6 +181,7 @@ public final class EvaluatorRequest { private final List<String> rackNames = new ArrayList<>(); private String runtimeName = ""; private boolean relaxLocality = true; //if not set, default to true + private String nodeLabelExpression = null; @Private public Builder() { @@ -194,6 +205,7 @@ public final class EvaluatorRequest { for (final String rackName : request.getRackNames()) { addRackName(rackName); } + setNodeLabelExpression(request.getNodeLabelExpression()); } /** @@ -300,12 +312,23 @@ public final class EvaluatorRequest { } /** + * A string expression that describes the node type being requested. + * @param nodeLabelExpr describing node type + * @return this Builder. + */ + public T setNodeLabelExpression(final String nodeLabelExpr) { + this.nodeLabelExpression = nodeLabelExpr; + return (T) this; + } + + /** * Builds the {@link EvaluatorRequest}. */ @Override public EvaluatorRequest build() { return new EvaluatorRequest(this.n, this.megaBytes, this.cores, this.nodeNames, - this.rackNames, this.runtimeName, this.relaxLocality); + this.rackNames, this.runtimeName, this.relaxLocality, + this.nodeLabelExpression); } /** http://git-wip-us.apache.org/repos/asf/reef/blob/24bfea49/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/EvaluatorRequestorImpl.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/EvaluatorRequestorImpl.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/EvaluatorRequestorImpl.java index 08b385d..a007805 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/EvaluatorRequestorImpl.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/EvaluatorRequestorImpl.java @@ -109,6 +109,7 @@ public final class EvaluatorRequestorImpl implements EvaluatorRequestor { .addRackNames(req.getRackNames()) 
.setRelaxLocality(relaxLocality) .setRuntimeName(req.getRuntimeName()) + .setNodeLabelExpression(req.getNodeLabelExpression()) .build(); this.resourceRequestHandler.onNext(request); } http://git-wip-us.apache.org/repos/asf/reef/blob/24bfea49/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/api/ResourceRequestEvent.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/api/ResourceRequestEvent.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/api/ResourceRequestEvent.java index b8ce952..735662f 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/api/ResourceRequestEvent.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/api/ResourceRequestEvent.java @@ -73,6 +73,11 @@ public interface ResourceRequestEvent { Optional<Boolean> getRelaxLocality(); /** + * @return label expression that can further describe the desired resources. 
+ */ + Optional<String> getNodeLabelExpression(); + + /** * @return The runtime name */ String getRuntimeName(); http://git-wip-us.apache.org/repos/asf/reef/blob/24bfea49/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/api/ResourceRequestEventImpl.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/api/ResourceRequestEventImpl.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/api/ResourceRequestEventImpl.java index f385087..4b43265 100644 --- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/api/ResourceRequestEventImpl.java +++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/api/ResourceRequestEventImpl.java @@ -36,6 +36,7 @@ public final class ResourceRequestEventImpl implements ResourceRequestEvent { private final Optional<Integer> priority; private final Optional<Integer> virtualCores; private final Optional<Boolean> relaxLocality; + private final Optional<String> nodeLabelExpression; private final String runtimeName; private ResourceRequestEventImpl(final Builder builder) { @@ -46,6 +47,7 @@ public final class ResourceRequestEventImpl implements ResourceRequestEvent { this.priority = Optional.ofNullable(builder.priority); this.virtualCores = Optional.ofNullable(builder.virtualCores); this.relaxLocality = Optional.ofNullable(builder.relaxLocality); + this.nodeLabelExpression = Optional.ofNullable(builder.nodeLabelExpression); this.runtimeName = builder.runtimeName == null ? 
"" : builder.runtimeName; } @@ -89,6 +91,11 @@ public final class ResourceRequestEventImpl implements ResourceRequestEvent { return runtimeName; } + @Override + public Optional<String> getNodeLabelExpression() { + return nodeLabelExpression; + } + public static Builder newBuilder() { return new Builder(); } @@ -105,6 +112,7 @@ public final class ResourceRequestEventImpl implements ResourceRequestEvent { private Integer virtualCores; private Boolean relaxLocality; private String runtimeName; + private String nodeLabelExpression; /** * Create a builder from an existing ResourceRequestEvent. @@ -118,6 +126,9 @@ public final class ResourceRequestEventImpl implements ResourceRequestEvent { this.virtualCores = resourceRequestEvent.getVirtualCores().orElse(null); this.relaxLocality = resourceRequestEvent.getRelaxLocality().orElse(null); this.runtimeName = resourceRequestEvent.getRuntimeName(); + this.nodeLabelExpression = resourceRequestEvent + .getNodeLabelExpression() + .orElse(null); return this; } @@ -209,6 +220,14 @@ public final class ResourceRequestEventImpl implements ResourceRequestEvent { return this; } + /** + * @see ResourceRequestEvent#getNodeLabelExpression() + */ + public Builder setNodeLabelExpression(final String nodeLabelExpression) { + this.nodeLabelExpression = nodeLabelExpression; + return this; + } + @Override public ResourceRequestEvent build() { return new ResourceRequestEventImpl(this); http://git-wip-us.apache.org/repos/asf/reef/blob/24bfea49/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerManager.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerManager.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerManager.java index de8d0b1..299af5b 100644 --- 
a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerManager.java +++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnContainerManager.java @@ -562,8 +562,11 @@ final class YarnContainerManager implements AMRMClientAsync.CallbackHandler, NMC } private boolean isSameKindOfRequest(final AMRMClient.ContainerRequest r1, final AMRMClient.ContainerRequest r2) { + final boolean nodeLabelExpressionIsEqual = r1.getNodeLabelExpression() == r2.getNodeLabelExpression() || + (r1.getNodeLabelExpression() != null && r1.getNodeLabelExpression().equals(r2.getNodeLabelExpression())); return r1.getPriority().compareTo(r2.getPriority()) == 0 && r1.getCapability().compareTo(r2.getCapability()) == 0 + && nodeLabelExpressionIsEqual && r1.getRelaxLocality() == r2.getRelaxLocality() && ListUtils.isEqualList(r1.getNodes(), r2.getNodes()) && ListUtils.isEqualList(r1.getRacks(), r2.getRacks()); http://git-wip-us.apache.org/repos/asf/reef/blob/24bfea49/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnResourceRequestHandler.java ---------------------------------------------------------------------- diff --git a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnResourceRequestHandler.java b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnResourceRequestHandler.java index baab946..8c6f367 100644 --- a/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnResourceRequestHandler.java +++ b/lang/java/reef-runtime-yarn/src/main/java/org/apache/reef/runtime/yarn/driver/YarnResourceRequestHandler.java @@ -18,6 +18,7 @@ */ package org.apache.reef.runtime.yarn.driver; +import org.apache.commons.lang.StringUtils; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.client.api.AMRMClient; @@ -64,12 +65,15 @@ public final class 
YarnResourceRequestHandler implements ResourceRequestHandler final Priority pri = getPriority(resourceRequestEvent); final Resource resource = getResource(resourceRequestEvent); final boolean relaxLocality = resourceRequestEvent.getRelaxLocality().orElse(true); + final String nodeLabelExpression = resourceRequestEvent.getNodeLabelExpression().orElse(""); final AMRMClient.ContainerRequest[] containerRequests = new AMRMClient.ContainerRequest[resourceRequestEvent.getResourceCount()]; for (int i = 0; i < resourceRequestEvent.getResourceCount(); i++) { - containerRequests[i] = new AMRMClient.ContainerRequest(resource, nodes, racks, pri, relaxLocality); + containerRequests[i] = + new AMRMClient.ContainerRequest(resource, nodes, racks, pri, relaxLocality, + StringUtils.isEmpty(nodeLabelExpression) ? null : nodeLabelExpression); } this.yarnContainerRequestHandler.onContainerRequest(containerRequests); }
