This is an automated email from the ASF dual-hosted git repository.
peacewong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/linkis.git
The following commit(s) were added to refs/heads/master by this push:
new 958c4940a feat: "spark-submit jar on k8s" related label and resource
expansion (#4784)
958c4940a is described below
commit 958c4940a9e56f367683bc2ba0284c7402d63e6d
Author: zlucelia <[email protected]>
AuthorDate: Sat Aug 5 23:31:25 2023 +0800
feat: "spark-submit jar on k8s" related label and resource expansion (#4784)
* feat: add k8sCluster label recognition
* feat: add kubernetes resource related basic classes
* feat: add kubernetes resource related classes
* fix: supplement third dependency
* feat: k8s resource support different namespace
* fix: fix some basic problems
* feat: extend the length of column label_key of linkis_cg_manager_label to
50
* fix: format code according to scala style
* feat: KubernetesResourceRequester supports multiple providers
* fix: fix wrong variable use
* feat: adjust constants
* feat: adjust constants
* fix: format code according to scala style
---
.../interactor/command/fitter/FitterUtils.java | 2 +-
.../command/template/UniversalCmdTemplate.java | 2 +-
.../linkis-application-manager/pom.xml | 26 ++-
.../linkis/manager/rm/exception/RMErrorCode.java | 14 ++
.../kubernetes/KubernetesResourceIdentifier.java} | 22 ++-
.../kubernetes/KubernetesResourceRequester.java | 182 +++++++++++++++++
.../KubernetesResourceIdentifierParser.java} | 21 +-
.../service/impl/ExternalResourceServiceImpl.java | 14 +-
.../manager/rm/service/RequestResourceService.java | 57 +++++-
.../rm/service/impl/DefaultResourceManager.java | 3 +-
.../DriverAndKubernetesReqResourceService.java | 133 +++++++++++++
.../linkis/manager/rm/utils/UserConfiguration.java | 20 ++
.../manager/label/entity/cluster/ClusterLabel.java | 21 +-
.../manager/common/conf/RMConfiguration.java | 11 ++
.../linkis/manager/common/constant/RMConstant.java | 3 +
.../resource/DriverAndKubernetesResource.java | 220 +++++++++++++++++++++
.../common/entity/resource/KubernetesResource.java | 153 ++++++++++++++
.../entity/resource/LoadInstanceResource.java | 5 +
.../manager/common/entity/resource/Resource.java | 8 +
.../common/entity/resource/ResourceType.java | 2 +
.../linkis/manager/common/utils/ResourceUtils.java | 8 +
.../src/test/resources/create.sql | 2 +-
.../src/test/resources/create_pg.sql | 2 +-
linkis-dist/package/db/linkis_ddl.sql | 2 +-
linkis-dist/package/db/linkis_ddl_pg.sql | 2 +-
linkis-dist/package/db/module/linkis_manager.sql | 2 +-
.../db/upgrade/1.5.0_schema/mysql/linkis_ddl.sql | 16 +-
linkis-engineconn-plugins/spark/pom.xml | 3 +-
.../spark/config/SparkConfiguration.scala | 6 +
.../factory/SparkEngineConnResourceFactory.scala | 85 +++++++-
.../src/test/resources/create.sql | 2 +-
.../src/test/resources/create_pg.sql | 2 +-
.../src/test/resources/create.sql | 2 +-
.../src/test/resources/create_pg.sql | 2 +-
tool/dependencies/known-dependencies.txt | 20 +-
35 files changed, 1018 insertions(+), 57 deletions(-)
diff --git
a/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/interactor/command/fitter/FitterUtils.java
b/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/interactor/command/fitter/FitterUtils.java
index 1b6b46cd1..ae97620d2 100644
---
a/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/interactor/command/fitter/FitterUtils.java
+++
b/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/interactor/command/fitter/FitterUtils.java
@@ -20,6 +20,6 @@ package
org.apache.linkis.cli.application.interactor.command.fitter;
public class FitterUtils {
public static boolean isOption(final String arg) {
- return arg.matches("-[a-zA-Z-]+");
+ return arg.matches("-[0-9a-zA-Z-]+");
}
}
diff --git
a/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/interactor/command/template/UniversalCmdTemplate.java
b/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/interactor/command/template/UniversalCmdTemplate.java
index fea877153..f761a6a47 100644
---
a/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/interactor/command/template/UniversalCmdTemplate.java
+++
b/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/interactor/command/template/UniversalCmdTemplate.java
@@ -138,7 +138,7 @@ public class UniversalCmdTemplate extends
AbstractCmdTemplate implements Cloneab
option(
CliKeys.JOB_LABEL,
CliKeys.JOB_LABEL_CLUSTER,
- new String[] {"-yarnCluster"},
+ new String[] {"-yarnCluster", "-k8sCluster"},
"specify linkis yarn cluster for this job",
true,
"");
diff --git
a/linkis-computation-governance/linkis-manager/linkis-application-manager/pom.xml
b/linkis-computation-governance/linkis-manager/linkis-application-manager/pom.xml
index 2cc2788b9..1a5d5077d 100644
---
a/linkis-computation-governance/linkis-manager/linkis-application-manager/pom.xml
+++
b/linkis-computation-governance/linkis-manager/linkis-application-manager/pom.xml
@@ -104,7 +104,31 @@
<version>${gson.version}</version>
<scope>provided</scope>
</dependency>
-
+ <dependency>
+ <groupId>io.fabric8</groupId>
+ <artifactId>kubernetes-client</artifactId>
+ <version>${kubernetes-client.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>io.fabric8</groupId>
+ <artifactId>kubernetes-model-common</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>io.fabric8</groupId>
+ <artifactId>kubernetes-model-core</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>io.fabric8</groupId>
+ <artifactId>kubernetes-model-common</artifactId>
+ <version>${kubernetes-client.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.fabric8</groupId>
+ <artifactId>kubernetes-model-core</artifactId>
+ <version>${kubernetes-client.version}</version>
+ </dependency>
</dependencies>
<build>
diff --git
a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/exception/RMErrorCode.java
b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/exception/RMErrorCode.java
index b51393141..a086653b9 100644
---
a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/exception/RMErrorCode.java
+++
b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/exception/RMErrorCode.java
@@ -61,6 +61,20 @@ public enum RMErrorCode implements LinkisErrorCode {
CLUSTER_QUEUE_INSTANCES_INSUFFICIENT(12012, "Insufficient cluster queue
instance(集群队列实例不足)"),
+ NAMESPACE_MEMORY_INSUFFICIENT(12100, "Insufficient cluster namespace
memory(命名空间内存不足)"),
+
+ NAMESPACE_CPU_INSUFFICIENT(12101, "Insufficient cluster namespace
cpu(命名空间cpu不足)"),
+
+ NAMESPACE_MISMATCHED(12102, "Mismatched namespace(命名空间不匹配,建议配置对应命名空间的资源)"),
+
+ KUBERNETES_NAMESPACE_MEMORY_INSUFFICIENT(
+ 12110, "Insufficient cluster namespace memory(K8S集群命名空间内存不足)"),
+
+ KUBERNETES_NAMESPACE_CPU_INSUFFICIENT(
+ 12111, "Insufficient cluster namespace cpu(K8S集群命名空间cpu不足)"),
+
+ KUBERNETES_UNKNOWN_RESOURCE_TYPE(12112, "Unsupported resource
type(不支持的资源类型)"),
+
ECM_RESOURCE_INSUFFICIENT(11000, "ECM resources are insufficient(ECM 资源不足)"),
ECM_MEMORY_INSUFFICIENT(11001, "ECM memory resources are insufficient(ECM
内存资源不足)"),
diff --git
a/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/interactor/command/fitter/FitterUtils.java
b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/external/kubernetes/KubernetesResourceIdentifier.java
similarity index 59%
copy from
linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/interactor/command/fitter/FitterUtils.java
copy to
linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/external/kubernetes/KubernetesResourceIdentifier.java
index 1b6b46cd1..3913d6ac8 100644
---
a/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/interactor/command/fitter/FitterUtils.java
+++
b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/external/kubernetes/KubernetesResourceIdentifier.java
@@ -15,11 +15,25 @@
* limitations under the License.
*/
-package org.apache.linkis.cli.application.interactor.command.fitter;
+package org.apache.linkis.manager.rm.external.kubernetes;
-public class FitterUtils {
+import org.apache.linkis.manager.common.entity.resource.ResourceType;
+import org.apache.linkis.manager.rm.external.domain.ExternalResourceIdentifier;
- public static boolean isOption(final String arg) {
- return arg.matches("-[a-zA-Z-]+");
+public class KubernetesResourceIdentifier implements
ExternalResourceIdentifier {
+
+ String namespace;
+
+ public KubernetesResourceIdentifier(String namespace) {
+ this.namespace = namespace;
+ }
+
+ public String getNamespace() {
+ return namespace;
+ }
+
+ @Override
+ public ResourceType getResourceType() {
+ return ResourceType.Kubernetes;
}
}
diff --git
a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/external/kubernetes/KubernetesResourceRequester.java
b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/external/kubernetes/KubernetesResourceRequester.java
new file mode 100644
index 000000000..3bc7004e3
--- /dev/null
+++
b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/external/kubernetes/KubernetesResourceRequester.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.manager.rm.external.kubernetes;
+
+import org.apache.linkis.manager.common.entity.resource.*;
+import org.apache.linkis.manager.rm.external.domain.ExternalAppInfo;
+import org.apache.linkis.manager.rm.external.domain.ExternalResourceIdentifier;
+import org.apache.linkis.manager.rm.external.domain.ExternalResourceProvider;
+import org.apache.linkis.manager.rm.external.request.ExternalResourceRequester;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.tuple.Pair;
+
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+
+import io.fabric8.kubernetes.api.model.Node;
+import io.fabric8.kubernetes.api.model.Quantity;
+import io.fabric8.kubernetes.api.model.ResourceQuota;
+import io.fabric8.kubernetes.api.model.metrics.v1beta1.NodeMetrics;
+import io.fabric8.kubernetes.client.ConfigBuilder;
+import io.fabric8.kubernetes.client.DefaultKubernetesClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class KubernetesResourceRequester implements ExternalResourceRequester {
+ private static final Logger logger =
LoggerFactory.getLogger(KubernetesResourceRequester.class);
+ private final Map<String, DefaultKubernetesClient> clientMap = new
ConcurrentHashMap<>();
+
+ @Override
+ public NodeResource requestResourceInfo(
+ ExternalResourceIdentifier identifier, ExternalResourceProvider
provider) {
+ String k8sMasterUrl = (String) provider.getConfigMap().get("k8sMasterUrl");
+ DefaultKubernetesClient client = clientMap.get(k8sMasterUrl);
+ if (client == null) {
+ constructKubernetesClient(provider);
+ client = clientMap.get(k8sMasterUrl);
+ }
+ String namespace = ((KubernetesResourceIdentifier)
identifier).getNamespace();
+ Pair<KubernetesResource, KubernetesResource> kubernetesResources =
+ getResources(client, namespace);
+
+ CommonNodeResource nodeResource = new CommonNodeResource();
+ nodeResource.setMaxResource(kubernetesResources.getKey());
+ nodeResource.setUsedResource(kubernetesResources.getValue());
+
+ return nodeResource;
+ }
+
+ public Pair<KubernetesResource, KubernetesResource> getResources(
+ DefaultKubernetesClient client, String namespace) {
+ long usedMemory = 0;
+ long allocatableMemory = 0;
+ long usedCPU = 0;
+ long allocatableCPU = 0;
+
+ List<ResourceQuota> resourceQuotaList =
+ client.resourceQuotas().inNamespace(namespace).list().getItems();
+
+ // Get resource from resourcequota if deployed, otherwise from node status
metrics.
+ if (CollectionUtils.isNotEmpty(resourceQuotaList)) {
+ Map<String, Quantity> usedQuotaResource =
resourceQuotaList.get(0).getStatus().getUsed();
+ usedCPU = getKubernetesCPUInMilli(usedQuotaResource);
+ usedMemory = getKubernetesMemoryInBytes(usedQuotaResource);
+ long hardMemory = Long.MAX_VALUE;
+ long hardCPU = Long.MAX_VALUE;
+ for (ResourceQuota resourceQuota : resourceQuotaList) {
+ Map<String, Quantity> hardResource =
resourceQuota.getStatus().getHard();
+ long c = getKubernetesCPUInMilli(hardResource);
+ long m = getKubernetesMemoryInBytes(hardResource);
+ if (m < hardMemory) {
+ hardMemory = m;
+ }
+ if (c < hardCPU) {
+ hardCPU = c;
+ }
+ }
+ allocatableCPU = hardCPU;
+ allocatableMemory = hardMemory;
+ } else {
+ for (NodeMetrics nodeMetrics :
client.top().nodes().metrics().getItems()) {
+ usedMemory += getKubernetesMemoryInBytes(nodeMetrics.getUsage());
+ usedCPU += getKubernetesCPUInMilli(nodeMetrics.getUsage());
+ }
+ for (Node node : client.nodes().list().getItems()) {
+ allocatableMemory +=
getKubernetesMemoryInBytes(node.getStatus().getAllocatable());
+ allocatableCPU +=
getKubernetesCPUInMilli(node.getStatus().getAllocatable());
+ }
+ }
+
+ logger.info(
+ "usedMemory: {}, usedCPU: {}, allocatableMemory: {}, allocatableCPU:
{}",
+ usedMemory,
+ usedCPU,
+ allocatableMemory,
+ allocatableCPU);
+
+ return Pair.of(
+ new KubernetesResource(allocatableMemory, allocatableCPU, namespace),
+ new KubernetesResource(usedMemory, usedCPU, namespace));
+ }
+
+ /**
+ * Get the CPU in milli. Examples: 0.5 means 500 milli; 500m means 500 milli;
1000000n means 1 milli.
+ * (The CPU is formatted with "n" when querying resources from node metrics
by the fabric8 API.)
+ *
+ * @param resourceMap
+ * @return cpu in milli
+ */
+ private long getKubernetesCPUInMilli(Map<String, Quantity> resourceMap) {
+ String cpuKey = resourceMap.containsKey("cpu") ? "cpu" : "requests.cpu";
+ return (long)
(Quantity.getAmountInBytes(resourceMap.get(cpuKey)).doubleValue() * 1000);
+ }
+
+ /**
+ * Get the memory in bytes. Examples: 500Ki means 500 * 1024 bytes; 500Mi means
500 * 1024 * 1024
+ * bytes.
+ *
+ * @param resourceMap
+ * @return memory in bytes
+ */
+ private long getKubernetesMemoryInBytes(Map<String, Quantity> resourceMap) {
+ String memoryKey = resourceMap.containsKey("memory") ? "memory" :
"requests.memory";
+ return Quantity.getAmountInBytes(resourceMap.get(memoryKey)).longValue();
+ }
+
+ @Override
+ public List<ExternalAppInfo> requestAppInfo(
+ ExternalResourceIdentifier identifier, ExternalResourceProvider
provider) {
+ // TODO
+ return null;
+ }
+
+ @Override
+ public ResourceType getResourceType() {
+ return ResourceType.Kubernetes;
+ }
+
+ @Override
+ public Boolean reloadExternalResourceAddress(ExternalResourceProvider
provider) {
+ if (null != provider) {
+ DefaultKubernetesClient client =
+ clientMap.get((String) provider.getConfigMap().get("k8sMasterUrl"));
+ if (client != null) {
+ client.close();
+ }
+ constructKubernetesClient(provider);
+ }
+ return true;
+ }
+
+ private void constructKubernetesClient(ExternalResourceProvider provider) {
+ String k8sMasterUrl = (String) provider.getConfigMap().get("k8sMasterUrl");
+ String k8sClientCertData = (String)
provider.getConfigMap().get("k8sClientCertData");
+ String k8sClientKeyData = (String)
provider.getConfigMap().get("k8sClientKeyData");
+ String k8sCaCertData = (String)
provider.getConfigMap().get("k8sCaCertData");
+ DefaultKubernetesClient client =
+ new DefaultKubernetesClient(
+ new ConfigBuilder()
+ .withMasterUrl(k8sMasterUrl)
+ .withClientCertData(k8sClientCertData)
+ .withClientKeyData(k8sClientKeyData)
+ .withCaCertData(k8sCaCertData)
+ .build());
+ clientMap.put(k8sMasterUrl, client);
+ }
+}
diff --git
a/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/interactor/command/fitter/FitterUtils.java
b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/external/parser/KubernetesResourceIdentifierParser.java
similarity index 52%
copy from
linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/interactor/command/fitter/FitterUtils.java
copy to
linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/external/parser/KubernetesResourceIdentifierParser.java
index 1b6b46cd1..d1c49333f 100644
---
a/linkis-computation-governance/linkis-client/linkis-cli/src/main/java/org/apache/linkis/cli/application/interactor/command/fitter/FitterUtils.java
+++
b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/external/parser/KubernetesResourceIdentifierParser.java
@@ -15,11 +15,24 @@
* limitations under the License.
*/
-package org.apache.linkis.cli.application.interactor.command.fitter;
+package org.apache.linkis.manager.rm.external.parser;
-public class FitterUtils {
+import org.apache.linkis.manager.common.entity.resource.ResourceType;
+import org.apache.linkis.manager.rm.external.domain.ExternalResourceIdentifier;
+import
org.apache.linkis.manager.rm.external.kubernetes.KubernetesResourceIdentifier;
- public static boolean isOption(final String arg) {
- return arg.matches("-[a-zA-Z-]+");
+import java.util.Map;
+
+public class KubernetesResourceIdentifierParser implements
ExternalResourceIdentifierParser {
+ public static String NAMESPACE = "namespace";
+
+ @Override
+ public ExternalResourceIdentifier parse(Map<String, Object> identifierMap) {
+ return new KubernetesResourceIdentifier((String)
identifierMap.get(NAMESPACE));
+ }
+
+ @Override
+ public ResourceType getResourceType() {
+ return ResourceType.Kubernetes;
}
}
diff --git
a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/external/service/impl/ExternalResourceServiceImpl.java
b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/external/service/impl/ExternalResourceServiceImpl.java
index ab06f2691..96aa756e6 100644
---
a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/external/service/impl/ExternalResourceServiceImpl.java
+++
b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/external/service/impl/ExternalResourceServiceImpl.java
@@ -29,7 +29,9 @@ import
org.apache.linkis.manager.rm.external.dao.ExternalResourceProviderDao;
import org.apache.linkis.manager.rm.external.domain.ExternalAppInfo;
import org.apache.linkis.manager.rm.external.domain.ExternalResourceIdentifier;
import org.apache.linkis.manager.rm.external.domain.ExternalResourceProvider;
+import
org.apache.linkis.manager.rm.external.kubernetes.KubernetesResourceRequester;
import
org.apache.linkis.manager.rm.external.parser.ExternalResourceIdentifierParser;
+import
org.apache.linkis.manager.rm.external.parser.KubernetesResourceIdentifierParser;
import
org.apache.linkis.manager.rm.external.parser.YarnResourceIdentifierParser;
import org.apache.linkis.manager.rm.external.request.ExternalResourceRequester;
import org.apache.linkis.manager.rm.external.service.ExternalResourceService;
@@ -87,9 +89,15 @@ public class ExternalResourceServiceImpl implements
ExternalResourceService, Ini
@Override
public void afterPropertiesSet() throws Exception {
- resourceRequesters = new ExternalResourceRequester[] {new
YarnResourceRequester()};
-
- identifierParsers = new ExternalResourceIdentifierParser[] {new
YarnResourceIdentifierParser()};
+ resourceRequesters =
+ new ExternalResourceRequester[] {
+ new YarnResourceRequester(), new KubernetesResourceRequester()
+ };
+
+ identifierParsers =
+ new ExternalResourceIdentifierParser[] {
+ new YarnResourceIdentifierParser(), new
KubernetesResourceIdentifierParser()
+ };
}
@Override
diff --git
a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/service/RequestResourceService.java
b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/service/RequestResourceService.java
index c46c53683..4c1237cc4 100644
---
a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/service/RequestResourceService.java
+++
b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/service/RequestResourceService.java
@@ -385,7 +385,6 @@ public abstract class RequestResourceService {
avail.getQueueInstances(),
max.getQueueInstances()));
}
-
} else if (requestResource instanceof DriverAndYarnResource) {
DriverAndYarnResource dy = (DriverAndYarnResource) requestResource;
DriverAndYarnResource dyAvailable = (DriverAndYarnResource)
availableResource;
@@ -404,7 +403,61 @@ public abstract class RequestResourceService {
return generateNotEnoughMessage(
dy.getYarnResource(), dyAvailable.getYarnResource(),
dyMax.getYarnResource());
}
-
+ } else if (requestResource instanceof KubernetesResource) {
+ KubernetesResource kubernetesResource = (KubernetesResource)
requestResource;
+ KubernetesResource kubernetesResourceAvailable = (KubernetesResource)
availableResource;
+ KubernetesResource kubernetesResourceMax = (KubernetesResource)
maxResource;
+ if (kubernetesResource.getCores() >
kubernetesResourceAvailable.getCores()) {
+ return Pair.of(
+ RMErrorCode.NAMESPACE_CPU_INSUFFICIENT.getErrorCode(),
+ RMErrorCode.NAMESPACE_CPU_INSUFFICIENT.getErrorDesc()
+ + RMUtils.getResourceInfoMsg(
+ RMConstant.CPU,
+ RMConstant.KUBERNETES_CPU_UNIT,
+ kubernetesResource.getCores(),
+ kubernetesResourceAvailable.getCores(),
+ kubernetesResourceMax.getCores()));
+ } else if (kubernetesResource.getMemory() >
kubernetesResourceAvailable.getMemory()) {
+ return Pair.of(
+ RMErrorCode.NAMESPACE_MEMORY_INSUFFICIENT.getErrorCode(),
+ RMErrorCode.NAMESPACE_MEMORY_INSUFFICIENT.getErrorDesc()
+ + RMUtils.getResourceInfoMsg(
+ RMConstant.MEMORY,
+ RMConstant.MEMORY_UNIT_BYTE,
+ kubernetesResource.getMemory(),
+ kubernetesResourceAvailable.getMemory(),
+ kubernetesResourceMax.getMemory()));
+ } else {
+ return Pair.of(
+ RMErrorCode.NAMESPACE_MISMATCHED.getErrorCode(),
+ RMErrorCode.NAMESPACE_MISMATCHED.getErrorDesc()
+ + RMUtils.getResourceInfoMsg(
+ RMConstant.KUBERNETES_NAMESPACE,
+ RMConstant.KUBERNETES_NAMESPACE_UNIT,
+ kubernetesResource.getNamespace(),
+ kubernetesResourceAvailable.getNamespace(),
+ kubernetesResourceMax.getNamespace()));
+ }
+ } else if (requestResource instanceof DriverAndKubernetesResource) {
+ DriverAndKubernetesResource dk = (DriverAndKubernetesResource)
requestResource;
+ DriverAndKubernetesResource dkAvailable = (DriverAndKubernetesResource)
availableResource;
+ DriverAndKubernetesResource dkMax = (DriverAndKubernetesResource)
maxResource;
+ if (dk.getLoadInstanceResource().getMemory()
+ > dkAvailable.getLoadInstanceResource().getMemory()
+ || dk.getLoadInstanceResource().getCores()
+ > dkAvailable.getLoadInstanceResource().getCores()
+ || dk.getLoadInstanceResource().getInstances()
+ > dkAvailable.getLoadInstanceResource().getInstances()) {
+ return generateNotEnoughMessage(
+ dk.getLoadInstanceResource(),
+ dkAvailable.getLoadInstanceResource(),
+ dkMax.getLoadInstanceResource());
+ } else {
+ return generateNotEnoughMessage(
+ dk.getKubernetesResource(),
+ dkAvailable.getKubernetesResource(),
+ dkMax.getKubernetesResource());
+ }
} else if (requestResource instanceof SpecialResource) {
throw new RMWarnException(
NOT_RESOURCE_TYPE.getErrorCode(),
diff --git
a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/service/impl/DefaultResourceManager.java
b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/service/impl/DefaultResourceManager.java
index ef6b5bb18..c40270145 100644
---
a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/service/impl/DefaultResourceManager.java
+++
b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/service/impl/DefaultResourceManager.java
@@ -112,7 +112,8 @@ public class DefaultResourceManager extends ResourceManager
implements Initializ
requestResourceServices =
new RequestResourceService[] {
new DefaultReqResourceService(labelResourceService),
- new DriverAndYarnReqResourceService(labelResourceService,
externalResourceService)
+ new DriverAndYarnReqResourceService(labelResourceService,
externalResourceService),
+ new DriverAndKubernetesReqResourceService(labelResourceService,
externalResourceService)
};
// submit force release timeout lock job
diff --git
a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/service/impl/DriverAndKubernetesReqResourceService.java
b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/service/impl/DriverAndKubernetesReqResourceService.java
new file mode 100644
index 000000000..c7acde202
--- /dev/null
+++
b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/service/impl/DriverAndKubernetesReqResourceService.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.manager.rm.service.impl;
+
+import org.apache.linkis.manager.common.constant.RMConstant;
+import org.apache.linkis.manager.common.entity.resource.*;
+import org.apache.linkis.manager.common.exception.RMWarnException;
+import org.apache.linkis.manager.rm.domain.RMLabelContainer;
+import org.apache.linkis.manager.rm.exception.RMErrorCode;
+import
org.apache.linkis.manager.rm.external.kubernetes.KubernetesResourceIdentifier;
+import org.apache.linkis.manager.rm.external.service.ExternalResourceService;
+import org.apache.linkis.manager.rm.service.LabelResourceService;
+import org.apache.linkis.manager.rm.service.RequestResourceService;
+import org.apache.linkis.manager.rm.utils.RMUtils;
+
+import org.apache.commons.math3.util.Pair;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static
org.apache.linkis.manager.common.entity.resource.ResourceType.DriverAndKubernetes;
+
+public class DriverAndKubernetesReqResourceService extends
RequestResourceService {
+ private static final Logger logger =
+ LoggerFactory.getLogger(DriverAndKubernetesReqResourceService.class);
+ private LabelResourceService labelResourceService;
+ private ExternalResourceService externalResourceService;
+ private final ResourceType resourceType = DriverAndKubernetes;
+
+ public DriverAndKubernetesReqResourceService(
+ LabelResourceService labelResourceService, ExternalResourceService
externalResourceService) {
+ super(labelResourceService);
+ this.labelResourceService = labelResourceService;
+ this.externalResourceService = externalResourceService;
+ }
+
+ @Override
+ public ResourceType resourceType() {
+ return this.resourceType;
+ }
+
+ @Override
+ public boolean canRequest(RMLabelContainer labelContainer, NodeResource
resource)
+ throws RMWarnException {
+ if (!super.canRequest(labelContainer, resource)) {
+ return false;
+ }
+ DriverAndKubernetesResource requestedDriverAndKubernetesResource =
+ (DriverAndKubernetesResource) resource.getMaxResource();
+ KubernetesResource requestedKubernetesResource =
+ requestedDriverAndKubernetesResource.getKubernetesResource();
+
+ KubernetesResourceIdentifier kubernetesResourceIdentifier =
+ new
KubernetesResourceIdentifier(requestedKubernetesResource.getNamespace());
+ NodeResource providedKubernetesResource =
+ externalResourceService.getResource(
+ ResourceType.Kubernetes, labelContainer,
kubernetesResourceIdentifier);
+ Resource maxCapacity = providedKubernetesResource.getMaxResource();
+ Resource usedCapacity = providedKubernetesResource.getUsedResource();
+
+ logger.info(
+ "Kubernetes resource: used resource:" + usedCapacity + " and max
resource: " + maxCapacity);
+ Resource leftResource = maxCapacity.minus(usedCapacity);
+ logger.info(
+ "Kubernetes resource: left "
+ + leftResource
+ + ", this request requires: "
+ + requestedKubernetesResource);
+ if (leftResource.less(requestedKubernetesResource)) {
+ logger.info(
+ "user: "
+ + labelContainer.getUserCreatorLabel().getUser()
+ + " request kubernetes resource "
+ + requestedKubernetesResource
+ + " > left resource "
+ + leftResource);
+ Pair<Integer, String> notEnoughMessage =
+ generateNamespaceNotEnoughMessage(requestedKubernetesResource,
leftResource, maxCapacity);
+ throw new RMWarnException(notEnoughMessage.getKey(),
notEnoughMessage.getValue());
+ } else {
+ return true;
+ }
+ }
+
+ public Pair<Integer, String> generateNamespaceNotEnoughMessage(
+ Resource requestResource, Resource availableResource, Resource
maxResource) {
+ if (requestResource instanceof KubernetesResource) {
+ KubernetesResource kubernetesResource = (KubernetesResource)
requestResource;
+ KubernetesResource kubernetesResourceAvailable = (KubernetesResource)
availableResource;
+ KubernetesResource kubernetesResourceMax = (KubernetesResource)
maxResource;
+ if (kubernetesResource.getCores() >
kubernetesResourceAvailable.getCores()) {
+ return Pair.create(
+ RMErrorCode.KUBERNETES_NAMESPACE_CPU_INSUFFICIENT.getErrorCode(),
+ RMErrorCode.KUBERNETES_NAMESPACE_CPU_INSUFFICIENT.getErrorDesc()
+ + RMUtils.getResourceInfoMsg(
+ RMConstant.CPU,
+ RMConstant.KUBERNETES_CPU_UNIT,
+ kubernetesResource.getCores(),
+ kubernetesResourceAvailable.getCores(),
+ kubernetesResourceMax.getCores()));
+ } else {
+ return Pair.create(
+
RMErrorCode.KUBERNETES_NAMESPACE_MEMORY_INSUFFICIENT.getErrorCode(),
+ RMErrorCode.KUBERNETES_NAMESPACE_MEMORY_INSUFFICIENT.getErrorDesc()
+ + RMUtils.getResourceInfoMsg(
+ RMConstant.MEMORY,
+ RMConstant.MEMORY_UNIT_BYTE,
+ kubernetesResource.getMemory(),
+ kubernetesResourceAvailable.getMemory(),
+ kubernetesResourceMax.getMemory()));
+ }
+ } else {
+ return Pair.create(
+ RMErrorCode.KUBERNETES_UNKNOWN_RESOURCE_TYPE.getErrorCode(),
+ RMErrorCode.KUBERNETES_UNKNOWN_RESOURCE_TYPE.getErrorDesc() + "
Unusual resource type.");
+ }
+ }
+}
diff --git
a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/utils/UserConfiguration.java
b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/utils/UserConfiguration.java
index a71237d07..b7629bfa4 100644
---
a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/utils/UserConfiguration.java
+++
b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/rm/utils/UserConfiguration.java
@@ -199,6 +199,26 @@ public class UserConfiguration {
RMConfiguration.USER_AVAILABLE_YARN_INSTANCE.getValue(userConfiguration),
RMConfiguration.USER_AVAILABLE_YARN_QUEUE_NAME.getValue(userConfiguration));
return new DriverAndYarnResource(loadInstanceResource, yarnResource);
+ case Kubernetes:
+ return new KubernetesResource(
+ RMConfiguration.USER_AVAILABLE_KUBERNETES_INSTANCE_MEMORY
+ .getValue(userConfiguration)
+ .toLong(),
+
RMConfiguration.USER_AVAILABLE_KUBERNETES_INSTANCE_CPU.getValue(userConfiguration),
+
RMConfiguration.USER_AVAILABLE_KUBERNETES_INSTANCE_NAMESPACE.getValue(
+ userConfiguration));
+ case DriverAndKubernetes:
+ return new DriverAndKubernetesResource(
+ new LoadInstanceResource(
+
RMConfiguration.USER_AVAILABLE_MEMORY.getValue(userConfiguration).toLong(),
+ RMConfiguration.USER_AVAILABLE_CPU.getValue(userConfiguration),
+
RMConfiguration.USER_AVAILABLE_INSTANCE.getValue(userConfiguration)),
+ new KubernetesResource(
+ RMConfiguration.USER_AVAILABLE_KUBERNETES_INSTANCE_MEMORY
+ .getValue(userConfiguration)
+ .toLong(),
+
RMConfiguration.USER_AVAILABLE_KUBERNETES_INSTANCE_CPU.getValue(
+ userConfiguration)));
case Special:
return new SpecialResource(new HashMap<String, Object>());
default:
diff --git
a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/cluster/ClusterLabel.java
b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/cluster/ClusterLabel.java
index 256b618e3..fa7708d7f 100644
---
a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/cluster/ClusterLabel.java
+++
b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/cluster/ClusterLabel.java
@@ -18,14 +18,17 @@
package org.apache.linkis.manager.label.entity.cluster;
import org.apache.linkis.manager.label.constant.LabelKeyConstant;
-import org.apache.linkis.manager.label.entity.EMNodeLabel;
-import org.apache.linkis.manager.label.entity.Feature;
-import org.apache.linkis.manager.label.entity.GenericLabel;
+import org.apache.linkis.manager.label.entity.*;
import org.apache.linkis.manager.label.entity.annon.ValueSerialNum;
+import org.apache.linkis.manager.label.exception.LabelErrorException;
+
+import org.apache.commons.lang3.StringUtils;
import java.util.HashMap;
-public class ClusterLabel extends GenericLabel implements EMNodeLabel {
+import static
org.apache.linkis.manager.label.errorcode.LabelCommonErrorCodeSummary.LABEL_ERROR_CODE;
+
+public class ClusterLabel extends GenericLabel implements EngineNodeLabel,
UserModifiable {
public ClusterLabel() {
setLabelKey(LabelKeyConstant.YARN_CLUSTER_KEY);
@@ -79,4 +82,14 @@ public class ClusterLabel extends GenericLabel implements
EMNodeLabel {
return false;
}
}
+
+ /**
+  * Validates the serialized value of this label.
+  *
+  * <p>An empty or {@code null} value is accepted. Any other value must yield exactly one segment
+  * when split on {@code SerializableLabel.VALUE_SEPARATOR}.
+  *
+  * @param stringValue the serialized label value to check
+  * @throws LabelErrorException if the value splits into a number of segments other than one
+  */
+ @Override
+ public void valueCheck(String stringValue) throws LabelErrorException {
+   if (StringUtils.isEmpty(stringValue)) {
+     // Nothing to validate.
+     return;
+   }
+   final int segmentCount = stringValue.split(SerializableLabel.VALUE_SEPARATOR).length;
+   if (segmentCount != 1) {
+     throw new LabelErrorException(
+         LABEL_ERROR_CODE.getErrorCode(), LABEL_ERROR_CODE.getErrorDesc());
+   }
+ }
}
diff --git
a/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/conf/RMConfiguration.java
b/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/conf/RMConfiguration.java
index d5686601b..d9dc8a80d 100644
---
a/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/conf/RMConfiguration.java
+++
b/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/conf/RMConfiguration.java
@@ -65,6 +65,13 @@ public class RMConfiguration {
public static final CommonVars<String> USER_AVAILABLE_CLUSTER_NAME =
CommonVars.apply("wds.linkis.rm.cluster", "default");
+ public static final CommonVars<Integer>
USER_AVAILABLE_KUBERNETES_INSTANCE_CPU =
+ CommonVars.apply("wds.linkis.rm.kubernetes.cores.max", 150000);
+ public static final CommonVars<ByteType>
USER_AVAILABLE_KUBERNETES_INSTANCE_MEMORY =
+ CommonVars.apply("wds.linkis.rm.kubernetes.memory.max", new
ByteType("450g"));
+ public static final CommonVars<String>
USER_AVAILABLE_KUBERNETES_INSTANCE_NAMESPACE =
+ CommonVars.apply("wds.linkis.rm.kubernetes.namespace", "default");
+
public static final CommonVars<Long> USER_MODULE_WAIT_USED =
CommonVars.apply("wds.linkis.rm.user.module.wait.used", 60 * 10L);
public static final CommonVars<Long> USER_MODULE_WAIT_RELEASE =
@@ -106,6 +113,10 @@ public class RMConfiguration {
CommonVars.apply("wds.linkis.rm.default.yarn.cluster.name", "default");
public static final CommonVars<String> DEFAULT_YARN_TYPE =
CommonVars.apply("wds.linkis.rm.default.yarn.cluster.type", "Yarn");
+ public static final CommonVars<String> DEFAULT_KUBERNETES_CLUSTER_NAME =
+ CommonVars.apply("wds.linkis.rm.default.kubernetes.cluster.name",
"default");
+ public static final CommonVars<String> DEFAULT_KUBERNETES_TYPE =
+ CommonVars.apply("wds.linkis.rm.default.kubernetes.cluster.type", "K8S");
public static final CommonVars<Integer> EXTERNAL_RETRY_NUM =
CommonVars.apply("wds.linkis.rm.external.retry.num", 3);
diff --git
a/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/constant/RMConstant.java
b/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/constant/RMConstant.java
index 635bd5af0..209cf3e60 100644
---
a/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/constant/RMConstant.java
+++
b/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/constant/RMConstant.java
@@ -27,6 +27,9 @@ public class RMConstant {
public static final String CPU_UNIT = "cores";
+ public static final String KUBERNETES_CPU_UNIT = "milli cores";
+ public static final String KUBERNETES_NAMESPACE = "namespace";
+ public static final String KUBERNETES_NAMESPACE_UNIT = "";
public static final String MEMORY_UNIT_BYTE = "bytes";
public static final String INSTANCE_UNIT = "";
diff --git
a/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/resource/DriverAndKubernetesResource.java
b/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/resource/DriverAndKubernetesResource.java
new file mode 100644
index 000000000..6d047b33a
--- /dev/null
+++
b/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/resource/DriverAndKubernetesResource.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.manager.common.entity.resource;
+
+import org.apache.linkis.manager.common.exception.ResourceWarnException;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static
org.apache.linkis.manager.common.errorcode.ManagerCommonErrorCodeSummary.OPERATION_MULTIPLIED;
+
+/**
+ * Composite {@link Resource} that pairs the driver-side {@link LoadInstanceResource} with the
+ * {@link KubernetesResource} requested for the same engine.
+ *
+ * <p>Binary operations first convert their argument into a {@code DriverAndKubernetesResource}
+ * (see the private converting constructor) and are then applied component-wise. Instances never
+ * mutate: every arithmetic operation returns a new object.
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class DriverAndKubernetesResource extends Resource {
+
+  private static final Logger logger =
+      LoggerFactory.getLogger(DriverAndKubernetesResource.class);
+
+  private final LoadInstanceResource loadInstanceResource;
+  private final KubernetesResource kubernetesResource;
+
+  /**
+   * Creates a composite from its two parts.
+   *
+   * @param loadInstanceResource the driver-side memory/cores/instances part
+   * @param kubernetesResource the Kubernetes-side part
+   */
+  public DriverAndKubernetesResource(
+      LoadInstanceResource loadInstanceResource, KubernetesResource kubernetesResource) {
+    this.loadInstanceResource = loadInstanceResource;
+    this.kubernetesResource = kubernetesResource;
+  }
+
+  /** Creates a composite whose numeric fields are all set to their type's maximum value. */
+  public DriverAndKubernetesResource() {
+    this(
+        new LoadInstanceResource(Long.MAX_VALUE, Integer.MAX_VALUE, Integer.MAX_VALUE),
+        new KubernetesResource(Long.MAX_VALUE, Long.MAX_VALUE));
+  }
+
+  /** Returns the driver-side part of this composite. */
+  public LoadInstanceResource getLoadInstanceResource() {
+    return this.loadInstanceResource;
+  }
+
+  /**
+   * Converts an arbitrary {@link Resource} into a composite so that binary operations can work
+   * component-wise. A part that the source type does not carry is filled with zeros; a source
+   * type that is not recognised at all yields maximum values for both parts.
+   */
+  private DriverAndKubernetesResource(Resource r) {
+    if (r instanceof DriverAndKubernetesResource) {
+      DriverAndKubernetesResource t = (DriverAndKubernetesResource) r;
+      this.loadInstanceResource = t.loadInstanceResource;
+      this.kubernetesResource = t.kubernetesResource;
+    } else if (r instanceof KubernetesResource) {
+      this.loadInstanceResource = new LoadInstanceResource(0, 0, 0);
+      this.kubernetesResource = (KubernetesResource) r;
+    } else if (r instanceof LoadInstanceResource) {
+      this.loadInstanceResource = (LoadInstanceResource) r;
+      this.kubernetesResource = new KubernetesResource(0, 0);
+    } else if (r instanceof LoadResource) {
+      LoadResource l = (LoadResource) r;
+      this.loadInstanceResource = new LoadInstanceResource(l.getMemory(), l.getCores(), 0);
+      this.kubernetesResource = new KubernetesResource(0, 0);
+    } else if (r instanceof MemoryResource) {
+      MemoryResource m = (MemoryResource) r;
+      this.loadInstanceResource = new LoadInstanceResource(m.getMemory(), 0, 0);
+      this.kubernetesResource = new KubernetesResource(0, 0);
+    } else if (r instanceof CPUResource) {
+      CPUResource c = (CPUResource) r;
+      this.loadInstanceResource = new LoadInstanceResource(0, c.getCores(), 0);
+      this.kubernetesResource = new KubernetesResource(0, 0);
+    } else {
+      this.loadInstanceResource =
+          new LoadInstanceResource(Long.MAX_VALUE, Integer.MAX_VALUE, Integer.MAX_VALUE);
+      this.kubernetesResource = new KubernetesResource(Long.MAX_VALUE, Long.MAX_VALUE);
+    }
+  }
+
+  /** Returns the Kubernetes-side part of this composite. */
+  public KubernetesResource getKubernetesResource() {
+    return kubernetesResource;
+  }
+
+  /**
+   * Always {@code false}, so every binary operation in this class takes its component-wise
+   * branch.
+   */
+  public boolean isModuleOperate(Resource r) {
+    return false; // TODO This method needs to return false by default, and this method needs to
+    // be removed later
+  }
+
+  /**
+   * Tells whether scaling should touch only the driver-side part.
+   *
+   * <p>This is the case exactly when there is no Kubernetes part. The predicate used to be
+   * {@code kubernetesResource != null}, which left an existing Kubernetes part unscaled and
+   * dereferenced it only when it was {@code null}.
+   *
+   * @return {@code true} if this composite has no Kubernetes part
+   */
+  public boolean isModuleOperate() {
+    return kubernetesResource == null;
+  }
+
+  /** Returns the component-wise sum of this composite and {@code resource}. */
+  @Override
+  public DriverAndKubernetesResource add(Resource resource) {
+    DriverAndKubernetesResource r = new DriverAndKubernetesResource(resource);
+    if (isModuleOperate(r)) {
+      return new DriverAndKubernetesResource(
+          loadInstanceResource.add(r.getLoadInstanceResource()), kubernetesResource);
+    } else {
+      return new DriverAndKubernetesResource(
+          loadInstanceResource.add(r.getLoadInstanceResource()),
+          kubernetesResource.add(r.getKubernetesResource()));
+    }
+  }
+
+  /** Returns the component-wise difference of this composite and {@code resource}. */
+  @Override
+  public DriverAndKubernetesResource minus(Resource resource) {
+    DriverAndKubernetesResource r = new DriverAndKubernetesResource(resource);
+    if (isModuleOperate(r)) {
+      return new DriverAndKubernetesResource(
+          loadInstanceResource.minus(r.getLoadInstanceResource()), kubernetesResource);
+    } else {
+      return new DriverAndKubernetesResource(
+          loadInstanceResource.minus(r.getLoadInstanceResource()),
+          kubernetesResource.minus(r.getKubernetesResource()));
+    }
+  }
+
+  /**
+   * Not supported for composites.
+   *
+   * @throws ResourceWarnException always
+   */
+  @Override
+  public Resource multiplied(Resource r) {
+    throw new ResourceWarnException(
+        OPERATION_MULTIPLIED.getErrorCode(), OPERATION_MULTIPLIED.getErrorDesc());
+  }
+
+  /**
+   * Scales this composite by {@code rate}.
+   *
+   * @param rate the factor applied to the driver-side part and, when present, the Kubernetes part
+   * @return a new, scaled composite
+   */
+  @Override
+  public Resource multiplied(float rate) {
+    if (isModuleOperate()) {
+      return new DriverAndKubernetesResource(
+          loadInstanceResource.multiplied(rate), kubernetesResource);
+    } else {
+      return new DriverAndKubernetesResource(
+          loadInstanceResource.multiplied(rate), kubernetesResource.multiplied(rate));
+    }
+  }
+
+  /**
+   * Not supported for composites. Reports the same {@code OPERATION_MULTIPLIED} error code as
+   * {@link #multiplied(Resource)}.
+   *
+   * @throws ResourceWarnException always
+   */
+  @Override
+  public Resource divide(Resource r) {
+    throw new ResourceWarnException(
+        OPERATION_MULTIPLIED.getErrorCode(), OPERATION_MULTIPLIED.getErrorDesc());
+  }
+
+  /**
+   * Divides this composite by {@code rate}.
+   *
+   * @param rate the divisor applied to the driver-side part and, when present, the Kubernetes
+   *     part
+   * @return a new, scaled composite
+   */
+  @Override
+  public Resource divide(int rate) {
+    if (isModuleOperate()) {
+      return new DriverAndKubernetesResource(loadInstanceResource.divide(rate), kubernetesResource);
+    } else {
+      return new DriverAndKubernetesResource(
+          loadInstanceResource.divide(rate), kubernetesResource.divide(rate));
+    }
+  }
+
+  /** Returns {@code true} if both parts are {@code moreThan} the matching parts of the argument. */
+  @Override
+  public boolean moreThan(Resource resource) {
+    DriverAndKubernetesResource r = new DriverAndKubernetesResource(resource);
+    if (isModuleOperate(r)) {
+      return loadInstanceResource.moreThan(r.loadInstanceResource);
+    } else {
+      return loadInstanceResource.moreThan(r.loadInstanceResource)
+          && kubernetesResource.moreThan(r.kubernetesResource);
+    }
+  }
+
+  /** Returns {@code true} if at least one part is {@code caseMore} than the matching part. */
+  @Override
+  public boolean caseMore(Resource resource) {
+    DriverAndKubernetesResource r = new DriverAndKubernetesResource(resource);
+    if (isModuleOperate(r)) {
+      return loadInstanceResource.caseMore(r.loadInstanceResource);
+    } else {
+      return loadInstanceResource.caseMore(r.loadInstanceResource)
+          || kubernetesResource.caseMore(r.kubernetesResource);
+    }
+  }
+
+  /** Returns {@code true} if both parts are {@code equalsTo} the matching parts of the argument. */
+  @Override
+  public boolean equalsTo(Resource resource) {
+    DriverAndKubernetesResource r = new DriverAndKubernetesResource(resource);
+    if (isModuleOperate(r)) {
+      return loadInstanceResource.equalsTo(r.loadInstanceResource);
+    } else {
+      return loadInstanceResource.equalsTo(r.loadInstanceResource)
+          && kubernetesResource.equalsTo(r.kubernetesResource);
+    }
+  }
+
+  /** Returns {@code true} if both parts are {@code notLess} than the matching parts. */
+  @Override
+  public boolean notLess(Resource resource) {
+    DriverAndKubernetesResource r = new DriverAndKubernetesResource(resource);
+    if (isModuleOperate(r)) {
+      return loadInstanceResource.notLess(r.loadInstanceResource);
+    } else {
+      return loadInstanceResource.notLess(r.loadInstanceResource)
+          && kubernetesResource.notLess(r.kubernetesResource);
+    }
+  }
+
+  /** Returns the negation of {@link #notLess(Resource)}. */
+  @Override
+  public boolean less(Resource r) {
+    return !notLess(r);
+  }
+
+  /**
+   * Renders both parts as a JSON object with the keys {@code driver} and {@code kubernetes}; a
+   * missing part is rendered as the JSON literal {@code null}.
+   */
+  @Override
+  public String toJson() {
+    String load = "null";
+    String kubernetes = "null";
+    if (loadInstanceResource != null) {
+      load = loadInstanceResource.toJson();
+    }
+    if (kubernetesResource != null) {
+      kubernetes = kubernetesResource.toJson();
+    }
+    return String.format("{\"driver\":%s, \"kubernetes\":%s}", load, kubernetes);
+  }
+
+  /** Returns a human-readable description of both parts. */
+  @Override
+  public String toString() {
+    return String.format(
+        "Driver resources(Driver资源):%s, Kubernetes resource(K8S资源):%s",
+        loadInstanceResource, kubernetesResource);
+  }
+}
diff --git
a/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/resource/KubernetesResource.java
b/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/resource/KubernetesResource.java
new file mode 100644
index 000000000..d716569c5
--- /dev/null
+++
b/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/resource/KubernetesResource.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.manager.common.entity.resource;
+
+import org.apache.linkis.common.utils.ByteTimeUtils;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+
+/**
+ * Immutable {@link Resource} holding a memory amount, a CPU amount and the Kubernetes namespace
+ * they belong to.
+ *
+ * <p>{@link #toJson()} renders the memory through {@code ByteTimeUtils.bytesToString}, so memory
+ * is treated as a byte count. NOTE(review): elsewhere in this change the CPU value is reported
+ * with the unit "milli cores" - confirm that every producer uses that unit.
+ *
+ * <p>Arithmetic is plain {@code long} arithmetic without overflow checks, and the result always
+ * keeps the namespace of the receiver. Only {@link #notLess(Resource)} (and therefore {@link
+ * #less(Resource)}) takes the namespace into account when comparing.
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class KubernetesResource extends Resource {
+  /** Namespace used whenever the caller or the converted resource does not supply one. */
+  private static final String DEFAULT_NAMESPACE = "default";
+
+  private final long memory;
+  private final long cores;
+  private final String namespace;
+
+  /**
+   * Converts an arbitrary {@link Resource} so that binary operations can work field by field.
+   * Memory-only and CPU-only resources fill the other field with zero; any other type yields
+   * {@code Long.MAX_VALUE} for both fields.
+   */
+  private KubernetesResource(Resource r) {
+    if (r instanceof KubernetesResource) {
+      KubernetesResource t = (KubernetesResource) r;
+      this.memory = t.memory;
+      this.cores = t.cores;
+      this.namespace = t.namespace;
+    } else if (r instanceof MemoryResource) {
+      MemoryResource m = (MemoryResource) r;
+      this.memory = m.getMemory();
+      this.cores = 0;
+      this.namespace = DEFAULT_NAMESPACE;
+    } else if (r instanceof CPUResource) {
+      CPUResource c = (CPUResource) r;
+      this.memory = 0;
+      this.cores = c.getCores();
+      this.namespace = DEFAULT_NAMESPACE;
+    } else {
+      this.memory = Long.MAX_VALUE;
+      this.cores = Long.MAX_VALUE;
+      this.namespace = DEFAULT_NAMESPACE;
+    }
+  }
+
+  /** Creates a resource with maximum memory and cores in the {@code "default"} namespace. */
+  public KubernetesResource() {
+    this(Long.MAX_VALUE, Long.MAX_VALUE, DEFAULT_NAMESPACE);
+  }
+
+  /**
+   * @param memory the memory amount
+   * @param cores the CPU amount
+   * @param namespace the Kubernetes namespace the amounts belong to
+   */
+  public KubernetesResource(long memory, long cores, String namespace) {
+    this.memory = memory;
+    this.cores = cores;
+    this.namespace = namespace;
+  }
+
+  /** Creates a resource in the {@code "default"} namespace. */
+  public KubernetesResource(long memory, long cores) {
+    this(memory, cores, DEFAULT_NAMESPACE);
+  }
+
+  /** Returns the field-wise sum, keeping this resource's namespace. */
+  @Override
+  public KubernetesResource add(Resource r) {
+    KubernetesResource temp = new KubernetesResource(r);
+    return new KubernetesResource(
+        this.memory + temp.memory, this.cores + temp.cores, this.namespace);
+  }
+
+  /** Returns the field-wise difference, keeping this resource's namespace. */
+  @Override
+  public KubernetesResource minus(Resource r) {
+    KubernetesResource temp = new KubernetesResource(r);
+    return new KubernetesResource(
+        this.memory - temp.memory, this.cores - temp.cores, this.namespace);
+  }
+
+  /** Returns the field-wise product, keeping this resource's namespace. */
+  @Override
+  public KubernetesResource multiplied(Resource r) {
+    KubernetesResource temp = new KubernetesResource(r);
+    return new KubernetesResource(
+        this.memory * temp.memory, this.cores * temp.cores, this.namespace);
+  }
+
+  /**
+   * Scales both fields by {@code rate}; memory is truncated, cores are rounded.
+   *
+   * @param rate the scaling factor
+   * @return a new, scaled resource in the same namespace
+   */
+  @Override
+  public KubernetesResource multiplied(float rate) {
+    // Widen to double before multiplying: a long * float product is a float, and
+    // Math.round(float) returns an int, which clamps large core counts (such as the
+    // Long.MAX_VALUE default) to Integer.MAX_VALUE.
+    long scaledMemory = (long) ((double) this.memory * rate);
+    long scaledCores = Math.round((double) this.cores * rate);
+    return new KubernetesResource(scaledMemory, scaledCores, this.namespace);
+  }
+
+  /** Returns the field-wise integer quotient, keeping this resource's namespace. */
+  @Override
+  public KubernetesResource divide(Resource r) {
+    KubernetesResource temp = new KubernetesResource(r);
+    return new KubernetesResource(
+        this.memory / temp.memory, this.cores / temp.cores, this.namespace);
+  }
+
+  /** Divides both fields by {@code rate} using integer division. */
+  @Override
+  public KubernetesResource divide(int rate) {
+    return new KubernetesResource(this.memory / rate, this.cores / rate, this.namespace);
+  }
+
+  /** Returns {@code true} if both memory and cores are strictly greater than the argument's. */
+  @Override
+  public boolean moreThan(Resource r) {
+    KubernetesResource temp = new KubernetesResource(r);
+    return this.memory > temp.memory && this.cores > temp.cores;
+  }
+
+  /** Returns {@code true} if memory or cores is strictly greater than the argument's. */
+  @Override
+  public boolean caseMore(Resource r) {
+    KubernetesResource temp = new KubernetesResource(r);
+    return this.memory > temp.memory || this.cores > temp.cores;
+  }
+
+  /** Returns {@code true} if memory and cores both equal the argument's (namespace ignored). */
+  @Override
+  public boolean equalsTo(Resource r) {
+    KubernetesResource temp = new KubernetesResource(r);
+    return this.memory == temp.memory && this.cores == temp.cores;
+  }
+
+  /**
+   * Returns {@code true} if memory and cores are both at least the argument's and the two
+   * namespaces are equal.
+   */
+  @Override
+  public boolean notLess(Resource r) {
+    KubernetesResource temp = new KubernetesResource(r);
+    return this.memory >= temp.memory
+        && this.cores >= temp.cores
+        && this.namespace.equals(temp.namespace);
+  }
+
+  /** Returns the negation of {@link #notLess(Resource)}. */
+  @Override
+  public boolean less(Resource r) {
+    return !notLess(r);
+  }
+
+  /** Renders this resource as a JSON object with the keys namespace, memory and cpu. */
+  @Override
+  public String toJson() {
+    return String.format(
+        "{\"namespace\":\"%s\",\"memory\":\"%s\",\"cpu\":%d}",
+        namespace, ByteTimeUtils.bytesToString(this.memory), this.cores);
+  }
+
+  /** Same text as {@link #toJson()}. */
+  @Override
+  public String toString() {
+    return this.toJson();
+  }
+
+  public long getMemory() {
+    return memory;
+  }
+
+  public long getCores() {
+    return cores;
+  }
+
+  public String getNamespace() {
+    return namespace;
+  }
+}
diff --git
a/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/resource/LoadInstanceResource.java
b/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/resource/LoadInstanceResource.java
index ed6b06cfa..57ccc4f31 100644
---
a/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/resource/LoadInstanceResource.java
+++
b/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/resource/LoadInstanceResource.java
@@ -53,6 +53,11 @@ public class LoadInstanceResource extends Resource {
this.memory = d.getLoadInstanceResource().getMemory();
this.cores = d.getLoadInstanceResource().getCores();
this.instances = d.getLoadInstanceResource().getInstances();
+ } else if (r instanceof DriverAndKubernetesResource) {
+ DriverAndKubernetesResource d = (DriverAndKubernetesResource) r;
+ this.memory = d.getLoadInstanceResource().getMemory();
+ this.cores = d.getLoadInstanceResource().getCores();
+ this.instances = d.getLoadInstanceResource().getInstances();
} else {
this.memory = Long.MAX_VALUE;
this.cores = Integer.MAX_VALUE;
diff --git
a/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/resource/Resource.java
b/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/resource/Resource.java
index 21a39e80c..361a6f28e 100644
---
a/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/resource/Resource.java
+++
b/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/resource/Resource.java
@@ -43,6 +43,11 @@ public abstract class Resource {
case DriverAndYarn:
return new DriverAndYarnResource(
new LoadInstanceResource(0, 0, 0), new YarnResource(0, 0, 0,
"default", ""));
+ case Kubernetes:
+ return new KubernetesResource(0, 0);
+ case DriverAndKubernetes:
+ return new DriverAndKubernetesResource(
+ new LoadInstanceResource(0, 0, 0), new KubernetesResource(0, 0));
case Special:
return new SpecialResource(new HashMap<String, Object>());
case Default:
@@ -77,6 +82,9 @@ public abstract class Resource {
return new DriverAndYarnResource(
new LoadInstanceResource(0, 0, 0), new YarnResource(0, 0, 0,
"default", ""));
}
+ } else if (resource instanceof DriverAndKubernetesResource) {
+ return new DriverAndKubernetesResource(
+ new LoadInstanceResource(0, 0, 0), new KubernetesResource(0, 0));
} else if (resource instanceof SpecialResource) {
return new SpecialResource(new HashMap<String, Object>());
} else {
diff --git
a/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/resource/ResourceType.java
b/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/resource/ResourceType.java
index 488d7fe4b..8872dceb9 100644
---
a/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/resource/ResourceType.java
+++
b/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/resource/ResourceType.java
@@ -26,5 +26,7 @@ public enum ResourceType {
LoadInstance,
Yarn,
DriverAndYarn,
+ Kubernetes,
+ DriverAndKubernetes,
Special
}
diff --git
a/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/utils/ResourceUtils.java
b/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/utils/ResourceUtils.java
index 3bdf64656..ab4f665f0 100644
---
a/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/utils/ResourceUtils.java
+++
b/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/utils/ResourceUtils.java
@@ -88,6 +88,10 @@ public class ResourceUtils {
return mapper.readValue(plainResource, SpecialResource.class);
case Yarn:
return mapper.readValue(plainResource, YarnResource.class);
+ case DriverAndKubernetes:
+ return mapper.readValue(plainResource,
DriverAndKubernetesResource.class);
+ case Kubernetes:
+ return mapper.readValue(plainResource, KubernetesResource.class);
default:
return mapper.readValue(plainResource, LoadResource.class);
}
@@ -227,6 +231,10 @@ public class ResourceUtils {
return ResourceType.DriverAndYarn;
} else if (resource instanceof SpecialResource) {
return ResourceType.Special;
+ } else if (resource instanceof KubernetesResource) {
+ return ResourceType.Kubernetes;
+ } else if (resource instanceof DriverAndKubernetesResource) {
+ return ResourceType.DriverAndKubernetes;
} else {
return ResourceType.LoadInstance;
}
diff --git
a/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/test/resources/create.sql
b/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/test/resources/create.sql
index 0c26cca75..3238d32f8 100644
---
a/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/test/resources/create.sql
+++
b/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/test/resources/create.sql
@@ -78,7 +78,7 @@ INSERT INTO linkis_cg_manager_label_user(username, label_id,
update_time, create
CREATE TABLE `linkis_cg_manager_label` (
`id` int(20) NOT NULL AUTO_INCREMENT,
- `label_key` varchar(32) NOT NULL,
+ `label_key` varchar(50) NOT NULL,
`label_value` varchar(255) NOT NULL,
`label_feature` varchar(16) NOT NULL,
`label_value_size` int(20) NOT NULL,
diff --git
a/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/test/resources/create_pg.sql
b/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/test/resources/create_pg.sql
index ef99d9e1e..5c305db39 100644
---
a/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/test/resources/create_pg.sql
+++
b/linkis-computation-governance/linkis-manager/linkis-manager-persistence/src/test/resources/create_pg.sql
@@ -85,7 +85,7 @@ CREATE TABLE linkis_cg_manager_engine_em (
DROP TABLE IF EXISTS "linkis_cg_manager_label";
CREATE TABLE linkis_cg_manager_label (
id serial NOT NULL,
- label_key varchar(32) NOT NULL,
+ label_key varchar(50) NOT NULL,
label_value varchar(255) NOT NULL,
label_feature varchar(16) NOT NULL,
label_value_size int4 NOT NULL,
diff --git a/linkis-dist/package/db/linkis_ddl.sql
b/linkis-dist/package/db/linkis_ddl.sql
index 86b856a8d..739fa0ba7 100644
--- a/linkis-dist/package/db/linkis_ddl.sql
+++ b/linkis-dist/package/db/linkis_ddl.sql
@@ -731,7 +731,7 @@ CREATE TABLE `linkis_cg_manager_engine_em` (
DROP TABLE IF EXISTS `linkis_cg_manager_label`;
CREATE TABLE `linkis_cg_manager_label` (
`id` int(20) NOT NULL AUTO_INCREMENT,
- `label_key` varchar(32) COLLATE utf8_bin NOT NULL,
+ `label_key` varchar(50) COLLATE utf8_bin NOT NULL,
`label_value` varchar(255) COLLATE utf8_bin NOT NULL,
`label_feature` varchar(16) COLLATE utf8_bin NOT NULL,
`label_value_size` int(20) NOT NULL,
diff --git a/linkis-dist/package/db/linkis_ddl_pg.sql
b/linkis-dist/package/db/linkis_ddl_pg.sql
index 8fb739c99..c205d7659 100644
--- a/linkis-dist/package/db/linkis_ddl_pg.sql
+++ b/linkis-dist/package/db/linkis_ddl_pg.sql
@@ -810,7 +810,7 @@ CREATE TABLE linkis_cg_manager_engine_em (
DROP TABLE IF EXISTS "linkis_cg_manager_label";
CREATE TABLE linkis_cg_manager_label (
id serial4 NOT NULL,
- label_key varchar(32) NOT NULL,
+ label_key varchar(50) NOT NULL,
label_value varchar(255) NOT NULL,
label_feature varchar(16) NOT NULL,
label_value_size int4 NOT NULL,
diff --git a/linkis-dist/package/db/module/linkis_manager.sql
b/linkis-dist/package/db/module/linkis_manager.sql
index c3128633e..4471080b8 100644
--- a/linkis-dist/package/db/module/linkis_manager.sql
+++ b/linkis-dist/package/db/module/linkis_manager.sql
@@ -87,7 +87,7 @@ DROP TABLE IF EXISTS `linkis_cg_manager_label`;
CREATE TABLE `linkis_cg_manager_label` (
`id` int(20) NOT NULL AUTO_INCREMENT,
- `label_key` varchar(32) COLLATE utf8_bin NOT NULL,
+ `label_key` varchar(50) COLLATE utf8_bin NOT NULL,
`label_value` varchar(255) COLLATE utf8_bin NOT NULL,
`label_feature` varchar(16) COLLATE utf8_bin NOT NULL,
`label_value_size` int(20) NOT NULL,
diff --git
a/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/resource/ResourceType.java
b/linkis-dist/package/db/upgrade/1.5.0_schema/mysql/linkis_ddl.sql
similarity index 76%
copy from
linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/resource/ResourceType.java
copy to linkis-dist/package/db/upgrade/1.5.0_schema/mysql/linkis_ddl.sql
index 488d7fe4b..6d78b7b67 100644
---
a/linkis-computation-governance/linkis-manager/linkis-manager-common/src/main/java/org/apache/linkis/manager/common/entity/resource/ResourceType.java
+++ b/linkis-dist/package/db/upgrade/1.5.0_schema/mysql/linkis_ddl.sql
@@ -6,7 +6,7 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -15,16 +15,4 @@
* limitations under the License.
*/
-package org.apache.linkis.manager.common.entity.resource;
-
-public enum ResourceType {
- Default,
- Memory,
- CPU,
- Load,
- Instance,
- LoadInstance,
- Yarn,
- DriverAndYarn,
- Special
-}
+-- MODIFY COLUMN replaces the whole column definition, so the collation and NOT NULL
+-- constraint from the base DDL are restated; only the length changes (32 -> 50).
+ALTER TABLE `linkis_cg_manager_label` MODIFY COLUMN `label_key` varchar(50) COLLATE utf8_bin NOT NULL;
\ No newline at end of file
diff --git a/linkis-engineconn-plugins/spark/pom.xml
b/linkis-engineconn-plugins/spark/pom.xml
index add427443..d98f6fcbd 100644
--- a/linkis-engineconn-plugins/spark/pom.xml
+++ b/linkis-engineconn-plugins/spark/pom.xml
@@ -416,9 +416,8 @@
<dependency>
<groupId>io.fabric8</groupId>
<artifactId>kubernetes-model-core</artifactId>
- <version>5.4.1</version>
+ <version>${kubernetes-client.version}</version>
</dependency>
-
</dependencies>
<build>
diff --git
a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/config/SparkConfiguration.scala
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/config/SparkConfiguration.scala
index ecc37597d..ccc21a776 100644
---
a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/config/SparkConfiguration.scala
+++
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/config/SparkConfiguration.scala
@@ -62,6 +62,12 @@ object SparkConfiguration extends Logging {
val SPARK_K8S_SPARK_VERSION =
CommonVars[String]("linkis.spark.k8s.sparkVersion", "3.2.1")
val SPARK_K8S_NAMESPACE = CommonVars[String]("linkis.spark.k8s.namespace",
"default")
+ val SPARK_K8S_EXECUTOR_REQUEST_CORES =
+ CommonVars[String]("linkis.spark.k8s.executor.request.cores", "1")
+
+ val SPARK_K8S_DRIVER_REQUEST_CORES =
+ CommonVars[String]("linkis.spark.k8s.driver.request.cores", "1")
+
val SPARK_KUBERNETES_FILE_UPLOAD_PATH =
CommonVars[String]("spark.kubernetes.file.upload.path",
"local:///opt/spark/tmp")
diff --git
a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnResourceFactory.scala
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnResourceFactory.scala
index 8b642c723..922826c2a 100644
---
a/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnResourceFactory.scala
+++
b/linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/factory/SparkEngineConnResourceFactory.scala
@@ -18,21 +18,64 @@
package org.apache.linkis.engineplugin.spark.factory
import org.apache.linkis.common.utils.{ByteTimeUtils, Logging}
+import org.apache.linkis.engineplugin.spark.config.SparkConfiguration._
import org.apache.linkis.engineplugin.spark.config.SparkResourceConfiguration._
+import
org.apache.linkis.manager.common.conf.RMConfiguration.DEFAULT_KUBERNETES_TYPE
import org.apache.linkis.manager.common.entity.resource.{
+ DriverAndKubernetesResource,
DriverAndYarnResource,
+ KubernetesResource,
LoadInstanceResource,
Resource,
YarnResource
}
-import
org.apache.linkis.manager.engineplugin.common.resource.AbstractEngineResourceFactory
+import org.apache.linkis.manager.engineplugin.common.resource.{
+ AbstractEngineResourceFactory,
+ EngineResourceRequest
+}
+import org.apache.linkis.manager.label.entity.cluster.ClusterLabel
+import org.apache.linkis.manager.label.utils.LabelUtil
import org.apache.commons.lang3.StringUtils
import java.util
+import io.fabric8.kubernetes.api.model.Quantity
+
class SparkEngineConnResourceFactory extends AbstractEngineResourceFactory
with Logging {
+  /**
+   * Minimum resource for an engine request. Delegates to [[resolveRequestResource]] so that
+   * the min and max variants cannot drift apart.
+   *
+   * @param engineResourceRequest request carrying the labels and properties of the engine
+   * @return the resource computed for the request's cluster type
+   */
+  override protected def getMinRequestResource(
+      engineResourceRequest: EngineResourceRequest
+  ): Resource = resolveRequestResource(engineResourceRequest)
+
+  /**
+   * Maximum resource for an engine request. Uses the same computation as
+   * [[getMinRequestResource]].
+   *
+   * @param engineResourceRequest request carrying the labels and properties of the engine
+   * @return the resource computed for the request's cluster type
+   */
+  override protected def getMaxRequestResource(
+      engineResourceRequest: EngineResourceRequest
+  ): Resource = resolveRequestResource(engineResourceRequest)
+
+  /**
+   * Picks the resource computation based on the request's ClusterLabel: when the label's
+   * cluster type equals DEFAULT_KUBERNETES_TYPE (ignoring case) the Kubernetes resource is
+   * built, otherwise the regular request resource is used.
+   */
+  private def resolveRequestResource(engineResourceRequest: EngineResourceRequest): Resource = {
+    val clusterLabel = LabelUtil.getLabelFromList[ClusterLabel](engineResourceRequest.labels)
+    // equalsIgnoreCase is null-safe and locale-independent; the explicit non-blank check keeps
+    // a label without a cluster type on the non-Kubernetes path instead of throwing an NPE.
+    val onKubernetes = clusterLabel != null &&
+      StringUtils.isNotBlank(clusterLabel.getClusterType) &&
+      StringUtils.equalsIgnoreCase(clusterLabel.getClusterType, DEFAULT_KUBERNETES_TYPE.getValue)
+    if (onKubernetes) {
+      getRequestKubernetesResource(engineResourceRequest.properties)
+    } else {
+      getRequestResource(engineResourceRequest.properties)
+    }
+  }
+
override protected def getRequestResource(properties: util.Map[String,
String]): Resource = {
val executorNum = LINKIS_SPARK_EXECUTOR_INSTANCES.getValue(properties)
val executorMemory = LINKIS_SPARK_EXECUTOR_MEMORY.getValue(properties)
@@ -63,4 +106,44 @@ class SparkEngineConnResourceFactory extends
AbstractEngineResourceFactory with
)
}
+  /**
+   * Builds the resource request for a Spark engine submitted to Kubernetes.
+   *
+   * CPU is taken from the `linkis.spark.k8s.*.request.cores` properties when present (parsed as
+   * a Kubernetes quantity), otherwise from the regular Spark core settings; both are scaled by
+   * 1000 (presumably milli-cores -- confirm against KubernetesResource). Memory values that
+   * are purely numeric get a "g" suffix before being converted with ByteTimeUtils.
+   *
+   * @param properties engine start-up properties; must not be null
+   * @return a DriverAndKubernetesResource whose load-instance part is all zeros and whose
+   *         Kubernetes part holds the summed executor + driver memory and CPU plus the namespace
+   */
+  def getRequestKubernetesResource(properties: util.Map[String, String]): Resource = {
+    val executorNum = LINKIS_SPARK_EXECUTOR_INSTANCES.getValue(properties)
+
+    val executorCores =
+      if (properties.containsKey(SPARK_K8S_EXECUTOR_REQUEST_CORES.key)) {
+        quantityToMilliCores(SPARK_K8S_EXECUTOR_REQUEST_CORES.getValue(properties))
+      } else {
+        LINKIS_SPARK_EXECUTOR_CORES.getValue(properties) * 1000L
+      }
+    val driverCores =
+      if (properties.containsKey(SPARK_K8S_DRIVER_REQUEST_CORES.key)) {
+        quantityToMilliCores(SPARK_K8S_DRIVER_REQUEST_CORES.getValue(properties))
+      } else {
+        LINKIS_SPARK_DRIVER_CORES.getValue(properties) * 1000L
+      }
+
+    val executorMemory = withDefaultMemoryUnit(LINKIS_SPARK_EXECUTOR_MEMORY.getValue(properties))
+    val driverMemory = withDefaultMemoryUnit(LINKIS_SPARK_DRIVER_MEMORY.getValue(properties))
+
+    // Totals cover all executors plus the single driver.
+    val totalMemory = ByteTimeUtils.byteStringAsBytes(executorMemory) * executorNum +
+      ByteTimeUtils.byteStringAsBytes(driverMemory)
+    val totalCores = executorCores * executorNum + driverCores
+    val namespace = SPARK_K8S_NAMESPACE.getValue(properties)
+
+    new DriverAndKubernetesResource(
+      new LoadInstanceResource(0, 0, 0),
+      new KubernetesResource(totalMemory, totalCores, namespace)
+    )
+  }
+
+  /**
+   * Parses a Kubernetes CPU quantity string and scales it by 1000, truncating any remainder.
+   * Uses BigDecimal arithmetic so that values such as "1.005" are not reduced by one through
+   * binary floating-point rounding.
+   */
+  private def quantityToMilliCores(cores: String): Long =
+    Quantity
+      .getAmountInBytes(Quantity.parse(cores))
+      .multiply(java.math.BigDecimal.valueOf(1000L))
+      .longValue()
+
+  /** Appends "g" to a memory setting that consists of digits only; other values pass through. */
+  private def withDefaultMemoryUnit(memory: String): String =
+    if (StringUtils.isNumeric(memory)) memory + "g" else memory
+
}
diff --git
a/linkis-public-enhancements/linkis-basedata-manager/src/test/resources/create.sql
b/linkis-public-enhancements/linkis-basedata-manager/src/test/resources/create.sql
index 4fd3ccc93..fd297f9d1 100644
---
a/linkis-public-enhancements/linkis-basedata-manager/src/test/resources/create.sql
+++
b/linkis-public-enhancements/linkis-basedata-manager/src/test/resources/create.sql
@@ -155,7 +155,7 @@ DROP TABLE IF EXISTS `linkis_cg_manager_label`;
CREATE TABLE `linkis_cg_manager_label`
(
`id` int(20) NOT NULL AUTO_INCREMENT,
- `label_key` varchar(32) NOT NULL,
+ `label_key` varchar(50) NOT NULL,
`label_value` varchar(255) NOT NULL,
`label_feature` varchar(16) NOT NULL,
`label_value_size` int(20) NOT NULL,
diff --git
a/linkis-public-enhancements/linkis-basedata-manager/src/test/resources/create_pg.sql
b/linkis-public-enhancements/linkis-basedata-manager/src/test/resources/create_pg.sql
index bd9d2efc5..7dc0e7b04 100644
---
a/linkis-public-enhancements/linkis-basedata-manager/src/test/resources/create_pg.sql
+++
b/linkis-public-enhancements/linkis-basedata-manager/src/test/resources/create_pg.sql
@@ -141,7 +141,7 @@ CREATE TABLE linkis_ps_udf_tree (
DROP TABLE IF EXISTS "linkis_cg_manager_label";
CREATE TABLE linkis_cg_manager_label (
id serial NOT NULL,
- label_key varchar(32) NOT NULL,
+ label_key varchar(50) NOT NULL,
label_value varchar(255) NOT NULL,
label_feature varchar(16) NOT NULL,
label_value_size int4 NOT NULL,
diff --git
a/linkis-public-enhancements/linkis-configuration/src/test/resources/create.sql
b/linkis-public-enhancements/linkis-configuration/src/test/resources/create.sql
index ea3069fe7..fb22c7a11 100644
---
a/linkis-public-enhancements/linkis-configuration/src/test/resources/create.sql
+++
b/linkis-public-enhancements/linkis-configuration/src/test/resources/create.sql
@@ -23,7 +23,7 @@ DROP TABLE IF EXISTS `linkis_cg_manager_label`;
CREATE TABLE `linkis_cg_manager_label`
(
`id` int(20) NOT NULL AUTO_INCREMENT,
- `label_key` varchar(32) NOT NULL,
+ `label_key` varchar(50) NOT NULL,
`label_value` varchar(255) NOT NULL,
`label_feature` varchar(16) NOT NULL,
`label_value_size` int(20) NOT NULL,
diff --git
a/linkis-public-enhancements/linkis-configuration/src/test/resources/create_pg.sql
b/linkis-public-enhancements/linkis-configuration/src/test/resources/create_pg.sql
index b452af0ac..56859ad16 100644
---
a/linkis-public-enhancements/linkis-configuration/src/test/resources/create_pg.sql
+++
b/linkis-public-enhancements/linkis-configuration/src/test/resources/create_pg.sql
@@ -18,7 +18,7 @@
DROP TABLE IF EXISTS "linkis_cg_manager_label";
CREATE TABLE linkis_cg_manager_label (
id serial NOT NULL,
- label_key varchar(32) NOT NULL,
+ label_key varchar(50) NOT NULL,
label_value varchar(255) NOT NULL,
label_feature varchar(16) NOT NULL,
label_value_size int4 NOT NULL,
diff --git a/tool/dependencies/known-dependencies.txt
b/tool/dependencies/known-dependencies.txt
index 67641f5c8..d98dbef75 100644
--- a/tool/dependencies/known-dependencies.txt
+++ b/tool/dependencies/known-dependencies.txt
@@ -30,6 +30,7 @@ asm-commons-9.3.jar
asm-tree-9.3.jar
aspectjweaver-1.9.6.jar
audience-annotations-0.5.0.jar
+automaton-1.11-8.jar
avatica-1.11.0.jar
avro-1.7.7.jar
avro-1.8.2.jar
@@ -51,10 +52,9 @@ calcite-core-1.16.0.jar
calcite-druid-1.16.0.jar
calcite-linq4j-1.16.0.jar
cglib-nodep-3.2.5.jar
+checker-qual-3.33.0.jar
checker-qual-3.4.0.jar
checker-qual-3.5.0.jar
-checker-qual-3.8.0.jar
-checker-qual-3.33.0.jar
chill-java-0.7.6.jar
chill_2.12-0.7.6.jar
classgraph-4.1.7.jar
@@ -103,9 +103,9 @@ druid-1.2.16.jar
elasticsearch-rest-client-6.8.15.jar
elasticsearch-rest-client-7.6.2.jar
elasticsearch-rest-client-sniffer-7.6.2.jar
+error_prone_annotations-2.18.0.jar
error_prone_annotations-2.3.4.jar
error_prone_annotations-2.4.0.jar
-error_prone_annotations-2.18.0.jar
esri-geometry-api-2.0.0.jar
eureka-client-1.10.14.jar
eureka-core-1.10.14.jar
@@ -132,6 +132,7 @@ flink-file-sink-common-1.12.2.jar
flink-hadoop-fs-1.12.2.jar
flink-java-1.12.2.jar
flink-json-1.12.2.jar
+flink-kubernetes_2.12-1.12.2.jar
flink-metrics-core-1.12.2.jar
flink-optimizer_2.12-1.12.2.jar
flink-queryable-state-client-java-1.12.2.jar
@@ -153,9 +154,9 @@ flink-table-common-1.12.2.jar
flink-table-planner-blink_2.12-1.12.2.jar
flink-table-runtime-blink_2.12-1.12.2.jar
flink-yarn_2.12-1.12.2.jar
-flink-kubernetes_2.12-1.12.2.jar
force-shading-1.12.2.jar
freemarker-2.3.31.jar
+generex-1.0.2.jar
grizzled-slf4j_2.12-1.3.2.jar
gson-2.8.9.jar
guava-32.0.0-jre.jar
@@ -221,7 +222,6 @@ ini4j-0.5.4.jar
ion-java-1.0.2.jar
istack-commons-runtime-3.0.12.jar
ivy-2.4.0.jar
-j2objc-annotations-1.3.jar
j2objc-annotations-2.8.jar
jackson-annotations-2.13.4.jar
jackson-core-2.13.4.jar
@@ -229,6 +229,7 @@ jackson-core-asl-1.9.13.jar
jackson-databind-2.13.4.2.jar
jackson-dataformat-cbor-2.13.4.jar
jackson-dataformat-csv-2.13.4.jar
+jackson-dataformat-properties-2.13.4.jar
jackson-dataformat-smile-2.13.4.jar
jackson-dataformat-xml-2.13.4.jar
jackson-dataformat-yaml-2.13.4.jar
@@ -268,6 +269,8 @@ jaxb-api-2.3.1.jar
jaxb-runtime-2.3.4.jar
jboss-logging-3.4.2.Final.jar
jcip-annotations-1.0-1.jar
+jcl-over-slf4j-1.7.30.jar
+jcommander-1.81.jar
jdbi3-core-3.4.0.jar
jdbi3-sqlobject-3.4.0.jar
jdo-api-3.0.1.jar
@@ -469,7 +472,6 @@ poi-5.2.3.jar
poi-ooxml-5.2.3.jar
poi-ooxml-lite-5.2.3.jar
poi-shared-strings-2.5.6.jar
-postgresql-42.3.8.jar
presto-client-0.234.jar
presto-client-1.5.0.jar
presto-resource-group-managers-0.234.jar
@@ -506,9 +508,6 @@ scala-reflect-2.12.17.jar
scala-xml_2.12-2.1.0.jar
scalap-2.12.17.jar
scopt_2.12-3.5.0.jar
-jackson-dataformat-properties-2.13.4.jar
-jcl-over-slf4j-1.7.30.jar
-jcommander-1.81.jar
seatunnel-api-2.3.1.jar
seatunnel-common-2.3.1.jar
seatunnel-config-base-2.3.1.jar
@@ -631,8 +630,9 @@ xmlbeans-5.1.1.jar
xmlenc-0.52.jar
xstream-1.4.20.jar
xz-1.5.jar
+zjsonpatch-0.3.0.jar
zookeeper-3.5.9.jar
zookeeper-jute-3.5.9.jar
zstd-jni-1.4.4-7.jar
zstd-jni-1.4.5-6.jar
-zjsonpatch-0.3.0.jar
\ No newline at end of file
+zjsonpatch-0.3.0.jar
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]