This is an automated email from the ASF dual-hosted git repository.

doleyzi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 9c13aeb33a [INLONG-11817][Audit] Audit SDK supports CDC scenario audit 
reconciliation (#11818)
9c13aeb33a is described below

commit 9c13aeb33a73be889e3f1a0d51848e238603129a
Author: doleyzi <[email protected]>
AuthorDate: Thu Apr 3 18:01:41 2025 +0800

    [INLONG-11817][Audit] Audit SDK supports CDC scenario audit reconciliation 
(#11818)
    
    * [INLONG-11817] Audit SDK supports CDC scenario audit reconciliation
---
 .../org/apache/inlong/audit/AuditReporterImpl.java |  21 ++++
 .../java/org/apache/inlong/audit/CdcIdEnum.java    | 121 ++++++++++++++++++
 .../java/org/apache/inlong/audit/MetricIdEnum.java |  10 +-
 .../entity/{AuditType.java => AuditRules.java}     |  35 ++----
 .../org/apache/inlong/audit/entity/AuditType.java  |   5 +-
 .../audit/entity/{AuditType.java => CdcType.java}  |  40 +++---
 .../inlong/audit/util/AuditManagerUtils.java       | 139 ++++++++++++++++++++-
 .../inlong/audit/util/AuditManagerUtilsTest.java   |  73 +++++++++++
 8 files changed, 382 insertions(+), 62 deletions(-)

diff --git 
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditReporterImpl.java
 
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditReporterImpl.java
index bd2c5aae47..2efa69d6fc 100644
--- 
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditReporterImpl.java
+++ 
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditReporterImpl.java
@@ -20,6 +20,7 @@ package org.apache.inlong.audit;
 import org.apache.inlong.audit.entity.AuditComponent;
 import org.apache.inlong.audit.entity.AuditInformation;
 import org.apache.inlong.audit.entity.AuditMetric;
+import org.apache.inlong.audit.entity.CdcType;
 import org.apache.inlong.audit.entity.FlowType;
 import org.apache.inlong.audit.loader.SocketAddressListLoader;
 import org.apache.inlong.audit.protocol.AuditApi;
@@ -683,6 +684,26 @@ public class AuditReporterImpl implements Serializable {
         SenderManager.setMaxGlobalAuditMemory(maxGlobalAuditMemory);
     }
 
+    public int getCdcId(String auditType, FlowType flowType, CdcType cdcType) {
+        return AuditManagerUtils.getCdcId(auditType, flowType, cdcType);
+    }
+
+    public List<AuditInformation> getAllCdcIdInformation() {
+        return AuditManagerUtils.getAllCdcIdInformation();
+    }
+
+    public List<AuditInformation> getAllCdcIdInformation(String auditType) {
+        return AuditManagerUtils.getAllCdcIdInformation(auditType);
+    }
+
+    public List<AuditInformation> getAllCdcIdInformation(String auditType, 
FlowType flowType) {
+        return AuditManagerUtils.getAllCdcIdInformation(auditType, flowType);
+    }
+
+    public AuditInformation getCdcIdInformation(String auditType, FlowType 
flowType, CdcType cdcType) {
+        return AuditManagerUtils.getCdcIdInformation(auditType, flowType, 
cdcType);
+    }
+
     public void shutdown() {
         ProxyManager.getInstance().shutdown();
         timerExecutor.shutdown();
diff --git 
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/CdcIdEnum.java 
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/CdcIdEnum.java
new file mode 100644
index 0000000000..49f0344c37
--- /dev/null
+++ 
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/CdcIdEnum.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit;
+
+import org.apache.inlong.audit.entity.AuditType;
+import org.apache.inlong.audit.entity.CdcType;
+import org.apache.inlong.audit.entity.FlowType;
+import org.apache.inlong.audit.exceptions.AuditTypeNotExistException;
+import org.apache.inlong.audit.util.AuditManagerUtils;
+
+import lombok.Getter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.inlong.audit.entity.AuditType.MYSQL;
+import static org.apache.inlong.audit.entity.AuditType.TDSQL_MYSQL;
+import static org.apache.inlong.audit.entity.CdcType.DELETE;
+import static org.apache.inlong.audit.entity.CdcType.INSERT;
+import static org.apache.inlong.audit.entity.CdcType.UPDATE_AFTER;
+import static org.apache.inlong.audit.entity.CdcType.UPDATE_BEFORE;
+
+/**
+ * Enumeration for CDC (Change Data Capture) audit identifiers.
+ * Defines different audit types for various database operations 
(INSERT/DELETE/UPDATE)
+ * on different database systems (MySQL/TDSQL).
+ */
+public enum CdcIdEnum {
+
+    MYSQL_INSERT(1, INSERT, MYSQL, "Insert Audit Metrics for MySQL"),
+    MYSQL_DELETE(2, DELETE, MYSQL, "Delete Audit Metrics for MySQL"),
+    MYSQL_UPDATE_BEFORE(3, UPDATE_BEFORE, MYSQL, "Update before Audit Metrics 
for MySQL"),
+    MYSQL_UPDATE_AFTER(4, UPDATE_AFTER, MYSQL, "Update after Audit Metrics for 
MySQL"),
+
+    TDSQL_INSERT(101, INSERT, TDSQL_MYSQL, "Insert Audit Metrics for TDSQL"),
+    TDSQL_DELETE(102, DELETE, TDSQL_MYSQL, "Delete Audit Metrics for TDSQL"),
+    TDSQL_UPDATE_BEFORE(103, UPDATE_BEFORE, TDSQL_MYSQL, "Update before Audit 
Metrics for TDSQL"),
+    TDSQL_UPDATE_AFTER(104, UPDATE_AFTER, TDSQL_MYSQL, "Update after Audit 
Metrics for TDSQL");
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(CdcIdEnum.class);
+    private final int auditId;
+    @Getter
+    private final CdcType cdcType;
+    @Getter
+    private final AuditType auditType;
+    @Getter
+    private final String description;
+
+    CdcIdEnum(int auditId, CdcType cdcType, AuditType auditType, String 
description) {
+        this.auditId = auditId;
+        this.cdcType = cdcType;
+        this.auditType = auditType;
+        this.description = description;
+    }
+
+    public int getValue(FlowType flowType) {
+        if (flowType == null) {
+            LOGGER.error("Invalid flow type: must not be null");
+            return -1;
+        }
+
+        try {
+            return auditId + (flowType == FlowType.INPUT
+                    ? AuditManagerUtils.getStartAuditIdForCdcInput()
+                    : AuditManagerUtils.getStartAuditIdForCdcOutput());
+        } catch (Exception e) {
+            LOGGER.error("Failed to get audit ID for flow type: {}", flowType, 
e);
+            return -1;
+        }
+    }
+
+    public String getEnglishDescription(FlowType flowType) {
+        return String.join("",
+                auditType.value(),
+                flowType.getNameInEnglish(),
+                cdcType.getNameInEnglish());
+    }
+
+    public String getChineseDescription(FlowType flowType) {
+        return String.join("",
+                auditType.value(),
+                flowType.getNameInChinese(),
+                cdcType.getNameInChinese());
+    }
+
+    public static CdcIdEnum getCdcIdEnum(String auditType, CdcType cdcType) {
+        if (auditType == null || cdcType == null) {
+            throw new IllegalArgumentException("Audit type and CDC type must 
not be null");
+        }
+
+        for (CdcIdEnum cdcIdEnum : CdcIdEnum.values()) {
+            if (cdcIdEnum.getCdcType() == cdcType &&
+                    
auditType.equalsIgnoreCase(cdcIdEnum.getAuditType().value())) {
+                return cdcIdEnum;
+            }
+        }
+
+        String errorMsg = String.format("Audit type %s does not exist for cdc 
type %s", auditType, cdcType);
+        LOGGER.error(errorMsg);
+        throw new AuditTypeNotExistException(errorMsg);
+    }
+
+    public static int getCdcId(String auditType, FlowType flowType, CdcType 
cdcType) {
+        CdcIdEnum cdcIdEnum = getCdcIdEnum(auditType, cdcType);
+        return cdcIdEnum.getValue(flowType);
+    }
+}
diff --git 
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/MetricIdEnum.java
 
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/MetricIdEnum.java
index aa8c535a8b..494cf38742 100644
--- 
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/MetricIdEnum.java
+++ 
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/MetricIdEnum.java
@@ -19,6 +19,7 @@ package org.apache.inlong.audit;
 
 import org.apache.inlong.audit.util.AuditManagerUtils;
 
+import lombok.Getter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -49,7 +50,9 @@ public enum MetricIdEnum {
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(MetricIdEnum.class);
     private final int metricId;
+    @Getter
     private final String chineseDescription;
+    @Getter
     private final String englishDescription;
 
     MetricIdEnum(int metricId, String chineseDescription, String 
englishDescription) {
@@ -62,11 +65,4 @@ public enum MetricIdEnum {
         return metricId + AuditManagerUtils.getStartAuditIdForMetric();
     }
 
-    public String getChineseDescription() {
-        return chineseDescription;
-    }
-
-    public String getEnglishDescription() {
-        return englishDescription;
-    }
 }
diff --git 
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/entity/AuditType.java
 
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/entity/AuditRules.java
similarity index 58%
copy from 
inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/entity/AuditType.java
copy to 
inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/entity/AuditRules.java
index 3b195b8214..3778ef24ae 100644
--- 
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/entity/AuditType.java
+++ 
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/entity/AuditRules.java
@@ -17,32 +17,17 @@
 
 package org.apache.inlong.audit.entity;
 
-public enum AuditType {
+/**
+ * Audit rules constants definition.
+ */
+public class AuditRules {
 
-    SDK("SDK"),
-    AGENT("Agent"),
-    DATAPROXY("DataProxy"),
-    HIVE("Hive"),
-    CLICKHOUSE("ClickHouse"),
-    ELASTICSEARCH("ElasticSearch"),
-    STARROCKS("StarRocks"),
-    HUDI("HuDi"),
-    ICEBERG("Iceberg"),
-    HBASE("HBase"),
-    DORIS("Doris"),
-    KUDU("Kudu"),
-    POSTGRES("Postgres"),
-    BINLOG("MYSQL_BINLOG"),
-    TUBEMQ("TubeMQ"),
-    MYSQL("MYSQL_SQL"),
-    HDFS("HDFS");
+    // Metric ID starting range
+    public static final int START_METRIC_ID = 0;
 
-    private final String auditType;
+    // CDC input ID starting range
+    public static final int START_CDC_INPUT_ID = 100000;
 
-    AuditType(String auditType) {
-        this.auditType = auditType;
-    }
-    public String value() {
-        return auditType;
-    }
+    // CDC output ID starting range
+    public static final int START_CDC_OUTPUT_ID = 200000;
 }
diff --git 
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/entity/AuditType.java
 
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/entity/AuditType.java
index 3b195b8214..735a6fdc09 100644
--- 
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/entity/AuditType.java
+++ 
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/entity/AuditType.java
@@ -34,8 +34,9 @@ public enum AuditType {
     POSTGRES("Postgres"),
     BINLOG("MYSQL_BINLOG"),
     TUBEMQ("TubeMQ"),
-    MYSQL("MYSQL_SQL"),
-    HDFS("HDFS");
+    MYSQL("MYSQL"),
+    HDFS("HDFS"),
+    TDSQL_MYSQL("TDSQL_MYSQL");
 
     private final String auditType;
 
diff --git 
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/entity/AuditType.java
 
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/entity/CdcType.java
similarity index 59%
copy from 
inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/entity/AuditType.java
copy to 
inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/entity/CdcType.java
index 3b195b8214..0dbc632e5d 100644
--- 
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/entity/AuditType.java
+++ 
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/entity/CdcType.java
@@ -17,32 +17,24 @@
 
 package org.apache.inlong.audit.entity;
 
-public enum AuditType {
+import lombok.Getter;
 
-    SDK("SDK"),
-    AGENT("Agent"),
-    DATAPROXY("DataProxy"),
-    HIVE("Hive"),
-    CLICKHOUSE("ClickHouse"),
-    ELASTICSEARCH("ElasticSearch"),
-    STARROCKS("StarRocks"),
-    HUDI("HuDi"),
-    ICEBERG("Iceberg"),
-    HBASE("HBase"),
-    DORIS("Doris"),
-    KUDU("Kudu"),
-    POSTGRES("Postgres"),
-    BINLOG("MYSQL_BINLOG"),
-    TUBEMQ("TubeMQ"),
-    MYSQL("MYSQL_SQL"),
-    HDFS("HDFS");
+/**
+ * CDC (Change Data Capture) operation types.
+ */
+@Getter
+public enum CdcType {
 
-    private final String auditType;
+    INSERT(" insert ", "写入"),
+    DELETE(" delete ", "删除"),
+    UPDATE_BEFORE("update before", "update before"),
+    UPDATE_AFTER("update after", "update after");
 
-    AuditType(String auditType) {
-        this.auditType = auditType;
-    }
-    public String value() {
-        return auditType;
+    private final String nameInEnglish;
+    private final String nameInChinese;
+
+    CdcType(String nameInEnglish, String nameInChinese) {
+        this.nameInEnglish = nameInEnglish;
+        this.nameInChinese = nameInChinese;
     }
 }
diff --git 
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/AuditManagerUtils.java
 
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/AuditManagerUtils.java
index f470251b0e..6e324d2f4d 100644
--- 
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/AuditManagerUtils.java
+++ 
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/AuditManagerUtils.java
@@ -18,17 +18,28 @@
 package org.apache.inlong.audit.util;
 
 import org.apache.inlong.audit.AuditIdEnum;
+import org.apache.inlong.audit.CdcIdEnum;
 import org.apache.inlong.audit.MetricIdEnum;
 import org.apache.inlong.audit.entity.AuditInformation;
+import org.apache.inlong.audit.entity.CdcType;
 import org.apache.inlong.audit.entity.FlowType;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.List;
 
-import static org.apache.inlong.audit.AuditIdEnum.*;
+import static org.apache.inlong.audit.AuditIdEnum.AGENT_INPUT;
+import static org.apache.inlong.audit.AuditIdEnum.AGENT_OUTPUT;
+import static org.apache.inlong.audit.AuditIdEnum.DATA_PROXY_INPUT;
+import static org.apache.inlong.audit.AuditIdEnum.DATA_PROXY_OUTPUT;
+import static org.apache.inlong.audit.AuditIdEnum.SDK_INPUT;
+import static org.apache.inlong.audit.AuditIdEnum.SDK_OUTPUT;
+import static org.apache.inlong.audit.entity.AuditRules.START_CDC_INPUT_ID;
+import static org.apache.inlong.audit.entity.AuditRules.START_CDC_OUTPUT_ID;
+import static org.apache.inlong.audit.entity.AuditRules.START_METRIC_ID;
 
 /**
  * Audit item ID generation rules: composed of basic audit item ID + extension 
bits.
@@ -235,11 +246,131 @@ public class AuditManagerUtils {
     }
 
     /**
-     * Get max Audit ID.
+     * Gets the maximum possible audit ID based on the defined bit length 
constraints.
      *
-     * @return
+     * @return int The maximum audit ID calculated as:
+     *         1 shifted left by (AUDIT_SUFFIX_LENGTH + 
AUDIT_MAX_PREFIX_LENGTH)
+     *         This represents the upper bound of the regular audit ID space.
      */
-    public static int getStartAuditIdForMetric() {
+    private static int getMaxAuditId() {
         return 1 << (AUDIT_SUFFIX_LENGTH + AUDIT_MAX_PREFIX_LENGTH);
     }
+
+    /**
+     * Gets the starting audit ID for metric items.
+     *
+     * @return int The starting audit ID for metrics, calculated as:
+     *         maximum audit ID plus predefined metric ID offset.
+     *         This ensures metric IDs are in a separate range from regular 
audit IDs.
+     */
+    public static int getStartAuditIdForMetric() {
+        return getMaxAuditId() + START_METRIC_ID;
+    }
+
+    /**
+     * Gets the starting audit ID for CDC (Change Data Capture) items.
+     *
+     * @return int The starting audit ID for CDC, calculated as:
+     * maximum audit ID plus predefined CDC input ID offset.
+     * This ensures CDC IDs are in a separate range from regular audit IDs.
+     */
+    public static int getStartAuditIdForCdcInput() {
+        return getMaxAuditId() + START_CDC_INPUT_ID;
+    }
+
+    /**
+     * Gets the starting audit ID for CDC output items.
+     *
+     * @return int The starting audit ID for CDC output, calculated as:
+     * maximum audit ID plus predefined CDC output ID offset.
+     * This ensures CDC output IDs are in a separate range from regular audit 
IDs.
+     */
+    public static int getStartAuditIdForCdcOutput() {
+        return getMaxAuditId() + START_CDC_OUTPUT_ID;
+    }
+
+    /**
+     * Get all CDC audit information by combining all flow types with all CDC 
ID enums.
+     * @return List of AuditInformation containing all possible CDC audit 
combinations
+     */
+    public static List<AuditInformation> getAllCdcIdInformation() {
+        List<AuditInformation> result = new 
ArrayList<>(FlowType.values().length * CdcIdEnum.values().length);
+        for (FlowType flowType : FlowType.values()) {
+            for (CdcIdEnum cdcIdEnum : CdcIdEnum.values()) {
+                result.add(createAuditInformation(cdcIdEnum, flowType));
+            }
+        }
+        return result;
+    }
+
+    /**
+     * Get the CDC ID based on the audit type, flow type, and CDC type.
+     * @param auditType The type of audit (e.g., MySQL, TDSQL)
+     * @param flowType  The flow type (e.g., INPUT, OUTPUT)
+     * @param cdcType   The CDC type (e.g., INSERT, DELETE, UPDATE_BEFORE, 
UPDATE_AFTER)
+     * @return int The corresponding CDC ID
+     */
+    public static int getCdcId(String auditType, FlowType flowType, CdcType 
cdcType) {
+        return CdcIdEnum.getCdcId(auditType, flowType, cdcType);
+    }
+
+    /**
+     * Get all CDC audit information based on the audit type.
+     * @param auditType The type of audit (e.g., MySQL, TDSQL)
+     * @return List of AuditInformation containing all possible CDC audit 
combinations for the given audit type
+     */
+    public static List<AuditInformation> getAllCdcIdInformation(String 
auditType) {
+        List<AuditInformation> result = new 
ArrayList<>(FlowType.values().length * CdcIdEnum.values().length);
+        for (FlowType flowType : FlowType.values()) {
+            for (CdcIdEnum cdcIdEnum : CdcIdEnum.values()) {
+                if (cdcIdEnum.getAuditType().value().equals(auditType)) {
+                    result.add(createAuditInformation(cdcIdEnum, flowType));
+                }
+            }
+        }
+        return result;
+    }
+
+    /**
+     * Get all CDC audit information based on audit type and flow type.
+     * @param auditType The type of audit (e.g., MySQL, TDSQL)
+     * @param flowType  The flow type (e.g., INPUT, OUTPUT)
+     * @return List of AuditInformation containing matching CDC audit 
combinations
+     */
+    public static List<AuditInformation> getAllCdcIdInformation(String 
auditType, FlowType flowType) {
+        List<AuditInformation> result = new 
ArrayList<>(CdcIdEnum.values().length);
+        for (CdcIdEnum cdcIdEnum : CdcIdEnum.values()) {
+            if (cdcIdEnum.getAuditType().value().equals(auditType)) {
+                result.add(createAuditInformation(cdcIdEnum, flowType));
+            }
+        }
+        return result;
+    }
+
+    /**
+     * Get specific CDC audit information based on audit type, flow type and 
CDC type.
+     * @param auditType The type of audit (e.g., MySQL, TDSQL)
+     * @param flowType  The flow type (e.g., INPUT, OUTPUT)
+     * @param cdcType   The CDC type (e.g., INSERT, DELETE, UPDATE_BEFORE, 
UPDATE_AFTER)
+     * @return AuditInformation matching the criteria, or null if not found
+     */
+    public static AuditInformation getCdcIdInformation(String auditType, 
FlowType flowType, CdcType cdcType) {
+        for (CdcIdEnum cdcIdEnum : CdcIdEnum.values()) {
+            if (cdcIdEnum.getAuditType().value().equals(auditType) && 
cdcIdEnum.getCdcType() == cdcType) {
+                return createAuditInformation(cdcIdEnum, flowType);
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Helper method to create AuditInformation from CdcIdEnum and FlowType.
+     */
+    private static AuditInformation createAuditInformation(CdcIdEnum 
cdcIdEnum, FlowType flowType) {
+        return new AuditInformation(
+                cdcIdEnum.getValue(flowType),
+                cdcIdEnum.getEnglishDescription(flowType),
+                cdcIdEnum.getChineseDescription(flowType));
+    }
+
 }
diff --git 
a/inlong-audit/audit-sdk/src/test/java/org/apache/inlong/audit/util/AuditManagerUtilsTest.java
 
b/inlong-audit/audit-sdk/src/test/java/org/apache/inlong/audit/util/AuditManagerUtilsTest.java
index 3994bbfa53..6132d69ed7 100644
--- 
a/inlong-audit/audit-sdk/src/test/java/org/apache/inlong/audit/util/AuditManagerUtilsTest.java
+++ 
b/inlong-audit/audit-sdk/src/test/java/org/apache/inlong/audit/util/AuditManagerUtilsTest.java
@@ -18,7 +18,9 @@
 package org.apache.inlong.audit.util;
 
 import org.apache.inlong.audit.AuditOperator;
+import org.apache.inlong.audit.CdcIdEnum;
 import org.apache.inlong.audit.entity.AuditInformation;
+import org.apache.inlong.audit.entity.CdcType;
 import org.apache.inlong.audit.entity.FlowType;
 
 import org.junit.Test;
@@ -30,6 +32,8 @@ import static 
org.apache.inlong.audit.AuditIdEnum.DATA_PROXY_INPUT;
 import static org.apache.inlong.audit.AuditIdEnum.SORT_HIVE_INPUT;
 import static org.apache.inlong.audit.AuditIdEnum.SORT_STARROCKS_INPUT;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
 public class AuditManagerUtilsTest {
@@ -77,6 +81,11 @@ public class AuditManagerUtilsTest {
         auditInfo = AuditManagerUtils.buildAuditInformation(auditType, 
flowType, success, isRealtime, discard, retry);
         assertEquals(SORT_HIVE_INPUT.getValue(), auditInfo.getAuditId());
         assertEquals("Hive 接收成功", auditInfo.getNameInChinese());
+        assertEquals(7, auditInfo.getAuditId());
+
+        auditInfo = AuditManagerUtils.buildAuditInformation(auditType, 
flowType, true, false, false, false);
+        assertEquals(262151, auditInfo.getAuditId());
+        assertEquals("Hive 接收成功(CheckPoint)", auditInfo.getNameInChinese());
 
         auditType = "STARROCKS";
         auditInfo = AuditManagerUtils.buildAuditInformation(auditType, 
flowType, success, isRealtime, discard, retry);
@@ -131,4 +140,68 @@ public class AuditManagerUtilsTest {
         System.out.println(metricInformationList);
         assertTrue(metricInformationList.size() > 0);
     }
+
+    @Test
+    public void getAllCdcIdInformation() {
+        List<AuditInformation> cdcIdInfoList = 
AuditManagerUtils.getAllCdcIdInformation();
+        assertNotNull("CDC ID information list should not be null", 
cdcIdInfoList);
+        assertFalse("CDC ID information list should not be empty", 
cdcIdInfoList.isEmpty());
+    }
+
+    @Test
+    public void getCdcIdEnum() {
+        CdcType cdcType = CdcType.INSERT;
+        String auditType = "TDSQL_MYSQL";
+        CdcIdEnum cdcIdEnum = CdcIdEnum.getCdcIdEnum(auditType, cdcType);
+        assertNotNull("CDC ID enum should not be null", cdcIdEnum);
+        assertTrue("CDC ID value should be greater than 0", 
cdcIdEnum.getValue(FlowType.INPUT) > 0);
+    }
+
+    @Test
+    public void getCdcId() {
+        int expectedCdcId = 1073841925;
+        int actualCdcId = AuditManagerUtils.getCdcId("TDSQL_MYSQL", 
FlowType.INPUT, CdcType.INSERT);
+        assertEquals("CDC ID should match expected value", expectedCdcId, 
actualCdcId);
+    }
+
+    @Test
+    public void getAllCdcIdInformationWithAuditType() {
+        String auditType = "MYSQL";
+        List<AuditInformation> cdcIdInfoList = 
AuditManagerUtils.getAllCdcIdInformation(auditType);
+        assertNotNull("CDC ID information list should not be null", 
cdcIdInfoList);
+        assertFalse("CDC ID information list should not be empty", 
cdcIdInfoList.isEmpty());
+
+        for (AuditInformation info : cdcIdInfoList) {
+            assertTrue("Name should contain audit type",
+                    info.getNameInEnglish().toUpperCase().contains(auditType));
+        }
+    }
+
+    @Test
+    public void getAllCdcIdInformationWithAuditTypeAndFlowType() {
+        String auditType = "MYSQL";
+        FlowType flowType = FlowType.INPUT;
+        List<AuditInformation> cdcIdInfoList = 
AuditManagerUtils.getAllCdcIdInformation(auditType, flowType);
+        assertNotNull("CDC ID information list should not be null", 
cdcIdInfoList);
+
+        for (AuditInformation info : cdcIdInfoList) {
+            assertTrue("Name should contain receive for input flow",
+                    info.getNameInEnglish().toLowerCase().contains("receive"));
+        }
+    }
+
+    @Test
+    public void getCdcIdInformationWithAuditTypeAndFlowTypeAndCdcType() {
+        String auditType = "MYSQL";
+        FlowType flowType = FlowType.INPUT;
+        CdcType cdcType = CdcType.INSERT;
+        int expectedAuditId = 1073841825;
+        String expectedName = "MYSQL 接收写入";
+
+        AuditInformation cdcIdInfo = 
AuditManagerUtils.getCdcIdInformation(auditType, flowType, cdcType);
+        assertNotNull("CDC ID information should not be null", cdcIdInfo);
+        assertEquals("Audit ID should match expected value", expectedAuditId, 
cdcIdInfo.getAuditId());
+        assertEquals("Name in Chinese should match expected value", 
expectedName, cdcIdInfo.getNameInChinese());
+    }
+
 }

Reply via email to