This is an automated email from the ASF dual-hosted git repository.
luchunliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new af93ac24b7 [INLONG-9369][Agent] Increase sending failure audit and
real-time audit (#9370)
af93ac24b7 is described below
commit af93ac24b72b90e488233a87e1a2fd62a7f9ad72
Author: justinwwhuang <[email protected]>
AuthorDate: Thu Nov 30 18:10:49 2023 +0800
[INLONG-9369][Agent] Increase sending failure audit and real-time audit
(#9370)
* [INLONG-9369][Agent] Increase sending failure audit and real-time audit
* [INLONG-9369][Agent] Increase sending failure audit and real-time audit
---
.../main/java/org/apache/inlong/agent/metrics/audit/AuditUtils.java | 4 ++++
.../apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java | 6 ++++++
.../java/org/apache/inlong/agent/plugin/sources/LogFileSource.java | 4 ++++
3 files changed, 14 insertions(+)
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/audit/AuditUtils.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/audit/AuditUtils.java
index 68a4e64842..b4eda2ce60 100644
---
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/audit/AuditUtils.java
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/metrics/audit/AuditUtils.java
@@ -42,6 +42,10 @@ public class AuditUtils {
public static final int AUDIT_DEFAULT_MAX_CACHE_ROWS = 2000000;
public static final int AUDIT_ID_AGENT_READ_SUCCESS = 3;
public static final int AUDIT_ID_AGENT_SEND_SUCCESS = 4;
+ public static final int AUDIT_ID_AGENT_READ_SUCCESS_REAL_TIME = 25;
+ public static final int AUDIT_ID_AGENT_SEND_SUCCESS_REAL_TIME = 26;
+ public static final int AUDIT_ID_AGENT_SEND_FAILED = 10004;
+ public static final int AUDIT_ID_AGENT_SEND_FAILED_REAL_TIME = 10026;
private static boolean IS_AUDIT = true;
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java
index 45fe9bc63e..d027672b18 100755
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.java
@@ -346,11 +346,17 @@ public class SenderManager {
getMetricItem(groupId,
streamId).pluginSendSuccessCount.addAndGet(msgCnt);
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_SUCCESS,
groupId, streamId,
dataTime, message.getMsgCnt(), message.getTotalSize());
+
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_SUCCESS_REAL_TIME, groupId,
streamId,
+ AgentUtils.getCurrentTime(), message.getMsgCnt(),
message.getTotalSize());
} else {
LOGGER.warn("send groupId {}, streamId {}, taskId {},
instanceId {}, dataTime {} fail with times {}, "
+ "error {}", groupId, streamId, taskId, instanceId,
dataTime, retry, result);
getMetricItem(groupId,
streamId).pluginSendFailCount.addAndGet(msgCnt);
putInResendQueue(new AgentSenderCallback(message, retry));
+ AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_FAILED, groupId,
streamId,
+ dataTime, message.getMsgCnt(), message.getTotalSize());
+
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_FAILED_REAL_TIME, groupId,
streamId,
+ AgentUtils.getCurrentTime(), message.getMsgCnt(),
message.getTotalSize());
}
}
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
index 7c719b1e47..0cc97afb8c 100755
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/LogFileSource.java
@@ -319,6 +319,8 @@ public class LogFileSource extends AbstractSource {
if (overLen) {
LOGGER.warn("readLines over len finally string len
{}",
new String(baos.toByteArray()).length());
+
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS_REAL_TIME, inlongGroupId,
+ inlongStreamId,
AgentUtils.getCurrentTime(), 1, maxPackSize);
}
baos.reset();
overLen = false;
@@ -382,6 +384,8 @@ public class LogFileSource extends AbstractSource {
}
AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS, inlongGroupId,
header.get(PROXY_KEY_STREAM_ID),
auditTime, 1, msgWithMetaData.length());
+ AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_READ_SUCCESS_REAL_TIME,
inlongGroupId, header.get(PROXY_KEY_STREAM_ID),
+ AgentUtils.getCurrentTime(), 1, msgWithMetaData.length());
Message finalMsg = new
DefaultMessage(msgWithMetaData.getBytes(StandardCharsets.UTF_8), header);
// if the message size is greater than max pack size,should drop it.
if (finalMsg.getBody().length > maxPackSize) {