healchow commented on code in PR #12099:
URL: https://github.com/apache/inlong/pull/12099#discussion_r2870827552
##########
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java:
##########
@@ -168,83 +173,138 @@ public List<AuditVO> listByCondition(AuditRequest
request) throws Exception {
// for now, we use the first sink type only.
// this is temporary behavior before multiple sinks in one stream is
fully supported.
- String sinkNodeType = null;
- String sourceNodeType = null;
- Integer sinkId = request.getSinkId();
- StreamSinkEntity sinkEntity = null;
- if (StringUtils.isNotBlank(streamId)) {
- List<StreamSinkEntity> sinkEntityList =
sinkEntityMapper.selectByRelatedId(groupId, streamId);
- if (sinkId != null) {
- sinkEntity = sinkEntityMapper.selectByPrimaryKey(sinkId);
- } else if (CollectionUtils.isNotEmpty(sinkEntityList)) {
- sinkEntity = sinkEntityList.get(0);
- }
- // if sink info is existed, get sink type for query audit info.
- if (sinkEntity != null) {
- sinkNodeType = sinkEntity.getSinkType();
- }
- } else {
- sinkNodeType = request.getSinkType();
+ String sinkNodeType = request.getSinkType();
+ // if the request already specifies a sink type, use it directly
+ if (StringUtils.isBlank(sinkNodeType) &&
Boolean.TRUE.equals(request.getNeedSinkAudit())) {
+ sinkNodeType = getSinkNodeType(request.getSinkId(), groupId,
streamId);
}
- Map<String, String> auditIdMap = new HashMap<>();
+ // key: auditId, value: nodeType
+ Map<String, String> auditId2NodeTypeMap = new HashMap<>(8);
+ fillAuditId2SinkNodeTypeMap(auditId2NodeTypeMap, sinkNodeType);
+
+ // prefer the requested source type; fall back to sinkNodeType when it is blank
+ String sourceNodeType = request.getSourceType();
+ if (StringUtils.isBlank(sourceNodeType)) {
+ sourceNodeType = sinkNodeType;
+ }
if (StringUtils.isNotBlank(groupId)) {
- InlongGroupEntity groupEntity =
inlongGroupMapper.selectByGroupId(groupId);
- List<StreamSourceEntity> sourceEntityList =
sourceEntityMapper.selectByRelatedId(groupId, streamId, null);
- if (CollectionUtils.isNotEmpty(sourceEntityList)) {
- sourceNodeType = sourceEntityList.get(0).getSourceType();
+ List<StreamSourceEntity> sourceList = null;
+ if (StringUtils.isNotBlank(streamId)) {
+ // if sourceNodeType is blank, get sourceNodeType by groupId
and streamId from StreamSourceEntity
+ if (StringUtils.isBlank(sourceNodeType) &&
Boolean.TRUE.equals(request.getNeedSourceAudit())) {
+ sourceList = sourceEntityMapper.selectByRelatedId(groupId,
streamId, null);
+ if (CollectionUtils.isNotEmpty(sourceList)) {
+ sourceNodeType = sourceList.get(0).getSourceType();
+ fillAuditId2SourceNodeTypeMap(auditId2NodeTypeMap,
sourceNodeType);
+ }
+ }
}
- auditIdMap.put(getAuditId(sinkNodeType,
IndicatorType.SEND_SUCCESS), sinkNodeType);
-
if (CollectionUtils.isEmpty(request.getAuditIds())) {
- // properly overwrite audit ids by role and stream config
- if
(InlongConstants.DATASYNC_REALTIME_MODE.equals(groupEntity.getInlongGroupMode())
- ||
InlongConstants.DATASYNC_OFFLINE_MODE.equals(groupEntity.getInlongGroupMode()))
{
- List<AuditInformation> cdcAuditInfoList =
- getCdcAuditInfoList(sourceNodeType,
IndicatorType.RECEIVED_SUCCESS);
- List<String> cdcAuditIdList =
- cdcAuditInfoList.stream().map(v ->
String.valueOf(v.getAuditId()))
- .collect(Collectors.toList());
- if (CollectionUtils.isNotEmpty(cdcAuditIdList)) {
- String tempSourceNodeType = sourceNodeType;
- cdcAuditIdList.forEach(v -> auditIdMap.put(v,
tempSourceNodeType));
- }
- auditIdMap.put(getAuditId(sourceNodeType,
IndicatorType.RECEIVED_SUCCESS), sourceNodeType);
- request.setAuditIds(getAuditIds(groupId, streamId,
sourceNodeType, sinkNodeType));
- } else {
- auditIdMap.put(getAuditId(sinkNodeType,
IndicatorType.RECEIVED_SUCCESS), sinkNodeType);
- request.setAuditIds(getAuditIds(groupId, streamId, null,
sinkNodeType));
- }
+ InlongGroupEntity groupEntity =
inlongGroupMapper.selectByGroupId(groupId);
Review Comment:
Yes, I checked the GroupMode in the getAuditIds method.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]