YARN-4832. NM side resource value should get updated if change applied in RM 
side. Contributed by Junping Du


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/fa3bc340
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/fa3bc340
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/fa3bc340

Branch: refs/heads/HDFS-7240
Commit: fa3bc3405dc2f8497faab45ba5c4de2caf4c29bc
Parents: 7cd5ae6
Author: Jian He <jia...@apache.org>
Authored: Tue May 17 12:51:08 2016 -0700
Committer: Jian He <jia...@apache.org>
Committed: Tue May 17 12:52:19 2016 -0700

----------------------------------------------------------------------
 .../protocolrecords/NodeHeartbeatResponse.java  |  4 +
 .../RegisterNodeManagerResponse.java            |  6 +-
 .../impl/pb/NodeHeartbeatResponsePBImpl.java    | 46 ++++++++-
 .../pb/RegisterNodeManagerResponsePBImpl.java   | 46 ++++++++-
 .../yarn_server_common_service_protos.proto     |  2 +
 .../nodemanager/NodeStatusUpdaterImpl.java      | 29 +++++-
 .../resourcemanager/ResourceTrackerService.java | 67 ++++++++++---
 .../yarn/server/resourcemanager/MockNM.java     | 21 ++++-
 .../resourcemanager/TestRMAdminService.java     | 99 ++++++++++++++++++++
 9 files changed, 290 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa3bc340/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java
index bd04a0d..386353a 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java
@@ -27,6 +27,7 @@ import 
org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit;
 import org.apache.hadoop.yarn.server.api.records.MasterKey;
 import org.apache.hadoop.yarn.server.api.records.NodeAction;
@@ -77,6 +78,9 @@ public interface NodeHeartbeatResponse {
   boolean getAreNodeLabelsAcceptedByRM();
   void setAreNodeLabelsAcceptedByRM(boolean areNodeLabelsAcceptedByRM);
 
+  Resource getResource();
+  void setResource(Resource resource);
+
   List<Container> getContainersToDecrease();
   void addAllContainersToDecrease(Collection<Container> containersToDecrease);
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa3bc340/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerResponse.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerResponse.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerResponse.java
index c8678f6..fb1da4c 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerResponse.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerResponse.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.yarn.server.api.protocolrecords;
 
+import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.api.records.MasterKey;
 import org.apache.hadoop.yarn.server.api.records.NodeAction;
 
@@ -45,7 +46,10 @@ public interface RegisterNodeManagerResponse {
   void setRMVersion(String version);
 
   String getRMVersion();
-  
+
+  Resource getResource();
+  void setResource(Resource resource);
+
   boolean getAreNodeLabelsAcceptedByRM();
   void setAreNodeLabelsAcceptedByRM(boolean areNodeLabelsAcceptedByRM);
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa3bc340/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java
index a259158..3422697 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java
@@ -31,14 +31,17 @@ import 
org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SignalContainerRequest
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.ProtoBase;
 import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
+import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl;
 import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
 import 
org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ContainerQueuingLimitProto;
 import 
org.apache.hadoop.yarn.proto.YarnServiceProtos.SignalContainerRequestProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto;
@@ -54,17 +57,17 @@ import 
org.apache.hadoop.yarn.server.api.records.impl.pb.ContainerQueuingLimitPB
 import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
 
 
-    
 public class NodeHeartbeatResponsePBImpl extends
     ProtoBase<NodeHeartbeatResponseProto> implements NodeHeartbeatResponse {
   NodeHeartbeatResponseProto proto = 
NodeHeartbeatResponseProto.getDefaultInstance();
   NodeHeartbeatResponseProto.Builder builder = null;
   boolean viaProto = false;
-  
+
   private List<ContainerId> containersToCleanup = null;
   private List<ContainerId> containersToBeRemovedFromNM = null;
   private List<ApplicationId> applicationsToCleanup = null;
   private Map<ApplicationId, ByteBuffer> systemCredentials = null;
+  private Resource resource = null;
 
   private MasterKey containerTokenMasterKey = null;
   private MasterKey nmTokenMasterKey = null;
@@ -80,7 +83,7 @@ public class NodeHeartbeatResponsePBImpl extends
     this.proto = proto;
     viaProto = true;
   }
-  
+
   public NodeHeartbeatResponseProto getProto() {
     mergeLocalToProto();
     proto = viaProto ? proto : builder.build();
@@ -119,6 +122,9 @@ public class NodeHeartbeatResponsePBImpl extends
     if (this.containersToSignal != null) {
       addContainersToSignalToProto();
     }
+    if (this.resource != null) {
+      builder.setResource(convertToProtoFormat(this.resource));
+    }
   }
 
   private void addSystemCredentialsToProto() {
@@ -146,8 +152,8 @@ public class NodeHeartbeatResponsePBImpl extends
     }
     viaProto = false;
   }
-    
-  
+
+
   @Override
   public int getResponseId() {
     NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder;
@@ -161,6 +167,28 @@ public class NodeHeartbeatResponsePBImpl extends
   }
 
   @Override
+  public Resource getResource() {
+    // Lazily materialize the cached record from the underlying proto (or
+    // builder) the first time it is requested; later calls reuse the cache.
+    if (this.resource == null) {
+      NodeHeartbeatResponseProtoOrBuilder source = viaProto ? proto : builder;
+      if (source.hasResource()) {
+        this.resource = convertFromProtoFormat(source.getResource());
+      }
+    }
+    // Still null here when neither the cache nor the proto carries a resource.
+    return this.resource;
+  }
+
+  /**
+   * Sets the resource carried by this heartbeat response.
+   *
+   * The value is only cached locally here; it is written into the builder
+   * by the local-to-builder merge step (the {@code builder.setResource}
+   * call guarded by {@code this.resource != null}).
+   *
+   * @param resource the resource to carry, or {@code null} to clear any
+   *                 resource already present in the builder
+   */
+  @Override
+  public void setResource(Resource resource) {
+    maybeInitBuilder();
+    if (resource == null) {
+      // Nothing will be merged for a null cache, so drop the proto field now.
+      builder.clearResource();
+    }
+    this.resource = resource;
+  }
+
+  @Override
   public MasterKey getContainerTokenMasterKey() {
     NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder;
     if (this.containerTokenMasterKey != null) {
@@ -565,6 +593,14 @@ public class NodeHeartbeatResponsePBImpl extends
     return ((ContainerIdPBImpl) t).getProto();
   }
 
+  private ResourcePBImpl convertFromProtoFormat(ResourceProto p) {
+    return new ResourcePBImpl(p);
+  }
+
+  private ResourceProto convertToProtoFormat(Resource t) {
+    return ((ResourcePBImpl)t).getProto();
+  }
+
   private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto p) {
     return new ApplicationIdPBImpl(p);
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa3bc340/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerResponsePBImpl.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerResponsePBImpl.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerResponsePBImpl.java
index 391d00d..56b675b 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerResponsePBImpl.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerResponsePBImpl.java
@@ -19,7 +19,10 @@
 package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
 
 
+import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.impl.pb.ProtoBase;
+import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl;
+import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeActionProto;
 import 
org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.RegisterNodeManagerResponseProto;
@@ -30,17 +33,17 @@ import org.apache.hadoop.yarn.server.api.records.NodeAction;
 import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
 
 
-    
 public class RegisterNodeManagerResponsePBImpl extends 
ProtoBase<RegisterNodeManagerResponseProto> implements 
RegisterNodeManagerResponse {
   RegisterNodeManagerResponseProto proto = 
RegisterNodeManagerResponseProto.getDefaultInstance();
   RegisterNodeManagerResponseProto.Builder builder = null;
   boolean viaProto = false;
-  
+  private Resource resource = null;
+
   private MasterKey containerTokenMasterKey = null;
   private MasterKey nmTokenMasterKey = null;
-  
+
   private boolean rebuild = false;
-  
+
   public RegisterNodeManagerResponsePBImpl() {
     builder = RegisterNodeManagerResponseProto.newBuilder();
   }
@@ -49,7 +52,7 @@ public class RegisterNodeManagerResponsePBImpl extends 
ProtoBase<RegisterNodeMan
     this.proto = proto;
     viaProto = true;
   }
-  
+
   public RegisterNodeManagerResponseProto getProto() {
     if (rebuild)
       mergeLocalToProto();
@@ -67,6 +70,9 @@ public class RegisterNodeManagerResponsePBImpl extends 
ProtoBase<RegisterNodeMan
       builder.setNmTokenMasterKey(
           convertToProtoFormat(this.nmTokenMasterKey));
     }
+    if (this.resource != null) {
+      builder.setResource(convertToProtoFormat(this.resource));
+    }
   }
 
   private void mergeLocalToProto() {
@@ -86,6 +92,28 @@ public class RegisterNodeManagerResponsePBImpl extends 
ProtoBase<RegisterNodeMan
   }
 
   @Override
+  public Resource getResource() {
+    // Lazily materialize the cached record from the underlying proto (or
+    // builder) the first time it is requested; later calls reuse the cache.
+    if (this.resource == null) {
+      RegisterNodeManagerResponseProtoOrBuilder source =
+          viaProto ? proto : builder;
+      if (source.hasResource()) {
+        this.resource = convertFromProtoFormat(source.getResource());
+      }
+    }
+    // Still null here when neither the cache nor the proto carries a resource.
+    return this.resource;
+  }
+
+  /**
+   * Sets the resource carried by this registration response.
+   *
+   * The value is cached locally and written into the builder when local
+   * state is merged during {@code getProto()}.
+   *
+   * @param resource the resource to carry, or {@code null} to clear any
+   *                 resource already present in the builder
+   */
+  @Override
+  public void setResource(Resource resource) {
+    maybeInitBuilder();
+    if (resource == null) {
+      // Nothing will be merged for a null cache, so drop the proto field now.
+      builder.clearResource();
+    }
+    this.resource = resource;
+    // getProto() only runs mergeLocalToProto() when rebuild is set; without
+    // this flag a resource set on its own would never reach the serialized
+    // proto.
+    rebuild = true;
+  }
+
+  @Override
   public MasterKey getContainerTokenMasterKey() {
     RegisterNodeManagerResponseProtoOrBuilder p = viaProto ? proto : builder;
     if (this.containerTokenMasterKey != null) {
@@ -217,6 +245,14 @@ public class RegisterNodeManagerResponsePBImpl extends 
ProtoBase<RegisterNodeMan
     return ((MasterKeyPBImpl)t).getProto();
   }
 
+  private ResourcePBImpl convertFromProtoFormat(ResourceProto p) {
+    return new ResourcePBImpl(p);
+  }
+
+  private ResourceProto convertToProtoFormat(Resource t) {
+    return ((ResourcePBImpl)t).getProto();
+  }
+
   @Override
   public boolean getAreNodeLabelsAcceptedByRM() {
     RegisterNodeManagerResponseProtoOrBuilder p =

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa3bc340/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
index a977653..7d39d30 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
@@ -63,6 +63,7 @@ message RegisterNodeManagerResponseProto {
   optional string diagnostics_message = 5;
   optional string rm_version = 6;
   optional bool areNodeLabelsAcceptedByRM = 7 [default = false];
+  optional ResourceProto resource = 8;
 }
 
 message UnRegisterNodeManagerRequestProto {
@@ -101,6 +102,7 @@ message NodeHeartbeatResponseProto {
   repeated ContainerProto containers_to_decrease = 12;
   repeated SignalContainerRequestProto containers_to_signal = 13;
   optional ContainerQueuingLimitProto container_queuing_limit = 14;
+  optional ResourceProto resource = 15;
 }
 
 message ContainerQueuingLimitProto {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa3bc340/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
index e02aa0a..f53e6c2 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
@@ -88,6 +88,7 @@ import 
org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
 import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeLabelsProvider;
 import org.apache.hadoop.yarn.server.nodemanager.util.NodeManagerHardwareUtils;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.apache.hadoop.yarn.util.resource.Resources;
 import org.apache.hadoop.yarn.util.YarnVersionInfo;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -397,8 +398,17 @@ public class NodeStatusUpdaterImpl extends AbstractService 
implements
 
     StringBuilder successfullRegistrationMsg = new StringBuilder();
     successfullRegistrationMsg.append("Registered with ResourceManager as ")
-        .append(this.nodeId).append(" with total resource of ")
-        .append(this.totalResource);
+        .append(this.nodeId);
+
+    Resource newResource = regNMResponse.getResource();
+    if (newResource != null) {
+      updateNMResource(newResource);
+      successfullRegistrationMsg.append(" with updated total resource of ")
+          .append(this.totalResource);
+    } else {
+      successfullRegistrationMsg.append(" with total resource of ")
+          .append(this.totalResource);
+    }
 
     successfullRegistrationMsg.append(nodeLabelsHandler
         .verifyRMRegistrationResponseForNodeLabels(regNMResponse));
@@ -498,6 +508,12 @@ public class NodeStatusUpdaterImpl extends AbstractService 
implements
     return increasedContainers;
   }
 
+  /**
+   * Replaces the NM's total resource with the given value and feeds the
+   * difference between the new and the previous total into the NM metrics.
+   *
+   * @param resource the new total resource for this node
+   */
+  private void updateNMResource(Resource resource) {
+    // Compute the delta against the old total before overwriting it.
+    Resource delta = Resources.subtract(resource, totalResource);
+    metrics.addResource(delta);
+    this.totalResource = resource;
+  }
+
   // Iterate through the NMContext and clone and get all the containers'
   // statuses. If it's a completed container, add into the
   // recentlyStoppedContainers collections.
@@ -880,6 +896,15 @@ public class NodeStatusUpdaterImpl extends AbstractService 
implements
                 context.getContainerManager().updateQueuingLimit(queuingLimit);
               }
             }
+            // Handling node resource update case.
+            Resource newResource = response.getResource();
+            if (newResource != null) {
+              updateNMResource(newResource);
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("Node's resource is updated to " +
+                    newResource.toString());
+              }
+            }
           } catch (ConnectException e) {
             //catch and throw the exception if tried MAX wait time to connect 
RM
             dispatcher.getEventHandler().handle(

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa3bc340/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
index d306b60..f8c9258 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
@@ -26,6 +26,9 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -97,6 +100,9 @@ public class ResourceTrackerService extends AbstractService 
implements
   private final RMContainerTokenSecretManager containerTokenSecretManager;
   private final NMTokenSecretManagerInRM nmTokenSecretManager;
 
+  private final ReadLock readLock;
+  private final WriteLock writeLock;
+
   private long nextHeartBeatInterval;
   private Server server;
   private InetSocketAddress resourceTrackerAddress;
@@ -107,7 +113,7 @@ public class ResourceTrackerService extends AbstractService 
implements
 
   private boolean isDistributedNodeLabelsConf;
   private boolean isDelegatedCentralizedNodeLabelsConf;
-  private volatile DynamicResourceConfiguration drConf;
+  private DynamicResourceConfiguration drConf;
 
   public ResourceTrackerService(RMContext rmContext,
       NodesListManager nodesListManager,
@@ -120,7 +126,9 @@ public class ResourceTrackerService extends AbstractService 
implements
     this.nmLivelinessMonitor = nmLivelinessMonitor;
     this.containerTokenSecretManager = containerTokenSecretManager;
     this.nmTokenSecretManager = nmTokenSecretManager;
-
+    ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+    this.readLock = lock.readLock();
+    this.writeLock = lock.writeLock();
   }
 
   @Override
@@ -160,7 +168,6 @@ public class ResourceTrackerService extends AbstractService 
implements
     }
 
     loadDynamicResourceConfiguration(conf);
-
     super.serviceInit(conf);
   }
 
@@ -176,6 +183,9 @@ public class ResourceTrackerService extends AbstractService 
implements
       InputStream drInputStream = this.rmContext.getConfigurationProvider()
           .getConfigurationInputStream(conf,
           YarnConfiguration.DR_CONFIGURATION_FILE);
+      // No write lock is needed on drConf here: this runs while
+      // ResourceTrackerService is being initialized, before any other
+      // read or write operations have started.
       if (drInputStream != null) {
         this.drConf = new DynamicResourceConfiguration(conf, drInputStream);
       } else {
@@ -192,7 +202,12 @@ public class ResourceTrackerService extends 
AbstractService implements
    */
   public void updateDynamicResourceConfiguration(
       DynamicResourceConfiguration conf) {
-    this.drConf = conf;
+    this.writeLock.lock();
+    try {
+      this.drConf = conf;
+    } finally {
+      this.writeLock.unlock();
+    }
   }
 
   @Override
@@ -233,6 +248,7 @@ public class ResourceTrackerService extends AbstractService 
implements
     if (this.server != null) {
       this.server.stop();
     }
+
     super.serviceStop();
   }
 
@@ -331,16 +347,18 @@ public class ResourceTrackerService extends 
AbstractService implements
     }
 
     // check if node's capacity is load from dynamic-resources.xml
-    String[] nodes = this.drConf.getNodes();
     String nid = nodeId.toString();
 
-    if (nodes != null && Arrays.asList(nodes).contains(nid)) {
-      capability.setMemory(this.drConf.getMemoryPerNode(nid));
-      capability.setVirtualCores(this.drConf.getVcoresPerNode(nid));
+    Resource dynamicLoadCapability = loadNodeResourceFromDRConfiguration(nid);
+    if (dynamicLoadCapability != null) {
       if (LOG.isDebugEnabled()) {
-        LOG.debug("Resource for node: " + nid + " is adjusted to " +
-            capability + " due to settings in dynamic-resources.xml.");
+        LOG.debug("Resource for node: " + nid + " is adjusted from: " +
+            capability + " to: " + dynamicLoadCapability +
+            " due to settings in dynamic-resources.xml.");
       }
+      capability = dynamicLoadCapability;
+      // sync back with new resource.
+      response.setResource(capability);
     }
 
     // Check if this node has minimum allocations
@@ -536,8 +554,17 @@ public class ResourceTrackerService extends 
AbstractService implements
       }
     }
 
-    // 6. Send Container Queuing Limits back to the Node. This will be used by
-    //    the node to truncate the number of Containers queued for execution.
+    // 6. Check if the node's capacity is loaded from dynamic-resources.xml;
+    // if so, send the updated resource back to the NM.
+    String nid = nodeId.toString();
+    Resource capability = loadNodeResourceFromDRConfiguration(nid);
+    // sync back with new resource if not null.
+    if (capability != null) {
+      nodeHeartBeatResponse.setResource(capability);
+    }
+
+    // 7. Send Container Queuing Limits back to the Node. This will be used by
+    // the node to truncate the number of Containers queued for execution.
     if (this.rmContext.getNodeManagerQueueLimitCalculator() != null) {
       nodeHeartBeatResponse.setContainerQueuingLimit(
           this.rmContext.getNodeManagerQueueLimitCalculator()
@@ -629,6 +656,22 @@ public class ResourceTrackerService extends 
AbstractService implements
     }
   }
 
+  /**
+   * Looks up the resource configured for a node in the dynamic resource
+   * configuration (dynamic-resources.xml).
+   *
+   * The lookup is done under the read lock, so it is not interleaved with
+   * the write-locked swap of {@code drConf}.
+   *
+   * @param nodeId the node id string to look up
+   * @return a new Resource built from the configured memory and vcores, or
+   *         {@code null} if the node is not listed in the configuration
+   */
+  private Resource loadNodeResourceFromDRConfiguration(String nodeId) {
+    this.readLock.lock();
+    try {
+      String[] configuredNodes = this.drConf.getNodes();
+      boolean listed = configuredNodes != null
+          && Arrays.asList(configuredNodes).contains(nodeId);
+      if (!listed) {
+        return null;
+      }
+      return Resource.newInstance(this.drConf.getMemoryPerNode(nodeId),
+          this.drConf.getVcoresPerNode(nodeId));
+    } finally {
+      this.readLock.unlock();
+    }
+  }
+
   /**
    * resolving the network topology.
    * @param hostName the hostname of this node.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa3bc340/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java
index 4407fe9..4cec29e 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java
@@ -50,8 +50,8 @@ public class MockNM {
 
   private int responseId;
   private NodeId nodeId;
-  private final int memory;
-  private final int vCores;
+  private int memory;
+  private int vCores;
   private ResourceTrackerService resourceTracker;
   private int httpPort = 2;
   private MasterKey currentContainerTokenMasterKey;
@@ -142,9 +142,14 @@ public class MockNM {
     this.currentContainerTokenMasterKey =
         registrationResponse.getContainerTokenMasterKey();
     this.currentNMTokenMasterKey = registrationResponse.getNMTokenMasterKey();
-    return registrationResponse;    
+    Resource newResource = registrationResponse.getResource();
+    if (newResource != null) {
+      memory = newResource.getMemory();
+      vCores = newResource.getVirtualCores();
+    }
+    return registrationResponse;
   }
-  
+
   public NodeHeartbeatResponse nodeHeartbeat(boolean isHealthy) throws 
Exception {
     return nodeHeartbeat(new HashMap<ApplicationId, List<ContainerStatus>>(),
         isHealthy, ++responseId);
@@ -211,7 +216,13 @@ public class MockNM {
             .getKeyId()) {
       this.currentNMTokenMasterKey = masterKeyFromRM;
     }
-    
+
+    Resource newResource = heartbeatResponse.getResource();
+    if (newResource != null) {
+      memory = newResource.getMemory();
+      vCores = newResource.getVirtualCores();
+    }
+
     return heartbeatResponse;
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa3bc340/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMAdminService.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMAdminService.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMAdminService.java
index 5c69411..b109639 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMAdminService.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMAdminService.java
@@ -239,6 +239,105 @@ public class TestRMAdminService {
   }
 
   @Test
+  public void testRefreshNodesResourceWithResourceReturnInRegistration()
+      throws IOException, YarnException {
+    configuration.set(YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS,
+        "org.apache.hadoop.yarn.FileSystemBasedConfigurationProvider");
+
+    //upload default configurations
+    uploadDefaultConfiguration();
+
+    MockNM nm = null;
+    try {
+      rm = new MockRM(configuration);
+      rm.init(configuration);
+      rm.start();
+      nm = rm.registerNode("h1:1234", 2048, 2);
+    } catch(Exception ex) {
+      fail("Should not get any exceptions");
+    }
+
+    NodeId nid = ConverterUtils.toNodeId("h1:1234");
+    RMNode ni = rm.getRMContext().getRMNodes().get(nid);
+    Resource resource = ni.getTotalCapability();
+    Assert.assertEquals("<memory:2048, vCores:2>", resource.toString());
+
+    DynamicResourceConfiguration drConf =
+        new DynamicResourceConfiguration();
+    drConf.set("yarn.resource.dynamic.nodes", "h1:1234");
+    drConf.set("yarn.resource.dynamic.h1:1234.vcores", "4");
+    drConf.set("yarn.resource.dynamic.h1:1234.memory", "4096");
+    uploadConfiguration(drConf, "dynamic-resources.xml");
+
+    rm.adminService.refreshNodesResources(
+        RefreshNodesResourcesRequest.newInstance());
+
+    try {
+      // register the same node again with original resource.
+      // validate this will get new resource back;
+      nm.registerNode();
+    } catch (Exception ex) {
+      fail("Should not get any exceptions");
+    }
+
+    RMNode niAfter = rm.getRMContext().getRMNodes().get(nid);
+    Resource resourceAfter = niAfter.getTotalCapability();
+    Assert.assertEquals("<memory:4096, vCores:4>", resourceAfter.toString());
+
+    Assert.assertEquals(4096, nm.getMemory());
+    Assert.assertEquals(4, nm.getvCores());
+  }
+
+  @Test
+  public void testRefreshNodesResourceWithResourceReturnInHeartbeat()
+      throws IOException, YarnException {
+    configuration.set(YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS,
+        "org.apache.hadoop.yarn.FileSystemBasedConfigurationProvider");
+
+    //upload default configurations
+    uploadDefaultConfiguration();
+
+    MockNM nm = null;
+    try {
+      rm = new MockRM(configuration);
+      rm.init(configuration);
+      rm.start();
+      nm = rm.registerNode("h1:1234", 2048, 2);
+    } catch(Exception ex) {
+      fail("Should not get any exceptions");
+    }
+
+    NodeId nid = ConverterUtils.toNodeId("h1:1234");
+    RMNode ni = rm.getRMContext().getRMNodes().get(nid);
+    Resource resource = ni.getTotalCapability();
+    Assert.assertEquals("<memory:2048, vCores:2>", resource.toString());
+
+    DynamicResourceConfiguration drConf =
+        new DynamicResourceConfiguration();
+    drConf.set("yarn.resource.dynamic.nodes", "h1:1234");
+    drConf.set("yarn.resource.dynamic.h1:1234.vcores", "4");
+    drConf.set("yarn.resource.dynamic.h1:1234.memory", "4096");
+    uploadConfiguration(drConf, "dynamic-resources.xml");
+
+    rm.adminService.refreshNodesResources(
+        RefreshNodesResourcesRequest.newInstance());
+
+    try {
+      // NM-RM heartbeat, validate that this will get new resource back.
+      nm.nodeHeartbeat(true);
+    } catch (Exception ex) {
+      fail("Should not get any exceptions");
+    }
+
+    RMNode niAfter = rm.getRMContext().getRMNodes().get(nid);
+    Resource resourceAfter = niAfter.getTotalCapability();
+    Assert.assertEquals("<memory:4096, vCores:4>", resourceAfter.toString());
+
+    Assert.assertEquals(4096, nm.getMemory());
+    Assert.assertEquals(4, nm.getvCores());
+  }
+
+  @Test
   public void testResourcePersistentForNMRegistrationWithNewResource()
       throws IOException, YarnException {
     configuration.set(YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS,


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to