This is an automated email from the ASF dual-hosted git repository.

vernedeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 8724e9e198 [INLONG-10526][Sort] ClsSink support switch metadata 
acquire mode (#10539)
8724e9e198 is described below

commit 8724e9e198ae15373027b2b658ec751ad78d5a1d
Author: vernedeng <[email protected]>
AuthorDate: Mon Jul 1 17:18:55 2024 +0800

    [INLONG-10526][Sort] ClsSink support switch metadata acquire mode (#10539)
    
    * [INLONG-10526][Sort] ClsSink support switch metadata acquire mode
    ---------
    
    Co-authored-by: vernedeng <[email protected]>
---
 ...ortClusterConfig.java => ClusterTagConfig.java} |  56 +++----
 .../apache/inlong/common/pojo/sort/SortConfig.java |  14 +-
 .../sort/{SortTaskConfig.java => TaskConfig.java}  |  51 +++----
 .../manager/service/core/impl/SortServiceImpl.java |  16 +-
 .../config/holder/CommonPropertiesHolder.java      |   1 +
 .../config/holder/ManagerUrlHandler.java           |   4 +
 .../config/holder/v2/SortConfigHolder.java         |  31 ++--
 .../v2/SortConfigType.java}                        |  25 +---
 .../loader/CommonPropertiesManagerUrlLoader.java   |  18 +++
 .../standalone/config/loader/ManagerUrlLoader.java |   2 +
 .../loader/SortConfigQueryConsumeConfig.java       |  10 +-
 ...der.java => ClassResourceSortConfigLoader.java} |  10 +-
 ...figLoader.java => ManagerSortConfigLoader.java} |   4 +-
 .../standalone/utils/v2/FlumeConfigGenerator.java  |   4 +-
 .../inlong/sort/standalone/sink/SinkContext.java   | 117 +++------------
 .../sort/standalone/sink/cls/ClsIdConfig.java      |   2 +
 .../sort/standalone/sink/cls/ClsSinkContext.java   |  61 ++++++--
 .../sort/standalone/sink/elasticsearch/EsSink.java |   2 +-
 .../sink/elasticsearch/EsSinkContext.java          |  20 +--
 .../sink/kafka/KafkaFederationSinkContext.java     |  16 +-
 .../sink/pulsar/PulsarFederationSinkContext.java   |  16 +-
 .../sort/standalone/sink/v2/SinkContext.java       | 165 ---------------------
 .../inlong/sort/standalone/v2/SortCluster.java     |   6 +-
 .../apache/inlong/sort/standalone/v2/SortTask.java |   4 +-
 .../{SortClusterConfig.conf => SortConfig.conf}    |   2 +-
 .../src/test/java/common.properties                |   1 +
 .../{SortClusterConfig.conf => SortConfig.conf}    |   2 +-
 .../src/test/resources/common.properties           |   1 +
 28 files changed, 233 insertions(+), 428 deletions(-)

diff --git 
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/SortClusterConfig.java
 
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/ClusterTagConfig.java
similarity index 73%
rename from 
inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/SortClusterConfig.java
rename to 
inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/ClusterTagConfig.java
index 6124ffb834..6ce473a340 100644
--- 
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/SortClusterConfig.java
+++ 
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/ClusterTagConfig.java
@@ -35,13 +35,13 @@ import java.util.function.BiFunction;
 @Builder
 @AllArgsConstructor
 @NoArgsConstructor
-public class SortClusterConfig implements Serializable {
+public class ClusterTagConfig implements Serializable {
 
     private String clusterTag;
     private List<MqClusterConfig> mqClusterConfigs;
     private List<DataFlowConfig> dataFlowConfigs;
 
-    public static SortClusterConfig checkDelete(SortClusterConfig last, 
SortClusterConfig current) {
+    public static ClusterTagConfig checkDelete(ClusterTagConfig last, 
ClusterTagConfig current) {
         if (CollectionUtils.isEmpty(current.getMqClusterConfigs())) {
             return last;
         }
@@ -49,11 +49,11 @@ public class SortClusterConfig implements Serializable {
         return check(last, current, MqClusterConfig::batchCheckLast, 
DataFlowConfig::batchCheckDelete);
     }
 
-    public static SortClusterConfig checkNew(SortClusterConfig last, 
SortClusterConfig current) {
+    public static ClusterTagConfig checkNew(ClusterTagConfig last, 
ClusterTagConfig current) {
         return check(last, current, MqClusterConfig::batchCheckLatest, 
DataFlowConfig::batchCheckNew);
     }
 
-    public static SortClusterConfig checkUpdate(SortClusterConfig last, 
SortClusterConfig current) {
+    public static ClusterTagConfig checkUpdate(ClusterTagConfig last, 
ClusterTagConfig current) {
         List<MqClusterConfig> updateCluster =
                 MqClusterConfig.batchCheckUpdate(last.getMqClusterConfigs(), 
current.getMqClusterConfigs());
 
@@ -77,7 +77,7 @@ public class SortClusterConfig implements Serializable {
                     
DataFlowConfig.batchCheckNoUpdate(last.getDataFlowConfigs(), 
current.getDataFlowConfigs());
             noUpdateDataflows.addAll(updateDataflows);
 
-            return SortClusterConfig.builder()
+            return ClusterTagConfig.builder()
                     .clusterTag(last.getClusterTag())
                     .mqClusterConfigs(latestCluster)
                     .dataFlowConfigs(noUpdateDataflows)
@@ -90,14 +90,14 @@ public class SortClusterConfig implements Serializable {
         }
 
         // if only dataflow update, use latest mq and update dataflow
-        return SortClusterConfig.builder()
+        return ClusterTagConfig.builder()
                 .clusterTag(last.getClusterTag())
                 .mqClusterConfigs(latestCluster)
                 .dataFlowConfigs(updateDataflows)
                 .build();
     }
 
-    public static SortClusterConfig checkLatest(SortClusterConfig last, 
SortClusterConfig current) {
+    public static ClusterTagConfig checkLatest(ClusterTagConfig last, 
ClusterTagConfig current) {
         if (CollectionUtils.isEmpty(current.getMqClusterConfigs())) {
             return null;
         }
@@ -105,40 +105,40 @@ public class SortClusterConfig implements Serializable {
         return check(last, current, MqClusterConfig::batchCheckLatest, 
DataFlowConfig::batchCheckLatest);
     }
 
-    public static List<SortClusterConfig> batchCheckDelete(
-            List<SortClusterConfig> last,
-            List<SortClusterConfig> current) {
+    public static List<ClusterTagConfig> batchCheckDelete(
+            List<ClusterTagConfig> last,
+            List<ClusterTagConfig> current) {
         return SortConfigUtil.batchCheckDeleteRecursive(last, current,
-                SortClusterConfig::getClusterTag, 
SortClusterConfig::checkDelete);
+                ClusterTagConfig::getClusterTag, 
ClusterTagConfig::checkDelete);
     }
 
-    public static List<SortClusterConfig> batchCheckNew(
-            List<SortClusterConfig> last,
-            List<SortClusterConfig> current) {
+    public static List<ClusterTagConfig> batchCheckNew(
+            List<ClusterTagConfig> last,
+            List<ClusterTagConfig> current) {
         return SortConfigUtil.batchCheckNewRecursive(last, current,
-                SortClusterConfig::getClusterTag, SortClusterConfig::checkNew);
+                ClusterTagConfig::getClusterTag, ClusterTagConfig::checkNew);
     }
 
-    public static List<SortClusterConfig> batchCheckUpdate(
-            List<SortClusterConfig> last,
-            List<SortClusterConfig> current) {
+    public static List<ClusterTagConfig> batchCheckUpdate(
+            List<ClusterTagConfig> last,
+            List<ClusterTagConfig> current) {
         return SortConfigUtil.batchCheckUpdateRecursive(last, current,
-                SortClusterConfig::getClusterTag, 
SortClusterConfig::checkUpdate);
+                ClusterTagConfig::getClusterTag, 
ClusterTagConfig::checkUpdate);
     }
 
-    public static List<SortClusterConfig> batchCheckLatest(
-            List<SortClusterConfig> last,
-            List<SortClusterConfig> current) {
+    public static List<ClusterTagConfig> batchCheckLatest(
+            List<ClusterTagConfig> last,
+            List<ClusterTagConfig> current) {
         return SortConfigUtil.batchCheckLatestRecursive(last, current,
-                SortClusterConfig::getClusterTag, 
SortClusterConfig::checkLatest);
+                ClusterTagConfig::getClusterTag, 
ClusterTagConfig::checkLatest);
     }
 
-    public static SortClusterConfig check(
-            SortClusterConfig last, SortClusterConfig current,
+    public static ClusterTagConfig check(
+            ClusterTagConfig last, ClusterTagConfig current,
             BiFunction<List<MqClusterConfig>, List<MqClusterConfig>, 
List<MqClusterConfig>> mqCheckFunction,
             BiFunction<List<DataFlowConfig>, List<DataFlowConfig>, 
List<DataFlowConfig>> flowCheckFunction) {
 
-        List<MqClusterConfig> checkCluster = mqCheckFunction
+        List<MqClusterConfig> checkMqCluster = mqCheckFunction
                 .apply(last.getMqClusterConfigs(), 
current.getMqClusterConfigs());
         List<DataFlowConfig> checkDataflows = flowCheckFunction
                 .apply(last.getDataFlowConfigs(), 
current.getDataFlowConfigs());
@@ -147,9 +147,9 @@ public class SortClusterConfig implements Serializable {
             return null;
         }
 
-        return SortClusterConfig.builder()
+        return ClusterTagConfig.builder()
                 .clusterTag(last.getClusterTag())
-                .mqClusterConfigs(checkCluster)
+                .mqClusterConfigs(checkMqCluster)
                 .dataFlowConfigs(checkDataflows)
                 .build();
     }
diff --git 
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/SortConfig.java
 
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/SortConfig.java
index 5f0b6c0b6d..39b7572f88 100644
--- 
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/SortConfig.java
+++ 
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/SortConfig.java
@@ -34,7 +34,7 @@ import java.util.function.BiFunction;
 public class SortConfig implements Serializable {
 
     private String sortClusterName;
-    private List<SortTaskConfig> tasks;
+    private List<TaskConfig> tasks;
 
     public static SortConfig checkLatest(SortConfig last, SortConfig current) {
         if (last == null) {
@@ -45,7 +45,7 @@ public class SortConfig implements Serializable {
         }
         return SortConfig.builder()
                 .sortClusterName(current.getSortClusterName())
-                .tasks(SortTaskConfig.batchCheckLatest(last.getTasks(), 
current.getTasks()))
+                .tasks(TaskConfig.batchCheckLatest(last.getTasks(), 
current.getTasks()))
                 .build();
     }
 
@@ -56,14 +56,14 @@ public class SortConfig implements Serializable {
         if (current == null) {
             return last;
         }
-        return check(last, current, SortTaskConfig::batchCheckDelete);
+        return check(last, current, TaskConfig::batchCheckDelete);
     }
 
     public static SortConfig checkUpdate(SortConfig last, SortConfig current) {
         if (last == null || current == null) {
             return null;
         }
-        return check(last, current, SortTaskConfig::batchCheckUpdate);
+        return check(last, current, TaskConfig::batchCheckUpdate);
     }
 
     public static SortConfig checkNew(SortConfig last, SortConfig current) {
@@ -73,17 +73,17 @@ public class SortConfig implements Serializable {
         if (current == null) {
             return null;
         }
-        return check(last, current, SortTaskConfig::batchCheckNew);
+        return check(last, current, TaskConfig::batchCheckNew);
     }
 
     public static SortConfig check(
             SortConfig last, SortConfig current,
-            BiFunction<List<SortTaskConfig>, List<SortTaskConfig>, 
List<SortTaskConfig>> taskCheckFunction) {
+            BiFunction<List<TaskConfig>, List<TaskConfig>, List<TaskConfig>> 
taskCheckFunction) {
         if (!last.getSortClusterName().equals(current.getSortClusterName())) {
             return null;
         }
 
-        List<SortTaskConfig> checkTasks = 
taskCheckFunction.apply(last.getTasks(), current.getTasks());
+        List<TaskConfig> checkTasks = taskCheckFunction.apply(last.getTasks(), 
current.getTasks());
         if (CollectionUtils.isEmpty(checkTasks)) {
             return null;
         }
diff --git 
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/SortTaskConfig.java
 
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/TaskConfig.java
similarity index 59%
rename from 
inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/SortTaskConfig.java
rename to 
inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/TaskConfig.java
index 107efcec80..69dd63cd53 100644
--- 
a/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/SortTaskConfig.java
+++ 
b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sort/TaskConfig.java
@@ -34,39 +34,39 @@ import java.util.function.BiFunction;
 @Builder
 @AllArgsConstructor
 @NoArgsConstructor
-public class SortTaskConfig implements Serializable {
+public class TaskConfig implements Serializable {
 
     private String sortTaskName;
-    private List<SortClusterConfig> clusters;
+    private List<ClusterTagConfig> clusterTagConfigs;
     private NodeConfig nodeConfig;
 
-    public static List<SortTaskConfig> batchCheckDelete(List<SortTaskConfig> 
last, List<SortTaskConfig> current) {
+    public static List<TaskConfig> batchCheckDelete(List<TaskConfig> last, 
List<TaskConfig> current) {
         return SortConfigUtil.batchCheckDeleteRecursive(last, current,
-                SortTaskConfig::getSortTaskName, SortTaskConfig::checkDelete);
+                TaskConfig::getSortTaskName, TaskConfig::checkDelete);
     }
 
-    public static List<SortTaskConfig> batchCheckUpdate(List<SortTaskConfig> 
last, List<SortTaskConfig> current) {
+    public static List<TaskConfig> batchCheckUpdate(List<TaskConfig> last, 
List<TaskConfig> current) {
         return SortConfigUtil.batchCheckUpdateRecursive(last, current,
-                SortTaskConfig::getSortTaskName, SortTaskConfig::checkUpdate);
+                TaskConfig::getSortTaskName, TaskConfig::checkUpdate);
     }
 
-    public static List<SortTaskConfig> batchCheckNew(List<SortTaskConfig> 
last, List<SortTaskConfig> current) {
+    public static List<TaskConfig> batchCheckNew(List<TaskConfig> last, 
List<TaskConfig> current) {
         return SortConfigUtil.batchCheckNewRecursive(last, current,
-                SortTaskConfig::getSortTaskName, SortTaskConfig::checkNew);
+                TaskConfig::getSortTaskName, TaskConfig::checkNew);
     }
 
-    public static List<SortTaskConfig> batchCheckLatest(List<SortTaskConfig> 
latest, List<SortTaskConfig> current) {
+    public static List<TaskConfig> batchCheckLatest(List<TaskConfig> latest, 
List<TaskConfig> current) {
         return SortConfigUtil.batchCheckLatestRecursive(latest, current,
-                SortTaskConfig::getSortTaskName, SortTaskConfig::checkLatest);
+                TaskConfig::getSortTaskName, TaskConfig::checkLatest);
     }
 
-    public static SortTaskConfig checkDelete(SortTaskConfig last, 
SortTaskConfig current) {
-        return check(last, current, SortClusterConfig::batchCheckDelete,
+    public static TaskConfig checkDelete(TaskConfig last, TaskConfig current) {
+        return check(last, current, ClusterTagConfig::batchCheckDelete,
                 (lastNode, currentNode) -> lastNode);
     }
 
-    public static SortTaskConfig checkUpdate(SortTaskConfig last, 
SortTaskConfig current) {
-        return check(last, current, SortClusterConfig::batchCheckUpdate,
+    public static TaskConfig checkUpdate(TaskConfig last, TaskConfig current) {
+        return check(last, current, ClusterTagConfig::batchCheckUpdate,
                 (lastNode, currentNode) -> {
                     if (lastNode == null || currentNode == null) {
                         return null;
@@ -75,8 +75,8 @@ public class SortTaskConfig implements Serializable {
                 });
     }
 
-    public static SortTaskConfig checkNew(SortTaskConfig last, SortTaskConfig 
current) {
-        return check(last, current, SortClusterConfig::batchCheckNew,
+    public static TaskConfig checkNew(TaskConfig last, TaskConfig current) {
+        return check(last, current, ClusterTagConfig::batchCheckNew,
                 (lastNode, currentNode) -> {
                     if (lastNode == null || currentNode == null) {
                         return null;
@@ -85,8 +85,8 @@ public class SortTaskConfig implements Serializable {
                 });
     }
 
-    public static SortTaskConfig checkLatest(SortTaskConfig last, 
SortTaskConfig current) {
-        return check(last, current, SortClusterConfig::batchCheckLatest,
+    public static TaskConfig checkLatest(TaskConfig last, TaskConfig current) {
+        return check(last, current, ClusterTagConfig::batchCheckLatest,
                 (lastNode, currentNode) -> {
                     if (lastNode == null || currentNode == null) {
                         return null;
@@ -95,23 +95,24 @@ public class SortTaskConfig implements Serializable {
                 });
     }
 
-    public static SortTaskConfig check(
-            SortTaskConfig last, SortTaskConfig current,
-            BiFunction<List<SortClusterConfig>, List<SortClusterConfig>, 
List<SortClusterConfig>> clusterCheckFunction,
+    public static TaskConfig check(
+            TaskConfig last, TaskConfig current,
+            BiFunction<List<ClusterTagConfig>, List<ClusterTagConfig>, 
List<ClusterTagConfig>> clusterCheckFunction,
             BiFunction<NodeConfig, NodeConfig, NodeConfig> nodeCheckFunction) {
 
-        List<SortClusterConfig> checkCluster = 
clusterCheckFunction.apply(last.getClusters(), current.getClusters());
+        List<ClusterTagConfig> checkClusterTags =
+                clusterCheckFunction.apply(last.getClusterTagConfigs(), 
current.getClusterTagConfigs());
 
         NodeConfig checkNode = nodeCheckFunction.apply(last.getNodeConfig(), 
current.getNodeConfig());
 
-        if (CollectionUtils.isEmpty(checkCluster) && checkNode == null) {
+        if (CollectionUtils.isEmpty(checkClusterTags) && checkNode == null) {
             return null;
         }
 
-        return SortTaskConfig
+        return TaskConfig
                 .builder()
                 .sortTaskName(last.getSortTaskName())
-                .clusters(checkCluster)
+                .clusterTagConfigs(checkClusterTags)
                 .nodeConfig(checkNode)
                 .build();
     }
diff --git 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortServiceImpl.java
 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortServiceImpl.java
index 507cc1fc1a..2f87d3b92e 100644
--- 
a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortServiceImpl.java
+++ 
b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortServiceImpl.java
@@ -18,10 +18,10 @@
 package org.apache.inlong.manager.service.core.impl;
 
 import org.apache.inlong.common.pojo.sdk.SortSourceConfigResponse;
-import org.apache.inlong.common.pojo.sort.SortClusterConfig;
+import org.apache.inlong.common.pojo.sort.ClusterTagConfig;
 import org.apache.inlong.common.pojo.sort.SortConfig;
 import org.apache.inlong.common.pojo.sort.SortConfigResponse;
-import org.apache.inlong.common.pojo.sort.SortTaskConfig;
+import org.apache.inlong.common.pojo.sort.TaskConfig;
 import org.apache.inlong.common.pojo.sort.dataflow.DataFlowConfig;
 import org.apache.inlong.common.pojo.sort.mq.MqClusterConfig;
 import org.apache.inlong.common.pojo.sort.mq.PulsarClusterConfig;
@@ -272,7 +272,7 @@ public class SortServiceImpl implements SortService, 
PluginBinder {
         ObjectMapper objectMapper = new ObjectMapper();
         Map<String, byte[]> sortConfigs = new HashMap<>();
         Map<String, String> sortConfigMd5s = new HashMap<>();
-        Map<String, List<SortTaskConfig>> temp = new HashMap<>();
+        Map<String, List<TaskConfig>> temp = new HashMap<>();
         List<SortConfigEntity> sinkConfigEntityList = 
configLoader.loadAllSortConfigEntity();
         for (SortConfigEntity sortConfigEntity : sinkConfigEntityList) {
             if (StringUtils.isBlank(sortConfigEntity.getSortTaskName())) {
@@ -284,16 +284,16 @@ public class SortServiceImpl implements SortService, 
PluginBinder {
                         
Collectors.groupingBy(SortConfigEntity::getSortTaskName,
                                 
Collectors.groupingBy(SortConfigEntity::getInlongClusterTag))));
         for (String sortClusterName : cluster2SinkMap.keySet()) {
-            List<SortTaskConfig> map = temp.computeIfAbsent(sortClusterName,
+            List<TaskConfig> map = temp.computeIfAbsent(sortClusterName,
                     v -> new ArrayList<>());
             SortConfig sortConfig = new SortConfig();
             sortConfig.setSortClusterName(sortClusterName);
             Map<String, Map<String, List<SortConfigEntity>>> sortTaskNameMap = 
cluster2SinkMap.get(sortClusterName);
             for (String sortTaskName : sortTaskNameMap.keySet()) {
                 Map<String, List<SortConfigEntity>> clusterTagMap = 
sortTaskNameMap.get(sortTaskName);
-                SortTaskConfig sortTaskConfig = SortTaskConfig.builder()
+                TaskConfig sortTaskConfig = TaskConfig.builder()
                         .sortTaskName(sortTaskName)
-                        .clusters(new ArrayList<>())
+                        .clusterTagConfigs(new ArrayList<>())
                         .nodeConfig(nodeInfoMap.get(sortTaskName))
                         .build();
                 for (String clusterTag : clusterTagMap.keySet()) {
@@ -308,12 +308,12 @@ public class SortServiceImpl implements SortService, 
PluginBinder {
                     }).filter(Objects::nonNull)
                             .sorted(Comparator.comparingInt(x -> 
Integer.parseInt(x.getDataflowId())))
                             .collect(Collectors.toList());
-                    SortClusterConfig sortClusterConfig = 
SortClusterConfig.builder()
+                    ClusterTagConfig sortClusterConfig = 
ClusterTagConfig.builder()
                             
.mqClusterConfigs(mqClusterConfigMap.getOrDefault(clusterTag, new 
ArrayList<>()))
                             .clusterTag(clusterTag)
                             .dataFlowConfigs(dataFlowConfigs)
                             .build();
-                    sortTaskConfig.getClusters().add(sortClusterConfig);
+                    
sortTaskConfig.getClusterTagConfigs().add(sortClusterConfig);
                 }
                 map.add(sortTaskConfig);
             }
diff --git 
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/CommonPropertiesHolder.java
 
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/CommonPropertiesHolder.java
index b91e5801c8..1d8447121c 100644
--- 
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/CommonPropertiesHolder.java
+++ 
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/CommonPropertiesHolder.java
@@ -40,6 +40,7 @@ public class CommonPropertiesHolder {
     public static final String KEY_COMMON_PROPERTIES = 
"common_properties_loader";
     public static final String KEY_CLUSTER_ID = "clusterId";
     public static final String KEY_SORT_SOURCE_ACKPOLICY = 
"sortSource.ackPolicy";
+    public static final String KEY_USE_UNIFIED_CONFIGURATION = 
"useUnifiedConfiguration";
 
     private static Map<String, String> props;
     private static Context context;
diff --git 
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/ManagerUrlHandler.java
 
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/ManagerUrlHandler.java
index 25deaf87a5..df357743cb 100644
--- 
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/ManagerUrlHandler.java
+++ 
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/ManagerUrlHandler.java
@@ -66,6 +66,10 @@ public class ManagerUrlHandler {
         return get().acquireSortClusterConfigUrl();
     }
 
+    public static String getSortConfigUrl() {
+        return get().acquireSortConfigUrl();
+    }
+
     private static ManagerUrlLoader get() {
         if (instance != null) {
             return instance;
diff --git 
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/v2/SortConfigHolder.java
 
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/v2/SortConfigHolder.java
index f1c52d2fc4..7a3ea1ac0f 100644
--- 
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/v2/SortConfigHolder.java
+++ 
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/v2/SortConfigHolder.java
@@ -17,14 +17,13 @@
 
 package org.apache.inlong.sort.standalone.config.holder.v2;
 
-import org.apache.inlong.common.pojo.sort.SortClusterConfig;
+import org.apache.inlong.common.pojo.sort.ClusterTagConfig;
 import org.apache.inlong.common.pojo.sort.SortConfig;
-import org.apache.inlong.common.pojo.sort.SortTaskConfig;
+import org.apache.inlong.common.pojo.sort.TaskConfig;
 import org.apache.inlong.common.pojo.sort.dataflow.DataFlowConfig;
 import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
-import org.apache.inlong.sort.standalone.config.holder.SortClusterConfigType;
-import 
org.apache.inlong.sort.standalone.config.loader.v2.ClassResourceSortClusterConfigLoader;
-import 
org.apache.inlong.sort.standalone.config.loader.v2.ManagerSortClusterConfigLoader;
+import 
org.apache.inlong.sort.standalone.config.loader.v2.ClassResourceSortConfigLoader;
+import 
org.apache.inlong.sort.standalone.config.loader.v2.ManagerSortConfigLoader;
 import org.apache.inlong.sort.standalone.config.loader.v2.SortConfigLoader;
 import org.apache.inlong.sort.standalone.config.pojo.InlongId;
 
@@ -71,12 +70,12 @@ public class SortConfigHolder {
             instance = new SortConfigHolder();
             instance.reloadInterval = 
CommonPropertiesHolder.getLong(RELOAD_INTERVAL, 60000L);
             String loaderType = CommonPropertiesHolder
-                    .getString(SortClusterConfigType.KEY_TYPE, 
SortClusterConfigType.MANAGER.name());
+                    .getString(SortConfigType.KEY_TYPE, 
SortConfigType.MANAGER.name());
 
-            if 
(SortClusterConfigType.FILE.name().equalsIgnoreCase(loaderType)) {
-                instance.loader = new ClassResourceSortClusterConfigLoader();
-            } else if 
(SortClusterConfigType.MANAGER.name().equalsIgnoreCase(loaderType)) {
-                instance.loader = new ManagerSortClusterConfigLoader();
+            if (SortConfigType.FILE.name().equalsIgnoreCase(loaderType)) {
+                instance.loader = new ClassResourceSortConfigLoader();
+            } else if 
(SortConfigType.MANAGER.name().equalsIgnoreCase(loaderType)) {
+                instance.loader = new ManagerSortConfigLoader();
             } else {
                 // user-defined
                 try {
@@ -90,7 +89,7 @@ public class SortConfigHolder {
                 }
             }
             if (instance.loader == null) {
-                instance.loader = new ClassResourceSortClusterConfigLoader();
+                instance.loader = new ClassResourceSortConfigLoader();
             }
             try {
                 instance.loader.configure(new 
Context(CommonPropertiesHolder.get()));
@@ -125,10 +124,10 @@ public class SortConfigHolder {
             // <SortTaskName, <InlongId, AuditTag>>
             this.auditTagMap = newConfig.getTasks()
                     .stream()
-                    .collect(Collectors.toMap(SortTaskConfig::getSortTaskName,
-                            v -> v.getClusters()
+                    .collect(Collectors.toMap(TaskConfig::getSortTaskName,
+                            v -> v.getClusterTagConfigs()
                                     .stream()
-                                    .map(SortClusterConfig::getDataFlowConfigs)
+                                    .map(ClusterTagConfig::getDataFlowConfigs)
                                     .flatMap(Collection::stream)
                                     .filter(flow -> 
StringUtils.isNotEmpty(flow.getAuditTag()))
                                     .collect(Collectors.toMap(flow -> 
InlongId.generateUid(flow.getInlongGroupId(),
@@ -145,10 +144,10 @@ public class SortConfigHolder {
         return get().config;
     }
 
-    public static SortTaskConfig getTaskConfig(String sortTaskName) {
+    public static TaskConfig getTaskConfig(String sortTaskName) {
         SortConfig config = get().config;
         if (config != null && config.getTasks() != null) {
-            for (SortTaskConfig task : config.getTasks()) {
+            for (TaskConfig task : config.getTasks()) {
                 if (StringUtils.equals(sortTaskName, task.getSortTaskName())) {
                     return task;
                 }
diff --git 
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/ManagerUrlLoader.java
 
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/v2/SortConfigType.java
similarity index 63%
copy from 
inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/ManagerUrlLoader.java
copy to 
inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/v2/SortConfigType.java
index 4b6f4f3d00..d5196625b6 100644
--- 
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/ManagerUrlLoader.java
+++ 
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/v2/SortConfigType.java
@@ -15,26 +15,13 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.sort.standalone.config.loader;
+package org.apache.inlong.sort.standalone.config.holder.v2;
 
-import org.apache.flume.conf.Configurable;
+public enum SortConfigType {
 
-/**
- * Interface of ManagerUrlLoader.
- */
-public interface ManagerUrlLoader extends Configurable {
-
-    /**
-     * Acquire SortSourceConfigUrl
-     *
-     * @return SortSourceConfigUrl
-     */
-    String acquireSortSourceConfigUrl();
+    FILE, MANAGER, USER_DEFINED;
 
-    /**
-     * Acquire SortClusterConfigUrl
-     *
-     * @return SortClusterConfigUrl
-     */
-    String acquireSortClusterConfigUrl();
+    public static final String KEY_TYPE = "sortConfig.type";
+    public static final String KEY_FILE = "sortConfig.file";
+    public static final String DEFAULT_FILE = "SortConfig.conf";
 }
diff --git 
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/CommonPropertiesManagerUrlLoader.java
 
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/CommonPropertiesManagerUrlLoader.java
index a02538ca77..089f8fbe60 100644
--- 
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/CommonPropertiesManagerUrlLoader.java
+++ 
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/CommonPropertiesManagerUrlLoader.java
@@ -33,9 +33,11 @@ public class CommonPropertiesManagerUrlLoader implements 
ManagerUrlLoader {
     private static final Logger LOG = 
InlongLoggerFactory.getLogger(CommonPropertiesManagerUrlLoader.class);
     private static final String KEY_SORT_CLUSTER_CONFIG_MANAGER_URL = 
"sortClusterConfig.managerUrl";
     private static final String KEY_SORT_SOURCE_CONFIG_MANAGER_URL = 
"sortSourceConfig.managerUrl";
+    private static final String KEY_SORT_CONFIG_MANAGER_URL = 
"sortConfig.managerUrl";
 
     private String sortSourceConfigUrl;
     private String sortClusterConfigUrl;
+    private String sortConfigUrl;
     public Context context;
 
     @Override
@@ -70,6 +72,22 @@ public class CommonPropertiesManagerUrlLoader implements 
ManagerUrlLoader {
         return sortClusterConfigUrl;
     }
 
+    @Override
+    public String acquireSortConfigUrl() {
+        if (sortConfigUrl != null) {
+            return sortConfigUrl;
+        }
+        sortConfigUrl = context.getString(KEY_SORT_CONFIG_MANAGER_URL);
+        if (StringUtils.isBlank(sortConfigUrl)) {
+            String warnMsg = "Get key" + KEY_SORT_CONFIG_MANAGER_URL
+                    + " from CommonPropertiesHolder failed, it's a optional 
property use to specify "
+                    + "the url where Sort-Standalone request 
SortSourceConfig.";
+            LOG.warn(warnMsg);
+            sortConfigUrl = warnMsg;
+        }
+        return sortConfigUrl;
+    }
+
     @Override
     public void configure(Context context) {
         Optional.ofNullable(context).ifPresent(c -> this.context = c);
diff --git 
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/ManagerUrlLoader.java
 
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/ManagerUrlLoader.java
index 4b6f4f3d00..4e133369b6 100644
--- 
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/ManagerUrlLoader.java
+++ 
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/ManagerUrlLoader.java
@@ -37,4 +37,6 @@ public interface ManagerUrlLoader extends Configurable {
      * @return SortClusterConfigUrl
      */
     String acquireSortClusterConfigUrl();
+
+    String acquireSortConfigUrl();
 }
diff --git 
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/SortConfigQueryConsumeConfig.java
 
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/SortConfigQueryConsumeConfig.java
index 3f893041a6..079044f020 100644
--- 
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/SortConfigQueryConsumeConfig.java
+++ 
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/SortConfigQueryConsumeConfig.java
@@ -17,8 +17,8 @@
 
 package org.apache.inlong.sort.standalone.config.loader;
 
-import org.apache.inlong.common.pojo.sort.SortClusterConfig;
-import org.apache.inlong.common.pojo.sort.SortTaskConfig;
+import org.apache.inlong.common.pojo.sort.ClusterTagConfig;
+import org.apache.inlong.common.pojo.sort.TaskConfig;
 import org.apache.inlong.common.pojo.sort.dataflow.DataFlowConfig;
 import org.apache.inlong.common.pojo.sort.mq.MqClusterConfig;
 import org.apache.inlong.common.pojo.sort.mq.PulsarClusterConfig;
@@ -49,8 +49,8 @@ public class SortConfigQueryConsumeConfig implements 
QueryConsumeConfig {
 
     @Override
     public ConsumeConfig queryCurrentConsumeConfig(String sortTaskId) {
-        SortTaskConfig taskConfig = SortConfigHolder.getTaskConfig(sortTaskId);
-        List<InLongTopic> topics = taskConfig.getClusters()
+        TaskConfig taskConfig = SortConfigHolder.getTaskConfig(sortTaskId);
+        List<InLongTopic> topics = taskConfig.getClusterTagConfigs()
                 .stream()
                 .map(this::parseTopics)
                 .flatMap(Collection::stream)
@@ -59,7 +59,7 @@ public class SortConfigQueryConsumeConfig implements 
QueryConsumeConfig {
         return new ConsumeConfig(topics);
     }
 
-    public List<InLongTopic> parseTopics(SortClusterConfig clusterConfig) {
+    public List<InLongTopic> parseTopics(ClusterTagConfig clusterConfig) {
         List<InLongTopic> topics = new ArrayList<>();
         List<MqClusterConfig> mqClusterConfigs = 
clusterConfig.getMqClusterConfigs();
         List<DataFlowConfig> dataFlowConfigs = 
clusterConfig.getDataFlowConfigs();
diff --git 
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/v2/ClassResourceSortClusterConfigLoader.java
 
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/v2/ClassResourceSortConfigLoader.java
similarity index 84%
rename from 
inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/v2/ClassResourceSortClusterConfigLoader.java
rename to 
inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/v2/ClassResourceSortConfigLoader.java
index 0d3cd08089..77f8580f00 100644
--- 
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/v2/ClassResourceSortClusterConfigLoader.java
+++ 
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/v2/ClassResourceSortConfigLoader.java
@@ -18,7 +18,7 @@
 package org.apache.inlong.sort.standalone.config.loader.v2;
 
 import org.apache.inlong.common.pojo.sort.SortConfig;
-import org.apache.inlong.sort.standalone.config.holder.SortClusterConfigType;
+import org.apache.inlong.sort.standalone.config.holder.v2.SortConfigType;
 import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
 
 import org.apache.commons.io.IOUtils;
@@ -28,18 +28,18 @@ import org.slf4j.Logger;
 
 import java.nio.charset.Charset;
 
-public class ClassResourceSortClusterConfigLoader implements SortConfigLoader {
+public class ClassResourceSortConfigLoader implements SortConfigLoader {
 
-    public static final Logger LOG = 
InlongLoggerFactory.getLogger(ClassResourceSortClusterConfigLoader.class);
+    public static final Logger LOG = 
InlongLoggerFactory.getLogger(ClassResourceSortConfigLoader.class);
     private Context context;
     private ObjectMapper objectMapper = new ObjectMapper();
 
     @Override
     public SortConfig load() {
-        String fileName = SortClusterConfigType.DEFAULT_FILE;
+        String fileName = SortConfigType.DEFAULT_FILE;
         try {
             if (context != null) {
-                fileName = context.getString(SortClusterConfigType.KEY_FILE, 
SortClusterConfigType.DEFAULT_FILE);
+                fileName = context.getString(SortConfigType.KEY_FILE, 
SortConfigType.DEFAULT_FILE);
             }
             String confString = 
IOUtils.toString(getClass().getClassLoader().getResource(fileName),
                     Charset.defaultCharset());
diff --git 
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/v2/ManagerSortClusterConfigLoader.java
 
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/v2/ManagerSortConfigLoader.java
similarity index 96%
rename from 
inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/v2/ManagerSortClusterConfigLoader.java
rename to 
inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/v2/ManagerSortConfigLoader.java
index 44b55ee4c8..229b53c591 100644
--- 
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/v2/ManagerSortClusterConfigLoader.java
+++ 
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/loader/v2/ManagerSortConfigLoader.java
@@ -38,7 +38,7 @@ import org.apache.http.util.EntityUtils;
 import java.util.concurrent.TimeUnit;
 
 @Slf4j
-public class ManagerSortClusterConfigLoader implements SortConfigLoader {
+public class ManagerSortConfigLoader implements SortConfigLoader {
 
     private Context context;
     private CloseableHttpClient httpClient;
@@ -66,7 +66,7 @@ public class ManagerSortClusterConfigLoader implements 
SortConfigLoader {
         HttpGet httpGet = null;
         try {
             String clusterName = 
this.context.getString(CommonPropertiesHolder.KEY_CLUSTER_ID);
-            String url = ManagerUrlHandler.getSortClusterConfigUrl() + 
"?clusterName="
+            String url = ManagerUrlHandler.getSortConfigUrl() + "?clusterName="
                     + clusterName + "&md5=";
             if (StringUtils.isNotBlank(this.md5)) {
                 url += this.md5;
diff --git 
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/utils/v2/FlumeConfigGenerator.java
 
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/utils/v2/FlumeConfigGenerator.java
index 3edf16af72..ded015d43b 100644
--- 
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/utils/v2/FlumeConfigGenerator.java
+++ 
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/utils/v2/FlumeConfigGenerator.java
@@ -17,7 +17,7 @@
 
 package org.apache.inlong.sort.standalone.utils.v2;
 
-import org.apache.inlong.common.pojo.sort.SortTaskConfig;
+import org.apache.inlong.common.pojo.sort.TaskConfig;
 import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
 
 import java.util.HashMap;
@@ -36,7 +36,7 @@ public class FlumeConfigGenerator {
     public static final String KEY_ROLLBACK_START_TIME = "rollback.startTime";
     public static final String KEY_ROLLBACK_STOP_TIME = "rollback.stopTime";
 
-    public static Map<String, String> 
generateFlumeConfiguration(SortTaskConfig taskConfig) {
+    public static Map<String, String> generateFlumeConfiguration(TaskConfig 
taskConfig) {
         Map<String, String> flumeConf = new HashMap<>();
         String sortTaskName = taskConfig.getSortTaskName();
         appendChannels(flumeConf, sortTaskName);
diff --git 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/SinkContext.java
 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/SinkContext.java
index 16b15ab8b9..36e3d11254 100644
--- 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/SinkContext.java
+++ 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/SinkContext.java
@@ -18,10 +18,12 @@
 package org.apache.inlong.sort.standalone.sink;
 
 import org.apache.inlong.common.metric.MetricRegister;
+import org.apache.inlong.common.pojo.sort.TaskConfig;
 import org.apache.inlong.common.pojo.sortstandalone.SortTaskConfig;
 import org.apache.inlong.sort.standalone.channel.ProfileEvent;
 import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
 import org.apache.inlong.sort.standalone.config.holder.SortClusterConfigHolder;
+import org.apache.inlong.sort.standalone.config.holder.v2.SortConfigHolder;
 import org.apache.inlong.sort.standalone.metrics.SortMetricItem;
 import org.apache.inlong.sort.standalone.metrics.SortMetricItemSet;
 import org.apache.inlong.sort.standalone.utils.BufferQueue;
@@ -37,85 +39,62 @@ import java.util.Map;
 import java.util.Timer;
 import java.util.TimerTask;
 
-/**
- * 
- * SinkContext
- */
-@Deprecated
 public class SinkContext {
 
     public static final Logger LOG = 
InlongLoggerFactory.getLogger(SinkContext.class);
-
     public static final String KEY_MAX_THREADS = "maxThreads";
     public static final String KEY_PROCESSINTERVAL = "processInterval";
     public static final String KEY_RELOADINTERVAL = "reloadInterval";
     public static final String KEY_TASK_NAME = "taskName";
     public static final String KEY_MAX_BUFFERQUEUE_SIZE_KB = 
"maxBufferQueueSizeKb";
     public static final int DEFAULT_MAX_BUFFERQUEUE_SIZE_KB = 128 * 1024;
-
     protected final String clusterId;
     protected final String taskName;
     protected final String sinkName;
     protected final Context sinkContext;
-
+    protected TaskConfig taskConfig;
+    @Deprecated
     protected SortTaskConfig sortTaskConfig;
-
     protected final Channel channel;
-    //
     protected final int maxThreads;
     protected final long processInterval;
     protected final long reloadInterval;
-    //
+    protected final boolean unifiedConfiguration;
     protected final SortMetricItemSet metricItemSet;
     protected Timer reloadTimer;
 
-    /**
-     * Constructor
-     * 
-     * @param sinkName
-     * @param context
-     * @param channel
-     */
     public SinkContext(String sinkName, Context context, Channel channel) {
         this.sinkName = sinkName;
         this.sinkContext = context;
         this.channel = channel;
-        this.clusterId = 
context.getString(CommonPropertiesHolder.KEY_CLUSTER_ID);
-        this.taskName = context.getString(KEY_TASK_NAME);
+        this.clusterId = 
sinkContext.getString(CommonPropertiesHolder.KEY_CLUSTER_ID);
+        this.taskName = sinkContext.getString(KEY_TASK_NAME);
         this.maxThreads = sinkContext.getInteger(KEY_MAX_THREADS, 10);
         this.processInterval = sinkContext.getInteger(KEY_PROCESSINTERVAL, 
100);
         this.reloadInterval = sinkContext.getLong(KEY_RELOADINTERVAL, 60000L);
-        //
         this.metricItemSet = new SortMetricItemSet(sinkName);
+        this.unifiedConfiguration = 
sinkContext.getBoolean(CommonPropertiesHolder.KEY_USE_UNIFIED_CONFIGURATION,
+                false);
         MetricRegister.register(this.metricItemSet);
     }
 
-    /**
-     * start
-     */
     public void start() {
         try {
             this.reload();
             this.setReloadTimer();
         } catch (Exception e) {
-            LOG.error(e.getMessage(), e);
+            LOG.error("failed to start sink context", e);
         }
     }
 
-    /**
-     * close
-     */
     public void close() {
         try {
             this.reloadTimer.cancel();
         } catch (Exception e) {
-            LOG.error(e.getMessage(), e);
+            LOG.error("failed to close sink context", e);
         }
     }
 
-    /**
-     * setReloadTimer
-     */
     protected void setReloadTimer() {
         reloadTimer = new Timer(true);
         TimerTask task = new TimerTask() {
@@ -127,113 +106,63 @@ public class SinkContext {
         reloadTimer.schedule(task, new Date(System.currentTimeMillis() + 
reloadInterval), reloadInterval);
     }
 
-    /**
-     * reload
-     */
     public void reload() {
         try {
             this.sortTaskConfig = 
SortClusterConfigHolder.getTaskConfig(taskName);
+            this.taskConfig = SortConfigHolder.getTaskConfig(taskName);
         } catch (Throwable e) {
-            LOG.error(e.getMessage(), e);
+            LOG.error("failed to stop sink context", e);
         }
     }
 
-    /**
-     * get clusterId
-     * 
-     * @return the clusterId
-     */
     public String getClusterId() {
         return clusterId;
     }
 
-    /**
-     * get taskName
-     * 
-     * @return the taskName
-     */
     public String getTaskName() {
         return taskName;
     }
 
-    /**
-     * get sinkName
-     * 
-     * @return the sinkName
-     */
     public String getSinkName() {
         return sinkName;
     }
 
-    /**
-     * get sinkContext
-     * 
-     * @return the sinkContext
-     */
     public Context getSinkContext() {
         return sinkContext;
     }
 
-    /**
-     * get sortTaskConfig
-     * 
-     * @return the sortTaskConfig
-     */
+    public TaskConfig getTaskConfig() {
+        return taskConfig;
+    }
+
     public SortTaskConfig getSortTaskConfig() {
         return sortTaskConfig;
     }
 
-    /**
-     * get channel
-     * 
-     * @return the channel
-     */
+    public boolean isUnifiedConfiguration() {
+        return unifiedConfiguration;
+    }
+
     public Channel getChannel() {
         return channel;
     }
 
-    /**
-     * get maxThreads
-     * 
-     * @return the maxThreads
-     */
     public int getMaxThreads() {
         return maxThreads;
     }
 
-    /**
-     * get processInterval
-     * 
-     * @return the processInterval
-     */
     public long getProcessInterval() {
         return processInterval;
     }
 
-    /**
-     * get reloadInterval
-     * 
-     * @return the reloadInterval
-     */
     public long getReloadInterval() {
         return reloadInterval;
     }
 
-    /**
-     * get metricItemSet
-     * 
-     * @return the metricItemSet
-     */
     public SortMetricItemSet getMetricItemSet() {
         return metricItemSet;
     }
 
-    /**
-     * fillInlongId
-     *
-     * @param currentRecord
-     * @param dimensions
-     */
     public static void fillInlongId(ProfileEvent currentRecord, Map<String, 
String> dimensions) {
         String inlongGroupId = currentRecord.getInlongGroupId();
         inlongGroupId = (StringUtils.isBlank(inlongGroupId)) ? "-" : 
inlongGroupId;
@@ -243,10 +172,6 @@ public class SinkContext {
         dimensions.put(SortMetricItem.KEY_INLONG_STREAM_ID, inlongStreamId);
     }
 
-    /**
-     * createBufferQueue
-     * @return
-     */
     public static <U> BufferQueue<U> createBufferQueue() {
         int maxBufferQueueSizeKb = 
CommonPropertiesHolder.getInteger(KEY_MAX_BUFFERQUEUE_SIZE_KB,
                 DEFAULT_MAX_BUFFERQUEUE_SIZE_KB);
diff --git 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsIdConfig.java
 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsIdConfig.java
index f0fd784acb..3c167be300 100644
--- 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsIdConfig.java
+++ 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsIdConfig.java
@@ -25,6 +25,7 @@ import org.apache.inlong.common.pojo.sort.node.ClsNodeConfig;
 import lombok.AllArgsConstructor;
 import lombok.Builder;
 import lombok.Data;
+import lombok.EqualsAndHashCode;
 import lombok.NoArgsConstructor;
 
 import java.util.List;
@@ -37,6 +38,7 @@ import java.util.stream.Collectors;
 @Builder
 @NoArgsConstructor
 @AllArgsConstructor
+@EqualsAndHashCode
 public class ClsIdConfig {
 
     private String inlongGroupId;
diff --git 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsSinkContext.java
 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsSinkContext.java
index 34bddaed32..09c8742c04 100644
--- 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsSinkContext.java
+++ 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/cls/ClsSinkContext.java
@@ -17,18 +17,22 @@
 
 package org.apache.inlong.sort.standalone.sink.cls;
 
-import org.apache.inlong.common.pojo.sort.SortClusterConfig;
-import org.apache.inlong.common.pojo.sort.SortTaskConfig;
+import org.apache.inlong.common.pojo.sort.ClusterTagConfig;
+import org.apache.inlong.common.pojo.sort.TaskConfig;
 import org.apache.inlong.common.pojo.sort.node.ClsNodeConfig;
+import org.apache.inlong.common.pojo.sortstandalone.SortTaskConfig;
 import org.apache.inlong.sort.standalone.channel.ProfileEvent;
 import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
+import org.apache.inlong.sort.standalone.config.holder.SortClusterConfigHolder;
 import org.apache.inlong.sort.standalone.config.holder.v2.SortConfigHolder;
 import org.apache.inlong.sort.standalone.config.pojo.InlongId;
 import org.apache.inlong.sort.standalone.metrics.SortMetricItem;
 import org.apache.inlong.sort.standalone.metrics.audit.AuditUtils;
-import org.apache.inlong.sort.standalone.sink.v2.SinkContext;
+import org.apache.inlong.sort.standalone.sink.SinkContext;
+import org.apache.inlong.sort.standalone.utils.Constants;
 import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.tencentcloudapi.cls.producer.AsyncProducerClient;
@@ -97,20 +101,24 @@ public class ClsSinkContext extends SinkContext {
                 }
             });
 
-            SortTaskConfig newSortTaskConfig = 
SortConfigHolder.getTaskConfig(taskName);
-            if (newSortTaskConfig == null || 
newSortTaskConfig.equals(sortTaskConfig)) {
+            TaskConfig newTaskConfig = 
SortConfigHolder.getTaskConfig(taskName);
+            SortTaskConfig newSortTaskConfig = 
SortClusterConfigHolder.getTaskConfig(taskName);
+            if ((newTaskConfig == null || newTaskConfig.equals(taskConfig))
+                    && (newSortTaskConfig == null || 
newSortTaskConfig.equals(sortTaskConfig))) {
                 return;
             }
-            LOG.info("get new SortTaskConfig:taskName:{}:config:{}", taskName,
-                    objectMapper.writeValueAsString(newSortTaskConfig));
-            this.sortTaskConfig = newSortTaskConfig;
-            ClsNodeConfig requestNodeConfig = (ClsNodeConfig) 
sortTaskConfig.getNodeConfig();
+            LOG.info("get new SortTaskConfig:taskName:{}", taskName);
+            ClsNodeConfig requestNodeConfig = (ClsNodeConfig) 
newTaskConfig.getNodeConfig();
             if (clsNodeConfig == null || requestNodeConfig.getVersion() > 
clsNodeConfig.getVersion()) {
                 this.clsNodeConfig = requestNodeConfig;
             }
-            this.keywordMaxLength = DEFAULT_KEYWORD_MAX_LENGTH;
-            this.reloadIdParams();
-            this.reloadClients();
+            this.taskConfig = newTaskConfig;
+            this.sortTaskConfig = newSortTaskConfig;
+
+            Map<String, ClsIdConfig> fromTaskConfig = 
reloadIdParamsFromTaskConfig(taskConfig, clsNodeConfig);
+            Map<String, ClsIdConfig> fromSortTaskConfig = 
reloadIdParamsFromSortTaskConfig(sortTaskConfig);
+            idConfigMap = unifiedConfiguration ? fromTaskConfig : 
fromSortTaskConfig;
+            this.reloadClients(idConfigMap);
             this.reloadHandler();
         } catch (Exception e) {
             LOG.error(e.getMessage(), e);
@@ -134,10 +142,13 @@ public class ClsSinkContext extends SinkContext {
         }
     }
 
-    private void reloadIdParams() {
-        this.idConfigMap = this.sortTaskConfig.getClusters()
+    private Map<String, ClsIdConfig> reloadIdParamsFromTaskConfig(TaskConfig 
taskConfig, ClsNodeConfig clsNodeConfig) {
+        if (taskConfig == null) {
+            return new HashMap<>();
+        }
+        return taskConfig.getClusterTagConfigs()
                 .stream()
-                .map(SortClusterConfig::getDataFlowConfigs)
+                .map(ClusterTagConfig::getDataFlowConfigs)
                 .flatMap(Collection::stream)
                 .map(dataFlowConfig -> ClsIdConfig.create(dataFlowConfig, 
clsNodeConfig))
                 .collect(Collectors.toMap(
@@ -145,7 +156,25 @@ public class ClsSinkContext extends SinkContext {
                         v -> v));
     }
 
-    private void reloadClients() {
+    private Map<String, ClsIdConfig> 
reloadIdParamsFromSortTaskConfig(SortTaskConfig sortTaskConfig)
+            throws JsonProcessingException {
+        if (sortTaskConfig == null) {
+            return new HashMap<>();
+        }
+        List<Map<String, String>> idList = this.sortTaskConfig.getIdParams();
+        Map<String, ClsIdConfig> newIdConfigMap = new ConcurrentHashMap<>();
+        for (Map<String, String> idParam : idList) {
+            String inlongGroupId = idParam.get(Constants.INLONG_GROUP_ID);
+            String inlongStreamId = idParam.get(Constants.INLONG_STREAM_ID);
+            String uid = InlongId.generateUid(inlongGroupId, inlongStreamId);
+            String jsonIdConfig = objectMapper.writeValueAsString(idParam);
+            ClsIdConfig idConfig = objectMapper.readValue(jsonIdConfig, 
ClsIdConfig.class);
+            newIdConfigMap.put(uid, idConfig);
+        }
+        return newIdConfigMap;
+    }
+
+    private void reloadClients(Map<String, ClsIdConfig> idConfigMap) {
         // get update secretIds
         Map<String, ClsIdConfig> updateConfigMap = idConfigMap.values()
                 .stream()
diff --git 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsSink.java
 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsSink.java
index a6d3dba5b3..f283e5c855 100644
--- 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsSink.java
+++ 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsSink.java
@@ -17,7 +17,7 @@
 
 package org.apache.inlong.sort.standalone.sink.elasticsearch;
 
-import org.apache.inlong.sort.standalone.sink.v2.SinkContext;
+import org.apache.inlong.sort.standalone.sink.SinkContext;
 import org.apache.inlong.sort.standalone.utils.BufferQueue;
 
 import org.apache.flume.Context;
diff --git 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsSinkContext.java
 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsSinkContext.java
index e9dcb26a7f..6357dc8330 100644
--- 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsSinkContext.java
+++ 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/elasticsearch/EsSinkContext.java
@@ -17,8 +17,8 @@
 
 package org.apache.inlong.sort.standalone.sink.elasticsearch;
 
-import org.apache.inlong.common.pojo.sort.SortClusterConfig;
-import org.apache.inlong.common.pojo.sort.SortTaskConfig;
+import org.apache.inlong.common.pojo.sort.ClusterTagConfig;
+import org.apache.inlong.common.pojo.sort.TaskConfig;
 import org.apache.inlong.common.pojo.sort.node.EsNodeConfig;
 import org.apache.inlong.sort.standalone.channel.ProfileEvent;
 import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
@@ -26,7 +26,7 @@ import 
org.apache.inlong.sort.standalone.config.holder.v2.SortConfigHolder;
 import org.apache.inlong.sort.standalone.config.pojo.InlongId;
 import org.apache.inlong.sort.standalone.metrics.SortMetricItem;
 import org.apache.inlong.sort.standalone.metrics.audit.AuditUtils;
-import org.apache.inlong.sort.standalone.sink.v2.SinkContext;
+import org.apache.inlong.sort.standalone.sink.SinkContext;
 import org.apache.inlong.sort.standalone.utils.BufferQueue;
 import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
 
@@ -138,23 +138,23 @@ public class EsSinkContext extends SinkContext {
             LOG.info("SortTask:{},dispatchQueue:{},offer:{},take:{},back:{}",
                     taskName, dispatchQueue.size(), offerCounter.getAndSet(0),
                     takeCounter.getAndSet(0), backCounter.getAndSet(0));
-            SortTaskConfig newSortTaskConfig = 
SortConfigHolder.getTaskConfig(taskName);
-            if (this.sortTaskConfig != null && 
this.sortTaskConfig.equals(newSortTaskConfig)) {
+            TaskConfig newSortTaskConfig = 
SortConfigHolder.getTaskConfig(taskName);
+            if (this.taskConfig != null && 
this.taskConfig.equals(newSortTaskConfig)) {
                 return;
             }
             LOG.info("get new SortTaskConfig:taskName:{}:config:{}", taskName,
                     objectMapper.writeValueAsString(newSortTaskConfig));
-            this.sortTaskConfig = newSortTaskConfig;
-            EsNodeConfig requestNodeConfig = (EsNodeConfig) 
sortTaskConfig.getNodeConfig();
+            this.taskConfig = newSortTaskConfig;
+            EsNodeConfig requestNodeConfig = (EsNodeConfig) 
taskConfig.getNodeConfig();
             if (esNodeConfig == null || requestNodeConfig.getVersion() > 
esNodeConfig.getVersion()) {
                 this.esNodeConfig = requestNodeConfig;
             }
-            Map<String, String> properties = 
this.sortTaskConfig.getNodeConfig().getProperties();
+            Map<String, String> properties = 
this.taskConfig.getNodeConfig().getProperties();
             this.sinkContext = new Context(properties != null ? properties : 
new HashMap<>());
             // change current config
-            this.idConfigMap = this.sortTaskConfig.getClusters()
+            this.idConfigMap = this.taskConfig.getClusterTagConfigs()
                     .stream()
-                    .map(SortClusterConfig::getDataFlowConfigs)
+                    .map(ClusterTagConfig::getDataFlowConfigs)
                     .flatMap(Collection::stream)
                     .map(EsIdConfig::create)
                     .collect(Collectors.toMap(
diff --git 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaFederationSinkContext.java
 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaFederationSinkContext.java
index 4e94c67248..ad19c73547 100644
--- 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaFederationSinkContext.java
+++ 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/kafka/KafkaFederationSinkContext.java
@@ -17,8 +17,8 @@
 
 package org.apache.inlong.sort.standalone.sink.kafka;
 
-import org.apache.inlong.common.pojo.sort.SortClusterConfig;
-import org.apache.inlong.common.pojo.sort.SortTaskConfig;
+import org.apache.inlong.common.pojo.sort.ClusterTagConfig;
+import org.apache.inlong.common.pojo.sort.TaskConfig;
 import org.apache.inlong.common.pojo.sort.node.KafkaNodeConfig;
 import org.apache.inlong.sort.standalone.channel.ProfileEvent;
 import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
@@ -26,7 +26,7 @@ import 
org.apache.inlong.sort.standalone.config.holder.v2.SortConfigHolder;
 import org.apache.inlong.sort.standalone.config.pojo.InlongId;
 import org.apache.inlong.sort.standalone.metrics.SortMetricItem;
 import org.apache.inlong.sort.standalone.metrics.audit.AuditUtils;
-import org.apache.inlong.sort.standalone.sink.v2.SinkContext;
+import org.apache.inlong.sort.standalone.sink.SinkContext;
 import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
 
 import org.apache.commons.lang3.ClassUtils;
@@ -59,24 +59,24 @@ public class KafkaFederationSinkContext extends SinkContext 
{
     public void reload() {
         LOG.info("reload KafkaFederationSinkContext.");
         try {
-            SortTaskConfig newSortTaskConfig = 
SortConfigHolder.getTaskConfig(taskName);
+            TaskConfig newSortTaskConfig = 
SortConfigHolder.getTaskConfig(taskName);
             if (newSortTaskConfig == null) {
                 LOG.error("newSortTaskConfig is null.");
                 return;
             }
-            if (this.sortTaskConfig != null && 
this.sortTaskConfig.equals(newSortTaskConfig)) {
+            if (this.taskConfig != null && 
this.taskConfig.equals(newSortTaskConfig)) {
                 LOG.info("Same sortTaskConfig, do nothing.");
                 return;
             }
-            this.sortTaskConfig = newSortTaskConfig;
+            this.taskConfig = newSortTaskConfig;
             KafkaNodeConfig requestNodeConfig = (KafkaNodeConfig) 
newSortTaskConfig.getNodeConfig();
             if (kafkaNodeConfig == null || requestNodeConfig.getVersion() > 
kafkaNodeConfig.getVersion()) {
                 this.kafkaNodeConfig = requestNodeConfig;
             }
 
-            this.idConfigMap = this.sortTaskConfig.getClusters()
+            this.idConfigMap = this.taskConfig.getClusterTagConfigs()
                     .stream()
-                    .map(SortClusterConfig::getDataFlowConfigs)
+                    .map(ClusterTagConfig::getDataFlowConfigs)
                     .flatMap(Collection::stream)
                     .map(KafkaIdConfig::create)
                     .collect(Collectors.toMap(
diff --git 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarFederationSinkContext.java
 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarFederationSinkContext.java
index e5ed8985b2..770f028da0 100644
--- 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarFederationSinkContext.java
+++ 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarFederationSinkContext.java
@@ -17,8 +17,8 @@
 
 package org.apache.inlong.sort.standalone.sink.pulsar;
 
-import org.apache.inlong.common.pojo.sort.SortClusterConfig;
-import org.apache.inlong.common.pojo.sort.SortTaskConfig;
+import org.apache.inlong.common.pojo.sort.ClusterTagConfig;
+import org.apache.inlong.common.pojo.sort.TaskConfig;
 import org.apache.inlong.common.pojo.sort.node.PulsarNodeConfig;
 import org.apache.inlong.sort.standalone.channel.ProfileEvent;
 import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
@@ -26,7 +26,7 @@ import 
org.apache.inlong.sort.standalone.config.holder.v2.SortConfigHolder;
 import org.apache.inlong.sort.standalone.config.pojo.InlongId;
 import org.apache.inlong.sort.standalone.metrics.SortMetricItem;
 import org.apache.inlong.sort.standalone.metrics.audit.AuditUtils;
-import org.apache.inlong.sort.standalone.sink.v2.SinkContext;
+import org.apache.inlong.sort.standalone.sink.SinkContext;
 import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
 
 import org.apache.commons.lang3.ClassUtils;
@@ -53,25 +53,25 @@ public class PulsarFederationSinkContext extends 
SinkContext {
 
     public void reload() {
         try {
-            SortTaskConfig newSortTaskConfig = 
SortConfigHolder.getTaskConfig(taskName);
+            TaskConfig newSortTaskConfig = 
SortConfigHolder.getTaskConfig(taskName);
             if (newSortTaskConfig == null) {
                 LOG.error("newSortTaskConfig is null.");
                 return;
             }
-            if (this.sortTaskConfig != null && 
this.sortTaskConfig.equals(newSortTaskConfig)) {
+            if (this.taskConfig != null && 
this.taskConfig.equals(newSortTaskConfig)) {
                 LOG.info("Same sortTaskConfig, do nothing.");
                 return;
             }
-            this.sortTaskConfig = newSortTaskConfig;
+            this.taskConfig = newSortTaskConfig;
 
             PulsarNodeConfig requestNodeConfig = (PulsarNodeConfig) 
newSortTaskConfig.getNodeConfig();
             if (pulsarNodeConfig == null || requestNodeConfig.getVersion() > 
pulsarNodeConfig.getVersion()) {
                 this.pulsarNodeConfig = requestNodeConfig;
             }
 
-            this.idConfigMap = this.sortTaskConfig.getClusters()
+            this.idConfigMap = this.taskConfig.getClusterTagConfigs()
                     .stream()
-                    .map(SortClusterConfig::getDataFlowConfigs)
+                    .map(ClusterTagConfig::getDataFlowConfigs)
                     .flatMap(Collection::stream)
                     .map(PulsarIdConfig::create)
                     .collect(Collectors.toMap(
diff --git 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/v2/SinkContext.java
 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/v2/SinkContext.java
deleted file mode 100644
index 251a6d56af..0000000000
--- 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/v2/SinkContext.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sort.standalone.sink.v2;
-
-import org.apache.inlong.common.metric.MetricRegister;
-import org.apache.inlong.common.pojo.sort.SortTaskConfig;
-import org.apache.inlong.sort.standalone.channel.ProfileEvent;
-import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
-import org.apache.inlong.sort.standalone.config.holder.v2.SortConfigHolder;
-import org.apache.inlong.sort.standalone.metrics.SortMetricItem;
-import org.apache.inlong.sort.standalone.metrics.SortMetricItemSet;
-import org.apache.inlong.sort.standalone.utils.BufferQueue;
-import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.flume.Channel;
-import org.apache.flume.Context;
-import org.slf4j.Logger;
-
-import java.util.Date;
-import java.util.Map;
-import java.util.Timer;
-import java.util.TimerTask;
-
-public class SinkContext {
-
-    public static final Logger LOG = 
InlongLoggerFactory.getLogger(SinkContext.class);
-    public static final String KEY_MAX_THREADS = "maxThreads";
-    public static final String KEY_PROCESSINTERVAL = "processInterval";
-    public static final String KEY_RELOADINTERVAL = "reloadInterval";
-    public static final String KEY_TASK_NAME = "taskName";
-    public static final String KEY_MAX_BUFFERQUEUE_SIZE_KB = 
"maxBufferQueueSizeKb";
-    public static final int DEFAULT_MAX_BUFFERQUEUE_SIZE_KB = 128 * 1024;
-    protected final String clusterId;
-    protected final String taskName;
-    protected final String sinkName;
-    protected final Context sinkContext;
-    protected SortTaskConfig sortTaskConfig;
-    protected final Channel channel;
-    protected final int maxThreads;
-    protected final long processInterval;
-    protected final long reloadInterval;
-    protected final SortMetricItemSet metricItemSet;
-    protected Timer reloadTimer;
-
-    public SinkContext(String sinkName, Context context, Channel channel) {
-        this.sinkName = sinkName;
-        this.sinkContext = context;
-        this.channel = channel;
-        this.clusterId = 
sinkContext.getString(CommonPropertiesHolder.KEY_CLUSTER_ID);
-        this.taskName = sinkContext.getString(KEY_TASK_NAME);
-        this.maxThreads = sinkContext.getInteger(KEY_MAX_THREADS, 10);
-        this.processInterval = sinkContext.getInteger(KEY_PROCESSINTERVAL, 
100);
-        this.reloadInterval = sinkContext.getLong(KEY_RELOADINTERVAL, 60000L);
-        this.metricItemSet = new SortMetricItemSet(sinkName);
-        MetricRegister.register(this.metricItemSet);
-    }
-
-    public void start() {
-        try {
-            this.reload();
-            this.setReloadTimer();
-        } catch (Exception e) {
-            LOG.error("failed to start sink context", e);
-        }
-    }
-
-    public void close() {
-        try {
-            this.reloadTimer.cancel();
-        } catch (Exception e) {
-            LOG.error("failed to close sink context", e);
-        }
-    }
-
-    protected void setReloadTimer() {
-        reloadTimer = new Timer(true);
-        TimerTask task = new TimerTask() {
-
-            public void run() {
-                reload();
-            }
-        };
-        reloadTimer.schedule(task, new Date(System.currentTimeMillis() + 
reloadInterval), reloadInterval);
-    }
-
-    public void reload() {
-        try {
-            this.sortTaskConfig = SortConfigHolder.getTaskConfig(taskName);
-        } catch (Throwable e) {
-            LOG.error("failed to stop sink context", e);
-        }
-    }
-
-    public String getClusterId() {
-        return clusterId;
-    }
-
-    public String getTaskName() {
-        return taskName;
-    }
-
-    public String getSinkName() {
-        return sinkName;
-    }
-
-    public Context getSinkContext() {
-        return sinkContext;
-    }
-
-    public SortTaskConfig getSortTaskConfig() {
-        return sortTaskConfig;
-    }
-
-    public Channel getChannel() {
-        return channel;
-    }
-
-    public int getMaxThreads() {
-        return maxThreads;
-    }
-
-    public long getProcessInterval() {
-        return processInterval;
-    }
-
-    public long getReloadInterval() {
-        return reloadInterval;
-    }
-
-    public SortMetricItemSet getMetricItemSet() {
-        return metricItemSet;
-    }
-
-    public static void fillInlongId(ProfileEvent currentRecord, Map<String, 
String> dimensions) {
-        String inlongGroupId = currentRecord.getInlongGroupId();
-        inlongGroupId = (StringUtils.isBlank(inlongGroupId)) ? "-" : 
inlongGroupId;
-        String inlongStreamId = currentRecord.getInlongStreamId();
-        inlongStreamId = (StringUtils.isBlank(inlongStreamId)) ? "-" : 
inlongStreamId;
-        dimensions.put(SortMetricItem.KEY_INLONG_GROUP_ID, inlongGroupId);
-        dimensions.put(SortMetricItem.KEY_INLONG_STREAM_ID, inlongStreamId);
-    }
-
-    public static <U> BufferQueue<U> createBufferQueue() {
-        int maxBufferQueueSizeKb = 
CommonPropertiesHolder.getInteger(KEY_MAX_BUFFERQUEUE_SIZE_KB,
-                DEFAULT_MAX_BUFFERQUEUE_SIZE_KB);
-        BufferQueue<U> dispatchQueue = new BufferQueue<>(maxBufferQueueSizeKb);
-        return dispatchQueue;
-    }
-}
diff --git 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/v2/SortCluster.java
 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/v2/SortCluster.java
index f7e2e58cc6..406c0ed0de 100644
--- 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/v2/SortCluster.java
+++ 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/v2/SortCluster.java
@@ -18,7 +18,7 @@
 package org.apache.inlong.sort.standalone.v2;
 
 import org.apache.inlong.common.pojo.sort.SortConfig;
-import org.apache.inlong.common.pojo.sort.SortTaskConfig;
+import org.apache.inlong.common.pojo.sort.TaskConfig;
 import org.apache.inlong.sdk.commons.admin.AdminTask;
 import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
 import org.apache.inlong.sort.standalone.config.holder.v2.SortConfigHolder;
@@ -92,7 +92,7 @@ public class SortCluster {
                 return;
             }
             // add new task
-            for (SortTaskConfig taskConfig : newConfig.getTasks()) {
+            for (TaskConfig taskConfig : newConfig.getTasks()) {
                 String newTaskName = taskConfig.getSortTaskName();
                 if (taskMap.containsKey(newTaskName)) {
                     continue;
@@ -106,7 +106,7 @@ public class SortCluster {
             for (Map.Entry<String, SortTask> entry : taskMap.entrySet()) {
                 String taskName = entry.getKey();
                 boolean isFound = false;
-                for (SortTaskConfig taskConfig : newConfig.getTasks()) {
+                for (TaskConfig taskConfig : newConfig.getTasks()) {
                     if (taskName.equals(taskConfig.getSortTaskName())) {
                         isFound = true;
                         break;
diff --git 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/v2/SortTask.java
 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/v2/SortTask.java
index d94ba6ffdf..231f7a41bd 100644
--- 
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/v2/SortTask.java
+++ 
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/v2/SortTask.java
@@ -17,7 +17,7 @@
 
 package org.apache.inlong.sort.standalone.v2;
 
-import org.apache.inlong.common.pojo.sort.SortTaskConfig;
+import org.apache.inlong.common.pojo.sort.TaskConfig;
 import org.apache.inlong.sort.standalone.PropertiesConfigurationProvider;
 import org.apache.inlong.sort.standalone.config.holder.v2.SortConfigHolder;
 import org.apache.inlong.sort.standalone.utils.v2.FlumeConfigGenerator;
@@ -48,7 +48,7 @@ public class SortTask {
     }
 
     public void start() {
-        SortTaskConfig config = SortConfigHolder.getTaskConfig(taskName);
+        TaskConfig config = SortConfigHolder.getTaskConfig(taskName);
         if (config == null) {
             return;
         }
diff --git 
a/inlong-sort-standalone/sort-standalone-source/src/test/java/SortClusterConfig.conf
 b/inlong-sort-standalone/sort-standalone-source/src/test/java/SortConfig.conf
similarity index 99%
rename from 
inlong-sort-standalone/sort-standalone-source/src/test/java/SortClusterConfig.conf
rename to 
inlong-sort-standalone/sort-standalone-source/src/test/java/SortConfig.conf
index 637698dfa7..82b9cfe05a 100644
--- 
a/inlong-sort-standalone/sort-standalone-source/src/test/java/SortClusterConfig.conf
+++ 
b/inlong-sort-standalone/sort-standalone-source/src/test/java/SortConfig.conf
@@ -20,7 +20,7 @@
     "tasks": [
         {
             "sortTaskName": "sid_es_es-rmrv7g7a_v3",
-            "clusters": [
+            "clusterTagConfigs": [
                 {
                     "clusterTag": "default_cluster",
                     "mqClusterConfigs": [
diff --git 
a/inlong-sort-standalone/sort-standalone-source/src/test/java/common.properties 
b/inlong-sort-standalone/sort-standalone-source/src/test/java/common.properties
index 512d53f179..5f79465039 100644
--- 
a/inlong-sort-standalone/sort-standalone-source/src/test/java/common.properties
+++ 
b/inlong-sort-standalone/sort-standalone-source/src/test/java/common.properties
@@ -25,6 +25,7 @@ 
sortChannel.type=org.apache.inlong.sort.standalone.channel.BufferQueueChannel
 sortSink.type=org.apache.inlong.sort.standalone.sink.elasticsearch.EsSink
 sortSource.type=org.apache.inlong.sort.standalone.source.readapi.ReadApiSource
 
sortClusterConfig.type=org.apache.inlong.sort.standalone.config.loader.ClassResourceSortClusterConfigLoader
+sortConfig.type=org.apache.inlong.sort.standalone.config.loader.v2.ClassResourceSortClusterConfigLoader
 
 
indexRequestHandler=org.apache.inlong.sort.standalone.sink.elasticsearch.DefaultEvent2IndexRequestHandler
 
diff --git 
a/inlong-sort-standalone/sort-standalone-source/src/test/resources/SortClusterConfig.conf
 
b/inlong-sort-standalone/sort-standalone-source/src/test/resources/SortConfig.conf
similarity index 99%
rename from 
inlong-sort-standalone/sort-standalone-source/src/test/resources/SortClusterConfig.conf
rename to 
inlong-sort-standalone/sort-standalone-source/src/test/resources/SortConfig.conf
index 637698dfa7..82b9cfe05a 100644
--- 
a/inlong-sort-standalone/sort-standalone-source/src/test/resources/SortClusterConfig.conf
+++ 
b/inlong-sort-standalone/sort-standalone-source/src/test/resources/SortConfig.conf
@@ -20,7 +20,7 @@
     "tasks": [
         {
             "sortTaskName": "sid_es_es-rmrv7g7a_v3",
-            "clusters": [
+            "clusterTagConfigs": [
                 {
                     "clusterTag": "default_cluster",
                     "mqClusterConfigs": [
diff --git 
a/inlong-sort-standalone/sort-standalone-source/src/test/resources/common.properties
 
b/inlong-sort-standalone/sort-standalone-source/src/test/resources/common.properties
index 512d53f179..5f79465039 100644
--- 
a/inlong-sort-standalone/sort-standalone-source/src/test/resources/common.properties
+++ 
b/inlong-sort-standalone/sort-standalone-source/src/test/resources/common.properties
@@ -25,6 +25,7 @@ 
sortChannel.type=org.apache.inlong.sort.standalone.channel.BufferQueueChannel
 sortSink.type=org.apache.inlong.sort.standalone.sink.elasticsearch.EsSink
 sortSource.type=org.apache.inlong.sort.standalone.source.readapi.ReadApiSource
 
sortClusterConfig.type=org.apache.inlong.sort.standalone.config.loader.ClassResourceSortClusterConfigLoader
+sortConfig.type=org.apache.inlong.sort.standalone.config.loader.v2.ClassResourceSortClusterConfigLoader
 
 
indexRequestHandler=org.apache.inlong.sort.standalone.sink.elasticsearch.DefaultEvent2IndexRequestHandler
 

Reply via email to