This is an automated email from the ASF dual-hosted git repository.
luchunliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new f56adeaa70 [INLONG-11885][Sort] Unified Metadata supports delayed and
phased decommissioning of metadata configurations (#11886)
f56adeaa70 is described below
commit f56adeaa70be9c36e0ec87c3e90f9aa1d2325b84
Author: ChunLiang Lu <[email protected]>
AuthorDate: Fri Jun 13 12:06:45 2025 +0800
[INLONG-11885][Sort] Unified Metadata supports delayed and phased
decommissioning of metadata configurations (#11886)
---
.../config/holder/v2/SortConfigHolder.java | 91 ++++++++++++++++++++--
1 file changed, 85 insertions(+), 6 deletions(-)
diff --git
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/v2/SortConfigHolder.java
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/v2/SortConfigHolder.java
index 7a3ea1ac0f..94cb5f34d8 100644
---
a/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/v2/SortConfigHolder.java
+++
b/inlong-sort-standalone/sort-standalone-common/src/main/java/org/apache/inlong/sort/standalone/config/holder/v2/SortConfigHolder.java
@@ -21,6 +21,7 @@ import org.apache.inlong.common.pojo.sort.ClusterTagConfig;
import org.apache.inlong.common.pojo.sort.SortConfig;
import org.apache.inlong.common.pojo.sort.TaskConfig;
import org.apache.inlong.common.pojo.sort.dataflow.DataFlowConfig;
+import org.apache.inlong.common.pojo.sort.mq.MqClusterConfig;
import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
import
org.apache.inlong.sort.standalone.config.loader.v2.ClassResourceSortConfigLoader;
import
org.apache.inlong.sort.standalone.config.loader.v2.ManagerSortConfigLoader;
@@ -32,6 +33,7 @@ import org.apache.commons.lang3.ClassUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.flume.Context;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
@@ -50,7 +52,9 @@ public class SortConfigHolder {
private long reloadInterval;
private Timer reloadTimer;
private SortConfigLoader loader;
- private SortConfig config;
+ private SortConfig lastConfig = null;
+ private SortConfig currentConfig = null;
+ private SortConfig finalConfig = null;
private Map<String, Map<String, String>> auditTagMap;
private SortConfigHolder() {
@@ -117,12 +121,22 @@ public class SortConfigHolder {
private void reload() {
try {
SortConfig newConfig = this.loader.load();
- if (newConfig == null) {
+ if (newConfig == null && currentConfig == null) {
return;
}
+ this.lastConfig = currentConfig;
+ this.currentConfig = newConfig;
+ SortConfig finalConfig = new SortConfig();
+ finalConfig.setTasks(new ArrayList<>());
+ if (this.lastConfig != null) {
+ this.mergeSortConfig(finalConfig, lastConfig);
+ }
+ if (this.currentConfig != null) {
+ this.mergeSortConfig(finalConfig, currentConfig);
+ }
// <SortTaskName, <InlongId, AuditTag>>
- this.auditTagMap = newConfig.getTasks()
+ this.auditTagMap = finalConfig.getTasks()
.stream()
.collect(Collectors.toMap(TaskConfig::getSortTaskName,
v -> v.getClusterTagConfigs()
@@ -134,18 +148,83 @@ public class SortConfigHolder {
flow.getInlongStreamId()),
DataFlowConfig::getAuditTag,
(flow1, flow2) -> flow1))));
- this.config = newConfig;
+ this.finalConfig = finalConfig;
} catch (Throwable e) {
log.error("failed to reload sort config", e);
}
}
+    /**
+     * Merges the tasks of {@code appendConfig} into {@code finalConfig}, matching tasks by
+     * sort task name.
+     *
+     * <p>The sort cluster name of {@code finalConfig} is overwritten with the one from
+     * {@code appendConfig}. A task found only in {@code appendConfig} is added as the same
+     * object (no copy); a task present on both sides is combined in place through
+     * {@code mergeTaskConfig}. The task list of {@code finalConfig} is then refilled from a
+     * {@link HashMap}, so the resulting order follows the map, not the input lists.
+     *
+     * @param finalConfig  merge target; its task list must be non-null and modifiable
+     * @param appendConfig config to merge in; {@code null}, or a {@code null} task list,
+     *                     contributes no tasks
+     */
+    private void mergeSortConfig(SortConfig finalConfig, SortConfig appendConfig) {
+        if (appendConfig == null) {
+            return;
+        }
+        finalConfig.setSortClusterName(appendConfig.getSortClusterName());
+        // getTaskConfig() already treats getTasks() as nullable; a config without a task
+        // list simply contributes nothing instead of aborting the reload with an NPE.
+        if (appendConfig.getTasks() == null) {
+            return;
+        }
+        Map<String, TaskConfig> finalMap = new HashMap<>();
+        for (TaskConfig existing : finalConfig.getTasks()) {
+            finalMap.put(existing.getSortTaskName(), existing);
+        }
+        for (TaskConfig taskConfig : appendConfig.getTasks()) {
+            String taskName = taskConfig.getSortTaskName();
+            TaskConfig finalTask = finalMap.get(taskName);
+            if (finalTask == null) {
+                // Task not seen yet: adopt it as-is.
+                // NOTE(review): the task object itself is stored, so a later merge into this
+                // entry mutates the object owned by appendConfig - confirm sharing is intended.
+                finalMap.put(taskName, taskConfig);
+                continue;
+            }
+            this.mergeTaskConfig(finalTask, taskConfig);
+        }
+        finalConfig.getTasks().clear();
+        finalConfig.getTasks().addAll(finalMap.values());
+    }
+
+    /**
+     * Merges {@code appendConfig} into {@code finalConfig} in place.
+     *
+     * <p>The sort task name and node config of {@code finalConfig} are overwritten with the
+     * values from {@code appendConfig}. Cluster tag configs are matched by cluster tag: a tag
+     * found only in {@code appendConfig} is added as the same object, a tag present on both
+     * sides is combined through {@code mergeTagConfig}. The cluster tag list of
+     * {@code finalConfig} is then cleared and refilled from a {@link HashMap}.
+     *
+     * @param finalConfig  task that receives the merged content
+     * @param appendConfig task to merge in; ignored when {@code null}
+     */
+    private void mergeTaskConfig(TaskConfig finalConfig, TaskConfig appendConfig) {
+        if (appendConfig == null) {
+            return;
+        }
+        finalConfig.setSortTaskName(appendConfig.getSortTaskName());
+        finalConfig.setNodeConfig(appendConfig.getNodeConfig());
+
+        // Index the tags already present on the target by their cluster tag.
+        Map<String, ClusterTagConfig> mergedTags = new HashMap<>();
+        for (ClusterTagConfig existing : finalConfig.getClusterTagConfigs()) {
+            mergedTags.put(existing.getClusterTag(), existing);
+        }
+
+        for (ClusterTagConfig incoming : appendConfig.getClusterTagConfigs()) {
+            String tagKey = incoming.getClusterTag();
+            ClusterTagConfig known = mergedTags.get(tagKey);
+            if (known != null) {
+                // Tag exists on both sides: fold the incoming one into the existing one.
+                this.mergeTagConfig(known, incoming);
+            } else {
+                // Tag only exists on the incoming side: take it over unchanged.
+                mergedTags.put(tagKey, incoming);
+            }
+        }
+
+        // Replace the target's tag list content with the merged view.
+        finalConfig.getClusterTagConfigs().clear();
+        finalConfig.getClusterTagConfigs().addAll(mergedTags.values());
+    }
+
+    /**
+     * Merges {@code appendConfig} into {@code finalConfig} in place.
+     *
+     * <p>The cluster tag of {@code finalConfig} is overwritten with the one from
+     * {@code appendConfig}. MQ cluster configs are united by cluster name and data flow
+     * configs by dataflow id; when both sides hold the same key, the entry from
+     * {@code appendConfig} replaces the one from {@code finalConfig}. Both lists of
+     * {@code finalConfig} are cleared and refilled from a {@link HashMap}.
+     *
+     * @param finalConfig  cluster tag config that receives the merged content
+     * @param appendConfig cluster tag config to merge in; ignored when {@code null}
+     */
+    private void mergeTagConfig(ClusterTagConfig finalConfig, ClusterTagConfig appendConfig) {
+        if (appendConfig == null) {
+            return;
+        }
+        finalConfig.setClusterTag(appendConfig.getClusterTag());
+
+        // MQ clusters: existing entries first, incoming entries overwrite on the same name.
+        Map<String, MqClusterConfig> mqByName = new HashMap<>();
+        for (MqClusterConfig mq : finalConfig.getMqClusterConfigs()) {
+            mqByName.put(mq.getClusterName(), mq);
+        }
+        for (MqClusterConfig mq : appendConfig.getMqClusterConfigs()) {
+            mqByName.put(mq.getClusterName(), mq);
+        }
+        finalConfig.getMqClusterConfigs().clear();
+        finalConfig.getMqClusterConfigs().addAll(mqByName.values());
+
+        // Data flows: same scheme, keyed by dataflow id.
+        Map<String, DataFlowConfig> flowById = new HashMap<>();
+        for (DataFlowConfig flow : finalConfig.getDataFlowConfigs()) {
+            flowById.put(flow.getDataflowId(), flow);
+        }
+        for (DataFlowConfig flow : appendConfig.getDataFlowConfigs()) {
+            flowById.put(flow.getDataflowId(), flow);
+        }
+        finalConfig.getDataFlowConfigs().clear();
+        finalConfig.getDataFlowConfigs().addAll(flowById.values());
+    }
+
/**
 * Returns the sort config currently held by the holder instance obtained via get().
 * After this change that is the merged finalConfig assigned at the end of reload();
 * the field starts out as null, so callers may receive null until a reload has
 * assigned it.
 */
public static SortConfig getSortConfig() {
- return get().config;
+ return get().finalConfig;
}
public static TaskConfig getTaskConfig(String sortTaskName) {
- SortConfig config = get().config;
+ SortConfig config = get().finalConfig;
if (config != null && config.getTasks() != null) {
for (TaskConfig task : config.getTasks()) {
if (StringUtils.equals(sortTaskName, task.getSortTaskName())) {