Repository: hadoop Updated Branches: refs/heads/branch-2 7e36b9159 -> 264c06a43
YARN-4832. NM side resource value should get updated if change applied in RM side. Contributed by Junping Du Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/264c06a4 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/264c06a4 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/264c06a4 Branch: refs/heads/branch-2 Commit: 264c06a4381695002d218f688c85f448d6362ec8 Parents: 7e36b91 Author: Jian He <jia...@apache.org> Authored: Tue May 17 15:02:14 2016 -0700 Committer: Jian He <jia...@apache.org> Committed: Tue May 17 15:02:14 2016 -0700 ---------------------------------------------------------------------- .../protocolrecords/NodeHeartbeatResponse.java | 4 + .../RegisterNodeManagerResponse.java | 6 +- .../impl/pb/NodeHeartbeatResponsePBImpl.java | 46 ++++++++- .../pb/RegisterNodeManagerResponsePBImpl.java | 46 ++++++++- .../yarn_server_common_service_protos.proto | 2 + .../nodemanager/NodeStatusUpdaterImpl.java | 29 +++++- .../resourcemanager/ResourceTrackerService.java | 63 +++++++++++-- .../yarn/server/resourcemanager/MockNM.java | 21 ++++- .../resourcemanager/TestRMAdminService.java | 99 ++++++++++++++++++++ 9 files changed, 288 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/264c06a4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java index f8a1320..d447a2e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java @@ -27,6 +27,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.NodeAction; @@ -76,6 +77,9 @@ public interface NodeHeartbeatResponse { boolean getAreNodeLabelsAcceptedByRM(); void setAreNodeLabelsAcceptedByRM(boolean areNodeLabelsAcceptedByRM); + Resource getResource(); + void setResource(Resource resource); + List<Container> getContainersToDecrease(); void addAllContainersToDecrease(Collection<Container> containersToDecrease); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/264c06a4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerResponse.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerResponse.java index c8678f6..fb1da4c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerResponse.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerResponse.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.api.protocolrecords; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.NodeAction; @@ -45,7 +46,10 @@ public interface RegisterNodeManagerResponse { void setRMVersion(String version); String getRMVersion(); - + + Resource getResource(); + void setResource(Resource resource); + boolean getAreNodeLabelsAcceptedByRM(); void setAreNodeLabelsAcceptedByRM(boolean areNodeLabelsAcceptedByRM); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/264c06a4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java index 224e50b..8162177 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java @@ -31,14 +31,17 @@ import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SignalContainerRequest import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ProtoBase; import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils; +import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl; import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto; +import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.SignalContainerRequestProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeActionProto; @@ -51,17 +54,17 @@ import org.apache.hadoop.yarn.server.api.records.NodeAction; import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl; - public class NodeHeartbeatResponsePBImpl extends ProtoBase<NodeHeartbeatResponseProto> implements NodeHeartbeatResponse { NodeHeartbeatResponseProto proto = NodeHeartbeatResponseProto.getDefaultInstance(); NodeHeartbeatResponseProto.Builder builder = null; boolean viaProto = false; - + private List<ContainerId> containersToCleanup = null; private List<ContainerId> containersToBeRemovedFromNM = null; private List<ApplicationId> applicationsToCleanup = null; private Map<ApplicationId, ByteBuffer> systemCredentials = null; + private Resource resource = null; private MasterKey containerTokenMasterKey = null; private MasterKey nmTokenMasterKey = null; @@ -76,7 +79,7 @@ public class NodeHeartbeatResponsePBImpl extends this.proto = proto; viaProto = true; } - + public NodeHeartbeatResponseProto getProto() { mergeLocalToProto(); proto = viaProto ? proto : builder.build(); @@ -111,6 +114,9 @@ public class NodeHeartbeatResponsePBImpl extends if (this.containersToSignal != null) { addContainersToSignalToProto(); } + if (this.resource != null) { + builder.setResource(convertToProtoFormat(this.resource)); + } } private void addSystemCredentialsToProto() { @@ -138,8 +144,8 @@ public class NodeHeartbeatResponsePBImpl extends } viaProto = false; } - - + + @Override public int getResponseId() { NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder; @@ -153,6 +159,28 @@ public class NodeHeartbeatResponsePBImpl extends } @Override + public Resource getResource() { + NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder; + if (this.resource != null) { + return this.resource; + } + if (!p.hasResource()) { + return null; + } + this.resource = convertFromProtoFormat(p.getResource()); + return this.resource; + } + + @Override + public void setResource(Resource resource) { + maybeInitBuilder(); + if (resource == null) { + builder.clearResource(); + } + this.resource = resource; + } + + @Override public MasterKey getContainerTokenMasterKey() { NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder; if (this.containerTokenMasterKey != null) { @@ -533,6 +561,14 @@ public class NodeHeartbeatResponsePBImpl extends return ((ContainerIdPBImpl) t).getProto(); } + private ResourcePBImpl convertFromProtoFormat(ResourceProto p) { + return new ResourcePBImpl(p); + } + + private ResourceProto convertToProtoFormat(Resource t) { + return ((ResourcePBImpl)t).getProto(); + } + private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto p) { return new ApplicationIdPBImpl(p); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/264c06a4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerResponsePBImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerResponsePBImpl.java index 391d00d..56b675b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerResponsePBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerResponsePBImpl.java @@ -19,7 +19,10 @@ package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.impl.pb.ProtoBase; +import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl; +import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeActionProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.RegisterNodeManagerResponseProto; @@ -30,17 +33,17 @@ import org.apache.hadoop.yarn.server.api.records.NodeAction; import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl; - public class RegisterNodeManagerResponsePBImpl extends ProtoBase<RegisterNodeManagerResponseProto> implements RegisterNodeManagerResponse { RegisterNodeManagerResponseProto proto = RegisterNodeManagerResponseProto.getDefaultInstance(); RegisterNodeManagerResponseProto.Builder builder = null; boolean viaProto = false; - + private Resource resource = null; + private MasterKey containerTokenMasterKey = null; private MasterKey nmTokenMasterKey = null; - + private boolean rebuild = false; - + public RegisterNodeManagerResponsePBImpl() { builder = RegisterNodeManagerResponseProto.newBuilder(); } @@ -49,7 +52,7 @@ public class RegisterNodeManagerResponsePBImpl extends ProtoBase<RegisterNodeMan this.proto = proto; viaProto = true; } - + public RegisterNodeManagerResponseProto getProto() { if (rebuild) mergeLocalToProto(); @@ -67,6 +70,9 @@ public class RegisterNodeManagerResponsePBImpl extends ProtoBase<RegisterNodeMan builder.setNmTokenMasterKey( convertToProtoFormat(this.nmTokenMasterKey)); } + if (this.resource != null) { + builder.setResource(convertToProtoFormat(this.resource)); + } } private void mergeLocalToProto() { @@ -86,6 +92,28 @@ public class RegisterNodeManagerResponsePBImpl extends ProtoBase<RegisterNodeMan } @Override + public Resource getResource() { + RegisterNodeManagerResponseProtoOrBuilder p = viaProto ? proto : builder; + if (this.resource != null) { + return this.resource; + } + if (!p.hasResource()) { + return null; + } + this.resource = convertFromProtoFormat(p.getResource()); + return this.resource; + } + + @Override + public void setResource(Resource resource) { + maybeInitBuilder(); + if (resource == null) { + builder.clearResource(); + } + this.resource = resource; + } + + @Override public MasterKey getContainerTokenMasterKey() { RegisterNodeManagerResponseProtoOrBuilder p = viaProto ? proto : builder; if (this.containerTokenMasterKey != null) { @@ -217,6 +245,14 @@ public class RegisterNodeManagerResponsePBImpl extends ProtoBase<RegisterNodeMan return ((MasterKeyPBImpl)t).getProto(); } + private ResourcePBImpl convertFromProtoFormat(ResourceProto p) { + return new ResourcePBImpl(p); + } + + private ResourceProto convertToProtoFormat(Resource t) { + return ((ResourcePBImpl)t).getProto(); + } + @Override public boolean getAreNodeLabelsAcceptedByRM() { RegisterNodeManagerResponseProtoOrBuilder p = http://git-wip-us.apache.org/repos/asf/hadoop/blob/264c06a4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto index a54bbdb..b532bb3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto @@ -48,6 +48,7 @@ message RegisterNodeManagerResponseProto { optional string diagnostics_message = 5; optional string rm_version = 6; optional bool areNodeLabelsAcceptedByRM = 7 [default = false]; + optional ResourceProto resource = 8; } message UnRegisterNodeManagerRequestProto { @@ -85,6 +86,7 @@ message NodeHeartbeatResponseProto { optional bool areNodeLabelsAcceptedByRM = 11 [default = false]; repeated ContainerProto containers_to_decrease = 12; repeated SignalContainerRequestProto containers_to_signal = 13; + optional ResourceProto resource = 14; } message SystemCredentialsForAppsProto { http://git-wip-us.apache.org/repos/asf/hadoop/blob/264c06a4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index 72769bf..98b267a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -83,6 +83,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.Contai import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeLabelsProvider; import org.apache.hadoop.yarn.server.nodemanager.util.NodeManagerHardwareUtils; +import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.util.YarnVersionInfo; import com.google.common.annotations.VisibleForTesting; @@ -392,8 +393,17 @@ public class NodeStatusUpdaterImpl extends AbstractService implements StringBuilder successfullRegistrationMsg = new StringBuilder(); successfullRegistrationMsg.append("Registered with ResourceManager as ") - .append(this.nodeId).append(" with total resource of ") - .append(this.totalResource); + .append(this.nodeId); + + Resource newResource = regNMResponse.getResource(); + if (newResource != null) { + updateNMResource(newResource); + successfullRegistrationMsg.append(" with updated total resource of ") + .append(this.totalResource); + } else { + successfullRegistrationMsg.append(" with total resource of ") + .append(this.totalResource); + } successfullRegistrationMsg.append(nodeLabelsHandler .verifyRMRegistrationResponseForNodeLabels(regNMResponse)); @@ -489,6 +499,12 @@ public class NodeStatusUpdaterImpl extends AbstractService implements return increasedContainers; } + // Update NM's Resource. + private void updateNMResource(Resource resource) { + metrics.addResource(Resources.subtract(resource, totalResource)); + this.totalResource = resource; + } + // Iterate through the NMContext and clone and get all the containers' // statuses. If it's a completed container, add into the // recentlyStoppedContainers collections. @@ -829,6 +845,15 @@ public class NodeStatusUpdaterImpl extends AbstractService implements dispatcher.getEventHandler().handle( new CMgrSignalContainersEvent(containersToSignal)); } + // Handling node resource update case. + Resource newResource = response.getResource(); + if (newResource != null) { + updateNMResource(newResource); + if (LOG.isDebugEnabled()) { + LOG.debug("Node's resource is updated to " + + newResource.toString()); + } + } } catch (ConnectException e) { //catch and throw the exception if tried MAX wait time to connect RM dispatcher.getEventHandler().handle( http://git-wip-us.apache.org/repos/asf/hadoop/blob/264c06a4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java index b0bc565..ed4d5c0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java @@ -26,6 +26,9 @@ import java.util.HashMap; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; +import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -97,6 +100,9 @@ public class ResourceTrackerService extends AbstractService implements private final RMContainerTokenSecretManager containerTokenSecretManager; private final NMTokenSecretManagerInRM nmTokenSecretManager; + private final ReadLock readLock; + private final WriteLock writeLock; + private long nextHeartBeatInterval; private Server server; private InetSocketAddress resourceTrackerAddress; @@ -107,7 +113,7 @@ public class ResourceTrackerService extends AbstractService implements private boolean isDistributedNodeLabelsConf; private boolean isDelegatedCentralizedNodeLabelsConf; - private volatile DynamicResourceConfiguration drConf; + private DynamicResourceConfiguration drConf; public ResourceTrackerService(RMContext rmContext, NodesListManager nodesListManager, @@ -120,7 +126,9 @@ public class ResourceTrackerService extends AbstractService implements this.nmLivelinessMonitor = nmLivelinessMonitor; this.containerTokenSecretManager = containerTokenSecretManager; this.nmTokenSecretManager = nmTokenSecretManager; - + ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + this.readLock = lock.readLock(); + this.writeLock = lock.writeLock(); } @Override @@ -160,7 +168,6 @@ public class ResourceTrackerService extends AbstractService implements } loadDynamicResourceConfiguration(conf); - super.serviceInit(conf); } @@ -176,6 +183,9 @@ public class ResourceTrackerService extends AbstractService implements InputStream drInputStream = this.rmContext.getConfigurationProvider() .getConfigurationInputStream(conf, YarnConfiguration.DR_CONFIGURATION_FILE); + // write lock here on drConfig is unnecessary as here get called at + // ResourceTrackerService get initiated and other read and write + // operations haven't started yet. if (drInputStream != null) { this.drConf = new DynamicResourceConfiguration(conf, drInputStream); } else { @@ -192,7 +202,12 @@ public class ResourceTrackerService extends AbstractService implements */ public void updateDynamicResourceConfiguration( DynamicResourceConfiguration conf) { - this.drConf = conf; + this.writeLock.lock(); + try { + this.drConf = conf; + } finally { + this.writeLock.unlock(); + } } @Override @@ -233,6 +248,7 @@ public class ResourceTrackerService extends AbstractService implements if (this.server != null) { this.server.stop(); } + super.serviceStop(); } @@ -331,16 +347,18 @@ public class ResourceTrackerService extends AbstractService implements } // check if node's capacity is load from dynamic-resources.xml - String[] nodes = this.drConf.getNodes(); String nid = nodeId.toString(); - if (nodes != null && Arrays.asList(nodes).contains(nid)) { - capability.setMemory(this.drConf.getMemoryPerNode(nid)); - capability.setVirtualCores(this.drConf.getVcoresPerNode(nid)); + Resource dynamicLoadCapability = loadNodeResourceFromDRConfiguration(nid); + if (dynamicLoadCapability != null) { if (LOG.isDebugEnabled()) { - LOG.debug("Resource for node: " + nid + " is adjusted to " + - capability + " due to settings in dynamic-resources.xml."); + LOG.debug("Resource for node: " + nid + " is adjusted from: " + + capability + " to: " + dynamicLoadCapability + + " due to settings in dynamic-resources.xml."); } + capability = dynamicLoadCapability; + // sync back with new resource. + response.setResource(capability); } // Check if this node has minimum allocations @@ -536,6 +554,15 @@ public class ResourceTrackerService extends AbstractService implements } } + // 6. check if node's capacity is load from dynamic-resources.xml + // if so, send updated resource back to NM. + String nid = nodeId.toString(); + Resource capability = loadNodeResourceFromDRConfiguration(nid); + // sync back with new resource if not null. + if (capability != null) { + nodeHeartBeatResponse.setResource(capability); + } + return nodeHeartBeatResponse; } @@ -622,6 +649,22 @@ public class ResourceTrackerService extends AbstractService implements } } + private Resource loadNodeResourceFromDRConfiguration(String nodeId) { + // check if node's capacity is loaded from dynamic-resources.xml + this.readLock.lock(); + try { + String[] nodes = this.drConf.getNodes(); + if (nodes != null && Arrays.asList(nodes).contains(nodeId)) { + return Resource.newInstance(this.drConf.getMemoryPerNode(nodeId), + this.drConf.getVcoresPerNode(nodeId)); + } else { + return null; + } + } finally { + this.readLock.unlock(); + } + } + /** * resolving the network topology. * @param hostName the hostname of this node. http://git-wip-us.apache.org/repos/asf/hadoop/blob/264c06a4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java index 4407fe9..4cec29e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java @@ -50,8 +50,8 @@ public class MockNM { private int responseId; private NodeId nodeId; - private final int memory; - private final int vCores; + private int memory; + private int vCores; private ResourceTrackerService resourceTracker; private int httpPort = 2; private MasterKey currentContainerTokenMasterKey; @@ -142,9 +142,14 @@ public class MockNM { this.currentContainerTokenMasterKey = registrationResponse.getContainerTokenMasterKey(); this.currentNMTokenMasterKey = registrationResponse.getNMTokenMasterKey(); - return registrationResponse; + Resource newResource = registrationResponse.getResource(); + if (newResource != null) { + memory = newResource.getMemory(); + vCores = newResource.getVirtualCores(); + } + return registrationResponse; } - + public NodeHeartbeatResponse nodeHeartbeat(boolean isHealthy) throws Exception { return nodeHeartbeat(new HashMap<ApplicationId, List<ContainerStatus>>(), isHealthy, ++responseId); @@ -211,7 +216,13 @@ public class MockNM { .getKeyId()) { this.currentNMTokenMasterKey = masterKeyFromRM; } - + + Resource newResource = heartbeatResponse.getResource(); + if (newResource != null) { + memory = newResource.getMemory(); + vCores = newResource.getVirtualCores(); + } + return heartbeatResponse; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/264c06a4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMAdminService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMAdminService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMAdminService.java index 5c69411..b109639 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMAdminService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMAdminService.java @@ -239,6 +239,105 @@ public class TestRMAdminService { } @Test + public void testRefreshNodesResourceWithResourceReturnInRegistration() + throws IOException, YarnException { + configuration.set(YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS, + "org.apache.hadoop.yarn.FileSystemBasedConfigurationProvider"); + + //upload default configurations + uploadDefaultConfiguration(); + + MockNM nm = null; + try { + rm = new MockRM(configuration); + rm.init(configuration); + rm.start(); + nm = rm.registerNode("h1:1234", 2048, 2); + } catch(Exception ex) { + fail("Should not get any exceptions"); + } + + NodeId nid = ConverterUtils.toNodeId("h1:1234"); + RMNode ni = rm.getRMContext().getRMNodes().get(nid); + Resource resource = ni.getTotalCapability(); + Assert.assertEquals("<memory:2048, vCores:2>", resource.toString()); + + DynamicResourceConfiguration drConf = + new DynamicResourceConfiguration(); + drConf.set("yarn.resource.dynamic.nodes", "h1:1234"); + drConf.set("yarn.resource.dynamic.h1:1234.vcores", "4"); + drConf.set("yarn.resource.dynamic.h1:1234.memory", "4096"); + uploadConfiguration(drConf, "dynamic-resources.xml"); + + rm.adminService.refreshNodesResources( + RefreshNodesResourcesRequest.newInstance()); + + try { + // register the same node again with original resource. + // validate this will get new resource back; + nm.registerNode(); + } catch (Exception ex) { + fail("Should not get any exceptions"); + } + + RMNode niAfter = rm.getRMContext().getRMNodes().get(nid); + Resource resourceAfter = niAfter.getTotalCapability(); + Assert.assertEquals("<memory:4096, vCores:4>", resourceAfter.toString()); + + Assert.assertEquals(4096, nm.getMemory()); + Assert.assertEquals(4, nm.getvCores()); + } + + @Test + public void testRefreshNodesResourceWithResourceReturnInHeartbeat() + throws IOException, YarnException { + configuration.set(YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS, + "org.apache.hadoop.yarn.FileSystemBasedConfigurationProvider"); + + //upload default configurations + uploadDefaultConfiguration(); + + MockNM nm = null; + try { + rm = new MockRM(configuration); + rm.init(configuration); + rm.start(); + nm = rm.registerNode("h1:1234", 2048, 2); + } catch(Exception ex) { + fail("Should not get any exceptions"); + } + + NodeId nid = ConverterUtils.toNodeId("h1:1234"); + RMNode ni = rm.getRMContext().getRMNodes().get(nid); + Resource resource = ni.getTotalCapability(); + Assert.assertEquals("<memory:2048, vCores:2>", resource.toString()); + + DynamicResourceConfiguration drConf = + new DynamicResourceConfiguration(); + drConf.set("yarn.resource.dynamic.nodes", "h1:1234"); + drConf.set("yarn.resource.dynamic.h1:1234.vcores", "4"); + drConf.set("yarn.resource.dynamic.h1:1234.memory", "4096"); + uploadConfiguration(drConf, "dynamic-resources.xml"); + + rm.adminService.refreshNodesResources( + RefreshNodesResourcesRequest.newInstance()); + + try { + // NM-RM heartbeat, validate that this will get new resource back. + nm.nodeHeartbeat(true); + } catch (Exception ex) { + fail("Should not get any exceptions"); + } + + RMNode niAfter = rm.getRMContext().getRMNodes().get(nid); + Resource resourceAfter = niAfter.getTotalCapability(); + Assert.assertEquals("<memory:4096, vCores:4>", resourceAfter.toString()); + + Assert.assertEquals(4096, nm.getMemory()); + Assert.assertEquals(4, nm.getvCores()); + } + + @Test public void testResourcePersistentForNMRegistrationWithNewResource() throws IOException, YarnException { configuration.set(YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS, --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org