This is an automated email from the ASF dual-hosted git repository.

inigoiri pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 62e44761020 YARN-10122. Support signalToContainer API for Federation. (#4421)
62e44761020 is described below

commit 62e447610208919a00ecdf8eb99ad498689bbb05
Author: slfan1989 <55643692+slfan1...@users.noreply.github.com>
AuthorDate: Fri Jun 17 16:38:36 2022 -0700

    YARN-10122. Support signalToContainer API for Federation. (#4421)
---
 .../hadoop/yarn/server/router/RouterMetrics.java   | 33 ++++++++++++++++
 .../clientrm/FederationClientInterceptor.java      | 38 +++++++++++++++++-
 .../yarn/server/router/TestRouterMetrics.java      | 33 ++++++++++++++++
 .../clientrm/TestFederationClientInterceptor.java  | 45 ++++++++++++++++++++++
 .../TestableFederationClientInterceptor.java       | 11 +++++-
 5 files changed, 158 insertions(+), 2 deletions(-)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java
index b02b3e155fa..ac37c4ed1b9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java
@@ -81,6 +81,8 @@ public final class RouterMetrics {
   private MutableGaugeInt numUpdateAppPriorityFailedRetrieved;
   @Metric("# of updateApplicationPriority failed to be retrieved")
   private MutableGaugeInt numUpdateAppTimeoutsFailedRetrieved;
+  @Metric("# of signalToContainer failed to be retrieved")
+  private MutableGaugeInt numSignalToContainerFailedRetrieved;
 
   // Aggregate metrics are shared, and don't have to be looked up per call
   @Metric("Total number of successful Submitted apps and latency(ms)")
@@ -126,6 +128,8 @@ public final class RouterMetrics {
   private MutableRate totalSucceededUpdateAppPriorityRetrieved;
   @Metric("Total number of successful Retrieved updateApplicationTimeouts and latency(ms)")
   private MutableRate totalSucceededUpdateAppTimeoutsRetrieved;
+  @Metric("Total number of successful Retrieved signalToContainer and latency(ms)")
+  private MutableRate totalSucceededSignalToContainerRetrieved;
 
   /**
    * Provide quantile counters for all latencies.
@@ -150,6 +154,7 @@ public final class RouterMetrics {
   private MutableQuantiles failAppAttemptLatency;
   private MutableQuantiles updateAppPriorityLatency;
   private MutableQuantiles updateAppTimeoutsLatency;
+  private MutableQuantiles signalToContainerLatency;
 
   private static volatile RouterMetrics instance = null;
   private static MetricsRegistry registry;
@@ -228,6 +233,10 @@ public final class RouterMetrics {
     updateAppTimeoutsLatency =
         registry.newQuantiles("updateApplicationTimeoutsLatency",
             "latency of update application timeouts", "ops", "latency", 10);
+
+    signalToContainerLatency =
+        registry.newQuantiles("signalToContainerLatency",
+            "latency of signal to container", "ops", "latency", 10);
   }
 
   public static RouterMetrics getMetrics() {
@@ -349,6 +358,11 @@ public final class RouterMetrics {
     return totalSucceededUpdateAppTimeoutsRetrieved.lastStat().numSamples();
   }
 
+  @VisibleForTesting
+  public long getNumSucceededSignalToContainerRetrieved() {
+    return totalSucceededSignalToContainerRetrieved.lastStat().numSamples();
+  }
+
   @VisibleForTesting
   public double getLatencySucceededAppsCreated() {
     return totalSucceededAppsCreated.lastStat().mean();
@@ -449,6 +463,11 @@ public final class RouterMetrics {
     return totalSucceededUpdateAppTimeoutsRetrieved.lastStat().mean();
   }
 
+  @VisibleForTesting
+  public double getLatencySucceededSignalToContainerRetrieved() {
+    return totalSucceededSignalToContainerRetrieved.lastStat().mean();
+  }
+
   @VisibleForTesting
   public int getAppsFailedCreated() {
     return numAppsFailedCreated.value();
@@ -549,6 +568,11 @@ public final class RouterMetrics {
     return numUpdateAppTimeoutsFailedRetrieved.value();
   }
 
+  @VisibleForTesting
+  public int getSignalToContainerFailedRetrieved() {
+    return numSignalToContainerFailedRetrieved.value();
+  }
+
   public void succeededAppsCreated(long duration) {
     totalSucceededAppsCreated.add(duration);
     getNewApplicationLatency.add(duration);
@@ -649,6 +673,11 @@ public final class RouterMetrics {
     updateAppTimeoutsLatency.add(duration);
   }
 
+  public void succeededSignalToContainerRetrieved(long duration) {
+    totalSucceededSignalToContainerRetrieved.add(duration);
+    signalToContainerLatency.add(duration);
+  }
+
   public void incrAppsFailedCreated() {
     numAppsFailedCreated.incr();
   }
@@ -728,4 +757,8 @@ public final class RouterMetrics {
   public void incrUpdateApplicationTimeoutsRetrieved() {
     numUpdateAppTimeoutsFailedRetrieved.incr();
   }
+
+  public void incrSignalToContainerFailedRetrieved() {
+    numSignalToContainerFailedRetrieved.incr();
+  }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java
index fec62d4b080..6cc317242cd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java
@@ -1304,7 +1304,44 @@ public class FederationClientInterceptor
   @Override
   public SignalContainerResponse signalToContainer(
       SignalContainerRequest request) throws YarnException, IOException {
-    throw new NotImplementedException("Code is not implemented");
+    if (request == null || request.getContainerId() == null
+        || request.getCommand() == null) {
+      routerMetrics.incrSignalToContainerFailedRetrieved();
+      RouterServerUtil.logAndThrowException(
+          "Missing signalToContainer request or containerId " +
+          "or command information.", null);
+    }
+
+    long startTime = clock.getTime();
+    SubClusterId subClusterId = null;
+    ApplicationId applicationId =
+        request.getContainerId().getApplicationAttemptId().getApplicationId();
+    try {
+      subClusterId = getApplicationHomeSubCluster(applicationId);
+    } catch (YarnException ex) {
+      routerMetrics.incrSignalToContainerFailedRetrieved();
+      RouterServerUtil.logAndThrowException("Application " + applicationId +
+          " does not exist in FederationStateStore.", ex);
+    }
+
+    ApplicationClientProtocol clientRMProxy = getClientRMProxyForSubCluster(subClusterId);
+    SignalContainerResponse response = null;
+    try {
+      response = clientRMProxy.signalToContainer(request);
+    } catch (Exception ex) {
+      routerMetrics.incrSignalToContainerFailedRetrieved();
+      RouterServerUtil.logAndThrowException("Unable to signal to container for " +
+          applicationId + " from SubCluster " + subClusterId.getId(), ex);
+    }
+
+    if (response == null) {
+      LOG.error("No response when signal to container of " +
+          "the applicationId {} to SubCluster {}.", applicationId, subClusterId.getId());
+    }
+
+    long stopTime = clock.getTime();
+    routerMetrics.succeededSignalToContainerRetrieved(stopTime - startTime);
+    return response;
   }
 
   @Override
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java
index 4b1049e8b64..eddd2a0ab48 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java
@@ -413,6 +413,11 @@ public class TestRouterMetrics {
       LOG.info("Mocked: failed updateApplicationTimeouts call");
       metrics.incrUpdateApplicationTimeoutsRetrieved();
     }
+
+    public void getSignalContainer() {
+      LOG.info("Mocked: failed signalContainer call");
+      metrics.incrSignalToContainerFailedRetrieved();
+    }
   }
 
   // Records successes for all calls
@@ -523,6 +528,11 @@ public class TestRouterMetrics {
       LOG.info("Mocked: successful updateApplicationTimeouts call with duration {}", duration);
       metrics.succeededUpdateAppTimeoutsRetrieved(duration);
     }
+
+    public void getSignalToContainer(long duration) {
+      LOG.info("Mocked: successful signalToContainer call with duration {}", duration);
+      metrics.succeededSignalToContainerRetrieved(duration);
+    }
   }
 
   @Test
@@ -806,4 +816,27 @@ public class TestRouterMetrics {
         metrics.getUpdateApplicationTimeoutsFailedRetrieved());
   }
 
+  @Test
+  public void testSucceededSignalToContainerRetrieved() {
+    long totalGoodBefore = metrics.getNumSucceededSignalToContainerRetrieved();
+    goodSubCluster.getSignalToContainer(150);
+    Assert.assertEquals(totalGoodBefore + 1,
+        metrics.getNumSucceededSignalToContainerRetrieved());
+    Assert.assertEquals(150,
+        metrics.getLatencySucceededSignalToContainerRetrieved(), ASSERT_DOUBLE_DELTA);
+    goodSubCluster.getSignalToContainer(300);
+    Assert.assertEquals(totalGoodBefore + 2,
+        metrics.getNumSucceededSignalToContainerRetrieved());
+    Assert.assertEquals(225,
+        metrics.getLatencySucceededSignalToContainerRetrieved(), ASSERT_DOUBLE_DELTA);
+  }
+
+  @Test
+  public void testSignalToContainerFailed() {
+    long totalBadBefore = metrics.getSignalToContainerFailedRetrieved();
+    badSubCluster.getSignalContainer();
+    Assert.assertEquals(totalBadBefore + 1,
+        metrics.getSignalToContainerFailedRetrieved());
+  }
+
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java
index 9ead9fbe721..30377382402 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java
@@ -72,6 +72,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityReque
 import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
@@ -83,6 +85,7 @@ import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
 import org.apache.hadoop.yarn.api.records.ReservationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType;
+import org.apache.hadoop.yarn.api.records.SignalContainerCommand;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.federation.policies.manager.UniformBroadcastPolicyManager;
@@ -91,6 +94,7 @@ import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
 import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
 import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreTestUtil;
 import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
@@ -1056,4 +1060,45 @@ public class TestFederationClientInterceptor extends BaseRouterClientRMTest {
     Assert.assertNotNull(timeoutsResponse);
     Assert.assertEquals(appTimeout, responseTimeOut);
   }
+
+  @Test
+  public void testSignalContainer() throws Exception {
+    LOG.info("Test FederationClientInterceptor : Signal Container request.");
+
+    // null request
+    LambdaTestUtils.intercept(YarnException.class, "Missing signalToContainer request " +
+        "or containerId or command information.", () -> interceptor.signalToContainer(null));
+
+    // normal request
+    ApplicationId appId =
+        ApplicationId.newInstance(System.currentTimeMillis(), 1);
+    SubmitApplicationRequest request = mockSubmitApplicationRequest(appId);
+
+    // Submit the application
+    SubmitApplicationResponse response = interceptor.submitApplication(request);
+    Assert.assertNotNull(response);
+    Assert.assertNotNull(stateStoreUtil.queryApplicationHomeSC(appId));
+
+    SubClusterId subClusterId = interceptor.getApplicationHomeSubCluster(appId);
+    Assert.assertNotNull(subClusterId);
+
+    MockRM mockRM = interceptor.getMockRMs().get(subClusterId);
+    mockRM.waitForState(appId, RMAppState.ACCEPTED);
+    RMApp rmApp = mockRM.getRMContext().getRMApps().get(appId);
+    mockRM.waitForState(rmApp.getCurrentAppAttempt().getAppAttemptId(),
+        RMAppAttemptState.SCHEDULED);
+    MockNM nm = interceptor.getMockNMs().get(subClusterId);
+    nm.nodeHeartbeat(true);
+    mockRM.waitForState(rmApp.getCurrentAppAttempt(), RMAppAttemptState.ALLOCATED);
+    mockRM.sendAMLaunched(rmApp.getCurrentAppAttempt().getAppAttemptId());
+
+    ContainerId containerId = rmApp.getCurrentAppAttempt().getMasterContainer().getId();
+
+    SignalContainerRequest signalContainerRequest =
+        SignalContainerRequest.newInstance(containerId, SignalContainerCommand.GRACEFUL_SHUTDOWN);
+    SignalContainerResponse signalContainerResponse =
+        interceptor.signalToContainer(signalContainerRequest);
+
+    Assert.assertNotNull(signalContainerResponse);
+  }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestableFederationClientInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestableFederationClientInterceptor.java
index 202a286696a..af1f45924c1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestableFederationClientInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestableFederationClientInterceptor.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
 import org.apache.hadoop.yarn.server.resourcemanager.ClientRMService;
 import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
 import org.apache.hadoop.yarn.server.resourcemanager.RMAppManager;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
@@ -51,6 +52,9 @@ public class TestableFederationClientInterceptor
   private ConcurrentHashMap<SubClusterId, MockRM> mockRMs =
       new ConcurrentHashMap<>();
 
+  private ConcurrentHashMap<SubClusterId, MockNM> mockNMs =
+      new ConcurrentHashMap<>();
+
   private List<SubClusterId> badSubCluster = new ArrayList<SubClusterId>();
 
   @Override
@@ -71,7 +75,8 @@ public class TestableFederationClientInterceptor
         mockRM.init(super.getConf());
         mockRM.start();
         try {
-          mockRM.registerNode("h1:1234", 1024);
+          MockNM nm = mockRM.registerNode("127.0.0.1:1234", 8 * 1024, 4);
+          mockNMs.put(subClusterId, nm);
         } catch (Exception e) {
           Assert.fail(e.getMessage());
         }
@@ -118,4 +123,8 @@ public class TestableFederationClientInterceptor
   public ConcurrentHashMap<SubClusterId, MockRM> getMockRMs() {
     return mockRMs;
   }
+
+  public ConcurrentHashMap<SubClusterId, MockNM> getMockNMs() {
+    return mockNMs;
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to