This is an automated email from the ASF dual-hosted git repository. shenlin pushed a commit to branch feat/runtime-manager in repository https://gitbox.apache.org/repos/asf/rocketmq-eventbridge.git
commit 52f82faf2b1a9efd1e79db6d010737a67bf857a5 Author: wangkai <wang...@zhongan.com> AuthorDate: Sun May 12 19:40:14 2024 +0800 [runtime-on-k8s] runtime-on-k8s --- adapter/pom.xml | 1 + .../repository/WorkerInstanceRepository.java | 10 +- .../runtime/manager/worker/WorkerResource.java | 4 +- .../runtime/manager/worker/WorkerStatusEnum.java | 2 + adapter/runtime-on-k8s/pom.xml | 24 +- .../manager/k8s/api/BaseK8SApiService.java} | 31 +- .../manager/k8s/api/K8SConfigMapService.java | 76 +++-- .../manager/k8s/api/K8SCustomResourceService.java | 78 ++++- .../manager/k8s/api/K8SDeploymentService.java | 144 ++++---- .../manager/k8s/api/K8SNameSpaceService.java | 17 + .../runtime/manager/k8s/api/KubectlService.java | 114 ++++++- .../runtime/manager/k8s/common/K8SConstants.java | 20 ++ .../manager/k8s/common/WorkerStatusEnum.java | 47 --- .../manager/k8s/config/DeploySpecTemplateSpec.java | 31 ++ .../runtime/manager/k8s/config/SpecContainer.java | 54 +++ .../k8s/model/K8SCRDConnectTaskSetModel.java | 17 + .../repository/WorkerInstanceRepositoryOnK8S.java | 361 ++++++++++++++++++++- .../WorkerInstanceRepositoryOnK8STest.java | 133 +++++--- .../src/test/resources/application.properties | 16 + .../rocketmq/eventbridge/tools/JsonUtil.java | 34 +- pom.xml | 2 +- 21 files changed, 985 insertions(+), 231 deletions(-) diff --git a/adapter/pom.xml b/adapter/pom.xml index 010492c..5afe442 100644 --- a/adapter/pom.xml +++ b/adapter/pom.xml @@ -29,6 +29,7 @@ <module>storage</module> <module>benchmark</module> <module>runtime-manager</module> + <module>runtime-on-k8s</module> </modules> </project> \ No newline at end of file diff --git a/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/repository/WorkerInstanceRepository.java b/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/repository/WorkerInstanceRepository.java index 1c74a3b..c25f5c9 100644 --- a/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/repository/WorkerInstanceRepository.java +++ b/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/repository/WorkerInstanceRepository.java @@ -38,17 +38,19 @@ public interface WorkerInstanceRepository { * Delete the worker * * @param name + * @param environments * @return */ - boolean deleteWorkerInstance(String name); + boolean deleteWorkerInstance(String name, Map<String, Object> environments); /** * Get the status of worker * * @param name + * @param environments * @return */ - WorkerStatusEnum getWorkerInstanceStatus(String name); + WorkerStatusEnum getWorkerInstanceStatus(String name, Map<String, Object> environments); /** * Apply(Create/Update) the config to the worker instance. It may contains more than one config in worker instance. @@ -58,7 +60,7 @@ public interface WorkerInstanceRepository { * @param config * @return */ - boolean applyWorkerInstanceConfigFile(String name, String filePath, String config); + boolean applyWorkerInstanceConfigFile(String name, String filePath, String config, Map<String, Object> environments); /** * Get the config to the worker instance. @@ -67,5 +69,5 @@ public interface WorkerInstanceRepository { * @param filePath * @return */ - boolean getWorkerInstanceConfigFile(String name, String filePath); + String getWorkerInstanceConfigFile(String name, String filePath, Map<String, Object> environments); } \ No newline at end of file diff --git a/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/worker/WorkerResource.java b/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/worker/WorkerResource.java index 4d48fbf..e2152c4 100644 --- a/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/worker/WorkerResource.java +++ b/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/worker/WorkerResource.java @@ -21,6 +21,6 @@ import lombok.Data; @Data public class WorkerResource { - Double cpu; - Double memory; + Long cpu; + Long memory; } \ No newline at end of file diff --git a/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/worker/WorkerStatusEnum.java b/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/worker/WorkerStatusEnum.java index 1cc34aa..b483922 100644 --- a/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/worker/WorkerStatusEnum.java +++ b/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/worker/WorkerStatusEnum.java @@ -19,6 +19,8 @@ package org.apache.rocketmq.eventbridge.adapter.runtime.manager.worker; public enum WorkerStatusEnum { UNKNOWN(-1, "Unknown"), + + WAIT_DEPLOY(0, "Deployed"), STARTING(3, "Starting"), RUN(5, "Run"), STOP(10, "Stop"), diff --git a/adapter/runtime-on-k8s/pom.xml b/adapter/runtime-on-k8s/pom.xml index 1644b88..b0a1241 100644 --- a/adapter/runtime-on-k8s/pom.xml +++ b/adapter/runtime-on-k8s/pom.xml @@ -27,20 +27,40 @@ <maven.compiler.source>8</maven.compiler.source> <maven.compiler.target>8</maven.compiler.target> <commons.io.version>2.8.0</commons.io.version> - <junit.version>4.10</junit.version> + <fabric8.version>6.6.2</fabric8.version> </properties> <dependencies> <dependency> + <groupId>io.fabric8</groupId> + <artifactId>kubernetes-client</artifactId> + <version>${fabric8.version}</version> + </dependency> + + <!--<dependency> <groupId>io.kubernetes</groupId> <artifactId>client-java</artifactId> <version>10.0.0</version> - </dependency> + </dependency>--> + <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-eventbridge-adapter-runtime-manager</artifactId> <version>1.0.0</version> </dependency> + + <!-- test dependencies --> + <dependency> + <groupId>io.fabric8</groupId> + <artifactId>kubernetes-server-mock</artifactId> + <version>${fabric8.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.springframework.boot</groupId> + <artifactId>spring-boot-starter-test</artifactId> + <scope>test</scope> + </dependency> </dependencies> diff --git a/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/worker/WorkerResource.java b/adapter/runtime-on-k8s/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/k8s/api/BaseK8SApiService.java similarity index 54% copy from adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/worker/WorkerResource.java copy to adapter/runtime-on-k8s/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/k8s/api/BaseK8SApiService.java index 4d48fbf..6aa0540 100644 --- a/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/worker/WorkerResource.java +++ b/adapter/runtime-on-k8s/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/k8s/api/BaseK8SApiService.java @@ -15,12 +15,29 @@ * limitations under the License. */ -package org.apache.rocketmq.eventbridge.adapter.runtime.manager.worker; +package org.apache.rocketmq.eventbridge.adapter.runtime.manager.k8s.api; -import lombok.Data; +import io.fabric8.kubernetes.client.KubernetesClient; +import org.apache.commons.lang3.StringUtils; +import org.springframework.beans.factory.annotation.Autowired; -@Data -public class WorkerResource { - Double cpu; - Double memory; -} \ No newline at end of file +import java.io.IOException; + +public abstract class BaseK8SApiService { + + @Autowired + private KubectlService kubectlService; + + protected KubernetesClient getKubernetesClient(String clientId) throws IOException { + KubernetesClient client = kubectlService.getClient(); + if (StringUtils.isNotBlank(clientId)) { + client = kubectlService.generateKubeApiClient(clientId); + } + return client; + } + + protected KubernetesClient getKubernetesClient() { + return kubectlService.getClient(); + } + +} diff --git a/adapter/runtime-on-k8s/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/k8s/api/K8SConfigMapService.java b/adapter/runtime-on-k8s/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/k8s/api/K8SConfigMapService.java index a9f147c..56e2c98 100644 --- a/adapter/runtime-on-k8s/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/k8s/api/K8SConfigMapService.java +++ b/adapter/runtime-on-k8s/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/k8s/api/K8SConfigMapService.java @@ -1,43 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.rocketmq.eventbridge.adapter.runtime.manager.k8s.api; import com.google.gson.Gson; -import io.kubernetes.client.openapi.ApiClient; -import io.kubernetes.client.openapi.Configuration; -import io.kubernetes.client.openapi.apis.CoreV1Api; -import io.kubernetes.client.openapi.models.V1ConfigMap; +import io.fabric8.kubernetes.api.model.ConfigMap; +import io.fabric8.kubernetes.client.KubernetesClient; +import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.eventbridge.exception.EventBridgeException; import org.apache.rocketmq.eventbridge.exception.code.DefaultErrorCode; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.apache.rocketmq.eventbridge.tools.JsonUtil; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; -@Service -public class K8SConfigMapService { - private static final Logger log = LoggerFactory.getLogger(K8SConfigMapService.class); +@Slf4j +@Service +public class K8SConfigMapService extends BaseK8SApiService { - @Autowired - ApiClient apiClient; + /*@Autowired + ApiClient apiClient;*/ @Autowired - K8SNameSpaceService k8SNameSpaceService; + private K8SNameSpaceService k8SNameSpaceService; - @Autowired - KubectlService kubectlService; - public boolean createConfigMap(String regionId, String csClusterId, Object configYaml) { + public boolean createConfigMap(String clientId, Object configYaml) { try { - CoreV1Api api = new CoreV1Api(apiClient); - if (csClusterId != null) { - ApiClient genApiClient = kubectlService.generateKubeApiClient(regionId, csClusterId); - api = new CoreV1Api(genApiClient); - } - V1ConfigMap configMap = Configuration.getDefaultApiClient() + /*V1ConfigMap configMap = Configuration.getDefaultApiClient() .getJSON() .deserialize(new Gson().toJson(configYaml), V1ConfigMap.class); - - api.createNamespacedConfigMap(k8SNameSpaceService.getNameSpace(), configMap, "true", null, null); + api.createNamespacedConfigMap(k8SNameSpaceService.getNameSpace(), configMap, "true", null, null);*/ + KubernetesClient client = getKubernetesClient(clientId); + ConfigMap configMap = (ConfigMap)JsonUtil.parse(JsonUtil.toJson(configYaml), ConfigMap.class); + client.configMaps().inNamespace(k8SNameSpaceService.getNameSpace()).resource(configMap).create(); } catch (Throwable e) { if ("Conflict".equals(e.getMessage())) { return Boolean.TRUE; @@ -49,14 +59,16 @@ public class K8SConfigMapService { return Boolean.TRUE; } - public boolean deleteConfigMap(String regionId, String csClusterId, String name) { + public boolean deleteConfigMap(String clientId, String name) { try { - CoreV1Api api = new CoreV1Api(apiClient); + /* CoreV1Api api = new CoreV1Api(apiClient); if (csClusterId != null) { ApiClient genApiClient = kubectlService.generateKubeApiClient(regionId, csClusterId); api = new CoreV1Api(genApiClient); } - api.deleteNamespacedConfigMap(name, k8SNameSpaceService.getNameSpace(), "true", null, null, null, null, null); + api.deleteNamespacedConfigMap(name, k8SNameSpaceService.getNameSpace(), "true", null, null, null, null, null);*/ + KubernetesClient client = getKubernetesClient(clientId); + client.configMaps().inNamespace(k8SNameSpaceService.getNameSpace()).withName(name).delete(); } catch (Throwable e) { if ("Not Found".equals(e.getMessage())) { return Boolean.FALSE; @@ -68,9 +80,9 @@ public class K8SConfigMapService { return Boolean.TRUE; } - public void replaceConfigMap(String regionId, String csClusterId, String name, Object configYaml) { + public void replaceConfigMap(String clientId, Object configYaml) { try { - CoreV1Api api = new CoreV1Api(apiClient); + /*CoreV1Api api = new CoreV1Api(apiClient); if (csClusterId != null) { ApiClient genApiClient = kubectlService.generateKubeApiClient(regionId, csClusterId); api = new CoreV1Api(genApiClient); @@ -80,10 +92,14 @@ public class K8SConfigMapService { .getJSON() .deserialize(new Gson().toJson(configYaml), V1ConfigMap.class); - api.replaceNamespacedConfigMap(name, k8SNameSpaceService.getNameSpace(), configMap, "true", null, null); + api.replaceNamespacedConfigMap(name, k8SNameSpaceService.getNameSpace(), configMap, "true", null, null);*/ + + KubernetesClient client = getKubernetesClient(clientId); + ConfigMap configMap = JsonUtil.parse(JsonUtil.toJson(configYaml), ConfigMap.class); + client.configMaps().inNamespace(k8SNameSpaceService.getNameSpace()).resource(configMap).update(); } catch (Throwable e) { if ("Not Found".equals(e.getMessage())) { - this.createConfigMap(regionId, csClusterId, configYaml); + this.createConfigMap(clientId, configYaml); } else { log.error("replaceConfigMap failed.{}", e); throw new EventBridgeException(DefaultErrorCode.InternalError, new Gson().toJson(configYaml)); diff --git a/adapter/runtime-on-k8s/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/k8s/api/K8SCustomResourceService.java b/adapter/runtime-on-k8s/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/k8s/api/K8SCustomResourceService.java index 4b561f8..d71cbe4 100644 --- a/adapter/runtime-on-k8s/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/k8s/api/K8SCustomResourceService.java +++ b/adapter/runtime-on-k8s/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/k8s/api/K8SCustomResourceService.java @@ -1,30 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.rocketmq.eventbridge.adapter.runtime.manager.k8s.api; -import io.kubernetes.client.openapi.ApiClient; -import io.kubernetes.client.openapi.ApiException; -import io.kubernetes.client.openapi.apis.CustomObjectsApi; +import io.fabric8.kubernetes.api.model.GenericKubernetesResource; +import io.fabric8.kubernetes.client.dsl.base.ResourceDefinitionContext; import org.apache.rocketmq.eventbridge.adapter.runtime.manager.k8s.common.K8SConstants; import org.apache.rocketmq.eventbridge.adapter.runtime.manager.k8s.model.K8SCRDConnectTaskSetModel; import org.apache.rocketmq.eventbridge.exception.EventBridgeException; import org.apache.rocketmq.eventbridge.exception.code.DefaultErrorCode; +import org.apache.rocketmq.eventbridge.tools.JsonUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @Service -public class K8SCustomResourceService{ +public class K8SCustomResourceService extends BaseK8SApiService{ private static final Logger log = LoggerFactory.getLogger(K8SCustomResourceService.class); - @Autowired - ApiClient apiClient; + /*@Autowired + ApiClient apiClient;*/ @Autowired - K8SNameSpaceService k8SNameSpaceService; + private K8SNameSpaceService k8SNameSpaceService; + + private ResourceDefinitionContext resourceDefinitionContext; + public K8SCustomResourceService () { + resourceDefinitionContext = new ResourceDefinitionContext.Builder() + .withGroup(K8SConstants.CONNECT_TASK_SETS_GROUP) + .withVersion(K8SConstants.CONNECT_TASK_SETS_VERSION) + .withPlural(K8SConstants.CONNECT_TASK_SETS_PLURAL) + .withNamespaced(true) + .build(); + } + public boolean createConnectTaskSet(String taskName, String lanucherConfig, String taskConfig, int replicas) { - CustomObjectsApi api2 = new CustomObjectsApi(apiClient); + /*CustomObjectsApi api2 = new CustomObjectsApi(apiClient); K8SCRDConnectTaskSetModel body = new K8SCRDConnectTaskSetModel(taskName, lanucherConfig, taskConfig, replicas, getImage()); try { @@ -35,11 +63,23 @@ public class K8SCustomResourceService{ log.error(DefaultErrorCode.InternalError.getMsg(), e); throw new EventBridgeException(DefaultErrorCode.InternalError, e, taskConfig); } + String rawJsonCustomResourceObj = "{\"apiVersion\":\"jungle.example.com/v1\"," + + "\"kind\":\"Animal\",\"metadata\": {\"name\": \"walrus\"}," + + "\"spec\": {\"image\": \"my-awesome-walrus-image\"}}";*/ + + K8SCRDConnectTaskSetModel body = new K8SCRDConnectTaskSetModel(taskName, lanucherConfig, taskConfig, replicas, + getImage()); + try { + getKubernetesClient().genericKubernetesResources(resourceDefinitionContext).inNamespace(k8SNameSpaceService.getNameSpace()).load(JsonUtil.toJson(body)).create(); + } catch (Throwable e) { + log.error(DefaultErrorCode.InternalError.getMsg(), e); + throw new EventBridgeException(DefaultErrorCode.InternalError, e, taskConfig); + } return Boolean.TRUE; } public boolean deleteConnectTaskSet(String taskName) { - CustomObjectsApi api2 = new CustomObjectsApi(apiClient); + /*CustomObjectsApi api2 = new CustomObjectsApi(apiClient); try { api2.deleteNamespacedCustomObject(K8SConstants.CONNECT_TASK_SETS_GROUP, K8SConstants.CONNECT_TASK_SETS_VERSION, k8SNameSpaceService.getNameSpace(), @@ -47,12 +87,18 @@ public class K8SCustomResourceService{ } catch (Throwable e) { log.error("Delete Connector failed", e); throw new EventBridgeException(DefaultErrorCode.InternalError, e, taskName); + }*/ + try { + getKubernetesClient().genericKubernetesResources(resourceDefinitionContext).withName(taskName).delete(); + } catch (Throwable e) { + log.error("Delete Connector failed", e); + throw new EventBridgeException(DefaultErrorCode.InternalError, e, taskName); } return Boolean.TRUE; } public boolean isConnectTaskExist(String taskName) { - CustomObjectsApi api2 = new CustomObjectsApi(apiClient); + /*CustomObjectsApi api2 = new CustomObjectsApi(apiClient); try { api2.getNamespacedCustomObject(K8SConstants.CONNECT_TASK_SETS_GROUP, K8SConstants.CONNECT_TASK_SETS_VERSION, k8SNameSpaceService.getNameSpace(), K8SConstants.CONNECT_TASK_SETS_PLURAL, taskName); @@ -64,8 +110,18 @@ public class K8SCustomResourceService{ } log.error("Get Connector task failed", e); throw new EventBridgeException(DefaultErrorCode.InternalError, e); + }*/ + + try { + GenericKubernetesResource customResourceObject = getKubernetesClient().genericKubernetesResources(resourceDefinitionContext).withName(taskName).get(); + if (null != customResourceObject) { + return Boolean.TRUE; + } + return Boolean.FALSE; + } catch (Throwable e) { + log.error("Delete Connector failed", e); + throw new EventBridgeException(DefaultErrorCode.InternalError, e, taskName); } - return Boolean.TRUE; } private String getImage() { diff --git a/adapter/runtime-on-k8s/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/k8s/api/K8SDeploymentService.java b/adapter/runtime-on-k8s/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/k8s/api/K8SDeploymentService.java index 6c29991..7c5a142 100644 --- a/adapter/runtime-on-k8s/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/k8s/api/K8SDeploymentService.java +++ b/adapter/runtime-on-k8s/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/k8s/api/K8SDeploymentService.java @@ -1,86 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.rocketmq.eventbridge.adapter.runtime.manager.k8s.api; import com.google.gson.Gson; -import io.kubernetes.client.openapi.ApiClient; -import io.kubernetes.client.openapi.Configuration; -import io.kubernetes.client.openapi.apis.AppsV1Api; -import io.kubernetes.client.openapi.models.V1Deployment; -import org.apache.rocketmq.eventbridge.adapter.runtime.manager.k8s.common.WorkerStatusEnum; +import io.fabric8.kubernetes.api.model.ConfigMap; +import io.fabric8.kubernetes.api.model.ConfigMapBuilder; +import io.fabric8.kubernetes.api.model.apps.Deployment; +import io.fabric8.kubernetes.client.KubernetesClient; +import io.fabric8.kubernetes.client.dsl.Resource; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.eventbridge.adapter.runtime.manager.worker.WorkerStatusEnum; import org.apache.rocketmq.eventbridge.exception.EventBridgeException; import org.apache.rocketmq.eventbridge.exception.code.DefaultErrorCode; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import java.util.Map; -@Service -public class K8SDeploymentService { - - private static final Logger log = LoggerFactory.getLogger(K8SDeploymentService.class); - @Autowired - ApiClient apiClient; - - @Autowired - K8SNameSpaceService k8SNameSpaceService; +@Slf4j +@Service +public class K8SDeploymentService extends BaseK8SApiService { @Autowired - KubectlService kubectlService; + private K8SNameSpaceService k8SNameSpaceService; - public boolean updateDeployment(String regionId, String csClusterId, String name, Object deployYaml) { + public boolean updateDeployment(String clientId, String name, Deployment deployment) { try { - AppsV1Api api = new AppsV1Api(apiClient); - if (csClusterId != null) { - ApiClient genApiClient = kubectlService.generateKubeApiClient(regionId, csClusterId); - api = new AppsV1Api(genApiClient); - } - V1Deployment body = Configuration.getDefaultApiClient() - .getJSON() - .deserialize(new Gson().toJson(deployYaml), V1Deployment.class); - api.replaceNamespacedDeployment(name, k8SNameSpaceService.getNameSpace(), body, "true", null, null); + KubernetesClient client = getKubernetesClient(clientId); + client.apps().deployments().inNamespace(k8SNameSpaceService.getNameSpace()).resource(deployment).update(); } catch (Throwable e) { if ("Not Found".equals(e.getMessage())) { - this.createDeployment(regionId, csClusterId, deployYaml); + this.createDeployment(clientId, deployment); } else { log.error(DefaultErrorCode.InternalError.getMsg(), e); - throw new EventBridgeException(DefaultErrorCode.InternalError,e, new Gson().toJson(deployYaml)); + throw new EventBridgeException(DefaultErrorCode.InternalError,e, new Gson().toJson(deployment)); } } return Boolean.TRUE; } - public WorkerStatusEnum getDeploymentStatus(String regionId, String csClusterId, String name) { - V1Deployment v1Deployment = null; + public WorkerStatusEnum getDeploymentStatus(String clientId, String name) { + Deployment deployment; try { - AppsV1Api api = new AppsV1Api(apiClient); - if (csClusterId != null) { - ApiClient genApiClient = kubectlService.generateKubeApiClient(regionId, csClusterId); - api = new AppsV1Api(genApiClient); - } - v1Deployment = api.readNamespacedDeployment(name, k8SNameSpaceService.getNameSpace(), "true", true, false); + KubernetesClient client = getKubernetesClient(clientId); + deployment = client.apps().deployments().inNamespace(k8SNameSpaceService.getNameSpace()).withName(name).get(); } catch (Throwable e) { log.error(DefaultErrorCode.InternalError.getMsg(), e); throw new EventBridgeException(DefaultErrorCode.InternalError, e, "name=" + name); } - if (v1Deployment == null || v1Deployment.getStatus() == null || v1Deployment.getStatus().getReadyReplicas() == null) { - return WorkerStatusEnum.STARTING; + if (deployment == null || deployment.getStatus() == null || deployment.getStatus().getReadyReplicas() == null) { + return WorkerStatusEnum.UNKNOWN; } - if (v1Deployment.getStatus().getReadyReplicas() == 1) { + if (deployment.getStatus().getReadyReplicas() == 1) { return WorkerStatusEnum.RUN; } - return WorkerStatusEnum.DEFAULT; + return WorkerStatusEnum.UNKNOWN; } - public boolean deleteDeployment(String regionId, String csClusterId, String name) { + public boolean deleteDeployment(String clientId, String name) { try { - AppsV1Api api = new AppsV1Api(apiClient); - if (csClusterId != null) { - ApiClient genApiClient = kubectlService.generateKubeApiClient(regionId, csClusterId); - api = new AppsV1Api(genApiClient); - } - api.deleteNamespacedDeployment(name, k8SNameSpaceService.getNameSpace(), "true", null, null, null, null, - null); + KubernetesClient client = getKubernetesClient(clientId); + client.apps().deployments().inNamespace(k8SNameSpaceService.getNameSpace()).withName(name).delete(); } catch (Throwable e) { if ("Not Found".equals(e.getMessage())) { return Boolean.FALSE; @@ -88,31 +84,59 @@ public class K8SDeploymentService { log.error(DefaultErrorCode.InternalError.getMsg(), e); throw new EventBridgeException(DefaultErrorCode.InternalError, e, name); } - } return Boolean.TRUE; } - public boolean createDeployment(String regionId, String csClusterId, Object deployYaml) { + public boolean createDeployment(String clientId, Deployment deployment) { try { - AppsV1Api api = new AppsV1Api(apiClient); - if (csClusterId != null) { - ApiClient genApiClient = kubectlService.generateKubeApiClient(regionId, csClusterId); - api = new AppsV1Api(genApiClient); + KubernetesClient client = getKubernetesClient(clientId); + client.apps().deployments().inNamespace(k8SNameSpaceService.getNameSpace()).resource(deployment).create(); + } catch (Throwable e) { + if ("Conflict".equals(e.getMessage())) { + return Boolean.TRUE; + } else { + log.error(DefaultErrorCode.InternalError.getMsg(), e); + throw new EventBridgeException(DefaultErrorCode.InternalError,e, new Gson().toJson(deployment)); } - V1Deployment body = Configuration.getDefaultApiClient() - .getJSON() - .deserialize(new Gson().toJson(deployYaml), V1Deployment.class); - api.createNamespacedDeployment(k8SNameSpaceService.getNameSpace(), body, "true", null, null); + } + return Boolean.TRUE; + } + + public boolean createConfigMap(String clientId, ConfigMap configMap) { + try { + KubernetesClient client = getKubernetesClient(clientId); + client.resources(ConfigMap.class).resource(configMap).create(); } catch (Throwable e) { if ("Conflict".equals(e.getMessage())) { return Boolean.TRUE; } else { log.error(DefaultErrorCode.InternalError.getMsg(), e); - throw new EventBridgeException(DefaultErrorCode.InternalError,e, new Gson().toJson(deployYaml)); + throw new EventBridgeException(DefaultErrorCode.InternalError,e, configMap); } } return Boolean.TRUE; } + public String getConfigMap(String clientId, String name, String filePath) { + try { + KubernetesClient client = getKubernetesClient(clientId); + Resource<ConfigMap> resource = client.resources(ConfigMap.class) + .resource(new ConfigMapBuilder().withNewMetadata().withName(name).endMetadata().build()); + if (resource != null) { + ConfigMap configMap = resource.get(); + if (configMap != null) { + Map<String, String> map = configMap.getData(); + if (map != null && map.containsKey(filePath)) { + return map.get(filePath); + } + } + } + } catch (Throwable e) { + log.error(DefaultErrorCode.InternalError.getMsg(), e); + throw new EventBridgeException(DefaultErrorCode.InternalError,e, "name=" + name + ", filePath=" + filePath); + } + return null; + } + } diff --git a/adapter/runtime-on-k8s/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/k8s/api/K8SNameSpaceService.java b/adapter/runtime-on-k8s/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/k8s/api/K8SNameSpaceService.java index 69ca895..4da57d2 100644 --- a/adapter/runtime-on-k8s/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/k8s/api/K8SNameSpaceService.java +++ b/adapter/runtime-on-k8s/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/k8s/api/K8SNameSpaceService.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.rocketmq.eventbridge.adapter.runtime.manager.k8s.api; import org.springframework.stereotype.Service; diff --git a/adapter/runtime-on-k8s/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/k8s/api/KubectlService.java b/adapter/runtime-on-k8s/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/k8s/api/KubectlService.java index aaae526..a41c8cd 100644 --- a/adapter/runtime-on-k8s/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/k8s/api/KubectlService.java +++ b/adapter/runtime-on-k8s/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/k8s/api/KubectlService.java @@ -1,20 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.rocketmq.eventbridge.adapter.runtime.manager.k8s.api; -import io.kubernetes.client.openapi.ApiClient; -import io.kubernetes.client.util.ClientBuilder; -import io.kubernetes.client.util.KubeConfig; +import io.fabric8.kubernetes.client.*; + import java.io.IOException; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; + +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; +import javax.annotation.PostConstruct; + +@Slf4j @Service -public class KubectlService { +public class KubectlService implements AutoCloseable{ - private static final Logger log = LoggerFactory.getLogger(KubectlService.class); + private KubernetesClient client ; @Value("${eventbus.cs.accessKey:}") private String accessKey; @@ -25,21 +45,79 @@ public class KubectlService { @Value("${conductor.run.env:online}") private String env; - private Map<String, ApiClient> kubeConfigMap = new ConcurrentHashMap<>(); + @Value("${kubernates.api.server}") + private String apiServer; - public ApiClient generateKubeApiClient(String regionId, String csClusterId) throws IOException { - if (kubeConfigMap.containsKey(csClusterId)) { - return kubeConfigMap.get(csClusterId); - } - KubeConfig config = getKubeConfig(regionId, csClusterId, accessKey, secretKey); + @Value("${kubernates.api.version:apps/v1}") + private String apiVersion; + + @Value("${kubernates.auth.token:}") + private String oauthToken; + + private final String DEFAULT_KEY= "default"; + + private Map<String, KubernetesClient> kubernetesClientMap = new ConcurrentHashMap<>(); + + @PostConstruct + public void initClient(){ + client = getKubernetesClient(); + } + + public KubernetesClient getClient(){ + return this.client; + } - ApiClient client = ClientBuilder.kubeconfig(config).build(); - //client.setDebugging(true); - kubeConfigMap.putIfAbsent(csClusterId, client); + public KubernetesClient generateKubeApiClient(String clientId) throws IOException { + if (StringUtils.isBlank(clientId)) { + clientId = DEFAULT_KEY; + } + if (kubernetesClientMap.containsKey(clientId)) { + return kubernetesClientMap.get(clientId); + } + KubernetesClient client = getKubernetesClient(); + kubernetesClientMap.putIfAbsent(clientId, client); return client; } - private KubeConfig getKubeConfig(String regionId, String csClusterId, String ak, String sk) throws IOException { - return null; + private KubernetesClient getKubernetesClient() { + Config config = getKubeConfig(); + log.info("connect to api server [{}]", apiServer); + return new KubernetesClientBuilder().withConfig(config).build(); + } + + private Config getKubeConfig() { + Config config = new ConfigBuilder() + .withMasterUrl(apiServer) + .withApiVersion(apiVersion) + .build(); + if (StringUtils.isNotBlank(accessKey) && StringUtils.isNotBlank(secretKey)) { + config.setUsername(accessKey); + config.setPassword(secretKey); + log.info("use ak and sk connect to api server."); + } else if(StringUtils.isNotBlank(oauthToken)){ + config.setTrustCerts(true); + config.setOauthToken(oauthToken); + log.info("use auth token connect to api server."); + } else { + log.warn("this is no authenticated connection to the API server."); + } + return config; + } + + @Override + public void close() throws Exception { + + if(client != null) { + client.close(); + } + + if (!kubernetesClientMap.isEmpty()) { + for(String clientId : kubernetesClientMap.keySet()) { + KubernetesClient kubernetesClient = kubernetesClientMap.get(clientId); + if(kubernetesClient != null){ + kubernetesClient.close(); + } + } + } } } diff --git a/adapter/runtime-on-k8s/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/k8s/common/K8SConstants.java b/adapter/runtime-on-k8s/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/k8s/common/K8SConstants.java index b54356a..e1d85fa 100644 --- a/adapter/runtime-on-k8s/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/k8s/common/K8SConstants.java +++ b/adapter/runtime-on-k8s/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/k8s/common/K8SConstants.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.rocketmq.eventbridge.adapter.runtime.manager.k8s.common; public class K8SConstants { @@ -8,4 +25,7 @@ public class K8SConstants { public static final String DEFAULT_CLUSTER_SUPPORT_CONNECTOR = "ALL"; public static final String DEFAULT_CONNECTOR_KEY_TASK_SIZE = "TASK_SIZE"; + public static final String SELECTOR_LABEL_KEY = "app"; + + public static final String DEPLOYMENT_KIND = "Deployment"; } diff --git a/adapter/runtime-on-k8s/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/k8s/common/WorkerStatusEnum.java b/adapter/runtime-on-k8s/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/k8s/common/WorkerStatusEnum.java deleted file mode 100644 index c8ff69f..0000000 --- a/adapter/runtime-on-k8s/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/k8s/common/WorkerStatusEnum.java +++ /dev/null @@ -1,47 +0,0 @@ -package org.apache.rocketmq.eventbridge.adapter.runtime.manager.k8s.common; - -public enum WorkerStatusEnum { - /** - * 集群状态 - */ - DEFAULT(-1, "未知状态"), - WAIT_DEPLOY(0, "待部署"), - STARTING(3, "启动中"), - RUN(5, "服务中"), - STOP(10, "停止"), - RELEASING(11, "释放中"); - - private int value; - private String desc; - - WorkerStatusEnum(int value, String desc) { - this.value = value; - this.desc = desc; - } - - public static WorkerStatusEnum valueOf(int value) { - for (WorkerStatusEnum temp : WorkerStatusEnum.values()) { - if (temp.getValue() == value) { - return temp; - } - } - return DEFAULT; - } - - public static WorkerStatusEnum nameOf(String name) { - for (WorkerStatusEnum temp : WorkerStatusEnum.values()) { - if (temp.name().equals(name)) { - return temp; - } - } - return DEFAULT; - } - - public String getDesc() { - return desc; - } - - public int getValue() { - return value; - } -} diff --git a/adapter/runtime-on-k8s/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/k8s/config/DeploySpecTemplateSpec.java b/adapter/runtime-on-k8s/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/k8s/config/DeploySpecTemplateSpec.java new file mode 100644 index 0000000..fcdc670 --- /dev/null +++ b/adapter/runtime-on-k8s/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/k8s/config/DeploySpecTemplateSpec.java @@ -0,0 +1,31 @@ +package org.apache.rocketmq.eventbridge.adapter.runtime.manager.k8s.config; + +import lombok.Getter; +import lombok.Setter; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.annotation.Configuration; + +import java.util.List; +import java.util.Map; + +@Getter +@Setter +@ConfigurationProperties(prefix="deploy.spec.template.spec") +@EnableConfigurationProperties +@Configuration +public class DeploySpecTemplateSpec { + + private Long terminationGracePeriodSeconds; + + @Autowired + private SpecContainer containers; + + private Map<String, String> nodeSelector; + + + private List<Map<String, Object>> volumes; + + private String podManagementPolicy; +} diff --git a/adapter/runtime-on-k8s/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/k8s/config/SpecContainer.java b/adapter/runtime-on-k8s/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/k8s/config/SpecContainer.java new file mode 100644 index 0000000..acce560 --- /dev/null +++ b/adapter/runtime-on-k8s/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/k8s/config/SpecContainer.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.eventbridge.adapter.runtime.manager.k8s.config; + +import lombok.Getter; +import lombok.Setter; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.annotation.Configuration; + +import java.util.List; +import java.util.Map; + + +@Getter +@Setter +@ConfigurationProperties(prefix="deploy.spec.template.spec.containers") +@EnableConfigurationProperties +@Configuration +public class SpecContainer implements Cloneable{ + private String imagePullPolicy = "IfNotPresent"; + private String registry; + private List<String> args; + private List<Map<String, String>> volumeMounts; + private List<Map<String, String>> env; + private List<Map<String, String>> ports; + + public Object clone() { + SpecContainer specContainer = new SpecContainer(); + specContainer.setImagePullPolicy(this.imagePullPolicy); + specContainer.setRegistry(this.registry); + specContainer.setArgs(this.args); + specContainer.setVolumeMounts(this.volumeMounts); + specContainer.setEnv(this.env); + specContainer.setPorts(this.ports); + return specContainer; + } + +} diff --git a/adapter/runtime-on-k8s/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/k8s/model/K8SCRDConnectTaskSetModel.java b/adapter/runtime-on-k8s/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/k8s/model/K8SCRDConnectTaskSetModel.java index 6d78922..0b3cb6c 100644 --- a/adapter/runtime-on-k8s/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/k8s/model/K8SCRDConnectTaskSetModel.java +++ b/adapter/runtime-on-k8s/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/k8s/model/K8SCRDConnectTaskSetModel.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.rocketmq.eventbridge.adapter.runtime.manager.k8s.model; import com.google.common.collect.Lists; diff --git a/adapter/runtime-on-k8s/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/k8s/repository/WorkerInstanceRepositoryOnK8S.java b/adapter/runtime-on-k8s/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/k8s/repository/WorkerInstanceRepositoryOnK8S.java index 20a95e9..cde8ea7 100644 --- a/adapter/runtime-on-k8s/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/k8s/repository/WorkerInstanceRepositoryOnK8S.java +++ b/adapter/runtime-on-k8s/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/k8s/repository/WorkerInstanceRepositoryOnK8S.java @@ -17,34 +17,379 @@ package org.apache.rocketmq.eventbridge.adapter.runtime.manager.k8s.repository; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.Objects; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import io.fabric8.kubernetes.api.model.ConfigMap; +import io.fabric8.kubernetes.api.model.ConfigMapBuilder; +import io.fabric8.kubernetes.api.model.ConfigMapVolumeSource; +import io.fabric8.kubernetes.api.model.ConfigMapVolumeSourceBuilder; +import io.fabric8.kubernetes.api.model.Container; +import io.fabric8.kubernetes.api.model.ContainerBuilder; +import io.fabric8.kubernetes.api.model.ContainerPort; +import io.fabric8.kubernetes.api.model.ContainerPortBuilder; +import io.fabric8.kubernetes.api.model.EnvVar; +import io.fabric8.kubernetes.api.model.EnvVarBuilder; +import io.fabric8.kubernetes.api.model.HostPathVolumeSource; +import io.fabric8.kubernetes.api.model.HostPathVolumeSourceBuilder; +import io.fabric8.kubernetes.api.model.LabelSelector; +import io.fabric8.kubernetes.api.model.ObjectMeta; +import io.fabric8.kubernetes.api.model.ObjectMetaBuilder; +import io.fabric8.kubernetes.api.model.PodSpec; +import io.fabric8.kubernetes.api.model.PodSpecBuilder; +import io.fabric8.kubernetes.api.model.PodTemplateSpec; +import io.fabric8.kubernetes.api.model.PodTemplateSpecBuilder; +import io.fabric8.kubernetes.api.model.Probe; +import io.fabric8.kubernetes.api.model.Quantity; +import io.fabric8.kubernetes.api.model.QuantityBuilder; +import io.fabric8.kubernetes.api.model.ResourceRequirements; +import io.fabric8.kubernetes.api.model.ResourceRequirementsBuilder; +import io.fabric8.kubernetes.api.model.Volume; +import io.fabric8.kubernetes.api.model.VolumeBuilder; +import io.fabric8.kubernetes.api.model.VolumeMount; +import io.fabric8.kubernetes.api.model.VolumeMountBuilder; +import io.fabric8.kubernetes.api.model.apps.Deployment; +import io.fabric8.kubernetes.api.model.apps.DeploymentBuilder; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.eventbridge.adapter.runtime.manager.k8s.api.K8SDeploymentService; +import org.apache.rocketmq.eventbridge.adapter.runtime.manager.k8s.api.K8SNameSpaceService; +import org.apache.rocketmq.eventbridge.adapter.runtime.manager.k8s.config.DeploySpecTemplateSpec; import org.apache.rocketmq.eventbridge.adapter.runtime.manager.repository.WorkerInstanceRepository; import org.apache.rocketmq.eventbridge.adapter.runtime.manager.worker.WorkerResource; import org.apache.rocketmq.eventbridge.adapter.runtime.manager.worker.WorkerStatusEnum; +import org.apache.rocketmq.eventbridge.tools.JsonUtil; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Service; + +import static org.apache.rocketmq.eventbridge.adapter.runtime.manager.k8s.common.K8SConstants.DEPLOYMENT_KIND; +@Slf4j +@Service public class WorkerInstanceRepositoryOnK8S implements WorkerInstanceRepository { + @Autowired + private K8SDeploymentService k8SDeploymentService; + + @Value("${kubernates.api.deployment.version:apps/v1}") + private String deploymentApiVersion; + + @Value("${pod.limit.over.request.cpu.factor:1.5}") + private Double limitOverRequestCpuFactor; + + @Value("${pod.limit.over.request.memory.factor:1.5}") + private Double limitOverRequestMemoryFactor; + + @Autowired + private DeploySpecTemplateSpec deploySpecTemplateSpec; + + @Autowired + private K8SNameSpaceService k8SNameSpaceService; @Override public boolean applyWorkerInstance(String name, String image, WorkerResource resources, Map<String, Object> environments) { - return false; + + PodSpec podSpec = getPodSpec(name, + image, + resources, + environments); + + Map<String, String> specLabes = getSpecLabels(environments.get("specLabels"), name); + ObjectMeta templateMeta = new ObjectMetaBuilder() + .withLabels(specLabes) + .build(); + + PodTemplateSpec podTemplateSpec = new PodTemplateSpecBuilder() + .withMetadata(templateMeta) + .withSpec(podSpec) + .build(); + + LabelSelector labelSelector = new LabelSelector(); + labelSelector.setMatchLabels(specLabes); + + Deployment deployment = new DeploymentBuilder() + .withApiVersion(deploymentApiVersion) + .withKind(DEPLOYMENT_KIND) + .withNewMetadata() + .withName(name) + .withNamespace(k8SNameSpaceService.getNameSpace()) + .withLabels(getMetaLabels(environments.get("metaLabels"))) + .endMetadata() + .withNewSpec() + .withReplicas(getReplicas(environments.get("replicas"))) + .withSelector(labelSelector) + .withTemplate(podTemplateSpec) + .endSpec() + .build(); + + return k8SDeploymentService.createDeployment((String) environments.get("clientId"), deployment); + } + + @Override + public boolean deleteWorkerInstance(String name, Map<String, Object> environments) { + return k8SDeploymentService.deleteDeployment((String) environments.get("clientId"), name); } @Override - public boolean deleteWorkerInstance(String name) { - return false; + public WorkerStatusEnum getWorkerInstanceStatus(String name, Map<String, Object> environments) { + WorkerStatusEnum workerStatusEnum = k8SDeploymentService.getDeploymentStatus((String) environments.get("clientId"), name); + return WorkerStatusEnum.valueOf(workerStatusEnum.getValue()); + } + + @Override + public boolean applyWorkerInstanceConfigFile(String name, String filePath, String config, Map<String, Object> environments) { + ConfigMap configMap = new ConfigMapBuilder() + .withNewMetadata() + .withNamespace(k8SNameSpaceService.getNameSpace()) + .withName(name) + .endMetadata() + .addToData(filePath, config) + .build(); + return k8SDeploymentService.createConfigMap((String) environments.get("clientId"), configMap); + } + + @Override + public String getWorkerInstanceConfigFile(String name, String filePath, Map<String, Object> environments) { + return k8SDeploymentService.getConfigMap((String) environments.get("clientId"), name, filePath); + } + + private Integer getReplicas(Object replicas) { + if (Objects.isNull(replicas)) { + replicas = 1; + } + return (Integer) replicas; + } + + private PodSpec getPodSpec(String name,String image, WorkerResource resources, Map<String, Object> environments) { + assert resources.getCpu() != null; + assert resources.getMemory() != null; + long cpuCoreLimit = getCpuLimit(resources.getCpu()); + long memoryLimit = getMemoryLimt(resources.getMemory()); + + Map<String, Quantity> requests = getResource(resources.getCpu(), resources.getMemory()); + Map<String, Quantity> limits = getResource(cpuCoreLimit, memoryLimit); + ResourceRequirements resourceRequirements = new ResourceRequirementsBuilder() + .withRequests(requests) + .withLimits(limits) + .build(); + + + Container container = new ContainerBuilder() + .withName(name) + .withImagePullPolicy(getImagePullPolicy(environments.get("imagePullPolicy"))) + .withImage(image) + .withArgs(getArgs(environments.get("args"))) + .withResources(resourceRequirements) + .withEnv(getEnvs(environments.get("env"))) + .withLivenessProbe(getLivenessProbe(environments.get("lProbe"))) + .withReadinessProbe(getReadinessProbe(environments.get("rProbe"))) + .withStartupProbe(getStartupProbe(environments.get("sProbe"))) + .withTerminationMessagePolicy(getTerminationMessagePolicy(environments.get("tMesPolicy"))) + .withTerminationMessagePath(getTerminationMessagePath(environments.get("tMesPath"))) + .withVolumeMounts(getVolumeMounts(environments.get("volumeMounts"))) + .withPorts(getContainerPorts(environments.get("ports"))) + .build(); + + return new PodSpecBuilder() + .withTerminationGracePeriodSeconds(getTerminationGracePeriodSeconds(environments.get("tPeriodSec"))) + .withContainers(container) + .withVolumes(getPodVolumes(environments.get("volums"))) + .withNodeSelector(getPodLabels(environments.get("podLabels"))) + .build(); + } + + private String getImagePullPolicy(Object imagePullPolicy) { + if (Objects.isNull(imagePullPolicy)) { + return deploySpecTemplateSpec.getContainers().getImagePullPolicy(); + } + return (String) imagePullPolicy; } - @Override public WorkerStatusEnum getWorkerInstanceStatus(String name) { + @SuppressWarnings("unchecked") + private List<String> getArgs(Object args){ + if (Objects.isNull(args) && Objects.nonNull(deploySpecTemplateSpec.getContainers().getArgs())) { + args = deploySpecTemplateSpec.getContainers().getArgs(); + } else { + args = Collections.emptyList(); + } + return (List<String>) args; + } + + @SuppressWarnings("unchecked") + private List<EnvVar> getEnvs(Object env) { + List<EnvVar> envVarList = new ArrayList<>(); + if(Objects.isNull(env) && Objects.nonNull(deploySpecTemplateSpec.getContainers().getEnv())){ + env = deploySpecTemplateSpec.getContainers().getEnv(); + } + if (Objects.nonNull(env)) { + List<Map<String, String>> envs = (List<Map<String, String>>) env; + for(Map<String, String> envMap : envs) { + EnvVar envVar = new EnvVarBuilder() + .withName(envMap.get("name")) + .withValue(envMap.get("value")) + .build(); + envVarList.add(envVar); + } + } + return envVarList; + } + + private Probe getLivenessProbe(Object lProbe) { + if (Objects.nonNull(lProbe)) { + return JsonUtil.parse((String) lProbe, Probe.class); + } + return null; + } + + private Probe getReadinessProbe(Object rProbe) { + if (Objects.nonNull(rProbe)) { + return JsonUtil.parse((String) rProbe, Probe.class); + } + return null; + } + + private Probe getStartupProbe(Object sProbe) { + if (Objects.nonNull(sProbe)) { + return JsonUtil.parse((String) sProbe, Probe.class); + } return null; } - @Override public boolean applyWorkerInstanceConfigFile(String name, String filePath, String config) { - return false; + private String getTerminationMessagePolicy(Object tMesPolicy) { + return (String) tMesPolicy; + } + + private String getTerminationMessagePath(Object tMesPath) { + return (String) tMesPath; + } + + @SuppressWarnings("unchecked") + private List<ContainerPort> getContainerPorts(Object port) { + List<ContainerPort> containerPorts = new ArrayList<>(); + if(port == null) { + port = ImmutableList.of(ImmutableMap.of("name","http1", "containerPort","7001"), ImmutableMap.of("name","http2", "containerPort","7002")); + } + List<Map<String, String>> ports = (List<Map<String, String>>) port; + for(Map<String, String> configPort : ports){ + ContainerPort containerPort = new ContainerPortBuilder() + .withName(configPort.get("name")) + .withContainerPort(Integer.parseInt(configPort.get("containerPort"))) + .build(); + containerPorts.add(containerPort); + } + return containerPorts; + } + + @SuppressWarnings("unchecked") + private List<VolumeMount> getVolumeMounts(Object volumeMount){ + List<VolumeMount> volumeMountList = new ArrayList<>(); + if(volumeMount == null){ + return volumeMountList; + } + List<Map<String, String>> volumeMounts = (List<Map<String, String>>) volumeMount; + for(Map<String, String> volume : volumeMounts){ + VolumeMount mount = new VolumeMountBuilder() + .withName(volume.get("name")) + .withMountPath(volume.get("mountPath")) + .build(); + volumeMountList.add(mount); + } + return volumeMountList; } - @Override public boolean getWorkerInstanceConfigFile(String name, String filePath) { - return false; + private Long getTerminationGracePeriodSeconds(Object tPeriodSec) { + if (Objects.isNull(tPeriodSec) && Objects.nonNull(deploySpecTemplateSpec.getTerminationGracePeriodSeconds())) { + tPeriodSec = deploySpecTemplateSpec.getTerminationGracePeriodSeconds(); + } + return (Long) tPeriodSec; } + @SuppressWarnings("unchecked") + private List<Volume> getPodVolumes(Object volums) { + if (Objects.isNull(volums) && Objects.nonNull(deploySpecTemplateSpec.getVolumes())) { + volums = deploySpecTemplateSpec.getVolumes(); + } + List<Volume> volumeList = new ArrayList<>(); + if (Objects.nonNull(volums)) { + List<Map<String, Object>> vList = (List<Map<String, Object>>) volums; + for(Map<String, Object> volume : vList) { + Volume confVolume = new VolumeBuilder().build(); + for(Map.Entry<String, Object> yamlConf : volume.entrySet()) { + String key = yamlConf.getKey(); + if("name".equals(key)) { + confVolume.setName(String.valueOf(yamlConf.getValue())); + } else if("configMap".equals(key)) { + Map<String, String> configMap = (Map<String, String>) yamlConf.getValue(); + String name = configMap.get("name"); + ConfigMapVolumeSource configMapVolume = new ConfigMapVolumeSourceBuilder() + .withName(name) + .build(); + confVolume.setConfigMap(configMapVolume); + } else if("hostPath".equals(key)) { + Map<String, String> configMap = (Map<String, String>) yamlConf.getValue(); + String path = configMap.get("path"); + HostPathVolumeSource hostPathVolumeSource = new HostPathVolumeSourceBuilder() + .withPath(path) + .build(); + + confVolume.setHostPath(hostPathVolumeSource); + } else { + log.error("not implements this pod volumes type : {}", key); + } + } + volumeList.add(confVolume); + } + } + return volumeList; + } + + @SuppressWarnings("unchecked") + private Map<String, String> getPodLabels(Object nodeSeletor) { + if (Objects.isNull(nodeSeletor) && Objects.nonNull(deploySpecTemplateSpec.getNodeSelector())) { + nodeSeletor = deploySpecTemplateSpec.getNodeSelector(); + } + return (Map<String, String>) nodeSeletor; + } + + @SuppressWarnings("unchecked") + private Map<String, String> getMetaLabels(Object nodeSeletor) { + if (Objects.isNull(nodeSeletor) && Objects.nonNull(deploySpecTemplateSpec.getNodeSelector())) { + nodeSeletor = deploySpecTemplateSpec.getNodeSelector(); + } + return (Map<String, String>) nodeSeletor; + } + + @SuppressWarnings("unchecked") + private Map<String, String> getSpecLabels(Object nodeSeletor, String name) { + if (Objects.isNull(nodeSeletor) && Objects.nonNull(deploySpecTemplateSpec.getNodeSelector())) { + nodeSeletor = deploySpecTemplateSpec.getNodeSelector(); + } + Map<String, String> labels = (Map<String, String>) nodeSeletor; + if (labels == null || labels.isEmpty()) { + return ImmutableMap.of("app", name); + } + return (Map<String, String>) nodeSeletor; + } + + protected Map<String, Quantity> getResource(Long cpuCore, Long memory){ + Map<String, Quantity> resource = new HashMap<>(); + resource.put("cpu", new QuantityBuilder().withAmount(String.valueOf(cpuCore)).withFormat("m").build()); + resource.put("memory", new QuantityBuilder().withAmount(String.valueOf(memory)).withFormat("Mi").build()); + return resource; + } + + + private Long getCpuLimit(Long requestCore){ + return Math.round(requestCore * limitOverRequestCpuFactor); + } + + private Long getMemoryLimt(Long requestMemory){ + return Math.round(requestMemory * limitOverRequestMemoryFactor); + } } \ No newline at end of file diff --git a/adapter/runtime-on-k8s/src/test/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/k8s/repository/WorkerInstanceRepositoryOnK8STest.java b/adapter/runtime-on-k8s/src/test/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/k8s/repository/WorkerInstanceRepositoryOnK8STest.java index 877799e..1c539e2 100644 --- a/adapter/runtime-on-k8s/src/test/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/k8s/repository/WorkerInstanceRepositoryOnK8STest.java +++ b/adapter/runtime-on-k8s/src/test/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/k8s/repository/WorkerInstanceRepositoryOnK8STest.java @@ -1,73 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.rocketmq.eventbridge.adapter.runtime.manager.k8s.repository; import com.google.common.collect.Maps; import com.google.gson.Gson; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; import java.util.Map; + +import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.eventbridge.adapter.runtime.manager.worker.WorkerResource; +import org.apache.rocketmq.eventbridge.adapter.runtime.manager.worker.WorkerStatusEnum; import org.junit.Ignore; -import org.junit.jupiter.api.Test; -import org.mockito.InjectMocks; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.test.context.junit4.SpringRunner; +import static org.junit.Assert.assertEquals; +@Slf4j +@RunWith(SpringRunner.class) +@SpringBootTest(classes = WorkerInstanceRepositoryOnK8STest.class) +@SpringBootApplication(scanBasePackages = {"org.apache.rocketmq.eventbridge.adapter.runtime.manager.k8s.*"}) public class WorkerInstanceRepositoryOnK8STest { - @InjectMocks + @Autowired private WorkerInstanceRepositoryOnK8S workerInstanceRepositoryOnK8S; @Test - @Ignore - void applyWorkerInstance() { + public void applyWorkerInstance() { Map<String, Object> environments = Maps.newHashMap(); environments.put("key1", "value1"); environments.put("key2", "value2"); - workerInstanceRepositoryOnK8S.applyWorkerInstance("worker-4", "registry.cn-beijing.cr.aliyuncs.com/eventbridge:20231115195431f55971", new Gson().fromJson("{\"cpu\":1,\"memory\":1}", WorkerResource.class), environments); + workerInstanceRepositoryOnK8S.applyWorkerInstance("worker-4", "apache/rocketmq-eventbridge:1.1.0", new Gson().fromJson("{\"cpu\":100,\"memory\":100}", WorkerResource.class), environments); } @Test - @Ignore - void deleteWorkerInstance() { - workerInstanceRepositoryOnK8S.deleteWorkerInstance("worker-4"); + public void applyWorkerInstanceWithConfigMap() { + Map<String, Object> environments = Maps.newHashMap(); + environments.put("key1", "value1"); + environments.put("key2", "value2"); + List<Map<String, String>> volums = new ArrayList<>(); + volums.add(new HashMap<String, String>(){{ + put("name", "worker-config"); + }}); + environments.put("volums", volums); + workerInstanceRepositoryOnK8S.applyWorkerInstance("worker-4", "apache/rocketmq-eventbridge:1.1.0", new Gson().fromJson("{\"cpu\":100,\"memory\":100}", WorkerResource.class), environments); } @Test - @Ignore - void getWorkerInstanceStatus() { - workerInstanceRepositoryOnK8S.getWorkerInstanceStatus("worker-4"); + public void deleteWorkerInstance() { + Map<String, Object> environments = Maps.newHashMap(); + environments.put("key1", "value1"); + environments.put("key2", "value2"); + workerInstanceRepositoryOnK8S.deleteWorkerInstance("worker-4", environments); } @Test - @Ignore - void applyWorkerInstanceConfigFile() { + public void getWorkerInstanceStatus() { + Map<String, Object> environments = Maps.newHashMap(); + environments.put("key1", "value1"); + environments.put("key2", "value2"); + assertEquals(WorkerStatusEnum.RUN, workerInstanceRepositoryOnK8S.getWorkerInstanceStatus("worker-4", environments)); + } + + @Test + public void applyWorkerInstanceConfigFile() { + Map<String, Object> environments = Maps.newHashMap(); + environments.put("key1", "value1"); + environments.put("key2", "value2"); String taskConfig = "[\n" + - " {\n" + - " \"name\":\"demo-runner\",\n" + - " \"components\":[\n" + - " {\n" + - " \"accountId\": \"654321\",\n" + - " \"eventBusName\":\"demo-bus\"\n" + - " },\n" + - " {\n" + - " \"filterPattern\":\"{}\",\n" + - " \"class\":\"org.apache.rocketmq.connect.transform.eventbridge.EventBridgeFilterTransform\"\n" + - " },\n" + - " {\n" + - " \"data\":\"{\\\"form\\\":\\\"TEMPLATE\\\",\\\"value\\\":\\\"{\\\\\\\"content\\\\\\\":\\\\\\\"$.data.body\\\\\\\"}\\\",\\\"template\\\":\\\"{\\\\\\\"text\\\\\\\":{\\\\\\\"content\\\\\\\":\\\\\\\"${content}\\\\\\\"},\\\\\\\"msgtype\\\\\\\":\\\\\\\"text\\\\\\\"}\\\"}\",\n" + - " \"class\": \"org.apache.rocketmq.connect.transform.eventbridge.EventBridgeTransform\"\n" + - " },\n" + - " {\n" + - " \"class\":\"org.apache.rocketmq.connect.dingtalk.sink.DingTalkSinkTask\",\n" + - " \"webHook\":\"xxxxxxxxxxx\",\n" + - " \"secretKey\":\"xxxxxxxxxxx\"\n" + - " }\n" + - " ]\n" + - " }\n" + - "]"; - workerInstanceRepositoryOnK8S.applyWorkerInstanceConfigFile("worker-4", "/eventbridge/task-config", taskConfig); + " {\n" + + " \"name\":\"demo-runner\",\n" + + " \"components\":[\n" + + " {\n" + + " \"accountId\": \"654321\",\n" + + " \"eventBusName\":\"demo-bus\"\n" + + " },\n" + + " {\n" + + " \"filterPattern\":\"{}\",\n" + + " \"class\":\"org.apache.rocketmq.connect.transform.eventbridge.EventBridgeFilterTransform\"\n" + + " },\n" + + " {\n" + + " \"data\":\"{\\\"form\\\":\\\"TEMPLATE\\\",\\\"value\\\":\\\"{\\\\\\\"content\\\\\\\":\\\\\\\"$.data.body\\\\\\\"}\\\",\\\"template\\\":\\\"{\\\\\\\"text\\\\\\\":{\\\\\\\"content\\\\\\\":\\\\\\\"${content}\\\\\\\"},\\\\\\\"msgtype\\\\\\\":\\\\\\\"text\\\\\\\"}\\\"}\",\n" + + " \"class\": \"org.apache.rocketmq.connect.transform.eventbridge.EventBridgeTransform\"\n" + + " },\n" + + " {\n" + + " \"class\":\"org.apache.rocketmq.connect.dingtalk.sink.DingTalkSinkTask\",\n" + + " \"webHook\":\"xxxxxxxxxxx\",\n" + + " \"secretKey\":\"xxxxxxxxxxx\"\n" + + " }\n" + + " ]\n" + + " }\n" + + "]"; + workerInstanceRepositoryOnK8S.applyWorkerInstanceConfigFile("worker-config", "task-config", taskConfig, environments); } @Test @Ignore - void getWorkerInstanceConfigFile() { - workerInstanceRepositoryOnK8S.getWorkerInstanceConfigFile("worker-4", "/eventbridge/task-config"); + public void getWorkerInstanceConfigFile() { + Map<String, Object> environments = Maps.newHashMap(); + environments.put("key1", "value1"); + environments.put("key2", "value2"); + workerInstanceRepositoryOnK8S.getWorkerInstanceConfigFile("worker-4", "/eventbridge/task-config", environments); } } \ No newline at end of file diff --git a/adapter/runtime-on-k8s/src/test/resources/application.properties b/adapter/runtime-on-k8s/src/test/resources/application.properties new file mode 100644 index 0000000..87076bc --- /dev/null +++ b/adapter/runtime-on-k8s/src/test/resources/application.properties @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +kubernates.api.server=https://kubernetes.docker.internal:6443 diff --git a/common/src/main/java/org/apache/rocketmq/eventbridge/tools/JsonUtil.java b/common/src/main/java/org/apache/rocketmq/eventbridge/tools/JsonUtil.java index 393ee21..62698ab 100644 --- a/common/src/main/java/org/apache/rocketmq/eventbridge/tools/JsonUtil.java +++ b/common/src/main/java/org/apache/rocketmq/eventbridge/tools/JsonUtil.java @@ -18,16 +18,22 @@ package org.apache.rocketmq.eventbridge.tools; import com.google.common.collect.Maps; + +import java.util.Map; + +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; import com.google.gson.JsonElement; import com.google.gson.JsonParser; import com.google.gson.JsonPrimitive; import com.google.gson.JsonSyntaxException; -import java.util.Map; +import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.eventbridge.tools.pattern.InvalidEventPatternException; import org.apache.rocketmq.eventbridge.tools.pattern.PatternErrorMessages; public class JsonUtil { + private static Gson gson = new GsonBuilder().create(); /** * Convert the value of map to json element * @@ -62,4 +68,30 @@ public class JsonUtil { } return "{}".equals(element.toString()) ? Boolean.TRUE : Boolean.FALSE; } + + public static <T> T parse(String str, Class<T> clazz) { + if (StringUtils.isNotEmpty(str) && clazz != null) { + try { + return gson.fromJson(str, clazz); + } catch (Exception e) { + throw new InvalidEventPatternException(PatternErrorMessages.INVALID_JSON_STRING, e); + } + } else { + return null; + } + } + + + public static <T> String toJson(T obj) { + if (obj == null) { + return null; + } else { + try { + return gson.toJson(obj); + } catch (Exception var2) { + return null; + } + } + } + } diff --git a/pom.xml b/pom.xml index 41ee0f3..071ff4a 100644 --- a/pom.xml +++ b/pom.xml @@ -89,7 +89,7 @@ <cloudevents.version>2.3.0</cloudevents.version> <apache.commons-text.version>1.10.0</apache.commons-text.version> <mockito.version>2.13.0</mockito.version> - <jupiter.version>5.9.2</jupiter.version> + <jupiter.version>5.10.2</jupiter.version> <caffeine.version>2.9.3</caffeine.version> <fastjson.version>1.2.83</fastjson.version> <jacoco-maven-plugin.version>0.8.5</jacoco-maven-plugin.version>