YARN-5351. ResourceRequest should take ExecutionType into account during 
comparison. (Konstantinos Karanasos via asuresh)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/2d8d183b
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/2d8d183b
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/2d8d183b

Branch: refs/heads/HADOOP-12756
Commit: 2d8d183b1992b82c4d8dd3d6b41a1964685d909e
Parents: 49969b1
Author: Arun Suresh <asur...@apache.org>
Authored: Tue Jul 26 19:08:30 2016 -0700
Committer: Arun Suresh <asur...@apache.org>
Committed: Tue Jul 26 19:08:30 2016 -0700

----------------------------------------------------------------------
 .../yarn/api/records/ExecutionTypeRequest.java  |   9 +-
 .../yarn/api/records/ResourceRequest.java       |  23 +++-
 .../yarn/client/api/impl/TestAMRMClient.java    | 131 ++++++++++++++++++-
 .../impl/pb/ExecutionTypeRequestPBImpl.java     |   5 +
 4 files changed, 159 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/2d8d183b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ExecutionTypeRequest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ExecutionTypeRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ExecutionTypeRequest.java
index f553a44..e3ee8b4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ExecutionTypeRequest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ExecutionTypeRequest.java
@@ -31,7 +31,8 @@ import org.apache.hadoop.yarn.util.Records;
  */
 @Public
 @Evolving
-public abstract class ExecutionTypeRequest {
+public abstract class ExecutionTypeRequest
+    implements Comparable<ExecutionTypeRequest> {
 
   @Public
   @Evolving
@@ -41,6 +42,12 @@ public abstract class ExecutionTypeRequest {
 
   @Public
   @Evolving
+  public static ExecutionTypeRequest newInstance(ExecutionType execType) {
+    return newInstance(execType, false);
+  }
+
+  @Public
+  @Evolving
   public static ExecutionTypeRequest newInstance(ExecutionType execType,
       boolean ensureExecutionType) {
     ExecutionTypeRequest executionTypeRequest =

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2d8d183b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java
index 50a7619..07f132c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceRequest.java
@@ -117,6 +117,10 @@ public abstract class ResourceRequest implements Comparable<ResourceRequest> {
         ret = h1.compareTo(h2);
       }
       if (ret == 0) {
+        ret = r1.getExecutionTypeRequest()
+            .compareTo(r2.getExecutionTypeRequest());
+      }
+      if (ret == 0) {
         ret = r1.getCapability().compareTo(r2.getCapability());
       }
       return ret;
@@ -414,7 +418,8 @@ public abstract class ResourceRequest implements Comparable<ResourceRequest> {
       if (other.getExecutionTypeRequest() != null) {
         return false;
       }
-    } else if (!execTypeRequest.equals(other.getExecutionTypeRequest())) {
+    } else if (!execTypeRequest.getExecutionType()
+        .equals(other.getExecutionTypeRequest().getExecutionType())) {
       return false;
     }
     if (getNodeLabelExpression() == null) {
@@ -441,12 +446,18 @@ public abstract class ResourceRequest implements Comparable<ResourceRequest> {
       int hostNameComparison =
           this.getResourceName().compareTo(other.getResourceName());
       if (hostNameComparison == 0) {
-        int capabilityComparison =
-            this.getCapability().compareTo(other.getCapability());
-        if (capabilityComparison == 0) {
-          return this.getNumContainers() - other.getNumContainers();
+        int execTypeReqComparison = this.getExecutionTypeRequest()
+            .compareTo(other.getExecutionTypeRequest());
+        if (execTypeReqComparison == 0) {
+          int capabilityComparison =
+              this.getCapability().compareTo(other.getCapability());
+          if (capabilityComparison == 0) {
+            return this.getNumContainers() - other.getNumContainers();
+          } else {
+            return capabilityComparison;
+          }
         } else {
-          return capabilityComparison;
+          return execTypeReqComparison;
         }
       } else {
         return hostNameComparison;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2d8d183b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java
index 99bfca5..57cdbfb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java
@@ -39,7 +39,6 @@ import java.util.Set;
 import java.util.TreeSet;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.Credentials;
@@ -62,6 +61,7 @@ import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.NMToken;
@@ -120,7 +120,7 @@ public class TestAMRMClient {
   static String[] nodes;
   static String[] racks;
   private final static int DEFAULT_ITERATION = 3;
-  
+
   @BeforeClass
   public static void setup() throws Exception {
     // start minicluster
@@ -335,6 +335,133 @@ public class TestAMRMClient {
       }
     }
   }
+
+  /**
+   * Test fit of both GUARANTEED and OPPORTUNISTIC containers.
+   */
+  @Test (timeout=60000)
+  public void testAMRMClientMatchingFitExecType()
+      throws YarnException, IOException {
+    AMRMClient<ContainerRequest> amClient = null;
+    try {
+      // start am rm client
+      amClient = AMRMClient.<ContainerRequest>createAMRMClient();
+      amClient.init(conf);
+      amClient.start();
+      amClient.registerApplicationMaster("Host", 10000, "");
+
+      Resource capability1 = Resource.newInstance(1024, 2);
+      Resource capability2 = Resource.newInstance(1024, 1);
+      Resource capability3 = Resource.newInstance(1000, 2);
+      Resource capability4 = Resource.newInstance(1000, 2);
+      Resource capability5 = Resource.newInstance(2000, 2);
+      Resource capability6 = Resource.newInstance(2000, 3);
+      Resource capability7 = Resource.newInstance(6000, 3);
+
+      // Add 2 GUARANTEED and 7 OPPORTUNISTIC requests.
+      ContainerRequest storedGuarContainer1 =
+          new ContainerRequest(capability1, nodes, racks, priority);
+      ContainerRequest storedGuarContainer2 =
+          new ContainerRequest(capability2, nodes, racks, priority);
+      ContainerRequest storedOpportContainer1 =
+          new ContainerRequest(capability1, nodes, racks, priority, true, null,
+              ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC));
+      ContainerRequest storedOpportContainer2 =
+          new ContainerRequest(capability2, nodes, racks, priority, true, null,
+              ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC));
+      ContainerRequest storedOpportContainer3 =
+          new ContainerRequest(capability3, nodes, racks, priority, true, null,
+              ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC));
+      ContainerRequest storedOpportContainer4 =
+          new ContainerRequest(capability4, nodes, racks, priority, true, null,
+              ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC));
+      ContainerRequest storedOpportContainer5 =
+          new ContainerRequest(capability5, nodes, racks, priority, true, null,
+              ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC));
+      ContainerRequest storedOpportContainer6 =
+          new ContainerRequest(capability6, nodes, racks, priority, true, null,
+              ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC));
+      ContainerRequest storedOpportContainer7 =
+          new ContainerRequest(capability7, nodes, racks, priority2,
+              false, null,
+              ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC));
+      amClient.addContainerRequest(storedGuarContainer1);
+      amClient.addContainerRequest(storedGuarContainer2);
+      amClient.addContainerRequest(storedOpportContainer1);
+      amClient.addContainerRequest(storedOpportContainer2);
+      amClient.addContainerRequest(storedOpportContainer3);
+      amClient.addContainerRequest(storedOpportContainer4);
+      amClient.addContainerRequest(storedOpportContainer5);
+      amClient.addContainerRequest(storedOpportContainer6);
+      amClient.addContainerRequest(storedOpportContainer7);
+
+      // Make sure 3 entries are generated in the ask list for each added
+      // container request of a given capability, locality, execution type and
+      // priority (one node-local, one rack-local, and one ANY).
+      assertEquals(24,
+          (((AMRMClientImpl<ContainerRequest>) amClient).ask.size()));
+
+      // test exact matching of GUARANTEED containers
+      List<? extends Collection<ContainerRequest>> matches;
+      ContainerRequest storedRequest;
+      Resource testCapability1 = Resource.newInstance(1024,  2);
+      matches = amClient
+          .getMatchingRequests(priority, node, ExecutionType.GUARANTEED,
+              testCapability1);
+      verifyMatches(matches, 1);
+      storedRequest = matches.get(0).iterator().next();
+      assertEquals(storedGuarContainer1, storedRequest);
+      amClient.removeContainerRequest(storedGuarContainer1);
+
+      // test exact matching of OPPORTUNISTIC containers
+      matches = amClient.getMatchingRequests(priority, node,
+          ExecutionType.OPPORTUNISTIC, testCapability1);
+      verifyMatches(matches, 1);
+      storedRequest = matches.get(0).iterator().next();
+      assertEquals(storedOpportContainer1, storedRequest);
+      amClient.removeContainerRequest(storedOpportContainer1);
+
+      // exact OPPORTUNISTIC matching with order maintained
+      Resource testCapability2 = Resource.newInstance(1000, 2);
+      matches = amClient.getMatchingRequests(priority, node,
+          ExecutionType.OPPORTUNISTIC, testCapability2);
+      verifyMatches(matches, 2);
+      // must be returned in the order they were made
+      int i = 0;
+      for(ContainerRequest storedRequest1 : matches.get(0)) {
+        if(i++ == 0) {
+          assertEquals(storedOpportContainer3, storedRequest1);
+        } else {
+          assertEquals(storedOpportContainer4, storedRequest1);
+        }
+      }
+      amClient.removeContainerRequest(storedOpportContainer3);
+
+      // matching with larger container
+      Resource testCapability3 = Resource.newInstance(4000, 4);
+      matches = amClient.getMatchingRequests(priority, node,
+          ExecutionType.OPPORTUNISTIC, testCapability3);
+      assert(matches.size() == 4);
+
+      // verify requests without relaxed locality are only returned at specific
+      // locations
+      Resource testCapability4 = Resource.newInstance(6000, 3);
+      matches = amClient.getMatchingRequests(priority2, ResourceRequest.ANY,
+          ExecutionType.OPPORTUNISTIC, testCapability4);
+      assert(matches.size() == 0);
+      matches = amClient.getMatchingRequests(priority2, node,
+          ExecutionType.OPPORTUNISTIC, testCapability4);
+      assert(matches.size() == 1);
+
+      amClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED,
+          null, null);
+
+    } finally {
+      if (amClient != null && amClient.getServiceState() == STATE.STARTED) {
+        amClient.stop();
+      }
+    }
+  }
   
   private void verifyMatches(
                   List<? extends Collection<ContainerRequest>> matches,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2d8d183b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ExecutionTypeRequestPBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ExecutionTypeRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ExecutionTypeRequestPBImpl.java
index 0037dd3..f6a5032 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ExecutionTypeRequestPBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ExecutionTypeRequestPBImpl.java
@@ -86,6 +86,11 @@ public class ExecutionTypeRequestPBImpl extends ExecutionTypeRequest {
   }
 
   @Override
+  public int compareTo(ExecutionTypeRequest other) {
+    return this.getExecutionType().compareTo(other.getExecutionType());
+  }
+
+  @Override
   public String toString() {
     return "{Execution Type: " + getExecutionType()
         + ", Enforce Execution Type: " + getEnforceExecutionType() + "}";


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to