This is an automated email from the ASF dual-hosted git repository.
zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new e2f403d833 [INLONG-9351][Manager] Support querying audit data size (#9352)
e2f403d833 is described below
commit e2f403d833e296d33270e3f5f6d99d1b5a3dab34
Author: fuweng11 <[email protected]>
AuthorDate: Mon Dec 4 20:18:09 2023 +0800
[INLONG-9351][Manager] Support querying audit data size (#9352)
---
.../main/resources/mappers/AuditEntityMapper.xml | 2 +-
.../inlong/manager/pojo/audit/AuditInfo.java | 5 ++++-
.../service/core/impl/AuditServiceImpl.java | 26 +++++++++++++++-------
.../service/core/impl/AuditServiceTest.java | 4 ++--
4 files changed, 25 insertions(+), 12 deletions(-)
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/AuditEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/AuditEntityMapper.xml
index 5722857ffa..8c093ebc4b 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/AuditEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/AuditEntityMapper.xml
@@ -42,7 +42,7 @@
</resultMap>
<select id="sumByLogTs" resultMap="SumByLogTsResultMap">
- select date_format(log_ts, #{format, jdbcType=VARCHAR}) as log_ts,
sum(`count`) as total, sum(`delay`) as total_delay
+ select date_format(log_ts, #{format, jdbcType=VARCHAR}) as log_ts,
sum(`count`) as total, sum(`delay`) as total_delay, sum(`size`) as total_size
from (
select distinct ip, docker_id, thread_id, sdk_ts, packet_id,
log_ts, inlong_group_id, inlong_stream_id, audit_id, `count`, `size`, `delay`
from apache_inlong_audit.audit_data
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/audit/AuditInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/audit/AuditInfo.java
index 7a13088885..a1ac46f9e6 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/audit/AuditInfo.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/audit/AuditInfo.java
@@ -32,13 +32,16 @@ public class AuditInfo {
private long count;
@ApiModelProperty(value = "Audit delay")
private long delay;
+ @ApiModelProperty(value = "Audit size")
+ private long size;
public AuditInfo() {
}
- public AuditInfo(String logTs, long count, long delay) {
+ public AuditInfo(String logTs, long count, long delay, long size) {
this.logTs = logTs;
this.count = count;
this.delay = delay;
+ this.size = size;
}
}
\ No newline at end of file
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java
index 0d8cf1cc94..7ffdafb3b9 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java
@@ -262,13 +262,15 @@ public class AuditServiceImpl implements AuditService {
Map<String, String> auditIdMap = new HashMap<>();
auditIdMap.put(getAuditId(sinkNodeType, true), sinkNodeType);
- // properly overwrite audit ids by role and stream config
- if
(InlongConstants.DATASYNC_MODE.equals(groupEntity.getInlongGroupMode())) {
- auditIdMap.put(getAuditId(sourceNodeType, false), sourceNodeType);
- request.setAuditIds(getAuditIds(groupId, streamId, sourceNodeType,
sinkNodeType));
- } else {
- auditIdMap.put(getAuditId(sinkNodeType, false), sinkNodeType);
- request.setAuditIds(getAuditIds(groupId, streamId, null,
sinkNodeType));
+ if (CollectionUtils.isEmpty(request.getAuditIds())) {
+ // properly overwrite audit ids by role and stream config
+ if
(InlongConstants.DATASYNC_MODE.equals(groupEntity.getInlongGroupMode())) {
+ auditIdMap.put(getAuditId(sourceNodeType, false),
sourceNodeType);
+ request.setAuditIds(getAuditIds(groupId, streamId,
sourceNodeType, sinkNodeType));
+ } else {
+ auditIdMap.put(getAuditId(sinkNodeType, false), sinkNodeType);
+ request.setAuditIds(getAuditIds(groupId, streamId, null,
sinkNodeType));
+ }
}
List<AuditVO> result = new ArrayList<>();
@@ -286,6 +288,7 @@ public class AuditServiceImpl implements AuditService {
vo.setLogTs((String) s.get("logTs"));
vo.setCount(((BigDecimal) s.get("total")).longValue());
vo.setDelay(((BigDecimal)
s.get("totalDelay")).longValue());
+ vo.setSize(((BigDecimal) s.get("totalSize")).longValue());
return vo;
}).collect(Collectors.toList());
result.add(new AuditVO(auditId, auditSet,
auditIdMap.getOrDefault(auditId, null)));
@@ -322,6 +325,7 @@ public class AuditServiceImpl implements AuditService {
vo.setLogTs(resultSet.getString("log_ts"));
vo.setCount(resultSet.getLong("total"));
vo.setDelay(resultSet.getLong("total_delay"));
+ vo.setSize(resultSet.getLong("total_size"));
auditSet.add(vo);
}
result.add(new AuditVO(auditId, auditSet,
auditIdMap.getOrDefault(auditId, null)));
@@ -416,7 +420,7 @@ public class AuditServiceImpl implements AuditService {
.toString();
String sql = new SQL()
- .SELECT("log_ts", "sum(count) as total", "sum(delay) as
total_delay")
+ .SELECT("log_ts", "sum(count) as total", "sum(delay) as
total_delay", "sum(size) as total_size")
.FROM("(" + subQuery + ") as sub")
.GROUP_BY("log_ts")
.ORDER_BY("log_ts")
@@ -459,6 +463,7 @@ public class AuditServiceImpl implements AuditService {
AuditVO statInfo = new AuditVO();
HashMap<String, AtomicLong> countMap = new HashMap<>();
HashMap<String, AtomicLong> delayMap = new HashMap<>();
+ HashMap<String, AtomicLong> sizeMap = new HashMap<>();
statInfo.setAuditId(auditVO.getAuditId());
statInfo.setNodeType(auditVO.getNodeType());
for (AuditInfo auditInfo : auditVO.getAuditSet()) {
@@ -472,8 +477,12 @@ public class AuditServiceImpl implements AuditService {
if (delayMap.get(statKey) == null) {
delayMap.put(statKey, new AtomicLong(0));
}
+ if (sizeMap.get(statKey) == null) {
+ sizeMap.put(statKey, new AtomicLong(0));
+ }
countMap.get(statKey).addAndGet(auditInfo.getCount());
delayMap.get(statKey).addAndGet(auditInfo.getDelay());
+ sizeMap.get(statKey).addAndGet(auditInfo.getSize());
}
List<AuditInfo> auditInfoList = new LinkedList<>();
@@ -483,6 +492,7 @@ public class AuditServiceImpl implements AuditService {
long count = entry.getValue().get();
auditInfoStat.setCount(entry.getValue().get());
auditInfoStat.setDelay(count == 0 ? 0 :
delayMap.get(entry.getKey()).get() / count);
+ auditInfoStat.setSize(count == 0 ? 0 :
sizeMap.get(entry.getKey()).get() / count);
auditInfoList.add(auditInfoStat);
}
statInfo.setAuditSet(auditInfoList);
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/AuditServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/AuditServiceTest.java
index 7909b0e012..28b8636969 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/AuditServiceTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/AuditServiceTest.java
@@ -52,8 +52,8 @@ class AuditServiceTest extends ServiceBaseTest {
List<AuditVO> result = new ArrayList<>();
AuditVO auditVO = new AuditVO();
auditVO.setAuditId("3");
- auditVO.setAuditSet(Arrays.asList(new AuditInfo("2022-01-01 00:00:00",
123L, 12L),
- new AuditInfo("2022-01-01 00:01:00", 124L, 12L)));
+ auditVO.setAuditSet(Arrays.asList(new AuditInfo("2022-01-01 00:00:00",
123L, 12L, 12L),
+ new AuditInfo("2022-01-01 00:01:00", 124L, 12L, 12L)));
result.add(auditVO);
Assertions.assertNotNull(result);
// close real test for testQueryFromMySQL due to date_format function not support in h2