This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 1bfac499b5 [INLONG-10173][Sort] SortStandalone support request unified configuration (#10174)
1bfac499b5 is described below

commit 1bfac499b538032aaad64f1f7936bac75b9fd893
Author: vernedeng <[email protected]>
AuthorDate: Sat May 11 21:54:59 2024 +0800

    [INLONG-10173][Sort] SortStandalone support request unified configuration (#10174)
---
 .../apache/inlong/common/pojo/sort/SortConfig.java |   8 +-
 .../SortConfigResponse.java                        |   2 +-
 .../pojo/sortstandalone/SortClusterConfig.java     |   1 +
 .../pojo/sortstandalone/SortClusterResponse.java   |   1 +
 .../common/pojo/sortstandalone/SortTaskConfig.java |   1 +
 .../inlong/manager/service/core/SortService.java   |   2 +-
 .../manager/service/core/impl/SortServiceImpl.java |   4 +-
 .../web/controller/openapi/SortController.java     |   2 +-
 .../config/holder/SortClusterConfigHolder.java     |   1 +
 .../SortConfigHolder.java}                         |  96 ++++++----------
 .../ClassResourceSortClusterConfigLoader.java      |   1 +
 .../loader/ManagerSortClusterConfigLoader.java     |   1 +
 .../config/loader/SortClusterConfigLoader.java     |   1 +
 .../ClassResourceSortClusterConfigLoader.java      |  35 ++----
 .../{ => v2}/ManagerSortClusterConfigLoader.java   |  52 +++------
 .../config/loader/v2/SortConfigLoader.java         |  13 +--
 .../standalone/utils/FlumeConfigGenerator.java     |   1 +
 .../utils/{ => v2}/FlumeConfigGenerator.java       |  82 +++-----------
 .../apache/inlong/sort/standalone/SortCluster.java |   1 +
 .../apache/inlong/sort/standalone/SortTask.java    |   1 +
 .../sort/standalone/{ => v2}/SortCluster.java      |  53 +++------
 .../inlong/sort/standalone/{ => v2}/SortTask.java  | 125 +++++++--------------
 22 files changed, 160 insertions(+), 324 deletions(-)

diff --git 
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/SortConfig.java
 
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/SortConfig.java
index ad3a6bb892..680827843d 100644
--- 
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/SortConfig.java
+++ 
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/SortConfig.java
@@ -17,14 +17,20 @@
 
 package org.apache.inlong.common.pojo.sort;
 
+import lombok.AllArgsConstructor;
+import lombok.Builder;
 import lombok.Data;
+import lombok.NoArgsConstructor;
 
 import java.io.Serializable;
 import java.util.List;
 
 @Data
+@Builder
+@AllArgsConstructor
+@NoArgsConstructor
 public class SortConfig implements Serializable {
 
     private String sortClusterName;
-    private List<SortTaskConfig> clusters;
+    private List<SortTaskConfig> tasks;
 }
diff --git 
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sortstandalone/SortConfigResponse.java
 
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/SortConfigResponse.java
similarity index 95%
rename from 
inlong-common/src/main/java/org/apache/inlong/common/pojo/sortstandalone/SortConfigResponse.java
rename to 
inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/SortConfigResponse.java
index ea0a230e57..327898ede0 100644
--- 
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sortstandalone/SortConfigResponse.java
+++ 
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/SortConfigResponse.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.common.pojo.sortstandalone;
+package org.apache.inlong.common.pojo.sort;
 
 import lombok.AllArgsConstructor;
 import lombok.Builder;
diff --git 
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sortstandalone/SortClusterConfig.java
 
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sortstandalone/SortClusterConfig.java
index 905cd61ae3..957eda406b 100644
--- 
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sortstandalone/SortClusterConfig.java
+++ 
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sortstandalone/SortClusterConfig.java
@@ -28,6 +28,7 @@ import java.util.List;
 @Builder
 @NoArgsConstructor
 @AllArgsConstructor
+@Deprecated
 public class SortClusterConfig {
 
     String clusterName;
diff --git 
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sortstandalone/SortClusterResponse.java
 
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sortstandalone/SortClusterResponse.java
index 679031e16f..c0bcfc88d0 100644
--- 
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sortstandalone/SortClusterResponse.java
+++ 
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sortstandalone/SortClusterResponse.java
@@ -26,6 +26,7 @@ import lombok.NoArgsConstructor;
 @Builder
 @NoArgsConstructor
 @AllArgsConstructor
+@Deprecated
 public class SortClusterResponse {
 
     public static final int SUCC = 0;
diff --git 
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sortstandalone/SortTaskConfig.java
 
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sortstandalone/SortTaskConfig.java
index 20fa5f06e0..ae165eb214 100644
--- 
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sortstandalone/SortTaskConfig.java
+++ 
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sortstandalone/SortTaskConfig.java
@@ -29,6 +29,7 @@ import java.util.Map;
 @Builder
 @NoArgsConstructor
 @AllArgsConstructor
+@Deprecated
 public class SortTaskConfig {
 
     String name;
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortService.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortService.java
index 011c903c2e..be7fcb032c 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortService.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortService.java
@@ -18,8 +18,8 @@
 package org.apache.inlong.manager.service.core;
 
 import org.apache.inlong.common.pojo.sdk.SortSourceConfigResponse;
+import org.apache.inlong.common.pojo.sort.SortConfigResponse;
 import org.apache.inlong.common.pojo.sortstandalone.SortClusterResponse;
-import org.apache.inlong.common.pojo.sortstandalone.SortConfigResponse;
 import org.apache.inlong.manager.pojo.sort.SortStatusInfo;
 import org.apache.inlong.manager.pojo.sort.SortStatusRequest;
 
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortServiceImpl.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortServiceImpl.java
index 8a9c9691a3..d820e37129 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortServiceImpl.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortServiceImpl.java
@@ -20,13 +20,13 @@ package org.apache.inlong.manager.service.core.impl;
 import org.apache.inlong.common.pojo.sdk.SortSourceConfigResponse;
 import org.apache.inlong.common.pojo.sort.SortClusterConfig;
 import org.apache.inlong.common.pojo.sort.SortConfig;
+import org.apache.inlong.common.pojo.sort.SortConfigResponse;
 import org.apache.inlong.common.pojo.sort.SortTaskConfig;
 import org.apache.inlong.common.pojo.sort.dataflow.DataFlowConfig;
 import org.apache.inlong.common.pojo.sort.mq.MqClusterConfig;
 import org.apache.inlong.common.pojo.sort.mq.PulsarClusterConfig;
 import org.apache.inlong.common.pojo.sort.node.NodeConfig;
 import org.apache.inlong.common.pojo.sortstandalone.SortClusterResponse;
-import org.apache.inlong.common.pojo.sortstandalone.SortConfigResponse;
 import org.apache.inlong.manager.common.enums.ClusterType;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.plugin.Plugin;
@@ -305,7 +305,7 @@ public class SortServiceImpl implements SortService, 
PluginBinder {
                 }
                 map.add(sortTaskConfig);
             }
-            sortConfig.setClusters(temp.get(sortClusterName));
+            sortConfig.setTasks(temp.get(sortClusterName));
             try {
                 String configStr = objectMapper.writeValueAsString(sortConfig);
                 sortConfigs.put(sortClusterName, configStr);
diff --git 
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/SortController.java
 
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/SortController.java
index 2c9d994d1c..a8d5faad54 100644
--- 
a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/SortController.java
+++ 
b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/SortController.java
@@ -18,8 +18,8 @@
 package org.apache.inlong.manager.web.controller.openapi;
 
 import org.apache.inlong.common.pojo.sdk.SortSourceConfigResponse;
+import org.apache.inlong.common.pojo.sort.SortConfigResponse;
 import org.apache.inlong.common.pojo.sortstandalone.SortClusterResponse;
-import org.apache.inlong.common.pojo.sortstandalone.SortConfigResponse;
 import org.apache.inlong.manager.service.core.SortService;
 
 import io.swagger.annotations.Api;
diff --git 
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/SortClusterConfigHolder.java
 
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/SortClusterConfigHolder.java
index eeec29863e..9d82bde48e 100644
--- 
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/SortClusterConfigHolder.java
+++ 
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/SortClusterConfigHolder.java
@@ -39,6 +39,7 @@ import static 
org.apache.inlong.sort.standalone.utils.Constants.RELOAD_INTERVAL;
  * 
  * SortClusterConfigHolder
  */
+@Deprecated
 public final class SortClusterConfigHolder {
 
     public static final Logger LOG = 
InlongLoggerFactory.getLogger(SortClusterConfigHolder.class);
diff --git 
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/SortClusterConfigHolder.java
 
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/v2/SortConfigHolder.java
similarity index 60%
copy from 
inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/SortClusterConfigHolder.java
copy to 
inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/v2/SortConfigHolder.java
index eeec29863e..ef9106cc97 100644
--- 
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/SortClusterConfigHolder.java
+++ 
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/v2/SortConfigHolder.java
@@ -15,19 +15,20 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.sort.standalone.config.holder;
+package org.apache.inlong.sort.standalone.config.holder.v2;
 
-import org.apache.inlong.common.pojo.sortstandalone.SortClusterConfig;
-import org.apache.inlong.common.pojo.sortstandalone.SortTaskConfig;
-import 
org.apache.inlong.sort.standalone.config.loader.ClassResourceSortClusterConfigLoader;
-import 
org.apache.inlong.sort.standalone.config.loader.ManagerSortClusterConfigLoader;
-import org.apache.inlong.sort.standalone.config.loader.SortClusterConfigLoader;
-import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
+import org.apache.inlong.common.pojo.sort.SortConfig;
+import org.apache.inlong.common.pojo.sort.SortTaskConfig;
+import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
+import org.apache.inlong.sort.standalone.config.holder.SortClusterConfigType;
+import 
org.apache.inlong.sort.standalone.config.loader.v2.ClassResourceSortClusterConfigLoader;
+import 
org.apache.inlong.sort.standalone.config.loader.v2.ManagerSortClusterConfigLoader;
+import org.apache.inlong.sort.standalone.config.loader.v2.SortConfigLoader;
 
+import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.ClassUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flume.Context;
-import org.slf4j.Logger;
 
 import java.util.Date;
 import java.util.Timer;
@@ -35,39 +36,31 @@ import java.util.TimerTask;
 
 import static 
org.apache.inlong.sort.standalone.utils.Constants.RELOAD_INTERVAL;
 
-/**
- * 
- * SortClusterConfigHolder
- */
-public final class SortClusterConfigHolder {
-
-    public static final Logger LOG = 
InlongLoggerFactory.getLogger(SortClusterConfigHolder.class);
+@Slf4j
+public class SortConfigHolder {
 
-    private static SortClusterConfigHolder instance;
+    private static SortConfigHolder instance;
 
     private long reloadInterval;
     private Timer reloadTimer;
-    private SortClusterConfigLoader loader;
-    private SortClusterConfig config;
+    private SortConfigLoader loader;
+    private SortConfig config;
 
-    /**
-     * Constructor
-     */
-    private SortClusterConfigHolder() {
+    private SortConfigHolder() {
 
     }
 
-    /**
-     * getInstance
-     * 
-     * @return
-     */
-    private static SortClusterConfigHolder get() {
+    private static SortConfigHolder get() {
         if (instance != null) {
             return instance;
         }
-        synchronized (SortClusterConfigHolder.class) {
-            instance = new SortClusterConfigHolder();
+
+        synchronized (SortConfigHolder.class) {
+            if (instance != null) {
+                return instance;
+            }
+
+            instance = new SortConfigHolder();
             instance.reloadInterval = 
CommonPropertiesHolder.getLong(RELOAD_INTERVAL, 60000L);
             String loaderType = CommonPropertiesHolder
                     .getString(SortClusterConfigType.KEY_TYPE, 
SortClusterConfigType.MANAGER.name());
@@ -81,11 +74,11 @@ public final class SortClusterConfigHolder {
                 try {
                     Class<?> loaderClass = ClassUtils.getClass(loaderType);
                     Object loaderObject = 
loaderClass.getDeclaredConstructor().newInstance();
-                    if (loaderObject instanceof SortClusterConfigLoader) {
-                        instance.loader = (SortClusterConfigLoader) 
loaderObject;
+                    if (loaderObject instanceof SortConfigLoader) {
+                        instance.loader = (SortConfigLoader) loaderObject;
                     }
                 } catch (Throwable t) {
-                    LOG.error("Fail to init loader,loaderType:{},error:{}", 
loaderType, t.getMessage());
+                    log.error("failed to init loader, loaderType={}", 
loaderType);
                 }
             }
             if (instance.loader == null) {
@@ -96,22 +89,17 @@ public final class SortClusterConfigHolder {
                 instance.reload();
                 instance.setReloadTimer();
             } catch (Exception e) {
-                LOG.error(e.getMessage(), e);
+                log.error("failed to reload instance", e);
             }
         }
         return instance;
+
     }
 
-    /**
-     * setReloadTimer
-     */
     private void setReloadTimer() {
         reloadTimer = new Timer(true);
         TimerTask task = new TimerTask() {
 
-            /**
-             * run
-             */
             public void run() {
                 reload();
             }
@@ -119,40 +107,26 @@ public final class SortClusterConfigHolder {
         reloadTimer.schedule(task, new Date(System.currentTimeMillis() + 
reloadInterval), reloadInterval);
     }
 
-    /**
-     * reload
-     */
     private void reload() {
         try {
-            SortClusterConfig newConfig = this.loader.load();
+            SortConfig newConfig = this.loader.load();
             if (newConfig != null) {
                 this.config = newConfig;
             }
         } catch (Throwable e) {
-            LOG.error(e.getMessage(), e);
+            log.error("failed to reload sort config", e);
         }
     }
 
-    /**
-     * getClusterConfig
-     * 
-     * @return
-     */
-    public static SortClusterConfig getClusterConfig() {
+    public static SortConfig getSortConfig() {
         return get().config;
     }
 
-    /**
-     * getTaskConfig
-     * 
-     * @param  sortTaskName
-     * @return
-     */
     public static SortTaskConfig getTaskConfig(String sortTaskName) {
-        SortClusterConfig config = get().config;
-        if (config != null && config.getSortTasks() != null) {
-            for (SortTaskConfig task : config.getSortTasks()) {
-                if (StringUtils.equals(sortTaskName, task.getName())) {
+        SortConfig config = get().config;
+        if (config != null && config.getTasks() != null) {
+            for (SortTaskConfig task : config.getTasks()) {
+                if (StringUtils.equals(sortTaskName, task.getSortTaskName())) {
                     return task;
                 }
             }
diff --git 
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/ClassResourceSortClusterConfigLoader.java
 
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/ClassResourceSortClusterConfigLoader.java
index 294fdf2cbf..8fb4a6f8ab 100644
--- 
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/ClassResourceSortClusterConfigLoader.java
+++ 
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/ClassResourceSortClusterConfigLoader.java
@@ -33,6 +33,7 @@ import java.nio.charset.Charset;
  * 
  * ClassResourceCommonPropertiesLoader
  */
+@Deprecated
 public class ClassResourceSortClusterConfigLoader implements 
SortClusterConfigLoader {
 
     public static final Logger LOG = 
InlongLoggerFactory.getLogger(ClassResourceSortClusterConfigLoader.class);
diff --git 
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/ManagerSortClusterConfigLoader.java
 
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/ManagerSortClusterConfigLoader.java
index 83724b32d9..d72061353f 100644
--- 
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/ManagerSortClusterConfigLoader.java
+++ 
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/ManagerSortClusterConfigLoader.java
@@ -41,6 +41,7 @@ import java.util.concurrent.TimeUnit;
  * 
  * ManagerSortClusterConfigLoader
  */
+@Deprecated
 public class ManagerSortClusterConfigLoader implements SortClusterConfigLoader 
{
 
     public static final Logger LOG = 
InlongLoggerFactory.getLogger(ClassResourceSortClusterConfigLoader.class);
diff --git 
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/SortClusterConfigLoader.java
 
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/SortClusterConfigLoader.java
index 8751d8f732..4405a4a0db 100644
--- 
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/SortClusterConfigLoader.java
+++ 
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/SortClusterConfigLoader.java
@@ -25,6 +25,7 @@ import org.apache.flume.conf.Configurable;
  * 
  * SortClusterConfigLoader
  */
+@Deprecated
 public interface SortClusterConfigLoader extends Configurable {
 
     /**
diff --git 
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/ClassResourceSortClusterConfigLoader.java
 
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/v2/ClassResourceSortClusterConfigLoader.java
similarity index 69%
copy from 
inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/ClassResourceSortClusterConfigLoader.java
copy to 
inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/v2/ClassResourceSortClusterConfigLoader.java
index 294fdf2cbf..629d8fef54 100644
--- 
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/ClassResourceSortClusterConfigLoader.java
+++ 
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/v2/ClassResourceSortClusterConfigLoader.java
@@ -15,9 +15,9 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.sort.standalone.config.loader;
+package org.apache.inlong.sort.standalone.config.loader.v2;
 
-import org.apache.inlong.common.pojo.sortstandalone.SortClusterConfig;
+import org.apache.inlong.common.pojo.sort.SortConfig;
 import org.apache.inlong.sort.standalone.config.holder.SortClusterConfigType;
 import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
 
@@ -26,26 +26,16 @@ import org.apache.commons.io.IOUtils;
 import org.apache.flume.Context;
 import org.slf4j.Logger;
 
-import java.io.UnsupportedEncodingException;
 import java.nio.charset.Charset;
 
-/**
- * 
- * ClassResourceCommonPropertiesLoader
- */
-public class ClassResourceSortClusterConfigLoader implements 
SortClusterConfigLoader {
+public class ClassResourceSortClusterConfigLoader implements SortConfigLoader {
 
     public static final Logger LOG = 
InlongLoggerFactory.getLogger(ClassResourceSortClusterConfigLoader.class);
-
     private Context context;
+    private ObjectMapper objectMapper = new ObjectMapper();
 
-    /**
-     * load
-     * 
-     * @return
-     */
     @Override
-    public SortClusterConfig load() {
+    public SortConfig load() {
         String fileName = SortClusterConfigType.DEFAULT_FILE;
         try {
             if (context != null) {
@@ -55,22 +45,13 @@ public class ClassResourceSortClusterConfigLoader 
implements SortClusterConfigLo
                     Charset.defaultCharset());
             int index = confString.indexOf('{');
             confString = confString.substring(index);
-            ObjectMapper objectMapper = new ObjectMapper();
-            SortClusterConfig config = objectMapper.readValue(confString, 
SortClusterConfig.class);
-            return config;
-        } catch (UnsupportedEncodingException e) {
-            LOG.error("fail to load properties, file ={}, and e= {}", 
fileName, e);
+            return objectMapper.readValue(confString, SortConfig.class);
         } catch (Exception e) {
-            LOG.error("fail to load properties, file ={}, and e= {}", 
fileName, e);
+            LOG.error("failed to load properties, file ={}", fileName, e);
         }
-        return SortClusterConfig.builder().build();
+        return SortConfig.builder().build();
     }
 
-    /**
-     * configure
-     * 
-     * @param context
-     */
     @Override
     public void configure(Context context) {
         this.context = context;
diff --git 
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/ManagerSortClusterConfigLoader.java
 
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/v2/ManagerSortClusterConfigLoader.java
similarity index 71%
copy from 
inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/ManagerSortClusterConfigLoader.java
copy to 
inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/v2/ManagerSortClusterConfigLoader.java
index 83724b32d9..7882e4e310 100644
--- 
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/ManagerSortClusterConfigLoader.java
+++ 
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/v2/ManagerSortClusterConfigLoader.java
@@ -15,15 +15,15 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.sort.standalone.config.loader;
+package org.apache.inlong.sort.standalone.config.loader.v2;
 
-import org.apache.inlong.common.pojo.sortstandalone.SortClusterConfig;
-import org.apache.inlong.common.pojo.sortstandalone.SortClusterResponse;
+import org.apache.inlong.common.pojo.sort.SortConfig;
+import org.apache.inlong.common.pojo.sort.SortConfigResponse;
 import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
 import org.apache.inlong.sort.standalone.config.holder.ManagerUrlHandler;
-import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flume.Context;
 import org.apache.http.HttpHeaders;
@@ -33,28 +33,17 @@ import org.apache.http.client.methods.HttpGet;
 import org.apache.http.impl.client.CloseableHttpClient;
 import org.apache.http.impl.client.HttpClientBuilder;
 import org.apache.http.util.EntityUtils;
-import org.slf4j.Logger;
 
 import java.util.concurrent.TimeUnit;
 
-/**
- * 
- * ManagerSortClusterConfigLoader
- */
-public class ManagerSortClusterConfigLoader implements SortClusterConfigLoader 
{
-
-    public static final Logger LOG = 
InlongLoggerFactory.getLogger(ClassResourceSortClusterConfigLoader.class);
+@Slf4j
+public class ManagerSortClusterConfigLoader implements SortConfigLoader {
 
     private Context context;
     private CloseableHttpClient httpClient;
     private ObjectMapper objectMapper = new ObjectMapper();
     private String md5;
 
-    /**
-     * constructHttpClient
-     * 
-     * @return
-     */
     private static synchronized CloseableHttpClient constructHttpClient() {
         long timeoutInMs = TimeUnit.MILLISECONDS.toMillis(50000);
         RequestConfig requestConfig = RequestConfig.custom()
@@ -65,54 +54,43 @@ public class ManagerSortClusterConfigLoader implements 
SortClusterConfigLoader {
         return httpClientBuilder.build();
     }
 
-    /**
-     * configure
-     * 
-     * @param context
-     */
     @Override
     public void configure(Context context) {
         this.context = context;
         this.httpClient = constructHttpClient();
     }
 
-    /**
-     * load
-     * 
-     * @return
-     */
     @Override
-    public SortClusterConfig load() {
+    public SortConfig load() {
         HttpGet httpGet = null;
         try {
             String clusterName = 
this.context.getString(CommonPropertiesHolder.KEY_CLUSTER_ID);
-            String url = ManagerUrlHandler.getSortClusterConfigUrl() + 
"?apiVersion=1.0&clusterName="
+            String url = ManagerUrlHandler.getSortClusterConfigUrl() + 
"?clusterName="
                     + clusterName + "&md5=";
             if (StringUtils.isNotBlank(this.md5)) {
                 url += this.md5;
             }
-            LOG.info("start to request {} to get config info", url);
+            log.info("start to request {} to get config info", url);
             httpGet = new HttpGet(url);
             httpGet.addHeader(HttpHeaders.CONNECTION, "close");
 
             // request with get
             CloseableHttpResponse response = httpClient.execute(httpGet);
             String returnStr = EntityUtils.toString(response.getEntity());
-            LOG.info("end to request {},result:{}", url, returnStr);
-            // get groupId <-> topic and m value.
+            log.info("end to request {}, result={}", url, returnStr);
 
-            SortClusterResponse clusterResponse = 
objectMapper.readValue(returnStr, SortClusterResponse.class);
+            SortConfigResponse clusterResponse = 
objectMapper.readValue(returnStr, SortConfigResponse.class);
             int errCode = clusterResponse.getCode();
-            if (errCode != SortClusterResponse.SUCC && errCode != 
SortClusterResponse.NOUPDATE) {
-                LOG.info("Fail to get config info from url:{}, error code is 
{}, msg is {}",
+            if (errCode != SortConfigResponse.SUCC && errCode != 
SortConfigResponse.NO_UPDATE) {
+                log.error("failed to get config info from url={}, error 
code={}, msg={}",
                         url, clusterResponse.getCode(), 
clusterResponse.getMsg());
                 return null;
             }
 
             this.md5 = clusterResponse.getMd5();
-            return clusterResponse.getData();
+            return objectMapper.readValue(clusterResponse.getData(), 
SortConfig.class);
         } catch (Exception ex) {
-            LOG.error("exception caught", ex);
+            log.error("exception caught", ex);
             return null;
         } finally {
             if (httpGet != null) {
diff --git 
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/SortConfig.java
 
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/v2/SortConfigLoader.java
similarity index 76%
copy from 
inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/SortConfig.java
copy to 
inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/v2/SortConfigLoader.java
index ad3a6bb892..7ec4753c87 100644
--- 
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/SortConfig.java
+++ 
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/v2/SortConfigLoader.java
@@ -15,16 +15,13 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.common.pojo.sort;
+package org.apache.inlong.sort.standalone.config.loader.v2;
 
-import lombok.Data;
+import org.apache.inlong.common.pojo.sort.SortConfig;
 
-import java.io.Serializable;
-import java.util.List;
+import org.apache.flume.conf.Configurable;
 
-@Data
-public class SortConfig implements Serializable {
+public interface SortConfigLoader extends Configurable {
 
-    private String sortClusterName;
-    private List<SortTaskConfig> clusters;
+    SortConfig load();
 }
diff --git 
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/utils/FlumeConfigGenerator.java
 
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/utils/FlumeConfigGenerator.java
index 7ba7843655..8a5e46fdf6 100644
--- 
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/utils/FlumeConfigGenerator.java
+++ 
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/utils/FlumeConfigGenerator.java
@@ -27,6 +27,7 @@ import java.util.Optional;
 /**
  * generator for flume config
  */
+@Deprecated
 public class FlumeConfigGenerator {
 
     public static final String KEY_TASK_NAME = "taskName";
diff --git 
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/utils/FlumeConfigGenerator.java
 
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/utils/v2/FlumeConfigGenerator.java
similarity index 70%
copy from 
inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/utils/FlumeConfigGenerator.java
copy to 
inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/utils/v2/FlumeConfigGenerator.java
index 7ba7843655..3edf16af72 100644
--- 
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/utils/FlumeConfigGenerator.java
+++ 
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/utils/v2/FlumeConfigGenerator.java
@@ -15,18 +15,15 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.sort.standalone.utils;
+package org.apache.inlong.sort.standalone.utils.v2;
 
-import org.apache.inlong.common.pojo.sortstandalone.SortTaskConfig;
+import org.apache.inlong.common.pojo.sort.SortTaskConfig;
 import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
 
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
 
-/**
- * generator for flume config
- */
 public class FlumeConfigGenerator {
 
     public static final String KEY_TASK_NAME = "taskName";
@@ -41,48 +38,28 @@ public class FlumeConfigGenerator {
 
     public static Map<String, String> 
generateFlumeConfiguration(SortTaskConfig taskConfig) {
         Map<String, String> flumeConf = new HashMap<>();
-        String name = taskConfig.getName();
-        Map<String, String> sinkParams = taskConfig.getSinkParams();
-        // channels
-        appendChannels(flumeConf, name, sinkParams);
-        // sinks
-        appendSinks(flumeConf, name, sinkParams);
-        // sources
-        appendSources(flumeConf, name, sinkParams);
+        String sortTaskName = taskConfig.getSortTaskName();
+        appendChannels(flumeConf, sortTaskName);
+        appendSinks(flumeConf, sortTaskName);
+        appendSources(flumeConf, sortTaskName);
         return flumeConf;
     }
 
-    /**
-     * append channels config
-     *
-     * @param flumeConf final config of flume
-     * @param name sort task name
-     * @param sinkParams sink params of this task
-     */
-    private static void appendChannels(Map<String, String> flumeConf, String 
name, Map<String, String> sinkParams) {
+    private static void appendChannels(Map<String, String> flumeConf, String 
name) {
         StringBuilder builder = new StringBuilder();
         String channelName = name + "Channel";
         flumeConf.put(name + ".channels", channelName);
         String prefix = 
builder.append(name).append(".channels.").append(channelName).append(".").toString();
         builder.setLength(0);
         String channelType = builder.append(prefix).append("type").toString();
-        String channelClass = sinkParams.getOrDefault(KEY_SORT_CHANNEL_TYPE,
-                CommonPropertiesHolder.getString(KEY_SORT_CHANNEL_TYPE));
+        String channelClass = 
CommonPropertiesHolder.getString(KEY_SORT_CHANNEL_TYPE);
         flumeConf.put(channelType, channelClass);
-        appendCommon(flumeConf, prefix, null, name);
+        appendCommon(flumeConf, prefix, name);
     }
 
-    /**
-     * appendCommon config
-     *
-     * @param flumeConf final config of flume
-     * @param prefix prefix of common properties
-     * @param componentParams common properties
-     */
     private static void appendCommon(
             Map<String, String> flumeConf,
             String prefix,
-            Map<String, String> componentParams,
             String name) {
         StringBuilder builder = new StringBuilder();
         String taskName = 
builder.append(prefix).append(KEY_TASK_NAME).toString();
@@ -93,24 +70,9 @@ public class FlumeConfigGenerator {
             String key = 
builder.append(prefix).append(entry.getKey()).toString();
             flumeConf.put(key, entry.getValue());
         }
-        // componentParams
-        if (componentParams != null) {
-            for (Map.Entry<String, String> entry : componentParams.entrySet()) 
{
-                builder.setLength(0);
-                String key = 
builder.append(prefix).append(entry.getKey()).toString();
-                flumeConf.put(key, entry.getValue());
-            }
-        }
     }
 
-    /**
-     * append sink config
-     *
-     * @param flumeConf final config of flume
-     * @param name sort task name
-     * @param sinkParams sink params of this task
-     */
-    private static void appendSinks(Map<String, String> flumeConf, String 
name, Map<String, String> sinkParams) {
+    private static void appendSinks(Map<String, String> flumeConf, String 
name) {
         // sinks
         String sinkName = name + "Sink";
         flumeConf.put(name + ".sinks", sinkName);
@@ -119,28 +81,19 @@ public class FlumeConfigGenerator {
         // type
         builder.setLength(0);
         String sinkType = builder.append(prefix).append("type").toString();
-        String sinkClass = sinkParams.getOrDefault(KEY_SORT_SINK_TYPE,
-                CommonPropertiesHolder.getString(KEY_SORT_SINK_TYPE));
+        String sinkClass = 
CommonPropertiesHolder.getString(KEY_SORT_SINK_TYPE);
         flumeConf.put(sinkType, sinkClass);
         // channel
         builder.setLength(0);
         String channelKey = 
builder.append(prefix).append("channel").toString();
         String channelName = name + "Channel";
         flumeConf.put(channelKey, channelName);
-        //
-        appendCommon(flumeConf, prefix, sinkParams, name);
+
+        // common
+        appendCommon(flumeConf, prefix, name);
     }
 
-    /**
-     * append source config
-     *
-     * @param flumeConf final config of flume
-     * @param name sort task name
-     * @param sinkParams sink params of this task
-     */
-    private static void appendSources(
-            Map<String, String> flumeConf,
-            String name, Map<String, String> sinkParams) {
+    private static void appendSources(Map<String, String> flumeConf, String 
name) {
         // sources
         String sourceName = name + "Source";
         flumeConf.put(name + ".sources", sourceName);
@@ -149,8 +102,7 @@ public class FlumeConfigGenerator {
         // type
         builder.setLength(0);
         String sourceType = builder.append(prefix).append("type").toString();
-        String sourceClass = sinkParams.getOrDefault(KEY_SORT_SOURCE_TYPE,
-                CommonPropertiesHolder.getString(KEY_SORT_SOURCE_TYPE));
+        String sourceClass = 
CommonPropertiesHolder.getString(KEY_SORT_SOURCE_TYPE);
         flumeConf.put(sourceType, sourceClass);
         // channel
         builder.setLength(0);
@@ -183,6 +135,6 @@ public class FlumeConfigGenerator {
         
Optional.ofNullable(CommonPropertiesHolder.getString(KEY_ROLLBACK_STOP_TIME))
                 .map(stopTime -> flumeConf.put(stopTimeKey, stopTime));
 
-        appendCommon(flumeConf, prefix, null, name);
+        appendCommon(flumeConf, prefix, name);
     }
 }
diff --git 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/SortCluster.java
 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/SortCluster.java
index 0a3ae734b2..dcd2dd3ca3 100644
--- 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/SortCluster.java
+++ 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/SortCluster.java
@@ -41,6 +41,7 @@ import static 
org.apache.inlong.sort.standalone.utils.Constants.RELOAD_INTERVAL;
 /**
  * SortCluster
  */
+@Deprecated
 public class SortCluster {
 
     public static final Logger LOG = 
InlongLoggerFactory.getLogger(SortCluster.class);
diff --git 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/SortTask.java
 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/SortTask.java
index ee6510be89..71db1c7024 100644
--- 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/SortTask.java
+++ 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/SortTask.java
@@ -40,6 +40,7 @@ import java.util.concurrent.locks.ReentrantLock;
  * 
  * SortTask
  */
+@Deprecated
 public class SortTask {
 
     public static final Logger LOG = 
InlongLoggerFactory.getLogger(SortTask.class);
diff --git 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/SortCluster.java
 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/v2/SortCluster.java
similarity index 73%
copy from 
inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/SortCluster.java
copy to 
inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/v2/SortCluster.java
index 0a3ae734b2..f7e2e58cc6 100644
--- 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/SortCluster.java
+++ 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/v2/SortCluster.java
@@ -15,44 +15,35 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.sort.standalone;
+package org.apache.inlong.sort.standalone.v2;
 
-import org.apache.inlong.common.pojo.sortstandalone.SortClusterConfig;
-import org.apache.inlong.common.pojo.sortstandalone.SortTaskConfig;
+import org.apache.inlong.common.pojo.sort.SortConfig;
+import org.apache.inlong.common.pojo.sort.SortTaskConfig;
 import org.apache.inlong.sdk.commons.admin.AdminTask;
 import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
-import org.apache.inlong.sort.standalone.config.holder.SortClusterConfigHolder;
-import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
+import org.apache.inlong.sort.standalone.config.holder.v2.SortConfigHolder;
 
+import lombok.extern.slf4j.Slf4j;
 import org.apache.flume.Context;
-import org.slf4j.Logger;
 
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.ConcurrentHashMap;
 
 import static 
org.apache.inlong.sort.standalone.utils.Constants.RELOAD_INTERVAL;
 
-/**
- * SortCluster
- */
+@Slf4j
 public class SortCluster {
 
-    public static final Logger LOG = 
InlongLoggerFactory.getLogger(SortCluster.class);
-
     private Timer reloadTimer;
     private Map<String, SortTask> taskMap = new ConcurrentHashMap<>();
     private List<SortTask> deletingTasks = new ArrayList<>();
     private AdminTask adminTask;
 
-    /**
-     * start
-     */
     public void start() {
         try {
             this.reload();
@@ -61,18 +52,15 @@ public class SortCluster {
             this.adminTask = new AdminTask(new 
Context(CommonPropertiesHolder.get()));
             this.adminTask.start();
         } catch (Exception e) {
-            LOG.error(e.getMessage(), e);
+            log.error("failed to start sort cluster", e);
         }
     }
 
-    /**
-     * close
-     */
     public void close() {
         try {
             this.reloadTimer.cancel();
             // stop sort task
-            for (Entry<String, SortTask> entry : this.taskMap.entrySet()) {
+            for (Map.Entry<String, SortTask> entry : this.taskMap.entrySet()) {
                 entry.getValue().stop();
             }
             // stop admin task
@@ -80,20 +68,14 @@ public class SortCluster {
                 this.adminTask.stop();
             }
         } catch (Exception e) {
-            LOG.error(e.getMessage(), e);
+            log.error("failed to close sort cluster", e);
         }
     }
 
-    /**
-     * setReloadTimer
-     */
     private void setReloadTimer() {
         reloadTimer = new Timer(true);
         TimerTask task = new TimerTask() {
 
-            /**
-             * run
-             */
             public void run() {
                 reload();
             }
@@ -102,19 +84,16 @@ public class SortCluster {
         reloadTimer.schedule(task, new Date(System.currentTimeMillis() + 
reloadInterval), reloadInterval);
     }
 
-    /**
-     * reload
-     */
     public void reload() {
         try {
             // get new config
-            SortClusterConfig newConfig = 
SortClusterConfigHolder.getClusterConfig();
+            SortConfig newConfig = SortConfigHolder.getSortConfig();
             if (newConfig == null) {
                 return;
             }
             // add new task
-            for (SortTaskConfig taskConfig : newConfig.getSortTasks()) {
-                String newTaskName = taskConfig.getName();
+            for (SortTaskConfig taskConfig : newConfig.getTasks()) {
+                String newTaskName = taskConfig.getSortTaskName();
                 if (taskMap.containsKey(newTaskName)) {
                     continue;
                 }
@@ -124,11 +103,11 @@ public class SortCluster {
             }
             // remove task
             deletingTasks.clear();
-            for (Entry<String, SortTask> entry : taskMap.entrySet()) {
+            for (Map.Entry<String, SortTask> entry : taskMap.entrySet()) {
                 String taskName = entry.getKey();
                 boolean isFound = false;
-                for (SortTaskConfig taskConfig : newConfig.getSortTasks()) {
-                    if (taskName.equals(taskConfig.getName())) {
+                for (SortTaskConfig taskConfig : newConfig.getTasks()) {
+                    if (taskName.equals(taskConfig.getSortTaskName())) {
                         isFound = true;
                         break;
                     }
@@ -143,7 +122,7 @@ public class SortCluster {
                 taskMap.remove(task.getTaskName());
             }
         } catch (Throwable e) {
-            LOG.error(e.getMessage(), e);
+            log.error("failed to reload cluster", e);
         }
     }
 }
diff --git 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/SortTask.java
 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/v2/SortTask.java
similarity index 57%
copy from 
inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/SortTask.java
copy to 
inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/v2/SortTask.java
index ee6510be89..d94ba6ffdf 100644
--- 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/SortTask.java
+++ 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/v2/SortTask.java
@@ -15,81 +15,54 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.sort.standalone;
+package org.apache.inlong.sort.standalone.v2;
 
-import org.apache.inlong.common.pojo.sortstandalone.SortTaskConfig;
-import org.apache.inlong.sort.standalone.config.holder.SortClusterConfigHolder;
-import org.apache.inlong.sort.standalone.utils.FlumeConfigGenerator;
-import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
+import org.apache.inlong.common.pojo.sort.SortTaskConfig;
+import org.apache.inlong.sort.standalone.PropertiesConfigurationProvider;
+import org.apache.inlong.sort.standalone.config.holder.v2.SortConfigHolder;
+import org.apache.inlong.sort.standalone.utils.v2.FlumeConfigGenerator;
 
 import com.google.common.eventbus.Subscribe;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.flume.Channel;
 import org.apache.flume.SinkRunner;
 import org.apache.flume.SourceRunner;
 import org.apache.flume.lifecycle.LifecycleState;
 import org.apache.flume.lifecycle.LifecycleSupervisor;
-import org.apache.flume.lifecycle.LifecycleSupervisor.SupervisorPolicy;
 import org.apache.flume.node.MaterializedConfiguration;
-import org.slf4j.Logger;
 
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.concurrent.locks.ReentrantLock;
 
-/**
- * 
- * SortTask
- */
+@Slf4j
 public class SortTask {
 
-    public static final Logger LOG = 
InlongLoggerFactory.getLogger(SortTask.class);
-
     private final String taskName;
     private final LifecycleSupervisor supervisor;
     private MaterializedConfiguration materializedConfiguration;
     private final ReentrantLock lifecycleLock = new ReentrantLock();
 
-    /**
-     * Constructor
-     * 
-     * @param taskName
-     */
     public SortTask(String taskName) {
         this.taskName = taskName;
         this.supervisor = new LifecycleSupervisor();
     }
 
-    /**
-     * get taskName
-     * 
-     * @return the taskName
-     */
-    public String getTaskName() {
-        return taskName;
-    }
-
-    /**
-     * start
-     */
     public void start() {
-        SortTaskConfig config = 
SortClusterConfigHolder.getTaskConfig(taskName);
+        SortTaskConfig config = SortConfigHolder.getTaskConfig(taskName);
         if (config == null) {
             return;
         }
-
-        //
         Map<String, String> flumeConfiguration = 
FlumeConfigGenerator.generateFlumeConfiguration(config);
-        LOG.info("Start sort task:{},config:{}", taskName, flumeConfiguration);
+        log.info("start sort task={}, config={}", taskName, 
flumeConfiguration);
         PropertiesConfigurationProvider configurationProvider = new 
PropertiesConfigurationProvider(
-                config.getName(), flumeConfiguration);
+                config.getSortTaskName(), flumeConfiguration);
         
this.handleConfigurationEvent(configurationProvider.getConfiguration());
     }
 
-    /**
-     * handleConfigurationEvent
-     * 
-     * @param conf
-     */
+    public String getTaskName() {
+        return taskName;
+    }
+
     @Subscribe
     public void handleConfigurationEvent(MaterializedConfiguration conf) {
         try {
@@ -97,8 +70,7 @@ public class SortTask {
             stopAllComponents();
             startAllComponents(conf);
         } catch (InterruptedException e) {
-            LOG.info("Interrupted while trying to handle configuration event");
-            return;
+            log.error("interrupted while trying to handle configuration 
event", e);
         } finally {
             // If interrupted while trying to lock, we don't own the lock, so 
must not attempt to unlock
             if (lifecycleLock.isHeldByCurrentThread()) {
@@ -107,9 +79,6 @@ public class SortTask {
         }
     }
 
-    /**
-     * stop
-     */
     public void stop() {
         lifecycleLock.lock();
         stopAllComponents();
@@ -120,94 +89,84 @@ public class SortTask {
         }
     }
 
-    /**
-     * stopAllComponents
-     */
     private void stopAllComponents() {
         if (this.materializedConfiguration != null) {
-            LOG.info("Shutting down configuration: {}", 
this.materializedConfiguration);
-            for (Entry<String, SourceRunner> entry : 
this.materializedConfiguration.getSourceRunners().entrySet()) {
+            log.info("shutting down configuration: {}", 
this.materializedConfiguration);
+            for (Map.Entry<String, SourceRunner> entry : 
this.materializedConfiguration.getSourceRunners().entrySet()) {
                 try {
-                    LOG.info("Stopping Source " + entry.getKey());
+                    log.info("stopping Source " + entry.getKey());
                     supervisor.unsupervise(entry.getValue());
                 } catch (Exception e) {
-                    LOG.error("Error while stopping {}", entry.getValue(), e);
+                    log.error("error while stopping {}", entry.getValue(), e);
                 }
             }
 
-            for (Entry<String, SinkRunner> entry : 
this.materializedConfiguration.getSinkRunners().entrySet()) {
+            for (Map.Entry<String, SinkRunner> entry : 
this.materializedConfiguration.getSinkRunners().entrySet()) {
                 try {
-                    LOG.info("Stopping Sink " + entry.getKey());
+                    log.info("stopping Sink " + entry.getKey());
                     supervisor.unsupervise(entry.getValue());
                 } catch (Exception e) {
-                    LOG.error("Error while stopping {}", entry.getValue(), e);
+                    log.error("error while stopping {}", entry.getValue(), e);
                 }
             }
 
-            for (Entry<String, Channel> entry : 
this.materializedConfiguration.getChannels().entrySet()) {
+            for (Map.Entry<String, Channel> entry : 
this.materializedConfiguration.getChannels().entrySet()) {
                 try {
-                    LOG.info("Stopping Channel " + entry.getKey());
+                    log.info("stopping Channel " + entry.getKey());
                     supervisor.unsupervise(entry.getValue());
                 } catch (Exception e) {
-                    LOG.error("Error while stopping {}", entry.getValue(), e);
+                    log.error("error while stopping {}", entry.getValue(), e);
                 }
             }
         }
     }
 
-    /**
-     * startAllComponents
-     * 
-     * @param materializedConfiguration
-     */
     private void startAllComponents(MaterializedConfiguration 
materializedConfiguration) {
-        LOG.info("Starting new configuration:{}", materializedConfiguration);
+        log.info("starting new configuration:{}", materializedConfiguration);
 
         this.materializedConfiguration = materializedConfiguration;
 
-        for (Entry<String, Channel> entry : 
materializedConfiguration.getChannels().entrySet()) {
+        for (Map.Entry<String, Channel> entry : 
materializedConfiguration.getChannels().entrySet()) {
             try {
-                LOG.info("Starting Channel " + entry.getKey());
+                log.info("starting channel " + entry.getKey());
                 supervisor.supervise(entry.getValue(),
-                        new SupervisorPolicy.AlwaysRestartPolicy(), 
LifecycleState.START);
+                        new 
LifecycleSupervisor.SupervisorPolicy.AlwaysRestartPolicy(), 
LifecycleState.START);
             } catch (Exception e) {
-                LOG.error("Error while starting {}", entry.getValue(), e);
+                log.error("error while starting {}", entry.getValue(), e);
             }
         }
 
-        /*
-         * Wait for all channels to start.
-         */
+        // Wait for all channels to start.
         for (Channel ch : materializedConfiguration.getChannels().values()) {
             while (ch.getLifecycleState() != LifecycleState.START
                     && !supervisor.isComponentInErrorState(ch)) {
                 try {
-                    LOG.info("Waiting for channel: " + ch.getName() + " to 
start. Sleeping for 500 ms");
+                    log.info("waiting for channel: " + ch.getName() + " to 
start. Sleeping for 500 ms");
                     Thread.sleep(500);
                 } catch (InterruptedException e) {
-                    LOG.error("Interrupted while waiting for channel to 
start.", e);
+                    log.error("interrupted while waiting for channel to 
start.", e);
                 }
             }
         }
 
-        for (Entry<String, SinkRunner> entry : 
materializedConfiguration.getSinkRunners().entrySet()) {
+        for (Map.Entry<String, SinkRunner> entry : 
materializedConfiguration.getSinkRunners().entrySet()) {
             try {
-                LOG.info("Starting Sink " + entry.getKey());
+                log.info("starting sink " + entry.getKey());
                 supervisor.supervise(entry.getValue(),
-                        new SupervisorPolicy.AlwaysRestartPolicy(), 
LifecycleState.START);
+                        new 
LifecycleSupervisor.SupervisorPolicy.AlwaysRestartPolicy(), 
LifecycleState.START);
             } catch (Exception e) {
-                LOG.error("Error while starting {}", entry.getValue(), e);
+                log.error("error while starting {}", entry.getValue(), e);
             }
         }
 
-        for (Entry<String, SourceRunner> entry : 
materializedConfiguration.getSourceRunners().entrySet()) {
+        for (Map.Entry<String, SourceRunner> entry : 
materializedConfiguration.getSourceRunners().entrySet()) {
             try {
-                LOG.info("Starting Source " + entry.getKey());
+                log.info("starting source " + entry.getKey());
                 supervisor.supervise(entry.getValue(),
-                        new SupervisorPolicy.AlwaysRestartPolicy(), 
LifecycleState.START);
+                        new 
LifecycleSupervisor.SupervisorPolicy.AlwaysRestartPolicy(), 
LifecycleState.START);
             } catch (Exception e) {
-                LOG.error("Error while starting {}", entry.getValue(), e);
+                log.error("error while starting {}", entry.getValue(), e);
             }
         }
     }
-}
\ No newline at end of file
+}

Reply via email to