This is an automated email from the ASF dual-hosted git repository.

doleyzi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 9411dc1a18 [INLONG-11999][Audit] Add alert evaluation and periodic 
audit check task (#12002)
9411dc1a18 is described below

commit 9411dc1a1805825762a940e57ce8c1630a9a4ad4
Author: Kafka <[email protected]>
AuthorDate: Mon Sep 15 10:08:28 2025 +0800

    [INLONG-11999][Audit] Add alert evaluation and periodic audit check task 
(#12002)
    
    * [INLONG-11999][Audit] Add alert evaluation and periodic audit check task
    
    * [INLONG-11999][Audit] Add alert evaluation and periodic audit check task
    
    * [INLONG-11999][Audit] Add alert evaluation and periodic audit check task
    
    * [INLONG-11999][Audit] Add alert evaluation and periodic audit check task
    
    * [INLONG-11999][Audit] Add alert evaluation and periodic audit check task
    
    * Update AuditToolMain.java
    
    ---------
    
    Co-authored-by: doleyzi <[email protected]>
---
 .../apache/inlong/audit/tool/AuditToolMain.java    |  41 +++-
 .../audit/tool/evaluator/AlertEvaluator.java       | 112 +++++++++
 .../audit/tool/reporter/OpenTelemetryReporter.java |  77 ++++++
 .../inlong/audit/tool/task/AuditCheckTask.java     | 146 +++++++++++
 .../inlong/tool/evaluator/AlertEvaluatorTest.java  | 266 +++++++++++++++++++++
 .../tool/service/AuditMetricServiceTest.java       |  47 ++--
 .../inlong/tool/task/AuditCheckTaskTest.java       | 139 +++++++++++
 7 files changed, 796 insertions(+), 32 deletions(-)

diff --git 
a/inlong-audit/audit-tool/src/main/java/org/apache/inlong/audit/tool/AuditToolMain.java
 
b/inlong-audit/audit-tool/src/main/java/org/apache/inlong/audit/tool/AuditToolMain.java
index 6508923127..a4c569a368 100644
--- 
a/inlong-audit/audit-tool/src/main/java/org/apache/inlong/audit/tool/AuditToolMain.java
+++ 
b/inlong-audit/audit-tool/src/main/java/org/apache/inlong/audit/tool/AuditToolMain.java
@@ -17,9 +17,48 @@
 
 package org.apache.inlong.audit.tool;
 
+import org.apache.inlong.audit.tool.config.AppConfig;
+import org.apache.inlong.audit.tool.evaluator.AlertEvaluator;
+import org.apache.inlong.audit.tool.manager.AuditAlertRuleManager;
+import org.apache.inlong.audit.tool.reporter.PrometheusReporter;
+import org.apache.inlong.audit.tool.task.AuditCheckTask;
+import org.apache.inlong.audit.tool.util.AuditSQLUtil;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 public class AuditToolMain {
 
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(AuditToolMain.class);
+
     public static void main(String[] args) {
+        // Load application configuration
+        AppConfig appConfig = new AppConfig();
+
+        // Initialize auditAlertRule Manager
+        AuditAlertRuleManager auditAlertRuleManager = AuditAlertRuleManager.getInstance();
+        auditAlertRuleManager.init(appConfig);
+        auditAlertRuleManager.schedule();
+
+        // Initialize reporters
+        PrometheusReporter prometheusReporter = new PrometheusReporter();
+        prometheusReporter.init(appConfig.getPrometheusConfig());
+
+        // Database query initialization
+        AuditSQLUtil.initialize(appConfig.getProperties());
+
+        // Wire the alert evaluator into the periodic audit check task and start it
+        AlertEvaluator alertEvaluator = new AlertEvaluator(prometheusReporter, auditAlertRuleManager);
+        AuditCheckTask auditCheckTask =
+                new AuditCheckTask(auditAlertRuleManager, alertEvaluator, appConfig);
+        auditCheckTask.start();
+
+        // Stop the check task when the JVM shuts down. A regular shutdown is not a
+        // failure, so it is logged at INFO instead of ERROR.
+        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+            auditCheckTask.stop();
+            LOGGER.info("Audit Tool stopped.");
+        }));
 
+        LOGGER.info("Audit Tool started.");
     }
-}
\ No newline at end of file
+}
diff --git 
a/inlong-audit/audit-tool/src/main/java/org/apache/inlong/audit/tool/evaluator/AlertEvaluator.java
 
b/inlong-audit/audit-tool/src/main/java/org/apache/inlong/audit/tool/evaluator/AlertEvaluator.java
new file mode 100644
index 0000000000..13f1880837
--- /dev/null
+++ 
b/inlong-audit/audit-tool/src/main/java/org/apache/inlong/audit/tool/evaluator/AlertEvaluator.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.tool.evaluator;
+
+import org.apache.inlong.audit.tool.dto.AuditAlertCondition;
+import org.apache.inlong.audit.tool.dto.AuditAlertRule;
+import org.apache.inlong.audit.tool.entity.AuditMetric;
+import org.apache.inlong.audit.tool.manager.AuditAlertRuleManager;
+import org.apache.inlong.audit.tool.reporter.PrometheusReporter;
+
+import lombok.Getter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Compares source and sink audit counts for one alert rule and, when the rule's condition
+ * is met, logs an alert and forwards the relative difference to the Prometheus reporter.
+ */
+public class AlertEvaluator {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(AlertEvaluator.class);
+
+    private final PrometheusReporter prometheusReporter;
+    @Getter
+    private final AuditAlertRuleManager auditAlertRuleManager;
+
+    public AlertEvaluator(PrometheusReporter prometheusReporter,
+            AuditAlertRuleManager auditAlertRuleManager) {
+        this.prometheusReporter = prometheusReporter;
+        this.auditAlertRuleManager = auditAlertRuleManager;
+    }
+
+    /**
+     * Evaluates one alert rule against every source/sink metric pair that belongs to the
+     * rule's group and stream.
+     *
+     * <p>For each pair the relative difference {@code (sink - source) / source} is compared
+     * with the rule's threshold using the rule's operator. Sources with a count of zero are
+     * skipped because the ratio is undefined. Nothing is evaluated when either metric list,
+     * the rule, its condition or its operator is {@code null}.
+     *
+     * @param sourceMetrics metrics of the source audit id, may be {@code null}
+     * @param sinkMetrics metrics of the sink audit id, may be {@code null}
+     * @param alertRule rule providing group id, stream id and the condition to check
+     */
+    public void evaluateAndReportAlert(List<AuditMetric> sourceMetrics,
+            List<AuditMetric> sinkMetrics,
+            AuditAlertRule alertRule) {
+        if (sourceMetrics == null || sinkMetrics == null || alertRule == null) {
+            return;
+        }
+
+        AuditAlertCondition condition = alertRule.getCondition();
+        if (condition == null || condition.getOperator() == null) {
+            // A switch on a null operator would throw; an incomplete rule can never fire.
+            LOGGER.warn("Skip alert rule without condition or operator, groupId={}, streamId={}",
+                    alertRule.getInlongGroupId(), alertRule.getInlongStreamId());
+            return;
+        }
+        double threshold = condition.getValue();
+        String op = condition.getOperator();
+
+        for (AuditMetric source : sourceMetrics) {
+            if (!sameGroupAndStream(alertRule.getInlongGroupId(), alertRule.getInlongStreamId(), source)) {
+                continue;
+            }
+            // The ratio below divides by the source count, so an empty source cannot be evaluated.
+            if (source.getCount() == 0) {
+                continue;
+            }
+            for (AuditMetric sink : sinkMetrics) {
+                if (!sameGroupAndStream(source.getInlongGroupId(), source.getInlongStreamId(), sink)) {
+                    continue;
+                }
+
+                double diff = (sink.getCount() - source.getCount()) / (double) source.getCount();
+                if (!isHit(op, diff, threshold)) {
+                    continue;
+                }
+
+                LOGGER.error(
+                        "[ALERT] groupId={}, streamId={} | sourceCount={}, sinkCount={} | diff={} operator={} threshold={}",
+                        source.getInlongGroupId(), source.getInlongStreamId(),
+                        source.getCount(), sink.getCount(), diff, op, threshold);
+                if (prometheusReporter.getAuditMetric() != null) {
+                    prometheusReporter.getAuditMetric().updateSourceAndSinkAuditDiffMetric(diff);
+                }
+            }
+        }
+    }
+
+    /** Returns whether the metric carries exactly the given group id and stream id. */
+    private static boolean sameGroupAndStream(String groupId, String streamId, AuditMetric metric) {
+        return Objects.equals(groupId, metric.getInlongGroupId())
+                && Objects.equals(streamId, metric.getInlongStreamId());
+    }
+
+    /**
+     * Applies the comparison operator to {@code diff} and {@code threshold}.
+     * Unknown operators never hit. "==" and "!=" compare doubles exactly.
+     */
+    private static boolean isHit(String op, double diff, double threshold) {
+        switch (op) {
+            case ">":
+                return diff > threshold;
+            case ">=":
+                return diff >= threshold;
+            case "<":
+                return diff < threshold;
+            case "<=":
+                return diff <= threshold;
+            case "==":
+                return diff == threshold;
+            case "!=":
+                return diff != threshold;
+            default:
+                return false;
+        }
+    }
+
+}
\ No newline at end of file
diff --git 
a/inlong-audit/audit-tool/src/main/java/org/apache/inlong/audit/tool/reporter/OpenTelemetryReporter.java
 
b/inlong-audit/audit-tool/src/main/java/org/apache/inlong/audit/tool/reporter/OpenTelemetryReporter.java
new file mode 100644
index 0000000000..83f3a04947
--- /dev/null
+++ 
b/inlong-audit/audit-tool/src/main/java/org/apache/inlong/audit/tool/reporter/OpenTelemetryReporter.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.tool.reporter;
+
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.metrics.LongCounter;
+import io.opentelemetry.api.metrics.Meter;
+import io.opentelemetry.exporter.otlp.metrics.OtlpGrpcMetricExporter;
+import io.opentelemetry.sdk.metrics.SdkMeterProvider;
+import io.opentelemetry.sdk.metrics.export.PeriodicMetricReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static org.apache.inlong.audit.tool.config.ConfigConstants.*;
+
+/**
+ * {@link MetricReporter} that creates OpenTelemetry metric instruments and exports them
+ * through an OTLP gRPC metric exporter.
+ */
+public class OpenTelemetryReporter implements MetricReporter {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(OpenTelemetryReporter.class);
+
+    protected SdkMeterProvider meterProvider;
+    public Meter meter;
+    public LongCounter alertCounter;
+    // Latest gauge value per attribute set; the gauge callback registered in init()
+    // reads this map every time it is invoked.
+    public final Map<Attributes, Double> dataLossRateValues = new ConcurrentHashMap<>();
+
+    /**
+     * Builds the exporter, meter provider, alert counter and data-loss-rate gauge.
+     *
+     * @param config reporter settings; the OTLP endpoint is read from
+     *        {@code KEY_OTEL_ENDPOINT} and falls back to {@code DEFAULT_OTEL_ENDPOINT}
+     */
+    @Override
+    public void init(Map<String, Object> config) {
+        String otlpEndpoint = (String) config.getOrDefault(KEY_OTEL_ENDPOINT, DEFAULT_OTEL_ENDPOINT);
+
+        OtlpGrpcMetricExporter exporter = OtlpGrpcMetricExporter.builder()
+                .setEndpoint(otlpEndpoint)
+                .build();
+
+        // Export every 30 seconds.
+        PeriodicMetricReader periodicReader = PeriodicMetricReader.builder(exporter)
+                .setInterval(Duration.ofSeconds(30))
+                .build();
+
+        // Only metrics are needed, so the meter provider is managed directly instead of
+        // building a full OpenTelemetrySdk.
+        this.meterProvider = SdkMeterProvider.builder()
+                .registerMetricReader(periodicReader)
+                .build();
+        this.meter = meterProvider.get(AUDIT_TOOL_NAME);
+
+        this.alertCounter = meter.counterBuilder(AUDIT_TOOL_ALERTS_TOTAL)
+                .setDescription(DESC_AUDIT_TOOL_ALERTS_TOTAL)
+                .build();
+
+        // Observable gauge: the callback records the current value of every attribute set.
+        meter.gaugeBuilder(AUDIT_TOOL_DATA_LOSS_RATE)
+                .setDescription(DESC_AUDIT_TOOL_DATA_LOSS_RATE)
+                .buildWithCallback(measurement -> dataLossRateValues
+                        .forEach((attrs, rate) -> measurement.record(rate, attrs)));
+    }
+
+}
diff --git 
a/inlong-audit/audit-tool/src/main/java/org/apache/inlong/audit/tool/task/AuditCheckTask.java
 
b/inlong-audit/audit-tool/src/main/java/org/apache/inlong/audit/tool/task/AuditCheckTask.java
new file mode 100644
index 0000000000..777bda40c1
--- /dev/null
+++ 
b/inlong-audit/audit-tool/src/main/java/org/apache/inlong/audit/tool/task/AuditCheckTask.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.audit.tool.task;
+
+import org.apache.inlong.audit.tool.config.AppConfig;
+import org.apache.inlong.audit.tool.config.ConfigConstants;
+import org.apache.inlong.audit.tool.dto.AuditAlertRule;
+import org.apache.inlong.audit.tool.entity.AuditMetric;
+import org.apache.inlong.audit.tool.evaluator.AlertEvaluator;
+import org.apache.inlong.audit.tool.manager.AuditAlertRuleManager;
+import org.apache.inlong.audit.tool.service.AuditMetricService;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * AuditCheckTask class: Periodically fetches audit data and evaluates alert policies.
+ *
+ * <p>Each run queries the source audit metrics and the metrics of every sink audit id for
+ * one time window, then passes every (source, sink, rule) combination to the
+ * {@link AlertEvaluator}.
+ */
+public class AuditCheckTask {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(AuditCheckTask.class);
+    private static final DateTimeFormatter LOGS_FMT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
+
+    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
+    private final AlertEvaluator alertEvaluator;
+    private final AuditAlertRuleManager auditAlertRuleManager;
+    private final AuditMetricService auditMetricService;
+    // Scheduling period in minutes. NOTE(review): this is read from KEY_DELAY_TIME and is
+    // also reused as the query delay - confirm that sharing one setting is intended.
+    private Integer executionIntervalTime = 1;
+    // Length of the queried time window in minutes.
+    private Integer intervalTimeMinute = 1;
+    // How far the end of the queried window lags behind the current time, in minutes.
+    private final Integer delayTimeMinute;
+    // Audit id whose metrics form the source side of every comparison.
+    private String sourceAuditId = "5";
+
+    /**
+     * Creates the task and reads its timing settings from the application properties.
+     * Settings that cannot be read keep their defaults (source audit id 5, 1 minute each).
+     *
+     * @param auditAlertRuleManager provider of sink audit ids and alert rules
+     * @param alertEvaluator evaluator invoked for every source/sink/rule combination
+     * @param appConfig application configuration holding the properties
+     */
+    public AuditCheckTask(
+            AuditAlertRuleManager auditAlertRuleManager, AlertEvaluator alertEvaluator, AppConfig appConfig) {
+        this.auditAlertRuleManager = auditAlertRuleManager;
+        this.alertEvaluator = alertEvaluator;
+        this.auditMetricService = new AuditMetricService();
+        try {
+            this.executionIntervalTime =
+                    Integer.valueOf(appConfig.getProperties().getProperty(ConfigConstants.KEY_DELAY_TIME, "1"));
+            this.intervalTimeMinute =
+                    Integer.parseInt(appConfig.getProperties().getProperty(ConfigConstants.KEY_INTERVAL_TIME, "1"));
+            this.sourceAuditId = appConfig.getProperties().getProperty(ConfigConstants.KEY_SOURCE_AUDIT_ID, "5");
+        } catch (Exception e) {
+            // Keep the cause in the log so a malformed setting can be diagnosed.
+            LOGGER.error(
+                    "Failed to read configuration information, default source AuditId is 5, delay execution time is 1, time interval is 1",
+                    e);
+        }
+        this.delayTimeMinute = executionIntervalTime;
+    }
+
+    /**
+     * Initiate the audit inspection task
+     */
+    public void start() {
+        // scheduleAtFixedRate rejects a period <= 0, so fall back to one minute.
+        long periodMinutes = Math.max(1, executionIntervalTime);
+        scheduler.scheduleAtFixedRate(this::checkAuditData, 0, periodMinutes, TimeUnit.MINUTES);
+    }
+
+    /**
+     * Runs one audit check. Any exception is logged instead of propagated, because an
+     * exception escaping a task scheduled with scheduleAtFixedRate suppresses all
+     * subsequent executions.
+     */
+    private void checkAuditData() {
+        try {
+            doCheckAuditData();
+        } catch (Exception e) {
+            LOGGER.error("Audit check failed, the next scheduled run will try again", e);
+        }
+    }
+
+    /**
+     * Check audit data and trigger alert evaluation.
+     */
+    private void doCheckAuditData() {
+        // Obtain auditIds provided by the interface
+        List<String> sinkAuditIds = auditAlertRuleManager.getAuditIds();
+        if (sinkAuditIds == null) {
+            return;
+        }
+
+        // Obtain alarm strategy; without rules there is nothing to evaluate or query
+        List<AuditAlertRule> alertRules = auditAlertRuleManager.getAuditAlertRuleList();
+        if (alertRules == null || alertRules.isEmpty()) {
+            return;
+        }
+
+        // Derive both window bounds from a single clock reading so that the window is
+        // always exactly intervalTimeMinute long, even across a minute rollover.
+        LocalDateTime windowEnd = LocalDateTime.now()
+                .withSecond(0)
+                .minusMinutes(delayTimeMinute);
+        String startLogTs = windowEnd.minusMinutes(intervalTimeMinute).format(LOGS_FMT);
+        String endLogTs = windowEnd.format(LOGS_FMT);
+
+        // Query the relevant indicator data of auditId source
+        List<AuditMetric> sourceAuditMetric =
+                auditMetricService.getStorageAuditMetrics(sourceAuditId, startLogTs, endLogTs);
+        if (sourceAuditMetric == null) {
+            return;
+        }
+
+        // Compare the source auditId related indicator data with the sink auditId related indicator data
+        for (String sinkAuditId : sinkAuditIds) {
+            List<AuditMetric> sinkAuditMetrics =
+                    auditMetricService.getStorageAuditMetrics(sinkAuditId, startLogTs, endLogTs);
+            if (sinkAuditMetrics == null || sinkAuditMetrics.isEmpty()) {
+                continue;
+            }
+            for (AuditAlertRule alertRule : alertRules) {
+                alertEvaluator.evaluateAndReportAlert(sourceAuditMetric, sinkAuditMetrics,
+                        alertRule);
+            }
+        }
+    }
+
+    /**
+     * Stop the audit inspection task. Waits up to five seconds for a running check to
+     * finish before forcing the scheduler down.
+     */
+    public void stop() {
+        scheduler.shutdown();
+        try {
+            if (!scheduler.awaitTermination(5, TimeUnit.SECONDS)) {
+                scheduler.shutdownNow();
+            }
+        } catch (InterruptedException e) {
+            scheduler.shutdownNow();
+            // Restore the interrupt flag for the caller.
+            Thread.currentThread().interrupt();
+        }
+    }
+
+}
\ No newline at end of file
diff --git 
a/inlong-audit/audit-tool/src/test/java/org/apache/inlong/tool/evaluator/AlertEvaluatorTest.java
 
b/inlong-audit/audit-tool/src/test/java/org/apache/inlong/tool/evaluator/AlertEvaluatorTest.java
new file mode 100644
index 0000000000..f7798e9f75
--- /dev/null
+++ 
b/inlong-audit/audit-tool/src/test/java/org/apache/inlong/tool/evaluator/AlertEvaluatorTest.java
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.tool.evaluator;
+
+import org.apache.inlong.audit.tool.dto.AuditAlertCondition;
+import org.apache.inlong.audit.tool.dto.AuditAlertRule;
+import org.apache.inlong.audit.tool.entity.AuditMetric;
+import org.apache.inlong.audit.tool.evaluator.AlertEvaluator;
+import org.apache.inlong.audit.tool.manager.AuditAlertRuleManager;
+import org.apache.inlong.audit.tool.reporter.PrometheusReporter;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.util.Collections;
+
+import static org.junit.jupiter.api.Assertions.*;
+import static org.mockito.Mockito.*;
+
+/**
+ * Unit tests for {@link AlertEvaluator}. Most tests pass one source metric and one sink
+ * metric to the evaluator and then check whether the mocked reporter was consulted.
+ */
+@ExtendWith(MockitoExtension.class)
+class AlertEvaluatorTest {
+
+    @Mock
+    private PrometheusReporter prometheusReporter;
+
+    @Mock
+    private AuditAlertRuleManager auditAlertRuleManager;
+
+    private AlertEvaluator alertEvaluator;
+
+    @BeforeEach
+    void setUp() {
+        alertEvaluator = new AlertEvaluator(prometheusReporter, auditAlertRuleManager);
+    }
+
+    @Test
+    void testEvaluateAndReportAlertWithNullMetrics() {
+        // Null source metrics
+        alertEvaluator.evaluateAndReportAlert(
+                null, Collections.singletonList(new AuditMetric()), new AuditAlertRule());
+
+        // Null sink metrics
+        alertEvaluator.evaluateAndReportAlert(
+                Collections.singletonList(new AuditMetric()), null, new AuditAlertRule());
+
+        // The reporter must not have been touched
+        verifyNoInteractions(prometheusReporter);
+    }
+
+    @Test
+    void testEvaluateAndReportAlertWithNonMatchingGroupAndStream() {
+        // The sink belongs to a different group than the source
+        AuditMetric source = metricOf("group1", 100L);
+        AuditMetric sink = metricOf("group2", 90L);
+
+        evaluate(source, sink, ruleOf(">", 0.1));
+
+        // No alert was triggered
+        verifyNoInteractions(prometheusReporter);
+    }
+
+    @Test
+    void testEvaluateAndReportAlertWithZeroSourceCount() {
+        // A source count of zero cannot be evaluated
+        AuditMetric source = metricOf("group1", 0L);
+        AuditMetric sink = metricOf("group1", 90L);
+
+        evaluate(source, sink, ruleOf(">", 0.1));
+
+        // No alert was triggered
+        verifyNoInteractions(prometheusReporter);
+    }
+
+    @Test
+    void testEvaluateAndReportAlertWithGreaterThanCondition() {
+        AuditMetric source = metricOf("group1", 100L);
+        AuditAlertRule rule = ruleOf(">", 0.1);
+
+        // diff = (90-100)/100 = -0.1, which is not > 0.1
+        evaluate(source, metricOf("group1", 90L), rule);
+        verifyNoInteractions(prometheusReporter);
+
+        // diff = (120-100)/100 = 0.2, which is > 0.1
+        evaluate(source, metricOf("group1", 120L), rule);
+        verify(prometheusReporter).getAuditMetric();
+    }
+
+    @Test
+    void testEvaluateAndReportAlertWithGreaterThanOrEqualCondition() {
+        // diff = 0.1, and 0.1 >= 0.1
+        evaluate(metricOf("group1", 100L), metricOf("group1", 110L), ruleOf(">=", 0.1));
+
+        // Alert was triggered
+        verify(prometheusReporter).getAuditMetric();
+    }
+
+    @Test
+    void testEvaluateAndReportAlertWithLessThanCondition() {
+        // diff = -0.1, and -0.1 < -0.05
+        evaluate(metricOf("group1", 100L), metricOf("group1", 90L), ruleOf("<", -0.05));
+
+        // Alert was triggered
+        verify(prometheusReporter).getAuditMetric();
+    }
+
+    @Test
+    void testEvaluateAndReportAlertWithLessThanOrEqualCondition() {
+        // diff = -0.1, and -0.1 <= -0.1
+        evaluate(metricOf("group1", 100L), metricOf("group1", 90L), ruleOf("<=", -0.1));
+
+        // Alert was triggered
+        verify(prometheusReporter).getAuditMetric();
+    }
+
+    @Test
+    void testEvaluateAndReportAlertWithEqualCondition() {
+        // diff = 0.1, and 0.1 == 0.1
+        evaluate(metricOf("group1", 100L), metricOf("group1", 110L), ruleOf("==", 0.1));
+
+        // Alert was triggered
+        verify(prometheusReporter).getAuditMetric();
+    }
+
+    @Test
+    void testEvaluateAndReportAlertWithNotEqualCondition() {
+        // diff = 0.1, and 0.1 != 0.2
+        evaluate(metricOf("group1", 100L), metricOf("group1", 110L), ruleOf("!=", 0.2));
+
+        // Alert was triggered
+        verify(prometheusReporter).getAuditMetric();
+    }
+
+    @Test
+    void testEvaluateAndReportAlertWithUnknownOperator() {
+        // An operator the evaluator does not know
+        evaluate(metricOf("group1", 100L), metricOf("group1", 110L), ruleOf("unknown", 0.1));
+
+        // No alert was triggered for the unknown operator
+        verifyNoInteractions(prometheusReporter);
+    }
+
+    @Test
+    void testGetAuditAlertRuleManager() {
+        // The getter returns the manager passed to the constructor
+        assertEquals(auditAlertRuleManager, alertEvaluator.getAuditAlertRuleManager());
+    }
+
+    /** Runs the evaluator with exactly one source metric and one sink metric. */
+    private void evaluate(AuditMetric source, AuditMetric sink, AuditAlertRule rule) {
+        alertEvaluator.evaluateAndReportAlert(
+                Collections.singletonList(source),
+                Collections.singletonList(sink),
+                rule);
+    }
+
+    /** Builds a metric for the given group on stream "stream1" with the given count. */
+    private AuditMetric metricOf(String groupId, long count) {
+        AuditMetric auditMetric = new AuditMetric();
+        auditMetric.setInlongGroupId(groupId);
+        auditMetric.setInlongStreamId("stream1");
+        auditMetric.setCount(count);
+        return auditMetric;
+    }
+
+    /** Builds a rule for group1/stream1 with the given operator and threshold. */
+    private AuditAlertRule ruleOf(String operator, double value) {
+        AuditAlertCondition alertCondition = new AuditAlertCondition();
+        alertCondition.setOperator(operator);
+        alertCondition.setValue(value);
+
+        AuditAlertRule alertRule = new AuditAlertRule();
+        alertRule.setInlongGroupId("group1");
+        alertRule.setInlongStreamId("stream1");
+        alertRule.setCondition(alertCondition);
+        return alertRule;
+    }
+}
diff --git 
a/inlong-audit/audit-tool/src/test/java/org/apache/inlong/tool/service/AuditMetricServiceTest.java
 
b/inlong-audit/audit-tool/src/test/java/org/apache/inlong/tool/service/AuditMetricServiceTest.java
index 97e7349dd5..f7a7c4c0ec 100644
--- 
a/inlong-audit/audit-tool/src/test/java/org/apache/inlong/tool/service/AuditMetricServiceTest.java
+++ 
b/inlong-audit/audit-tool/src/test/java/org/apache/inlong/tool/service/AuditMetricServiceTest.java
@@ -19,54 +19,39 @@ package org.apache.inlong.tool.service;
 
 import org.apache.inlong.audit.tool.config.AppConfig;
 import org.apache.inlong.audit.tool.entity.AuditMetric;
-import org.apache.inlong.audit.tool.mapper.AuditMapper;
 import org.apache.inlong.audit.tool.service.AuditMetricService;
 import org.apache.inlong.audit.tool.util.AuditSQLUtil;
 
-import org.apache.ibatis.session.SqlSession;
 import org.junit.jupiter.api.Test;
-import org.mockito.MockedStatic;
-import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import java.util.Collections;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
 import java.util.List;
 
-import static org.junit.jupiter.api.Assertions.*;
-
 public class AuditMetricServiceTest {
 
-    @Test
-    public void getStorageAuditMetricsReturnsEmptyListWhenNoDataFound() {
-        AuditMetricService auditMetricService = new AuditMetricService();
-
-        try (MockedStatic<AuditSQLUtil> sqlUtilMockedStatic = 
Mockito.mockStatic(AuditSQLUtil.class)) {
-            SqlSession sqlSessionMock = Mockito.mock(SqlSession.class);
-            AuditMapper auditMapperMock = Mockito.mock(AuditMapper.class);
-
-            
sqlUtilMockedStatic.when(AuditSQLUtil::getSqlSession).thenReturn(sqlSessionMock);
-            
Mockito.when(sqlSessionMock.getMapper(AuditMapper.class)).thenReturn(auditMapperMock);
-            Mockito.when(auditMapperMock.getAuditMetrics(Mockito.anyString(), 
Mockito.anyString(), Mockito.anyString()))
-                    .thenReturn(Collections.emptyList());
-
-            List<AuditMetric> result = 
auditMetricService.getStorageAuditMetrics("nonexistentId", "2023-01-01 
00:00:00",
-                    "2023-01-01 01:00:00");
-            assertTrue(result.isEmpty());
-        }
-    }
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(AuditMetricServiceTest.class);
 
     @Test
-    public void getStorageAuditMetricsHandlesInvalidTimestampsGracefully() {
+    public void testGetDataProxyAuditMetrics() {
         // Query service initialization
         AppConfig appConfig = new AppConfig();
         AuditSQLUtil.initialize(appConfig.getProperties());
         AuditMetricService auditMetricService = new AuditMetricService();
 
-        String invalidStartLogTs = "invalid-timestamp";
-        String invalidEndLogTs = "invalid-timestamp";
+        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd 
HH:mm:ss");
+        String endLogTs = 
LocalDateTime.now().minusMinutes(1).format(formatter);
+        String startLogTs = 
LocalDateTime.now().minusMinutes(5).format(formatter);
 
-        List<AuditMetric> result = 
auditMetricService.getStorageAuditMetrics("5", invalidStartLogTs, 
invalidEndLogTs);
+        // Search for relevant data
+        List<AuditMetric> dataproxyAuditMetrics =
+                auditMetricService.getStorageAuditMetrics("5", startLogTs, 
endLogTs);
 
-        assertTrue(result.isEmpty());
+        for (AuditMetric auditMetric : dataproxyAuditMetrics) {
+            LOGGER.error("{} {} {}", auditMetric.getInlongGroupId(), 
auditMetric.getInlongStreamId(),
+                    auditMetric.getCount());
+        }
     }
-
 }
diff --git 
a/inlong-audit/audit-tool/src/test/java/org/apache/inlong/tool/task/AuditCheckTaskTest.java
 
b/inlong-audit/audit-tool/src/test/java/org/apache/inlong/tool/task/AuditCheckTaskTest.java
new file mode 100644
index 0000000000..c8ded99fcf
--- /dev/null
+++ 
b/inlong-audit/audit-tool/src/test/java/org/apache/inlong/tool/task/AuditCheckTaskTest.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.tool.task;
+
+import org.apache.inlong.audit.tool.config.AppConfig;
+import org.apache.inlong.audit.tool.evaluator.AlertEvaluator;
+import org.apache.inlong.audit.tool.manager.AuditAlertRuleManager;
+import org.apache.inlong.audit.tool.task.AuditCheckTask;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import java.lang.reflect.Field;
+import java.util.Properties;
+import java.util.concurrent.ScheduledExecutorService;
+
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.mockito.Mockito.*;
+
+/**
+ * Unit tests for {@link AuditCheckTask}: construction from a mocked {@link AppConfig}
+ * and the start/stop life cycle. All collaborators are Mockito mocks.
+ */
+public class AuditCheckTaskTest {
+
+    // Every @Mock field is assigned by MockitoAnnotations.openMocks(this) in setUp(),
+    // so no initializer is needed (and the real singleton is never touched).
+    @Mock
+    private AuditAlertRuleManager auditAlertRuleManager;
+
+    @Mock
+    private AlertEvaluator alertEvaluator;
+
+    @Mock
+    private AppConfig appConfig;
+
+    @Mock
+    private Properties properties;
+
+    private AuditCheckTask auditCheckTask;
+
+    // Handle returned by openMocks(); closed in tearDown() to release the mocks.
+    private AutoCloseable mocks;
+
+    /**
+     * Creates the mocks, stubs the three two-argument property lookups and builds the
+     * task under test.
+     */
+    @BeforeEach
+    public void setUp() {
+        mocks = MockitoAnnotations.openMocks(this);
+
+        // Mock AppConfig behavior
+        when(appConfig.getProperties()).thenReturn(properties);
+        when(properties.getProperty("audit.data.time.delay.minute", "1")).thenReturn("1");
+        when(properties.getProperty("audit.data.time.interval.minute", "1")).thenReturn("5");
+        when(properties.getProperty("audit.id.source", "5")).thenReturn("5");
+
+        // Create AuditCheckTask instance
+        auditCheckTask = new AuditCheckTask(auditAlertRuleManager, alertEvaluator, appConfig);
+    }
+
+    /**
+     * Releases the resources held by the Mockito session opened in {@link #setUp()}.
+     *
+     * @throws Exception if closing the mock session fails
+     */
+    @AfterEach
+    public void tearDown() throws Exception {
+        mocks.close();
+    }
+
+    /**
+     * Checks that construction succeeds, that {@code getProperties()} is called three
+     * times, and that each property is looked up exactly once with its default.
+     */
+    @Test
+    public void testConstructor() {
+        // Verify that the object is created successfully
+        assertNotNull(auditCheckTask);
+
+        // Verify that the properties were read correctly
+        verify(appConfig, times(3)).getProperties();
+        verify(properties, times(1)).getProperty("audit.data.time.delay.minute", "1");
+        verify(properties, times(1)).getProperty("audit.data.time.interval.minute", "1");
+        verify(properties, times(1)).getProperty("audit.id.source", "5");
+    }
+
+    /**
+     * Checks that after {@code start()} the private {@code scheduler} field is non-null.
+     *
+     * @throws NoSuchFieldException if {@link AuditCheckTask} has no {@code scheduler} field
+     * @throws IllegalAccessException if the field cannot be read reflectively
+     */
+    @Test
+    public void testStartMethod() throws NoSuchFieldException, IllegalAccessException {
+        // Start the task
+        auditCheckTask.start();
+        try {
+            // Access the scheduler field to verify it's not null
+            Field schedulerField = AuditCheckTask.class.getDeclaredField("scheduler");
+            schedulerField.setAccessible(true);
+            ScheduledExecutorService scheduler = (ScheduledExecutorService) schedulerField.get(auditCheckTask);
+
+            // Verify that the scheduler is not null
+            assertNotNull(scheduler);
+        } finally {
+            // Always stop the task, even when the check above fails, so that a
+            // failing test does not leave the started task running.
+            auditCheckTask.stop();
+        }
+    }
+
+    /**
+     * Checks that {@code stop()} after {@code start()} completes without throwing.
+     */
+    @Test
+    public void testStopMethod() {
+        // Start the task first
+        auditCheckTask.start();
+
+        // Stop the task
+        auditCheckTask.stop();
+
+        // We can't easily verify the internal state without exposing it,
+        // but we can ensure the method runs without exception
+        // In a real test, we might use a more sophisticated approach to verify shutdown
+    }
+
+    /**
+     * Checks that the constructor accepts a {@code null} {@link AppConfig}.
+     */
+    @Test
+    public void testConstructorWithNullAppConfig() {
+        // Test constructor with null AppConfig
+        AuditCheckTask task = new AuditCheckTask(auditAlertRuleManager, alertEvaluator, null);
+        assertNotNull(task);
+    }
+
+    /**
+     * Checks that the constructor accepts an {@link AppConfig} whose properties are
+     * {@code null}.
+     */
+    @Test
+    public void testConstructorWithNullProperties() {
+        // Test constructor with null properties
+        when(appConfig.getProperties()).thenReturn(null);
+        AuditCheckTask task = new AuditCheckTask(auditAlertRuleManager, alertEvaluator, appConfig);
+        assertNotNull(task);
+    }
+
+    /**
+     * Intended to check construction with a non-numeric interval value.
+     */
+    @Test
+    public void testConstructorWithInvalidInterval() {
+        // Test constructor with invalid interval value
+        // NOTE(review): this stubs the single-argument getProperty overload, whereas
+        // setUp() stubs and testConstructor() verifies the two-argument overload. If
+        // the constructor only uses the two-argument form, "invalid" never reaches it
+        // - confirm against AuditCheckTask and adjust the stub if so.
+        when(properties.getProperty("audit.data.time.interval.minute")).thenReturn("invalid");
+        AuditCheckTask task = new AuditCheckTask(auditAlertRuleManager, alertEvaluator, appConfig);
+        assertNotNull(task);
+    }
+
+    /**
+     * Intended to check construction with an empty interval value.
+     */
+    @Test
+    public void testConstructorWithEmptyInterval() {
+        // Test constructor with empty interval value
+        // NOTE(review): same overload mismatch as in testConstructorWithInvalidInterval;
+        // the empty string may never be seen by the constructor - confirm.
+        when(properties.getProperty("audit.data.time.interval.minute")).thenReturn("");
+        AuditCheckTask task = new AuditCheckTask(auditAlertRuleManager, alertEvaluator, appConfig);
+        assertNotNull(task);
+    }
+}


Reply via email to