YARN-4512 [YARN-1011]. Provide a knob to turn on over-allocation. (kasha)

Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/96b35f47
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/96b35f47
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/96b35f47

Branch: refs/heads/YARN-1011
Commit: 96b35f47c9ba3ce074829eb8f9e4a70b541af72f
Parents: 5cf3741
Author: Karthik Kambatla <ka...@cloudera.com>
Authored: Fri Jan 29 14:31:45 2016 -0800
Committer: Haibo Chen <haiboc...@apache.org>
Committed: Mon Jun 4 16:25:31 2018 -0700

----------------------------------------------------------------------
 .../hadoop/yarn/conf/YarnConfiguration.java     |  13 ++-
 .../src/main/resources/yarn-default.xml         |  21 ++++
 .../RegisterNodeManagerRequest.java             |  14 ++-
 .../pb/RegisterNodeManagerRequestPBImpl.java    |  48 ++++++++-
 .../server/api/records/OverAllocationInfo.java  |  45 ++++++++
 .../server/api/records/ResourceThresholds.java  |  45 ++++++++
 .../impl/pb/OverAllocationInfoPBImpl.java       | 106 +++++++++++++++++++
 .../impl/pb/ResourceThresholdsPBImpl.java       |  93 ++++++++++++++++
 .../yarn_server_common_service_protos.proto     |  10 ++
 .../hadoop/yarn/server/nodemanager/Context.java |   5 +
 .../yarn/server/nodemanager/NodeManager.java    |  17 +++
 .../nodemanager/NodeStatusUpdaterImpl.java      |   7 +-
 .../monitor/ContainersMonitorImpl.java          |  34 ++++++
 .../amrmproxy/BaseAMRMProxyTest.java            |  11 ++
 14 files changed, 457 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/96b35f47/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index f7f82f8..c373228 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -2096,7 +2096,6 @@ public class YarnConfiguration extends Configuration {
   public static final boolean 
DEFAULT_NM_LINUX_CONTAINER_CGROUPS_STRICT_RESOURCE_USAGE =
       false;
 
-
   // Configurations for applicaiton life time monitor feature
   public static final String RM_APPLICATION_MONITOR_INTERVAL_MS =
       RM_PREFIX + "application-timeouts.monitor.interval-ms";
@@ -2104,6 +2103,18 @@ public class YarnConfiguration extends Configuration {
   public static final long DEFAULT_RM_APPLICATION_MONITOR_INTERVAL_MS =
       3000;
 
+  // Overallocation (= allocation based on utilization) configs.
+
+  /**
+   * Aggregate-utilization threshold, relative to the node's advertised
+   * capacity, below which the node allows running OPPORTUNISTIC containers.
+   * The default of 0 turns over-allocation off.
+   */
+  public static final String NM_OVERALLOCATION_ALLOCATION_THRESHOLD =
+      NM_PREFIX + "overallocation.allocation-threshold";
+  public static final float DEFAULT_NM_OVERALLOCATION_ALLOCATION_THRESHOLD
+      = 0f;
+  /** Upper bound for {@link #NM_OVERALLOCATION_ALLOCATION_THRESHOLD}. */
+  @Private
+  public static final float MAX_NM_OVERALLOCATION_ALLOCATION_THRESHOLD = 0.95f;
+
+  /**
+   * Utilization threshold beyond which OPPORTUNISTIC containers on an
+   * over-allocated node should start getting preempted.
+   */
+  public static final String NM_OVERALLOCATION_PREEMPTION_THRESHOLD =
+      NM_PREFIX + "overallocation.preemption-threshold";
+  /** Kept in sync with the value declared in yarn-default.xml. */
+  public static final float DEFAULT_NM_OVERALLOCATION_PREEMPTION_THRESHOLD
+      = 1f;
+
   /**
    * Interval of time the linux container executor should try cleaning up
    * cgroups entry when cleaning up a container. This is required due to what 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/96b35f47/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index b0ffc48..ee41734 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -1726,6 +1726,27 @@
   </property>
 
   <property>
+    <description>The extent of over-allocation (container-allocation based on
+      current utilization instead of prior allocation) allowed on this node,
+      expressed as a float between 0 and 0.95. By default, over-allocation is
+      turned off (value = 0). When turned on, the node allows running
+      OPPORTUNISTIC containers when the aggregate utilization is under the
+      value specified here multiplied by the node's advertised capacity.
+    </description>
+    <name>yarn.nodemanager.overallocation.allocation-threshold</name>
+    <value>0f</value>
+  </property>
+
+  <property>
+    <description>When a node is over-allocated to improve utilization by
+      running OPPORTUNISTIC containers, this setting specifies the aggregate
+      utilization, relative to the node's advertised capacity, beyond which
+      OPPORTUNISTIC containers start getting preempted.
+    </description>
+    <name>yarn.nodemanager.overallocation.preemption-threshold</name>
+    <value>1</value>
+  </property>
+
+  <property>
     <description>This configuration setting determines the capabilities
       assigned to docker containers when they are launched. While these may not
       be case-sensitive from a docker perspective, it is best to keep these

http://git-wip-us.apache.org/repos/asf/hadoop/blob/96b35f47/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java
index ff50330..66c0a98 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeLabel;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.api.records.OverAllocationInfo;
 import org.apache.hadoop.yarn.util.Records;
 
 public abstract class RegisterNodeManagerRequest {
@@ -42,14 +43,14 @@ public abstract class RegisterNodeManagerRequest {
       List<NMContainerStatus> containerStatuses,
       List<ApplicationId> runningApplications, Set<NodeLabel> nodeLabels) {
     return newInstance(nodeId, httpPort, resource, nodeManagerVersionId,
-        containerStatuses, runningApplications, nodeLabels, null);
+        containerStatuses, runningApplications, nodeLabels, null, null);
   }
 
   public static RegisterNodeManagerRequest newInstance(NodeId nodeId,
       int httpPort, Resource resource, String nodeManagerVersionId,
       List<NMContainerStatus> containerStatuses,
       List<ApplicationId> runningApplications, Set<NodeLabel> nodeLabels,
-      Resource physicalResource) {
+      Resource physicalResource, OverAllocationInfo overAllocationInfo) {
     RegisterNodeManagerRequest request =
         Records.newRecord(RegisterNodeManagerRequest.class);
     request.setHttpPort(httpPort);
@@ -60,9 +61,10 @@ public abstract class RegisterNodeManagerRequest {
     request.setRunningApplications(runningApplications);
     request.setNodeLabels(nodeLabels);
     request.setPhysicalResource(physicalResource);
+    request.setOverAllocationInfo(overAllocationInfo);
     return request;
   }
-  
+
   public abstract NodeId getNodeId();
   public abstract int getHttpPort();
   public abstract Resource getResource();
@@ -70,7 +72,11 @@ public abstract class RegisterNodeManagerRequest {
   public abstract List<NMContainerStatus> getNMContainerStatuses();
   public abstract Set<NodeLabel> getNodeLabels();
   public abstract void setNodeLabels(Set<NodeLabel> nodeLabels);
-  
+
+  public abstract OverAllocationInfo getOverAllocationInfo();
+  public abstract void setOverAllocationInfo(
+      OverAllocationInfo overAllocationInfo);
+
   /**
    * We introduce this here because currently YARN RM doesn't persist nodes 
info
    * for application running. When RM restart happened, we cannot determinate 
if

http://git-wip-us.apache.org/repos/asf/hadoop/blob/96b35f47/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java
index 02fd20f..802a9fb 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java
@@ -42,12 +42,15 @@ import 
org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.LogAggregation
 import 
org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NMContainerStatusProto;
 import 
org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeLabelsProto;
 import 
org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeLabelsProto.Builder;
+import 
org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.OverAllocationInfoProto;
 import 
org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.RegisterNodeManagerRequestProto;
 import 
org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.RegisterNodeManagerRequestProtoOrBuilder;
 import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
 import 
org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
-    
+import org.apache.hadoop.yarn.server.api.records.OverAllocationInfo;
+import 
org.apache.hadoop.yarn.server.api.records.impl.pb.OverAllocationInfoPBImpl;
+
 public class RegisterNodeManagerRequestPBImpl extends 
RegisterNodeManagerRequest {
   RegisterNodeManagerRequestProto proto = 
RegisterNodeManagerRequestProto.getDefaultInstance();
   RegisterNodeManagerRequestProto.Builder builder = null;
@@ -58,6 +61,7 @@ public class RegisterNodeManagerRequestPBImpl extends 
RegisterNodeManagerRequest
   private List<NMContainerStatus> containerStatuses = null;
   private List<ApplicationId> runningApplications = null;
   private Set<NodeLabel> labels = null;
+  private OverAllocationInfo overAllocationInfo = null;
 
   private List<LogAggregationReport> logAggregationReportsForApps = null;
 
@@ -107,6 +111,11 @@ public class RegisterNodeManagerRequestPBImpl extends 
RegisterNodeManagerRequest
     if (this.logAggregationReportsForApps != null) {
       addLogAggregationStatusForAppsToProto();
     }
+
+    if (this.overAllocationInfo != null) {
+      builder.setOverAllocationInfo(
+          convertToProtoFormat(this.overAllocationInfo));
+    }
   }
 
   private void addLogAggregationStatusForAppsToProto() {
@@ -387,7 +396,30 @@ public class RegisterNodeManagerRequestPBImpl extends 
RegisterNodeManagerRequest
     builder.clearNodeLabels();
     this.labels = nodeLabels;
   }
-  
+
+  @Override
+  public synchronized OverAllocationInfo getOverAllocationInfo() {
+    if (this.overAllocationInfo != null) {
+      return this.overAllocationInfo;
+    }
+    RegisterNodeManagerRequestProtoOrBuilder p = viaProto ? proto : builder;
+    if (!p.hasOverAllocationInfo()) {
+      return null;
+    }
+    this.overAllocationInfo =
+        convertFromProtoFormat(p.getOverAllocationInfo());
+    return this.overAllocationInfo;
+  }
+
+  /**
+   * Sets the over-allocation settings carried by this request. Passing
+   * {@code null} removes any value already held by the underlying proto
+   * builder, so the field is absent when the request is rebuilt.
+   */
+  @Override
+  public synchronized void setOverAllocationInfo(
+      OverAllocationInfo overAllocationInfo) {
+    maybeInitBuilder();
+    // Check the argument, not the cached field: when clearing, the builder
+    // may still hold a value copied from the original proto, and the
+    // local-to-builder merge only writes a non-null cached value back.
+    if (overAllocationInfo == null) {
+      builder.clearOverAllocationInfo();
+    }
+    this.overAllocationInfo = overAllocationInfo;
+  }
+
   private synchronized void initNodeLabels() {
     if (this.labels != null) {
       return;
@@ -475,9 +507,19 @@ public class RegisterNodeManagerRequestPBImpl extends 
RegisterNodeManagerRequest
   @Override
   public synchronized void setLogAggregationReportsForApps(
       List<LogAggregationReport> logAggregationStatusForApps) {
-    if(logAggregationStatusForApps == null) {
+    if (logAggregationStatusForApps == null) {
       builder.clearLogAggregationReportsForApps();
     }
     this.logAggregationReportsForApps = logAggregationStatusForApps;
   }
+
+  /** Unwraps the protobuf message behind a PB-backed record. */
+  private static OverAllocationInfoProto convertToProtoFormat(
+      OverAllocationInfo info) {
+    OverAllocationInfoPBImpl pbImpl = (OverAllocationInfoPBImpl) info;
+    return pbImpl.getProto();
+  }
+
+  /** Wraps a protobuf message in a PB-backed record. */
+  private static OverAllocationInfo convertFromProtoFormat(
+      OverAllocationInfoProto infoProto) {
+    return new OverAllocationInfoPBImpl(infoProto);
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/96b35f47/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/OverAllocationInfo.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/OverAllocationInfo.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/OverAllocationInfo.java
new file mode 100644
index 0000000..77952bf
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/OverAllocationInfo.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.api.records;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.yarn.server.api.records.impl.pb
+    .OverAllocationInfoPBImpl;
+
+/**
+ * Captures information on how aggressively the scheduler can over-allocate
+ * OPPORTUNISTIC containers on a node. This is node-specific and is reported
+ * by the NodeManager when it registers with the ResourceManager (see
+ * {@code RegisterNodeManagerRequest#getOverAllocationInfo()}).
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public abstract class OverAllocationInfo {
+  /**
+   * Creates a new over-allocation record.
+   *
+   * @param overAllocationThresholds utilization thresholds up to which the
+   *     node may be over-allocated
+   * @return a new protobuf-backed {@code OverAllocationInfo}
+   */
+  public static OverAllocationInfo newInstance(
+      ResourceThresholds overAllocationThresholds) {
+    OverAllocationInfo info = new OverAllocationInfoPBImpl();
+    info.setOverAllocationThreshold(overAllocationThresholds);
+    return info;
+  }
+
+  /** @return the over-allocation thresholds, or {@code null} if unset */
+  public abstract ResourceThresholds getOverAllocationThresholds();
+
+  /** @param resourceThresholds the over-allocation thresholds to use */
+  public abstract void setOverAllocationThreshold(
+      ResourceThresholds resourceThresholds);
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/96b35f47/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/ResourceThresholds.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/ResourceThresholds.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/ResourceThresholds.java
new file mode 100644
index 0000000..d57706a
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/ResourceThresholds.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.api.records;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import 
org.apache.hadoop.yarn.server.api.records.impl.pb.ResourceThresholdsPBImpl;
+
+/**
+ * Resource thresholds, one per resource type (memory and CPU), used for
+ * allocation and preemption decisions when over-allocation is turned on.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public abstract class ResourceThresholds {
+  /**
+   * Creates a record that applies the same threshold to every resource type.
+   *
+   * @param threshold the value used for both the memory and CPU thresholds
+   * @return a new protobuf-backed {@code ResourceThresholds}
+   */
+  public static ResourceThresholds newInstance(float threshold) {
+    ResourceThresholds uniform = new ResourceThresholdsPBImpl();
+    uniform.setMemoryThreshold(threshold);
+    uniform.setCpuThreshold(threshold);
+    return uniform;
+  }
+
+  /** @return the memory threshold */
+  public abstract float getMemoryThreshold();
+
+  /** @return the CPU threshold */
+  public abstract float getCpuThreshold();
+
+  /** @param memoryThreshold the new memory threshold */
+  public abstract void setMemoryThreshold(float memoryThreshold);
+
+  /** @param cpuThreshold the new CPU threshold */
+  public abstract void setCpuThreshold(float cpuThreshold);
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/96b35f47/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/OverAllocationInfoPBImpl.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/OverAllocationInfoPBImpl.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/OverAllocationInfoPBImpl.java
new file mode 100644
index 0000000..758f4fb
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/OverAllocationInfoPBImpl.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.api.records.impl.pb;
+
+import 
org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.OverAllocationInfoProto;
+import 
org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.OverAllocationInfoProtoOrBuilder;
+import 
org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ResourceThresholdsProto;
+import org.apache.hadoop.yarn.server.api.records.OverAllocationInfo;
+import org.apache.hadoop.yarn.server.api.records.ResourceThresholds;
+
+/**
+ * Protocol-buffer backed implementation of {@link OverAllocationInfo}.
+ *
+ * <p>The record is backed either by an immutable {@code proto}
+ * ({@code viaProto == true}) or by a mutable {@code builder}. The thresholds
+ * sub-record is cached locally and merged into the builder whenever
+ * {@link #getProto()} is called.
+ */
+public class OverAllocationInfoPBImpl extends OverAllocationInfo {
+  private OverAllocationInfoProto proto =
+      OverAllocationInfoProto.getDefaultInstance();
+  private OverAllocationInfoProto.Builder builder = null;
+  private boolean viaProto = false;
+
+  /** Locally cached thresholds; written back to the builder on merge. */
+  private ResourceThresholds overAllocationThresholds = null;
+
+  /** Creates an empty, builder-backed record. */
+  public OverAllocationInfoPBImpl() {
+    builder = OverAllocationInfoProto.newBuilder();
+  }
+
+  /**
+   * Wraps an existing proto.
+   *
+   * @param proto the proto backing this record
+   */
+  public OverAllocationInfoPBImpl(OverAllocationInfoProto proto) {
+    this.proto = proto;
+    viaProto = true;
+  }
+
+  /**
+   * Returns the proto form of this record after merging any locally cached
+   * thresholds into it.
+   *
+   * @return the up-to-date {@code OverAllocationInfoProto}
+   */
+  public synchronized OverAllocationInfoProto getProto() {
+    // mergeLocalToProto() always rebuilds proto and leaves viaProto == true.
+    mergeLocalToProto();
+    return proto;
+  }
+
+  /** Merges local state into the builder and rebuilds {@code proto}. */
+  private synchronized void mergeLocalToProto() {
+    if (viaProto) {
+      maybeInitBuilder();
+    }
+    mergeLocalToBuilder();
+    proto = builder.build();
+    viaProto = true;
+  }
+
+  /** Copies the cached thresholds (if any) into the builder. */
+  private synchronized void mergeLocalToBuilder() {
+    if (overAllocationThresholds != null) {
+      builder.setOverAllocationThresholds(
+          convertToProtoFormat(overAllocationThresholds));
+    }
+  }
+
+  /** Ensures a builder seeded from the current proto is available. */
+  private synchronized void maybeInitBuilder() {
+    if (viaProto || builder == null) {
+      builder = OverAllocationInfoProto.newBuilder(proto);
+    }
+    viaProto = false;
+  }
+
+  @Override
+  public synchronized ResourceThresholds getOverAllocationThresholds() {
+    if (overAllocationThresholds != null) {
+      return overAllocationThresholds;
+    }
+    OverAllocationInfoProtoOrBuilder p = viaProto ? proto : builder;
+    if (!p.hasOverAllocationThresholds()) {
+      return null;
+    }
+    overAllocationThresholds =
+        convertFromProtoFormat(p.getOverAllocationThresholds());
+    return overAllocationThresholds;
+  }
+
+  /**
+   * Sets the over-allocation thresholds; {@code null} removes them.
+   *
+   * @param resourceThresholds the new thresholds, or {@code null}
+   */
+  @Override
+  public synchronized void setOverAllocationThreshold(
+      ResourceThresholds resourceThresholds) {
+    maybeInitBuilder();
+    // Decide on the NEW value: when it is null, thresholds already in the
+    // builder (e.g. copied from the wrapped proto) must be dropped, because
+    // mergeLocalToBuilder() never writes a null value back.
+    if (resourceThresholds == null) {
+      builder.clearOverAllocationThresholds();
+    }
+    this.overAllocationThresholds = resourceThresholds;
+  }
+
+  private static ResourceThresholdsProto convertToProtoFormat(
+      ResourceThresholds overAllocationThresholds) {
+    return ((ResourceThresholdsPBImpl) overAllocationThresholds).getProto();
+  }
+
+  private static ResourceThresholds convertFromProtoFormat(
+      ResourceThresholdsProto overAllocationThresholdsProto) {
+    return new ResourceThresholdsPBImpl(overAllocationThresholdsProto);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/96b35f47/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/ResourceThresholdsPBImpl.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/ResourceThresholdsPBImpl.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/ResourceThresholdsPBImpl.java
new file mode 100644
index 0000000..10fb284
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/ResourceThresholdsPBImpl.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.api.records.impl.pb;
+
+import 
org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ResourceThresholdsProto;
+import 
org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ResourceThresholdsProtoOrBuilder;
+import org.apache.hadoop.yarn.server.api.records.ResourceThresholds;
+
+/**
+ * Protocol-buffer backed implementation of {@link ResourceThresholds}.
+ * The memory and CPU thresholds are plain float proto fields, so they are
+ * read from and written to the proto/builder directly.
+ */
+public class ResourceThresholdsPBImpl extends ResourceThresholds {
+  private ResourceThresholdsProto proto =
+      ResourceThresholdsProto.getDefaultInstance();
+  private ResourceThresholdsProto.Builder builder = null;
+  private boolean viaProto = false;
+
+  /** Creates an empty, builder-backed record. */
+  public ResourceThresholdsPBImpl() {
+    builder = ResourceThresholdsProto.newBuilder();
+  }
+
+  /**
+   * Wraps an existing proto.
+   *
+   * @param proto the proto backing this record
+   */
+  public ResourceThresholdsPBImpl(ResourceThresholdsProto proto) {
+    this.proto = proto;
+    viaProto = true;
+  }
+
+  /**
+   * @return the proto form of this record, rebuilt from the builder
+   */
+  public synchronized ResourceThresholdsProto getProto() {
+    // mergeLocalToProto() rebuilds proto and leaves viaProto set.
+    mergeLocalToProto();
+    return proto;
+  }
+
+  private synchronized void mergeLocalToProto() {
+    if (viaProto) {
+      maybeInitBuilder();
+    }
+    mergeLocalToBuilder();
+    proto = builder.build();
+    viaProto = true;
+  }
+
+  private synchronized void mergeLocalToBuilder() {
+    // Nothing is cached locally yet: memory and cpu are plain floats that the
+    // setters write straight into the builder. Extend this when non-scalar
+    // fields are added to the proto.
+  }
+
+  private synchronized void maybeInitBuilder() {
+    if (builder == null || viaProto) {
+      builder = ResourceThresholdsProto.newBuilder(proto);
+    }
+    viaProto = false;
+  }
+
+  /** Returns whichever of proto/builder currently holds the state. */
+  private ResourceThresholdsProtoOrBuilder current() {
+    return viaProto ? proto : builder;
+  }
+
+  @Override
+  public synchronized float getMemoryThreshold() {
+    return current().getMemory();
+  }
+
+  @Override
+  public synchronized float getCpuThreshold() {
+    return current().getCpu();
+  }
+
+  @Override
+  public synchronized void setMemoryThreshold(float memoryThreshold) {
+    maybeInitBuilder();
+    builder.setMemory(memoryThreshold);
+  }
+
+  @Override
+  public synchronized void setCpuThreshold(float cpuThreshold) {
+    maybeInitBuilder();
+    builder.setCpu(cpuThreshold);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/96b35f47/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
index 387ddb4..e3822c2 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
@@ -68,6 +68,7 @@ message RegisterNodeManagerRequestProto {
   optional NodeLabelsProto nodeLabels = 8;
   optional ResourceProto physicalResource = 9;
   repeated LogAggregationReportProto log_aggregation_reports_for_apps = 10;
+  optional OverAllocationInfoProto overAllocationInfo = 11;
 }
 
 message RegisterNodeManagerResponseProto {
@@ -198,3 +199,12 @@ message SCMUploaderCanUploadRequestProto {
 message SCMUploaderCanUploadResponseProto {
   optional bool uploadable = 1;
 }
+
+// Over-allocation settings a NodeManager sends to the ResourceManager
+// (carried in RegisterNodeManagerRequestProto.overAllocationInfo).
+message OverAllocationInfoProto {
+  optional ResourceThresholdsProto over_allocation_thresholds = 1;
+}
+
+// Per-resource thresholds used when over-allocation is turned on.
+message ResourceThresholdsProto {
+  optional float memory = 1 [default = 0];
+  optional float cpu = 2 [default = 0];
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/96b35f47/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
index 84b3915..75d06c4 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
@@ -31,6 +31,7 @@ import 
org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
 import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
 import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
 import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManager;
+import org.apache.hadoop.yarn.server.api.records.OverAllocationInfo;
 import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
 import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.ResourcePluginManager;
@@ -137,4 +138,8 @@ public interface Context {
    * @return the NM {@code DeletionService}.
    */
   DeletionService getDeletionService();
+
+  /**
+   * Whether over-allocation (allocating OPPORTUNISTIC containers based on
+   * utilization) is enabled on this node.
+   *
+   * @return {@code true} if over-allocation is enabled
+   */
+  boolean isOverAllocationEnabled();
+
+  /**
+   * @return the over-allocation settings of this node; presumably
+   *     {@code null} when over-allocation is disabled (that is how
+   *     {@code NodeManager.NMContext} implements it) — confirm for other
+   *     implementations
+   */
+  OverAllocationInfo getOverAllocationInfo();
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/96b35f47/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
index 2748a8f..e720602 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
@@ -51,6 +51,7 @@ import 
org.apache.hadoop.yarn.server.api.records.AppCollectorData;
 import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
 import 
org.apache.hadoop.yarn.server.nodemanager.collectormanager.NMCollectorService;
 import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManager;
+import org.apache.hadoop.yarn.server.api.records.OverAllocationInfo;
 import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
 import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
 import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState;
@@ -631,6 +632,8 @@ public class NodeManager extends CompositeService
 
     private NMLogAggregationStatusTracker nmLogAggregationStatusTracker;
 
+    private OverAllocationInfo overAllocationInfo;
+
     public NMContext(NMContainerTokenSecretManager containerTokenSecretManager,
         NMTokenSecretManagerInNM nmTokenSecretManager,
         LocalDirsHandlerService dirsHandler, ApplicationACLsManager 
aclsManager,
@@ -778,6 +781,28 @@ public class NodeManager extends CompositeService
       this.nodeStatusUpdater = nodeStatusUpdater;
     }
 
+    @Override
+    public boolean isOverAllocationEnabled() {
+      // Over-allocation is on exactly when settings have been published.
+      OverAllocationInfo info = getOverAllocationInfo();
+      return info != null;
+    }
+
+    @Override
+    public OverAllocationInfo getOverAllocationInfo() {
+      return overAllocationInfo;
+    }
+
+    /**
+     * Publish the over-allocation settings of this node.
+     *
+     * @param overAllocationInfo the settings to use, or {@code null} to
+     *        disable over-allocation
+     */
+    public void setOverAllocationInfo(OverAllocationInfo overAllocationInfo) {
+      this.overAllocationInfo = overAllocationInfo;
+    }
+
     public boolean isDistributedSchedulingEnabled() {
       return isDistSchedulingEnabled;
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/96b35f47/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
index 8154723..44f9740 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
@@ -376,8 +376,8 @@ public class NodeStatusUpdaterImpl extends AbstractService 
implements
       RegisterNodeManagerRequest request =
           RegisterNodeManagerRequest.newInstance(nodeId, httpPort, 
totalResource,
               nodeManagerVersionId, containerReports, getRunningApplications(),
-              nodeLabels, physicalResource);
+              nodeLabels, physicalResource, context.getOverAllocationInfo());
 
       if (containerReports != null) {
         LOG.info("Registering with RM using containers :" + containerReports);
       }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/96b35f47/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
index bd68dfe..83a2091 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
@@ -37,8 +37,11 @@ import 
org.apache.hadoop.yarn.api.records.ResourceUtilization;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.server.api.records.OverAllocationInfo;
+import org.apache.hadoop.yarn.server.api.records.ResourceThresholds;
 import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
 import org.apache.hadoop.yarn.server.nodemanager.Context;
+import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
 import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
 import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerKillEvent;
@@ -108,6 +111,8 @@ public class ContainersMonitorImpl extends AbstractService 
implements
 
   private ResourceUtilization containersUtilization;
 
+  private ResourceThresholds overAllocationPreemptionThresholds;
+
   private volatile boolean stopped = false;
 
   public ContainersMonitorImpl(ContainerExecutor exec,
@@ -211,6 +216,13 @@ public class ContainersMonitorImpl extends AbstractService 
implements
       }
     }
 
+    initializeOverAllocation(conf);
+    if (context.isOverAllocationEnabled()) {
+      pmemCheckEnabled = true;
+      LOG.info("Force enabling physical memory checks because " +
+          "overallocation is enabled");
+    }
+
     containersMonitorEnabled =
         isContainerMonitorEnabled() && monitoringInterval > 0;
     LOG.info("ContainersMonitor enabled: " + containersMonitorEnabled);
@@ -262,6 +274,60 @@ public class ContainersMonitorImpl extends AbstractService 
implements
             pId, processTreeClass, conf);
   }
 
+  /**
+   * Read the over-allocation settings from {@code conf}. If a positive
+   * allocation threshold is configured, publish it as this node's
+   * {@link OverAllocationInfo} on the NM context (NMContext reports
+   * over-allocation as enabled once that is set) and record the preemption
+   * thresholds; otherwise over-allocation stays disabled.
+   *
+   * @param conf the NodeManager configuration
+   */
+  private void initializeOverAllocation(Configuration conf) {
+    float configuredThreshold = conf.getFloat(
+        YarnConfiguration.NM_OVERALLOCATION_ALLOCATION_THRESHOLD,
+        YarnConfiguration.DEFAULT_NM_OVERALLOCATION_ALLOCATION_THRESHOLD);
+    // Clamp into [0, MAX]; a value of 0 leaves over-allocation disabled.
+    float overAllocationThreshold = Math.max(0f, Math.min(configuredThreshold,
+        YarnConfiguration.MAX_NM_OVERALLOCATION_ALLOCATION_THRESHOLD));
+    if (Float.compare(overAllocationThreshold, configuredThreshold) != 0) {
+      LOG.warn("Invalid value " + configuredThreshold + " for "
+          + YarnConfiguration.NM_OVERALLOCATION_ALLOCATION_THRESHOLD
+          + ", using " + overAllocationThreshold + " instead");
+    }
+
+    // Written as a negated comparison so that a NaN setting also disables it.
+    if (!(overAllocationThreshold > 0f)) {
+      return;
+    }
+    // Only NMContext can store the settings; avoid a ClassCastException for
+    // other Context implementations (e.g. test contexts).
+    if (!(context instanceof NodeManager.NMContext)) {
+      LOG.warn("Over-allocation is configured but the NM context does not "
+          + "support it; leaving over-allocation disabled");
+      return;
+    }
+    ((NodeManager.NMContext) context).setOverAllocationInfo(
+        OverAllocationInfo.newInstance(
+            ResourceThresholds.newInstance(overAllocationThreshold)));
+
+    float preemptionThreshold = conf.getFloat(
+        YarnConfiguration.NM_OVERALLOCATION_PREEMPTION_THRESHOLD,
+        YarnConfiguration.DEFAULT_NM_OVERALLOCATION_PREEMPTION_THRESHOLD);
+    // Assuming both thresholds are fractions of the same node capacity (as
+    // their names suggest), preempting below the allocation threshold would
+    // reclaim resources the node is still willing to over-allocate.
+    if (!(preemptionThreshold >= overAllocationThreshold)) {
+      LOG.warn("Invalid value " + preemptionThreshold + " for "
+          + YarnConfiguration.NM_OVERALLOCATION_PREEMPTION_THRESHOLD
+          + ", using " + overAllocationThreshold + " instead");
+      preemptionThreshold = overAllocationThreshold;
+    }
+
+    this.overAllocationPreemptionThresholds =
+        ResourceThresholds.newInstance(preemptionThreshold);
+  }
+
   private boolean isResourceCalculatorAvailable() {
     if (resourceCalculatorPlugin == null) {
       LOG.info("ResourceCalculatorPlugin is unavailable on this system. " + 
this

http://git-wip-us.apache.org/repos/asf/hadoop/blob/96b35f47/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
index 677732d..86d4f84 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
@@ -46,6 +46,7 @@ import 
org.apache.hadoop.yarn.server.api.records.AppCollectorData;
 import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
 import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
 import 
org.apache.hadoop.yarn.server.nodemanager.ContainerStateTransitionListener;
+import org.apache.hadoop.yarn.server.api.records.OverAllocationInfo;
 import org.apache.hadoop.yarn.server.nodemanager.Context;
 import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
 import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
@@ -770,6 +771,18 @@ public abstract class BaseAMRMProxyTest {
     }
 
     @Override
+    public boolean isOverAllocationEnabled() {
+      // Follows from the stub below: no settings, so never enabled.
+      return getOverAllocationInfo() != null;
+    }
+
+    @Override
+    public OverAllocationInfo getOverAllocationInfo() {
+      // This test context never carries over-allocation settings.
+      return null;
+    }
+
+    @Override
     public NodeResourceMonitor getNodeResourceMonitor() {
       return null;
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to