YARN-4081. Add support for multiple resource types in the Resource class. 
(Varun Vasudev via wangda)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d328f70e
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d328f70e
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d328f70e

Branch: refs/heads/YARN-3926
Commit: d328f70ec21018e3440ce67c47f8229c80c45ef7
Parents: 662e17b
Author: Wangda Tan <wan...@apache.org>
Authored: Thu Sep 10 09:43:26 2015 -0700
Committer: Varun Vasudev <vvasu...@apache.org>
Committed: Thu Jan 28 12:11:24 2016 +0530

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |   3 +
 .../yarn/api/protocolrecords/ResourceTypes.java |  27 +++
 .../hadoop/yarn/api/records/Resource.java       | 197 +++++++++++++++--
 .../yarn/api/records/ResourceInformation.java   | 218 +++++++++++++++++++
 .../exceptions/ResourceNotFoundException.java   |  45 ++++
 .../hadoop/yarn/util/UnitsConversionUtil.java   | 197 +++++++++++++++++
 .../src/main/proto/yarn_protos.proto            |  12 +
 .../yarn/conf/TestResourceInformation.java      |  70 ++++++
 .../yarn/util/TestUnitsConversionUtil.java      | 120 ++++++++++
 .../yarn/api/records/impl/pb/ProtoUtils.java    |  14 ++
 .../api/records/impl/pb/ResourcePBImpl.java     | 199 +++++++++++++++--
 .../hadoop/yarn/util/resource/Resources.java    | 120 +++++++---
 .../hadoop/yarn/api/TestPBImplRecords.java      |   3 +
 13 files changed, 1149 insertions(+), 76 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/d328f70e/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 2fae034..1ce1ffc 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -381,6 +381,9 @@ Release 2.8.0 - UNRELEASED
     YARN-4265. Provide new timeline plugin storage to support fine-grained 
entity
     caching. (Li Lu and Jason Lowe via junping_du)
 
+    YARN-4081. Add support for multiple resource types in the Resource
+    class. (Varun Vasudev via wangda)
+
   IMPROVEMENTS
 
     YARN-644. Basic null check is not performed on passed in arguments before

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d328f70e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/ResourceTypes.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/ResourceTypes.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/ResourceTypes.java
new file mode 100644
index 0000000..dbd9c37
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/ResourceTypes.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.api.protocolrecords;
+
+/**
+ * Enum which represents the resource type. Currently, the only type allowed is
+ * COUNTABLE.
+ */
+public enum ResourceTypes {
+  COUNTABLE
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d328f70e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Resource.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Resource.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Resource.java
index 88b57f1..4ba5397 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Resource.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Resource.java
@@ -22,8 +22,12 @@ import 
org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceStability.Evolving;
 import org.apache.hadoop.classification.InterfaceStability.Stable;
 import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
+import org.apache.hadoop.yarn.exceptions.ResourceNotFoundException;
+import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.util.Records;
 
+import java.util.Map;
+
 /**
  * <p><code>Resource</code> models a set of computer resources in the 
  * cluster.</p>
@@ -37,10 +41,10 @@ import org.apache.hadoop.yarn.util.Records;
  * the average number of threads it expects to have runnable at a time.</p>
  * 
  * <p>Virtual cores take integer values and thus currently CPU-scheduling is
- * very coarse.  A complementary axis for CPU requests that represents 
processing
- * power will likely be added in the future to enable finer-grained resource
- * configuration.</p>
- * 
+ * very coarse.  A complementary axis for CPU requests that represents
+ * processing power will likely be added in the future to enable finer-grained
+ * resource configuration.</p>
+ *
  * <p>Typically, applications request <code>Resource</code> of suitable
  * capability to run their component tasks.</p>
  * 
@@ -60,17 +64,38 @@ public abstract class Resource implements 
Comparable<Resource> {
     return resource;
   }
 
+  @Public
+  @Stable
+  public static Resource newInstance(
+      Map<String, ResourceInformation> resources) {
+    Resource resource = Records.newRecord(Resource.class);
+    resource.setResources(resources);
+    return resource;
+  }
+
   /**
-   * Get <em>memory</em> of the resource.
-   * @return <em>memory</em> of the resource
+   * Get <em>memory</em> of the resource. Note - while memory has
+   * never had a unit specified, all YARN configurations have specified memory
+   * in MB. The assumption has been that the daemons and applications are always
+   * using the same units. With the introduction of the ResourceInformation
+   * class we have support for units - so this function will continue to return
+   * memory but in the units of MB
+   *
+   * @return <em>memory</em>(in MB) of the resource
    */
   @Public
   @Stable
   public abstract int getMemory();
   
   /**
-   * Set <em>memory</em> of the resource.
-   * @param memory <em>memory</em> of the resource
+   * Set <em>memory</em> of the resource. Note - while memory has
+   * never had a unit specified, all YARN configurations have specified memory
+   * in MB. The assumption has been that the daemons and applications are always
+   * using the same units. With the introduction of the ResourceInformation
+   * class we have support for units - so this function will continue to set
+   * memory but the assumption is that the value passed is in units of MB.
+   *
+   * @param memory <em>memory</em>(in MB) of the resource
    */
   @Public
   @Stable
@@ -81,10 +106,11 @@ public abstract class Resource implements 
Comparable<Resource> {
    * Get <em>number of virtual cpu cores</em> of the resource.
    * 
    * Virtual cores are a unit for expressing CPU parallelism. A node's capacity
-   * should be configured with virtual cores equal to its number of physical 
cores.
-   * A container should be requested with the number of cores it can saturate, 
i.e.
-   * the average number of threads it expects to have runnable at a time.
-   *   
+   * should be configured with virtual cores equal to its number of physical
+   * cores. A container should be requested with the number of cores it can
+   * saturate, i.e. the average number of threads it expects to have runnable
+   * at a time.
+   *
    * @return <em>num of virtual cpu cores</em> of the resource
    */
   @Public
@@ -95,43 +121,168 @@ public abstract class Resource implements 
Comparable<Resource> {
    * Set <em>number of virtual cpu cores</em> of the resource.
    * 
    * Virtual cores are a unit for expressing CPU parallelism. A node's capacity
-   * should be configured with virtual cores equal to its number of physical 
cores.
-   * A container should be requested with the number of cores it can saturate, 
i.e.
-   * the average number of threads it expects to have runnable at a time.
-   *    
+   * should be configured with virtual cores equal to its number of physical
+   * cores. A container should be requested with the number of cores it can
+   * saturate, i.e. the average number of threads it expects to have runnable
+   * at a time.
+   *
    * @param vCores <em>number of virtual cpu cores</em> of the resource
    */
   @Public
   @Evolving
   public abstract void setVirtualCores(int vCores);
 
+  /**
+   * Get ResourceInformation for all resources.
+   *
+   * @return Map of resource name to ResourceInformation
+   */
+  @Public
+  @Evolving
+  public abstract Map<String, ResourceInformation> getResources();
+
+  /**
+   * Get ResourceInformation for a specified resource.
+   *
+   * @param resource name of the resource
+   * @return the ResourceInformation object for the resource
+   * @throws YarnException if the resource can't be found
+   */
+  @Public
+  @Evolving
+  public abstract ResourceInformation getResourceInformation(String resource)
+      throws YarnException;
+
+  /**
+   * Get the value for a specified resource. No information about the units is
+   * returned.
+   *
+   * @param resource name of the resource
+   * @return the value for the resource
+   * @throws YarnException if the resource can't be found
+   */
+  @Public
+  @Evolving
+  public abstract Long getResourceValue(String resource) throws YarnException;
+
+  /**
+   * Set the resources to the map specified.
+   *
+   * @param resources Desired resources
+   */
+  @Public
+  @Evolving
+  public abstract void setResources(Map<String, ResourceInformation> 
resources);
+
+  /**
+   * Set the ResourceInformation object for a particular resource.
+   *
+   * @param resource the resource for which the ResourceInformation is provided
+   * @param resourceInformation ResourceInformation object
+   * @throws ResourceNotFoundException if the resource is not found
+   */
+  @Public
+  @Evolving
+  public abstract void setResourceInformation(String resource,
+      ResourceInformation resourceInformation) throws 
ResourceNotFoundException;
+
+  /**
+   * Set the value of a resource in the ResourceInformation object. The unit of
+   * the value is assumed to be the one in the ResourceInformation object.
+   *
+   * @param resource the resource for which the value is provided.
+   * @param value    the value to set
+   * @throws ResourceNotFoundException if the resource is not found
+   */
+  @Public
+  @Evolving
+  public abstract void setResourceValue(String resource, Long value)
+      throws ResourceNotFoundException;
+
   @Override
   public int hashCode() {
     final int prime = 263167;
     int result = 3571;
     result = 939769357 + getMemory(); // prime * result = 939769357 initially
     result = prime * result + getVirtualCores();
+    for (Map.Entry<String, ResourceInformation> entry : getResources()
+        .entrySet()) {
+      if (entry.getKey().equals(ResourceInformation.MEMORY.getName()) || entry
+          .getKey().equals(ResourceInformation.VCORES.getName())) {
+        continue;
+      }
+      result = prime * result + entry.getValue().hashCode();
+    }
     return result;
   }
 
   @Override
   public boolean equals(Object obj) {
-    if (this == obj)
+    if (this == obj) {
       return true;
-    if (obj == null)
+    }
+    if (obj == null) {
       return false;
-    if (!(obj instanceof Resource))
+    }
+    if (!(obj instanceof Resource)) {
       return false;
+    }
     Resource other = (Resource) obj;
-    if (getMemory() != other.getMemory() || 
-        getVirtualCores() != other.getVirtualCores()) {
+    if (getMemory() != other.getMemory() || getVirtualCores() != other
+        .getVirtualCores()) {
       return false;
     }
-    return true;
+    return this.getResources().equals(other.getResources());
   }
 
   @Override
   public String toString() {
-    return "<memory:" + getMemory() + ", vCores:" + getVirtualCores() + ">";
+    StringBuilder sb = new StringBuilder();
+    sb.append("<memory:").append(getMemory()).append(", vCores:")
+        .append(getVirtualCores());
+    for (Map.Entry<String, ResourceInformation> entry : getResources()
+        .entrySet()) {
+      if (entry.getKey().equals(ResourceInformation.MEMORY.getName())
+          && entry.getValue().getUnits()
+          .equals(ResourceInformation.MEMORY_MB.getUnits())) {
+        continue;
+      }
+      if (entry.getKey().equals(ResourceInformation.VCORES.getName())
+          && entry.getValue().getUnits().equals("")) {
+        continue;
+      }
+      sb.append(", ").append(entry.getKey()).append(": ")
+          .append(entry.getValue().getValue())
+          .append(entry.getValue().getUnits());
+    }
+    sb.append(">");
+    return sb.toString();
+  }
+
+  /**
+   * Order resources by the number of resource types they carry, then by the
+   * names of those types, then by memory, then by virtual cores and finally
+   * by the ResourceInformation of each resource type.
+   *
+   * @param other the resource to compare against
+   * @return a negative integer, zero or a positive integer as this resource
+   * is less than, equal to or greater than the other resource
+   */
+  @Override
+  public int compareTo(Resource other) {
+    Map<String, ResourceInformation> thisResources = this.getResources();
+    Map<String, ResourceInformation> otherResources = other.getResources();
+    // Sizes are non-negative, so this subtraction cannot overflow.
+    int diff = thisResources.size() - otherResources.size();
+    if (diff != 0) {
+      return diff;
+    }
+    if (!thisResources.keySet().equals(otherResources.keySet())) {
+      // Same number of resource types but different names. Returning 0 here
+      // would make unrelated resources "equal" and break the transitivity
+      // required of compareTo, so order by the sorted resource names instead.
+      java.util.Iterator<String> mine =
+          new java.util.TreeSet<String>(thisResources.keySet()).iterator();
+      java.util.Iterator<String> theirs =
+          new java.util.TreeSet<String>(otherResources.keySet()).iterator();
+      // Both sets have the same size, so the iterators finish together.
+      while (diff == 0 && mine.hasNext()) {
+        diff = mine.next().compareTo(theirs.next());
+      }
+      return diff;
+    }
+    // Integer.compare avoids the overflow that int subtraction can produce.
+    diff = Integer.compare(this.getMemory(), other.getMemory());
+    if (diff == 0) {
+      diff = Integer.compare(this.getVirtualCores(), other.getVirtualCores());
+    }
+    if (diff == 0) {
+      for (Map.Entry<String, ResourceInformation> entry : thisResources
+          .entrySet()) {
+        diff =
+            entry.getValue().compareTo(otherResources.get(entry.getKey()));
+        if (diff != 0) {
+          break;
+        }
+      }
+    }
+    return diff;
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d328f70e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceInformation.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceInformation.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceInformation.java
new file mode 100644
index 0000000..4e780c1
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceInformation.java
@@ -0,0 +1,218 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.api.records;
+
+import org.apache.hadoop.yarn.api.protocolrecords.ResourceTypes;
+import org.apache.hadoop.yarn.util.UnitsConversionUtil;
+
+/**
+ * Class to encapsulate information about a Resource - the name of the resource,
+ * the units(milli, micro, etc), the type(countable), and the value.
+ */
+public class ResourceInformation implements Comparable<ResourceInformation> {
+
+  private String name;
+  private String units;
+  private ResourceTypes resourceType;
+  private Long value;
+
+  private static final String MEMORY_URI = "yarn.io/memory";
+  private static final String VCORES_URI = "yarn.io/vcores";
+
+  public static final ResourceInformation MEMORY =
+      ResourceInformation.newInstance(MEMORY_URI);
+  public static final ResourceInformation MEMORY_MB =
+      ResourceInformation.newInstance(MEMORY_URI, "M");
+  public static final ResourceInformation VCORES =
+      ResourceInformation.newInstance(VCORES_URI);
+
+  /**
+   * Get the name for the resource.
+   *
+   * @return resource name
+   */
+  public String getName() {
+    return name;
+  }
+
+  /**
+   * Set the name for the resource.
+   *
+   * @param rName name for the resource
+   */
+  public void setName(String rName) {
+    this.name = rName;
+  }
+
+  /**
+   * Get units for the resource.
+   *
+   * @return units for the resource
+   */
+  public String getUnits() {
+    return units;
+  }
+
+  /**
+   * Set the units for the resource.
+   *
+   * @param rUnits units for the resource; must be contained in
+   *               {@link UnitsConversionUtil#KNOWN_UNITS}
+   * @throws IllegalArgumentException if {@code rUnits} is not a known unit
+   */
+  public void setUnits(String rUnits) {
+    if (!UnitsConversionUtil.KNOWN_UNITS.contains(rUnits)) {
+      // Report the rejected argument, not the value currently held in the
+      // units field (which has not been updated yet).
+      throw new IllegalArgumentException(
+          "Unknown unit '" + rUnits + "'. Known units are "
+              + UnitsConversionUtil.KNOWN_UNITS);
+    }
+    this.units = rUnits;
+  }
+
+  /**
+   * Get the resource type.
+   *
+   * @return the resource type
+   */
+  public ResourceTypes getResourceType() {
+    return resourceType;
+  }
+
+  /**
+   * Set the resource type.
+   *
+   * @param type the resource type
+   */
+  public void setResourceType(ResourceTypes type) {
+    this.resourceType = type;
+  }
+
+  /**
+   * Get the value for the resource.
+   *
+   * @return the resource value
+   */
+  public Long getValue() {
+    return value;
+  }
+
+  /**
+   * Set the value for the resource.
+   *
+   * @param rValue the resource value
+   */
+  public void setValue(Long rValue) {
+    this.value = rValue;
+  }
+
+  /**
+   * Create a new instance of ResourceInformation from another object.
+   *
+   * @param other the object from which the new object should be created
+   * @return the new ResourceInformation object
+   */
+  public static ResourceInformation newInstance(ResourceInformation other) {
+    ResourceInformation ret = new ResourceInformation();
+    ret.setName(other.getName());
+    ret.setResourceType(other.getResourceType());
+    ret.setUnits(other.getUnits());
+    ret.setValue(other.getValue());
+    return ret;
+  }
+
+  /**
+   * Create a new instance of ResourceInformation with every field specified.
+   *
+   * @param name  name of the resource
+   * @param units units of the value; passed to {@link #setUnits(String)}
+   * @param value value of the resource
+   * @param type  type of the resource
+   * @return the new ResourceInformation object
+   */
+  public static ResourceInformation newInstance(String name, String units,
+      Long value, ResourceTypes type) {
+    ResourceInformation ret = new ResourceInformation();
+    ret.setName(name);
+    ret.setResourceType(type);
+    ret.setUnits(units);
+    ret.setValue(value);
+    return ret;
+  }
+
+  /**
+   * Create a new instance of ResourceInformation whose resource type is
+   * {@link ResourceTypes#COUNTABLE}.
+   *
+   * @param name  name of the resource
+   * @param units units of the value
+   * @param value value of the resource
+   * @return the new ResourceInformation object
+   */
+  public static ResourceInformation newInstance(String name, String units,
+      Long value) {
+    return ResourceInformation
+        .newInstance(name, units, value, ResourceTypes.COUNTABLE);
+  }
+
+  /**
+   * Create a new instance of ResourceInformation with a value of 0 and a
+   * resource type of {@link ResourceTypes#COUNTABLE}.
+   *
+   * @param name  name of the resource
+   * @param units units of the value
+   * @return the new ResourceInformation object
+   */
+  public static ResourceInformation newInstance(String name, String units) {
+    return ResourceInformation
+        .newInstance(name, units, 0L, ResourceTypes.COUNTABLE);
+  }
+
+  /**
+   * Create a new instance of ResourceInformation with the empty string as
+   * units and a resource type of {@link ResourceTypes#COUNTABLE}.
+   *
+   * @param name  name of the resource
+   * @param value value of the resource
+   * @return the new ResourceInformation object
+   */
+  public static ResourceInformation newInstance(String name, Long value) {
+    return ResourceInformation
+        .newInstance(name, "", value, ResourceTypes.COUNTABLE);
+  }
+
+  /**
+   * Create a new instance of ResourceInformation from a name only. Delegates
+   * to {@link #newInstance(String, String)} with the empty string as units.
+   *
+   * @param name name of the resource
+   * @return the new ResourceInformation object
+   */
+  public static ResourceInformation newInstance(String name) {
+    return ResourceInformation.newInstance(name, "");
+  }
+
+  @Override
+  public String toString() {
+    return "name: " + this.name + ", units: " + this.units + ", type: "
+        + resourceType + ", value: " + value;
+  }
+
+  public String getShorthandRepresentation() {
+    return "" + this.value + this.units;
+  }
+
+  /**
+   * Two ResourceInformation objects are equal when they have the same name,
+   * the same resource type, and values that compare as equal through
+   * {@link UnitsConversionUtil#compare(String, Long, String, Long)}, which is
+   * given the units of both values.
+   *
+   * @param obj the object to compare against
+   * @return true if the objects are equal, false otherwise
+   */
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null) {
+      return false;
+    }
+    if (!(obj instanceof ResourceInformation)) {
+      return false;
+    }
+    ResourceInformation r = (ResourceInformation) obj;
+    // The unit-aware comparison runs before the name and type checks.
+    int cmp =
+        UnitsConversionUtil.compare(this.units, this.value, r.units, r.value);
+    return this.name.equals(r.getName()) && this.resourceType
+        .equals(r.getResourceType()) && (cmp == 0);
+  }
+
+  /**
+   * Hash code that is consistent with {@link #equals(Object)}. Equality
+   * compares values through UnitsConversionUtil.compare using both units, so
+   * the hash is computed from the value scaled to the smallest unit instead
+   * of from the raw units/value pair; otherwise two objects that are equal
+   * but expressed in different units would hash differently.
+   *
+   * @return the hash code
+   */
+  @Override
+  public int hashCode() {
+    final int prime = 263167;
+    int result =
+        939769357 + name.hashCode(); // prime * result = 939769357 initially
+    result = prime * result + resourceType.hashCode();
+    result = prime * result + valueInSmallestUnit().hashCode();
+    return result;
+  }
+
+  /**
+   * Scale the value to pico, the smallest unit. BigInteger is used because
+   * the scaled value may not fit in a long.
+   *
+   * @return the value expressed in pico
+   */
+  private java.math.BigInteger valueInSmallestUnit() {
+    // Each step up this ladder is a factor of 1000; the order mirrors the
+    // UNITS array in UnitsConversionUtil.
+    final String[] ladder = {"p", "n", "u", "m", "", "k", "M", "G", "T", "P"};
+    int steps = java.util.Arrays.asList(ladder).indexOf(units);
+    if (steps < 0) {
+      throw new IllegalStateException("Unknown unit '" + units + "'");
+    }
+    return java.math.BigInteger.valueOf(value)
+        .multiply(java.math.BigInteger.valueOf(1000L).pow(steps));
+  }
+
+  @Override
+  public int compareTo(ResourceInformation other) {
+    int diff = this.name.compareTo(other.name);
+    if (diff == 0) {
+      diff = UnitsConversionUtil
+          .compare(this.units, this.value, other.units, other.value);
+      if (diff == 0) {
+        diff = this.resourceType.compareTo(other.resourceType);
+      }
+    }
+    return diff;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d328f70e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/ResourceNotFoundException.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/ResourceNotFoundException.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/ResourceNotFoundException.java
new file mode 100644
index 0000000..4277034
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/ResourceNotFoundException.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.exceptions;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * This exception is thrown when details of an unknown resource type
+ * are requested.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public class ResourceNotFoundException extends YarnException {
+
+  private static final long serialVersionUID = 10081982L;
+
+  public ResourceNotFoundException(String message) {
+    super(message);
+  }
+
+  public ResourceNotFoundException(Throwable cause) {
+    super(cause);
+  }
+
+  public ResourceNotFoundException(String message, Throwable cause) {
+    super(message, cause);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d328f70e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/UnitsConversionUtil.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/UnitsConversionUtil.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/UnitsConversionUtil.java
new file mode 100644
index 0000000..7785263
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/UnitsConversionUtil.java
@@ -0,0 +1,197 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.util;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import java.math.BigInteger;
+import java.util.*;
+
+/**
+ * A util to convert values in one unit to another. Units refers to whether
+ * the value is expressed in pico, nano, etc.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class UnitsConversionUtil {
+
+  /**
+   * Helper class for encapsulating conversion values.
+   */
+  public static class Converter {
+    private long numerator;
+    private long denominator;
+
+    Converter(long n, long d) {
+      this.numerator = n;
+      this.denominator = d;
+    }
+  }
+
+  private static final String[] UNITS =
+      {"p", "n", "u", "m", "", "k", "M", "G", "T", "P"};
+  private static final List<String> SORTED_UNITS = Arrays.asList(UNITS);
+  public static final Set<String> KNOWN_UNITS = createKnownUnitsSet();
+  private static final Converter PICO =
+      new Converter(1L, 1000L * 1000L * 1000L * 1000L);
+  private static final Converter NANO =
+      new Converter(1L, 1000L * 1000L * 1000L);
+  private static final Converter MICRO = new Converter(1L, 1000L * 1000L);
+  private static final Converter MILLI = new Converter(1L, 1000L);
+  private static final Converter BASE = new Converter(1L, 1L);
+  private static final Converter KILO = new Converter(1000L, 1L);
+  private static final Converter MEGA = new Converter(1000L * 1000L, 1L);
+  private static final Converter GIGA =
+      new Converter(1000L * 1000L * 1000L, 1L);
+  private static final Converter TERA =
+      new Converter(1000L * 1000L * 1000L * 1000L, 1L);
+  private static final Converter PETA =
+      new Converter(1000L * 1000L * 1000L * 1000L * 1000L, 1L);
+
+  private static Set<String> createKnownUnitsSet() {
+    Set<String> ret = new HashSet<>();
+    ret.addAll(Arrays.asList(UNITS));
+    return ret;
+  }
+
+  /**
+   * Look up the conversion factors for a unit.
+   *
+   * @param unit the unit string, e.g. "k" or "M"; the empty string maps to
+   *             the base converter
+   * @return the Converter constant for the unit
+   * @throws IllegalArgumentException if the unit matches none of the cases
+   */
+  private static Converter getConverter(String unit) {
+    switch (unit) {
+    case "p":
+      return PICO;
+    case "n":
+      return NANO;
+    case "u":
+      return MICRO;
+    case "m":
+      return MILLI;
+    case "":
+      return BASE;
+    case "k":
+      return KILO;
+    case "M":
+      return MEGA;
+    case "G":
+      return GIGA;
+    case "T":
+      return TERA;
+    case "P":
+      return PETA;
+    default:
+      throw new IllegalArgumentException(
+          "Unknown unit '" + unit + "'. Known units are " + KNOWN_UNITS);
+    }
+  }
+
+  /**
+   * Converts a value from one unit to another. Supported units can be obtained
+   * by inspecting the KNOWN_UNITS set. A fractional result is truncated
+   * towards zero.
+   *
+   * @param fromUnit  the unit of the from value
+   * @param toUnit    the target unit
+   * @param fromValue the value you wish to convert
+   * @return the value in toUnit
+   * @throws IllegalArgumentException if an argument is null, if a unit is
+   *                                  unknown, or if the converted value does
+   *                                  not fit in a Long
+   */
+  public static Long convert(String fromUnit, String toUnit, Long fromValue) {
+    if (toUnit == null || fromUnit == null || fromValue == null) {
+      throw new IllegalArgumentException("One or more arguments are null");
+    }
+    // Resolve both converters first so unknown units are always rejected.
+    Converter fc = getConverter(fromUnit);
+    Converter tc = getConverter(toUnit);
+    if (toUnit.equals(fromUnit)) {
+      return fromValue;
+    }
+    // The combined scale factor between distant units (e.g. "P" and "p"
+    // differ by 10^27) does not fit in a long, so the arithmetic is done in
+    // BigInteger and only the final result is range-checked.
+    BigInteger scaled = BigInteger.valueOf(fromValue)
+        .multiply(BigInteger.valueOf(fc.numerator))
+        .multiply(BigInteger.valueOf(tc.denominator));
+    BigInteger divisor = BigInteger.valueOf(fc.denominator)
+        .multiply(BigInteger.valueOf(tc.numerator));
+    BigInteger result = scaled.divide(divisor);
+    // bitLength() excludes the sign bit, so anything above 63 overflows Long.
+    if (result.bitLength() > 63) {
+      throw new IllegalArgumentException(
+          "Converting " + fromValue + " from '" + fromUnit + "' to '" + toUnit
+              + "' will result in an overflow of Long");
+    }
+    return result.longValue();
+  }
+
+  /**
+   * Compare a value in a given unit with a value in another unit. The return
+   * value is equivalent to the value returned by compareTo.
+   *
+   * @param unitA  first unit
+   * @param valueA first value
+   * @param unitB  second unit
+   * @param valueB second value
+   * @return +1, 0 or -1 depending on whether the relationship is greater than,
+   * equal to or lesser than
+   * @throws IllegalArgumentException if either unit is null or is not one of
+   *                                  the KNOWN_UNITS
+   */
+  public static int compare(String unitA, Long valueA, String unitB,
+      Long valueB) {
+    // Only nulls are reported as null; unknown units get their own message.
+    if (unitA == null || unitB == null) {
+      throw new IllegalArgumentException("Units cannot be null");
+    }
+    if (!KNOWN_UNITS.contains(unitA)) {
+      throw new IllegalArgumentException("Unknown unit '" + unitA + "'");
+    }
+    if (!KNOWN_UNITS.contains(unitB)) {
+      throw new IllegalArgumentException("Unknown unit '" + unitB + "'");
+    }
+    if (unitA.equals(unitB)) {
+      return valueA.compareTo(valueB);
+    }
+    Converter unitAC = getConverter(unitA);
+    Converter unitBC = getConverter(unitB);
+    // valueA * (An / Ad) versus valueB * (Bn / Bd) is decided by
+    // cross-multiplying with the (positive) denominators. Doing this in
+    // BigInteger is exact: there is no division and no long overflow, even
+    // for units as far apart as "p" and "P".
+    BigInteger scaledA = BigInteger.valueOf(valueA)
+        .multiply(BigInteger.valueOf(unitAC.numerator))
+        .multiply(BigInteger.valueOf(unitBC.denominator));
+    BigInteger scaledB = BigInteger.valueOf(valueB)
+        .multiply(BigInteger.valueOf(unitBC.numerator))
+        .multiply(BigInteger.valueOf(unitAC.denominator));
+    return scaledA.compareTo(scaledB);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d328f70e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
index 85bfe90..9b01061 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
@@ -53,9 +53,21 @@ message ContainerIdProto {
   optional int64 id = 3;
 }
 
+enum ResourceTypesProto {
+  COUNTABLE = 0;
+}
+
+message ResourceInformationProto {
+  required string key = 1;
+  optional int64 value = 2;
+  optional string units = 3;
+  optional ResourceTypesProto type = 4;
+}
+
 message ResourceProto {
   optional int32 memory = 1;
   optional int32 virtual_cores = 2;
+  repeated ResourceInformationProto resource_value_map = 3;
 }
 
 message ResourceUtilizationProto {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d328f70e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestResourceInformation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestResourceInformation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestResourceInformation.java
new file mode 100644
index 0000000..28f69c9
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestResourceInformation.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.conf;
+
+import org.apache.hadoop.yarn.api.records.ResourceInformation;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestResourceInformation {
+
+  @Test
+  public void testName() {
+    String name = "yarn.io/test";
+    ResourceInformation ri = ResourceInformation.newInstance(name);
+    Assert.assertEquals("Resource name incorrect", name, ri.getName());
+  }
+
+  @Test
+  public void testUnits() {
+    String name = "yarn.io/test";
+    String units = "m";
+    ResourceInformation ri = ResourceInformation.newInstance(name, units);
+    Assert.assertEquals("Resource name incorrect", name, ri.getName());
+    Assert.assertEquals("Resource units incorrect", units, ri.getUnits());
+    units = "z";
+    try {
+      ResourceInformation.newInstance(name, units);
+      Assert.fail(units + "is not a valid unit");
+    } catch (IllegalArgumentException ie) {
+      // do nothing
+    }
+  }
+
+  @Test
+  public void testValue() {
+    String name = "yarn.io/test";
+    Long value = 1l;
+    ResourceInformation ri = ResourceInformation.newInstance(name, value);
+    Assert.assertEquals("Resource name incorrect", name, ri.getName());
+    Assert.assertEquals("Resource value incorrect", value, ri.getValue());
+  }
+
+  @Test
+  public void testResourceInformation() {
+    String name = "yarn.io/test";
+    Long value = 1l;
+    String units = "m";
+    ResourceInformation ri =
+        ResourceInformation.newInstance(name, units, value);
+    Assert.assertEquals("Resource name incorrect", name, ri.getName());
+    Assert.assertEquals("Resource value incorrect", value, ri.getValue());
+    Assert.assertEquals("Resource units incorrect", units, ri.getUnits());
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d328f70e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/util/TestUnitsConversionUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/util/TestUnitsConversionUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/util/TestUnitsConversionUtil.java
new file mode 100644
index 0000000..421768f
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/util/TestUnitsConversionUtil.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.util;
+
+import org.apache.hadoop.yarn.util.UnitsConversionUtil;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestUnitsConversionUtil {
+
+  @Test
+  public void testUnitsConversion() {
+    int value = 5;
+    String fromUnit = "";
+    Long test = Long.valueOf(value);
+    Assert.assertEquals("pico test failed",
+        Long.valueOf(value * 1000l * 1000l * 1000l * 1000l),
+        UnitsConversionUtil.convert(fromUnit, "p", test));
+    Assert.assertEquals("nano test failed",
+        Long.valueOf(value * 1000l * 1000l * 1000l),
+        UnitsConversionUtil.convert(fromUnit, "n", test));
+    Assert
+        .assertEquals("micro test failed", Long.valueOf(value * 1000l * 1000l),
+            UnitsConversionUtil.convert(fromUnit, "u", test));
+    Assert.assertEquals("milli test failed", Long.valueOf(value * 1000l),
+        UnitsConversionUtil.convert(fromUnit, "m", test));
+
+    test = Long.valueOf(value * 1000l * 1000l * 1000l * 1000l * 1000l);
+    fromUnit = "";
+    Assert.assertEquals("kilo test failed", Long.valueOf(test / 1000l),
+        UnitsConversionUtil.convert(fromUnit, "k", test));
+    Assert
+        .assertEquals("mega test failed", Long.valueOf(test / (1000l * 1000l)),
+            UnitsConversionUtil.convert(fromUnit, "M", test));
+    Assert.assertEquals("giga test failed",
+        Long.valueOf(test / (1000l * 1000l * 1000l)),
+        UnitsConversionUtil.convert(fromUnit, "G", test));
+    Assert.assertEquals("tera test failed",
+        Long.valueOf(test / (1000l * 1000l * 1000l * 1000l)),
+        UnitsConversionUtil.convert(fromUnit, "T", test));
+    Assert.assertEquals("peta test failed",
+        Long.valueOf(test / (1000l * 1000l * 1000l * 1000l * 1000l)),
+        UnitsConversionUtil.convert(fromUnit, "P", test));
+
+    Assert.assertEquals("nano to pico test failed", Long.valueOf(value * 1000l),
+        UnitsConversionUtil.convert("n", "p", Long.valueOf(value)));
+
+    Assert.assertEquals("mega to giga test failed", Long.valueOf(value),
+        UnitsConversionUtil.convert("M", "G", Long.valueOf(value * 1000l)));
+  }
+
+  @Test
+  public void testOverflow() {
+    Long test = Long.valueOf(5 * 1000l * 1000l * 1000l * 1000l * 1000l);
+    try {
+      UnitsConversionUtil.convert("P", "p", test);
+      Assert.fail("this operation should result in an overflow");
+    } catch (IllegalArgumentException ie) {
+      ; // do nothing
+    }
+    try {
+      UnitsConversionUtil.convert("m", "p", Long.MAX_VALUE - 1);
+      Assert.fail("this operation should result in an overflow");
+    } catch (IllegalArgumentException ie) {
+      ; // do nothing
+    }
+  }
+
+  @Test
+  public void testCompare() {
+    String unitA = "P";
+    Long valueA = Long.valueOf(1);
+    String unitB = "p";
+    Long valueB = Long.valueOf(2);
+    Assert.assertEquals(1,
+        UnitsConversionUtil.compare(unitA, valueA, unitB, valueB));
+    Assert.assertEquals(-1,
+        UnitsConversionUtil.compare(unitB, valueB, unitA, valueA));
+    Assert.assertEquals(0,
+        UnitsConversionUtil.compare(unitA, valueA, unitA, valueA));
+    Assert.assertEquals(-1,
+        UnitsConversionUtil.compare(unitA, valueA, unitA, valueB));
+    Assert.assertEquals(1,
+        UnitsConversionUtil.compare(unitA, valueB, unitA, valueA));
+
+    unitB = "T";
+    Assert.assertEquals(1,
+        UnitsConversionUtil.compare(unitA, valueA, unitB, valueB));
+    Assert.assertEquals(-1,
+        UnitsConversionUtil.compare(unitB, valueB, unitA, valueA));
+    Assert.assertEquals(0,
+        UnitsConversionUtil.compare(unitA, valueA, unitB, 1000l));
+
+    unitA = "p";
+    unitB = "n";
+    Assert.assertEquals(-1,
+        UnitsConversionUtil.compare(unitA, valueA, unitB, valueB));
+    Assert.assertEquals(1,
+        UnitsConversionUtil.compare(unitB, valueB, unitA, valueA));
+    Assert.assertEquals(0,
+        UnitsConversionUtil.compare(unitA, 1000l, unitB, valueA));
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d328f70e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java
index 29ed0f3..7182b20 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ProtoUtils.java
@@ -23,6 +23,7 @@ import java.nio.ByteBuffer;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.yarn.api.protocolrecords.ApplicationsRequestScope;
+import org.apache.hadoop.yarn.api.protocolrecords.ResourceTypes;
 import org.apache.hadoop.yarn.api.records.AMCommand;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
@@ -39,6 +40,7 @@ import org.apache.hadoop.yarn.api.records.QueueState;
 import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
 import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.proto.YarnProtos;
 import org.apache.hadoop.yarn.proto.YarnProtos.AMCommandProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationAccessTypeProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationResourceUsageReportProto;
@@ -56,6 +58,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos.YarnApplicationAttemptStateProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.YarnApplicationStateProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.ContainerTypeProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.ExecutionTypeProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.ResourceTypesProto;
 import org.apache.hadoop.yarn.proto.YarnServiceProtos;
 import org.apache.hadoop.yarn.server.api.ContainerType;
 
@@ -294,4 +297,15 @@ public class ProtoUtils {
   public static ExecutionType convertFromProtoFormat(ExecutionTypeProto e) {
     return ExecutionType.valueOf(e.name());
   }
+
+  /*
+   * ResourceTypes
+   */
+  public static ResourceTypesProto converToProtoFormat(ResourceTypes e) {
+    return ResourceTypesProto.valueOf(e.name());
+  }
+
+  public static ResourceTypes convertFromProtoFormat(ResourceTypesProto e) {
+    return ResourceTypes.valueOf(e.name());
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d328f70e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourcePBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourcePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourcePBImpl.java
index a28c6ed..51df7fb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourcePBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ResourcePBImpl.java
@@ -18,12 +18,19 @@
 
 package org.apache.hadoop.yarn.api.records.impl.pb;
 
-
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.protocolrecords.ResourceTypes;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceInformation;
+import org.apache.hadoop.yarn.exceptions.ResourceNotFoundException;
+import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProtoOrBuilder;
+import org.apache.hadoop.yarn.proto.YarnProtos.ResourceInformationProto;
+import org.apache.hadoop.yarn.util.UnitsConversionUtil;
+
+import java.util.*;
 
 @Private
 @Unstable
@@ -31,7 +38,9 @@ public class ResourcePBImpl extends Resource {
   ResourceProto proto = ResourceProto.getDefaultInstance();
   ResourceProto.Builder builder = null;
   boolean viaProto = false;
-  
+
+  private Map<String, ResourceInformation> resources;
+
   public ResourcePBImpl() {
     builder = ResourceProto.newBuilder();
   }
@@ -39,9 +48,12 @@ public class ResourcePBImpl extends Resource {
   public ResourcePBImpl(ResourceProto proto) {
     this.proto = proto;
     viaProto = true;
+    this.resources = null;
+    initResources();
   }
-  
+
   public ResourceProto getProto() {
+    mergeLocalToProto();
     proto = viaProto ? proto : builder.build();
     viaProto = true;
     return proto;
@@ -53,40 +65,189 @@ public class ResourcePBImpl extends Resource {
     }
     viaProto = false;
   }
-    
-  
+
   @Override
   public int getMemory() {
-    ResourceProtoOrBuilder p = viaProto ? proto : builder;
-    return (p.getMemory());
+    try {
+      ResourceInformation ri =
+          this.getResourceInformation(ResourceInformation.MEMORY.getName());
+      return (int) UnitsConversionUtil
+          .convert(ri.getUnits(), "M", ri.getValue()).longValue();
+    } catch (YarnException ye) {
+      // memory should always be present
+      initResourcesMap();
+      return 0;
+    }
   }
 
   @Override
   public void setMemory(int memory) {
-    maybeInitBuilder();
-    builder.setMemory((memory));
+    setResourceInformation(ResourceInformation.MEMORY_MB.getName(),
+        ResourceInformation.newInstance(ResourceInformation.MEMORY_MB.getName(),
+            ResourceInformation.MEMORY_MB.getUnits(), (long) memory));
+
   }
 
   @Override
   public int getVirtualCores() {
-    ResourceProtoOrBuilder p = viaProto ? proto : builder;
-    return (p.getVirtualCores());
+    try {
+      return (int) this.getResourceValue(ResourceInformation.VCORES.getName())
+          .longValue();
+    } catch (YarnException ye) {
+      // vcores should always be present
+      initResourcesMap();
+      return 0;
+    }
   }
 
   @Override
   public void setVirtualCores(int vCores) {
+    try {
+      setResourceValue(ResourceInformation.VCORES.getName(),
+          Long.valueOf(vCores));
+    } catch (ResourceNotFoundException re) {
+      this.setResourceInformation(ResourceInformation.VCORES.getName(),
+          ResourceInformation.newInstance(ResourceInformation.VCORES.getName(),
+              (long) vCores));
+    }
+  }
+
+  private void initResources() {
+    if (this.resources != null) {
+      return;
+    }
+    ResourceProtoOrBuilder p = viaProto ? proto : builder;
+    initResourcesMap();
+    for (ResourceInformationProto entry : p.getResourceValueMapList()) {
+      ResourceTypes type =
+          entry.hasType() ? ProtoUtils.convertFromProtoFormat(entry.getType()) :
+              ResourceTypes.COUNTABLE;
+      String units = entry.hasUnits() ? entry.getUnits() : "";
+      Long value = entry.hasValue() ? entry.getValue() : 0L;
+      ResourceInformation ri =
+          ResourceInformation.newInstance(entry.getKey(), units, value, type);
+      resources.put(ri.getName(), ri);
+    }
+    if(this.getMemory() != p.getMemory()) {
+      setMemory(p.getMemory());
+    }
+    if(this.getVirtualCores() != p.getVirtualCores()) {
+      setVirtualCores(p.getVirtualCores());
+    }
+  }
+
+  @Override
+  public void setResources(Map<String, ResourceInformation> resources) {
     maybeInitBuilder();
-    builder.setVirtualCores((vCores));
+    if (resources == null || resources.isEmpty()) {
+      builder.clearResourceValueMap();
+    } else {
+      for (Map.Entry<String, ResourceInformation> entry : resources.entrySet()) {
+        if (!entry.getKey().equals(entry.getValue().getName())) {
+          entry.getValue().setName(entry.getKey());
+        }
+      }
+    }
+    this.resources = resources;
   }
 
   @Override
-  public int compareTo(Resource other) {
-    int diff = this.getMemory() - other.getMemory();
-    if (diff == 0) {
-      diff = this.getVirtualCores() - other.getVirtualCores();
+  public void setResourceInformation(String resource,
+      ResourceInformation resourceInformation) {
+    maybeInitBuilder();
+    if (resource == null || resourceInformation == null) {
+      throw new IllegalArgumentException(
+          "resource and/or resourceInformation cannot be null");
     }
-    return diff;
+    if (!resource.equals(resourceInformation.getName())) {
+      resourceInformation.setName(resource);
+    }
+    initResourcesMap();
+    resources.put(resource, resourceInformation);
+  }
+
+  @Override
+  public void setResourceValue(String resource, Long value)
+      throws ResourceNotFoundException {
+    maybeInitBuilder();
+    if (resource == null) {
+      throw new IllegalArgumentException("resource type object cannot be null");
+    }
+    if (resources == null || (!resources.containsKey(resource))) {
+      throw new ResourceNotFoundException(
+          "Resource " + resource + " not found");
+    }
+    ResourceInformation ri = resources.get(resource);
+    ri.setValue(value);
+    resources.put(resource, ri);
+  }
+
+  @Override
+  public Map<String, ResourceInformation> getResources() {
+    initResources();
+    return Collections.unmodifiableMap(this.resources);
+  }
+
+  @Override
+  public ResourceInformation getResourceInformation(String resource)
+      throws YarnException {
+    initResources();
+    if (this.resources.containsKey(resource)) {
+      return this.resources.get(resource);
+    }
+    throw new ResourceNotFoundException("Could not find entry for " + resource);
+  }
+
+  @Override
+  public Long getResourceValue(String resource) throws YarnException {
+    initResources();
+    if (this.resources.containsKey(resource)) {
+      return this.resources.get(resource).getValue();
+    }
+    throw new ResourceNotFoundException("Could not find entry for " + resource);
+  }
+
+  private void initResourcesMap() {
+    if (resources == null) {
+      resources = new HashMap<>();
+    }
+    ResourceInformation ri;
+    if (!resources.containsKey(ResourceInformation.MEMORY.getName())) {
+      ri = ResourceInformation
+          .newInstance(ResourceInformation.MEMORY_MB.getName(),
+              ResourceInformation.MEMORY_MB.getUnits());
+      this.resources.put(ResourceInformation.MEMORY.getName(), ri);
+    }
+    if (!resources.containsKey(ResourceInformation.VCORES.getName())) {
+      ri =
+          ResourceInformation.newInstance(ResourceInformation.VCORES.getName());
+      this.resources.put(ResourceInformation.VCORES.getName(), ri);
+    }
+  }
+
+  synchronized private void mergeLocalToBuilder() {
+    builder.clearResourceValueMap();
+    if (resources != null && !resources.isEmpty()) {
+      for (Map.Entry<String, ResourceInformation> entry : resources.entrySet()) {
+        ResourceInformationProto.Builder e = ResourceInformationProto.newBuilder();
+        e.setKey(entry.getKey());
+        e.setUnits(entry.getValue().getUnits());
+        e.setType(
+            ProtoUtils.converToProtoFormat(entry.getValue().getResourceType()));
+        e.setValue(entry.getValue().getValue());
+        builder.addResourceValueMap(e);
+      }
+    }
+    builder.setMemory(this.getMemory());
+    builder.setVirtualCores(this.getVirtualCores());
+  }
+
+  private void mergeLocalToProto() {
+    if (viaProto) {
+      maybeInitBuilder();
+    }
+    mergeLocalToBuilder();
+    proto = builder.build();
+    viaProto = true;
   }
-  
-  
 }  

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d328f70e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java
index b05d021..57cfdaa 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java
@@ -21,78 +21,130 @@ package org.apache.hadoop.yarn.util.resource;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceInformation;
+import org.apache.hadoop.yarn.exceptions.ResourceNotFoundException;
+import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.util.Records;
 
-@InterfaceAudience.LimitedPrivate({"YARN", "MapReduce"})
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+@InterfaceAudience.LimitedPrivate({ "YARN", "MapReduce" })
 @Unstable
 public class Resources {
-  
-  // Java doesn't have const :(
-  private static final Resource NONE = new Resource() {
+
+  /**
+   * Helper class to create a resource with a fixed value for all resource
+   * types. For example, a NONE resource which returns 0 for any resource type.
+   */
+  static class FixedValueResource extends Resource {
+
+    private Map<String, ResourceInformation> resources;
+    private Long resourceValue;
+    private String name;
+
+    /**
+     * Constructor for a fixed value resource
+     * @param rName the name of the resource
+     * @param value the fixed value to be returned for all resource types
+     */
+    FixedValueResource(String rName, Long value) {
+      this.resourceValue = value;
+      this.name = rName;
+      resources = initResourceMap();
+    }
+
+    private int resourceValueToInt() {
+      if(this.resourceValue > Integer.MAX_VALUE) {
+        return Integer.MAX_VALUE;
+      }
+      return this.resourceValue.intValue();
+    }
 
     @Override
     public int getMemory() {
-      return 0;
+      return resourceValueToInt();
     }
 
     @Override
     public void setMemory(int memory) {
-      throw new RuntimeException("NONE cannot be modified!");
+      throw new RuntimeException(name + " cannot be modified!");
     }
 
     @Override
     public int getVirtualCores() {
-      return 0;
+      return resourceValueToInt();
     }
 
     @Override
-    public void setVirtualCores(int cores) {
-      throw new RuntimeException("NONE cannot be modified!");
+    public Map<String, ResourceInformation> getResources() {
+      return Collections.unmodifiableMap(this.resources);
     }
 
     @Override
-    public int compareTo(Resource o) {
-      int diff = 0 - o.getMemory();
-      if (diff == 0) {
-        diff = 0 - o.getVirtualCores();
+    public ResourceInformation getResourceInformation(String resource)
+        throws YarnException {
+      if (resources.containsKey(resource)) {
+        ResourceInformation value = this.resources.get(resource);
+        ResourceInformation ret = ResourceInformation.newInstance(value);
+        ret.setValue(resourceValue);
+        return ret;
       }
-      return diff;
+      throw new YarnException("" + resource + " not found");
     }
-    
-  };
-  
-  private static final Resource UNBOUNDED = new Resource() {
 
     @Override
-    public int getMemory() {
-      return Integer.MAX_VALUE;
+    public Long getResourceValue(String resource) throws YarnException {
+      if (resources.containsKey(resource)) {
+        return resourceValue;
+      }
+      throw new YarnException("" + resource + " not found");
     }
 
     @Override
-    public void setMemory(int memory) {
-      throw new RuntimeException("UNBOUNDED cannot be modified!");
+    public void setVirtualCores(int cores) {
+      throw new RuntimeException(name + " cannot be modified!");
     }
 
     @Override
-    public int getVirtualCores() {
-      return Integer.MAX_VALUE;
+    public void setResources(Map<String, ResourceInformation> resources) {
+      throw new RuntimeException(name + " cannot be modified!");
     }
 
     @Override
-    public void setVirtualCores(int cores) {
-      throw new RuntimeException("UNBOUNDED cannot be modified!");
+    public void setResourceInformation(String resource,
+        ResourceInformation resourceInformation)
+        throws ResourceNotFoundException {
+      throw new RuntimeException(name + " cannot be modified!");
     }
 
     @Override
-    public int compareTo(Resource o) {
-      int diff = Integer.MAX_VALUE - o.getMemory();
-      if (diff == 0) {
-        diff = Integer.MAX_VALUE - o.getVirtualCores();
-      }
-      return diff;
+    public void setResourceValue(String resource, Long value)
+        throws ResourceNotFoundException {
+      throw new RuntimeException(name + " cannot be modified!");
     }
-    
-  };
+
+    private Map<String, ResourceInformation> initResourceMap() {
+      Map<String, ResourceInformation> tmp = new HashMap<>();
+      // Due to backwards compat, the max value for memory and vcores
+      // needs to be Integer.MAX_VALUE
+      int max = resourceValue > Integer.MAX_VALUE ? Integer.MAX_VALUE :
+          resourceValue.intValue();
+      tmp.put(ResourceInformation.MEMORY.getName(), ResourceInformation
+          .newInstance(ResourceInformation.MEMORY.getName(),
+              ResourceInformation.MEMORY_MB.getUnits(), (long) max));
+      tmp.put(ResourceInformation.VCORES.getName(), ResourceInformation
+          .newInstance(ResourceInformation.VCORES.getName(), (long) max));
+      return tmp;
+    }
+
+  }
+
+  private static final Resource UNBOUNDED =
+      new FixedValueResource("UNBOUNDED", Long.MAX_VALUE);
+
+  private static final Resource NONE = new FixedValueResource("NONE", 0L);
 
   public static Resource createResource(int memory) {
     return createResource(memory, (memory > 0) ? 1 : 0);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d328f70e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java
index 479697e..75b89c3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java
@@ -140,6 +140,7 @@ import org.apache.hadoop.yarn.api.records.ReservationId;
 import org.apache.hadoop.yarn.api.records.ReservationRequest;
 import org.apache.hadoop.yarn.api.records.ReservationRequests;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceInformation;
 import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
 import org.apache.hadoop.yarn.api.records.ResourceOption;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
@@ -454,6 +455,8 @@ public class TestPBImplRecords {
         "http", "localhost", 8080, "file0"));
     typeValueCache.put(SerializedException.class,
         SerializedException.newInstance(new IOException("exception for test")));
+    typeValueCache.put(ResourceInformation.class, ResourceInformation
+        .newInstance("localhost.test/sample", 1l));
     generateByNewInstance(LogAggregationContext.class);
     generateByNewInstance(ApplicationId.class);
     generateByNewInstance(ApplicationAttemptId.class);

Reply via email to