This is an automated email from the ASF dual-hosted git repository.

doleyzi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new b668d483c1 [INLONG-11855][Audit] Audit supports Audit reconciliation at the granularity of inlong group (#11856)
b668d483c1 is described below

commit b668d483c12784dda0b665b0f4b37eb969096e20
Author: doleyzi <[email protected]>
AuthorDate: Wed May 7 10:35:09 2025 +0800

    [INLONG-11855][Audit] Audit supports Audit reconciliation at the granularity of inlong group (#11856)
---
 .../inlong/audit/service/cache/DayCache.java       |  71 +++++---
 .../inlong/audit/service/cache/RealTimeQuery.java  | 181 ++++++++++++++-------
 .../StatData.java => config/AuditColumn.java}      |  31 ++--
 .../inlong/audit/service/config/SqlConstants.java  |   7 +-
 .../inlong/audit/service/entities/StatData.java    |  48 +++++-
 .../inlong/audit/service/utils/AuditUtils.java     |  58 +++++++
 6 files changed, 296 insertions(+), 100 deletions(-)

diff --git 
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/cache/DayCache.java
 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/cache/DayCache.java
index 5a5bf8b649..5fe2fc5498 100644
--- 
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/cache/DayCache.java
+++ 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/cache/DayCache.java
@@ -21,6 +21,7 @@ import org.apache.inlong.audit.service.config.ConfigConstants;
 import org.apache.inlong.audit.service.config.Configuration;
 import org.apache.inlong.audit.service.entities.JdbcConfig;
 import org.apache.inlong.audit.service.entities.StatData;
+import org.apache.inlong.audit.service.utils.AuditUtils;
 import org.apache.inlong.audit.service.utils.JdbcUtils;
 
 import com.zaxxer.hikari.HikariConfig;
@@ -34,11 +35,21 @@ import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.List;
 
+import static 
org.apache.inlong.audit.service.config.AuditColumn.COLUMN_AUDIT_ID;
+import static 
org.apache.inlong.audit.service.config.AuditColumn.COLUMN_AUDIT_TAG;
+import static org.apache.inlong.audit.service.config.AuditColumn.COLUMN_CNT;
+import static org.apache.inlong.audit.service.config.AuditColumn.COLUMN_DELAY;
+import static 
org.apache.inlong.audit.service.config.AuditColumn.COLUMN_GROUP_ID;
+import static org.apache.inlong.audit.service.config.AuditColumn.COLUMN_LOG_TS;
+import static org.apache.inlong.audit.service.config.AuditColumn.COLUMN_SIZE;
+import static 
org.apache.inlong.audit.service.config.AuditColumn.COLUMN_STREAM_ID;
 import static 
org.apache.inlong.audit.service.config.SqlConstants.DEFAULT_MYSQL_SOURCE_QUERY_DAY_SQL;
 import static 
org.apache.inlong.audit.service.config.SqlConstants.KEY_MYSQL_SOURCE_QUERY_DAY_SQL;
+import static 
org.apache.inlong.audit.service.config.SqlConstants.WILDCARD_STREAM_ID;
 
 /**
  * Cache Of day ,for day openapi
@@ -49,16 +60,13 @@ public class DayCache implements AutoCloseable {
     private static volatile DayCache dayCache = null;
     private DataSource dataSource;
 
-    private final String querySql;
-
     private DayCache() {
         createDataSource();
-        querySql = 
Configuration.getInstance().get(KEY_MYSQL_SOURCE_QUERY_DAY_SQL,
-                DEFAULT_MYSQL_SOURCE_QUERY_DAY_SQL);
     }
 
     /**
      * Get instance
+     *
      * @return
      */
     public static DayCache getInstance() {
@@ -74,6 +82,7 @@ public class DayCache implements AutoCloseable {
 
     /**
      * Get data
+     *
      * @param startTime
      * @param endTime
      * @param inlongGroupId
@@ -84,27 +93,48 @@ public class DayCache implements AutoCloseable {
     public List<StatData> getData(String startTime, String endTime, String 
inlongGroupId,
             String inlongStreamId, String auditId) {
         List<StatData> result = new LinkedList<>();
+        String querySQL = 
Configuration.getInstance().get(KEY_MYSQL_SOURCE_QUERY_DAY_SQL,
+                DEFAULT_MYSQL_SOURCE_QUERY_DAY_SQL);
+        List<String> paramList = new ArrayList<>();
+        if (WILDCARD_STREAM_ID.equals(inlongStreamId)) {
+            querySQL = AuditUtils.removeStreamIdCondition(querySQL);
+            querySQL = AuditUtils.removeStreamIdColumn(querySQL);
+            paramList.add(startTime);
+            paramList.add(endTime);
+            paramList.add(inlongGroupId);
+            paramList.add(auditId);
+        } else {
+            paramList.add(startTime);
+            paramList.add(endTime);
+            paramList.add(inlongGroupId);
+            paramList.add(inlongStreamId);
+            paramList.add(auditId);
+        }
         try (Connection connection = dataSource.getConnection();
-                PreparedStatement pstat = 
connection.prepareStatement(querySql)) {
+                PreparedStatement pstat = 
connection.prepareStatement(querySQL)) {
             if (connection.isClosed()) {
                 createDataSource();
             }
-            pstat.setString(1, startTime);
-            pstat.setString(2, endTime);
-            pstat.setString(3, inlongGroupId);
-            pstat.setString(4, inlongStreamId);
-            pstat.setString(5, auditId);
+            for (int i = 0; i < paramList.size(); i++) {
+                pstat.setString(i + 1, paramList.get(i));
+            }
             try (ResultSet resultSet = pstat.executeQuery()) {
                 while (resultSet.next()) {
                     StatData data = new StatData();
-                    data.setLogTs(resultSet.getString(1));
-                    data.setInlongGroupId(resultSet.getString(2));
-                    data.setInlongStreamId(resultSet.getString(3));
-                    data.setAuditId(resultSet.getString(4));
-                    data.setAuditTag(resultSet.getString(5));
-                    data.setCount(resultSet.getLong(6));
-                    data.setSize(resultSet.getLong(7));
-                    data.setDelay(resultSet.getLong(8));
+                    data.setLogTs(resultSet.getString(COLUMN_LOG_TS));
+                    
data.setInlongGroupId(resultSet.getString(COLUMN_GROUP_ID));
+                    data.setAuditId(resultSet.getString(COLUMN_AUDIT_ID));
+                    data.setAuditTag(resultSet.getString(COLUMN_AUDIT_TAG));
+                    data.setCount(resultSet.getLong(COLUMN_CNT));
+                    data.setSize(resultSet.getLong(COLUMN_SIZE));
+                    data.setDelay(resultSet.getLong(COLUMN_DELAY));
+
+                    if (WILDCARD_STREAM_ID.equals(inlongStreamId)) {
+                        data.setInlongStreamId(WILDCARD_STREAM_ID);
+                    } else {
+                        
data.setInlongStreamId(resultSet.getString(COLUMN_STREAM_ID));
+                    }
+
                     result.add(data);
                 }
             } catch (SQLException sqlException) {
@@ -113,7 +143,10 @@ public class DayCache implements AutoCloseable {
         } catch (Exception exception) {
             LOGGER.error("Query has exception! ", exception);
         }
-        return result;
+
+        return WILDCARD_STREAM_ID.equals(inlongStreamId)
+                ? AuditUtils.aggregateStatData(result, inlongStreamId)
+                : result;
     }
 
     /**
diff --git 
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/cache/RealTimeQuery.java
 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/cache/RealTimeQuery.java
index 923c98b5cf..6ac403a53e 100644
--- 
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/cache/RealTimeQuery.java
+++ 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/cache/RealTimeQuery.java
@@ -47,6 +47,16 @@ import java.util.concurrent.Executors;
 
 import static 
org.apache.inlong.audit.consts.OpenApiConstants.DEFAULT_API_THREAD_POOL_SIZE;
 import static 
org.apache.inlong.audit.consts.OpenApiConstants.KEY_API_THREAD_POOL_SIZE;
+import static 
org.apache.inlong.audit.service.config.AuditColumn.COLUMN_AUDIT_ID;
+import static 
org.apache.inlong.audit.service.config.AuditColumn.COLUMN_AUDIT_TAG;
+import static 
org.apache.inlong.audit.service.config.AuditColumn.COLUMN_AUDIT_VERSION;
+import static org.apache.inlong.audit.service.config.AuditColumn.COLUMN_CNT;
+import static org.apache.inlong.audit.service.config.AuditColumn.COLUMN_DELAY;
+import static 
org.apache.inlong.audit.service.config.AuditColumn.COLUMN_GROUP_ID;
+import static org.apache.inlong.audit.service.config.AuditColumn.COLUMN_IP;
+import static org.apache.inlong.audit.service.config.AuditColumn.COLUMN_LOG_TS;
+import static org.apache.inlong.audit.service.config.AuditColumn.COLUMN_SIZE;
+import static 
org.apache.inlong.audit.service.config.AuditColumn.COLUMN_STREAM_ID;
 import static 
org.apache.inlong.audit.service.config.SqlConstants.DEFAULT_RECONCILIATION_DISTINCT_SQL;
 import static 
org.apache.inlong.audit.service.config.SqlConstants.DEFAULT_RECONCILIATION_SQL;
 import static 
org.apache.inlong.audit.service.config.SqlConstants.DEFAULT_SOURCE_QUERY_IDS_SQL;
@@ -57,6 +67,7 @@ import static 
org.apache.inlong.audit.service.config.SqlConstants.KEY_RECONCILIA
 import static 
org.apache.inlong.audit.service.config.SqlConstants.KEY_SOURCE_QUERY_IDS_SQL;
 import static 
org.apache.inlong.audit.service.config.SqlConstants.KEY_SOURCE_QUERY_IPS_SQL;
 import static 
org.apache.inlong.audit.service.config.SqlConstants.KEY_SOURCE_QUERY_MINUTE_SQL;
+import static 
org.apache.inlong.audit.service.config.SqlConstants.WILDCARD_STREAM_ID;
 
 /**
  * Real time query data from audit source.
@@ -67,10 +78,6 @@ public class RealTimeQuery {
     private static volatile RealTimeQuery realTimeQuery = null;
 
     private final List<BasicDataSource> dataSourceList = new LinkedList<>();
-
-    private final String queryLogTsSql;
-    private final String queryIdsByIpSql;
-    private final String queryReportIpsSql;
     private final ExecutorService executor =
             Executors.newFixedThreadPool(
                     Configuration.getInstance().get(KEY_API_THREAD_POOL_SIZE, 
DEFAULT_API_THREAD_POOL_SIZE));
@@ -103,13 +110,6 @@ public class RealTimeQuery {
 
             dataSourceList.add(dataSource);
         }
-
-        queryLogTsSql = 
Configuration.getInstance().get(KEY_SOURCE_QUERY_MINUTE_SQL,
-                DEFAULT_SOURCE_QUERY_MINUTE_SQL);
-        queryIdsByIpSql = 
Configuration.getInstance().get(KEY_SOURCE_QUERY_IDS_SQL,
-                DEFAULT_SOURCE_QUERY_IDS_SQL);
-        queryReportIpsSql = 
Configuration.getInstance().get(KEY_SOURCE_QUERY_IPS_SQL,
-                DEFAULT_SOURCE_QUERY_IPS_SQL);
     }
 
     public static RealTimeQuery getInstance() {
@@ -152,7 +152,14 @@ public class RealTimeQuery {
         CompletableFuture.allOf(futures.toArray(new 
CompletableFuture[0])).join();
         LOGGER.info("Query log ts by params: {} {} {} {} {}, total cost {} 
ms", startTime, endTime, inlongGroupId,
                 inlongStreamId, auditId, System.currentTimeMillis() - 
currentTime);
-        return filterMaxAuditVersion(statDataList);
+
+        List<StatData> maxAuditVersion = filterMaxAuditVersion(statDataList);
+        // If querying for wildcard stream ID, aggregate the data
+        if (WILDCARD_STREAM_ID.equals(inlongStreamId)) {
+            return AuditUtils.aggregateStatData(maxAuditVersion, 
WILDCARD_STREAM_ID);
+        }
+        // Otherwise return the filtered data directly
+        return maxAuditVersion;
     }
 
     /**
@@ -202,26 +209,51 @@ public class RealTimeQuery {
             String inlongStreamId, String auditId) {
         long currentTime = System.currentTimeMillis();
         List<StatData> result = new LinkedList<>();
+
+        String querySQL = 
Configuration.getInstance().get(KEY_SOURCE_QUERY_MINUTE_SQL,
+                DEFAULT_SOURCE_QUERY_MINUTE_SQL);
+        List<String> paramList = new ArrayList<>();
+
+        if (WILDCARD_STREAM_ID.equals(inlongStreamId)) {
+            querySQL = AuditUtils.removeStreamIdCondition(querySQL);
+            querySQL = AuditUtils.removeStreamIdColumn(querySQL);
+            paramList.add(startTime);
+            paramList.add(endTime);
+            paramList.add(inlongGroupId);
+            paramList.add(auditId);
+        } else {
+            paramList.add(startTime);
+            paramList.add(endTime);
+            paramList.add(inlongGroupId);
+            paramList.add(inlongStreamId);
+            paramList.add(auditId);
+        }
+
         try (Connection connection = dataSource.getConnection();
-                PreparedStatement pstat = 
connection.prepareStatement(queryLogTsSql)) {
-            pstat.setString(1, startTime);
-            pstat.setString(2, endTime);
-            pstat.setString(3, inlongGroupId);
-            pstat.setString(4, inlongStreamId);
-            pstat.setString(5, auditId);
+                PreparedStatement pstat = 
connection.prepareStatement(querySQL)) {
+            for (int i = 0; i < paramList.size(); i++) {
+                pstat.setString(i + 1, paramList.get(i));
+            }
+
             try (ResultSet resultSet = pstat.executeQuery()) {
                 while (resultSet.next()) {
                     StatData data = new StatData();
-                    data.setLogTs(resultSet.getString(1));
-                    data.setInlongGroupId(resultSet.getString(2));
-                    data.setInlongStreamId(resultSet.getString(3));
-                    data.setAuditId(resultSet.getString(4));
-                    data.setAuditTag(resultSet.getString(5));
-                    long count = resultSet.getLong(6);
+                    data.setLogTs(resultSet.getString(COLUMN_LOG_TS));
+                    
data.setInlongGroupId(resultSet.getString(COLUMN_GROUP_ID));
+                    data.setAuditId(resultSet.getString(COLUMN_AUDIT_ID));
+                    data.setAuditTag(resultSet.getString(COLUMN_AUDIT_TAG));
+                    long count = resultSet.getLong(COLUMN_CNT);
                     data.setCount(count);
-                    data.setSize(resultSet.getLong(7));
-                    data.setDelay(CacheUtils.calculateAverageDelay(count, 
resultSet.getLong(8)));
-                    data.setAuditVersion(resultSet.getLong(9));
+                    data.setDelay(CacheUtils.calculateAverageDelay(count, 
resultSet.getLong(COLUMN_DELAY)));
+                    data.setSize(resultSet.getLong(COLUMN_SIZE));
+                    
data.setAuditVersion(resultSet.getLong(COLUMN_AUDIT_VERSION));
+
+                    if (WILDCARD_STREAM_ID.equals(inlongStreamId)) {
+                        data.setInlongStreamId(WILDCARD_STREAM_ID);
+                    } else {
+                        
data.setInlongStreamId(resultSet.getString(COLUMN_STREAM_ID));
+                    }
+
                     result.add(data);
                 }
             } catch (SQLException sqlException) {
@@ -270,23 +302,30 @@ public class RealTimeQuery {
     private List<StatData> doQueryIdsByIp(DataSource dataSource, String 
startTime, String endTime, String ip,
             String auditId) {
         List<StatData> result = new LinkedList<>();
+        String querySQL = 
Configuration.getInstance().get(KEY_SOURCE_QUERY_IDS_SQL,
+                DEFAULT_SOURCE_QUERY_IDS_SQL);
+        List<String> paramList = new ArrayList<>();
+        paramList.add(startTime);
+        paramList.add(endTime);
+        paramList.add(auditId);
+        paramList.add(ip);
+
         try (Connection connection = dataSource.getConnection();
-                PreparedStatement pstat = 
connection.prepareStatement(queryIdsByIpSql)) {
-            pstat.setString(1, startTime);
-            pstat.setString(2, endTime);
-            pstat.setString(3, auditId);
-            pstat.setString(4, ip);
+                PreparedStatement pstat = 
connection.prepareStatement(querySQL)) {
+            for (int i = 0; i < paramList.size(); i++) {
+                pstat.setString(i + 1, paramList.get(i));
+            }
             try (ResultSet resultSet = pstat.executeQuery()) {
                 while (resultSet.next()) {
                     StatData data = new StatData();
-                    data.setInlongGroupId(resultSet.getString(1));
-                    data.setInlongStreamId(resultSet.getString(2));
-                    data.setAuditId(resultSet.getString(3));
-                    data.setAuditTag(resultSet.getString(4));
-                    long count = resultSet.getLong(5);
+                    
data.setInlongGroupId(resultSet.getString(COLUMN_GROUP_ID));
+                    
data.setInlongStreamId(resultSet.getString(COLUMN_STREAM_ID));
+                    data.setAuditId(resultSet.getString(COLUMN_AUDIT_ID));
+                    data.setAuditTag(resultSet.getString(COLUMN_AUDIT_TAG));
+                    long count = resultSet.getLong(COLUMN_CNT);
                     data.setCount(count);
-                    data.setSize(resultSet.getLong(6));
-                    data.setDelay(CacheUtils.calculateAverageDelay(count, 
resultSet.getLong(7)));
+                    data.setSize(resultSet.getLong(COLUMN_SIZE));
+                    data.setDelay(CacheUtils.calculateAverageDelay(count, 
resultSet.getLong(COLUMN_DELAY)));
                     result.add(data);
                 }
             } catch (SQLException sqlException) {
@@ -337,20 +376,38 @@ public class RealTimeQuery {
             String inlongGroupId,
             String inlongStreamId, String auditId) {
         List<StatData> result = new LinkedList<>();
+        String querySQL = 
Configuration.getInstance().get(KEY_SOURCE_QUERY_IPS_SQL,
+                DEFAULT_SOURCE_QUERY_IPS_SQL);
+        List<String> paramList = new ArrayList<>();
+        if (WILDCARD_STREAM_ID.equals(inlongStreamId)) {
+            querySQL = AuditUtils.removeStreamIdCondition(querySQL);
+            paramList.add(startTime);
+            paramList.add(endTime);
+            paramList.add(inlongGroupId);
+            paramList.add(auditId);
+        } else {
+            paramList.add(startTime);
+            paramList.add(endTime);
+            paramList.add(inlongGroupId);
+            paramList.add(inlongStreamId);
+            paramList.add(auditId);
+        }
         try (Connection connection = dataSource.getConnection();
-                PreparedStatement pstat = 
connection.prepareStatement(queryReportIpsSql)) {
-            pstat.setString(1, startTime);
-            pstat.setString(2, endTime);
-            pstat.setString(3, inlongGroupId);
-            pstat.setString(4, inlongStreamId);
-            pstat.setString(5, auditId);
+                PreparedStatement pstat = 
connection.prepareStatement(querySQL)) {
+            for (int i = 0; i < paramList.size(); i++) {
+                pstat.setString(i + 1, paramList.get(i));
+            }
             try (ResultSet resultSet = pstat.executeQuery()) {
                 while (resultSet.next()) {
                     StatData data = new StatData();
-                    data.setIp(resultSet.getString(1));
-                    long count = resultSet.getLong(2);
+                    data.setIp(resultSet.getString(COLUMN_IP));
+                    long count = resultSet.getLong(COLUMN_CNT);
+                    data.setSize(resultSet.getLong(COLUMN_SIZE));
+                    data.setLogTs(startTime);
+                    data.setInlongGroupId(inlongGroupId);
+                    data.setInlongStreamId(inlongStreamId);
+                    data.setAuditId(auditId);
                     data.setCount(count);
-                    data.setSize(resultSet.getLong(3));
                     data.setDelay(CacheUtils.calculateAverageDelay(count, 
resultSet.getLong(4)));
                     result.add(data);
                 }
@@ -397,21 +454,33 @@ public class RealTimeQuery {
         String querySQL = distinct
                 ? 
Configuration.getInstance().get(KEY_RECONCILIATION_DISTINCT_SQL, 
DEFAULT_RECONCILIATION_DISTINCT_SQL)
                 : Configuration.getInstance().get(KEY_RECONCILIATION_SQL, 
DEFAULT_RECONCILIATION_SQL);
+        List<String> paramList = new ArrayList<>();
+        if (WILDCARD_STREAM_ID.equals(inlongStreamId)) {
+            querySQL = AuditUtils.removeStreamIdCondition(querySQL);
+            paramList.add(startTime);
+            paramList.add(endTime);
+            paramList.add(auditId);
+            paramList.add(inlongGroupId);
+        } else {
+            paramList.add(startTime);
+            paramList.add(endTime);
+            paramList.add(auditId);
+            paramList.add(inlongGroupId);
+            paramList.add(inlongStreamId);
+        }
+        paramList.add(auditTag);
 
         try (Connection connection = dataSource.getConnection();
                 PreparedStatement pstat = 
connection.prepareStatement(querySQL)) {
+            for (int i = 0; i < paramList.size(); i++) {
+                pstat.setString(i + 1, paramList.get(i));
+            }
 
-            pstat.setString(1, startTime);
-            pstat.setString(2, endTime);
-            pstat.setString(3, auditId);
-            pstat.setString(4, inlongGroupId);
-            pstat.setString(5, inlongStreamId);
-            pstat.setString(6, auditTag);
             try (ResultSet resultSet = pstat.executeQuery()) {
                 while (resultSet.next()) {
                     StatData data = new StatData();
-                    data.setAuditVersion(resultSet.getLong(1));
-                    data.setCount(resultSet.getLong(2));
+                    
data.setAuditVersion(resultSet.getLong(COLUMN_AUDIT_VERSION));
+                    data.setCount(resultSet.getLong(COLUMN_CNT));
                     data.setLogTs(startTime);
                     data.setInlongGroupId(inlongGroupId);
                     data.setInlongStreamId(inlongStreamId);
diff --git 
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/entities/StatData.java
 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/config/AuditColumn.java
similarity index 54%
copy from 
inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/entities/StatData.java
copy to 
inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/config/AuditColumn.java
index ab463b25a3..17e81ab063 100644
--- 
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/entities/StatData.java
+++ 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/config/AuditColumn.java
@@ -15,25 +15,18 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.audit.service.entities;
+package org.apache.inlong.audit.service.config;
 
-import lombok.Data;
+public class AuditColumn {
 
-import java.sql.Timestamp;
-
-@Data
-public class StatData {
-
-    private long auditVersion;
-    private String logTs;
-    private String inlongGroupId;
-    private String inlongStreamId;
-    private String auditId;
-    private String auditTag;
-    private Long count;
-    private Long size;
-    private Long delay;
-    private Timestamp updateTime;
-    private String ip;
-    private String sourceName;
+    public static final String COLUMN_LOG_TS = "log_ts";
+    public static final String COLUMN_GROUP_ID = "inlong_group_id";
+    public static final String COLUMN_STREAM_ID = "inlong_stream_id";
+    public static final String COLUMN_AUDIT_ID = "audit_id";
+    public static final String COLUMN_AUDIT_TAG = "audit_tag";
+    public static final String COLUMN_CNT = "cnt";
+    public static final String COLUMN_SIZE = "size";
+    public static final String COLUMN_DELAY = "delay";
+    public static final String COLUMN_IP = "ip";
+    public static final String COLUMN_AUDIT_VERSION = "audit_version";
 }
diff --git 
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/config/SqlConstants.java
 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/config/SqlConstants.java
index 914f2cb88e..61fd1bafd9 100644
--- 
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/config/SqlConstants.java
+++ 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/config/SqlConstants.java
@@ -143,7 +143,7 @@ public class SqlConstants {
 
     public static final String KEY_MYSQL_SOURCE_QUERY_DAY_SQL = 
"mysql.query.day.sql";
     public static final String DEFAULT_MYSQL_SOURCE_QUERY_DAY_SQL =
-            "select 
log_ts,inlong_group_id,inlong_stream_id,audit_id,audit_tag,count,size,delay " +
+            "select 
log_ts,inlong_group_id,inlong_stream_id,audit_id,audit_tag,count AS 
cnt,size,delay " +
                     "from audit_data_day where log_ts >= ? AND log_ts < ? AND 
inlong_group_id=? AND inlong_stream_id=? AND audit_id =? ";
 
     public static final String KEY_MYSQL_QUERY_AUDIT_ID_SQL = 
"mysql.query.audit.id.sql";
@@ -182,7 +182,7 @@ public class SqlConstants {
     public static final String KEY_RECONCILIATION_SQL = 
"audit.reconciliation.sql";
     public static final String DEFAULT_RECONCILIATION_SQL = "select\n" +
             "audit_version,\n" +
-            "sum(count) count\n" +
+            "sum(count) cnt\n" +
             "from\n" +
             "    audit_data\n" +
             "where\n" +
@@ -200,7 +200,7 @@ public class SqlConstants {
     public static final String KEY_RECONCILIATION_DISTINCT_SQL = 
"audit.reconciliation.distinct.sql";
     public static final String DEFAULT_RECONCILIATION_DISTINCT_SQL = 
"SELECT\n" +
             "    audit_version,\n" +
-            "    sum(count) AS count\n" +
+            "    sum(count) AS cnt\n" +
             "FROM\n" +
             "    (\n" +
             "        SELECT\n" +
@@ -251,4 +251,5 @@ public class SqlConstants {
             "    ) t_distinct\n" +
             "GROUP BY\n" +
             "    audit_version";
+    public static final String WILDCARD_STREAM_ID = "*";
 }
diff --git 
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/entities/StatData.java
 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/entities/StatData.java
index ab463b25a3..b91eaa78bb 100644
--- 
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/entities/StatData.java
+++ 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/entities/StatData.java
@@ -18,10 +18,12 @@
 package org.apache.inlong.audit.service.entities;
 
 import lombok.Data;
+import lombok.NoArgsConstructor;
 
 import java.sql.Timestamp;
 
 @Data
+@NoArgsConstructor
 public class StatData {
 
     private long auditVersion;
@@ -30,10 +32,50 @@ public class StatData {
     private String inlongStreamId;
     private String auditId;
     private String auditTag;
-    private Long count;
-    private Long size;
-    private Long delay;
+    private Long count = 0L;
+    private Long size = 0L;
+    private Long delay = 0L;
     private Timestamp updateTime;
     private String ip;
     private String sourceName;
+
+    /**
+     * Add values to statistics fields with null checks
+     * @param count message count to add
+     * @param size data size to add
+     * @param delay delay time to add
+     */
+    public void add(Long count, Long size, Long delay) {
+        if (count != null) {
+            this.count += count;
+        }
+        if (size != null) {
+            this.size += size;
+        }
+        if (delay != null) {
+            this.delay += delay;
+        }
+    }
+
+    /**
+     * Build composite key with format "logTs|inlongGroupId|auditId"
+     * @return composite key string
+     */
+    public String getCompositeKey() {
+        return String.join("|", logTs, inlongGroupId, auditId);
+    }
+
+    /**
+     * Constructor with essential fields
+     * @param logTs timestamp string
+     * @param inlongGroupId group ID
+     * @param inlongStreamId stream ID
+     * @param auditId audit ID
+     */
+    public StatData(String logTs, String inlongGroupId, String inlongStreamId, 
String auditId) {
+        this.logTs = logTs;
+        this.inlongGroupId = inlongGroupId;
+        this.inlongStreamId = inlongStreamId;
+        this.auditId = auditId;
+    }
 }
diff --git 
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/utils/AuditUtils.java
 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/utils/AuditUtils.java
index 34338336e4..9dfa63320c 100644
--- 
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/utils/AuditUtils.java
+++ 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/utils/AuditUtils.java
@@ -24,11 +24,15 @@ import org.apache.inlong.audit.service.entities.StatData;
 import java.time.LocalDateTime;
 import java.time.format.DateTimeFormatter;
 import java.time.temporal.ChronoUnit;
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 public class AuditUtils {
 
     private static final DateTimeFormatter dateTimeFormatter = 
DateTimeFormatter.ofPattern(ConfigConstants.DATE_FORMAT);
+    private static final String STREAM_ID_CONDITION_PATTERN = 
"(?i)\\band\\s+inlong_stream_id\\s*=\\s*\\?\\s*";
 
     public static double calculateDiffRatio(long srcCount, long destCount) {
         if (srcCount == 0 && destCount == 0) {
@@ -90,4 +94,58 @@ public class AuditUtils {
 
         return mergedStatData;
     }
+
+    /**
+     * Aggregates statistics data by composite key.
+     *
+     * @param data     List of StatData to be aggregated
+     * @param streamId Stream ID to use for aggregation
+     * @return List of aggregated StatData
+     */
+    public static List<StatData> aggregateStatData(List<StatData> data, String 
streamId) {
+        if (data == null || data.isEmpty()) {
+            return new ArrayList<>();
+        }
+
+        Map<String, StatData> aggregatedMap = new HashMap<>(data.size());
+        for (StatData stat : data) {
+            StatData aggregatedStatData = aggregatedMap.computeIfAbsent(
+                    stat.getCompositeKey(),
+                    k -> new StatData(stat.getLogTs(), 
stat.getInlongGroupId(), streamId, stat.getAuditId()));
+            aggregatedStatData.add(stat.getCount(), stat.getSize(), 
stat.getDelay());
+        }
+
+        return new ArrayList<>(aggregatedMap.values());
+    }
+
+    /**
+     * Removes stream ID condition from SQL query.
+     *
+     * @param sqlQuery Original SQL query string
+     * @return SQL query with stream ID condition removed
+     */
+    public static String removeStreamIdCondition(String sqlQuery) {
+        return sqlQuery == null ? null : 
sqlQuery.replaceAll(STREAM_ID_CONDITION_PATTERN, " ");
+    }
+
+    /**
+     * Remove 'inlong_stream_id' column from SQL SELECT statement.
+     * Handles cases where the column appears:
+     * 1. At the beginning/middle with optional comma and spaces
+     * 2. At the end with optional comma and spaces
+     *
+     * @param sql Original SQL string
+     * @return SQL string with 'inlong_stream_id' column removed
+     */
+    public static String removeStreamIdColumn(String sql) {
+        if (sql == null) {
+            return null;
+        }
+        // Remove inlong_stream_id when it appears at start/middle (with 
optional comma and spaces)
+        sql = sql.replaceAll("(?i)\\b(inlong_stream_id)\\b\\s*,?\\s*", "");
+        // Remove inlong_stream_id when it appears at end (with optional comma 
and spaces)
+        sql = sql.replaceAll(",?\\s*\\b(inlong_stream_id)\\b(?=\\s*$)", "");
+        return sql;
+    }
+
 }

Reply via email to