This is an automated email from the ASF dual-hosted git repository.

luchunliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new f56adeaa70 [INLONG-11885][Sort] Unified Metadata supports delayed and phased decommissioning of metadata configurations (#11886)
f56adeaa70 is described below

commit f56adeaa70be9c36e0ec87c3e90f9aa1d2325b84
Author: ChunLiang Lu <[email protected]>
AuthorDate: Fri Jun 13 12:06:45 2025 +0800

    [INLONG-11885][Sort] Unified Metadata supports delayed and phased decommissioning of metadata configurations (#11886)
---
 .../config/holder/v2/SortConfigHolder.java         | 91 ++++++++++++++++++++--
 1 file changed, 85 insertions(+), 6 deletions(-)

diff --git a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/v2/SortConfigHolder.java b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/v2/SortConfigHolder.java
index 7a3ea1ac0f..94cb5f34d8 100644
--- a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/v2/SortConfigHolder.java
+++ b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/v2/SortConfigHolder.java
@@ -21,6 +21,7 @@ import org.apache.inlong.common.pojo.sort.ClusterTagConfig;
 import org.apache.inlong.common.pojo.sort.SortConfig;
 import org.apache.inlong.common.pojo.sort.TaskConfig;
 import org.apache.inlong.common.pojo.sort.dataflow.DataFlowConfig;
+import org.apache.inlong.common.pojo.sort.mq.MqClusterConfig;
 import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
 import org.apache.inlong.sort.standalone.config.loader.v2.ClassResourceSortConfigLoader;
 import org.apache.inlong.sort.standalone.config.loader.v2.ManagerSortConfigLoader;
@@ -32,6 +33,7 @@ import org.apache.commons.lang3.ClassUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flume.Context;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Date;
 import java.util.HashMap;
@@ -50,7 +52,9 @@ public class SortConfigHolder {
     private long reloadInterval;
     private Timer reloadTimer;
     private SortConfigLoader loader;
-    private SortConfig config;
+    private SortConfig lastConfig = null;
+    private SortConfig currentConfig = null;
+    private SortConfig finalConfig = null;
     private Map<String, Map<String, String>> auditTagMap;
 
     private SortConfigHolder() {
@@ -117,12 +121,22 @@ public class SortConfigHolder {
     private void reload() {
         try {
             SortConfig newConfig = this.loader.load();
-            if (newConfig == null) {
+            if (newConfig == null && currentConfig == null) {
                 return;
             }
+            this.lastConfig = currentConfig;
+            this.currentConfig = newConfig;
+            SortConfig finalConfig = new SortConfig();
+            finalConfig.setTasks(new ArrayList<>());
+            if (this.lastConfig != null) {
+                this.mergeSortConfig(finalConfig, lastConfig);
+            }
+            if (this.currentConfig != null) {
+                this.mergeSortConfig(finalConfig, currentConfig);
+            }
 
             // <SortTaskName, <InlongId, AuditTag>>
-            this.auditTagMap = newConfig.getTasks()
+            this.auditTagMap = finalConfig.getTasks()
                     .stream()
                     .collect(Collectors.toMap(TaskConfig::getSortTaskName,
                             v -> v.getClusterTagConfigs()
@@ -134,18 +148,83 @@ public class SortConfigHolder {
                                             flow.getInlongStreamId()),
                                             DataFlowConfig::getAuditTag,
                                             (flow1, flow2) -> flow1))));
-            this.config = newConfig;
+            this.finalConfig = finalConfig;
         } catch (Throwable e) {
             log.error("failed to reload sort config", e);
         }
     }
 
+    /**
+     * mergeSortConfig
+     * <p>
+     * Merges the tasks of {@code appendConfig} into {@code finalConfig}, modifying
+     * {@code finalConfig} in place. Does nothing when {@code appendConfig} is null.
+     * <ul>
+     * <li>The sort cluster name of {@code finalConfig} is overwritten with the one from
+     * {@code appendConfig}.</li>
+     * <li>Tasks are matched by sort task name. A task that exists only in
+     * {@code appendConfig} is added by reference (the same TaskConfig instance, no copy).
+     * A task present on both sides is combined through mergeTaskConfig, which mutates the
+     * instance already held by {@code finalConfig}.</li>
+     * <li>Tasks that exist only in {@code finalConfig} are kept.</li>
+     * </ul>
+     * The task list returned by {@code finalConfig.getTasks()} is cleared and refilled from a
+     * HashMap, so it must be non-null and modifiable, and the resulting task order is not
+     * guaranteed to match the original order. {@code appendConfig.getTasks()} is iterated
+     * without a null check.
+     *
+     * @param finalConfig  merge target; updated in place
+     * @param appendConfig configuration whose tasks are merged in; may be null
+     */
+    private void mergeSortConfig(SortConfig finalConfig, SortConfig 
appendConfig) {
+        if (appendConfig == null) {
+            return;
+        }
+        finalConfig.setSortClusterName(appendConfig.getSortClusterName());
+        Map<String, TaskConfig> finalMap = new HashMap<>();
+        finalConfig.getTasks().forEach(v -> finalMap.put(v.getSortTaskName(), 
v));
+        for (TaskConfig taskConfig : appendConfig.getTasks()) {
+            String taskName = taskConfig.getSortTaskName();
+            TaskConfig finalTask = finalMap.get(taskName);
+            // new: no task with this name in finalConfig yet, so reuse appendConfig's instance as-is
+            if (finalTask == null) {
+                finalMap.put(taskName, taskConfig);
+                continue;
+            }
+            this.mergeTaskConfig(finalTask, taskConfig);
+        }
+        finalConfig.getTasks().clear();
+        finalConfig.getTasks().addAll(finalMap.values());
+    }
+
+    /**
+     * Merges {@code appendConfig} into {@code finalConfig}, modifying {@code finalConfig}
+     * in place. Does nothing when {@code appendConfig} is null.
+     * <p>
+     * The sort task name and node config of {@code finalConfig} are overwritten with the
+     * values from {@code appendConfig}. Cluster tag configs are matched by cluster tag: a tag
+     * that exists only in {@code appendConfig} is added by reference, a tag present on both
+     * sides is combined through mergeTagConfig, and a tag that exists only in
+     * {@code finalConfig} is kept.
+     * <p>
+     * The list returned by {@code finalConfig.getClusterTagConfigs()} is cleared and refilled
+     * from a HashMap, so it must be non-null and modifiable, and the resulting order is not
+     * guaranteed to match the original order.
+     *
+     * @param finalConfig  merge target; updated in place
+     * @param appendConfig task whose settings and cluster tags are merged in; may be null
+     */
+    private void mergeTaskConfig(TaskConfig finalConfig, TaskConfig 
appendConfig) {
+        if (appendConfig == null) {
+            return;
+        }
+        finalConfig.setSortTaskName(appendConfig.getSortTaskName());
+        finalConfig.setNodeConfig(appendConfig.getNodeConfig());
+        Map<String, ClusterTagConfig> finalMap = new HashMap<>();
+        finalConfig.getClusterTagConfigs().forEach(v -> 
finalMap.put(v.getClusterTag(), v));
+        for (ClusterTagConfig tagConfig : appendConfig.getClusterTagConfigs()) 
{
+            String clusterTag = tagConfig.getClusterTag();
+            ClusterTagConfig finalTag = finalMap.get(clusterTag);
+            // new: cluster tag not present in finalConfig yet, so reuse appendConfig's instance as-is
+            if (finalTag == null) {
+                finalMap.put(clusterTag, tagConfig);
+                continue;
+            }
+            this.mergeTagConfig(finalTag, tagConfig);
+        }
+        finalConfig.getClusterTagConfigs().clear();
+        finalConfig.getClusterTagConfigs().addAll(finalMap.values());
+    }
+
+    /**
+     * Merges {@code appendConfig} into {@code finalConfig}, modifying {@code finalConfig}
+     * in place. Does nothing when {@code appendConfig} is null.
+     * <p>
+     * The cluster tag of {@code finalConfig} is overwritten with the one from
+     * {@code appendConfig}. MQ cluster configs are unioned by cluster name and data flow
+     * configs are unioned by dataflow id. In both cases the entries of {@code finalConfig}
+     * are indexed first and the entries of {@code appendConfig} are put afterwards, so on a
+     * key collision the {@code appendConfig} entry replaces the existing one, while entries
+     * found only in {@code finalConfig} are kept.
+     * <p>
+     * Both lists on {@code finalConfig} are cleared and refilled from a HashMap, so they must
+     * be non-null and modifiable, and the resulting order is not guaranteed to match the
+     * original order.
+     *
+     * @param finalConfig  merge target; updated in place
+     * @param appendConfig cluster tag config whose entries are merged in; may be null
+     */
+    private void mergeTagConfig(ClusterTagConfig finalConfig, ClusterTagConfig 
appendConfig) {
+        if (appendConfig == null) {
+            return;
+        }
+        finalConfig.setClusterTag(appendConfig.getClusterTag());
+        // mqClusterConfigs: union keyed by cluster name; appendConfig entries win on collision
+        Map<String, MqClusterConfig> finalMqMap = new HashMap<>();
+        finalConfig.getMqClusterConfigs().forEach(v -> 
finalMqMap.put(v.getClusterName(), v));
+        appendConfig.getMqClusterConfigs().forEach(v -> 
finalMqMap.put(v.getClusterName(), v));
+        finalConfig.getMqClusterConfigs().clear();
+        finalConfig.getMqClusterConfigs().addAll(finalMqMap.values());
+        // dataFlowConfigs: union keyed by dataflow id; appendConfig entries win on collision
+        Map<String, DataFlowConfig> finalMap = new HashMap<>();
+        finalConfig.getDataFlowConfigs().forEach(v -> 
finalMap.put(v.getDataflowId(), v));
+        appendConfig.getDataFlowConfigs().forEach(v -> 
finalMap.put(v.getDataflowId(), v));
+        finalConfig.getDataFlowConfigs().clear();
+        finalConfig.getDataFlowConfigs().addAll(finalMap.values());
+    }
+
     public static SortConfig getSortConfig() {
-        return get().config;
+        return get().finalConfig;
     }
 
     public static TaskConfig getTaskConfig(String sortTaskName) {
-        SortConfig config = get().config;
+        SortConfig config = get().finalConfig;
         if (config != null && config.getTasks() != null) {
             for (TaskConfig task : config.getTasks()) {
                 if (StringUtils.equals(sortTaskName, task.getSortTaskName())) {

Reply via email to