This is an automated email from the ASF dual-hosted git repository.
doleyzi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 303d0c64cc [INLONG-11620][Audit] Provide an open API for module
reconciliation (#11640)
303d0c64cc is described below
commit 303d0c64ccd1a7288e7df621dcc44495dfa92f47
Author: doleyzi <[email protected]>
AuthorDate: Sun Jan 5 10:51:28 2025 +0800
[INLONG-11620][Audit] Provide an open API for module reconciliation (#11640)
Co-authored-by: doleyzi <[email protected]>
---
.../audit/Exception/InvalidRequestException.java} | 12 +-
.../inlong/audit/consts/OpenApiConstants.java | 4 +-
.../apache/inlong/audit/service/auditor/Audit.java | 190 +++++++++++++++++++++
.../audit/service/auditor/ReconciliationData.java | 60 +++++++
.../ApiType.java => auditor/RequestInfo.java} | 21 ++-
.../inlong/audit/service/cache/AbstractCache.java | 45 ++---
.../inlong/audit/service/cache/RealTimeQuery.java | 70 ++++++++
.../inlong/audit/service/config/SqlConstants.java | 73 ++++++++
.../inlong/audit/service/entities/ApiType.java | 2 +-
.../inlong/audit/service/node/ApiService.java | 104 ++++++++---
.../inlong/audit/service/utils/AuditUtils.java | 93 ++++++++++
11 files changed, 615 insertions(+), 59 deletions(-)
diff --git
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/entities/ApiType.java
b/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/Exception/InvalidRequestException.java
similarity index 81%
copy from
inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/entities/ApiType.java
copy to
inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/Exception/InvalidRequestException.java
index 1703fdaf4f..5f9703d5e2 100644
---
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/entities/ApiType.java
+++
b/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/Exception/InvalidRequestException.java
@@ -15,11 +15,11 @@
* limitations under the License.
*/
-package org.apache.inlong.audit.service.entities;
+package org.apache.inlong.audit.Exception;
-/**
- * OpenAPI type
- */
-public enum ApiType {
- MINUTES, HOUR, DAY, GET_IPS, GET_IDS, GET_AUDIT_PROXY;
+/**
+ * Checked exception raised when an incoming request fails validation.
+ */
+public class InvalidRequestException extends Exception {
+
+ /**
+ * @param message human-readable description of why the request was rejected
+ */
+ public InvalidRequestException(String message) {
+ super(message);
+ }
}
diff --git
a/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/consts/OpenApiConstants.java
b/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/consts/OpenApiConstants.java
index cdb2a05fda..cefd789aaa 100644
---
a/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/consts/OpenApiConstants.java
+++
b/inlong-audit/audit-common/src/main/java/org/apache/inlong/audit/consts/OpenApiConstants.java
@@ -35,6 +35,8 @@ public class OpenApiConstants {
public static final String DEFAULT_API_GET_IDS_PATH =
"/audit/query/getIds";
public static final String KEY_API_GET_AUDIT_PROXY_PATH =
"api.get.audit.proxy";
public static final String DEFAULT_API_GET_AUDIT_PROXY_PATH =
"/audit/query/getAuditProxy";
+ public static final String KEY_API_RECONCILIATION_PATH =
"api.reconciliation.path";
+ public static final String DEFAULT_API_RECONCILIATION_PATH =
"/audit/query/reconciliation";
public static final String KEY_API_THREAD_POOL_SIZE =
"api.thread.pool.size";
public static final int DEFAULT_API_THREAD_POOL_SIZE = 10;
public static final String KEY_API_BACKLOG_SIZE = "api.backlog.size";
@@ -60,7 +62,7 @@ public class OpenApiConstants {
public static final String PARAMS_AUDIT_CYCLE = "auditCycle";
public static final String KEY_HTTP_BODY_SUCCESS = "success";
public static final String KEY_HTTP_BODY_ERR_MSG = "errMsg";
- public static final String KEY_HTTP_BODY_ERR_DATA = "data";
+ public static final String KEY_HTTP_BODY_DATA = "data";
public static final String KEY_HTTP_HEADER_CONTENT_TYPE = "Content-Type";
public static final String VALUE_HTTP_HEADER_CONTENT_TYPE =
"application/json;charset=utf-8";
public static final String KEY_HTTP_SERVER_BIND_PORT =
"api.http.server.bind.port";
diff --git
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/auditor/Audit.java
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/auditor/Audit.java
new file mode 100644
index 0000000000..8d1015a448
--- /dev/null
+++
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/auditor/Audit.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.service.auditor;
+
+import org.apache.inlong.audit.Exception.InvalidRequestException;
+import org.apache.inlong.audit.service.cache.AbstractCache;
+import org.apache.inlong.audit.service.cache.HalfHourCache;
+import org.apache.inlong.audit.service.cache.HourCache;
+import org.apache.inlong.audit.service.cache.RealTimeQuery;
+import org.apache.inlong.audit.service.cache.TenMinutesCache;
+import org.apache.inlong.audit.service.entities.AuditCycle;
+import org.apache.inlong.audit.service.entities.StatData;
+import org.apache.inlong.audit.service.metric.MetricsManager;
+import org.apache.inlong.audit.service.utils.AuditUtils;
+
+import com.google.gson.Gson;
+import com.google.gson.JsonObject;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Objects;
+
+import static org.apache.inlong.audit.consts.ConfigConstants.DEFAULT_AUDIT_TAG;
+import static
org.apache.inlong.audit.consts.OpenApiConstants.KEY_HTTP_BODY_DATA;
+import static
org.apache.inlong.audit.consts.OpenApiConstants.KEY_HTTP_BODY_ERR_MSG;
+import static
org.apache.inlong.audit.consts.OpenApiConstants.KEY_HTTP_BODY_SUCCESS;
+import static org.apache.inlong.audit.service.entities.AuditCycle.DAY;
+import static org.apache.inlong.audit.service.entities.AuditCycle.HOUR;
+import static org.apache.inlong.audit.service.entities.AuditCycle.MINUTE_10;
+import static org.apache.inlong.audit.service.entities.AuditCycle.MINUTE_30;
+
+public class Audit {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(Audit.class);
+ private static final Gson GSON = new Gson();
+ private static volatile Audit instance = null;
+
+ private Audit() {
+ }
+
+ public static Audit getInstance() {
+ if (instance == null) {
+ synchronized (Audit.class) {
+ if (instance == null) {
+ instance = new Audit();
+ }
+ }
+ }
+ return instance;
+ }
+
+ public JsonObject getData(String requestInfo) {
+ try {
+ RequestInfo request = GSON.fromJson(requestInfo,
RequestInfo.class);
+ AuditCycle auditCycle =
AuditUtils.getAuditCycleTime(request.getStartTime(), request.getEndTime());
+ validateRequest(request, auditCycle);
+
+ ReconciliationData data = getAuditData(request, auditCycle);
+ return createResponseJson(true, data, null);
+
+ } catch (InvalidRequestException e) {
+ LOGGER.error("Invalid request parameters: {}", e.getMessage());
+ return createResponseJson(false, null, e.getMessage());
+ } catch (Exception e) {
+ LOGGER.error("Failed to process reconciliation request", e);
+ return createResponseJson(false, null,
+ "Internal server error: " + e.getMessage());
+ }
+ }
+
+ private ReconciliationData getAuditData(RequestInfo request, AuditCycle
auditCycle) {
+ // First get the data from the cache
+ ReconciliationData data = getDataFromCache(request, auditCycle);
+ if (data != null && data.isNotEmpty() && data.getDiffRatio() <=
request.getDiffRatio()) {
+ return data;
+ }
+
+ long statTimeMillis = System.currentTimeMillis();
+
+ // Second, query the data from the data storage (without deduplication)
+ data = getDataFromStorage(request, false);
+ if (data.getDiffRatio() <= request.getDiffRatio()) {
+ return data;
+ }
+
+ // Finally, query the data from the data storage (to deduplicate the
data)
+ data = getDataFromStorage(request, true);
+ MetricsManager.getInstance().addApiMetricNoCache(auditCycle,
+ System.currentTimeMillis() - statTimeMillis);
+ LOGGER.info("Get audit data from data storage by distinct. Request
info: {}", request);
+ return data;
+ }
+
+ private void validateRequest(RequestInfo request, AuditCycle auditCycle)
throws InvalidRequestException {
+ if (!areIdsValid(request) || !isAuditCycleValid(auditCycle)) {
+ throw new InvalidRequestException("Invalid parameters: " +
request);
+ }
+ setDefaultAuditTagIfBlank(request);
+ }
+
+ private void setDefaultAuditTagIfBlank(RequestInfo request) {
+ if (StringUtils.isBlank(request.getSrcAuditTag())) {
+ request.setSrcAuditTag(DEFAULT_AUDIT_TAG);
+ }
+ if (StringUtils.isBlank(request.getDestAuditTag())) {
+ request.setDestAuditTag(DEFAULT_AUDIT_TAG);
+ }
+ }
+
+ private boolean areIdsValid(RequestInfo request) {
+ return Objects.nonNull(request.getInlongGroupId()) &&
+ Objects.nonNull(request.getInlongStreamId()) &&
+ Objects.nonNull(request.getSrcAuditId()) &&
+ Objects.nonNull(request.getDestAuditId());
+ }
+
+ private boolean isAuditCycleValid(AuditCycle auditCycle) {
+ return Arrays.asList(
+ MINUTE_10,
+ MINUTE_30,
+ HOUR,
+ DAY).contains(auditCycle);
+ }
+
+ private JsonObject createResponseJson(boolean isSuccess,
ReconciliationData auditData, String errorMessage) {
+ JsonObject jsonObject = new JsonObject();
+ jsonObject.addProperty(KEY_HTTP_BODY_SUCCESS, isSuccess);
+ jsonObject.addProperty(KEY_HTTP_BODY_ERR_MSG, errorMessage);
+ jsonObject.add(KEY_HTTP_BODY_DATA,
+ GSON.toJsonTree(auditData != null ?
auditData.getCombinedData() : new LinkedList<>()));
+ return jsonObject;
+ }
+
+ private ReconciliationData getDataFromCache(RequestInfo request,
AuditCycle auditCycle) {
+ AbstractCache auditCache = getAuditCache(auditCycle);
+ if (auditCache == null) {
+ return null;
+ }
+ List<StatData> srcData = auditCache.getData(request.getStartTime(),
request.getEndTime(),
+ request.getInlongGroupId(), request.getInlongStreamId(),
request.getSrcAuditId(),
+ request.getSrcAuditTag(), false);
+ List<StatData> destData = auditCache.getData(request.getStartTime(),
request.getEndTime(),
+ request.getInlongGroupId(), request.getInlongStreamId(),
request.getDestAuditId(),
+ request.getDestAuditTag(), false);
+ return new ReconciliationData(AuditUtils.mergeStatDataList(srcData),
AuditUtils.mergeStatDataList(destData));
+ }
+
+ private ReconciliationData getDataFromStorage(RequestInfo request, boolean
needDistinct) {
+ List<StatData> srcData =
RealTimeQuery.getInstance().queryAuditData(request.getStartTime(),
+ request.getEndTime(), request.getInlongGroupId(),
+ request.getInlongStreamId(), request.getSrcAuditId(),
request.getSrcAuditTag(), needDistinct);
+ List<StatData> destData =
RealTimeQuery.getInstance().queryAuditData(request.getStartTime(),
+ request.getEndTime(), request.getInlongGroupId(),
+ request.getInlongStreamId(), request.getDestAuditId(),
request.getDestAuditTag(), needDistinct);
+ return new ReconciliationData(AuditUtils.mergeStatDataList(srcData),
AuditUtils.mergeStatDataList(destData));
+ }
+
+ private AbstractCache getAuditCache(AuditCycle auditCycle) {
+ switch (auditCycle) {
+ case MINUTE_10:
+ return TenMinutesCache.getInstance();
+ case MINUTE_30:
+ return HalfHourCache.getInstance();
+ case HOUR:
+ case DAY:
+ return HourCache.getInstance();
+ default:
+ return null;
+ }
+ }
+}
diff --git
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/auditor/ReconciliationData.java
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/auditor/ReconciliationData.java
new file mode 100644
index 0000000000..03e4394be6
--- /dev/null
+++
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/auditor/ReconciliationData.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.service.auditor;
+
+import org.apache.inlong.audit.service.entities.StatData;
+import org.apache.inlong.audit.service.utils.AuditUtils;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Source-side and destination-side audit statistics that are compared by the reconciliation API.
+ * Either side may be {@code null} when no statistic is available for it.
+ */
+@Data
+@AllArgsConstructor
+public class ReconciliationData {
+
+    public StatData srcData;
+    public StatData destData;
+
+    /**
+     * Difference ratio between the two sides: 0 when both are absent, 1 when exactly one is absent,
+     * otherwise the value computed by {@code AuditUtils.calculateDiffRatio} from the two counts.
+     */
+    public double getDiffRatio() {
+        final boolean srcMissing = srcData == null;
+        final boolean destMissing = destData == null;
+        if (srcMissing || destMissing) {
+            return srcMissing && destMissing ? 0 : 1;
+        }
+        return AuditUtils.calculateDiffRatio(srcData.getCount(), destData.getCount());
+    }
+
+    /**
+     * Returns the sides that are present, source first, as a new list.
+     */
+    public List<StatData> getCombinedData() {
+        List<StatData> combined = new ArrayList<>();
+        for (StatData side : new StatData[]{srcData, destData}) {
+            if (side != null) {
+                combined.add(side);
+            }
+        }
+        return combined;
+    }
+
+    /**
+     * Tells whether at least one side is present.
+     */
+    public boolean isNotEmpty() {
+        return !(srcData == null && destData == null);
+    }
+}
diff --git
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/entities/ApiType.java
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/auditor/RequestInfo.java
similarity index 71%
copy from
inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/entities/ApiType.java
copy to
inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/auditor/RequestInfo.java
index 1703fdaf4f..4e6161bf49 100644
---
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/entities/ApiType.java
+++
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/auditor/RequestInfo.java
@@ -15,11 +15,20 @@
* limitations under the License.
*/
-package org.apache.inlong.audit.service.entities;
+package org.apache.inlong.audit.service.auditor;
-/**
- * OpenAPI type
- */
-public enum ApiType {
- MINUTES, HOUR, DAY, GET_IPS, GET_IDS, GET_AUDIT_PROXY;
+import lombok.Data;
+
+/**
+ * Request payload of the reconciliation API; Audit.getData deserializes it from JSON with Gson,
+ * so the field names below are the JSON property names.
+ */
+@Data
+public class RequestInfo {
+
+ // Start of the queried range; bound to "log_ts >= ?" by the storage query.
+ String startTime;
+ // End of the queried range; bound to "log_ts < ?" by the storage query.
+ String endTime;
+ // Must be non-null, otherwise the request is rejected.
+ String inlongGroupId;
+ // Must be non-null, otherwise the request is rejected.
+ String inlongStreamId;
+ // Audit id of the source side; must be non-null.
+ String srcAuditId;
+ // Audit tag of the source side; replaced by DEFAULT_AUDIT_TAG when blank.
+ String srcAuditTag;
+ // Audit id of the destination side; must be non-null.
+ String destAuditId;
+ // Audit tag of the destination side; replaced by DEFAULT_AUDIT_TAG when blank.
+ String destAuditTag;
+ // Largest source/destination difference ratio the caller accepts before a deeper query is tried.
+ double diffRatio;
}
diff --git
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/cache/AbstractCache.java
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/cache/AbstractCache.java
index 8454884db9..f3155522c7 100644
---
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/cache/AbstractCache.java
+++
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/cache/AbstractCache.java
@@ -32,6 +32,7 @@ import org.slf4j.LoggerFactory;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Executors;
@@ -86,6 +87,12 @@ public class AbstractCache {
return cache;
}
+    /**
+     * Convenience overload of the seven-argument {@code getData} with {@code needRetry} switched on,
+     * i.e. entries missing from the cache are fetched from the audit storage.
+     */
+    public List<StatData> getData(String startTime, String endTime, String inlongGroupId,
+            String inlongStreamId, String auditId, String auditTag) {
+        final boolean needRetry = true;
+        return getData(startTime, endTime, inlongGroupId, inlongStreamId, auditId, auditTag, needRetry);
+    }
+
/**
* @param startTime
* @param endTime
@@ -96,31 +103,23 @@ public class AbstractCache {
* @return
*/
public List<StatData> getData(String startTime, String endTime, String
inlongGroupId,
- String inlongStreamId, String auditId, String auditTag) {
- List<StatData> result = new LinkedList<>();
+ String inlongStreamId, String auditId, String auditTag, boolean
needRetry) {
+ List<StatData> result = new ArrayList<>();
List<CacheKeyEntity> keyList = buildCacheKeyList(startTime, endTime,
inlongGroupId,
inlongStreamId, auditId, auditTag);
- for (CacheKeyEntity cacheKey : keyList) {
- StatData statData = cache.getIfPresent(cacheKey.getCacheKey());
- if (null == statData) {
- // Compatible with scenarios where the auditTag openapi
parameter can be empty.
- statData = cache.getIfPresent(cacheKey.getCacheKey() +
DEFAULT_AUDIT_TAG);
- }
- if (null != statData) {
- result.add(statData);
- } else {
- long currentTimeMillis = System.currentTimeMillis();
+ for (CacheKeyEntity cacheKey : keyList) {
+ StatData statData = fetchStatDataFromCache(cacheKey);
+ if (statData == null && needRetry) {
+ long statTimeMillis = System.currentTimeMillis();
statData = fetchDataFromAuditStorage(cacheKey.getStartTime(),
cacheKey.getEndTime(), inlongGroupId,
- inlongStreamId,
- auditId, auditTag);
- result.add(statData);
-
+ inlongStreamId, auditId, auditTag);
MetricsManager.getInstance().addApiMetricNoCache(auditCycle,
- System.currentTimeMillis() - currentTimeMillis);
-
+ System.currentTimeMillis() - statTimeMillis);
+ }
+ if (statData != null) {
+ result.add(statData);
}
-
}
return result;
}
@@ -195,4 +194,12 @@ public class AbstractCache {
statData.setDelay(totalDelay);
return statData;
}
+    /**
+     * Looks a statistic up in the local cache, first under the exact cache key and then under the
+     * key suffixed with {@code DEFAULT_AUDIT_TAG}.
+     *
+     * @return the cached entry, or {@code null} when neither key is present
+     */
+    private StatData fetchStatDataFromCache(CacheKeyEntity cacheKey) {
+        StatData exactMatch = cache.getIfPresent(cacheKey.getCacheKey());
+        if (exactMatch != null) {
+            return exactMatch;
+        }
+        // Compatible with scenarios where the auditTag openapi parameter can be empty.
+        return cache.getIfPresent(cacheKey.getCacheKey() + DEFAULT_AUDIT_TAG);
+    }
}
diff --git
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/cache/RealTimeQuery.java
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/cache/RealTimeQuery.java
index b167eb652a..923c98b5cf 100644
---
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/cache/RealTimeQuery.java
+++
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/cache/RealTimeQuery.java
@@ -22,6 +22,7 @@ import org.apache.inlong.audit.service.config.Configuration;
import org.apache.inlong.audit.service.entities.JdbcConfig;
import org.apache.inlong.audit.service.entities.StatData;
import org.apache.inlong.audit.service.node.ConfigService;
+import org.apache.inlong.audit.service.utils.AuditUtils;
import org.apache.inlong.audit.service.utils.CacheUtils;
import org.apache.commons.dbcp.BasicDataSource;
@@ -46,9 +47,13 @@ import java.util.concurrent.Executors;
import static
org.apache.inlong.audit.consts.OpenApiConstants.DEFAULT_API_THREAD_POOL_SIZE;
import static
org.apache.inlong.audit.consts.OpenApiConstants.KEY_API_THREAD_POOL_SIZE;
+import static
org.apache.inlong.audit.service.config.SqlConstants.DEFAULT_RECONCILIATION_DISTINCT_SQL;
+import static
org.apache.inlong.audit.service.config.SqlConstants.DEFAULT_RECONCILIATION_SQL;
import static
org.apache.inlong.audit.service.config.SqlConstants.DEFAULT_SOURCE_QUERY_IDS_SQL;
import static
org.apache.inlong.audit.service.config.SqlConstants.DEFAULT_SOURCE_QUERY_IPS_SQL;
import static
org.apache.inlong.audit.service.config.SqlConstants.DEFAULT_SOURCE_QUERY_MINUTE_SQL;
+import static
org.apache.inlong.audit.service.config.SqlConstants.KEY_RECONCILIATION_DISTINCT_SQL;
+import static
org.apache.inlong.audit.service.config.SqlConstants.KEY_RECONCILIATION_SQL;
import static
org.apache.inlong.audit.service.config.SqlConstants.KEY_SOURCE_QUERY_IDS_SQL;
import static
org.apache.inlong.audit.service.config.SqlConstants.KEY_SOURCE_QUERY_IPS_SQL;
import static
org.apache.inlong.audit.service.config.SqlConstants.KEY_SOURCE_QUERY_MINUTE_SQL;
@@ -357,4 +362,69 @@ public class RealTimeQuery {
}
return result;
}
+
+    /**
+     * Runs the reconciliation query against every configured data source in parallel and collects
+     * the non-null per-source results.
+     *
+     * @param distinct whether the de-duplicating SQL is used
+     * @return one entry per data source that produced a result, in completion order;
+     *         {@code null} when no data source is configured
+     */
+    public List<StatData> queryAuditData(String startTime, String endTime,
+            String inlongGroupId, String inlongStreamId, String auditId,
+            String auditTag, boolean distinct) {
+        if (dataSourceList.isEmpty()) {
+            return null;
+        }
+        long beginMillis = System.currentTimeMillis();
+        List<StatData> collected = new CopyOnWriteArrayList<>();
+        List<CompletableFuture<Void>> tasks = new ArrayList<>();
+        for (DataSource source : dataSourceList) {
+            tasks.add(CompletableFuture.runAsync(() -> {
+                StatData perSource = doQueryAuditData(source, startTime, endTime, inlongGroupId,
+                        inlongStreamId, auditId, auditTag, distinct);
+                if (perSource != null) {
+                    collected.add(perSource);
+                }
+            }, executor));
+        }
+        // Block until every data source has answered.
+        CompletableFuture.allOf(tasks.toArray(new CompletableFuture[0])).join();
+        long costMillis = System.currentTimeMillis() - beginMillis;
+        LOGGER.info("Query audit data by params: {} {} {} {} {}, total cost {} ms", startTime, endTime, inlongGroupId,
+                inlongStreamId, auditId, costMillis);
+        return collected;
+    }
+
+    /**
+     * Runs the reconciliation query against a single data source.
+     *
+     * <p>The SQL is taken from the configuration ({@code KEY_RECONCILIATION_DISTINCT_SQL} or
+     * {@code KEY_RECONCILIATION_SQL}, with the built-in defaults as fallback). Each result row
+     * (audit version, summed count) becomes one {@link StatData}; the rows are then reduced by
+     * {@code AuditUtils.getMaxAuditVersionAuditData}. Query failures are logged and not propagated,
+     * in which case only the rows read so far (possibly none) are reduced.
+     *
+     * @param distinct whether the de-duplicating SQL is used
+     * @return the result of {@code AuditUtils.getMaxAuditVersionAuditData} for the rows read
+     */
+    public StatData doQueryAuditData(DataSource dataSource, String startTime, String endTime,
+            String inlongGroupId, String inlongStreamId, String auditId,
+            String auditTag, boolean distinct) {
+        List<StatData> result = new LinkedList<>();
+        String querySQL = distinct
+                ? Configuration.getInstance().get(KEY_RECONCILIATION_DISTINCT_SQL, DEFAULT_RECONCILIATION_DISTINCT_SQL)
+                : Configuration.getInstance().get(KEY_RECONCILIATION_SQL, DEFAULT_RECONCILIATION_SQL);
+
+        try (Connection connection = dataSource.getConnection();
+                PreparedStatement pstat = connection.prepareStatement(querySQL)) {
+
+            // Placeholder order is fixed by the reconciliation SQL.
+            pstat.setString(1, startTime);
+            pstat.setString(2, endTime);
+            pstat.setString(3, auditId);
+            pstat.setString(4, inlongGroupId);
+            pstat.setString(5, inlongStreamId);
+            pstat.setString(6, auditTag);
+            try (ResultSet resultSet = pstat.executeQuery()) {
+                while (resultSet.next()) {
+                    StatData data = new StatData();
+                    data.setAuditVersion(resultSet.getLong(1));
+                    data.setCount(resultSet.getLong(2));
+                    data.setLogTs(startTime);
+                    data.setInlongGroupId(inlongGroupId);
+                    data.setInlongStreamId(inlongStreamId);
+                    data.setAuditId(auditId);
+                    data.setAuditTag(auditTag);
+                    result.add(data);
+                }
+            } catch (SQLException sqlException) {
+                // Message previously said "Query ips", which pointed at the wrong query.
+                LOGGER.error("Query audit data has SQL exception!, datasource={} ", dataSource, sqlException);
+            }
+        } catch (Exception exception) {
+            LOGGER.error("Query audit data has exception! ", exception);
+        }
+        return AuditUtils.getMaxAuditVersionAuditData(result);
+    }
}
diff --git
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/config/SqlConstants.java
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/config/SqlConstants.java
index 571e1bde8f..914f2cb88e 100644
---
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/config/SqlConstants.java
+++
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/config/SqlConstants.java
@@ -178,4 +178,77 @@ public class SqlConstants {
"ALTER TABLE audit_data_day ADD PARTITION (PARTITION %s VALUES
LESS THAN (TO_DAYS('%s')))";
public static final String TABLE_AUDIT_DATA_DAY = "audit_data_day";
public static final String TABLE_AUDIT_DATA_TEMP = "audit_data_temp";
+
+ // Configuration key that overrides the plain reconciliation query.
+ public static final String KEY_RECONCILIATION_SQL =
"audit.reconciliation.sql";
+ // Sums count per audit_version over audit_data.
+ // Placeholders, in order: log_ts lower bound (inclusive), log_ts upper bound (exclusive),
+ // audit_id, inlong_group_id, inlong_stream_id, audit_tag.
+ // Rows whose audit_tag is empty are matched in addition to the requested tag.
+ public static final String DEFAULT_RECONCILIATION_SQL = "select\n" +
+ "audit_version,\n" +
+ "sum(count) count\n" +
+ "from\n" +
+ " audit_data\n" +
+ "where\n" +
+ " log_ts >= ? \n" +
+ " and log_ts < ? \n" +
+ " and audit_id = ? \n" +
+ " and inlong_group_id = ? \n" +
+ " and inlong_stream_id = ? \n" +
+ " and (\n" +
+ " audit_tag = ? \n" +
+ " or audit_tag = '' \n" +
+ " )\n" +
+ "group by\n" +
+ "audit_version";
+ // Configuration key that overrides the de-duplicating reconciliation query.
+ public static final String KEY_RECONCILIATION_DISTINCT_SQL =
"audit.reconciliation.distinct.sql";
+ // Same filter and placeholder order as DEFAULT_RECONCILIATION_SQL, but the inner query first
+ // groups by every selected column so that identical rows are collapsed into one before the
+ // outer query sums count per audit_version. An empty audit_tag is reported as '-1' by the
+ // inner select.
+ public static final String DEFAULT_RECONCILIATION_DISTINCT_SQL =
"SELECT\n" +
+ " audit_version,\n" +
+ " sum(count) AS count\n" +
+ "FROM\n" +
+ " (\n" +
+ " SELECT\n" +
+ " audit_version,\n" +
+ " docker_id,\n" +
+ " thread_id,\n" +
+ " sdk_ts,\n" +
+ " packet_id,\n" +
+ " log_ts,\n" +
+ " ip,\n" +
+ " inlong_group_id,\n" +
+ " inlong_stream_id,\n" +
+ " audit_id,\n" +
+ " CASE\n" +
+ " WHEN audit_tag = '' THEN '-1'\n" +
+ " ELSE audit_tag\n" +
+ " END AS audit_tag,\n" +
+ " count,\n" +
+ " size,\n" +
+ " delay\n" +
+ " FROM\n" +
+ " audit_data\n" +
+ " where\n" +
+ " log_ts >= ? \n" +
+ " and log_ts < ? \n" +
+ " and audit_id = ? \n" +
+ " and inlong_group_id = ? \n" +
+ " and inlong_stream_id = ? \n" +
+ " and (\n" +
+ " audit_tag = ? \n" +
+ " or audit_tag = ''\n" +
+ " )\n" +
+ " GROUP BY\n" +
+ " audit_version,\n" +
+ " docker_id,\n" +
+ " thread_id,\n" +
+ " sdk_ts,\n" +
+ " packet_id,\n" +
+ " log_ts,\n" +
+ " ip,\n" +
+ " inlong_group_id,\n" +
+ " inlong_stream_id,\n" +
+ " audit_id,\n" +
+ " audit_tag,\n" +
+ " count,\n" +
+ " size,\n" +
+ " delay\n" +
+ " ) t_distinct\n" +
+ "GROUP BY\n" +
+ " audit_version";
}
diff --git
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/entities/ApiType.java
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/entities/ApiType.java
index 1703fdaf4f..ed9763bbdf 100644
---
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/entities/ApiType.java
+++
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/entities/ApiType.java
@@ -21,5 +21,5 @@ package org.apache.inlong.audit.service.entities;
* OpenAPI type
*/
public enum ApiType {
- MINUTES, HOUR, DAY, GET_IPS, GET_IDS, GET_AUDIT_PROXY;
+ MINUTES, HOUR, DAY, GET_IPS, GET_IDS, GET_AUDIT_PROXY, RECONCILIATION;
}
diff --git
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/node/ApiService.java
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/node/ApiService.java
index f4b61f2443..80dde3c4c4 100644
---
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/node/ApiService.java
+++
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/node/ApiService.java
@@ -18,6 +18,7 @@
package org.apache.inlong.audit.service.node;
import org.apache.inlong.audit.entity.AuditProxy;
+import org.apache.inlong.audit.service.auditor.Audit;
import org.apache.inlong.audit.service.cache.AuditProxyCache;
import org.apache.inlong.audit.service.cache.DayCache;
import org.apache.inlong.audit.service.cache.HalfHourCache;
@@ -39,6 +40,9 @@ import com.sun.net.httpserver.HttpServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
@@ -58,6 +62,7 @@ import static
org.apache.inlong.audit.consts.OpenApiConstants.DEFAULT_API_GET_IP
import static
org.apache.inlong.audit.consts.OpenApiConstants.DEFAULT_API_HOUR_PATH;
import static
org.apache.inlong.audit.consts.OpenApiConstants.DEFAULT_API_MINUTES_PATH;
import static
org.apache.inlong.audit.consts.OpenApiConstants.DEFAULT_API_REAL_LIMITER_QPS;
+import static
org.apache.inlong.audit.consts.OpenApiConstants.DEFAULT_API_RECONCILIATION_PATH;
import static
org.apache.inlong.audit.consts.OpenApiConstants.DEFAULT_API_THREAD_POOL_SIZE;
import static
org.apache.inlong.audit.consts.OpenApiConstants.DEFAULT_HTTP_SERVER_BIND_PORT;
import static
org.apache.inlong.audit.consts.OpenApiConstants.HTTP_RESPOND_CODE;
@@ -69,8 +74,9 @@ import static
org.apache.inlong.audit.consts.OpenApiConstants.KEY_API_GET_IPS_PA
import static
org.apache.inlong.audit.consts.OpenApiConstants.KEY_API_HOUR_PATH;
import static
org.apache.inlong.audit.consts.OpenApiConstants.KEY_API_MINUTES_PATH;
import static
org.apache.inlong.audit.consts.OpenApiConstants.KEY_API_REAL_LIMITER_QPS;
+import static
org.apache.inlong.audit.consts.OpenApiConstants.KEY_API_RECONCILIATION_PATH;
import static
org.apache.inlong.audit.consts.OpenApiConstants.KEY_API_THREAD_POOL_SIZE;
-import static
org.apache.inlong.audit.consts.OpenApiConstants.KEY_HTTP_BODY_ERR_DATA;
+import static
org.apache.inlong.audit.consts.OpenApiConstants.KEY_HTTP_BODY_DATA;
import static
org.apache.inlong.audit.consts.OpenApiConstants.KEY_HTTP_BODY_ERR_MSG;
import static
org.apache.inlong.audit.consts.OpenApiConstants.KEY_HTTP_BODY_SUCCESS;
import static
org.apache.inlong.audit.consts.OpenApiConstants.KEY_HTTP_HEADER_CONTENT_TYPE;
@@ -91,6 +97,7 @@ import static
org.apache.inlong.audit.service.entities.ApiType.GET_IDS;
import static org.apache.inlong.audit.service.entities.ApiType.GET_IPS;
import static org.apache.inlong.audit.service.entities.ApiType.HOUR;
import static org.apache.inlong.audit.service.entities.ApiType.MINUTES;
+import static org.apache.inlong.audit.service.entities.ApiType.RECONCILIATION;
public class ApiService {
@@ -129,6 +136,14 @@ public class ApiService {
server.createContext(
Configuration.getInstance().get(KEY_API_GET_AUDIT_PROXY_PATH,
DEFAULT_API_GET_AUDIT_PROXY_PATH),
new AuditHandler(GET_AUDIT_PROXY));
+ server.createContext(
+
Configuration.getInstance().get(KEY_API_GET_AUDIT_PROXY_PATH,
DEFAULT_API_GET_AUDIT_PROXY_PATH),
+ new AuditHandler(GET_AUDIT_PROXY));
+ server.createContext(
+
Configuration.getInstance().get(KEY_API_RECONCILIATION_PATH,
+ DEFAULT_API_RECONCILIATION_PATH),
+ new AuditHandler(RECONCILIATION));
+
server.start();
LOGGER.info("Init http server success. Bind port is: {}",
bindPort);
} catch (Exception e) {
@@ -152,8 +167,6 @@ public class ApiService {
@Override
public void handle(HttpExchange exchange) {
- LOGGER.info("handle {}", exchange.getRequestURI().toString());
-
long currentTimeMillis = System.currentTimeMillis();
if (null != limiter) {
@@ -164,20 +177,8 @@ public class ApiService {
@Override
public void run() {
try (OutputStream os = exchange.getResponseBody()) {
- JsonObject responseJson = new JsonObject();
- Map<String, String> params =
parseRequestURI(exchange.getRequestURI().getQuery());
- if (checkNecessaryParams(params)) {
- handleLegalParams(responseJson, params);
- } else {
- handleInvalidParams(responseJson, exchange);
- }
-
- byte[] bytes =
responseJson.toString().getBytes(StandardCharsets.UTF_8);
-
-
exchange.getResponseHeaders().set(KEY_HTTP_HEADER_CONTENT_TYPE,
- VALUE_HTTP_HEADER_CONTENT_TYPE);
- exchange.sendResponseHeaders(HTTP_RESPOND_CODE,
bytes.length);
- os.write(bytes);
+ JsonObject responseJson = handleRequest(exchange);
+ sendResponse(exchange, os, responseJson);
} catch (Exception e) {
LOGGER.error("Audit handler has exception!", e);
} finally {
@@ -189,6 +190,57 @@ public class ApiService {
MetricsManager.getInstance().addApiMetric(apiType,
System.currentTimeMillis() - currentTimeMillis);
}
+ /**
+  * Routes the exchange to the GET or POST handler according to its HTTP method.
+  * The method name is upper-cased before it is matched; any other method
+  * produces the error payload built by {@code buildErrorResponse}.
+  *
+  * NOTE(review): routing depends only on the HTTP method, not on this
+  * handler's apiType - confirm that POST is meant to be accepted on every path.
+  *
+  * @param exchange the HTTP exchange being served
+  * @return the JSON body to send back
+  * @throws IOException if reading the POST body fails
+  */
+ private JsonObject handleRequest(HttpExchange exchange) throws IOException {
+     String method = exchange.getRequestMethod();
+     String normalized = method.toUpperCase();
+     if ("GET".equals(normalized)) {
+         return handleGetRequest(exchange);
+     }
+     if ("POST".equals(normalized)) {
+         return handlePostRequest(exchange);
+     }
+     return buildErrorResponse("Unsupported request method: " + method);
+ }
+
+ /**
+  * Serves a GET request from the query string of the request URI.
+  * When the parsed parameters pass {@code checkNecessaryParams}, the result is
+  * filled in by {@code handleLegalParams}; otherwise {@code handleInvalidParams}
+  * fills in the error payload.
+  *
+  * @param exchange the HTTP exchange being served
+  * @return the JSON body to send back
+  */
+ private JsonObject handleGetRequest(HttpExchange exchange) {
+     JsonObject result = new JsonObject();
+     Map<String, String> queryParams = parseRequestURI(exchange.getRequestURI().getQuery());
+     if (!checkNecessaryParams(queryParams)) {
+         handleInvalidParams(result, exchange);
+         return result;
+     }
+     handleLegalParams(result, queryParams);
+     return result;
+ }
+
+ /**
+  * Serves a POST request: reads the whole request body as UTF-8 text and hands
+  * it to {@code Audit.getInstance().getData}.
+  * The body is read line by line and the lines are concatenated without their
+  * line terminators.
+  *
+  * @param exchange the HTTP exchange whose body is read
+  * @return the JSON object returned by {@code Audit}
+  * @throws IOException if the request body cannot be read
+  */
+ private JsonObject handlePostRequest(HttpExchange exchange) throws IOException {
+     StringBuilder body = new StringBuilder();
+     try (BufferedReader lineReader = new BufferedReader(
+             new InputStreamReader(exchange.getRequestBody(), StandardCharsets.UTF_8))) {
+         for (String current = lineReader.readLine(); current != null; current = lineReader.readLine()) {
+             body.append(current);
+         }
+     }
+     String requestText = body.toString();
+     return Audit.getInstance().getData(requestText);
+ }
+
+ /**
+  * Writes the JSON body to the client: serialises it as UTF-8, sets the
+  * content-type header, sends the response headers with the fixed
+  * {@code HTTP_RESPOND_CODE} status and the body length, then writes the bytes.
+  *
+  * @param exchange the exchange being answered
+  * @param os the stream the body bytes are written to; it is not closed here
+  * @param responseJson the JSON body to send
+  * @throws IOException if sending the headers or the body fails
+  */
+ private void sendResponse(HttpExchange exchange, OutputStream os, JsonObject responseJson) throws IOException {
+     String payloadText = responseJson.toString();
+     byte[] payload = payloadText.getBytes(StandardCharsets.UTF_8);
+     exchange.getResponseHeaders().set(KEY_HTTP_HEADER_CONTENT_TYPE, VALUE_HTTP_HEADER_CONTENT_TYPE);
+     exchange.sendResponseHeaders(HTTP_RESPOND_CODE, payload.length);
+     os.write(payload);
+ }
+
+ /**
+  * Builds a failure payload: success flag {@code false}, the given error
+  * message, and an empty list as the data member.
+  *
+  * @param errorMessage text stored under {@code KEY_HTTP_BODY_ERR_MSG}
+  * @return the populated JSON object
+  */
+ private JsonObject buildErrorResponse(String errorMessage) {
+     JsonObject errorBody = new JsonObject();
+     errorBody.addProperty(KEY_HTTP_BODY_SUCCESS, false);
+     errorBody.addProperty(KEY_HTTP_BODY_ERR_MSG, errorMessage);
+     errorBody.add(KEY_HTTP_BODY_DATA, new Gson().toJsonTree(new LinkedList<>()));
+     return errorBody;
+ }
+
private Map<String, String> parseRequestURI(String query) {
Map<String, String> params = new HashMap<>();
if (query != null) {
@@ -239,7 +291,7 @@ public class ApiService {
responseJson.addProperty(KEY_HTTP_BODY_SUCCESS, false);
responseJson.addProperty(KEY_HTTP_BODY_ERR_MSG, "Invalid params! "
+ exchange.getRequestURI());
Gson gson = new Gson();
- responseJson.add(KEY_HTTP_BODY_ERR_DATA, gson.toJsonTree(new
LinkedList<>()));
+ responseJson.add(KEY_HTTP_BODY_DATA, gson.toJsonTree(new
LinkedList<>()));
}
private void handleLegalParams(JsonObject responseJson, Map<String,
String> params) {
@@ -250,7 +302,7 @@ public class ApiService {
try {
switch (apiType) {
case MINUTES:
- responseJson.add(KEY_HTTP_BODY_ERR_DATA,
gson.toJsonTree(handleMinutesApi(params)));
+ responseJson.add(KEY_HTTP_BODY_DATA,
gson.toJsonTree(handleMinutesApi(params)));
break;
case HOUR:
statData =
HourCache.getInstance().getData(params.get(PARAMS_START_TIME),
@@ -259,7 +311,7 @@ public class ApiService {
params.get(PARAMS_INLONG_STREAM_ID),
params.get(PARAMS_AUDIT_ID),
params.get(PARAMS_AUDIT_TAG));
- responseJson.add(KEY_HTTP_BODY_ERR_DATA,
gson.toJsonTree(statData));
+ responseJson.add(KEY_HTTP_BODY_DATA,
gson.toJsonTree(statData));
break;
case DAY:
statData = DayCache.getInstance().getData(
@@ -268,7 +320,7 @@ public class ApiService {
params.get(PARAMS_INLONG_GROUP_ID),
params.get(PARAMS_INLONG_STREAM_ID),
params.get(PARAMS_AUDIT_ID));
- responseJson.add(KEY_HTTP_BODY_ERR_DATA,
gson.toJsonTree(statData));
+ responseJson.add(KEY_HTTP_BODY_DATA,
gson.toJsonTree(statData));
break;
case GET_IDS:
statData = RealTimeQuery.getInstance().queryIdsByIp(
@@ -276,7 +328,7 @@ public class ApiService {
params.get(PARAMS_END_TIME),
params.get(PARAMS_IP),
params.get(PARAMS_AUDIT_ID));
- responseJson.add(KEY_HTTP_BODY_ERR_DATA,
gson.toJsonTree(statData));
+ responseJson.add(KEY_HTTP_BODY_DATA,
gson.toJsonTree(statData));
break;
case GET_IPS:
statData = RealTimeQuery.getInstance().queryIpsById(
@@ -285,20 +337,20 @@ public class ApiService {
params.get(PARAMS_INLONG_GROUP_ID),
params.get(PARAMS_INLONG_STREAM_ID),
params.get(PARAMS_AUDIT_ID));
- responseJson.add(KEY_HTTP_BODY_ERR_DATA,
gson.toJsonTree(statData));
+ responseJson.add(KEY_HTTP_BODY_DATA,
gson.toJsonTree(statData));
break;
case GET_AUDIT_PROXY:
List<AuditProxy> auditProxy =
AuditProxyCache.getInstance().getData(params.get(PARAMS_AUDIT_COMPONENT));
- responseJson.add(KEY_HTTP_BODY_ERR_DATA,
gson.toJsonTree(auditProxy));
+ responseJson.add(KEY_HTTP_BODY_DATA,
gson.toJsonTree(auditProxy));
break;
default:
LOGGER.error("Unsupported interface type! type is {}",
apiType);
- responseJson.add(KEY_HTTP_BODY_ERR_DATA,
gson.toJsonTree(new LinkedList<>()));
+ responseJson.add(KEY_HTTP_BODY_DATA,
gson.toJsonTree(new LinkedList<>()));
}
} catch (Exception exception) {
LOGGER.error("Handle legal params has exception ", exception);
- responseJson.add(KEY_HTTP_BODY_ERR_DATA, gson.toJsonTree(new
LinkedList<>()));
+ responseJson.add(KEY_HTTP_BODY_DATA, gson.toJsonTree(new
LinkedList<>()));
}
}
diff --git
a/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/utils/AuditUtils.java
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/utils/AuditUtils.java
new file mode 100644
index 0000000000..34338336e4
--- /dev/null
+++
b/inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/utils/AuditUtils.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.service.utils;
+
+import org.apache.inlong.audit.service.config.ConfigConstants;
+import org.apache.inlong.audit.service.entities.AuditCycle;
+import org.apache.inlong.audit.service.entities.StatData;
+
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.time.temporal.ChronoUnit;
+import java.util.List;
+
+/**
+ * Static helper methods for comparing and combining {@link StatData} records.
+ */
+public class AuditUtils {
+
+    // DateTimeFormatter is immutable and thread-safe, so a single shared instance is enough.
+    private static final DateTimeFormatter DATE_TIME_FORMATTER =
+            DateTimeFormatter.ofPattern(ConfigConstants.DATE_FORMAT);
+
+    /**
+     * Computes the relative difference between two counts, using the source count as the base.
+     *
+     * @param srcCount count on the source side
+     * @param destCount count on the destination side
+     * @return {@code 0} when both counts are zero, {@code 1} when only the source count is zero,
+     *         otherwise {@code |srcCount - destCount| / srcCount}
+     */
+    public static double calculateDiffRatio(long srcCount, long destCount) {
+        if (srcCount == 0) {
+            // No base to divide by: identical (both zero) is 0, anything else counts as a full difference.
+            return destCount == 0 ? 0 : 1;
+        }
+        return Math.abs((double) (srcCount - destCount) / srcCount);
+    }
+
+    /**
+     * Picks the record with the highest audit version.
+     * When several records share the highest version, the first one encountered is returned.
+     *
+     * @param statDataListData candidate records; may be {@code null} or empty
+     * @return the record with the greatest audit version, or {@code null} when the list is
+     *         {@code null} or empty
+     */
+    public static StatData getMaxAuditVersionAuditData(List<StatData> statDataListData) {
+        // Treat a missing list like an empty one, matching mergeStatDataList.
+        if (statDataListData == null) {
+            return null;
+        }
+        StatData maxAuditVersionStatData = null;
+        for (StatData statData : statDataListData) {
+            if (maxAuditVersionStatData == null
+                    || statData.getAuditVersion() > maxAuditVersionStatData.getAuditVersion()) {
+                maxAuditVersionStatData = statData;
+            }
+        }
+        return maxAuditVersionStatData;
+    }
+
+    /**
+     * Derives the audit cycle from the number of whole minutes between two timestamps.
+     * Both timestamps are parsed with the pattern {@code ConfigConstants.DATE_FORMAT}; the
+     * minute difference is narrowed to {@code int} and mapped by {@code AuditCycle.fromInt}.
+     *
+     * @param startTime start of the interval, formatted per {@code ConfigConstants.DATE_FORMAT}
+     * @param endTime end of the interval, formatted per {@code ConfigConstants.DATE_FORMAT}
+     * @return the value returned by {@code AuditCycle.fromInt} for the minute difference
+     * @throws java.time.format.DateTimeParseException if either timestamp cannot be parsed
+     */
+    public static AuditCycle getAuditCycleTime(String startTime, String endTime) {
+        LocalDateTime startDateTime = LocalDateTime.parse(startTime, DATE_TIME_FORMATTER);
+        LocalDateTime endDateTime = LocalDateTime.parse(endTime, DATE_TIME_FORMATTER);
+        return AuditCycle.fromInt((int) ChronoUnit.MINUTES.between(startDateTime, endDateTime));
+    }
+
+    /**
+     * Collapses a list of records into one record whose count is the sum of all counts.
+     * Records with a {@code null} count contribute nothing to the sum. Every other field is
+     * copied from the first element of the list.
+     *
+     * NOTE(review): size and delay are copied from the first element rather than summed -
+     * confirm this is intended.
+     *
+     * @param statDataList records to merge; may be {@code null} or empty
+     * @return the merged record, or {@code null} when the list is {@code null} or empty
+     */
+    public static StatData mergeStatDataList(List<StatData> statDataList) {
+        if (statDataList == null || statDataList.isEmpty()) {
+            return null;
+        }
+
+        // All non-count fields are assumed to be identical, so the first element is the template.
+        StatData base = statDataList.get(0);
+
+        long totalCount = 0L;
+        for (StatData statData : statDataList) {
+            if (statData.getCount() != null) {
+                totalCount += statData.getCount();
+            }
+        }
+
+        // Build a fresh object so that no element of the input list is modified.
+        StatData mergedStatData = new StatData();
+        mergedStatData.setAuditVersion(base.getAuditVersion());
+        mergedStatData.setLogTs(base.getLogTs());
+        mergedStatData.setInlongGroupId(base.getInlongGroupId());
+        mergedStatData.setInlongStreamId(base.getInlongStreamId());
+        mergedStatData.setAuditId(base.getAuditId());
+        mergedStatData.setAuditTag(base.getAuditTag());
+        mergedStatData.setCount(totalCount);
+        mergedStatData.setSize(base.getSize());
+        mergedStatData.setDelay(base.getDelay());
+        mergedStatData.setUpdateTime(base.getUpdateTime());
+        mergedStatData.setIp(base.getIp());
+        mergedStatData.setSourceName(base.getSourceName());
+
+        return mergedStatData;
+    }
+}