YARN-7842. PB changes to carry node-attributes in NM heartbeat. Contributed by Weiwei Yang.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/843dac46
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/843dac46
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/843dac46

Branch: refs/heads/YARN-3409
Commit: 843dac46ff0d125941e994e3df7039e6e9e1e9c2
Parents: bc66343
Author: Weiwei Yang <w...@apache.org>
Authored: Wed Jan 31 20:28:41 2018 +0800
Committer: Sunil G <sun...@apache.org>
Committed: Fri Jul 27 16:38:12 2018 +0530

----------------------------------------------------------------------
 .../protocolrecords/NodeHeartbeatRequest.java   | 17 +++++++
 .../impl/pb/NodeHeartbeatRequestPBImpl.java     | 52 ++++++++++++++++++++
 .../yarn_server_common_service_protos.proto     |  5 ++
 .../protocolrecords/TestProtocolRecords.java    | 12 +++++
 4 files changed, 86 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/843dac46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java
index f238f79..4f99225 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
 import org.apache.hadoop.yarn.server.api.records.MasterKey;
 import org.apache.hadoop.yarn.server.api.records.NodeStatus;
 import org.apache.hadoop.yarn.util.Records;
+import org.apache.hadoop.yarn.api.records.NodeAttribute;
 
 public abstract class NodeHeartbeatRequest {
   
@@ -61,6 +62,18 @@ public abstract class NodeHeartbeatRequest {
     return nodeHeartbeatRequest;
   }
 
+  public static NodeHeartbeatRequest newInstance(NodeStatus nodeStatus,
+      MasterKey lastKnownContainerTokenMasterKey,
+      MasterKey lastKnownNMTokenMasterKey, Set<NodeLabel> nodeLabels,
+      Set<NodeAttribute> nodeAttributes,
+      Map<ApplicationId, AppCollectorData> registeringCollectors) {
+    NodeHeartbeatRequest request = NodeHeartbeatRequest
+        .newInstance(nodeStatus, lastKnownContainerTokenMasterKey,
+            lastKnownNMTokenMasterKey, nodeLabels, registeringCollectors);
+    request.setNodeAttributes(nodeAttributes);
+    return request;
+  }
+
   public abstract NodeStatus getNodeStatus();
   public abstract void setNodeStatus(NodeStatus status);
 
@@ -85,4 +98,8 @@ public abstract class NodeHeartbeatRequest {
 
   public abstract void setRegisteringCollectors(Map<ApplicationId,
       AppCollectorData> appCollectorsMap);
+
+  public abstract Set<NodeAttribute> getNodeAttributes();
+
+  public abstract void setNodeAttributes(Set<NodeAttribute> nodeAttributes);
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/843dac46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java
index 1ffd223..c59127a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java
@@ -27,6 +27,9 @@ import java.util.Map;
 import java.util.Set;
 
 import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
+import org.apache.hadoop.yarn.api.records.NodeAttribute;
+import org.apache.hadoop.yarn.api.records.impl.pb.NodeAttributePBImpl;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos;
 import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.NodeLabel;
@@ -36,6 +39,7 @@ import org.apache.hadoop.yarn.api.records.impl.pb.NodeLabelPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.TokenPBImpl;
 import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.NodeLabelProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.NodeAttributeProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppCollectorDataProto;
@@ -60,6 +64,7 @@ public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest {
   private MasterKey lastKnownContainerTokenMasterKey = null;
   private MasterKey lastKnownNMTokenMasterKey = null;
   private Set<NodeLabel> labels = null;
+  private Set<NodeAttribute> attributes = null;
   private List<LogAggregationReport> logAggregationReportsForApps = null;
 
   private Map<ApplicationId, AppCollectorData> registeringCollectors = null;
@@ -115,6 +120,15 @@ public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest {
       }
       builder.setNodeLabels(newBuilder.build());
     }
+    if (this.attributes != null) {
+      builder.clearNodeAttributes();
+      YarnServerCommonServiceProtos.NodeAttributesProto.Builder attBuilder =
+          YarnServerCommonServiceProtos.NodeAttributesProto.newBuilder();
+      for (NodeAttribute attribute : attributes) {
+        attBuilder.addNodeAttributes(convertToProtoFormat(attribute));
+      }
+      builder.setNodeAttributes(attBuilder.build());
+    }
     if (this.logAggregationReportsForApps != null) {
       addLogAggregationStatusForAppsToProto();
     }
@@ -372,6 +386,44 @@ public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest {
   }
 
   @Override
+  public Set<NodeAttribute> getNodeAttributes() {
+    initNodeAttributes();
+    return this.attributes;
+  }
+
+  private void initNodeAttributes() {
+    if (this.attributes != null) {
+      return;
+    }
+    NodeHeartbeatRequestProtoOrBuilder p = viaProto ? proto : builder;
+    if (!p.hasNodeAttributes()) {
+      return;
+    }
+    YarnServerCommonServiceProtos.NodeAttributesProto nodeAttributes =
+        p.getNodeAttributes();
+    attributes = new HashSet<>();
+    for (NodeAttributeProto attributeProto :
+        nodeAttributes.getNodeAttributesList()) {
+      attributes.add(convertFromProtoFormat(attributeProto));
+    }
+  }
+
+  @Override
+  public void setNodeAttributes(Set<NodeAttribute> nodeAttributes) {
+    maybeInitBuilder();
+    builder.clearNodeAttributes();
+    this.attributes = nodeAttributes;
+  }
+
+  private NodeAttributePBImpl convertFromProtoFormat(NodeAttributeProto p) {
+    return new NodeAttributePBImpl(p);
+  }
+
+  private NodeAttributeProto convertToProtoFormat(NodeAttribute attribute) {
+    return ((NodeAttributePBImpl) attribute).getProto();
+  }
+
+  @Override
   public List<LogAggregationReport> getLogAggregationReportsForApps() {
     if (this.logAggregationReportsForApps != null) {
       return this.logAggregationReportsForApps;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/843dac46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
index 387ddb4..0b8c4a3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
@@ -58,6 +58,10 @@ message NodeLabelsProto {
   repeated NodeLabelProto nodeLabels = 1;
 }
 
+message NodeAttributesProto {
+  repeated NodeAttributeProto nodeAttributes = 1;
+}
+
 message RegisterNodeManagerRequestProto {
   optional NodeIdProto node_id = 1;
   optional int32 http_port = 3;
@@ -95,6 +99,7 @@ message NodeHeartbeatRequestProto {
   optional NodeLabelsProto nodeLabels = 4;
   repeated LogAggregationReportProto log_aggregation_reports_for_apps = 5;
   repeated AppCollectorDataProto registering_collectors = 6;
+  optional NodeAttributesProto nodeAttributes = 7;
 }
 
 message LogAggregationReportProto {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/843dac46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestProtocolRecords.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestProtocolRecords.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestProtocolRecords.java
index 74f19e5..e6e79d3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestProtocolRecords.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestProtocolRecords.java
@@ -24,7 +24,9 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
+import com.google.common.collect.Sets;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.Credentials;
@@ -39,6 +41,8 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.NodeAttribute;
+import org.apache.hadoop.yarn.api.records.NodeAttributeType;
 import org.apache.hadoop.yarn.api.records.impl.pb.ContainerStatusPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
 import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl;
@@ -173,6 +177,13 @@ public class TestProtocolRecords {
     nodeStatus.setOpportunisticContainersStatus(opportunisticContainersStatus);
     record.setNodeStatus(nodeStatus);
 
+    Set<NodeAttribute> attributeSet =
+        Sets.newHashSet(NodeAttribute.newInstance("attributeA",
+                NodeAttributeType.STRING, "valueA"),
+            NodeAttribute.newInstance("attributeB",
+                NodeAttributeType.STRING, "valueB"));
+    record.setNodeAttributes(attributeSet);
+
     NodeHeartbeatRequestPBImpl pb = new
         NodeHeartbeatRequestPBImpl(
         ((NodeHeartbeatRequestPBImpl) record).getProto());
@@ -183,6 +194,7 @@ public class TestProtocolRecords {
     Assert.assertEquals(321,
         pb.getNodeStatus().getOpportunisticContainersStatus()
             .getWaitQueueLength());
+    Assert.assertEquals(2, pb.getNodeAttributes().size());
   }
 
   @Test


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to