This is an automated email from the ASF dual-hosted git repository.

peacewong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/linkis.git


The following commit(s) were added to refs/heads/master by this push:
     new bbcb51893 feat: YarnResourceRequester supports multiple providers (#4852)
bbcb51893 is described below

commit bbcb5189370aea2355adac851c3785bc2e9d5a2b
Author: zlucelia <[email protected]>
AuthorDate: Sat Aug 5 23:30:20 2023 +0800

    feat: YarnResourceRequester supports multiple providers (#4852)
    
    * feat: YarnResourceRequester supports multiple providers
    
    * fix: format code according to scala style
---
 .../rm/external/yarn/YarnResourceRequester.java    | 63 +++++++++++-----------
 1 file changed, 33 insertions(+), 30 deletions(-)

diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/external/yarn/YarnResourceRequester.java b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/external/yarn/YarnResourceRequester.java
index 332eb765e..cb5457807 100644
--- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/external/yarn/YarnResourceRequester.java
+++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/external/yarn/YarnResourceRequester.java
@@ -56,11 +56,9 @@ public class YarnResourceRequester implements ExternalResourceRequester {
 
   private final String HASTATE_ACTIVE = "active";
   private static final ObjectMapper objectMapper = new ObjectMapper();
-
-  private ExternalResourceProvider provider = null;
   private final Map<String, String> rmAddressMap = new ConcurrentHashMap<>();
 
-  private String getAuthorizationStr() {
+  private String getAuthorizationStr(ExternalResourceProvider provider) {
     String user = (String) provider.getConfigMap().getOrDefault("user", "");
     String pwd = (String) provider.getConfigMap().getOrDefault("pwd", "");
     String authKey = user + ":" + pwd;
@@ -70,18 +68,16 @@ public class YarnResourceRequester implements ExternalResourceRequester {
   @Override
   public NodeResource requestResourceInfo(
       ExternalResourceIdentifier identifier, ExternalResourceProvider 
provider) {
-    String rmWebHaAddress = (String) 
provider.getConfigMap().get("rmWebAddress");
-    this.provider = provider;
-    String rmWebAddress = getAndUpdateActiveRmWebAddress(rmWebHaAddress);
+    String rmWebAddress = getAndUpdateActiveRmWebAddress(provider);
     logger.info("rmWebAddress: " + rmWebAddress);
-    String queueName = ((YarnResourceIdentifier) identifier).getQueueName();
 
+    String queueName = ((YarnResourceIdentifier) identifier).getQueueName();
     String realQueueName = "root." + queueName;
 
     return LinkisUtils.tryCatch(
         () -> {
           Pair<YarnResource, YarnResource> yarnResource =
-              getResources(rmWebAddress, realQueueName, queueName);
+              getResources(rmWebAddress, realQueueName, queueName, provider);
 
           CommonNodeResource nodeResource = new CommonNodeResource();
           nodeResource.setMaxResource(yarnResource.getKey());
@@ -95,9 +91,12 @@ public class YarnResourceRequester implements ExternalResourceRequester {
   }
 
   public Optional<YarnResource> maxEffectiveHandle(
-      Optional<JsonNode> queueValue, String rmWebAddress, String queueName) {
+      Optional<JsonNode> queueValue,
+      String rmWebAddress,
+      String queueName,
+      ExternalResourceProvider provider) {
     try {
-      JsonNode metrics = getResponseByUrl("metrics", rmWebAddress);
+      JsonNode metrics = getResponseByUrl("metrics", rmWebAddress, provider);
       JsonNode clusterMetrics = metrics.path("clusterMetrics");
       long totalMemory = clusterMetrics.path("totalMB").asLong();
       long totalCores = clusterMetrics.path("totalVirtualCores").asLong();
@@ -201,8 +200,11 @@ public class YarnResourceRequester implements ExternalResourceRequester {
   }
 
   public Pair<YarnResource, YarnResource> getResources(
-      String rmWebAddress, String realQueueName, String queueName) {
-    JsonNode resp = getResponseByUrl("scheduler", rmWebAddress);
+      String rmWebAddress,
+      String realQueueName,
+      String queueName,
+      ExternalResourceProvider provider) {
+    JsonNode resp = getResponseByUrl("scheduler", rmWebAddress, provider);
     JsonNode schedulerInfo = resp.path("scheduler").path("schedulerInfo");
     String schedulerType = schedulerInfo.path("type").asText();
     if ("capacityScheduler".equals(schedulerType)) {
@@ -217,7 +219,7 @@ public class YarnResourceRequester implements ExternalResourceRequester {
             MessageFormat.format(YARN_NOT_EXISTS_QUEUE.getErrorDesc(), 
queueName));
       }
       return Pair.of(
-          maxEffectiveHandle(queue, rmWebAddress, queueName).get(),
+          maxEffectiveHandle(queue, rmWebAddress, queueName, provider).get(),
           getYarnResource(queue.map(node -> node.path("resourcesUsed")), 
queueName).get());
 
     } else if ("fairScheduler".equals(schedulerType)) {
@@ -277,13 +279,13 @@ public class YarnResourceRequester implements ExternalResourceRequester {
   @Override
   public List<ExternalAppInfo> requestAppInfo(
       ExternalResourceIdentifier identifier, ExternalResourceProvider 
provider) {
-    String rmWebHaAddress = (String) 
provider.getConfigMap().get("rmWebAddress");
-    String rmWebAddress = getAndUpdateActiveRmWebAddress(rmWebHaAddress);
-    String queueName = ((YarnResourceIdentifier) identifier).getQueueName();
 
+    String rmWebAddress = getAndUpdateActiveRmWebAddress(provider);
+
+    String queueName = ((YarnResourceIdentifier) identifier).getQueueName();
     String realQueueName = "root." + queueName;
 
-    JsonNode resp = getResponseByUrl("apps", 
rmWebAddress).path("apps").path("app");
+    JsonNode resp = getResponseByUrl("apps", rmWebAddress, 
provider).path("apps").path("app");
     if (resp.isMissingNode()) {
       return new ArrayList<>();
     }
@@ -317,22 +319,24 @@ public class YarnResourceRequester implements ExternalResourceRequester {
     return ResourceType.Yarn;
   }
 
-  private JsonNode getResponseByUrl(String url, String rmWebAddress) {
+  private JsonNode getResponseByUrl(
+      String url, String rmWebAddress, ExternalResourceProvider provider) {
+
     HttpGet httpGet = new HttpGet(rmWebAddress + "/ws/v1/cluster/" + url);
     httpGet.addHeader("Accept", "application/json");
-    Object authorEnable = this.provider.getConfigMap().get("authorEnable");
+    Object authorEnable = provider.getConfigMap().get("authorEnable");
     HttpResponse httpResponse = null;
     if (authorEnable instanceof Boolean) {
       if ((Boolean) authorEnable) {
-        httpGet.addHeader(HttpHeaders.AUTHORIZATION, "Basic " + 
getAuthorizationStr());
+        httpGet.addHeader(HttpHeaders.AUTHORIZATION, "Basic " + 
getAuthorizationStr(provider));
       }
     }
-    Object kerberosEnable = this.provider.getConfigMap().get("kerberosEnable");
+    Object kerberosEnable = provider.getConfigMap().get("kerberosEnable");
     if (kerberosEnable instanceof Boolean) {
       if ((Boolean) kerberosEnable) {
-        String principalName = (String) 
this.provider.getConfigMap().get("principalName");
-        String keytabPath = (String) 
this.provider.getConfigMap().get("keytabPath");
-        String krb5Path = (String) 
this.provider.getConfigMap().get("krb5Path");
+        String principalName = (String) 
provider.getConfigMap().get("principalName");
+        String keytabPath = (String) provider.getConfigMap().get("keytabPath");
+        String krb5Path = (String) provider.getConfigMap().get("krb5Path");
         if (StringUtils.isNotBlank(krb5Path)) {
           logger.warn(
               "krb5Path: {} has been specified, but not allow to be set to 
avoid conflict",
@@ -388,8 +392,9 @@ public class YarnResourceRequester implements ExternalResourceRequester {
     return jsonNode;
   }
 
-  public String getAndUpdateActiveRmWebAddress(String haAddress) {
+  public String getAndUpdateActiveRmWebAddress(ExternalResourceProvider 
provider) {
     // todo check if it will stuck for many requests
+    String haAddress = (String) provider.getConfigMap().get("rmWebAddress");
     String activeAddress = rmAddressMap.get(haAddress);
     if (StringUtils.isBlank(activeAddress)) {
       synchronized (haAddress.intern()) {
@@ -406,7 +411,7 @@ public class YarnResourceRequester implements ExternalResourceRequester {
                 
haAddress.split(RMConfiguration.DEFAULT_YARN_RM_WEB_ADDRESS_DELIMITER.getValue());
             for (String address : addresses) {
               try {
-                JsonNode response = getResponseByUrl("info", address);
+                JsonNode response = getResponseByUrl("info", address, 
provider);
                 JsonNode haStateValue = 
response.path("clusterInfo").path("haState");
                 if (!haStateValue.isMissingNode() && haStateValue.isTextual()) 
{
                   String haState = haStateValue.asText();
@@ -442,12 +447,10 @@ public class YarnResourceRequester implements ExternalResourceRequester {
 
   @Override
   public Boolean reloadExternalResourceAddress(ExternalResourceProvider 
provider) {
-    if (null == provider) {
-      rmAddressMap.clear();
-    } else {
+    if (null != provider) {
       String rmWebHaAddress = (String) 
provider.getConfigMap().get("rmWebAddress");
       rmAddressMap.remove(rmWebHaAddress);
-      getAndUpdateActiveRmWebAddress(rmWebHaAddress);
+      getAndUpdateActiveRmWebAddress(provider);
     }
     return true;
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to