This is an automated email from the ASF dual-hosted git repository.

shenlin pushed a commit to branch feat/runtime-manager
in repository https://gitbox.apache.org/repos/asf/rocketmq-eventbridge.git

commit 52f82faf2b1a9efd1e79db6d010737a67bf857a5
Author: wangkai <wang...@zhongan.com>
AuthorDate: Sun May 12 19:40:14 2024 +0800

    [runtime-on-k8s] runtime-on-k8s
---
 adapter/pom.xml                                    |   1 +
 .../repository/WorkerInstanceRepository.java       |  10 +-
 .../runtime/manager/worker/WorkerResource.java     |   4 +-
 .../runtime/manager/worker/WorkerStatusEnum.java   |   2 +
 adapter/runtime-on-k8s/pom.xml                     |  24 +-
 .../manager/k8s/api/BaseK8SApiService.java}        |  31 +-
 .../manager/k8s/api/K8SConfigMapService.java       |  76 +++--
 .../manager/k8s/api/K8SCustomResourceService.java  |  78 ++++-
 .../manager/k8s/api/K8SDeploymentService.java      | 144 ++++----
 .../manager/k8s/api/K8SNameSpaceService.java       |  17 +
 .../runtime/manager/k8s/api/KubectlService.java    | 114 ++++++-
 .../runtime/manager/k8s/common/K8SConstants.java   |  20 ++
 .../manager/k8s/common/WorkerStatusEnum.java       |  47 ---
 .../manager/k8s/config/DeploySpecTemplateSpec.java |  31 ++
 .../runtime/manager/k8s/config/SpecContainer.java  |  54 +++
 .../k8s/model/K8SCRDConnectTaskSetModel.java       |  17 +
 .../repository/WorkerInstanceRepositoryOnK8S.java  | 361 ++++++++++++++++++++-
 .../WorkerInstanceRepositoryOnK8STest.java         | 133 +++++---
 .../src/test/resources/application.properties      |  16 +
 .../rocketmq/eventbridge/tools/JsonUtil.java       |  34 +-
 pom.xml                                            |   2 +-
 21 files changed, 985 insertions(+), 231 deletions(-)

diff --git a/adapter/pom.xml b/adapter/pom.xml
index 010492c..5afe442 100644
--- a/adapter/pom.xml
+++ b/adapter/pom.xml
@@ -29,6 +29,7 @@
         <module>storage</module>
         <module>benchmark</module>
         <module>runtime-manager</module>
+        <module>runtime-on-k8s</module>
     </modules>
 
 </project>
\ No newline at end of file
diff --git 
a/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/repository/WorkerInstanceRepository.java
 
b/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/repository/WorkerInstanceRepository.java
index 1c74a3b..c25f5c9 100644
--- 
a/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/repository/WorkerInstanceRepository.java
+++ 
b/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/repository/WorkerInstanceRepository.java
@@ -38,17 +38,19 @@ public interface WorkerInstanceRepository {
      * Delete the worker
      *
      * @param name
+     * @param environments
      * @return
      */
-    boolean deleteWorkerInstance(String name);
+    boolean deleteWorkerInstance(String name, Map<String, Object> 
environments);
 
     /**
      * Get the status of worker
      *
      * @param name
+     * @param environments
      * @return
      */
-    WorkerStatusEnum getWorkerInstanceStatus(String name);
+    WorkerStatusEnum getWorkerInstanceStatus(String name, Map<String, Object> 
environments);
 
     /**
      * Apply(Create/Update) the config to the worker instance. It may contains 
more than one config in worker instance.
@@ -58,7 +60,7 @@ public interface WorkerInstanceRepository {
      * @param config
      * @return
      */
-    boolean applyWorkerInstanceConfigFile(String name, String filePath, String 
config);
+    boolean applyWorkerInstanceConfigFile(String name, String filePath, String 
config, Map<String, Object> environments);
 
     /**
      * Get the config to the worker instance.
@@ -67,5 +69,5 @@ public interface WorkerInstanceRepository {
      * @param filePath
      * @return
      */
-    boolean getWorkerInstanceConfigFile(String name, String filePath);
+    String getWorkerInstanceConfigFile(String name, String filePath, 
Map<String, Object> environments);
 }
\ No newline at end of file
diff --git 
a/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/worker/WorkerResource.java
 
b/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/worker/WorkerResource.java
index 4d48fbf..e2152c4 100644
--- 
a/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/worker/WorkerResource.java
+++ 
b/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/worker/WorkerResource.java
@@ -21,6 +21,6 @@ import lombok.Data;
 
 @Data
 public class WorkerResource {
-    Double cpu;
-    Double memory;
+    Long cpu;
+    Long memory;
 }
\ No newline at end of file
diff --git 
a/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/worker/WorkerStatusEnum.java
 
b/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/worker/WorkerStatusEnum.java
index 1cc34aa..b483922 100644
--- 
a/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/worker/WorkerStatusEnum.java
+++ 
b/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/worker/WorkerStatusEnum.java
@@ -19,6 +19,8 @@ package 
org.apache.rocketmq.eventbridge.adapter.runtime.manager.worker;
 
 public enum WorkerStatusEnum {
     UNKNOWN(-1, "Unknown"),
+
+    WAIT_DEPLOY(0, "Deployed"),
     STARTING(3, "Starting"),
     RUN(5, "Run"),
     STOP(10, "Stop"),
diff --git a/adapter/runtime-on-k8s/pom.xml b/adapter/runtime-on-k8s/pom.xml
index 1644b88..b0a1241 100644
--- a/adapter/runtime-on-k8s/pom.xml
+++ b/adapter/runtime-on-k8s/pom.xml
@@ -27,20 +27,40 @@
         <maven.compiler.source>8</maven.compiler.source>
         <maven.compiler.target>8</maven.compiler.target>
         <commons.io.version>2.8.0</commons.io.version>
-        <junit.version>4.10</junit.version>
+        <fabric8.version>6.6.2</fabric8.version>
     </properties>
 
     <dependencies>
         <dependency>
+            <groupId>io.fabric8</groupId>
+            <artifactId>kubernetes-client</artifactId>
+            <version>${fabric8.version}</version>
+        </dependency>
+
+        <!--<dependency>
             <groupId>io.kubernetes</groupId>
             <artifactId>client-java</artifactId>
             <version>10.0.0</version>
-        </dependency>
+        </dependency>-->
+
         <dependency>
             <groupId>org.apache.rocketmq</groupId>
             
<artifactId>rocketmq-eventbridge-adapter-runtime-manager</artifactId>
             <version>1.0.0</version>
         </dependency>
+
+        <!-- test dependencies -->
+        <dependency>
+            <groupId>io.fabric8</groupId>
+            <artifactId>kubernetes-server-mock</artifactId>
+            <version>${fabric8.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-test</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
 
diff --git 
a/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/worker/WorkerResource.java
 
b/adapter/runtime-on-k8s/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/k8s/api/BaseK8SApiService.java
similarity index 54%
copy from 
adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/worker/WorkerResource.java
copy to 
adapter/runtime-on-k8s/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/k8s/api/BaseK8SApiService.java
index 4d48fbf..6aa0540 100644
--- 
a/adapter/runtime-manager/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/worker/WorkerResource.java
+++ 
b/adapter/runtime-on-k8s/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/k8s/api/BaseK8SApiService.java
@@ -15,12 +15,29 @@
  *  limitations under the License.
  */
 
-package org.apache.rocketmq.eventbridge.adapter.runtime.manager.worker;
+package org.apache.rocketmq.eventbridge.adapter.runtime.manager.k8s.api;
 
-import lombok.Data;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import org.apache.commons.lang3.StringUtils;
+import org.springframework.beans.factory.annotation.Autowired;
 
-@Data
-public class WorkerResource {
-    Double cpu;
-    Double memory;
-}
\ No newline at end of file
+import java.io.IOException;
+
+public abstract class BaseK8SApiService {
+
+    @Autowired
+    private KubectlService kubectlService;
+
+    protected KubernetesClient getKubernetesClient(String clientId) throws 
IOException {
+        KubernetesClient client = kubectlService.getClient();
+        if (StringUtils.isNotBlank(clientId)) {
+            client = kubectlService.generateKubeApiClient(clientId);
+        }
+        return client;
+    }
+
+    protected KubernetesClient getKubernetesClient() {
+        return kubectlService.getClient();
+    }
+
+}
diff --git 
a/adapter/runtime-on-k8s/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/k8s/api/K8SConfigMapService.java
 
b/adapter/runtime-on-k8s/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/k8s/api/K8SConfigMapService.java
index a9f147c..56e2c98 100644
--- 
a/adapter/runtime-on-k8s/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/k8s/api/K8SConfigMapService.java
+++ 
b/adapter/runtime-on-k8s/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/k8s/api/K8SConfigMapService.java
@@ -1,43 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
 package org.apache.rocketmq.eventbridge.adapter.runtime.manager.k8s.api;
 
 import com.google.gson.Gson;
-import io.kubernetes.client.openapi.ApiClient;
-import io.kubernetes.client.openapi.Configuration;
-import io.kubernetes.client.openapi.apis.CoreV1Api;
-import io.kubernetes.client.openapi.models.V1ConfigMap;
+import io.fabric8.kubernetes.api.model.ConfigMap;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.rocketmq.eventbridge.exception.EventBridgeException;
 import org.apache.rocketmq.eventbridge.exception.code.DefaultErrorCode;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.rocketmq.eventbridge.tools.JsonUtil;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
-@Service
-public class K8SConfigMapService {
 
-    private static final Logger log = 
LoggerFactory.getLogger(K8SConfigMapService.class);
+@Slf4j
+@Service
+public class K8SConfigMapService extends BaseK8SApiService {
 
-    @Autowired
-    ApiClient apiClient;
+    /*@Autowired
+    ApiClient apiClient;*/
 
     @Autowired
-    K8SNameSpaceService k8SNameSpaceService;
+    private K8SNameSpaceService k8SNameSpaceService;
 
-    @Autowired
-    KubectlService kubectlService;
 
-    public boolean createConfigMap(String regionId, String csClusterId, Object 
configYaml) {
+    public boolean createConfigMap(String clientId, Object configYaml) {
         try {
-            CoreV1Api api = new CoreV1Api(apiClient);
-            if (csClusterId != null) {
-                ApiClient genApiClient = 
kubectlService.generateKubeApiClient(regionId, csClusterId);
-                api = new CoreV1Api(genApiClient);
-            }
-            V1ConfigMap configMap = Configuration.getDefaultApiClient()
+            /*V1ConfigMap configMap = Configuration.getDefaultApiClient()
                 .getJSON()
                 .deserialize(new Gson().toJson(configYaml), V1ConfigMap.class);
-
-            api.createNamespacedConfigMap(k8SNameSpaceService.getNameSpace(), 
configMap, "true", null, null);
+            api.createNamespacedConfigMap(k8SNameSpaceService.getNameSpace(), 
configMap, "true", null, null);*/
+            KubernetesClient client = getKubernetesClient(clientId);
+            ConfigMap configMap = 
(ConfigMap)JsonUtil.parse(JsonUtil.toJson(configYaml), ConfigMap.class);
+            
client.configMaps().inNamespace(k8SNameSpaceService.getNameSpace()).resource(configMap).create();
         } catch (Throwable e) {
             if ("Conflict".equals(e.getMessage())) {
                 return Boolean.TRUE;
@@ -49,14 +59,16 @@ public class K8SConfigMapService {
         return Boolean.TRUE;
     }
 
-    public boolean deleteConfigMap(String regionId, String csClusterId, String 
name) {
+    public boolean deleteConfigMap(String clientId, String name) {
         try {
-            CoreV1Api api = new CoreV1Api(apiClient);
+           /* CoreV1Api api = new CoreV1Api(apiClient);
             if (csClusterId != null) {
                 ApiClient genApiClient = 
kubectlService.generateKubeApiClient(regionId, csClusterId);
                 api = new CoreV1Api(genApiClient);
             }
-            api.deleteNamespacedConfigMap(name, 
k8SNameSpaceService.getNameSpace(), "true", null, null, null, null, null);
+            api.deleteNamespacedConfigMap(name, 
k8SNameSpaceService.getNameSpace(), "true", null, null, null, null, null);*/
+            KubernetesClient client = getKubernetesClient(clientId);
+            
client.configMaps().inNamespace(k8SNameSpaceService.getNameSpace()).withName(name).delete();
         } catch (Throwable e) {
             if ("Not Found".equals(e.getMessage())) {
                 return Boolean.FALSE;
@@ -68,9 +80,9 @@ public class K8SConfigMapService {
         return Boolean.TRUE;
     }
 
-    public void replaceConfigMap(String regionId, String csClusterId, String 
name, Object configYaml) {
+    public void replaceConfigMap(String clientId, Object configYaml) {
         try {
-            CoreV1Api api = new CoreV1Api(apiClient);
+            /*CoreV1Api api = new CoreV1Api(apiClient);
             if (csClusterId != null) {
                 ApiClient genApiClient = 
kubectlService.generateKubeApiClient(regionId, csClusterId);
                 api = new CoreV1Api(genApiClient);
@@ -80,10 +92,14 @@ public class K8SConfigMapService {
                 .getJSON()
                 .deserialize(new Gson().toJson(configYaml), V1ConfigMap.class);
 
-            api.replaceNamespacedConfigMap(name, 
k8SNameSpaceService.getNameSpace(), configMap, "true", null, null);
+            api.replaceNamespacedConfigMap(name, 
k8SNameSpaceService.getNameSpace(), configMap, "true", null, null);*/
+
+            KubernetesClient client = getKubernetesClient(clientId);
+            ConfigMap configMap = JsonUtil.parse(JsonUtil.toJson(configYaml), 
ConfigMap.class);
+            
client.configMaps().inNamespace(k8SNameSpaceService.getNameSpace()).resource(configMap).update();
         } catch (Throwable e) {
             if ("Not Found".equals(e.getMessage())) {
-                this.createConfigMap(regionId, csClusterId, configYaml);
+                this.createConfigMap(clientId, configYaml);
             } else {
                 log.error("replaceConfigMap failed.{}", e);
                 throw new EventBridgeException(DefaultErrorCode.InternalError, 
new Gson().toJson(configYaml));
diff --git 
a/adapter/runtime-on-k8s/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/k8s/api/K8SCustomResourceService.java
 
b/adapter/runtime-on-k8s/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/k8s/api/K8SCustomResourceService.java
index 4b561f8..d71cbe4 100644
--- 
a/adapter/runtime-on-k8s/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/k8s/api/K8SCustomResourceService.java
+++ 
b/adapter/runtime-on-k8s/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/k8s/api/K8SCustomResourceService.java
@@ -1,30 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
 package org.apache.rocketmq.eventbridge.adapter.runtime.manager.k8s.api;
 
-import io.kubernetes.client.openapi.ApiClient;
-import io.kubernetes.client.openapi.ApiException;
-import io.kubernetes.client.openapi.apis.CustomObjectsApi;
+import io.fabric8.kubernetes.api.model.GenericKubernetesResource;
+import io.fabric8.kubernetes.client.dsl.base.ResourceDefinitionContext;
 import 
org.apache.rocketmq.eventbridge.adapter.runtime.manager.k8s.common.K8SConstants;
 import 
org.apache.rocketmq.eventbridge.adapter.runtime.manager.k8s.model.K8SCRDConnectTaskSetModel;
 import org.apache.rocketmq.eventbridge.exception.EventBridgeException;
 import org.apache.rocketmq.eventbridge.exception.code.DefaultErrorCode;
+import org.apache.rocketmq.eventbridge.tools.JsonUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
 @Service
-public class K8SCustomResourceService{
+public class K8SCustomResourceService extends BaseK8SApiService{
 
     private static final Logger log = 
LoggerFactory.getLogger(K8SCustomResourceService.class);
 
-    @Autowired
-    ApiClient apiClient;
+    /*@Autowired
+    ApiClient apiClient;*/
 
     @Autowired
-    K8SNameSpaceService k8SNameSpaceService;
+    private K8SNameSpaceService k8SNameSpaceService;
+
+    private ResourceDefinitionContext resourceDefinitionContext;
+    public K8SCustomResourceService () {
+        resourceDefinitionContext = new ResourceDefinitionContext.Builder()
+                .withGroup(K8SConstants.CONNECT_TASK_SETS_GROUP)
+                .withVersion(K8SConstants.CONNECT_TASK_SETS_VERSION)
+                .withPlural(K8SConstants.CONNECT_TASK_SETS_PLURAL)
+                .withNamespaced(true)
+                .build();
+    }
+
 
     public boolean createConnectTaskSet(String taskName, String 
lanucherConfig, String taskConfig, int replicas) {
-        CustomObjectsApi api2 = new CustomObjectsApi(apiClient);
+        /*CustomObjectsApi api2 = new CustomObjectsApi(apiClient);
         K8SCRDConnectTaskSetModel body = new 
K8SCRDConnectTaskSetModel(taskName, lanucherConfig, taskConfig, replicas,
             getImage());
         try {
@@ -35,11 +63,23 @@ public class K8SCustomResourceService{
             log.error(DefaultErrorCode.InternalError.getMsg(), e);
             throw new EventBridgeException(DefaultErrorCode.InternalError, e, 
taskConfig);
         }
+        String rawJsonCustomResourceObj = 
"{\"apiVersion\":\"jungle.example.com/v1\"," +
+                "\"kind\":\"Animal\",\"metadata\": {\"name\": \"walrus\"}," +
+                "\"spec\": {\"image\": \"my-awesome-walrus-image\"}}";*/
+
+        K8SCRDConnectTaskSetModel body = new 
K8SCRDConnectTaskSetModel(taskName, lanucherConfig, taskConfig, replicas,
+                getImage());
+        try {
+            
getKubernetesClient().genericKubernetesResources(resourceDefinitionContext).inNamespace(k8SNameSpaceService.getNameSpace()).load(JsonUtil.toJson(body)).create();
+        } catch (Throwable e) {
+            log.error(DefaultErrorCode.InternalError.getMsg(), e);
+            throw new EventBridgeException(DefaultErrorCode.InternalError, e, 
taskConfig);
+        }
         return Boolean.TRUE;
     }
 
     public boolean deleteConnectTaskSet(String taskName) {
-        CustomObjectsApi api2 = new CustomObjectsApi(apiClient);
+        /*CustomObjectsApi api2 = new CustomObjectsApi(apiClient);
         try {
             
api2.deleteNamespacedCustomObject(K8SConstants.CONNECT_TASK_SETS_GROUP,
                 K8SConstants.CONNECT_TASK_SETS_VERSION, 
k8SNameSpaceService.getNameSpace(),
@@ -47,12 +87,18 @@ public class K8SCustomResourceService{
         } catch (Throwable e) {
             log.error("Delete Connector failed", e);
             throw new EventBridgeException(DefaultErrorCode.InternalError, e, 
taskName);
+        }*/
+        try {
+            
getKubernetesClient().genericKubernetesResources(resourceDefinitionContext).withName(taskName).delete();
+        } catch (Throwable e) {
+            log.error("Delete Connector failed", e);
+            throw new EventBridgeException(DefaultErrorCode.InternalError, e, 
taskName);
         }
         return Boolean.TRUE;
     }
 
     public boolean isConnectTaskExist(String taskName) {
-        CustomObjectsApi api2 = new CustomObjectsApi(apiClient);
+        /*CustomObjectsApi api2 = new CustomObjectsApi(apiClient);
         try {
             
api2.getNamespacedCustomObject(K8SConstants.CONNECT_TASK_SETS_GROUP, 
K8SConstants.CONNECT_TASK_SETS_VERSION,
                 k8SNameSpaceService.getNameSpace(), 
K8SConstants.CONNECT_TASK_SETS_PLURAL, taskName);
@@ -64,8 +110,18 @@ public class K8SCustomResourceService{
             }
             log.error("Get Connector task  failed", e);
             throw new EventBridgeException(DefaultErrorCode.InternalError, e);
+        }*/
+
+        try {
+            GenericKubernetesResource customResourceObject = 
getKubernetesClient().genericKubernetesResources(resourceDefinitionContext).withName(taskName).get();
+            if (null != customResourceObject) {
+                return Boolean.TRUE;
+            }
+            return Boolean.FALSE;
+        } catch (Throwable e) {
+            log.error("Delete Connector failed", e);
+            throw new EventBridgeException(DefaultErrorCode.InternalError, e, 
taskName);
         }
-        return Boolean.TRUE;
     }
 
     private String getImage() {
diff --git 
a/adapter/runtime-on-k8s/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/k8s/api/K8SDeploymentService.java
 
b/adapter/runtime-on-k8s/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/k8s/api/K8SDeploymentService.java
index 6c29991..7c5a142 100644
--- 
a/adapter/runtime-on-k8s/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/k8s/api/K8SDeploymentService.java
+++ 
b/adapter/runtime-on-k8s/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/k8s/api/K8SDeploymentService.java
@@ -1,86 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
 package org.apache.rocketmq.eventbridge.adapter.runtime.manager.k8s.api;
 
 import com.google.gson.Gson;
-import io.kubernetes.client.openapi.ApiClient;
-import io.kubernetes.client.openapi.Configuration;
-import io.kubernetes.client.openapi.apis.AppsV1Api;
-import io.kubernetes.client.openapi.models.V1Deployment;
-import 
org.apache.rocketmq.eventbridge.adapter.runtime.manager.k8s.common.WorkerStatusEnum;
+import io.fabric8.kubernetes.api.model.ConfigMap;
+import io.fabric8.kubernetes.api.model.ConfigMapBuilder;
+import io.fabric8.kubernetes.api.model.apps.Deployment;
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.fabric8.kubernetes.client.dsl.Resource;
+import lombok.extern.slf4j.Slf4j;
+import 
org.apache.rocketmq.eventbridge.adapter.runtime.manager.worker.WorkerStatusEnum;
 import org.apache.rocketmq.eventbridge.exception.EventBridgeException;
 import org.apache.rocketmq.eventbridge.exception.code.DefaultErrorCode;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
+import java.util.Map;
 
-@Service
-public class K8SDeploymentService {
-
-    private static final Logger log = 
LoggerFactory.getLogger(K8SDeploymentService.class);
 
-    @Autowired
-    ApiClient apiClient;
-
-    @Autowired
-    K8SNameSpaceService k8SNameSpaceService;
+@Slf4j
+@Service
+public class K8SDeploymentService extends BaseK8SApiService {
 
     @Autowired
-    KubectlService kubectlService;
+    private K8SNameSpaceService k8SNameSpaceService;
 
-    public boolean updateDeployment(String regionId, String csClusterId, 
String name, Object deployYaml) {
+    public boolean updateDeployment(String clientId, String name, Deployment 
deployment) {
         try {
-            AppsV1Api api = new AppsV1Api(apiClient);
-            if (csClusterId != null) {
-                ApiClient genApiClient = 
kubectlService.generateKubeApiClient(regionId, csClusterId);
-                api = new AppsV1Api(genApiClient);
-            }
-            V1Deployment body = Configuration.getDefaultApiClient()
-                .getJSON()
-                .deserialize(new Gson().toJson(deployYaml), 
V1Deployment.class);
-            api.replaceNamespacedDeployment(name, 
k8SNameSpaceService.getNameSpace(), body, "true", null, null);
+            KubernetesClient client = getKubernetesClient(clientId);
+            
client.apps().deployments().inNamespace(k8SNameSpaceService.getNameSpace()).resource(deployment).update();
         } catch (Throwable e) {
             if ("Not Found".equals(e.getMessage())) {
-                this.createDeployment(regionId, csClusterId, deployYaml);
+                this.createDeployment(clientId, deployment);
             } else {
                 log.error(DefaultErrorCode.InternalError.getMsg(), e);
-                throw new 
EventBridgeException(DefaultErrorCode.InternalError,e, new 
Gson().toJson(deployYaml));
+                throw new 
EventBridgeException(DefaultErrorCode.InternalError,e, new 
Gson().toJson(deployment));
             }
         }
         return Boolean.TRUE;
     }
 
-    public WorkerStatusEnum getDeploymentStatus(String regionId, String 
csClusterId, String name) {
-        V1Deployment v1Deployment = null;
+    public WorkerStatusEnum getDeploymentStatus(String clientId, String name) {
+        Deployment deployment;
         try {
-            AppsV1Api api = new AppsV1Api(apiClient);
-            if (csClusterId != null) {
-                ApiClient genApiClient = 
kubectlService.generateKubeApiClient(regionId, csClusterId);
-                api = new AppsV1Api(genApiClient);
-            }
-            v1Deployment = api.readNamespacedDeployment(name, 
k8SNameSpaceService.getNameSpace(), "true", true, false);
+            KubernetesClient client = getKubernetesClient(clientId);
+            deployment = 
client.apps().deployments().inNamespace(k8SNameSpaceService.getNameSpace()).withName(name).get();
         } catch (Throwable e) {
             log.error(DefaultErrorCode.InternalError.getMsg(), e);
             throw new EventBridgeException(DefaultErrorCode.InternalError, e, 
"name=" + name);
         }
-        if (v1Deployment == null || v1Deployment.getStatus() == null || 
v1Deployment.getStatus().getReadyReplicas() == null) {
-            return WorkerStatusEnum.STARTING;
+        if (deployment == null || deployment.getStatus() == null || 
deployment.getStatus().getReadyReplicas() == null) {
+            return WorkerStatusEnum.UNKNOWN;
         }
-        if (v1Deployment.getStatus().getReadyReplicas() == 1) {
+        if (deployment.getStatus().getReadyReplicas() == 1) {
             return WorkerStatusEnum.RUN;
         }
-        return WorkerStatusEnum.DEFAULT;
+        return WorkerStatusEnum.UNKNOWN;
     }
 
-    public boolean deleteDeployment(String regionId, String csClusterId, 
String name) {
+    public boolean deleteDeployment(String clientId, String name) {
         try {
-            AppsV1Api api = new AppsV1Api(apiClient);
-            if (csClusterId != null) {
-                ApiClient genApiClient = 
kubectlService.generateKubeApiClient(regionId, csClusterId);
-                api = new AppsV1Api(genApiClient);
-            }
-            api.deleteNamespacedDeployment(name, 
k8SNameSpaceService.getNameSpace(), "true", null, null, null, null,
-                null);
+            KubernetesClient client = getKubernetesClient(clientId);
+            
client.apps().deployments().inNamespace(k8SNameSpaceService.getNameSpace()).withName(name).delete();
         } catch (Throwable e) {
             if ("Not Found".equals(e.getMessage())) {
                 return Boolean.FALSE;
@@ -88,31 +84,59 @@ public class K8SDeploymentService {
                 log.error(DefaultErrorCode.InternalError.getMsg(), e);
                 throw new EventBridgeException(DefaultErrorCode.InternalError, 
e, name);
             }
-
         }
         return Boolean.TRUE;
     }
 
-    public boolean createDeployment(String regionId, String csClusterId, 
Object deployYaml) {
+    public boolean createDeployment(String clientId, Deployment deployment) {
         try {
-            AppsV1Api api = new AppsV1Api(apiClient);
-            if (csClusterId != null) {
-                ApiClient genApiClient = 
kubectlService.generateKubeApiClient(regionId, csClusterId);
-                api = new AppsV1Api(genApiClient);
+            KubernetesClient client = getKubernetesClient(clientId);
+            
client.apps().deployments().inNamespace(k8SNameSpaceService.getNameSpace()).resource(deployment).create();
+        } catch (Throwable e) {
+            if ("Conflict".equals(e.getMessage())) {
+                return Boolean.TRUE;
+            } else {
+                log.error(DefaultErrorCode.InternalError.getMsg(), e);
+                throw new 
EventBridgeException(DefaultErrorCode.InternalError,e, new 
Gson().toJson(deployment));
             }
-            V1Deployment body = Configuration.getDefaultApiClient()
-                .getJSON()
-                .deserialize(new Gson().toJson(deployYaml), 
V1Deployment.class);
-            api.createNamespacedDeployment(k8SNameSpaceService.getNameSpace(), 
body, "true", null, null);
+        }
+        return Boolean.TRUE;
+    }
+
+    public boolean createConfigMap(String clientId, ConfigMap configMap) {
+        try {
+            KubernetesClient client = getKubernetesClient(clientId);
+            client.resources(ConfigMap.class).resource(configMap).create();
         } catch (Throwable e) {
             if ("Conflict".equals(e.getMessage())) {
                 return Boolean.TRUE;
             } else {
                 log.error(DefaultErrorCode.InternalError.getMsg(), e);
-                throw new 
EventBridgeException(DefaultErrorCode.InternalError,e, new 
Gson().toJson(deployYaml));
+                throw new 
EventBridgeException(DefaultErrorCode.InternalError,e, configMap);
             }
         }
         return Boolean.TRUE;
     }
 
+    public String getConfigMap(String clientId, String name, String filePath) {
+        try {
+            KubernetesClient client = getKubernetesClient(clientId);
+            Resource<ConfigMap> resource = client.resources(ConfigMap.class)
+                    .resource(new 
ConfigMapBuilder().withNewMetadata().withName(name).endMetadata().build());
+            if (resource != null) {
+                ConfigMap configMap = resource.get();
+                if (configMap != null) {
+                    Map<String, String> map = configMap.getData();
+                    if (map != null && map.containsKey(filePath)) {
+                        return map.get(filePath);
+                    }
+                }
+            }
+        } catch (Throwable e) {
+            log.error(DefaultErrorCode.InternalError.getMsg(), e);
+            throw new EventBridgeException(DefaultErrorCode.InternalError,e, 
"name=" + name + ", filePath=" + filePath);
+        }
+        return null;
+    }
+
 }
diff --git 
a/adapter/runtime-on-k8s/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/k8s/api/K8SNameSpaceService.java
 
b/adapter/runtime-on-k8s/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/k8s/api/K8SNameSpaceService.java
index 69ca895..4da57d2 100644
--- 
a/adapter/runtime-on-k8s/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/k8s/api/K8SNameSpaceService.java
+++ 
b/adapter/runtime-on-k8s/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/k8s/api/K8SNameSpaceService.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
 package org.apache.rocketmq.eventbridge.adapter.runtime.manager.k8s.api;
 
 import org.springframework.stereotype.Service;
diff --git 
a/adapter/runtime-on-k8s/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/k8s/api/KubectlService.java
 
b/adapter/runtime-on-k8s/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/k8s/api/KubectlService.java
index aaae526..a41c8cd 100644
--- 
a/adapter/runtime-on-k8s/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/k8s/api/KubectlService.java
+++ 
b/adapter/runtime-on-k8s/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/k8s/api/KubectlService.java
@@ -1,20 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
 package org.apache.rocketmq.eventbridge.adapter.runtime.manager.k8s.api;
 
-import io.kubernetes.client.openapi.ApiClient;
-import io.kubernetes.client.util.ClientBuilder;
-import io.kubernetes.client.util.KubeConfig;
+import io.fabric8.kubernetes.client.*;
+
 import java.io.IOException;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Service;
 
+import javax.annotation.PostConstruct;
+
+@Slf4j
 @Service
-public class KubectlService {
+public class KubectlService implements AutoCloseable{
 
-    private static final Logger log = 
LoggerFactory.getLogger(KubectlService.class);
+    private KubernetesClient client ;
 
     @Value("${eventbus.cs.accessKey:}")
     private String accessKey;
@@ -25,21 +45,79 @@ public class KubectlService {
     @Value("${conductor.run.env:online}")
     private String env;
 
-    private Map<String, ApiClient> kubeConfigMap = new ConcurrentHashMap<>();
+    @Value("${kubernates.api.server}")
+    private String apiServer;
 
-    public ApiClient generateKubeApiClient(String regionId, String 
csClusterId) throws IOException {
-        if (kubeConfigMap.containsKey(csClusterId)) {
-            return kubeConfigMap.get(csClusterId);
-        }
-        KubeConfig config = getKubeConfig(regionId, csClusterId, accessKey, 
secretKey);
+    @Value("${kubernates.api.version:apps/v1}")
+    private String apiVersion;
+
+    @Value("${kubernates.auth.token:}")
+    private String oauthToken;
+
+    private final String DEFAULT_KEY= "default";
+
+    private Map<String, KubernetesClient> kubernetesClientMap = new 
ConcurrentHashMap<>();
+
+    @PostConstruct
+    public void initClient(){
+        client = getKubernetesClient();
+    }
+
+    public KubernetesClient getClient(){
+        return this.client;
+    }
 
-        ApiClient client = ClientBuilder.kubeconfig(config).build();
-        //client.setDebugging(true);
-        kubeConfigMap.putIfAbsent(csClusterId, client);
+    public KubernetesClient generateKubeApiClient(String clientId) throws 
IOException {
+        if (StringUtils.isBlank(clientId)) {
+            clientId = DEFAULT_KEY;
+        }
+        if (kubernetesClientMap.containsKey(clientId)) {
+            return kubernetesClientMap.get(clientId);
+        }
+        KubernetesClient client = getKubernetesClient();
+        kubernetesClientMap.putIfAbsent(clientId, client);
         return client;
     }
 
-    private KubeConfig getKubeConfig(String regionId, String csClusterId, 
String ak, String sk) throws IOException {
-        return null;
+    private KubernetesClient getKubernetesClient() {
+        Config config = getKubeConfig();
+        log.info("connect to api server [{}]", apiServer);
+        return new KubernetesClientBuilder().withConfig(config).build();
+    }
+
+    private Config getKubeConfig() {
+        Config config = new ConfigBuilder()
+                .withMasterUrl(apiServer)
+                .withApiVersion(apiVersion)
+                .build();
+        if (StringUtils.isNotBlank(accessKey) && 
StringUtils.isNotBlank(secretKey)) {
+            config.setUsername(accessKey);
+            config.setPassword(secretKey);
+            log.info("use ak and sk connect to api server.");
+        } else if(StringUtils.isNotBlank(oauthToken)){
+            config.setTrustCerts(true);
+            config.setOauthToken(oauthToken);
+            log.info("use auth token connect to api server.");
+        } else {
+            log.warn("this is no authenticated connection to the API server.");
+        }
+        return config;
+    }
+
+    @Override
+    public void close() throws Exception {
+
+        if(client != null) {
+            client.close();
+        }
+
+        if (!kubernetesClientMap.isEmpty()) {
+            for(String clientId : kubernetesClientMap.keySet()) {
+                KubernetesClient kubernetesClient = 
kubernetesClientMap.get(clientId);
+                if(kubernetesClient != null){
+                    kubernetesClient.close();
+                }
+            }
+        }
     }
 }
diff --git 
a/adapter/runtime-on-k8s/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/k8s/common/K8SConstants.java
 
b/adapter/runtime-on-k8s/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/k8s/common/K8SConstants.java
index b54356a..e1d85fa 100644
--- 
a/adapter/runtime-on-k8s/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/k8s/common/K8SConstants.java
+++ 
b/adapter/runtime-on-k8s/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/k8s/common/K8SConstants.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
 package org.apache.rocketmq.eventbridge.adapter.runtime.manager.k8s.common;
 
 public class K8SConstants {
@@ -8,4 +25,7 @@ public class K8SConstants {
     public static final String DEFAULT_CLUSTER_SUPPORT_CONNECTOR = "ALL";
     public static final String DEFAULT_CONNECTOR_KEY_TASK_SIZE = "TASK_SIZE";
 
+    public static final String SELECTOR_LABEL_KEY = "app";
+
+    public static final String DEPLOYMENT_KIND = "Deployment";
 }
diff --git 
a/adapter/runtime-on-k8s/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/k8s/common/WorkerStatusEnum.java
 
b/adapter/runtime-on-k8s/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/k8s/common/WorkerStatusEnum.java
deleted file mode 100644
index c8ff69f..0000000
--- 
a/adapter/runtime-on-k8s/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/k8s/common/WorkerStatusEnum.java
+++ /dev/null
@@ -1,47 +0,0 @@
-package org.apache.rocketmq.eventbridge.adapter.runtime.manager.k8s.common;
-
-public enum WorkerStatusEnum {
-    /**
-     * 集群状态
-     */
-    DEFAULT(-1, "未知状态"),
-    WAIT_DEPLOY(0, "待部署"),
-    STARTING(3, "启动中"),
-    RUN(5, "服务中"),
-    STOP(10, "停止"),
-    RELEASING(11, "释放中");
-
-    private int value;
-    private String desc;
-
-    WorkerStatusEnum(int value, String desc) {
-        this.value = value;
-        this.desc = desc;
-    }
-
-    public static WorkerStatusEnum valueOf(int value) {
-        for (WorkerStatusEnum temp : WorkerStatusEnum.values()) {
-            if (temp.getValue() == value) {
-                return temp;
-            }
-        }
-        return DEFAULT;
-    }
-
-    public static WorkerStatusEnum nameOf(String name) {
-        for (WorkerStatusEnum temp : WorkerStatusEnum.values()) {
-            if (temp.name().equals(name)) {
-                return temp;
-            }
-        }
-        return DEFAULT;
-    }
-
-    public String getDesc() {
-        return desc;
-    }
-
-    public int getValue() {
-        return value;
-    }
-}
diff --git 
a/adapter/runtime-on-k8s/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/k8s/config/DeploySpecTemplateSpec.java
 
b/adapter/runtime-on-k8s/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/k8s/config/DeploySpecTemplateSpec.java
new file mode 100644
index 0000000..fcdc670
--- /dev/null
+++ 
b/adapter/runtime-on-k8s/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/k8s/config/DeploySpecTemplateSpec.java
@@ -0,0 +1,31 @@
+package org.apache.rocketmq.eventbridge.adapter.runtime.manager.k8s.config;
+
+import lombok.Getter;
+import lombok.Setter;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import 
org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.annotation.Configuration;
+
+import java.util.List;
+import java.util.Map;
+
+@Getter
+@Setter
+@ConfigurationProperties(prefix="deploy.spec.template.spec")
+@EnableConfigurationProperties
+@Configuration
+public class DeploySpecTemplateSpec {
+
+  private Long terminationGracePeriodSeconds;
+
+  @Autowired
+  private SpecContainer containers;
+
+  private Map<String, String> nodeSelector;
+
+
+  private List<Map<String, Object>> volumes;
+
+  private String podManagementPolicy;
+}
diff --git 
a/adapter/runtime-on-k8s/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/k8s/config/SpecContainer.java
 
b/adapter/runtime-on-k8s/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/k8s/config/SpecContainer.java
new file mode 100644
index 0000000..acce560
--- /dev/null
+++ 
b/adapter/runtime-on-k8s/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/k8s/config/SpecContainer.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.eventbridge.adapter.runtime.manager.k8s.config;
+
+import lombok.Getter;
+import lombok.Setter;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import 
org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.annotation.Configuration;
+
+import java.util.List;
+import java.util.Map;
+
+
+@Getter
+@Setter
+@ConfigurationProperties(prefix="deploy.spec.template.spec.containers")
+@EnableConfigurationProperties
+@Configuration
+public class SpecContainer implements Cloneable{
+  private String imagePullPolicy = "IfNotPresent";
+  private String registry;
+  private List<String> args;
+  private List<Map<String, String>> volumeMounts;
+  private List<Map<String, String>> env;
+  private List<Map<String, String>> ports;
+
+  public Object clone() {
+    SpecContainer specContainer = new SpecContainer();
+    specContainer.setImagePullPolicy(this.imagePullPolicy);
+    specContainer.setRegistry(this.registry);
+    specContainer.setArgs(this.args);
+    specContainer.setVolumeMounts(this.volumeMounts);
+    specContainer.setEnv(this.env);
+    specContainer.setPorts(this.ports);
+    return specContainer;
+  }
+
+}
diff --git 
a/adapter/runtime-on-k8s/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/k8s/model/K8SCRDConnectTaskSetModel.java
 
b/adapter/runtime-on-k8s/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/k8s/model/K8SCRDConnectTaskSetModel.java
index 6d78922..0b3cb6c 100644
--- 
a/adapter/runtime-on-k8s/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/k8s/model/K8SCRDConnectTaskSetModel.java
+++ 
b/adapter/runtime-on-k8s/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/k8s/model/K8SCRDConnectTaskSetModel.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
 package org.apache.rocketmq.eventbridge.adapter.runtime.manager.k8s.model;
 
 import com.google.common.collect.Lists;
diff --git 
a/adapter/runtime-on-k8s/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/k8s/repository/WorkerInstanceRepositoryOnK8S.java
 
b/adapter/runtime-on-k8s/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/k8s/repository/WorkerInstanceRepositoryOnK8S.java
index 20a95e9..cde8ea7 100644
--- 
a/adapter/runtime-on-k8s/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/k8s/repository/WorkerInstanceRepositoryOnK8S.java
+++ 
b/adapter/runtime-on-k8s/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/k8s/repository/WorkerInstanceRepositoryOnK8S.java
@@ -17,34 +17,379 @@
 
 package org.apache.rocketmq.eventbridge.adapter.runtime.manager.k8s.repository;
 
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.Objects;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import io.fabric8.kubernetes.api.model.ConfigMap;
+import io.fabric8.kubernetes.api.model.ConfigMapBuilder;
+import io.fabric8.kubernetes.api.model.ConfigMapVolumeSource;
+import io.fabric8.kubernetes.api.model.ConfigMapVolumeSourceBuilder;
+import io.fabric8.kubernetes.api.model.Container;
+import io.fabric8.kubernetes.api.model.ContainerBuilder;
+import io.fabric8.kubernetes.api.model.ContainerPort;
+import io.fabric8.kubernetes.api.model.ContainerPortBuilder;
+import io.fabric8.kubernetes.api.model.EnvVar;
+import io.fabric8.kubernetes.api.model.EnvVarBuilder;
+import io.fabric8.kubernetes.api.model.HostPathVolumeSource;
+import io.fabric8.kubernetes.api.model.HostPathVolumeSourceBuilder;
+import io.fabric8.kubernetes.api.model.LabelSelector;
+import io.fabric8.kubernetes.api.model.ObjectMeta;
+import io.fabric8.kubernetes.api.model.ObjectMetaBuilder;
+import io.fabric8.kubernetes.api.model.PodSpec;
+import io.fabric8.kubernetes.api.model.PodSpecBuilder;
+import io.fabric8.kubernetes.api.model.PodTemplateSpec;
+import io.fabric8.kubernetes.api.model.PodTemplateSpecBuilder;
+import io.fabric8.kubernetes.api.model.Probe;
+import io.fabric8.kubernetes.api.model.Quantity;
+import io.fabric8.kubernetes.api.model.QuantityBuilder;
+import io.fabric8.kubernetes.api.model.ResourceRequirements;
+import io.fabric8.kubernetes.api.model.ResourceRequirementsBuilder;
+import io.fabric8.kubernetes.api.model.Volume;
+import io.fabric8.kubernetes.api.model.VolumeBuilder;
+import io.fabric8.kubernetes.api.model.VolumeMount;
+import io.fabric8.kubernetes.api.model.VolumeMountBuilder;
+import io.fabric8.kubernetes.api.model.apps.Deployment;
+import io.fabric8.kubernetes.api.model.apps.DeploymentBuilder;
+import lombok.extern.slf4j.Slf4j;
+import 
org.apache.rocketmq.eventbridge.adapter.runtime.manager.k8s.api.K8SDeploymentService;
+import 
org.apache.rocketmq.eventbridge.adapter.runtime.manager.k8s.api.K8SNameSpaceService;
+import 
org.apache.rocketmq.eventbridge.adapter.runtime.manager.k8s.config.DeploySpecTemplateSpec;
 import 
org.apache.rocketmq.eventbridge.adapter.runtime.manager.repository.WorkerInstanceRepository;
 import 
org.apache.rocketmq.eventbridge.adapter.runtime.manager.worker.WorkerResource;
 import 
org.apache.rocketmq.eventbridge.adapter.runtime.manager.worker.WorkerStatusEnum;
+import org.apache.rocketmq.eventbridge.tools.JsonUtil;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Service;
+
+import static 
org.apache.rocketmq.eventbridge.adapter.runtime.manager.k8s.common.K8SConstants.DEPLOYMENT_KIND;
 
+@Slf4j
+@Service
 public class WorkerInstanceRepositoryOnK8S implements WorkerInstanceRepository 
{
 
+    @Autowired
+    private K8SDeploymentService k8SDeploymentService;
+
+    @Value("${kubernates.api.deployment.version:apps/v1}")
+    private String deploymentApiVersion;
+
+    @Value("${pod.limit.over.request.cpu.factor:1.5}")
+    private Double limitOverRequestCpuFactor;
+
+    @Value("${pod.limit.over.request.memory.factor:1.5}")
+    private Double limitOverRequestMemoryFactor;
+
+    @Autowired
+    private DeploySpecTemplateSpec deploySpecTemplateSpec;
+
+    @Autowired
+    private K8SNameSpaceService k8SNameSpaceService;
 
     @Override
     public boolean applyWorkerInstance(String name, String image, 
WorkerResource resources, Map<String, Object> environments) {
-        return false;
+
+        PodSpec podSpec = getPodSpec(name,
+                image,
+                resources,
+                environments);
+
+        Map<String, String> specLabes = 
getSpecLabels(environments.get("specLabels"), name);
+        ObjectMeta templateMeta = new ObjectMetaBuilder()
+                .withLabels(specLabes)
+                .build();
+
+        PodTemplateSpec podTemplateSpec = new PodTemplateSpecBuilder()
+                .withMetadata(templateMeta)
+                .withSpec(podSpec)
+                .build();
+
+        LabelSelector labelSelector = new LabelSelector();
+        labelSelector.setMatchLabels(specLabes);
+
+        Deployment deployment = new DeploymentBuilder()
+                .withApiVersion(deploymentApiVersion)
+                .withKind(DEPLOYMENT_KIND)
+                .withNewMetadata()
+                    .withName(name)
+                    .withNamespace(k8SNameSpaceService.getNameSpace())
+                    .withLabels(getMetaLabels(environments.get("metaLabels")))
+                .endMetadata()
+                .withNewSpec()
+                    .withReplicas(getReplicas(environments.get("replicas")))
+                    .withSelector(labelSelector)
+                    .withTemplate(podTemplateSpec)
+                .endSpec()
+                .build();
+
+        return k8SDeploymentService.createDeployment((String) 
environments.get("clientId"), deployment);
+    }
+
+    @Override
+    public boolean deleteWorkerInstance(String name, Map<String, Object> 
environments) {
+        return k8SDeploymentService.deleteDeployment((String) 
environments.get("clientId"), name);
     }
 
     @Override
-    public boolean deleteWorkerInstance(String name) {
-        return false;
+    public WorkerStatusEnum getWorkerInstanceStatus(String name, Map<String, 
Object> environments) {
+        WorkerStatusEnum workerStatusEnum = 
k8SDeploymentService.getDeploymentStatus((String) environments.get("clientId"), 
name);
+        return WorkerStatusEnum.valueOf(workerStatusEnum.getValue());
+    }
+
+    @Override
+    public boolean applyWorkerInstanceConfigFile(String name, String filePath, 
String config, Map<String, Object> environments) {
+        ConfigMap configMap =  new ConfigMapBuilder()
+                .withNewMetadata()
+                .withNamespace(k8SNameSpaceService.getNameSpace())
+                .withName(name)
+                .endMetadata()
+                .addToData(filePath, config)
+                .build();
+        return k8SDeploymentService.createConfigMap((String) 
environments.get("clientId"), configMap);
+    }
+
+    @Override
+    public String getWorkerInstanceConfigFile(String name, String filePath, 
Map<String, Object> environments) {
+        return k8SDeploymentService.getConfigMap((String) 
environments.get("clientId"), name, filePath);
+    }
+
+    private Integer getReplicas(Object replicas) {
+        if (Objects.isNull(replicas)) {
+            replicas = 1;
+        }
+        return (Integer) replicas;
+    }
+
+    private PodSpec getPodSpec(String name,String image, WorkerResource 
resources, Map<String, Object> environments) {
+        assert resources.getCpu() != null;
+        assert resources.getMemory() != null;
+        long cpuCoreLimit = getCpuLimit(resources.getCpu());
+        long memoryLimit = getMemoryLimt(resources.getMemory());
+
+        Map<String, Quantity> requests = getResource(resources.getCpu(), 
resources.getMemory());
+        Map<String, Quantity> limits = getResource(cpuCoreLimit, memoryLimit);
+        ResourceRequirements resourceRequirements = new 
ResourceRequirementsBuilder()
+                .withRequests(requests)
+                .withLimits(limits)
+                .build();
+
+
+        Container container = new ContainerBuilder()
+                .withName(name)
+                
.withImagePullPolicy(getImagePullPolicy(environments.get("imagePullPolicy")))
+                .withImage(image)
+                .withArgs(getArgs(environments.get("args")))
+                .withResources(resourceRequirements)
+                .withEnv(getEnvs(environments.get("env")))
+                
.withLivenessProbe(getLivenessProbe(environments.get("lProbe")))
+                
.withReadinessProbe(getReadinessProbe(environments.get("rProbe")))
+                .withStartupProbe(getStartupProbe(environments.get("sProbe")))
+                
.withTerminationMessagePolicy(getTerminationMessagePolicy(environments.get("tMesPolicy")))
+                
.withTerminationMessagePath(getTerminationMessagePath(environments.get("tMesPath")))
+                
.withVolumeMounts(getVolumeMounts(environments.get("volumeMounts")))
+                .withPorts(getContainerPorts(environments.get("ports")))
+                .build();
+
+        return new PodSpecBuilder()
+                
.withTerminationGracePeriodSeconds(getTerminationGracePeriodSeconds(environments.get("tPeriodSec")))
+                .withContainers(container)
+                .withVolumes(getPodVolumes(environments.get("volums")))
+                .withNodeSelector(getPodLabels(environments.get("podLabels")))
+                .build();
+    }
+
+    private String getImagePullPolicy(Object imagePullPolicy) {
+        if (Objects.isNull(imagePullPolicy)) {
+            return deploySpecTemplateSpec.getContainers().getImagePullPolicy();
+        }
+        return (String) imagePullPolicy;
     }
 
-    @Override public WorkerStatusEnum getWorkerInstanceStatus(String name) {
+    @SuppressWarnings("unchecked")
+    private List<String> getArgs(Object args){
+        if (Objects.isNull(args) && 
Objects.nonNull(deploySpecTemplateSpec.getContainers().getArgs())) {
+            args = deploySpecTemplateSpec.getContainers().getArgs();
+        } else {
+            args = Collections.emptyList();
+        }
+        return (List<String>) args;
+    }
+
+    @SuppressWarnings("unchecked")
+    private List<EnvVar> getEnvs(Object env) {
+        List<EnvVar> envVarList = new ArrayList<>();
+        if(Objects.isNull(env) && 
Objects.nonNull(deploySpecTemplateSpec.getContainers().getEnv())){
+            env = deploySpecTemplateSpec.getContainers().getEnv();
+        }
+        if (Objects.nonNull(env)) {
+            List<Map<String, String>> envs = (List<Map<String, String>>) env;
+            for(Map<String, String> envMap : envs) {
+                EnvVar envVar = new EnvVarBuilder()
+                        .withName(envMap.get("name"))
+                        .withValue(envMap.get("value"))
+                        .build();
+                envVarList.add(envVar);
+            }
+        }
+        return envVarList;
+    }
+
+    private Probe getLivenessProbe(Object lProbe) {
+        if (Objects.nonNull(lProbe)) {
+            return JsonUtil.parse((String) lProbe, Probe.class);
+        }
+        return null;
+    }
+
+    private Probe getReadinessProbe(Object rProbe) {
+        if (Objects.nonNull(rProbe)) {
+            return JsonUtil.parse((String) rProbe, Probe.class);
+        }
+        return null;
+    }
+
+    private Probe getStartupProbe(Object sProbe) {
+        if (Objects.nonNull(sProbe)) {
+            return JsonUtil.parse((String) sProbe, Probe.class);
+        }
         return null;
     }
 
-    @Override public boolean applyWorkerInstanceConfigFile(String name, String 
filePath, String config) {
-        return false;
+    private String getTerminationMessagePolicy(Object tMesPolicy) {
+        return (String) tMesPolicy;
+    }
+
+    private String getTerminationMessagePath(Object tMesPath) {
+        return (String) tMesPath;
+    }
+
+    @SuppressWarnings("unchecked")
+    private List<ContainerPort> getContainerPorts(Object port) {
+        List<ContainerPort> containerPorts = new ArrayList<>();
+        if(port == null) {
+            port = ImmutableList.of(ImmutableMap.of("name","http1", 
"containerPort","7001"), ImmutableMap.of("name","http2", 
"containerPort","7002"));
+        }
+        List<Map<String, String>> ports = (List<Map<String, String>>) port;
+        for(Map<String, String> configPort : ports){
+            ContainerPort containerPort = new ContainerPortBuilder()
+                    .withName(configPort.get("name"))
+                    
.withContainerPort(Integer.parseInt(configPort.get("containerPort")))
+                    .build();
+            containerPorts.add(containerPort);
+        }
+        return containerPorts;
+    }
+
+    @SuppressWarnings("unchecked")
+    private List<VolumeMount> getVolumeMounts(Object volumeMount){
+        List<VolumeMount> volumeMountList = new ArrayList<>();
+        if(volumeMount == null){
+            return volumeMountList;
+        }
+        List<Map<String, String>> volumeMounts = (List<Map<String, String>>) 
volumeMount;
+        for(Map<String, String> volume : volumeMounts){
+            VolumeMount mount = new VolumeMountBuilder()
+                    .withName(volume.get("name"))
+                    .withMountPath(volume.get("mountPath"))
+                    .build();
+            volumeMountList.add(mount);
+        }
+        return volumeMountList;
     }
 
-    @Override public boolean getWorkerInstanceConfigFile(String name, String 
filePath) {
-        return false;
+    private Long getTerminationGracePeriodSeconds(Object tPeriodSec) {
+        if (Objects.isNull(tPeriodSec) && 
Objects.nonNull(deploySpecTemplateSpec.getTerminationGracePeriodSeconds())) {
+            tPeriodSec = 
deploySpecTemplateSpec.getTerminationGracePeriodSeconds();
+        }
+        return (Long) tPeriodSec;
     }
 
+    @SuppressWarnings("unchecked")
+    private List<Volume> getPodVolumes(Object volums) {
+        if (Objects.isNull(volums) && 
Objects.nonNull(deploySpecTemplateSpec.getVolumes())) {
+            volums = deploySpecTemplateSpec.getVolumes();
+        }
+        List<Volume> volumeList = new ArrayList<>();
+        if (Objects.nonNull(volums)) {
+            List<Map<String, Object>> vList = (List<Map<String, Object>>) 
volums;
+            for(Map<String, Object> volume : vList) {
+                Volume confVolume =  new VolumeBuilder().build();
+                for(Map.Entry<String, Object> yamlConf : volume.entrySet()) {
+                    String key = yamlConf.getKey();
+                    if("name".equals(key)) {
+                        
confVolume.setName(String.valueOf(yamlConf.getValue()));
+                    } else if("configMap".equals(key)) {
+                        Map<String, String> configMap = (Map<String, String>) 
yamlConf.getValue();
+                        String name = configMap.get("name");
+                        ConfigMapVolumeSource configMapVolume = new 
ConfigMapVolumeSourceBuilder()
+                                .withName(name)
+                                .build();
+                        confVolume.setConfigMap(configMapVolume);
+                    } else if("hostPath".equals(key)) {
+                        Map<String, String> configMap = (Map<String, String>) 
yamlConf.getValue();
+                        String path = configMap.get("path");
+                        HostPathVolumeSource hostPathVolumeSource = new 
HostPathVolumeSourceBuilder()
+                                .withPath(path)
+                                .build();
+
+                        confVolume.setHostPath(hostPathVolumeSource);
+                    } else {
+                        log.error("not implements this pod volumes type : {}", 
key);
+                    }
+                }
+                volumeList.add(confVolume);
+            }
+        }
+        return volumeList;
+    }
+
+    @SuppressWarnings("unchecked")
+    private Map<String, String> getPodLabels(Object nodeSeletor) {
+        if (Objects.isNull(nodeSeletor) && 
Objects.nonNull(deploySpecTemplateSpec.getNodeSelector())) {
+            nodeSeletor = deploySpecTemplateSpec.getNodeSelector();
+        }
+        return (Map<String, String>) nodeSeletor;
+    }
+
+    @SuppressWarnings("unchecked")
+    private Map<String, String> getMetaLabels(Object nodeSeletor) {
+        if (Objects.isNull(nodeSeletor) && 
Objects.nonNull(deploySpecTemplateSpec.getNodeSelector())) {
+            nodeSeletor = deploySpecTemplateSpec.getNodeSelector();
+        }
+        return (Map<String, String>) nodeSeletor;
+    }
+
+    @SuppressWarnings("unchecked")
+    private Map<String, String> getSpecLabels(Object nodeSeletor, String name) 
{
+        if (Objects.isNull(nodeSeletor) && 
Objects.nonNull(deploySpecTemplateSpec.getNodeSelector())) {
+            nodeSeletor = deploySpecTemplateSpec.getNodeSelector();
+        }
+        Map<String, String> labels = (Map<String, String>) nodeSeletor;
+        if (labels == null || labels.isEmpty()) {
+            return ImmutableMap.of("app", name);
+        }
+        return (Map<String, String>) nodeSeletor;
+    }
+
+    protected Map<String, Quantity> getResource(Long cpuCore, Long memory){
+        Map<String, Quantity> resource = new HashMap<>();
+        resource.put("cpu", new 
QuantityBuilder().withAmount(String.valueOf(cpuCore)).withFormat("m").build());
+        resource.put("memory", new 
QuantityBuilder().withAmount(String.valueOf(memory)).withFormat("Mi").build());
+        return resource;
+    }
+
+
+    private Long getCpuLimit(Long requestCore){
+        return Math.round(requestCore * limitOverRequestCpuFactor);
+    }
+
+    private Long getMemoryLimt(Long requestMemory){
+        return Math.round(requestMemory * limitOverRequestMemoryFactor);
+    }
 }
\ No newline at end of file
diff --git 
a/adapter/runtime-on-k8s/src/test/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/k8s/repository/WorkerInstanceRepositoryOnK8STest.java
 
b/adapter/runtime-on-k8s/src/test/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/k8s/repository/WorkerInstanceRepositoryOnK8STest.java
index 877799e..1c539e2 100644
--- 
a/adapter/runtime-on-k8s/src/test/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/k8s/repository/WorkerInstanceRepositoryOnK8STest.java
+++ 
b/adapter/runtime-on-k8s/src/test/java/org/apache/rocketmq/eventbridge/adapter/runtime/manager/k8s/repository/WorkerInstanceRepositoryOnK8STest.java
@@ -1,73 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
 package org.apache.rocketmq.eventbridge.adapter.runtime.manager.k8s.repository;
 
 import com.google.common.collect.Maps;
 import com.google.gson.Gson;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+
+import lombok.extern.slf4j.Slf4j;
 import 
org.apache.rocketmq.eventbridge.adapter.runtime.manager.worker.WorkerResource;
+import 
org.apache.rocketmq.eventbridge.adapter.runtime.manager.worker.WorkerStatusEnum;
 import org.junit.Ignore;
-import org.junit.jupiter.api.Test;
-import org.mockito.InjectMocks;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.test.context.junit4.SpringRunner;
 
+import static org.junit.Assert.assertEquals;
 
+@Slf4j
+@RunWith(SpringRunner.class)
+@SpringBootTest(classes = WorkerInstanceRepositoryOnK8STest.class)
+@SpringBootApplication(scanBasePackages = 
{"org.apache.rocketmq.eventbridge.adapter.runtime.manager.k8s.*"})
 public class WorkerInstanceRepositoryOnK8STest {
 
-    @InjectMocks
+    @Autowired
     private WorkerInstanceRepositoryOnK8S workerInstanceRepositoryOnK8S;
 
     @Test
-    @Ignore
-    void applyWorkerInstance() {
+    public void applyWorkerInstance() {
         Map<String, Object> environments = Maps.newHashMap();
         environments.put("key1", "value1");
         environments.put("key2", "value2");
-        workerInstanceRepositoryOnK8S.applyWorkerInstance("worker-4", 
"registry.cn-beijing.cr.aliyuncs.com/eventbridge:20231115195431f55971", new 
Gson().fromJson("{\"cpu\":1,\"memory\":1}", WorkerResource.class), 
environments);
+        workerInstanceRepositoryOnK8S.applyWorkerInstance("worker-4", 
"apache/rocketmq-eventbridge:1.1.0", new 
Gson().fromJson("{\"cpu\":100,\"memory\":100}", WorkerResource.class), 
environments);
     }
 
     @Test
-    @Ignore
-    void deleteWorkerInstance() {
-        workerInstanceRepositoryOnK8S.deleteWorkerInstance("worker-4");
+    public void applyWorkerInstanceWithConfigMap() {
+        Map<String, Object> environments = Maps.newHashMap();
+        environments.put("key1", "value1");
+        environments.put("key2", "value2");
+        List<Map<String, String>> volums = new ArrayList<>();
+        volums.add(new HashMap<String, String>(){{
+            put("name", "worker-config");
+        }});
+        environments.put("volums", volums);
+        workerInstanceRepositoryOnK8S.applyWorkerInstance("worker-4", 
"apache/rocketmq-eventbridge:1.1.0", new 
Gson().fromJson("{\"cpu\":100,\"memory\":100}", WorkerResource.class), 
environments);
     }
 
     @Test
-    @Ignore
-    void getWorkerInstanceStatus() {
-        workerInstanceRepositoryOnK8S.getWorkerInstanceStatus("worker-4");
+    public void deleteWorkerInstance() {
+        Map<String, Object> environments = Maps.newHashMap();
+        environments.put("key1", "value1");
+        environments.put("key2", "value2");
+        workerInstanceRepositoryOnK8S.deleteWorkerInstance("worker-4", 
environments);
     }
 
     @Test
-    @Ignore
-    void applyWorkerInstanceConfigFile() {
+    public void getWorkerInstanceStatus() {
+        Map<String, Object> environments = Maps.newHashMap();
+        environments.put("key1", "value1");
+        environments.put("key2", "value2");
+        assertEquals(WorkerStatusEnum.RUN, 
workerInstanceRepositoryOnK8S.getWorkerInstanceStatus("worker-4", 
environments));
+    }
+
+    @Test
+    public void applyWorkerInstanceConfigFile() {
+        Map<String, Object> environments = Maps.newHashMap();
+        environments.put("key1", "value1");
+        environments.put("key2", "value2");
         String taskConfig = "[\n" +
-            "  {\n" +
-            "    \"name\":\"demo-runner\",\n" +
-            "    \"components\":[\n" +
-            "      {\n" +
-            "        \"accountId\": \"654321\",\n" +
-            "        \"eventBusName\":\"demo-bus\"\n" +
-            "      },\n" +
-            "      {\n" +
-            "        \"filterPattern\":\"{}\",\n" +
-            "        
\"class\":\"org.apache.rocketmq.connect.transform.eventbridge.EventBridgeFilterTransform\"\n"
 +
-            "      },\n" +
-            "      {\n" +
-            "        
\"data\":\"{\\\"form\\\":\\\"TEMPLATE\\\",\\\"value\\\":\\\"{\\\\\\\"content\\\\\\\":\\\\\\\"$.data.body\\\\\\\"}\\\",\\\"template\\\":\\\"{\\\\\\\"text\\\\\\\":{\\\\\\\"content\\\\\\\":\\\\\\\"${content}\\\\\\\"},\\\\\\\"msgtype\\\\\\\":\\\\\\\"text\\\\\\\"}\\\"}\",\n"
 +
-            "        \"class\": 
\"org.apache.rocketmq.connect.transform.eventbridge.EventBridgeTransform\"\n" +
-            "      },\n" +
-            "      {\n" +
-            "        
\"class\":\"org.apache.rocketmq.connect.dingtalk.sink.DingTalkSinkTask\",\n" +
-            "        \"webHook\":\"xxxxxxxxxxx\",\n" +
-            "        \"secretKey\":\"xxxxxxxxxxx\"\n" +
-            "      }\n" +
-            "    ]\n" +
-            "  }\n" +
-            "]";
-        
workerInstanceRepositoryOnK8S.applyWorkerInstanceConfigFile("worker-4", 
"/eventbridge/task-config", taskConfig);
+                "  {\n" +
+                "    \"name\":\"demo-runner\",\n" +
+                "    \"components\":[\n" +
+                "      {\n" +
+                "        \"accountId\": \"654321\",\n" +
+                "        \"eventBusName\":\"demo-bus\"\n" +
+                "      },\n" +
+                "      {\n" +
+                "        \"filterPattern\":\"{}\",\n" +
+                "        
\"class\":\"org.apache.rocketmq.connect.transform.eventbridge.EventBridgeFilterTransform\"\n"
 +
+                "      },\n" +
+                "      {\n" +
+                "        
\"data\":\"{\\\"form\\\":\\\"TEMPLATE\\\",\\\"value\\\":\\\"{\\\\\\\"content\\\\\\\":\\\\\\\"$.data.body\\\\\\\"}\\\",\\\"template\\\":\\\"{\\\\\\\"text\\\\\\\":{\\\\\\\"content\\\\\\\":\\\\\\\"${content}\\\\\\\"},\\\\\\\"msgtype\\\\\\\":\\\\\\\"text\\\\\\\"}\\\"}\",\n"
 +
+                "        \"class\": 
\"org.apache.rocketmq.connect.transform.eventbridge.EventBridgeTransform\"\n" +
+                "      },\n" +
+                "      {\n" +
+                "        
\"class\":\"org.apache.rocketmq.connect.dingtalk.sink.DingTalkSinkTask\",\n" +
+                "        \"webHook\":\"xxxxxxxxxxx\",\n" +
+                "        \"secretKey\":\"xxxxxxxxxxx\"\n" +
+                "      }\n" +
+                "    ]\n" +
+                "  }\n" +
+                "]";
+        
workerInstanceRepositoryOnK8S.applyWorkerInstanceConfigFile("worker-config", 
"task-config", taskConfig, environments);
     }
 
     @Test
     @Ignore
-    void getWorkerInstanceConfigFile() {
-        workerInstanceRepositoryOnK8S.getWorkerInstanceConfigFile("worker-4", 
"/eventbridge/task-config");
+    public void getWorkerInstanceConfigFile() {
+        Map<String, Object> environments = Maps.newHashMap();
+        environments.put("key1", "value1");
+        environments.put("key2", "value2");
+        workerInstanceRepositoryOnK8S.getWorkerInstanceConfigFile("worker-4", 
"/eventbridge/task-config", environments);
     }
 }
\ No newline at end of file
diff --git a/adapter/runtime-on-k8s/src/test/resources/application.properties 
b/adapter/runtime-on-k8s/src/test/resources/application.properties
new file mode 100644
index 0000000..87076bc
--- /dev/null
+++ b/adapter/runtime-on-k8s/src/test/resources/application.properties
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+kubernates.api.server=https://kubernetes.docker.internal:6443
diff --git 
a/common/src/main/java/org/apache/rocketmq/eventbridge/tools/JsonUtil.java 
b/common/src/main/java/org/apache/rocketmq/eventbridge/tools/JsonUtil.java
index 393ee21..62698ab 100644
--- a/common/src/main/java/org/apache/rocketmq/eventbridge/tools/JsonUtil.java
+++ b/common/src/main/java/org/apache/rocketmq/eventbridge/tools/JsonUtil.java
@@ -18,16 +18,22 @@
 package org.apache.rocketmq.eventbridge.tools;
 
 import com.google.common.collect.Maps;
+
+import java.util.Map;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
 import com.google.gson.JsonElement;
 import com.google.gson.JsonParser;
 import com.google.gson.JsonPrimitive;
 import com.google.gson.JsonSyntaxException;
-import java.util.Map;
+import org.apache.commons.lang3.StringUtils;
 import 
org.apache.rocketmq.eventbridge.tools.pattern.InvalidEventPatternException;
 import org.apache.rocketmq.eventbridge.tools.pattern.PatternErrorMessages;
 
 public class JsonUtil {
 
+    private static Gson gson = new GsonBuilder().create();
     /**
      * Convert the value of map to json element
      *
@@ -62,4 +68,30 @@ public class JsonUtil {
         }
         return "{}".equals(element.toString()) ? Boolean.TRUE : Boolean.FALSE;
     }
+
+    public static <T> T parse(String str, Class<T> clazz) {
+        if (StringUtils.isNotEmpty(str) && clazz != null) {
+            try {
+                return gson.fromJson(str, clazz);
+            } catch (Exception e) {
+                throw new 
InvalidEventPatternException(PatternErrorMessages.INVALID_JSON_STRING, e);
+            }
+        } else {
+            return null;
+        }
+    }
+
+
+    public static <T> String toJson(T obj) {
+        if (obj == null) {
+            return null;
+        } else {
+            try {
+                return gson.toJson(obj);
+            } catch (Exception var2) {
+                return null;
+            }
+        }
+    }
+
 }
diff --git a/pom.xml b/pom.xml
index 41ee0f3..071ff4a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -89,7 +89,7 @@
         <cloudevents.version>2.3.0</cloudevents.version>
         <apache.commons-text.version>1.10.0</apache.commons-text.version>
         <mockito.version>2.13.0</mockito.version>
-        <jupiter.version>5.9.2</jupiter.version>
+        <jupiter.version>5.10.2</jupiter.version>
         <caffeine.version>2.9.3</caffeine.version>
         <fastjson.version>1.2.83</fastjson.version>
         <jacoco-maven-plugin.version>0.8.5</jacoco-maven-plugin.version>

Reply via email to