YARN-4170. AM need to be notified with priority in AllocateResponse. 
Contributed by Sunil G


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/f9da5cdb
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/f9da5cdb
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/f9da5cdb

Branch: refs/heads/HDFS-7966
Commit: f9da5cdb2b2dd071fd60fc01ea1edf0f79c0819b
Parents: 4337b26
Author: Jian He <jia...@apache.org>
Authored: Fri Oct 16 15:26:27 2015 -0700
Committer: Jian He <jia...@apache.org>
Committed: Fri Oct 16 15:26:27 2015 -0700

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |  3 ++
 .../api/protocolrecords/AllocateResponse.java   | 14 +++++++
 .../src/main/proto/yarn_service_protos.proto    |  1 +
 .../impl/pb/AllocateResponsePBImpl.java         | 40 +++++++++++++++++-
 .../ApplicationMasterService.java               |  4 ++
 .../TestApplicationMasterService.java           | 43 ++++++++++++++++++++
 .../capacity/TestCapacityScheduler.java         | 17 +++-----
 7 files changed, 108 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/f9da5cdb/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 851870b..93c07d8 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -510,6 +510,9 @@ Release 2.8.0 - UNRELEASED
     YARN-4162. CapacityScheduler: Add resource usage by partition and queue capacity
     by partition to REST API. (Naganarasimha G R via wangda)
 
+    YARN-4170. AM need to be notified with priority in AllocateResponse.
+    (Sunil G via jianhe)
+
   OPTIMIZATIONS
 
     YARN-3339. TestDockerContainerExecutor should pull a single image and not

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f9da5cdb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java
index c363070..d1b2a3a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NMToken;
 import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.api.records.PreemptionMessage;
+import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.Token;
 import org.apache.hadoop.yarn.util.Records;
@@ -314,4 +315,17 @@ public abstract class AllocateResponse {
   @Private
   @Unstable
   public abstract void setAMRMToken(Token amRMToken);
+
+  /**
+   * Priority of the application
+   *
+   * @return get application priority
+   */
+  @Public
+  @Unstable
+  public abstract Priority getApplicationPriority();
+
+  @Private
+  @Unstable
+  public abstract void setApplicationPriority(Priority priority);
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f9da5cdb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
index a4b9c37..8924eba 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
@@ -88,6 +88,7 @@ message AllocateResponseProto {
   repeated ContainerProto increased_containers = 10;
   repeated ContainerProto decreased_containers = 11;
   optional hadoop.common.TokenProto am_rm_token = 12;
+  optional PriorityProto application_priority = 13;
 }
 
 enum SchedulerResourceTypes {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f9da5cdb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java
index dd7d1a9..bd460f6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NMToken;
 import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.api.records.PreemptionMessage;
+import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.Token;
 import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
@@ -40,6 +41,7 @@ import org.apache.hadoop.yarn.api.records.impl.pb.ContainerStatusPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.NMTokenPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.NodeReportPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.PreemptionMessagePBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.PriorityPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
 import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.TokenPBImpl;
@@ -47,6 +49,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStatusProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.NodeReportProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.PreemptionMessageProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.PriorityProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
 import org.apache.hadoop.yarn.proto.YarnServiceProtos.AllocateResponseProto;
 import org.apache.hadoop.yarn.proto.YarnServiceProtos.AllocateResponseProtoOrBuilder;
@@ -72,7 +75,8 @@ public class AllocateResponsePBImpl extends AllocateResponse {
   private List<NodeReport> updatedNodes = null;
   private PreemptionMessage preempt;
   private Token amrmToken = null;
-  
+  private Priority appPriority = null;
+
   public AllocateResponsePBImpl() {
     builder = AllocateResponseProto.newBuilder();
   }
@@ -154,6 +158,9 @@ public class AllocateResponsePBImpl extends AllocateResponse {
     if (this.amrmToken != null) {
       builder.setAmRmToken(convertToProtoFormat(this.amrmToken));
     }
+    if (this.appPriority != null) {
+      builder.setApplicationPriority(convertToProtoFormat(this.appPriority));
+    }
   }
 
   private synchronized void mergeLocalToProto() {
@@ -378,6 +385,27 @@ public class AllocateResponsePBImpl extends AllocateResponse {
     this.amrmToken = amRMToken;
   }
 
+  @Override
+  public Priority getApplicationPriority() {
+    AllocateResponseProtoOrBuilder p = viaProto ? proto : builder;
+    if (this.appPriority != null) {
+      return this.appPriority;
+    }
+    if (!p.hasApplicationPriority()) {
+      return null;
+    }
+    this.appPriority = convertFromProtoFormat(p.getApplicationPriority());
+    return this.appPriority;
+  }
+
+  @Override
+  public void setApplicationPriority(Priority priority) {
+    maybeInitBuilder();
+    if (priority == null)
+      builder.clearApplicationPriority();
+    this.appPriority = priority;
+  }
+
   private synchronized void initLocalIncreasedContainerList() {
     if (this.increasedContainers != null) {
       return;
@@ -644,4 +672,12 @@ public class AllocateResponsePBImpl extends AllocateResponse {
   private TokenProto convertToProtoFormat(Token t) {
     return ((TokenPBImpl)t).getProto();
   }
-}  
+
+  private PriorityPBImpl convertFromProtoFormat(PriorityProto p) {
+    return new PriorityPBImpl(p);
+  }
+
+  private PriorityProto convertToProtoFormat(Priority t) {
+    return ((PriorityPBImpl)t).getProto();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f9da5cdb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
index 87c7bfa..ab94175 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
@@ -563,6 +563,10 @@ public class ApplicationMasterService extends AbstractService implements
       allocateResponse
           .setPreemptionMessage(generatePreemptionMessage(allocation));
 
+      // Set application priority
+      allocateResponse.setApplicationPriority(app
+          .getApplicationSubmissionContext().getPriority());
+
       // update AMRMToken if the token is rolled-up
       MasterKeyData nextMasterKey =
           this.rmContext.getAMRMTokenSecretManager().getNextMasterKeyData();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f9da5cdb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java
index 8fa1ad2..cef1b5f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
@@ -437,6 +438,48 @@ public class TestApplicationMasterService {
     }
   }
 
+  @Test(timeout = 300000)
+  public void testPriorityInAllocatedResponse() throws Exception {
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+        ResourceScheduler.class);
+    // Set Max Application Priority as 10
+    conf.setInt(YarnConfiguration.MAX_CLUSTER_LEVEL_APPLICATION_PRIORITY, 10);
+    MockRM rm = new MockRM(conf);
+    rm.start();
+
+    // Register node1
+    MockNM nm1 = rm.registerNode("127.0.0.1:1234", 6 * GB);
+
+    // Submit an application
+    Priority appPriority1 = Priority.newInstance(5);
+    RMApp app1 = rm.submitApp(2048, appPriority1);
+
+    nm1.nodeHeartbeat(true);
+    RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
+    MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId());
+    am1.registerAppAttempt();
+
+    AllocateRequestPBImpl allocateRequest = new AllocateRequestPBImpl();
+    List<ContainerId> release = new ArrayList<ContainerId>();
+    List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
+    allocateRequest.setReleaseList(release);
+    allocateRequest.setAskList(ask);
+
+    AllocateResponse response1 = am1.allocate(allocateRequest);
+    Assert.assertEquals(appPriority1, response1.getApplicationPriority());
+
+    // get scheduler
+    CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+
+    // Change the priority of App1 to 8
+    Priority appPriority2 = Priority.newInstance(8);
+    cs.updateApplicationPriority(appPriority2, app1.getApplicationId());
+
+    AllocateResponse response2 = am1.allocate(allocateRequest);
+    Assert.assertEquals(appPriority2, response2.getApplicationPriority());
+    rm.stop();
+  }
+
   private static class MyResourceManager extends MockRM {
 
     public MyResourceManager(YarnConfiguration conf) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f9da5cdb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
index d81b8cc..f0a1d03 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
@@ -976,16 +976,6 @@ public class TestCapacityScheduler {
       CapacityScheduler.schedule(cs);
     }
   }
-  
-  private MockAM launchAM(RMApp app, MockRM rm, MockNM nm)
-      throws Exception {
-    RMAppAttempt attempt = app.getCurrentAppAttempt();
-    nm.nodeHeartbeat(true);
-    MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId());
-    am.registerAppAttempt();
-    rm.waitForState(app.getApplicationId(), RMAppState.RUNNING);
-    return am;
-  }
 
   private void waitForAppPreemptionInfo(RMApp app, Resource preempted,
       int numAMPreempted, int numTaskPreempted,
@@ -1156,7 +1146,8 @@ public class TestCapacityScheduler {
 
     // create app and launch the AM
     RMApp app0 = rm1.submitApp(CONTAINER_MEMORY);
-    MockAM am0 = launchAM(app0, rm1, nm1);
+    MockAM am0 = MockRM.launchAM(app0, rm1, nm1);
+    am0.registerAppAttempt();
 
     // get scheduler app
     FiCaSchedulerApp schedulerAppAttempt =
@@ -1190,7 +1181,9 @@ public class TestCapacityScheduler {
         Resource.newInstance(0, 0), false, 0);
 
     // launch app0-attempt1
-    MockAM am1 = launchAM(app0, rm1, nm1);
+    MockAM am1 = MockRM.launchAM(app0, rm1, nm1);
+    am1.registerAppAttempt();
+
     schedulerAppAttempt =
         cs.getSchedulerApplications().get(app0.getApplicationId())
             .getCurrentAppAttempt();

Reply via email to