This is an automated email from the ASF dual-hosted git repository.
doleyzi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 9c13aeb33a [INLONG-11817][Audit] Audit SDK supports CDC scenario audit
reconciliation (#11818)
9c13aeb33a is described below
commit 9c13aeb33a73be889e3f1a0d51848e238603129a
Author: doleyzi <[email protected]>
AuthorDate: Thu Apr 3 18:01:41 2025 +0800
[INLONG-11817][Audit] Audit SDK supports CDC scenario audit reconciliation
(#11818)
* [INLONG-11817] Audit SDK supports CDC scenario audit reconciliation
---
.../org/apache/inlong/audit/AuditReporterImpl.java | 21 ++++
.../java/org/apache/inlong/audit/CdcIdEnum.java | 121 ++++++++++++++++++
.../java/org/apache/inlong/audit/MetricIdEnum.java | 10 +-
.../entity/{AuditType.java => AuditRules.java} | 35 ++----
.../org/apache/inlong/audit/entity/AuditType.java | 5 +-
.../audit/entity/{AuditType.java => CdcType.java} | 40 +++---
.../inlong/audit/util/AuditManagerUtils.java | 139 ++++++++++++++++++++-
.../inlong/audit/util/AuditManagerUtilsTest.java | 73 +++++++++++
8 files changed, 382 insertions(+), 62 deletions(-)
diff --git
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditReporterImpl.java
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditReporterImpl.java
index bd2c5aae47..2efa69d6fc 100644
---
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditReporterImpl.java
+++
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/AuditReporterImpl.java
@@ -20,6 +20,7 @@ package org.apache.inlong.audit;
import org.apache.inlong.audit.entity.AuditComponent;
import org.apache.inlong.audit.entity.AuditInformation;
import org.apache.inlong.audit.entity.AuditMetric;
+import org.apache.inlong.audit.entity.CdcType;
import org.apache.inlong.audit.entity.FlowType;
import org.apache.inlong.audit.loader.SocketAddressListLoader;
import org.apache.inlong.audit.protocol.AuditApi;
@@ -683,6 +684,26 @@ public class AuditReporterImpl implements Serializable {
SenderManager.setMaxGlobalAuditMemory(maxGlobalAuditMemory);
}
+ public int getCdcId(String auditType, FlowType flowType, CdcType cdcType) {
+ return AuditManagerUtils.getCdcId(auditType, flowType, cdcType);
+ }
+
+ public List<AuditInformation> getAllCdcIdInformation() {
+ return AuditManagerUtils.getAllCdcIdInformation();
+ }
+
+ public List<AuditInformation> getAllCdcIdInformation(String auditType) {
+ return AuditManagerUtils.getAllCdcIdInformation(auditType);
+ }
+
+ public List<AuditInformation> getAllCdcIdInformation(String auditType,
FlowType flowType) {
+ return AuditManagerUtils.getAllCdcIdInformation(auditType, flowType);
+ }
+
+ public AuditInformation getCdcIdInformation(String auditType, FlowType
flowType, CdcType cdcType) {
+ return AuditManagerUtils.getCdcIdInformation(auditType, flowType,
cdcType);
+ }
+
public void shutdown() {
ProxyManager.getInstance().shutdown();
timerExecutor.shutdown();
diff --git
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/CdcIdEnum.java
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/CdcIdEnum.java
new file mode 100644
index 0000000000..49f0344c37
--- /dev/null
+++
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/CdcIdEnum.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit;
+
+import org.apache.inlong.audit.entity.AuditType;
+import org.apache.inlong.audit.entity.CdcType;
+import org.apache.inlong.audit.entity.FlowType;
+import org.apache.inlong.audit.exceptions.AuditTypeNotExistException;
+import org.apache.inlong.audit.util.AuditManagerUtils;
+
+import lombok.Getter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.inlong.audit.entity.AuditType.MYSQL;
+import static org.apache.inlong.audit.entity.AuditType.TDSQL_MYSQL;
+import static org.apache.inlong.audit.entity.CdcType.DELETE;
+import static org.apache.inlong.audit.entity.CdcType.INSERT;
+import static org.apache.inlong.audit.entity.CdcType.UPDATE_AFTER;
+import static org.apache.inlong.audit.entity.CdcType.UPDATE_BEFORE;
+
+/**
+ * Enumeration for CDC (Change Data Capture) audit identifiers.
+ * Defines different audit types for various database operations
(INSERT/DELETE/UPDATE)
+ * on different database systems (MySQL/TDSQL).
+ */
+public enum CdcIdEnum {
+
+ MYSQL_INSERT(1, INSERT, MYSQL, "Insert Audit Metrics for MySQL"),
+ MYSQL_DELETE(2, DELETE, MYSQL, "Delete Audit Metrics for MySQL"),
+ MYSQL_UPDATE_BEFORE(3, UPDATE_BEFORE, MYSQL, "Update before Audit Metrics
for MySQL"),
+ MYSQL_UPDATE_AFTER(4, UPDATE_AFTER, MYSQL, "Update after Audit Metrics for
MySQL"),
+
+ TDSQL_INSERT(101, INSERT, TDSQL_MYSQL, "Insert Audit Metrics for TDSQL"),
+ TDSQL_DELETE(102, DELETE, TDSQL_MYSQL, "Delete Audit Metrics for TDSQL"),
+ TDSQL_UPDATE_BEFORE(103, UPDATE_BEFORE, TDSQL_MYSQL, "Update before Audit
Metrics for TDSQL"),
+ TDSQL_UPDATE_AFTER(104, UPDATE_AFTER, TDSQL_MYSQL, "Update after Audit
Metrics for TDSQL");
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(CdcIdEnum.class);
+ private final int auditId;
+ @Getter
+ private final CdcType cdcType;
+ @Getter
+ private final AuditType auditType;
+ @Getter
+ private final String description;
+
+ CdcIdEnum(int auditId, CdcType cdcType, AuditType auditType, String
description) {
+ this.auditId = auditId;
+ this.cdcType = cdcType;
+ this.auditType = auditType;
+ this.description = description;
+ }
+
+ public int getValue(FlowType flowType) {
+ if (flowType == null) {
+ LOGGER.error("Invalid flow type: must not be null");
+ return -1;
+ }
+
+ try {
+ return auditId + (flowType == FlowType.INPUT
+ ? AuditManagerUtils.getStartAuditIdForCdcInput()
+ : AuditManagerUtils.getStartAuditIdForCdcOutput());
+ } catch (Exception e) {
+ LOGGER.error("Failed to get audit ID for flow type: {}", flowType,
e);
+ return -1;
+ }
+ }
+
+ public String getEnglishDescription(FlowType flowType) {
+ return String.join("",
+ auditType.value(),
+ flowType.getNameInEnglish(),
+ cdcType.getNameInEnglish());
+ }
+
+ public String getChineseDescription(FlowType flowType) {
+ return String.join("",
+ auditType.value(),
+ flowType.getNameInChinese(),
+ cdcType.getNameInChinese());
+ }
+
+ public static CdcIdEnum getCdcIdEnum(String auditType, CdcType cdcType) {
+ if (auditType == null || cdcType == null) {
+ throw new IllegalArgumentException("Audit type and CDC type must
not be null");
+ }
+
+ for (CdcIdEnum cdcIdEnum : CdcIdEnum.values()) {
+ if (cdcIdEnum.getCdcType() == cdcType &&
+
auditType.equalsIgnoreCase(cdcIdEnum.getAuditType().value())) {
+ return cdcIdEnum;
+ }
+ }
+
+ String errorMsg = String.format("Audit type %s does not exist for cdc
type %s", auditType, cdcType);
+ LOGGER.error(errorMsg);
+ throw new AuditTypeNotExistException(errorMsg);
+ }
+
+ public static int getCdcId(String auditType, FlowType flowType, CdcType
cdcType) {
+ CdcIdEnum cdcIdEnum = getCdcIdEnum(auditType, cdcType);
+ return cdcIdEnum.getValue(flowType);
+ }
+}
diff --git
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/MetricIdEnum.java
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/MetricIdEnum.java
index aa8c535a8b..494cf38742 100644
---
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/MetricIdEnum.java
+++
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/MetricIdEnum.java
@@ -19,6 +19,7 @@ package org.apache.inlong.audit;
import org.apache.inlong.audit.util.AuditManagerUtils;
+import lombok.Getter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -49,7 +50,9 @@ public enum MetricIdEnum {
private static final Logger LOGGER =
LoggerFactory.getLogger(MetricIdEnum.class);
private final int metricId;
+ @Getter
private final String chineseDescription;
+ @Getter
private final String englishDescription;
MetricIdEnum(int metricId, String chineseDescription, String
englishDescription) {
@@ -62,11 +65,4 @@ public enum MetricIdEnum {
return metricId + AuditManagerUtils.getStartAuditIdForMetric();
}
- public String getChineseDescription() {
- return chineseDescription;
- }
-
- public String getEnglishDescription() {
- return englishDescription;
- }
}
diff --git
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/entity/AuditType.java
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/entity/AuditRules.java
similarity index 58%
copy from
inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/entity/AuditType.java
copy to
inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/entity/AuditRules.java
index 3b195b8214..3778ef24ae 100644
---
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/entity/AuditType.java
+++
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/entity/AuditRules.java
@@ -17,32 +17,17 @@
package org.apache.inlong.audit.entity;
-public enum AuditType {
+/**
+ * Audit rules constants definition.
+ */
+public class AuditRules {
- SDK("SDK"),
- AGENT("Agent"),
- DATAPROXY("DataProxy"),
- HIVE("Hive"),
- CLICKHOUSE("ClickHouse"),
- ELASTICSEARCH("ElasticSearch"),
- STARROCKS("StarRocks"),
- HUDI("HuDi"),
- ICEBERG("Iceberg"),
- HBASE("HBase"),
- DORIS("Doris"),
- KUDU("Kudu"),
- POSTGRES("Postgres"),
- BINLOG("MYSQL_BINLOG"),
- TUBEMQ("TubeMQ"),
- MYSQL("MYSQL_SQL"),
- HDFS("HDFS");
+ // Metric ID starting range
+ public static final int START_METRIC_ID = 0;
- private final String auditType;
+ // CDC input ID starting range
+ public static final int START_CDC_INPUT_ID = 100000;
- AuditType(String auditType) {
- this.auditType = auditType;
- }
- public String value() {
- return auditType;
- }
+ // CDC output ID starting range
+ public static final int START_CDC_OUTPUT_ID = 200000;
}
diff --git
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/entity/AuditType.java
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/entity/AuditType.java
index 3b195b8214..735a6fdc09 100644
---
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/entity/AuditType.java
+++
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/entity/AuditType.java
@@ -34,8 +34,9 @@ public enum AuditType {
POSTGRES("Postgres"),
BINLOG("MYSQL_BINLOG"),
TUBEMQ("TubeMQ"),
- MYSQL("MYSQL_SQL"),
- HDFS("HDFS");
+ MYSQL("MYSQL"),
+ HDFS("HDFS"),
+ TDSQL_MYSQL("TDSQL_MYSQL");
private final String auditType;
diff --git
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/entity/AuditType.java
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/entity/CdcType.java
similarity index 59%
copy from
inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/entity/AuditType.java
copy to
inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/entity/CdcType.java
index 3b195b8214..0dbc632e5d 100644
---
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/entity/AuditType.java
+++
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/entity/CdcType.java
@@ -17,32 +17,24 @@
package org.apache.inlong.audit.entity;
-public enum AuditType {
+import lombok.Getter;
- SDK("SDK"),
- AGENT("Agent"),
- DATAPROXY("DataProxy"),
- HIVE("Hive"),
- CLICKHOUSE("ClickHouse"),
- ELASTICSEARCH("ElasticSearch"),
- STARROCKS("StarRocks"),
- HUDI("HuDi"),
- ICEBERG("Iceberg"),
- HBASE("HBase"),
- DORIS("Doris"),
- KUDU("Kudu"),
- POSTGRES("Postgres"),
- BINLOG("MYSQL_BINLOG"),
- TUBEMQ("TubeMQ"),
- MYSQL("MYSQL_SQL"),
- HDFS("HDFS");
+/**
+ * CDC (Change Data Capture) operation types.
+ */
+@Getter
+public enum CdcType {
- private final String auditType;
+ INSERT(" insert ", "写入"),
+ DELETE(" delete ", "删除"),
+ UPDATE_BEFORE("update before", "update before"),
+ UPDATE_AFTER("update after", "update after");
- AuditType(String auditType) {
- this.auditType = auditType;
- }
- public String value() {
- return auditType;
+ private final String nameInEnglish;
+ private final String nameInChinese;
+
+ CdcType(String nameInEnglish, String nameInChinese) {
+ this.nameInEnglish = nameInEnglish;
+ this.nameInChinese = nameInChinese;
}
}
diff --git
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/AuditManagerUtils.java
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/AuditManagerUtils.java
index f470251b0e..6e324d2f4d 100644
---
a/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/AuditManagerUtils.java
+++
b/inlong-audit/audit-sdk/src/main/java/org/apache/inlong/audit/util/AuditManagerUtils.java
@@ -18,17 +18,28 @@
package org.apache.inlong.audit.util;
import org.apache.inlong.audit.AuditIdEnum;
+import org.apache.inlong.audit.CdcIdEnum;
import org.apache.inlong.audit.MetricIdEnum;
import org.apache.inlong.audit.entity.AuditInformation;
+import org.apache.inlong.audit.entity.CdcType;
import org.apache.inlong.audit.entity.FlowType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
-import static org.apache.inlong.audit.AuditIdEnum.*;
+import static org.apache.inlong.audit.AuditIdEnum.AGENT_INPUT;
+import static org.apache.inlong.audit.AuditIdEnum.AGENT_OUTPUT;
+import static org.apache.inlong.audit.AuditIdEnum.DATA_PROXY_INPUT;
+import static org.apache.inlong.audit.AuditIdEnum.DATA_PROXY_OUTPUT;
+import static org.apache.inlong.audit.AuditIdEnum.SDK_INPUT;
+import static org.apache.inlong.audit.AuditIdEnum.SDK_OUTPUT;
+import static org.apache.inlong.audit.entity.AuditRules.START_CDC_INPUT_ID;
+import static org.apache.inlong.audit.entity.AuditRules.START_CDC_OUTPUT_ID;
+import static org.apache.inlong.audit.entity.AuditRules.START_METRIC_ID;
/**
* Audit item ID generation rules: composed of basic audit item ID + extension
bits.
@@ -235,11 +246,131 @@ public class AuditManagerUtils {
}
/**
- * Get max Audit ID.
+ * Gets the maximum possible audit ID based on the defined bit length
constraints.
*
- * @return
+ * @return int The maximum audit ID calculated as:
+ * 1 shifted left by (AUDIT_SUFFIX_LENGTH +
AUDIT_MAX_PREFIX_LENGTH)
+ * This represents the upper bound of the regular audit ID space.
*/
- public static int getStartAuditIdForMetric() {
+ private static int getMaxAuditId() {
return 1 << (AUDIT_SUFFIX_LENGTH + AUDIT_MAX_PREFIX_LENGTH);
}
+
+ /**
+ * Gets the starting audit ID for metric items.
+ *
+ * @return int The starting audit ID for metrics, calculated as:
+ * maximum audit ID plus predefined metric ID offset.
+ * This ensures metric IDs are in a separate range from regular
audit IDs.
+ */
+ public static int getStartAuditIdForMetric() {
+ return getMaxAuditId() + START_METRIC_ID;
+ }
+
+ /**
+ * Gets the starting audit ID for CDC (Change Data Capture) items.
+ *
+ * @return int The starting audit ID for CDC, calculated as:
+ * maximum audit ID plus predefined CDC input ID offset.
+ * This ensures CDC IDs are in a separate range from regular audit IDs.
+ */
+ public static int getStartAuditIdForCdcInput() {
+ return getMaxAuditId() + START_CDC_INPUT_ID;
+ }
+
+ /**
+ * Gets the starting audit ID for CDC output items.
+ *
+ * @return int The starting audit ID for CDC output, calculated as:
+ * maximum audit ID plus predefined CDC output ID offset.
+ * This ensures CDC output IDs are in a separate range from regular audit
IDs.
+ */
+ public static int getStartAuditIdForCdcOutput() {
+ return getMaxAuditId() + START_CDC_OUTPUT_ID;
+ }
+
+ /**
+ * Get all CDC audit information by combining all flow types with all CDC
ID enums.
+ * @return List of AuditInformation containing all possible CDC audit
combinations
+ */
+ public static List<AuditInformation> getAllCdcIdInformation() {
+ List<AuditInformation> result = new
ArrayList<>(FlowType.values().length * CdcIdEnum.values().length);
+ for (FlowType flowType : FlowType.values()) {
+ for (CdcIdEnum cdcIdEnum : CdcIdEnum.values()) {
+ result.add(createAuditInformation(cdcIdEnum, flowType));
+ }
+ }
+ return result;
+ }
+
+ /**
+ * Get the CDC ID based on the audit type, flow type, and CDC type.
+ * @param auditType The type of audit (e.g., MySQL, TDSQL)
+ * @param flowType The flow type (e.g., INPUT, OUTPUT)
+ * @param cdcType The CDC type (e.g., INSERT, DELETE, UPDATE_BEFORE,
UPDATE_AFTER)
+ * @return int The corresponding CDC ID
+ */
+ public static int getCdcId(String auditType, FlowType flowType, CdcType
cdcType) {
+ return CdcIdEnum.getCdcId(auditType, flowType, cdcType);
+ }
+
+ /**
+ * Get all CDC audit information based on the audit type.
+ * @param auditType The type of audit (e.g., MySQL, TDSQL)
+ * @return List of AuditInformation containing all possible CDC audit
combinations for the given audit type
+ */
+ public static List<AuditInformation> getAllCdcIdInformation(String
auditType) {
+ List<AuditInformation> result = new
ArrayList<>(FlowType.values().length * CdcIdEnum.values().length);
+ for (FlowType flowType : FlowType.values()) {
+ for (CdcIdEnum cdcIdEnum : CdcIdEnum.values()) {
+ if (cdcIdEnum.getAuditType().value().equals(auditType)) {
+ result.add(createAuditInformation(cdcIdEnum, flowType));
+ }
+ }
+ }
+ return result;
+ }
+
+ /**
+ * Get all CDC audit information based on audit type and flow type.
+ * @param auditType The type of audit (e.g., MySQL, TDSQL)
+ * @param flowType The flow type (e.g., INPUT, OUTPUT)
+ * @return List of AuditInformation containing matching CDC audit
combinations
+ */
+ public static List<AuditInformation> getAllCdcIdInformation(String
auditType, FlowType flowType) {
+ List<AuditInformation> result = new
ArrayList<>(CdcIdEnum.values().length);
+ for (CdcIdEnum cdcIdEnum : CdcIdEnum.values()) {
+ if (cdcIdEnum.getAuditType().value().equals(auditType)) {
+ result.add(createAuditInformation(cdcIdEnum, flowType));
+ }
+ }
+ return result;
+ }
+
+ /**
+ * Get specific CDC audit information based on audit type, flow type and
CDC type.
+ * @param auditType The type of audit (e.g., MySQL, TDSQL)
+ * @param flowType The flow type (e.g., INPUT, OUTPUT)
+ * @param cdcType The CDC type (e.g., INSERT, DELETE, UPDATE_BEFORE,
UPDATE_AFTER)
+ * @return AuditInformation matching the criteria, or null if not found
+ */
+ public static AuditInformation getCdcIdInformation(String auditType,
FlowType flowType, CdcType cdcType) {
+ for (CdcIdEnum cdcIdEnum : CdcIdEnum.values()) {
+ if (cdcIdEnum.getAuditType().value().equals(auditType) &&
cdcIdEnum.getCdcType() == cdcType) {
+ return createAuditInformation(cdcIdEnum, flowType);
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Helper method to create AuditInformation from CdcIdEnum and FlowType.
+ */
+ private static AuditInformation createAuditInformation(CdcIdEnum
cdcIdEnum, FlowType flowType) {
+ return new AuditInformation(
+ cdcIdEnum.getValue(flowType),
+ cdcIdEnum.getEnglishDescription(flowType),
+ cdcIdEnum.getChineseDescription(flowType));
+ }
+
}
diff --git
a/inlong-audit/audit-sdk/src/test/java/org/apache/inlong/audit/util/AuditManagerUtilsTest.java
b/inlong-audit/audit-sdk/src/test/java/org/apache/inlong/audit/util/AuditManagerUtilsTest.java
index 3994bbfa53..6132d69ed7 100644
---
a/inlong-audit/audit-sdk/src/test/java/org/apache/inlong/audit/util/AuditManagerUtilsTest.java
+++
b/inlong-audit/audit-sdk/src/test/java/org/apache/inlong/audit/util/AuditManagerUtilsTest.java
@@ -18,7 +18,9 @@
package org.apache.inlong.audit.util;
import org.apache.inlong.audit.AuditOperator;
+import org.apache.inlong.audit.CdcIdEnum;
import org.apache.inlong.audit.entity.AuditInformation;
+import org.apache.inlong.audit.entity.CdcType;
import org.apache.inlong.audit.entity.FlowType;
import org.junit.Test;
@@ -30,6 +32,8 @@ import static
org.apache.inlong.audit.AuditIdEnum.DATA_PROXY_INPUT;
import static org.apache.inlong.audit.AuditIdEnum.SORT_HIVE_INPUT;
import static org.apache.inlong.audit.AuditIdEnum.SORT_STARROCKS_INPUT;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
public class AuditManagerUtilsTest {
@@ -77,6 +81,11 @@ public class AuditManagerUtilsTest {
auditInfo = AuditManagerUtils.buildAuditInformation(auditType,
flowType, success, isRealtime, discard, retry);
assertEquals(SORT_HIVE_INPUT.getValue(), auditInfo.getAuditId());
assertEquals("Hive 接收成功", auditInfo.getNameInChinese());
+ assertEquals(7, auditInfo.getAuditId());
+
+ auditInfo = AuditManagerUtils.buildAuditInformation(auditType,
flowType, true, false, false, false);
+ assertEquals(262151, auditInfo.getAuditId());
+ assertEquals("Hive 接收成功(CheckPoint)", auditInfo.getNameInChinese());
auditType = "STARROCKS";
auditInfo = AuditManagerUtils.buildAuditInformation(auditType,
flowType, success, isRealtime, discard, retry);
@@ -131,4 +140,68 @@ public class AuditManagerUtilsTest {
System.out.println(metricInformationList);
assertTrue(metricInformationList.size() > 0);
}
+
+ @Test
+ public void getAllCdcIdInformation() {
+ List<AuditInformation> cdcIdInfoList =
AuditManagerUtils.getAllCdcIdInformation();
+ assertNotNull("CDC ID information list should not be null",
cdcIdInfoList);
+ assertFalse("CDC ID information list should not be empty",
cdcIdInfoList.isEmpty());
+ }
+
+ @Test
+ public void getCdcIdEnum() {
+ CdcType cdcType = CdcType.INSERT;
+ String auditType = "TDSQL_MYSQL";
+ CdcIdEnum cdcIdEnum = CdcIdEnum.getCdcIdEnum(auditType, cdcType);
+ assertNotNull("CDC ID enum should not be null", cdcIdEnum);
+ assertTrue("CDC ID value should be greater than 0",
cdcIdEnum.getValue(FlowType.INPUT) > 0);
+ }
+
+ @Test
+ public void getCdcId() {
+ int expectedCdcId = 1073841925;
+ int actualCdcId = AuditManagerUtils.getCdcId("TDSQL_MYSQL",
FlowType.INPUT, CdcType.INSERT);
+ assertEquals("CDC ID should match expected value", expectedCdcId,
actualCdcId);
+ }
+
+ @Test
+ public void getAllCdcIdInformationWithAuditType() {
+ String auditType = "MYSQL";
+ List<AuditInformation> cdcIdInfoList =
AuditManagerUtils.getAllCdcIdInformation(auditType);
+ assertNotNull("CDC ID information list should not be null",
cdcIdInfoList);
+ assertFalse("CDC ID information list should not be empty",
cdcIdInfoList.isEmpty());
+
+ for (AuditInformation info : cdcIdInfoList) {
+ assertTrue("Name should contain audit type",
+ info.getNameInEnglish().toUpperCase().contains(auditType));
+ }
+ }
+
+ @Test
+ public void getAllCdcIdInformationWithAuditTypeAndFlowType() {
+ String auditType = "MYSQL";
+ FlowType flowType = FlowType.INPUT;
+ List<AuditInformation> cdcIdInfoList =
AuditManagerUtils.getAllCdcIdInformation(auditType, flowType);
+ assertNotNull("CDC ID information list should not be null",
cdcIdInfoList);
+
+ for (AuditInformation info : cdcIdInfoList) {
+ assertTrue("Name should contain receive for input flow",
+ info.getNameInEnglish().toLowerCase().contains("receive"));
+ }
+ }
+
+ @Test
+ public void getCdcIdInformationWithAuditTypeAndFlowTypeAndCdcType() {
+ String auditType = "MYSQL";
+ FlowType flowType = FlowType.INPUT;
+ CdcType cdcType = CdcType.INSERT;
+ int expectedAuditId = 1073841825;
+ String expectedName = "MYSQL 接收写入";
+
+ AuditInformation cdcIdInfo =
AuditManagerUtils.getCdcIdInformation(auditType, flowType, cdcType);
+ assertNotNull("CDC ID information should not be null", cdcIdInfo);
+ assertEquals("Audit ID should match expected value", expectedAuditId,
cdcIdInfo.getAuditId());
+ assertEquals("Name in Chinese should match expected value",
expectedName, cdcIdInfo.getNameInChinese());
+ }
+
}