This is an automated email from the ASF dual-hosted git repository.

doleyzi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 303d0c64cc [INLONG-11620][Audit] Provide an open API for module reconciliation (#11640)
303d0c64cc is described below

commit 303d0c64ccd1a7288e7df621dcc44495dfa92f47
Author: doleyzi <[email protected]>
AuthorDate: Sun Jan 5 10:51:28 2025 +0800

    [INLONG-11620][Audit] Provide an open API for module reconciliation (#11640)
    
    Co-authored-by: doleyzi <[email protected]>
---
 .../audit/Exception/InvalidRequestException.java}  |  12 +-
 .../inlong/audit/consts/OpenApiConstants.java      |   4 +-
 .../apache/inlong/audit/service/auditor/Audit.java | 190 +++++++++++++++++++++
 .../audit/service/auditor/ReconciliationData.java  |  60 +++++++
 .../ApiType.java => auditor/RequestInfo.java}      |  21 ++-
 .../inlong/audit/service/cache/AbstractCache.java  |  45 ++---
 .../inlong/audit/service/cache/RealTimeQuery.java  |  70 ++++++++
 .../inlong/audit/service/config/SqlConstants.java  |  73 ++++++++
 .../inlong/audit/service/entities/ApiType.java     |   2 +-
 .../inlong/audit/service/node/ApiService.java      | 104 ++++++++---
 .../inlong/audit/service/utils/AuditUtils.java     |  93 ++++++++++
 11 files changed, 615 insertions(+), 59 deletions(-)

diff --git 
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/entities/ApiType.java
 
b/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/Exception/InvalidRequestException.java
similarity index 81%
copy from 
inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/entities/ApiType.java
copy to 
inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/Exception/InvalidRequestException.java
index 1703fdaf4f..5f9703d5e2 100644
--- 
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/entities/ApiType.java
+++ 
b/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/Exception/InvalidRequestException.java
@@ -15,11 +15,11 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.audit.service.entities;
+package org.apache.inlong.audit.Exception;
 
-/**
- * OpenAPI type
- */
-public enum ApiType {
-    MINUTES, HOUR, DAY, GET_IPS, GET_IDS, GET_AUDIT_PROXY;
+public class InvalidRequestException extends Exception {
+
+    public InvalidRequestException(String message) {
+        super(message);
+    }
 }
diff --git 
a/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/consts/OpenApiConstants.java
 
b/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/consts/OpenApiConstants.java
index cdb2a05fda..cefd789aaa 100644
--- 
a/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/consts/OpenApiConstants.java
+++ 
b/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/consts/OpenApiConstants.java
@@ -35,6 +35,8 @@ public class OpenApiConstants {
     public static final String DEFAULT_API_GET_IDS_PATH = 
"/audit/query/getIds";
     public static final String KEY_API_GET_AUDIT_PROXY_PATH = 
"api.get.audit.proxy";
     public static final String DEFAULT_API_GET_AUDIT_PROXY_PATH = 
"/audit/query/getAuditProxy";
+    public static final String KEY_API_RECONCILIATION_PATH = 
"api.reconciliation.path";
+    public static final String DEFAULT_API_RECONCILIATION_PATH = 
"/audit/query/reconciliation";
     public static final String KEY_API_THREAD_POOL_SIZE = 
"api.thread.pool.size";
     public static final int DEFAULT_API_THREAD_POOL_SIZE = 10;
     public static final String KEY_API_BACKLOG_SIZE = "api.backlog.size";
@@ -60,7 +62,7 @@ public class OpenApiConstants {
     public static final String PARAMS_AUDIT_CYCLE = "auditCycle";
     public static final String KEY_HTTP_BODY_SUCCESS = "success";
     public static final String KEY_HTTP_BODY_ERR_MSG = "errMsg";
-    public static final String KEY_HTTP_BODY_ERR_DATA = "data";
+    public static final String KEY_HTTP_BODY_DATA = "data";
     public static final String KEY_HTTP_HEADER_CONTENT_TYPE = "Content-Type";
     public static final String VALUE_HTTP_HEADER_CONTENT_TYPE = 
"application/json;charset=utf-8";
     public static final String KEY_HTTP_SERVER_BIND_PORT = 
"api.http.server.bind.port";
diff --git 
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/auditor/Audit.java
 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/auditor/Audit.java
new file mode 100644
index 0000000000..8d1015a448
--- /dev/null
+++ 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/auditor/Audit.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.service.auditor;
+
+import org.apache.inlong.audit.Exception.InvalidRequestException;
+import org.apache.inlong.audit.service.cache.AbstractCache;
+import org.apache.inlong.audit.service.cache.HalfHourCache;
+import org.apache.inlong.audit.service.cache.HourCache;
+import org.apache.inlong.audit.service.cache.RealTimeQuery;
+import org.apache.inlong.audit.service.cache.TenMinutesCache;
+import org.apache.inlong.audit.service.entities.AuditCycle;
+import org.apache.inlong.audit.service.entities.StatData;
+import org.apache.inlong.audit.service.metric.MetricsManager;
+import org.apache.inlong.audit.service.utils.AuditUtils;
+
+import com.google.gson.Gson;
+import com.google.gson.JsonObject;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Objects;
+
+import static org.apache.inlong.audit.consts.ConfigConstants.DEFAULT_AUDIT_TAG;
+import static 
org.apache.inlong.audit.consts.OpenApiConstants.KEY_HTTP_BODY_DATA;
+import static 
org.apache.inlong.audit.consts.OpenApiConstants.KEY_HTTP_BODY_ERR_MSG;
+import static 
org.apache.inlong.audit.consts.OpenApiConstants.KEY_HTTP_BODY_SUCCESS;
+import static org.apache.inlong.audit.service.entities.AuditCycle.DAY;
+import static org.apache.inlong.audit.service.entities.AuditCycle.HOUR;
+import static org.apache.inlong.audit.service.entities.AuditCycle.MINUTE_10;
+import static org.apache.inlong.audit.service.entities.AuditCycle.MINUTE_30;
+
+public class Audit {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(Audit.class);
+    private static final Gson GSON = new Gson();
+    private static volatile Audit instance = null;
+
+    private Audit() {
+    }
+
+    public static Audit getInstance() {
+        if (instance == null) {
+            synchronized (Audit.class) {
+                if (instance == null) {
+                    instance = new Audit();
+                }
+            }
+        }
+        return instance;
+    }
+
+    public JsonObject getData(String requestInfo) {
+        try {
+            RequestInfo request = GSON.fromJson(requestInfo, 
RequestInfo.class);
+            AuditCycle auditCycle = 
AuditUtils.getAuditCycleTime(request.getStartTime(), request.getEndTime());
+            validateRequest(request, auditCycle);
+
+            ReconciliationData data = getAuditData(request, auditCycle);
+            return createResponseJson(true, data, null);
+
+        } catch (InvalidRequestException e) {
+            LOGGER.error("Invalid request parameters: {}", e.getMessage());
+            return createResponseJson(false, null, e.getMessage());
+        } catch (Exception e) {
+            LOGGER.error("Failed to process reconciliation request", e);
+            return createResponseJson(false, null,
+                    "Internal server error: " + e.getMessage());
+        }
+    }
+
+    private ReconciliationData getAuditData(RequestInfo request, AuditCycle 
auditCycle) {
+        // First get the data from the cache
+        ReconciliationData data = getDataFromCache(request, auditCycle);
+        if (data != null && data.isNotEmpty() && data.getDiffRatio() <= 
request.getDiffRatio()) {
+            return data;
+        }
+
+        long statTimeMillis = System.currentTimeMillis();
+
+        // Second, query the data from the data storage (without deduplication)
+        data = getDataFromStorage(request, false);
+        if (data.getDiffRatio() <= request.getDiffRatio()) {
+            return data;
+        }
+
+        // Finally, query the data from the data storage (to deduplicate the 
data)
+        data = getDataFromStorage(request, true);
+        MetricsManager.getInstance().addApiMetricNoCache(auditCycle,
+                System.currentTimeMillis() - statTimeMillis);
+        LOGGER.info("Get audit data from data storage by distinct. Request 
info: {}", request);
+        return data;
+    }
+
+    private void validateRequest(RequestInfo request, AuditCycle auditCycle) 
throws InvalidRequestException {
+        if (!areIdsValid(request) || !isAuditCycleValid(auditCycle)) {
+            throw new InvalidRequestException("Invalid parameters: " + 
request);
+        }
+        setDefaultAuditTagIfBlank(request);
+    }
+
+    private void setDefaultAuditTagIfBlank(RequestInfo request) {
+        if (StringUtils.isBlank(request.getSrcAuditTag())) {
+            request.setSrcAuditTag(DEFAULT_AUDIT_TAG);
+        }
+        if (StringUtils.isBlank(request.getDestAuditTag())) {
+            request.setDestAuditTag(DEFAULT_AUDIT_TAG);
+        }
+    }
+
+    private boolean areIdsValid(RequestInfo request) {
+        return Objects.nonNull(request.getInlongGroupId()) &&
+                Objects.nonNull(request.getInlongStreamId()) &&
+                Objects.nonNull(request.getSrcAuditId()) &&
+                Objects.nonNull(request.getDestAuditId());
+    }
+
+    private boolean isAuditCycleValid(AuditCycle auditCycle) {
+        return Arrays.asList(
+                MINUTE_10,
+                MINUTE_30,
+                HOUR,
+                DAY).contains(auditCycle);
+    }
+
+    private JsonObject createResponseJson(boolean isSuccess, 
ReconciliationData auditData, String errorMessage) {
+        JsonObject jsonObject = new JsonObject();
+        jsonObject.addProperty(KEY_HTTP_BODY_SUCCESS, isSuccess);
+        jsonObject.addProperty(KEY_HTTP_BODY_ERR_MSG, errorMessage);
+        jsonObject.add(KEY_HTTP_BODY_DATA,
+                GSON.toJsonTree(auditData != null ? 
auditData.getCombinedData() : new LinkedList<>()));
+        return jsonObject;
+    }
+
+    private ReconciliationData getDataFromCache(RequestInfo request, 
AuditCycle auditCycle) {
+        AbstractCache auditCache = getAuditCache(auditCycle);
+        if (auditCache == null) {
+            return null;
+        }
+        List<StatData> srcData = auditCache.getData(request.getStartTime(), 
request.getEndTime(),
+                request.getInlongGroupId(), request.getInlongStreamId(), 
request.getSrcAuditId(),
+                request.getSrcAuditTag(), false);
+        List<StatData> destData = auditCache.getData(request.getStartTime(), 
request.getEndTime(),
+                request.getInlongGroupId(), request.getInlongStreamId(), 
request.getDestAuditId(),
+                request.getDestAuditTag(), false);
+        return new ReconciliationData(AuditUtils.mergeStatDataList(srcData), 
AuditUtils.mergeStatDataList(destData));
+    }
+
+    private ReconciliationData getDataFromStorage(RequestInfo request, boolean 
needDistinct) {
+        List<StatData> srcData = 
RealTimeQuery.getInstance().queryAuditData(request.getStartTime(),
+                request.getEndTime(), request.getInlongGroupId(),
+                request.getInlongStreamId(), request.getSrcAuditId(), 
request.getSrcAuditTag(), needDistinct);
+        List<StatData> destData = 
RealTimeQuery.getInstance().queryAuditData(request.getStartTime(),
+                request.getEndTime(), request.getInlongGroupId(),
+                request.getInlongStreamId(), request.getDestAuditId(), 
request.getDestAuditTag(), needDistinct);
+        return new ReconciliationData(AuditUtils.mergeStatDataList(srcData), 
AuditUtils.mergeStatDataList(destData));
+    }
+
+    private AbstractCache getAuditCache(AuditCycle auditCycle) {
+        switch (auditCycle) {
+            case MINUTE_10:
+                return TenMinutesCache.getInstance();
+            case MINUTE_30:
+                return HalfHourCache.getInstance();
+            case HOUR:
+            case DAY:
+                return HourCache.getInstance();
+            default:
+                return null;
+        }
+    }
+}
diff --git 
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/auditor/ReconciliationData.java
 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/auditor/ReconciliationData.java
new file mode 100644
index 0000000000..03e4394be6
--- /dev/null
+++ 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/auditor/ReconciliationData.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.service.auditor;
+
+import org.apache.inlong.audit.service.entities.StatData;
+import org.apache.inlong.audit.service.utils.AuditUtils;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+import java.util.ArrayList;
+import java.util.List;
+
+@Data
+@AllArgsConstructor
+public class ReconciliationData {
+
+    public StatData srcData;
+    public StatData destData;
+
+    public double getDiffRatio() {
+        if (srcData == null && destData == null) {
+            return 0;
+        }
+        if (srcData == null || destData == null) {
+            return 1;
+        }
+        return AuditUtils.calculateDiffRatio(srcData.getCount(), 
destData.getCount());
+    }
+
+    public List<StatData> getCombinedData() {
+        List<StatData> result = new ArrayList<>();
+        if (srcData != null) {
+            result.add(srcData);
+        }
+        if (destData != null) {
+            result.add(destData);
+        }
+        return result;
+    }
+
+    public boolean isNotEmpty() {
+        return srcData != null || destData != null;
+    }
+}
diff --git 
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/entities/ApiType.java
 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/auditor/RequestInfo.java
similarity index 71%
copy from 
inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/entities/ApiType.java
copy to 
inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/auditor/RequestInfo.java
index 1703fdaf4f..4e6161bf49 100644
--- 
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/entities/ApiType.java
+++ 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/auditor/RequestInfo.java
@@ -15,11 +15,20 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.audit.service.entities;
+package org.apache.inlong.audit.service.auditor;
 
-/**
- * OpenAPI type
- */
-public enum ApiType {
-    MINUTES, HOUR, DAY, GET_IPS, GET_IDS, GET_AUDIT_PROXY;
+import lombok.Data;
+
+@Data
+public class RequestInfo {
+
+    String startTime;
+    String endTime;
+    String inlongGroupId;
+    String inlongStreamId;
+    String srcAuditId;
+    String srcAuditTag;
+    String destAuditId;
+    String destAuditTag;
+    double diffRatio;
 }
diff --git 
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/cache/AbstractCache.java
 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/cache/AbstractCache.java
index 8454884db9..f3155522c7 100644
--- 
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/cache/AbstractCache.java
+++ 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/cache/AbstractCache.java
@@ -32,6 +32,7 @@ import org.slf4j.LoggerFactory;
 
 import java.time.LocalDateTime;
 import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.Executors;
@@ -86,6 +87,12 @@ public class AbstractCache {
         return cache;
     }
 
+    public List<StatData> getData(String startTime, String endTime, String 
inlongGroupId,
+            String inlongStreamId, String auditId, String auditTag) {
+        return getData(startTime, endTime, inlongGroupId, inlongStreamId, 
auditId, auditTag, true);
+
+    }
+
     /**
      * @param startTime
      * @param endTime
@@ -96,31 +103,23 @@ public class AbstractCache {
      * @return
      */
     public List<StatData> getData(String startTime, String endTime, String 
inlongGroupId,
-            String inlongStreamId, String auditId, String auditTag) {
-        List<StatData> result = new LinkedList<>();
+            String inlongStreamId, String auditId, String auditTag, boolean 
needRetry) {
+        List<StatData> result = new ArrayList<>();
         List<CacheKeyEntity> keyList = buildCacheKeyList(startTime, endTime, 
inlongGroupId,
                 inlongStreamId, auditId, auditTag);
-        for (CacheKeyEntity cacheKey : keyList) {
-            StatData statData = cache.getIfPresent(cacheKey.getCacheKey());
-            if (null == statData) {
-                // Compatible with scenarios where the auditTag openapi 
parameter can be empty.
-                statData = cache.getIfPresent(cacheKey.getCacheKey() + 
DEFAULT_AUDIT_TAG);
-            }
-            if (null != statData) {
-                result.add(statData);
-            } else {
-                long currentTimeMillis = System.currentTimeMillis();
 
+        for (CacheKeyEntity cacheKey : keyList) {
+            StatData statData = fetchStatDataFromCache(cacheKey);
+            if (statData == null && needRetry) {
+                long statTimeMillis = System.currentTimeMillis();
                 statData = fetchDataFromAuditStorage(cacheKey.getStartTime(), 
cacheKey.getEndTime(), inlongGroupId,
-                        inlongStreamId,
-                        auditId, auditTag);
-                result.add(statData);
-
+                        inlongStreamId, auditId, auditTag);
                 MetricsManager.getInstance().addApiMetricNoCache(auditCycle,
-                        System.currentTimeMillis() - currentTimeMillis);
-
+                        System.currentTimeMillis() - statTimeMillis);
+            }
+            if (statData != null) {
+                result.add(statData);
             }
-
         }
         return result;
     }
@@ -195,4 +194,12 @@ public class AbstractCache {
         statData.setDelay(totalDelay);
         return statData;
     }
+    private StatData fetchStatDataFromCache(CacheKeyEntity cacheKey) {
+        StatData statData = cache.getIfPresent(cacheKey.getCacheKey());
+        if (statData == null) {
+            // Compatible with scenarios where the auditTag openapi parameter 
can be empty.
+            statData = cache.getIfPresent(cacheKey.getCacheKey() + 
DEFAULT_AUDIT_TAG);
+        }
+        return statData;
+    }
 }
diff --git 
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/cache/RealTimeQuery.java
 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/cache/RealTimeQuery.java
index b167eb652a..923c98b5cf 100644
--- 
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/cache/RealTimeQuery.java
+++ 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/cache/RealTimeQuery.java
@@ -22,6 +22,7 @@ import org.apache.inlong.audit.service.config.Configuration;
 import org.apache.inlong.audit.service.entities.JdbcConfig;
 import org.apache.inlong.audit.service.entities.StatData;
 import org.apache.inlong.audit.service.node.ConfigService;
+import org.apache.inlong.audit.service.utils.AuditUtils;
 import org.apache.inlong.audit.service.utils.CacheUtils;
 
 import org.apache.commons.dbcp.BasicDataSource;
@@ -46,9 +47,13 @@ import java.util.concurrent.Executors;
 
 import static 
org.apache.inlong.audit.consts.OpenApiConstants.DEFAULT_API_THREAD_POOL_SIZE;
 import static 
org.apache.inlong.audit.consts.OpenApiConstants.KEY_API_THREAD_POOL_SIZE;
+import static 
org.apache.inlong.audit.service.config.SqlConstants.DEFAULT_RECONCILIATION_DISTINCT_SQL;
+import static 
org.apache.inlong.audit.service.config.SqlConstants.DEFAULT_RECONCILIATION_SQL;
 import static 
org.apache.inlong.audit.service.config.SqlConstants.DEFAULT_SOURCE_QUERY_IDS_SQL;
 import static 
org.apache.inlong.audit.service.config.SqlConstants.DEFAULT_SOURCE_QUERY_IPS_SQL;
 import static 
org.apache.inlong.audit.service.config.SqlConstants.DEFAULT_SOURCE_QUERY_MINUTE_SQL;
+import static 
org.apache.inlong.audit.service.config.SqlConstants.KEY_RECONCILIATION_DISTINCT_SQL;
+import static 
org.apache.inlong.audit.service.config.SqlConstants.KEY_RECONCILIATION_SQL;
 import static 
org.apache.inlong.audit.service.config.SqlConstants.KEY_SOURCE_QUERY_IDS_SQL;
 import static 
org.apache.inlong.audit.service.config.SqlConstants.KEY_SOURCE_QUERY_IPS_SQL;
 import static 
org.apache.inlong.audit.service.config.SqlConstants.KEY_SOURCE_QUERY_MINUTE_SQL;
@@ -357,4 +362,69 @@ public class RealTimeQuery {
         }
         return result;
     }
+
+    public List<StatData> queryAuditData(String startTime, String endTime,
+            String inlongGroupId, String inlongStreamId, String auditId,
+            String auditTag, boolean distinct) {
+        long currentTime = System.currentTimeMillis();
+        List<StatData> statDataList = new CopyOnWriteArrayList<>();
+        if (dataSourceList.isEmpty()) {
+            return null;
+        }
+        List<CompletableFuture<Void>> futures = new ArrayList<>();
+        for (DataSource dataSource : dataSourceList) {
+            CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
+                StatData statDataListTemp =
+                        doQueryAuditData(dataSource, startTime, endTime, 
inlongGroupId, inlongStreamId, auditId,
+                                auditTag, distinct);
+                if (statDataListTemp != null) {
+                    statDataList.add(statDataListTemp);
+                }
+            }, executor);
+            futures.add(future);
+        }
+        CompletableFuture.allOf(futures.toArray(new 
CompletableFuture[0])).join();
+        LOGGER.info("Query audit data by params: {} {} {} {} {}, total cost {} 
ms", startTime, endTime, inlongGroupId,
+                inlongStreamId, auditId, System.currentTimeMillis() - 
currentTime);
+        return statDataList;
+
+    }
+
+    public StatData doQueryAuditData(DataSource dataSource, String startTime, 
String endTime,
+            String inlongGroupId, String inlongStreamId, String auditId,
+            String auditTag, boolean distinct) {
+        List<StatData> result = new LinkedList<>();
+        String querySQL = distinct
+                ? 
Configuration.getInstance().get(KEY_RECONCILIATION_DISTINCT_SQL, 
DEFAULT_RECONCILIATION_DISTINCT_SQL)
+                : Configuration.getInstance().get(KEY_RECONCILIATION_SQL, 
DEFAULT_RECONCILIATION_SQL);
+
+        try (Connection connection = dataSource.getConnection();
+                PreparedStatement pstat = 
connection.prepareStatement(querySQL)) {
+
+            pstat.setString(1, startTime);
+            pstat.setString(2, endTime);
+            pstat.setString(3, auditId);
+            pstat.setString(4, inlongGroupId);
+            pstat.setString(5, inlongStreamId);
+            pstat.setString(6, auditTag);
+            try (ResultSet resultSet = pstat.executeQuery()) {
+                while (resultSet.next()) {
+                    StatData data = new StatData();
+                    data.setAuditVersion(resultSet.getLong(1));
+                    data.setCount(resultSet.getLong(2));
+                    data.setLogTs(startTime);
+                    data.setInlongGroupId(inlongGroupId);
+                    data.setInlongStreamId(inlongStreamId);
+                    data.setAuditId(auditId);
+                    data.setAuditTag(auditTag);
+                    result.add(data);
+                }
+            } catch (SQLException sqlException) {
+                LOGGER.error("Query ips has SQL exception!, datasource={} ", 
dataSource, sqlException);
+            }
+        } catch (Exception exception) {
+            LOGGER.error("Query audit data has exception! ", exception);
+        }
+        return AuditUtils.getMaxAuditVersionAuditData(result);
+    }
 }
diff --git 
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/config/SqlConstants.java
 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/config/SqlConstants.java
index 571e1bde8f..914f2cb88e 100644
--- 
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/config/SqlConstants.java
+++ 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/config/SqlConstants.java
@@ -178,4 +178,77 @@ public class SqlConstants {
             "ALTER TABLE audit_data_day ADD PARTITION (PARTITION %s VALUES 
LESS THAN (TO_DAYS('%s')))";
     public static final String TABLE_AUDIT_DATA_DAY = "audit_data_day";
     public static final String TABLE_AUDIT_DATA_TEMP = "audit_data_temp";
+
+    public static final String KEY_RECONCILIATION_SQL = 
"audit.reconciliation.sql";
+    public static final String DEFAULT_RECONCILIATION_SQL = "select\n" +
+            "audit_version,\n" +
+            "sum(count) count\n" +
+            "from\n" +
+            "    audit_data\n" +
+            "where\n" +
+            "    log_ts >= ? \n" +
+            "    and log_ts < ? \n" +
+            "    and audit_id = ? \n" +
+            "    and inlong_group_id = ? \n" +
+            "    and inlong_stream_id = ? \n" +
+            "    and (\n" +
+            "        audit_tag = ? \n" +
+            "        or audit_tag = '' \n" +
+            "    )\n" +
+            "group by\n" +
+            "audit_version";
+    public static final String KEY_RECONCILIATION_DISTINCT_SQL = 
"audit.reconciliation.distinct.sql";
+    public static final String DEFAULT_RECONCILIATION_DISTINCT_SQL = 
"SELECT\n" +
+            "    audit_version,\n" +
+            "    sum(count) AS count\n" +
+            "FROM\n" +
+            "    (\n" +
+            "        SELECT\n" +
+            "            audit_version,\n" +
+            "            docker_id,\n" +
+            "            thread_id,\n" +
+            "            sdk_ts,\n" +
+            "            packet_id,\n" +
+            "            log_ts,\n" +
+            "            ip,\n" +
+            "            inlong_group_id,\n" +
+            "            inlong_stream_id,\n" +
+            "            audit_id,\n" +
+            "            CASE\n" +
+            "                WHEN audit_tag = '' THEN '-1'\n" +
+            "                ELSE audit_tag\n" +
+            "            END AS audit_tag,\n" +
+            "            count,\n" +
+            "            size,\n" +
+            "            delay\n" +
+            "        FROM\n" +
+            "            audit_data\n" +
+            "        where\n" +
+            "            log_ts >= ? \n" +
+            "            and log_ts < ? \n" +
+            "            and audit_id = ? \n" +
+            "            and inlong_group_id = ? \n" +
+            "            and inlong_stream_id = ? \n" +
+            "            and (\n" +
+            "                audit_tag = ? \n" +
+            "                or audit_tag = ''\n" +
+            "            )\n" +
+            "        GROUP BY\n" +
+            "            audit_version,\n" +
+            "            docker_id,\n" +
+            "            thread_id,\n" +
+            "            sdk_ts,\n" +
+            "            packet_id,\n" +
+            "            log_ts,\n" +
+            "            ip,\n" +
+            "            inlong_group_id,\n" +
+            "            inlong_stream_id,\n" +
+            "            audit_id,\n" +
+            "            audit_tag,\n" +
+            "            count,\n" +
+            "            size,\n" +
+            "            delay\n" +
+            "    ) t_distinct\n" +
+            "GROUP BY\n" +
+            "    audit_version";
 }
diff --git 
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/entities/ApiType.java
 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/entities/ApiType.java
index 1703fdaf4f..ed9763bbdf 100644
--- 
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/entities/ApiType.java
+++ 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/entities/ApiType.java
@@ -21,5 +21,5 @@ package org.apache.inlong.audit.service.entities;
  * OpenAPI type
  */
 public enum ApiType {
-    MINUTES, HOUR, DAY, GET_IPS, GET_IDS, GET_AUDIT_PROXY;
+    MINUTES, HOUR, DAY, GET_IPS, GET_IDS, GET_AUDIT_PROXY, RECONCILIATION;
 }
diff --git 
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/node/ApiService.java
 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/node/ApiService.java
index f4b61f2443..80dde3c4c4 100644
--- 
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/node/ApiService.java
+++ 
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/node/ApiService.java
@@ -18,6 +18,7 @@
 package org.apache.inlong.audit.service.node;
 
 import org.apache.inlong.audit.entity.AuditProxy;
+import org.apache.inlong.audit.service.auditor.Audit;
 import org.apache.inlong.audit.service.cache.AuditProxyCache;
 import org.apache.inlong.audit.service.cache.DayCache;
 import org.apache.inlong.audit.service.cache.HalfHourCache;
@@ -39,6 +40,9 @@ import com.sun.net.httpserver.HttpServer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
 import java.io.OutputStream;
 import java.net.InetSocketAddress;
 import java.nio.charset.StandardCharsets;
@@ -58,6 +62,7 @@ import static 
org.apache.inlong.audit.consts.OpenApiConstants.DEFAULT_API_GET_IP
 import static 
org.apache.inlong.audit.consts.OpenApiConstants.DEFAULT_API_HOUR_PATH;
 import static 
org.apache.inlong.audit.consts.OpenApiConstants.DEFAULT_API_MINUTES_PATH;
 import static 
org.apache.inlong.audit.consts.OpenApiConstants.DEFAULT_API_REAL_LIMITER_QPS;
+import static 
org.apache.inlong.audit.consts.OpenApiConstants.DEFAULT_API_RECONCILIATION_PATH;
 import static 
org.apache.inlong.audit.consts.OpenApiConstants.DEFAULT_API_THREAD_POOL_SIZE;
 import static 
org.apache.inlong.audit.consts.OpenApiConstants.DEFAULT_HTTP_SERVER_BIND_PORT;
 import static 
org.apache.inlong.audit.consts.OpenApiConstants.HTTP_RESPOND_CODE;
@@ -69,8 +74,9 @@ import static 
org.apache.inlong.audit.consts.OpenApiConstants.KEY_API_GET_IPS_PA
 import static 
org.apache.inlong.audit.consts.OpenApiConstants.KEY_API_HOUR_PATH;
 import static 
org.apache.inlong.audit.consts.OpenApiConstants.KEY_API_MINUTES_PATH;
 import static 
org.apache.inlong.audit.consts.OpenApiConstants.KEY_API_REAL_LIMITER_QPS;
+import static 
org.apache.inlong.audit.consts.OpenApiConstants.KEY_API_RECONCILIATION_PATH;
 import static 
org.apache.inlong.audit.consts.OpenApiConstants.KEY_API_THREAD_POOL_SIZE;
-import static 
org.apache.inlong.audit.consts.OpenApiConstants.KEY_HTTP_BODY_ERR_DATA;
+import static 
org.apache.inlong.audit.consts.OpenApiConstants.KEY_HTTP_BODY_DATA;
 import static 
org.apache.inlong.audit.consts.OpenApiConstants.KEY_HTTP_BODY_ERR_MSG;
 import static 
org.apache.inlong.audit.consts.OpenApiConstants.KEY_HTTP_BODY_SUCCESS;
 import static 
org.apache.inlong.audit.consts.OpenApiConstants.KEY_HTTP_HEADER_CONTENT_TYPE;
@@ -91,6 +97,7 @@ import static 
org.apache.inlong.audit.service.entities.ApiType.GET_IDS;
 import static org.apache.inlong.audit.service.entities.ApiType.GET_IPS;
 import static org.apache.inlong.audit.service.entities.ApiType.HOUR;
 import static org.apache.inlong.audit.service.entities.ApiType.MINUTES;
+import static org.apache.inlong.audit.service.entities.ApiType.RECONCILIATION;
 
 public class ApiService {
 
@@ -129,6 +136,14 @@ public class ApiService {
             server.createContext(
                     
Configuration.getInstance().get(KEY_API_GET_AUDIT_PROXY_PATH, 
DEFAULT_API_GET_AUDIT_PROXY_PATH),
                     new AuditHandler(GET_AUDIT_PROXY));
+            // The GET_AUDIT_PROXY context is already registered by the statement directly above.
+            // Registering the same path a second time is redundant and, per the documented
+            // HttpServer.createContext contract, may be rejected with IllegalArgumentException
+            // (which the surrounding catch would turn into a server that never starts),
+            // so only the new reconciliation endpoint is added here.
+            server.createContext(
+                    Configuration.getInstance().get(KEY_API_RECONCILIATION_PATH, DEFAULT_API_RECONCILIATION_PATH),
+                    new AuditHandler(RECONCILIATION));
+
             server.start();
             LOGGER.info("Init http server success. Bind port is: {}", 
bindPort);
         } catch (Exception e) {
@@ -152,8 +167,6 @@ public class ApiService {
 
         @Override
         public void handle(HttpExchange exchange) {
-            LOGGER.info("handle {}", exchange.getRequestURI().toString());
-
             long currentTimeMillis = System.currentTimeMillis();
 
             if (null != limiter) {
@@ -164,20 +177,8 @@ public class ApiService {
                 @Override
                 public void run() {
                     try (OutputStream os = exchange.getResponseBody()) {
-                        JsonObject responseJson = new JsonObject();
-                        Map<String, String> params = 
parseRequestURI(exchange.getRequestURI().getQuery());
-                        if (checkNecessaryParams(params)) {
-                            handleLegalParams(responseJson, params);
-                        } else {
-                            handleInvalidParams(responseJson, exchange);
-                        }
-
-                        byte[] bytes = 
responseJson.toString().getBytes(StandardCharsets.UTF_8);
-
-                        
exchange.getResponseHeaders().set(KEY_HTTP_HEADER_CONTENT_TYPE,
-                                VALUE_HTTP_HEADER_CONTENT_TYPE);
-                        exchange.sendResponseHeaders(HTTP_RESPOND_CODE, 
bytes.length);
-                        os.write(bytes);
+                        JsonObject responseJson = handleRequest(exchange);
+                        sendResponse(exchange, os, responseJson);
                     } catch (Exception e) {
                         LOGGER.error("Audit handler has exception!", e);
                     } finally {
@@ -189,6 +190,57 @@ public class ApiService {
             MetricsManager.getInstance().addApiMetric(apiType, 
System.currentTimeMillis() - currentTimeMillis);
         }
 
+        /**
+         * Routes an exchange by its HTTP method. The method name is upper-cased before matching:
+         * GET is served by {@code handleGetRequest}, POST by {@code handlePostRequest}, and any
+         * other method yields the error body produced by {@code buildErrorResponse}.
+         *
+         * @param exchange the HTTP exchange being served
+         * @return the JSON body to send back to the client
+         * @throws IOException if reading a POST body fails
+         */
+        private JsonObject handleRequest(HttpExchange exchange) throws IOException {
+            final String method = exchange.getRequestMethod();
+            final String normalized = method.toUpperCase();
+            if ("GET".equals(normalized)) {
+                return handleGetRequest(exchange);
+            }
+            if ("POST".equals(normalized)) {
+                return handlePostRequest(exchange);
+            }
+            // The error message echoes the method exactly as the client sent it.
+            return buildErrorResponse("Unsupported request method: " + method);
+        }
+
+        /**
+         * Serves a GET request: parses the URI query string into parameters, then fills a fresh
+         * JSON object through {@code handleLegalParams} when {@code checkNecessaryParams} accepts
+         * them, or through {@code handleInvalidParams} otherwise.
+         *
+         * @param exchange the HTTP exchange whose request URI carries the query parameters
+         * @return the populated JSON response body
+         */
+        private JsonObject handleGetRequest(HttpExchange exchange) {
+            final JsonObject body = new JsonObject();
+            final Map<String, String> queryParams = parseRequestURI(exchange.getRequestURI().getQuery());
+            if (!checkNecessaryParams(queryParams)) {
+                handleInvalidParams(body, exchange);
+                return body;
+            }
+            handleLegalParams(body, queryParams);
+            return body;
+        }
+
+        /**
+         * Serves a POST request: reads the whole request body as UTF-8 text and passes it to
+         * {@code Audit.getInstance().getData(String)}, whose result becomes the response body.
+         *
+         * <p>The body is copied character by character so that the string handed to the auditor is
+         * exactly what the client sent; a {@code readLine()} loop would silently drop every line
+         * terminator in the payload.
+         *
+         * @param exchange the HTTP exchange whose request body is consumed (and closed)
+         * @return the JSON object returned by the auditor
+         * @throws IOException if reading the request body fails
+         */
+        private JsonObject handlePostRequest(HttpExchange exchange) throws IOException {
+            StringBuilder requestInfo = new StringBuilder();
+            try (BufferedReader reader = new BufferedReader(
+                    new InputStreamReader(exchange.getRequestBody(), StandardCharsets.UTF_8))) {
+                char[] buffer = new char[4096];
+                int read;
+                // read(...) returns -1 once the end of the request body has been reached.
+                while ((read = reader.read(buffer, 0, buffer.length)) != -1) {
+                    requestInfo.append(buffer, 0, read);
+                }
+            }
+            return Audit.getInstance().getData(requestInfo.toString());
+        }
+
+        /**
+         * Writes a JSON response: serializes {@code responseJson} to UTF-8 bytes, sets the
+         * content-type header, sends the response headers with status {@code HTTP_RESPOND_CODE}
+         * and the exact body length, then writes the bytes to {@code os}.
+         *
+         * @param exchange     the HTTP exchange being answered
+         * @param os           the response body stream of {@code exchange}; not closed here
+         * @param responseJson the JSON object to serialize as the response body
+         * @throws IOException if sending the headers or writing the body fails
+         */
+        private void sendResponse(HttpExchange exchange, OutputStream os, JsonObject responseJson) throws IOException {
+            final String json = responseJson.toString();
+            final byte[] payload = json.getBytes(StandardCharsets.UTF_8);
+            exchange.getResponseHeaders().set(KEY_HTTP_HEADER_CONTENT_TYPE, VALUE_HTTP_HEADER_CONTENT_TYPE);
+            // The byte length (not the character count) is the body length announced to the client.
+            exchange.sendResponseHeaders(HTTP_RESPOND_CODE, payload.length);
+            os.write(payload);
+        }
+
+        /**
+         * Builds a failure response body: the success flag is {@code false}, the error message is
+         * {@code errorMessage}, and the data field is an empty list serialized by Gson.
+         *
+         * @param errorMessage text stored under the error-message key
+         * @return a new JSON object describing the failure
+         */
+        private JsonObject buildErrorResponse(String errorMessage) {
+            final JsonObject errorBody = new JsonObject();
+            errorBody.addProperty(KEY_HTTP_BODY_SUCCESS, false);
+            errorBody.addProperty(KEY_HTTP_BODY_ERR_MSG, errorMessage);
+            errorBody.add(KEY_HTTP_BODY_DATA, new Gson().toJsonTree(new LinkedList<>()));
+            return errorBody;
+        }
+
         private Map<String, String> parseRequestURI(String query) {
             Map<String, String> params = new HashMap<>();
             if (query != null) {
@@ -239,7 +291,7 @@ public class ApiService {
             responseJson.addProperty(KEY_HTTP_BODY_SUCCESS, false);
             responseJson.addProperty(KEY_HTTP_BODY_ERR_MSG, "Invalid params! " 
+ exchange.getRequestURI());
             Gson gson = new Gson();
-            responseJson.add(KEY_HTTP_BODY_ERR_DATA, gson.toJsonTree(new 
LinkedList<>()));
+            responseJson.add(KEY_HTTP_BODY_DATA, gson.toJsonTree(new 
LinkedList<>()));
         }
 
         private void handleLegalParams(JsonObject responseJson, Map<String, 
String> params) {
@@ -250,7 +302,7 @@ public class ApiService {
             try {
                 switch (apiType) {
                     case MINUTES:
-                        responseJson.add(KEY_HTTP_BODY_ERR_DATA, 
gson.toJsonTree(handleMinutesApi(params)));
+                        responseJson.add(KEY_HTTP_BODY_DATA, 
gson.toJsonTree(handleMinutesApi(params)));
                         break;
                     case HOUR:
                         statData = 
HourCache.getInstance().getData(params.get(PARAMS_START_TIME),
@@ -259,7 +311,7 @@ public class ApiService {
                                 params.get(PARAMS_INLONG_STREAM_ID),
                                 params.get(PARAMS_AUDIT_ID),
                                 params.get(PARAMS_AUDIT_TAG));
-                        responseJson.add(KEY_HTTP_BODY_ERR_DATA, 
gson.toJsonTree(statData));
+                        responseJson.add(KEY_HTTP_BODY_DATA, 
gson.toJsonTree(statData));
                         break;
                     case DAY:
                         statData = DayCache.getInstance().getData(
@@ -268,7 +320,7 @@ public class ApiService {
                                 params.get(PARAMS_INLONG_GROUP_ID),
                                 params.get(PARAMS_INLONG_STREAM_ID),
                                 params.get(PARAMS_AUDIT_ID));
-                        responseJson.add(KEY_HTTP_BODY_ERR_DATA, 
gson.toJsonTree(statData));
+                        responseJson.add(KEY_HTTP_BODY_DATA, 
gson.toJsonTree(statData));
                         break;
                     case GET_IDS:
                         statData = RealTimeQuery.getInstance().queryIdsByIp(
@@ -276,7 +328,7 @@ public class ApiService {
                                 params.get(PARAMS_END_TIME),
                                 params.get(PARAMS_IP),
                                 params.get(PARAMS_AUDIT_ID));
-                        responseJson.add(KEY_HTTP_BODY_ERR_DATA, 
gson.toJsonTree(statData));
+                        responseJson.add(KEY_HTTP_BODY_DATA, 
gson.toJsonTree(statData));
                         break;
                     case GET_IPS:
                         statData = RealTimeQuery.getInstance().queryIpsById(
@@ -285,20 +337,20 @@ public class ApiService {
                                 params.get(PARAMS_INLONG_GROUP_ID),
                                 params.get(PARAMS_INLONG_STREAM_ID),
                                 params.get(PARAMS_AUDIT_ID));
-                        responseJson.add(KEY_HTTP_BODY_ERR_DATA, 
gson.toJsonTree(statData));
+                        responseJson.add(KEY_HTTP_BODY_DATA, 
gson.toJsonTree(statData));
                         break;
                     case GET_AUDIT_PROXY:
                         List<AuditProxy> auditProxy =
                                 
AuditProxyCache.getInstance().getData(params.get(PARAMS_AUDIT_COMPONENT));
-                        responseJson.add(KEY_HTTP_BODY_ERR_DATA, 
gson.toJsonTree(auditProxy));
+                        responseJson.add(KEY_HTTP_BODY_DATA, 
gson.toJsonTree(auditProxy));
                         break;
                     default:
                         LOGGER.error("Unsupported interface type! type is {}", 
apiType);
-                        responseJson.add(KEY_HTTP_BODY_ERR_DATA, 
gson.toJsonTree(new LinkedList<>()));
+                        responseJson.add(KEY_HTTP_BODY_DATA, 
gson.toJsonTree(new LinkedList<>()));
                 }
             } catch (Exception exception) {
                 LOGGER.error("Handle legal params has exception ", exception);
-                responseJson.add(KEY_HTTP_BODY_ERR_DATA, gson.toJsonTree(new 
LinkedList<>()));
+                responseJson.add(KEY_HTTP_BODY_DATA, gson.toJsonTree(new 
LinkedList<>()));
             }
         }
 
diff --git a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/utils/AuditUtils.java b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/utils/AuditUtils.java
new file mode 100644
index 0000000000..34338336e4
--- /dev/null
+++ b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/utils/AuditUtils.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.service.utils;
+
+import org.apache.inlong.audit.service.config.ConfigConstants;
+import org.apache.inlong.audit.service.entities.AuditCycle;
+import org.apache.inlong.audit.service.entities.StatData;
+
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.time.temporal.ChronoUnit;
+import java.util.List;
+
+/**
+ * Static helper methods for audit statistics: relative count difference, selection of the
+ * record with the highest audit version, derivation of the audit cycle from a time range,
+ * and merging of a list of records into one.
+ */
+public final class AuditUtils {
+
+    /** Formatter built from {@link ConfigConstants#DATE_FORMAT}; parses the start/end time strings. */
+    private static final DateTimeFormatter DATE_TIME_FORMATTER =
+            DateTimeFormatter.ofPattern(ConfigConstants.DATE_FORMAT);
+
+    /** Not instantiable: every member is static. */
+    private AuditUtils() {
+    }
+
+    /**
+     * Computes the absolute difference between two counts relative to the source count.
+     *
+     * @param srcCount  the source (reference) count
+     * @param destCount the destination count
+     * @return {@code 0} when both counts are zero, {@code 1} when only {@code srcCount} is zero,
+     *         otherwise {@code |(srcCount - destCount) / srcCount|}
+     */
+    public static double calculateDiffRatio(long srcCount, long destCount) {
+        if (srcCount == 0 && destCount == 0) {
+            return 0;
+        } else if (srcCount == 0) {
+            // Division by zero is avoided; a non-zero destination against an empty source counts as 100%.
+            return 1;
+        } else {
+            return Math.abs((double) (srcCount - destCount) / srcCount);
+        }
+    }
+
+    /**
+     * Returns the record with the greatest audit version. When several records share the
+     * greatest version, the first one encountered is returned.
+     *
+     * @param statDataListData the records to scan; may be {@code null}
+     * @return the record with the highest audit version, or {@code null} when the list is
+     *         {@code null} or empty
+     */
+    public static StatData getMaxAuditVersionAuditData(List<StatData> statDataListData) {
+        // Treat a missing list like an empty one, matching mergeStatDataList's handling of null.
+        if (statDataListData == null) {
+            return null;
+        }
+        StatData maxAuditVersionStatData = null;
+        for (StatData statData : statDataListData) {
+            if (maxAuditVersionStatData == null
+                    || statData.getAuditVersion() > maxAuditVersionStatData.getAuditVersion()) {
+                maxAuditVersionStatData = statData;
+            }
+        }
+        return maxAuditVersionStatData;
+    }
+
+    /**
+     * Derives the audit cycle from a time range: both strings are parsed with
+     * {@link ConfigConstants#DATE_FORMAT}, and the number of whole minutes between them (narrowed
+     * to {@code int}) is passed to {@code AuditCycle.fromInt}.
+     *
+     * @param startTime start of the range, formatted per {@link ConfigConstants#DATE_FORMAT}
+     * @param endTime   end of the range, formatted per {@link ConfigConstants#DATE_FORMAT}
+     * @return the cycle returned by {@code AuditCycle.fromInt} for the minute span
+     * @throws java.time.format.DateTimeParseException if either string cannot be parsed
+     */
+    public static AuditCycle getAuditCycleTime(String startTime, String endTime) {
+        LocalDateTime startDateTime = LocalDateTime.parse(startTime, DATE_TIME_FORMATTER);
+        LocalDateTime endDateTime = LocalDateTime.parse(endTime, DATE_TIME_FORMATTER);
+        return AuditCycle.fromInt((int) ChronoUnit.MINUTES.between(startDateTime, endDateTime));
+    }
+
+    /**
+     * Merges a list of records into a single new record whose count is the sum of all non-null
+     * counts. Every other field is copied from the first element of the list.
+     *
+     * @param statDataList the records to merge; may be {@code null}
+     * @return the merged record, or {@code null} when the list is {@code null} or empty
+     */
+    public static StatData mergeStatDataList(List<StatData> statDataList) {
+        if (statDataList == null || statDataList.isEmpty()) {
+            return null;
+        }
+
+        // Assuming all other fields are the same, we take the first element as the base
+        StatData base = statDataList.get(0);
+
+        // Summing up the count; records without a count contribute nothing
+        long totalCount = 0L;
+        for (StatData statData : statDataList) {
+            if (statData.getCount() != null) {
+                totalCount += statData.getCount();
+            }
+        }
+
+        // Creating a new StatData object with the summed count
+        StatData mergedStatData = new StatData();
+        mergedStatData.setAuditVersion(base.getAuditVersion());
+        mergedStatData.setLogTs(base.getLogTs());
+        mergedStatData.setInlongGroupId(base.getInlongGroupId());
+        mergedStatData.setInlongStreamId(base.getInlongStreamId());
+        mergedStatData.setAuditId(base.getAuditId());
+        mergedStatData.setAuditTag(base.getAuditTag());
+        mergedStatData.setCount(totalCount);
+        // NOTE(review): size and delay are taken from the first element only, not aggregated -
+        // confirm this is intended for callers that read them from the merged record.
+        mergedStatData.setSize(base.getSize());
+        mergedStatData.setDelay(base.getDelay());
+        mergedStatData.setUpdateTime(base.getUpdateTime());
+        mergedStatData.setIp(base.getIp());
+        mergedStatData.setSourceName(base.getSourceName());
+
+        return mergedStatData;
+    }
+}


Reply via email to