This is an automated email from the ASF dual-hosted git repository.
peacewong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/linkis.git
The following commit(s) were added to refs/heads/master by this push:
new bbcb51893 feat: YarnResourceRequester supports multiple providers
(#4852)
bbcb51893 is described below
commit bbcb5189370aea2355adac851c3785bc2e9d5a2b
Author: zlucelia <[email protected]>
AuthorDate: Sat Aug 5 23:30:20 2023 +0800
feat: YarnResourceRequester supports multiple providers (#4852)
* feat: YarnResourceRequester supports multiple providers
* fix: format code according to scala style
---
.../rm/external/yarn/YarnResourceRequester.java | 63 +++++++++++-----------
1 file changed, 33 insertions(+), 30 deletions(-)
diff --git
a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/external/yarn/YarnResourceRequester.java
b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/external/yarn/YarnResourceRequester.java
index 332eb765e..cb5457807 100644
---
a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/external/yarn/YarnResourceRequester.java
+++
b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/external/yarn/YarnResourceRequester.java
@@ -56,11 +56,9 @@ public class YarnResourceRequester implements
ExternalResourceRequester {
private final String HASTATE_ACTIVE = "active";
private static final ObjectMapper objectMapper = new ObjectMapper();
-
- private ExternalResourceProvider provider = null;
private final Map<String, String> rmAddressMap = new ConcurrentHashMap<>();
- private String getAuthorizationStr() {
+ private String getAuthorizationStr(ExternalResourceProvider provider) {
String user = (String) provider.getConfigMap().getOrDefault("user", "");
String pwd = (String) provider.getConfigMap().getOrDefault("pwd", "");
String authKey = user + ":" + pwd;
@@ -70,18 +68,16 @@ public class YarnResourceRequester implements
ExternalResourceRequester {
@Override
public NodeResource requestResourceInfo(
ExternalResourceIdentifier identifier, ExternalResourceProvider
provider) {
- String rmWebHaAddress = (String)
provider.getConfigMap().get("rmWebAddress");
- this.provider = provider;
- String rmWebAddress = getAndUpdateActiveRmWebAddress(rmWebHaAddress);
+ String rmWebAddress = getAndUpdateActiveRmWebAddress(provider);
logger.info("rmWebAddress: " + rmWebAddress);
- String queueName = ((YarnResourceIdentifier) identifier).getQueueName();
+ String queueName = ((YarnResourceIdentifier) identifier).getQueueName();
String realQueueName = "root." + queueName;
return LinkisUtils.tryCatch(
() -> {
Pair<YarnResource, YarnResource> yarnResource =
- getResources(rmWebAddress, realQueueName, queueName);
+ getResources(rmWebAddress, realQueueName, queueName, provider);
CommonNodeResource nodeResource = new CommonNodeResource();
nodeResource.setMaxResource(yarnResource.getKey());
@@ -95,9 +91,12 @@ public class YarnResourceRequester implements
ExternalResourceRequester {
}
public Optional<YarnResource> maxEffectiveHandle(
- Optional<JsonNode> queueValue, String rmWebAddress, String queueName) {
+ Optional<JsonNode> queueValue,
+ String rmWebAddress,
+ String queueName,
+ ExternalResourceProvider provider) {
try {
- JsonNode metrics = getResponseByUrl("metrics", rmWebAddress);
+ JsonNode metrics = getResponseByUrl("metrics", rmWebAddress, provider);
JsonNode clusterMetrics = metrics.path("clusterMetrics");
long totalMemory = clusterMetrics.path("totalMB").asLong();
long totalCores = clusterMetrics.path("totalVirtualCores").asLong();
@@ -201,8 +200,11 @@ public class YarnResourceRequester implements
ExternalResourceRequester {
}
public Pair<YarnResource, YarnResource> getResources(
- String rmWebAddress, String realQueueName, String queueName) {
- JsonNode resp = getResponseByUrl("scheduler", rmWebAddress);
+ String rmWebAddress,
+ String realQueueName,
+ String queueName,
+ ExternalResourceProvider provider) {
+ JsonNode resp = getResponseByUrl("scheduler", rmWebAddress, provider);
JsonNode schedulerInfo = resp.path("scheduler").path("schedulerInfo");
String schedulerType = schedulerInfo.path("type").asText();
if ("capacityScheduler".equals(schedulerType)) {
@@ -217,7 +219,7 @@ public class YarnResourceRequester implements
ExternalResourceRequester {
MessageFormat.format(YARN_NOT_EXISTS_QUEUE.getErrorDesc(),
queueName));
}
return Pair.of(
- maxEffectiveHandle(queue, rmWebAddress, queueName).get(),
+ maxEffectiveHandle(queue, rmWebAddress, queueName, provider).get(),
getYarnResource(queue.map(node -> node.path("resourcesUsed")),
queueName).get());
} else if ("fairScheduler".equals(schedulerType)) {
@@ -277,13 +279,13 @@ public class YarnResourceRequester implements
ExternalResourceRequester {
@Override
public List<ExternalAppInfo> requestAppInfo(
ExternalResourceIdentifier identifier, ExternalResourceProvider
provider) {
- String rmWebHaAddress = (String)
provider.getConfigMap().get("rmWebAddress");
- String rmWebAddress = getAndUpdateActiveRmWebAddress(rmWebHaAddress);
- String queueName = ((YarnResourceIdentifier) identifier).getQueueName();
+ String rmWebAddress = getAndUpdateActiveRmWebAddress(provider);
+
+ String queueName = ((YarnResourceIdentifier) identifier).getQueueName();
String realQueueName = "root." + queueName;
- JsonNode resp = getResponseByUrl("apps",
rmWebAddress).path("apps").path("app");
+ JsonNode resp = getResponseByUrl("apps", rmWebAddress,
provider).path("apps").path("app");
if (resp.isMissingNode()) {
return new ArrayList<>();
}
@@ -317,22 +319,24 @@ public class YarnResourceRequester implements
ExternalResourceRequester {
return ResourceType.Yarn;
}
- private JsonNode getResponseByUrl(String url, String rmWebAddress) {
+ private JsonNode getResponseByUrl(
+ String url, String rmWebAddress, ExternalResourceProvider provider) {
+
HttpGet httpGet = new HttpGet(rmWebAddress + "/ws/v1/cluster/" + url);
httpGet.addHeader("Accept", "application/json");
- Object authorEnable = this.provider.getConfigMap().get("authorEnable");
+ Object authorEnable = provider.getConfigMap().get("authorEnable");
HttpResponse httpResponse = null;
if (authorEnable instanceof Boolean) {
if ((Boolean) authorEnable) {
- httpGet.addHeader(HttpHeaders.AUTHORIZATION, "Basic " +
getAuthorizationStr());
+ httpGet.addHeader(HttpHeaders.AUTHORIZATION, "Basic " +
getAuthorizationStr(provider));
}
}
- Object kerberosEnable = this.provider.getConfigMap().get("kerberosEnable");
+ Object kerberosEnable = provider.getConfigMap().get("kerberosEnable");
if (kerberosEnable instanceof Boolean) {
if ((Boolean) kerberosEnable) {
- String principalName = (String)
this.provider.getConfigMap().get("principalName");
- String keytabPath = (String)
this.provider.getConfigMap().get("keytabPath");
- String krb5Path = (String)
this.provider.getConfigMap().get("krb5Path");
+ String principalName = (String)
provider.getConfigMap().get("principalName");
+ String keytabPath = (String) provider.getConfigMap().get("keytabPath");
+ String krb5Path = (String) provider.getConfigMap().get("krb5Path");
if (StringUtils.isNotBlank(krb5Path)) {
logger.warn(
"krb5Path: {} has been specified, but not allow to be set to
avoid conflict",
@@ -388,8 +392,9 @@ public class YarnResourceRequester implements
ExternalResourceRequester {
return jsonNode;
}
- public String getAndUpdateActiveRmWebAddress(String haAddress) {
+ public String getAndUpdateActiveRmWebAddress(ExternalResourceProvider
provider) {
// todo check if it will stuck for many requests
+ String haAddress = (String) provider.getConfigMap().get("rmWebAddress");
String activeAddress = rmAddressMap.get(haAddress);
if (StringUtils.isBlank(activeAddress)) {
synchronized (haAddress.intern()) {
@@ -406,7 +411,7 @@ public class YarnResourceRequester implements
ExternalResourceRequester {
haAddress.split(RMConfiguration.DEFAULT_YARN_RM_WEB_ADDRESS_DELIMITER.getValue());
for (String address : addresses) {
try {
- JsonNode response = getResponseByUrl("info", address);
+ JsonNode response = getResponseByUrl("info", address,
provider);
JsonNode haStateValue =
response.path("clusterInfo").path("haState");
if (!haStateValue.isMissingNode() && haStateValue.isTextual())
{
String haState = haStateValue.asText();
@@ -442,12 +447,10 @@ public class YarnResourceRequester implements
ExternalResourceRequester {
@Override
public Boolean reloadExternalResourceAddress(ExternalResourceProvider
provider) {
- if (null == provider) {
- rmAddressMap.clear();
- } else {
+ if (null != provider) {
String rmWebHaAddress = (String)
provider.getConfigMap().get("rmWebAddress");
rmAddressMap.remove(rmWebHaAddress);
- getAndUpdateActiveRmWebAddress(rmWebHaAddress);
+ getAndUpdateActiveRmWebAddress(provider);
}
return true;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]