This is an automated email from the ASF dual-hosted git repository.
doleyzi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new b668d483c1 [INLONG-11855][Audit] Audit supports Audit reconciliation
at the granularity of inlong group (#11856)
b668d483c1 is described below
commit b668d483c12784dda0b665b0f4b37eb969096e20
Author: doleyzi <[email protected]>
AuthorDate: Wed May 7 10:35:09 2025 +0800
[INLONG-11855][Audit] Audit supports Audit reconciliation at the
granularity of inlong group (#11856)
---
.../inlong/audit/service/cache/DayCache.java | 71 +++++---
.../inlong/audit/service/cache/RealTimeQuery.java | 181 ++++++++++++++-------
.../StatData.java => config/AuditColumn.java} | 31 ++--
.../inlong/audit/service/config/SqlConstants.java | 7 +-
.../inlong/audit/service/entities/StatData.java | 48 +++++-
.../inlong/audit/service/utils/AuditUtils.java | 58 +++++++
6 files changed, 296 insertions(+), 100 deletions(-)
diff --git
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/cache/DayCache.java
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/cache/DayCache.java
index 5a5bf8b649..5fe2fc5498 100644
---
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/cache/DayCache.java
+++
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/cache/DayCache.java
@@ -21,6 +21,7 @@ import org.apache.inlong.audit.service.config.ConfigConstants;
import org.apache.inlong.audit.service.config.Configuration;
import org.apache.inlong.audit.service.entities.JdbcConfig;
import org.apache.inlong.audit.service.entities.StatData;
+import org.apache.inlong.audit.service.utils.AuditUtils;
import org.apache.inlong.audit.service.utils.JdbcUtils;
import com.zaxxer.hikari.HikariConfig;
@@ -34,11 +35,21 @@ import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
+import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
+import static
org.apache.inlong.audit.service.config.AuditColumn.COLUMN_AUDIT_ID;
+import static
org.apache.inlong.audit.service.config.AuditColumn.COLUMN_AUDIT_TAG;
+import static org.apache.inlong.audit.service.config.AuditColumn.COLUMN_CNT;
+import static org.apache.inlong.audit.service.config.AuditColumn.COLUMN_DELAY;
+import static
org.apache.inlong.audit.service.config.AuditColumn.COLUMN_GROUP_ID;
+import static org.apache.inlong.audit.service.config.AuditColumn.COLUMN_LOG_TS;
+import static org.apache.inlong.audit.service.config.AuditColumn.COLUMN_SIZE;
+import static
org.apache.inlong.audit.service.config.AuditColumn.COLUMN_STREAM_ID;
import static
org.apache.inlong.audit.service.config.SqlConstants.DEFAULT_MYSQL_SOURCE_QUERY_DAY_SQL;
import static
org.apache.inlong.audit.service.config.SqlConstants.KEY_MYSQL_SOURCE_QUERY_DAY_SQL;
+import static
org.apache.inlong.audit.service.config.SqlConstants.WILDCARD_STREAM_ID;
/**
* Cache Of day ,for day openapi
@@ -49,16 +60,13 @@ public class DayCache implements AutoCloseable {
private static volatile DayCache dayCache = null;
private DataSource dataSource;
- private final String querySql;
-
private DayCache() {
createDataSource();
- querySql =
Configuration.getInstance().get(KEY_MYSQL_SOURCE_QUERY_DAY_SQL,
- DEFAULT_MYSQL_SOURCE_QUERY_DAY_SQL);
}
/**
* Get instance
+ *
* @return
*/
public static DayCache getInstance() {
@@ -74,6 +82,7 @@ public class DayCache implements AutoCloseable {
/**
* Get data
+ *
* @param startTime
* @param endTime
* @param inlongGroupId
@@ -84,27 +93,48 @@ public class DayCache implements AutoCloseable {
public List<StatData> getData(String startTime, String endTime, String
inlongGroupId,
String inlongStreamId, String auditId) {
List<StatData> result = new LinkedList<>();
+ String querySQL =
Configuration.getInstance().get(KEY_MYSQL_SOURCE_QUERY_DAY_SQL,
+ DEFAULT_MYSQL_SOURCE_QUERY_DAY_SQL);
+ List<String> paramList = new ArrayList<>();
+ if (WILDCARD_STREAM_ID.equals(inlongStreamId)) {
+ querySQL = AuditUtils.removeStreamIdCondition(querySQL);
+ querySQL = AuditUtils.removeStreamIdColumn(querySQL);
+ paramList.add(startTime);
+ paramList.add(endTime);
+ paramList.add(inlongGroupId);
+ paramList.add(auditId);
+ } else {
+ paramList.add(startTime);
+ paramList.add(endTime);
+ paramList.add(inlongGroupId);
+ paramList.add(inlongStreamId);
+ paramList.add(auditId);
+ }
try (Connection connection = dataSource.getConnection();
- PreparedStatement pstat =
connection.prepareStatement(querySql)) {
+ PreparedStatement pstat =
connection.prepareStatement(querySQL)) {
if (connection.isClosed()) {
createDataSource();
}
- pstat.setString(1, startTime);
- pstat.setString(2, endTime);
- pstat.setString(3, inlongGroupId);
- pstat.setString(4, inlongStreamId);
- pstat.setString(5, auditId);
+ for (int i = 0; i < paramList.size(); i++) {
+ pstat.setString(i + 1, paramList.get(i));
+ }
try (ResultSet resultSet = pstat.executeQuery()) {
while (resultSet.next()) {
StatData data = new StatData();
- data.setLogTs(resultSet.getString(1));
- data.setInlongGroupId(resultSet.getString(2));
- data.setInlongStreamId(resultSet.getString(3));
- data.setAuditId(resultSet.getString(4));
- data.setAuditTag(resultSet.getString(5));
- data.setCount(resultSet.getLong(6));
- data.setSize(resultSet.getLong(7));
- data.setDelay(resultSet.getLong(8));
+ data.setLogTs(resultSet.getString(COLUMN_LOG_TS));
+
data.setInlongGroupId(resultSet.getString(COLUMN_GROUP_ID));
+ data.setAuditId(resultSet.getString(COLUMN_AUDIT_ID));
+ data.setAuditTag(resultSet.getString(COLUMN_AUDIT_TAG));
+ data.setCount(resultSet.getLong(COLUMN_CNT));
+ data.setSize(resultSet.getLong(COLUMN_SIZE));
+ data.setDelay(resultSet.getLong(COLUMN_DELAY));
+
+ if (WILDCARD_STREAM_ID.equals(inlongStreamId)) {
+ data.setInlongStreamId(WILDCARD_STREAM_ID);
+ } else {
+
data.setInlongStreamId(resultSet.getString(COLUMN_STREAM_ID));
+ }
+
result.add(data);
}
} catch (SQLException sqlException) {
@@ -113,7 +143,10 @@ public class DayCache implements AutoCloseable {
} catch (Exception exception) {
LOGGER.error("Query has exception! ", exception);
}
- return result;
+
+ return WILDCARD_STREAM_ID.equals(inlongStreamId)
+ ? AuditUtils.aggregateStatData(result, inlongStreamId)
+ : result;
}
/**
diff --git
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/cache/RealTimeQuery.java
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/cache/RealTimeQuery.java
index 923c98b5cf..6ac403a53e 100644
---
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/cache/RealTimeQuery.java
+++
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/cache/RealTimeQuery.java
@@ -47,6 +47,16 @@ import java.util.concurrent.Executors;
import static
org.apache.inlong.audit.consts.OpenApiConstants.DEFAULT_API_THREAD_POOL_SIZE;
import static
org.apache.inlong.audit.consts.OpenApiConstants.KEY_API_THREAD_POOL_SIZE;
+import static
org.apache.inlong.audit.service.config.AuditColumn.COLUMN_AUDIT_ID;
+import static
org.apache.inlong.audit.service.config.AuditColumn.COLUMN_AUDIT_TAG;
+import static
org.apache.inlong.audit.service.config.AuditColumn.COLUMN_AUDIT_VERSION;
+import static org.apache.inlong.audit.service.config.AuditColumn.COLUMN_CNT;
+import static org.apache.inlong.audit.service.config.AuditColumn.COLUMN_DELAY;
+import static
org.apache.inlong.audit.service.config.AuditColumn.COLUMN_GROUP_ID;
+import static org.apache.inlong.audit.service.config.AuditColumn.COLUMN_IP;
+import static org.apache.inlong.audit.service.config.AuditColumn.COLUMN_LOG_TS;
+import static org.apache.inlong.audit.service.config.AuditColumn.COLUMN_SIZE;
+import static
org.apache.inlong.audit.service.config.AuditColumn.COLUMN_STREAM_ID;
import static
org.apache.inlong.audit.service.config.SqlConstants.DEFAULT_RECONCILIATION_DISTINCT_SQL;
import static
org.apache.inlong.audit.service.config.SqlConstants.DEFAULT_RECONCILIATION_SQL;
import static
org.apache.inlong.audit.service.config.SqlConstants.DEFAULT_SOURCE_QUERY_IDS_SQL;
@@ -57,6 +67,7 @@ import static
org.apache.inlong.audit.service.config.SqlConstants.KEY_RECONCILIA
import static
org.apache.inlong.audit.service.config.SqlConstants.KEY_SOURCE_QUERY_IDS_SQL;
import static
org.apache.inlong.audit.service.config.SqlConstants.KEY_SOURCE_QUERY_IPS_SQL;
import static
org.apache.inlong.audit.service.config.SqlConstants.KEY_SOURCE_QUERY_MINUTE_SQL;
+import static
org.apache.inlong.audit.service.config.SqlConstants.WILDCARD_STREAM_ID;
/**
* Real time query data from audit source.
@@ -67,10 +78,6 @@ public class RealTimeQuery {
private static volatile RealTimeQuery realTimeQuery = null;
private final List<BasicDataSource> dataSourceList = new LinkedList<>();
-
- private final String queryLogTsSql;
- private final String queryIdsByIpSql;
- private final String queryReportIpsSql;
private final ExecutorService executor =
Executors.newFixedThreadPool(
Configuration.getInstance().get(KEY_API_THREAD_POOL_SIZE,
DEFAULT_API_THREAD_POOL_SIZE));
@@ -103,13 +110,6 @@ public class RealTimeQuery {
dataSourceList.add(dataSource);
}
-
- queryLogTsSql =
Configuration.getInstance().get(KEY_SOURCE_QUERY_MINUTE_SQL,
- DEFAULT_SOURCE_QUERY_MINUTE_SQL);
- queryIdsByIpSql =
Configuration.getInstance().get(KEY_SOURCE_QUERY_IDS_SQL,
- DEFAULT_SOURCE_QUERY_IDS_SQL);
- queryReportIpsSql =
Configuration.getInstance().get(KEY_SOURCE_QUERY_IPS_SQL,
- DEFAULT_SOURCE_QUERY_IPS_SQL);
}
public static RealTimeQuery getInstance() {
@@ -152,7 +152,14 @@ public class RealTimeQuery {
CompletableFuture.allOf(futures.toArray(new
CompletableFuture[0])).join();
LOGGER.info("Query log ts by params: {} {} {} {} {}, total cost {}
ms", startTime, endTime, inlongGroupId,
inlongStreamId, auditId, System.currentTimeMillis() -
currentTime);
- return filterMaxAuditVersion(statDataList);
+
+ List<StatData> maxAuditVersion = filterMaxAuditVersion(statDataList);
+ // If querying for wildcard stream ID, aggregate the data
+ if (WILDCARD_STREAM_ID.equals(inlongStreamId)) {
+ return AuditUtils.aggregateStatData(maxAuditVersion,
WILDCARD_STREAM_ID);
+ }
+ // Otherwise return the filtered data directly
+ return maxAuditVersion;
}
/**
@@ -202,26 +209,51 @@ public class RealTimeQuery {
String inlongStreamId, String auditId) {
long currentTime = System.currentTimeMillis();
List<StatData> result = new LinkedList<>();
+
+ String querySQL =
Configuration.getInstance().get(KEY_SOURCE_QUERY_MINUTE_SQL,
+ DEFAULT_SOURCE_QUERY_MINUTE_SQL);
+ List<String> paramList = new ArrayList<>();
+
+ if (WILDCARD_STREAM_ID.equals(inlongStreamId)) {
+ querySQL = AuditUtils.removeStreamIdCondition(querySQL);
+ querySQL = AuditUtils.removeStreamIdColumn(querySQL);
+ paramList.add(startTime);
+ paramList.add(endTime);
+ paramList.add(inlongGroupId);
+ paramList.add(auditId);
+ } else {
+ paramList.add(startTime);
+ paramList.add(endTime);
+ paramList.add(inlongGroupId);
+ paramList.add(inlongStreamId);
+ paramList.add(auditId);
+ }
+
try (Connection connection = dataSource.getConnection();
- PreparedStatement pstat =
connection.prepareStatement(queryLogTsSql)) {
- pstat.setString(1, startTime);
- pstat.setString(2, endTime);
- pstat.setString(3, inlongGroupId);
- pstat.setString(4, inlongStreamId);
- pstat.setString(5, auditId);
+ PreparedStatement pstat =
connection.prepareStatement(querySQL)) {
+ for (int i = 0; i < paramList.size(); i++) {
+ pstat.setString(i + 1, paramList.get(i));
+ }
+
try (ResultSet resultSet = pstat.executeQuery()) {
while (resultSet.next()) {
StatData data = new StatData();
- data.setLogTs(resultSet.getString(1));
- data.setInlongGroupId(resultSet.getString(2));
- data.setInlongStreamId(resultSet.getString(3));
- data.setAuditId(resultSet.getString(4));
- data.setAuditTag(resultSet.getString(5));
- long count = resultSet.getLong(6);
+ data.setLogTs(resultSet.getString(COLUMN_LOG_TS));
+
data.setInlongGroupId(resultSet.getString(COLUMN_GROUP_ID));
+ data.setAuditId(resultSet.getString(COLUMN_AUDIT_ID));
+ data.setAuditTag(resultSet.getString(COLUMN_AUDIT_TAG));
+ long count = resultSet.getLong(COLUMN_CNT);
data.setCount(count);
- data.setSize(resultSet.getLong(7));
- data.setDelay(CacheUtils.calculateAverageDelay(count,
resultSet.getLong(8)));
- data.setAuditVersion(resultSet.getLong(9));
+ data.setDelay(CacheUtils.calculateAverageDelay(count,
resultSet.getLong(COLUMN_DELAY)));
+ data.setSize(resultSet.getLong(COLUMN_SIZE));
+
data.setAuditVersion(resultSet.getLong(COLUMN_AUDIT_VERSION));
+
+ if (WILDCARD_STREAM_ID.equals(inlongStreamId)) {
+ data.setInlongStreamId(WILDCARD_STREAM_ID);
+ } else {
+
data.setInlongStreamId(resultSet.getString(COLUMN_STREAM_ID));
+ }
+
result.add(data);
}
} catch (SQLException sqlException) {
@@ -270,23 +302,30 @@ public class RealTimeQuery {
private List<StatData> doQueryIdsByIp(DataSource dataSource, String
startTime, String endTime, String ip,
String auditId) {
List<StatData> result = new LinkedList<>();
+ String querySQL =
Configuration.getInstance().get(KEY_SOURCE_QUERY_IDS_SQL,
+ DEFAULT_SOURCE_QUERY_IDS_SQL);
+ List<String> paramList = new ArrayList<>();
+ paramList.add(startTime);
+ paramList.add(endTime);
+ paramList.add(auditId);
+ paramList.add(ip);
+
try (Connection connection = dataSource.getConnection();
- PreparedStatement pstat =
connection.prepareStatement(queryIdsByIpSql)) {
- pstat.setString(1, startTime);
- pstat.setString(2, endTime);
- pstat.setString(3, auditId);
- pstat.setString(4, ip);
+ PreparedStatement pstat =
connection.prepareStatement(querySQL)) {
+ for (int i = 0; i < paramList.size(); i++) {
+ pstat.setString(i + 1, paramList.get(i));
+ }
try (ResultSet resultSet = pstat.executeQuery()) {
while (resultSet.next()) {
StatData data = new StatData();
- data.setInlongGroupId(resultSet.getString(1));
- data.setInlongStreamId(resultSet.getString(2));
- data.setAuditId(resultSet.getString(3));
- data.setAuditTag(resultSet.getString(4));
- long count = resultSet.getLong(5);
+
data.setInlongGroupId(resultSet.getString(COLUMN_GROUP_ID));
+
data.setInlongStreamId(resultSet.getString(COLUMN_STREAM_ID));
+ data.setAuditId(resultSet.getString(COLUMN_AUDIT_ID));
+ data.setAuditTag(resultSet.getString(COLUMN_AUDIT_TAG));
+ long count = resultSet.getLong(COLUMN_CNT);
data.setCount(count);
- data.setSize(resultSet.getLong(6));
- data.setDelay(CacheUtils.calculateAverageDelay(count,
resultSet.getLong(7)));
+ data.setSize(resultSet.getLong(COLUMN_SIZE));
+ data.setDelay(CacheUtils.calculateAverageDelay(count,
resultSet.getLong(COLUMN_DELAY)));
result.add(data);
}
} catch (SQLException sqlException) {
@@ -337,20 +376,38 @@ public class RealTimeQuery {
String inlongGroupId,
String inlongStreamId, String auditId) {
List<StatData> result = new LinkedList<>();
+ String querySQL =
Configuration.getInstance().get(KEY_SOURCE_QUERY_IPS_SQL,
+ DEFAULT_SOURCE_QUERY_IPS_SQL);
+ List<String> paramList = new ArrayList<>();
+ if (WILDCARD_STREAM_ID.equals(inlongStreamId)) {
+ querySQL = AuditUtils.removeStreamIdCondition(querySQL);
+ paramList.add(startTime);
+ paramList.add(endTime);
+ paramList.add(inlongGroupId);
+ paramList.add(auditId);
+ } else {
+ paramList.add(startTime);
+ paramList.add(endTime);
+ paramList.add(inlongGroupId);
+ paramList.add(inlongStreamId);
+ paramList.add(auditId);
+ }
try (Connection connection = dataSource.getConnection();
- PreparedStatement pstat =
connection.prepareStatement(queryReportIpsSql)) {
- pstat.setString(1, startTime);
- pstat.setString(2, endTime);
- pstat.setString(3, inlongGroupId);
- pstat.setString(4, inlongStreamId);
- pstat.setString(5, auditId);
+ PreparedStatement pstat =
connection.prepareStatement(querySQL)) {
+ for (int i = 0; i < paramList.size(); i++) {
+ pstat.setString(i + 1, paramList.get(i));
+ }
try (ResultSet resultSet = pstat.executeQuery()) {
while (resultSet.next()) {
StatData data = new StatData();
- data.setIp(resultSet.getString(1));
- long count = resultSet.getLong(2);
+ data.setIp(resultSet.getString(COLUMN_IP));
+ long count = resultSet.getLong(COLUMN_CNT);
+ data.setSize(resultSet.getLong(COLUMN_SIZE));
+ data.setLogTs(startTime);
+ data.setInlongGroupId(inlongGroupId);
+ data.setInlongStreamId(inlongStreamId);
+ data.setAuditId(auditId);
data.setCount(count);
- data.setSize(resultSet.getLong(3));
data.setDelay(CacheUtils.calculateAverageDelay(count,
resultSet.getLong(4)));
result.add(data);
}
@@ -397,21 +454,33 @@ public class RealTimeQuery {
String querySQL = distinct
?
Configuration.getInstance().get(KEY_RECONCILIATION_DISTINCT_SQL,
DEFAULT_RECONCILIATION_DISTINCT_SQL)
: Configuration.getInstance().get(KEY_RECONCILIATION_SQL,
DEFAULT_RECONCILIATION_SQL);
+ List<String> paramList = new ArrayList<>();
+ if (WILDCARD_STREAM_ID.equals(inlongStreamId)) {
+ querySQL = AuditUtils.removeStreamIdCondition(querySQL);
+ paramList.add(startTime);
+ paramList.add(endTime);
+ paramList.add(auditId);
+ paramList.add(inlongGroupId);
+ } else {
+ paramList.add(startTime);
+ paramList.add(endTime);
+ paramList.add(auditId);
+ paramList.add(inlongGroupId);
+ paramList.add(inlongStreamId);
+ }
+ paramList.add(auditTag);
try (Connection connection = dataSource.getConnection();
PreparedStatement pstat =
connection.prepareStatement(querySQL)) {
+ for (int i = 0; i < paramList.size(); i++) {
+ pstat.setString(i + 1, paramList.get(i));
+ }
- pstat.setString(1, startTime);
- pstat.setString(2, endTime);
- pstat.setString(3, auditId);
- pstat.setString(4, inlongGroupId);
- pstat.setString(5, inlongStreamId);
- pstat.setString(6, auditTag);
try (ResultSet resultSet = pstat.executeQuery()) {
while (resultSet.next()) {
StatData data = new StatData();
- data.setAuditVersion(resultSet.getLong(1));
- data.setCount(resultSet.getLong(2));
+
data.setAuditVersion(resultSet.getLong(COLUMN_AUDIT_VERSION));
+ data.setCount(resultSet.getLong(COLUMN_CNT));
data.setLogTs(startTime);
data.setInlongGroupId(inlongGroupId);
data.setInlongStreamId(inlongStreamId);
diff --git
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/entities/StatData.java
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/config/AuditColumn.java
similarity index 54%
copy from
inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/entities/StatData.java
copy to
inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/config/AuditColumn.java
index ab463b25a3..17e81ab063 100644
---
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/entities/StatData.java
+++
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/config/AuditColumn.java
@@ -15,25 +15,18 @@
* limitations under the License.
*/
-package org.apache.inlong.audit.service.entities;
+package org.apache.inlong.audit.service.config;
-import lombok.Data;
+/**
+ * Column labels shared by the audit-service SQL statements and the code that
+ * reads their result sets, so values are fetched by label rather than by
+ * positional index.
+ */
+public class AuditColumn {
-import java.sql.Timestamp;
-
-@Data
-public class StatData {
-
- private long auditVersion;
- private String logTs;
- private String inlongGroupId;
- private String inlongStreamId;
- private String auditId;
- private String auditTag;
- private Long count;
- private Long size;
- private Long delay;
- private Timestamp updateTime;
- private String ip;
- private String sourceName;
+ public static final String COLUMN_LOG_TS = "log_ts";
+ public static final String COLUMN_GROUP_ID = "inlong_group_id";
+ public static final String COLUMN_STREAM_ID = "inlong_stream_id";
+ public static final String COLUMN_AUDIT_ID = "audit_id";
+ public static final String COLUMN_AUDIT_TAG = "audit_tag";
+ // Alias given to the count column / sum(count) in the default SQL of SqlConstants.
+ public static final String COLUMN_CNT = "cnt";
+ public static final String COLUMN_SIZE = "size";
+ public static final String COLUMN_DELAY = "delay";
+ public static final String COLUMN_IP = "ip";
+ public static final String COLUMN_AUDIT_VERSION = "audit_version";
}
diff --git
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/config/SqlConstants.java
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/config/SqlConstants.java
index 914f2cb88e..61fd1bafd9 100644
---
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/config/SqlConstants.java
+++
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/config/SqlConstants.java
@@ -143,7 +143,7 @@ public class SqlConstants {
public static final String KEY_MYSQL_SOURCE_QUERY_DAY_SQL =
"mysql.query.day.sql";
public static final String DEFAULT_MYSQL_SOURCE_QUERY_DAY_SQL =
- "select
log_ts,inlong_group_id,inlong_stream_id,audit_id,audit_tag,count,size,delay " +
+ "select
log_ts,inlong_group_id,inlong_stream_id,audit_id,audit_tag,count AS
cnt,size,delay " +
"from audit_data_day where log_ts >= ? AND log_ts < ? AND
inlong_group_id=? AND inlong_stream_id=? AND audit_id =? ";
public static final String KEY_MYSQL_QUERY_AUDIT_ID_SQL =
"mysql.query.audit.id.sql";
@@ -182,7 +182,7 @@ public class SqlConstants {
public static final String KEY_RECONCILIATION_SQL =
"audit.reconciliation.sql";
public static final String DEFAULT_RECONCILIATION_SQL = "select\n" +
"audit_version,\n" +
- "sum(count) count\n" +
+ "sum(count) cnt\n" +
"from\n" +
" audit_data\n" +
"where\n" +
@@ -200,7 +200,7 @@ public class SqlConstants {
public static final String KEY_RECONCILIATION_DISTINCT_SQL =
"audit.reconciliation.distinct.sql";
public static final String DEFAULT_RECONCILIATION_DISTINCT_SQL =
"SELECT\n" +
" audit_version,\n" +
- " sum(count) AS count\n" +
+ " sum(count) AS cnt\n" +
"FROM\n" +
" (\n" +
" SELECT\n" +
@@ -251,4 +251,5 @@ public class SqlConstants {
" ) t_distinct\n" +
"GROUP BY\n" +
" audit_version";
+ public static final String WILDCARD_STREAM_ID = "*";
}
diff --git
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/entities/StatData.java
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/entities/StatData.java
index ab463b25a3..b91eaa78bb 100644
---
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/entities/StatData.java
+++
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/entities/StatData.java
@@ -18,10 +18,12 @@
package org.apache.inlong.audit.service.entities;
import lombok.Data;
+import lombok.NoArgsConstructor;
import java.sql.Timestamp;
@Data
+@NoArgsConstructor
public class StatData {
private long auditVersion;
@@ -30,10 +32,50 @@ public class StatData {
private String inlongStreamId;
private String auditId;
private String auditTag;
- private Long count;
- private Long size;
- private Long delay;
+ private Long count = 0L;
+ private Long size = 0L;
+ private Long delay = 0L;
private Timestamp updateTime;
private String ip;
private String sourceName;
+
+    /**
+     * Accumulates the supplied deltas into this record's count, size and delay.
+     * A {@code null} argument leaves the corresponding field untouched.
+     *
+     * @param count message count to add, may be {@code null}
+     * @param size data size to add, may be {@code null}
+     * @param delay delay to add, may be {@code null}
+     */
+    public void add(Long count, Long size, Long delay) {
+        this.count = sumIgnoringNull(this.count, count);
+        this.size = sumIgnoringNull(this.size, size);
+        this.delay = sumIgnoringNull(this.delay, delay);
+    }
+
+    /**
+     * Returns {@code current} unchanged when {@code delta} is null, otherwise their sum.
+     */
+    private static Long sumIgnoringNull(Long current, Long delta) {
+        if (delta == null) {
+            return current;
+        }
+        return current + delta;
+    }
+
+    /**
+     * Builds the grouping key for this record in the form
+     * {@code logTs|inlongGroupId|auditId}.
+     *
+     * @return the three fields separated by '|'
+     */
+    public String getCompositeKey() {
+        return logTs + "|" + inlongGroupId + "|" + auditId;
+    }
+
+    /**
+     * Creates a record identified by its timestamp, group, stream and audit ID.
+     * All other fields keep their declared defaults.
+     *
+     * @param logTs timestamp string
+     * @param inlongGroupId group ID
+     * @param inlongStreamId stream ID
+     * @param auditId audit ID
+     */
+    public StatData(String logTs, String inlongGroupId, String inlongStreamId, String auditId) {
+        this.auditId = auditId;
+        this.inlongStreamId = inlongStreamId;
+        this.inlongGroupId = inlongGroupId;
+        this.logTs = logTs;
+    }
}
diff --git
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/utils/AuditUtils.java
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/utils/AuditUtils.java
index 34338336e4..9dfa63320c 100644
---
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/utils/AuditUtils.java
+++
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/utils/AuditUtils.java
@@ -24,11 +24,15 @@ import org.apache.inlong.audit.service.entities.StatData;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;
+import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
public class AuditUtils {
private static final DateTimeFormatter dateTimeFormatter =
DateTimeFormatter.ofPattern(ConfigConstants.DATE_FORMAT);
+ private static final String STREAM_ID_CONDITION_PATTERN =
"(?i)\\band\\s+inlong_stream_id\\s*=\\s*\\?\\s*";
public static double calculateDiffRatio(long srcCount, long destCount) {
if (srcCount == 0 && destCount == 0) {
@@ -90,4 +94,58 @@ public class AuditUtils {
return mergedStatData;
}
+
+    /**
+     * Merges records that share the same composite key into one record per key,
+     * summing their count, size and delay.
+     *
+     * @param data records to merge; {@code null} or empty yields an empty list
+     * @param streamId stream ID assigned to every merged record
+     * @return a new list holding one merged record per composite key
+     */
+    public static List<StatData> aggregateStatData(List<StatData> data, String streamId) {
+        List<StatData> merged = new ArrayList<>();
+        if (data == null || data.isEmpty()) {
+            return merged;
+        }
+
+        Map<String, StatData> byKey = new HashMap<>(data.size());
+        for (StatData item : data) {
+            String key = item.getCompositeKey();
+            StatData bucket = byKey.get(key);
+            if (bucket == null) {
+                // First record for this key: seed the bucket with its identifying fields.
+                bucket = new StatData(item.getLogTs(), item.getInlongGroupId(), streamId, item.getAuditId());
+                byKey.put(key, bucket);
+            }
+            bucket.add(item.getCount(), item.getSize(), item.getDelay());
+        }
+
+        merged.addAll(byKey.values());
+        return merged;
+    }
+
+    /**
+     * Strips every {@code AND inlong_stream_id = ?} predicate from a SQL string,
+     * replacing each match with a single space.
+     *
+     * @param sqlQuery original SQL text, may be {@code null}
+     * @return the SQL without the stream ID predicate, or {@code null} for {@code null} input
+     */
+    public static String removeStreamIdCondition(String sqlQuery) {
+        if (sqlQuery == null) {
+            return null;
+        }
+        return sqlQuery.replaceAll(STREAM_ID_CONDITION_PATTERN, " ");
+    }
+
+    /**
+     * Removes the {@code inlong_stream_id} column from a SQL statement, together with
+     * the comma that separated it from its neighbours, so the remaining column list
+     * stays well-formed. Matching is case-insensitive and whole-word only.
+     * <p>
+     * Handled positions:
+     * 1. First or middle of a list ({@code inlong_stream_id, x}): the column and the
+     *    comma after it are removed.
+     * 2. Last of a list ({@code x, inlong_stream_id}): the column and the comma before
+     *    it are removed, so no dangling comma is left in front of the next keyword.
+     * 3. Any remaining stand-alone occurrence is removed with its trailing whitespace.
+     *
+     * @param sql Original SQL string, may be {@code null}
+     * @return SQL string with 'inlong_stream_id' column removed, or {@code null} for {@code null} input
+     */
+    public static String removeStreamIdColumn(String sql) {
+        if (sql == null) {
+            return null;
+        }
+        // Column followed by a comma (start/middle of a list): drop it and that comma.
+        sql = sql.replaceAll("(?i)\\binlong_stream_id\\b\\s*,\\s*", "");
+        // Column preceded by a comma (end of a list): drop it and the comma before it,
+        // keeping the whitespace that separates the list from the following keyword.
+        sql = sql.replaceAll("(?i)\\s*,\\s*\\binlong_stream_id\\b", "");
+        // Occurrences with no adjacent comma: drop the bare column name.
+        sql = sql.replaceAll("(?i)\\binlong_stream_id\\b\\s*", "");
+        return sql;
+    }
+
}