This is an automated email from the ASF dual-hosted git repository.

zhaoqingran pushed a commit to branch log-oltp
in repository https://gitbox.apache.org/repos/asf/hertzbeat.git

commit 7019c9e90bf69b477f1cbb2d2032c603e10b21c4
Author: Logic <[email protected]>
AuthorDate: Mon Nov 18 21:44:29 2024 +0800

    log init
---
 hertzbeat-common/pom.xml                           |   5 +
 .../common/constants/ConfigConstants.java          |   2 +
 .../client/VictoriaLogsQueryClient.java            | 205 +++++++++++++++++++++
 .../config/VictoriaLogsConfiguration.java          |  52 ++++++
 .../config/VictoriaLogsProperties.java             |  66 +++++++
 .../controller/LogAnalysisController.java          | 145 +++++++++++++++
 .../exception/VictoriaLogsQueryException.java      |  32 ++++
 .../log/victorialogs/model/LogConstants.java       |   6 +
 .../log/victorialogs/model/LogQueryRequest.java    |  73 ++++++++
 .../log/victorialogs/model/LogQueryResponse.java   | 116 ++++++++++++
 .../victorialogs/service/LogAnalysisService.java   | 197 ++++++++++++++++++++
 hertzbeat-manager/pom.xml                          |  10 +-
 .../src/main/resources/application.yml             |   5 +
 hertzbeat-manager/src/main/resources/sureness.yml  |   1 +
 pom.xml                                            |   7 +
 15 files changed, 917 insertions(+), 5 deletions(-)

diff --git a/hertzbeat-common/pom.xml b/hertzbeat-common/pom.xml
index bfa3faa7c..29442a921 100644
--- a/hertzbeat-common/pom.xml
+++ b/hertzbeat-common/pom.xml
@@ -66,6 +66,11 @@
             <groupId>org.springframework.boot</groupId>
             <artifactId>spring-boot-starter-validation</artifactId>
         </dependency>
+        <!-- okhttp -->
+        <dependency>
+            <groupId>com.squareup.okhttp3</groupId>
+            <artifactId>okhttp</artifactId>
+        </dependency>
         <!-- jackson -->
         <dependency>
             <groupId>com.fasterxml.jackson.core</groupId>
diff --git 
a/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/constants/ConfigConstants.java
 
b/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/constants/ConfigConstants.java
index 727d9d7d1..d71e7dc2e 100644
--- 
a/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/constants/ConfigConstants.java
+++ 
b/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/constants/ConfigConstants.java
@@ -65,6 +65,8 @@ public interface ConfigConstants {
         String INFO = "info";
 
         String GRAFANA = "grafana";
+
+        String LOG = "log";
     }
 
 }
diff --git 
a/hertzbeat-log/src/main/java/org/apache/hertzbeat/log/victorialogs/client/VictoriaLogsQueryClient.java
 
b/hertzbeat-log/src/main/java/org/apache/hertzbeat/log/victorialogs/client/VictoriaLogsQueryClient.java
new file mode 100644
index 000000000..a4d42b46b
--- /dev/null
+++ 
b/hertzbeat-log/src/main/java/org/apache/hertzbeat/log/victorialogs/client/VictoriaLogsQueryClient.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hertzbeat.log.victorialogs.client;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.hertzbeat.log.victorialogs.config.VictoriaLogsProperties;
+import org.apache.hertzbeat.log.victorialogs.model.LogQueryResponse;
+import org.apache.hertzbeat.log.victorialogs.model.LogQueryRequest;
+import 
org.apache.hertzbeat.log.victorialogs.exception.VictoriaLogsQueryException;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.web.client.RestTemplateBuilder;
+import org.springframework.http.HttpMethod;
+import org.springframework.stereotype.Component;
+import org.springframework.util.StringUtils;
+import org.springframework.web.client.RestTemplate;
+import org.springframework.web.util.UriComponentsBuilder;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Consumer;
+
+@Slf4j
+@Component
+public class VictoriaLogsQueryClient {
+
+    private static final String QUERY = "/select/logsql/query";
+    private static final String TAIL = "/select/logsql/tail";
+    private static final String HITS = "/select/logsql/hits";
+    private static final String STATS_QUERY = "/select/logsql/stats_query";
+    private static final String STATS_QUERY_RANGE = 
"/select/logsql/stats_query_range";
+    private static final String STREAMS_IDS = "/select/logsql/streams_ids";
+    private static final String STREAMS = "/select/logsql/streams";
+    private static final String STREAM_FIELD_NAMES = 
"/select/logsql/stream_field_names";
+    private static final String STREAM_FIELD_VALUES = 
"/select/logsql/stream_field_values";
+    private static final String FIELD_NAMES = "/select/logsql/field_names";
+    private static final String FIELD_VALUES = "/select/logsql/field_values";
+
+    private final RestTemplate restTemplate;
+    private final VictoriaLogsProperties properties;
+    private final ObjectMapper objectMapper;
+
+    @Autowired
+    public VictoriaLogsQueryClient(RestTemplateBuilder restTemplateBuilder,
+                                   VictoriaLogsProperties properties,
+                                   ObjectMapper objectMapper) {
+        this.properties = properties;
+        this.objectMapper = objectMapper;
+        this.restTemplate = restTemplateBuilder
+                .setConnectTimeout(Duration.ofSeconds(5))
+                .setReadTimeout(Duration.ZERO)
+                .build();
+    }
+
+    /**
+     * Execute a regular query
+     * @param request Query request containing search parameters
+     * @return List of log entries matching the query
+     */
+    public List<LogQueryResponse> query(LogQueryRequest request) {
+        String url = buildQueryUrl(request);
+        log.debug("Executing query: {}", url);
+
+        try {
+            String response = restTemplate.getForObject(url, String.class);
+            List<LogQueryResponse> entries = new ArrayList<>();
+            if (response != null) {
+                String[] lines = response.split("\n");
+                for (String line : lines) {
+                    if (!StringUtils.hasText(line)) {
+                        continue;
+                    }
+                    entries.add(objectMapper.readValue(line, 
LogQueryResponse.class));
+                }
+            }
+            return entries;
+        } catch (Exception e) {
+            log.error("Failed to execute query: {}", url, e);
+            throw new VictoriaLogsQueryException("Failed to execute query", e);
+        }
+    }
+
+    /**
+     * Count log entries matching the query
+     * @param request Query request containing search parameters
+     * @return Number of matching log entries
+     */
+    public long count(LogQueryRequest request) {
+        String url = buildCountUrl(request);
+        log.debug("Executing count query: {}", url);
+
+        try {
+            String response = restTemplate.getForObject(url, String.class);
+            if (StringUtils.hasText(response)) {
+                return Long.parseLong(response.trim());
+            }
+            return 0;
+        } catch (Exception e) {
+            log.error("Failed to execute count query: {}", url, e);
+            throw new VictoriaLogsQueryException("Failed to execute count 
query", e);
+        }
+    }
+
+    /**
+     * Execute a streaming query for real-time log tailing
+     * @param query Query expression
+     * @param handler Callback function to handle each log entry
+     */
+    public void tail(String query, Consumer<LogQueryResponse> handler) {
+        String url = buildTailUrl(query);
+        log.debug("Starting log tail stream: {}", url);
+
+        try {
+            restTemplate.execute(url, HttpMethod.GET, null, response -> {
+                try (BufferedReader reader = new BufferedReader(
+                        new InputStreamReader(response.getBody()))) {
+                    String line;
+                    while ((line = reader.readLine()) != null) {
+                        if (!StringUtils.hasText(line)) {
+                            continue;
+                        }
+                        LogQueryResponse entry = objectMapper.readValue(line, 
LogQueryResponse.class);
+                        handler.accept(entry);
+                    }
+                }
+                return null;
+            });
+        } catch (Exception e) {
+            log.error("Error in log tail stream: {}", query, e);
+            throw new VictoriaLogsQueryException("Failed to tail logs", e);
+        }
+    }
+
+    /**
+     * Build URL for regular queries
+     */
+    private String buildQueryUrl(LogQueryRequest request) {
+        UriComponentsBuilder builder = UriComponentsBuilder
+                .fromHttpUrl(properties.url())
+                .path(QUERY)
+                .queryParam("query", request.getQuery());
+
+        if (StringUtils.hasText(request.getStart())) {
+            builder.queryParam("start", request.getStart());
+        }
+        if (StringUtils.hasText(request.getEnd())) {
+            builder.queryParam("end", request.getEnd());
+        }
+        if (request.getLimit() != null) {
+            builder.queryParam("limit", request.getLimit());
+        }
+
+        return builder.build().encode().toUriString();
+    }
+
+    /**
+     * Build URL for count queries
+     */
+    private String buildCountUrl(LogQueryRequest request) {
+        UriComponentsBuilder builder = UriComponentsBuilder
+                .fromHttpUrl(properties.url())
+                .path("/api/v1/query/count")
+                .queryParam("query", request.getQuery());
+
+        if (StringUtils.hasText(request.getStart())) {
+            builder.queryParam("start", request.getStart());
+        }
+        if (StringUtils.hasText(request.getEnd())) {
+            builder.queryParam("end", request.getEnd());
+        }
+
+        return builder.build().encode().toUriString();
+    }
+
+    /**
+     * Build URL for streaming queries
+     */
+    private String buildTailUrl(String query) {
+        return UriComponentsBuilder
+                .fromHttpUrl(properties.url())
+                .path(TAIL)
+                .queryParam("query", query)
+                .build()
+                .encode()
+                .toUriString();
+    }
+}
\ No newline at end of file
diff --git 
a/hertzbeat-log/src/main/java/org/apache/hertzbeat/log/victorialogs/config/VictoriaLogsConfiguration.java
 
b/hertzbeat-log/src/main/java/org/apache/hertzbeat/log/victorialogs/config/VictoriaLogsConfiguration.java
new file mode 100644
index 000000000..776837b87
--- /dev/null
+++ 
b/hertzbeat-log/src/main/java/org/apache/hertzbeat/log/victorialogs/config/VictoriaLogsConfiguration.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hertzbeat.log.victorialogs.config;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.hertzbeat.common.constants.ConfigConstants;
+import org.apache.hertzbeat.common.constants.SignConstants;
+import org.apache.hertzbeat.log.victorialogs.client.VictoriaLogsQueryClient;
+import org.apache.hertzbeat.log.victorialogs.model.LogConstants;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import 
org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.boot.web.client.RestTemplateBuilder;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.http.client.SimpleClientHttpRequestFactory;
+import org.springframework.web.client.RestTemplate;
+
+/**
+ * VictoriaLogs configuration class
+ */
+@Configuration
+@EnableConfigurationProperties(VictoriaLogsProperties.class)
+public class VictoriaLogsConfiguration {
+    /**
+     * Creates VictoriaLogs query client
+     */
+    @Bean
+    @ConditionalOnProperty(prefix = ConfigConstants.FunctionModuleConstants.LOG
+            + SignConstants.DOT
+            + LogConstants.VICTORIA_LOGS, name = "enabled", havingValue = 
"true")
+    public VictoriaLogsQueryClient victoriaLogsQueryClient(
+            VictoriaLogsProperties properties,
+            RestTemplateBuilder victoriaLogsRestTemplatebuilder,
+            ObjectMapper objectMapper) {
+        return new VictoriaLogsQueryClient(victoriaLogsRestTemplatebuilder, 
properties, objectMapper);
+    }
+}
\ No newline at end of file
diff --git 
a/hertzbeat-log/src/main/java/org/apache/hertzbeat/log/victorialogs/config/VictoriaLogsProperties.java
 
b/hertzbeat-log/src/main/java/org/apache/hertzbeat/log/victorialogs/config/VictoriaLogsProperties.java
new file mode 100644
index 000000000..b05caf446
--- /dev/null
+++ 
b/hertzbeat-log/src/main/java/org/apache/hertzbeat/log/victorialogs/config/VictoriaLogsProperties.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hertzbeat.log.victorialogs.config;
+
+import jakarta.validation.Valid;
+import jakarta.validation.constraints.NotNull;
+import org.apache.hertzbeat.common.constants.ConfigConstants;
+import org.apache.hertzbeat.common.constants.SignConstants;
+import org.apache.hertzbeat.log.victorialogs.model.LogConstants;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.boot.context.properties.bind.DefaultValue;
+
+/**
+ * VictoriaLogs Configuration Properties
+ *
+ */
+@ConfigurationProperties(prefix = ConfigConstants.FunctionModuleConstants.LOG
+        + SignConstants.DOT
+        + LogConstants.VICTORIA_LOGS)
+public record VictoriaLogsProperties(
+        /**
+         * Whether to enable VictoriaLogs integration
+         */
+        @DefaultValue("false")
+        boolean enabled,
+
+        /**
+         * VictoriaLogs server URL
+         */
+        @DefaultValue("http://localhost:9428";)
+        @NotNull
+        String url
+
+) {
+    @Valid
+    public VictoriaLogsProperties {
+        if (enabled && !isValidUrl(url)) {
+            throw new IllegalArgumentException("Invalid VictoriaLogs URL: " + 
url);
+        }
+    }
+
+    /**
+     * Validate URL format
+     *
+     * @param url URL to validate
+     * @return true if valid, false otherwise
+     */
+    private boolean isValidUrl(String url) {
+        return url != null && (url.startsWith("http://";) || 
url.startsWith("https://";));
+    }
+}
\ No newline at end of file
diff --git 
a/hertzbeat-log/src/main/java/org/apache/hertzbeat/log/victorialogs/controller/LogAnalysisController.java
 
b/hertzbeat-log/src/main/java/org/apache/hertzbeat/log/victorialogs/controller/LogAnalysisController.java
new file mode 100644
index 000000000..aba4ea7cc
--- /dev/null
+++ 
b/hertzbeat-log/src/main/java/org/apache/hertzbeat/log/victorialogs/controller/LogAnalysisController.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hertzbeat.log.victorialogs.controller;
+
+import static org.apache.hertzbeat.common.constants.CommonConstants.FAIL_CODE;
+import static org.springframework.http.MediaType.APPLICATION_JSON_VALUE;
+
+import io.swagger.v3.oas.annotations.Operation;
+import io.swagger.v3.oas.annotations.Parameter;
+import io.swagger.v3.oas.annotations.tags.Tag;
+import jakarta.validation.constraints.NotBlank;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.hertzbeat.common.entity.dto.Message;
+import org.apache.hertzbeat.log.victorialogs.model.LogQueryResponse;
+import org.apache.hertzbeat.log.victorialogs.model.LogQueryRequest;
+import org.apache.hertzbeat.log.victorialogs.service.LogAnalysisService;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.http.MediaType;
+import org.springframework.http.ResponseEntity;
+import org.springframework.validation.annotation.Validated;
+import org.springframework.web.bind.annotation.*;
+import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
+
+import java.util.List;
+
+@Tag(name = "Log Analysis API")
+@RestController
+@RequestMapping(path = "/api/logs")
+@Validated
+@Slf4j
+public class LogAnalysisController {
+
+    @Autowired
+    private LogAnalysisService logAnalysisService;
+
+    @Operation(summary = "Search logs", description = "Search logs with query 
parameters")
+    @GetMapping(value = "/search", produces = APPLICATION_JSON_VALUE)
+    public ResponseEntity<Message<List<LogQueryResponse>>> searchLogs(
+            @Parameter(description = "Log query expression", required = true)
+            @RequestParam @NotBlank String query,
+            @Parameter(description = "Start time for the query")
+            @RequestParam(required = false) String start,
+            @Parameter(description = "End time for the query")
+            @RequestParam(required = false) String end,
+            @Parameter(description = "Maximum number of results to return")
+            @RequestParam(required = false) Integer limit) {
+        try {
+            LogQueryRequest request = LogQueryRequest.builder()
+                    .query(query)
+                    .start(start)
+                    .end(end)
+                    .limit(limit)
+                    .build();
+            List<LogQueryResponse> entries = 
logAnalysisService.searchLogs(request);
+            return ResponseEntity.ok(Message.success(entries));
+        } catch (Exception e) {
+            log.error("search logs error", e);
+            return ResponseEntity.ok(Message.fail(FAIL_CODE, e.getMessage()));
+        }
+    }
+
+    @Operation(summary = "Query log entries by keyword", description = "Search 
log entries containing specific keyword")
+    @GetMapping(value = "/query/keyword", produces = APPLICATION_JSON_VALUE)
+    public ResponseEntity<Message<List<LogQueryResponse>>> queryByKeyword(
+            @Parameter(description = "Keyword to search for", required = true)
+            @RequestParam @NotBlank String keyword,
+            @Parameter(description = "Start time for the query")
+            @RequestParam(required = false) String start,
+            @Parameter(description = "End time for the query")
+            @RequestParam(required = false) String end,
+            @Parameter(description = "Maximum number of results to return")
+            @RequestParam(required = false) Integer limit) {
+        try {
+            List<LogQueryResponse> entries = 
logAnalysisService.queryByKeyword(keyword, start, end, limit);
+            return ResponseEntity.ok(Message.success(entries));
+        } catch (Exception e) {
+            log.error("query by keyword error", e);
+            return ResponseEntity.ok(Message.fail(FAIL_CODE, e.getMessage()));
+        }
+    }
+
+    @Operation(summary = "Count keyword occurrences", description = "Count 
occurrences of a keyword in logs")
+    @GetMapping(value = "/count/keyword", produces = APPLICATION_JSON_VALUE)
+    public ResponseEntity<Message<Long>> countKeywordOccurrences(
+            @Parameter(description = "Keyword to count", required = true)
+            @RequestParam @NotBlank String keyword,
+            @Parameter(description = "Start time for the query")
+            @RequestParam(required = false) String start,
+            @Parameter(description = "End time for the query")
+            @RequestParam(required = false) String end) {
+        try {
+            long count = logAnalysisService.countKeywordOccurrences(keyword, 
start, end);
+            return ResponseEntity.ok(Message.success(count));
+        } catch (Exception e) {
+            log.error("count keyword occurrences error", e);
+            return ResponseEntity.ok(Message.fail(FAIL_CODE, e.getMessage()));
+        }
+    }
+
+    @GetMapping(value = "/stream", produces = 
MediaType.TEXT_EVENT_STREAM_VALUE)
+    public SseEmitter streamLogs(@RequestParam String query) {
+        SseEmitter emitter = new SseEmitter(180000L); // 3 minutes timeout
+        logAnalysisService.streamLogs(query, emitter);
+        return emitter;
+    }
+
+    @Operation(summary = "Stop all streams", description = "Stop all active 
log streams")
+    @PostMapping(value = "/stream/stop", produces = APPLICATION_JSON_VALUE)
+    public ResponseEntity<Message<String>> stopAllStreams() {
+        try {
+            logAnalysisService.stopAllStreams();
+            return ResponseEntity.ok(Message.success("Successfully stopped all 
streams"));
+        } catch (Exception e) {
+            log.error("Failed to stop streams", e);
+            return ResponseEntity.ok(Message.fail(FAIL_CODE, "Failed to stop 
streams"));
+        }
+    }
+
+    @Operation(summary = "Get active stream count", description = "Get the 
number of active log streams")
+    @GetMapping(value = "/stream/count", produces = APPLICATION_JSON_VALUE)
+    public ResponseEntity<Message<Integer>> getActiveStreamCount() {
+        try {
+            int count = logAnalysisService.getActiveStreamCount();
+            return ResponseEntity.ok(Message.success(count));
+        } catch (Exception e) {
+            log.error("Failed to get stream count", e);
+            return ResponseEntity.ok(Message.fail(FAIL_CODE, "Failed to get 
stream count"));
+        }
+    }
+}
\ No newline at end of file
diff --git 
a/hertzbeat-log/src/main/java/org/apache/hertzbeat/log/victorialogs/exception/VictoriaLogsQueryException.java
 
b/hertzbeat-log/src/main/java/org/apache/hertzbeat/log/victorialogs/exception/VictoriaLogsQueryException.java
new file mode 100644
index 000000000..44a2bf6ba
--- /dev/null
+++ 
b/hertzbeat-log/src/main/java/org/apache/hertzbeat/log/victorialogs/exception/VictoriaLogsQueryException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hertzbeat.log.victorialogs.exception;
+
+/**
+ * Custom exception for VictoriaLogs query errors
+ */
+public class VictoriaLogsQueryException extends RuntimeException {
+
+    public VictoriaLogsQueryException(String message) {
+        super(message);
+    }
+
+    public VictoriaLogsQueryException(String message, Throwable cause) {
+        super(message, cause);
+    }
+}
\ No newline at end of file
diff --git 
a/hertzbeat-log/src/main/java/org/apache/hertzbeat/log/victorialogs/model/LogConstants.java
 
b/hertzbeat-log/src/main/java/org/apache/hertzbeat/log/victorialogs/model/LogConstants.java
new file mode 100644
index 000000000..76bc3b1c5
--- /dev/null
+++ 
b/hertzbeat-log/src/main/java/org/apache/hertzbeat/log/victorialogs/model/LogConstants.java
@@ -0,0 +1,6 @@
+package org.apache.hertzbeat.log.victorialogs.model;
+
+public interface LogConstants {
+
+    String VICTORIA_LOGS = "victoria-logs";
+}
diff --git 
a/hertzbeat-log/src/main/java/org/apache/hertzbeat/log/victorialogs/model/LogQueryRequest.java
 
b/hertzbeat-log/src/main/java/org/apache/hertzbeat/log/victorialogs/model/LogQueryRequest.java
new file mode 100644
index 000000000..f7e37b567
--- /dev/null
+++ 
b/hertzbeat-log/src/main/java/org/apache/hertzbeat/log/victorialogs/model/LogQueryRequest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hertzbeat.log.victorialogs.model;
+
+import jakarta.validation.constraints.Max;
+import jakarta.validation.constraints.Min;
+import jakarta.validation.constraints.NotBlank;
+import java.util.Set;
+import lombok.Builder;
+import lombok.Data;
+
+
+/**
+ * Request parameters for log query
+ */
+@Data
+@Builder
+public class LogQueryRequest {
+    /**
+     * Query string for log search
+     */
+    @NotBlank(message = "Query cannot be empty")
+    private String query;
+
+    /**
+     * Start time for log search
+     * Format: ISO-8601 or relative time like "-1h", "-30m"
+     */
+    private String start;
+
+    /**
+     * End time for log search
+     * Format: ISO-8601 or relative time like "now"
+     */
+    private String end;
+
+    /**
+     * Maximum number of results to return
+     */
+    @Min(value = 1, message = "Limit must be greater than 0")
+    @Max(value = 10000, message = "Limit cannot exceed 10000")
+    private Integer limit;
+
+    /**
+     * Field to order results by
+     */
+    private String orderBy;
+
+    /**
+     * Sort order direction
+     */
+    private Boolean ascending;
+
+    /**
+     * Specific fields to return in the response
+     */
+    private Set<String> fields;
+}
diff --git 
a/hertzbeat-log/src/main/java/org/apache/hertzbeat/log/victorialogs/model/LogQueryResponse.java
 
b/hertzbeat-log/src/main/java/org/apache/hertzbeat/log/victorialogs/model/LogQueryResponse.java
new file mode 100644
index 000000000..16e28701e
--- /dev/null
+++ 
b/hertzbeat-log/src/main/java/org/apache/hertzbeat/log/victorialogs/model/LogQueryResponse.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hertzbeat.log.victorialogs.model;
+
+import com.fasterxml.jackson.annotation.JsonAnyGetter;
+import com.fasterxml.jackson.annotation.JsonAnySetter;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.util.HashMap;
+import java.util.Map;
+
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class LogQueryResponse {
+
+    /**
+     * Timestamp of the log entry
+     */
+    @JsonProperty("_time")
+    private String timestamp;
+
+    /**
+     * Log message content
+     */
+    @JsonProperty("_msg")
+    private String message;
+
+    /**
+     * Unique identifier for the log stream
+     */
+    @JsonProperty("_stream_id")
+    private String streamId;
+
+    /**
+     * Stream metadata in JSON format
+     */
+    @JsonProperty("_stream")
+    private String stream;
+
+    /**
+     * Log level (e.g., INFO, ERROR)
+     */
+    @JsonProperty("severity")
+    private String level;
+
+    /**
+     * Name of the service generating the log
+     */
+    @JsonProperty("service.name")
+    private String serviceName;
+
+    /**
+     * Additional properties not covered by standard fields
+     */
+    @Builder.Default
+    private Map<String, Object> additionalProperties = new HashMap<>();
+
+    /**
+     * Getter for additional properties
+     *
+     * @return Map of additional properties
+     */
+    @JsonAnyGetter
+    public Map<String, Object> getAdditionalProperties() {
+        return additionalProperties;
+    }
+
+    /**
+     * Setter for additional properties. Captures any unmapped JSON properties.
+     *
+     * @param key Property key
+     * @param value Property value
+     */
+    @JsonAnySetter
+    public void addAdditionalProperty(String key, Object value) {
+        if (!isStandardField(key)) {
+            additionalProperties.put(key, value);
+        }
+    }
+
+    /**
+     * Check if a field is one of the standard mapped fields
+     *
+     * @param fieldName Name of the field to check
+     * @return true if field is standard, false otherwise
+     */
+    private boolean isStandardField(String fieldName) {
+        return fieldName.equals("_time") ||
+                fieldName.equals("_msg") ||
+                fieldName.equals("_stream_id") ||
+                fieldName.equals("_stream") ||
+                fieldName.equals("severity") ||
+                fieldName.equals("service.name");
+    }
+}
\ No newline at end of file
diff --git 
a/hertzbeat-log/src/main/java/org/apache/hertzbeat/log/victorialogs/service/LogAnalysisService.java
 
b/hertzbeat-log/src/main/java/org/apache/hertzbeat/log/victorialogs/service/LogAnalysisService.java
new file mode 100644
index 000000000..e1085369f
--- /dev/null
+++ 
b/hertzbeat-log/src/main/java/org/apache/hertzbeat/log/victorialogs/service/LogAnalysisService.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hertzbeat.log.victorialogs.service;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.hertzbeat.log.victorialogs.client.VictoriaLogsQueryClient;
+import org.apache.hertzbeat.log.victorialogs.model.LogQueryResponse;
+import org.apache.hertzbeat.log.victorialogs.model.LogQueryRequest;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
+
+import javax.annotation.PreDestroy;
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.*;
+
+@Service
+@Slf4j
+public class LogAnalysisService {
+
+    private final Set<SseEmitter> activeEmitters = 
ConcurrentHashMap.newKeySet();
+    private final VictoriaLogsQueryClient victoriaLogsQueryClient;
+    private final ScheduledExecutorService heartbeatExecutor = 
Executors.newSingleThreadScheduledExecutor();
+
+    private static final long HEARTBEAT_DELAY = 15; // seconds
+
+    @Autowired
+    public LogAnalysisService(VictoriaLogsQueryClient victoriaLogsQueryClient) 
{
+        this.victoriaLogsQueryClient = victoriaLogsQueryClient;
+    }
+
+    @PreDestroy
+    public void destroy() {
+        stopAllStreams();
+        heartbeatExecutor.shutdown();
+        try {
+            if (!heartbeatExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
+                heartbeatExecutor.shutdownNow();
+            }
+        } catch (InterruptedException e) {
+            heartbeatExecutor.shutdownNow();
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    public List<LogQueryResponse> searchLogs(LogQueryRequest request) {
+        return victoriaLogsQueryClient.query(request);
+    }
+
+    public List<LogQueryResponse> queryByKeyword(String keyword, String start, 
String end, Integer limit) {
+        LogQueryRequest request = LogQueryRequest.builder()
+                .query(buildKeywordQuery(keyword))
+                .start(start)
+                .end(end)
+                .limit(limit)
+                .build();
+        return victoriaLogsQueryClient.query(request);
+    }
+
+    public long countKeywordOccurrences(String keyword, String start, String 
end) {
+        LogQueryRequest request = LogQueryRequest.builder()
+                .query(buildKeywordQuery(keyword))
+                .start(start)
+                .end(end)
+                .build();
+        return victoriaLogsQueryClient.count(request);
+    }
+
+    private String buildKeywordQuery(String keyword) {
+        String escapedKeyword = keyword.replace("\"", "\\\"");
+        return String.format("message ILIKE \"%%%s%%\"", escapedKeyword);
+    }
+
+    public void streamLogs(String query, SseEmitter emitter) {
+        activeEmitters.add(emitter);
+
+        // Start heartbeat for this emitter
+        ScheduledFuture<?> heartbeatFuture = startHeartbeat(emitter);
+
+        CompletableFuture.runAsync(() -> {
+            try {
+                // Send initial connection established event
+                sendEvent(emitter, "open", "Connection established", null);
+
+                victoriaLogsQueryClient.tail(query, logQueryResponse -> {
+                    try {
+                        if (activeEmitters.contains(emitter)) {
+                            sendEvent(emitter, "log", logQueryResponse, 
String.valueOf(System.currentTimeMillis()));
+                        }
+                    } catch (IOException e) {
+                        handleEmitterError(emitter, heartbeatFuture, e);
+                    }
+                });
+            } catch (Exception e) {
+                handleEmitterError(emitter, heartbeatFuture, e);
+            }
+        });
+
+        // Set up completion callback
+        emitter.onCompletion(() -> {
+            log.debug("SSE completed");
+            cleanup(emitter, heartbeatFuture);
+        });
+
+        // Set up timeout callback
+        emitter.onTimeout(() -> {
+            log.debug("SSE timeout");
+            cleanup(emitter, heartbeatFuture);
+        });
+
+        // Set up error callback
+        emitter.onError(ex -> {
+            log.error("SSE error", ex);
+            cleanup(emitter, heartbeatFuture);
+        });
+    }
+
+    private ScheduledFuture<?> startHeartbeat(SseEmitter emitter) {
+        return heartbeatExecutor.scheduleAtFixedRate(() -> {
+            try {
+                if (activeEmitters.contains(emitter)) {
+                    emitter.send(SseEmitter.event()
+                            .comment("heartbeat")
+                            .build());
+                }
+            } catch (IOException e) {
+                log.warn("Failed to send heartbeat, closing connection", e);
+                removeEmitter(emitter);
+            }
+        }, HEARTBEAT_DELAY, HEARTBEAT_DELAY, TimeUnit.SECONDS);
+    }
+
+    private void sendEvent(SseEmitter emitter, String eventName, Object data, 
String id) throws IOException {
+        SseEmitter.SseEventBuilder builder = SseEmitter.event()
+                .name(eventName)
+                .data(data);
+
+        if (id != null) {
+            builder.id(id);
+        }
+
+        emitter.send(builder.build());
+    }
+
+    private void handleEmitterError(SseEmitter emitter, ScheduledFuture<?> 
heartbeatFuture, Exception e) {
+        log.error("Error in log streaming", e);
+        cleanup(emitter, heartbeatFuture);
+        try {
+            sendEvent(emitter, "error", e.getMessage(), null);
+        } catch (IOException ex) {
+            log.error("Failed to send error message", ex);
+        }
+    }
+
+    private void cleanup(SseEmitter emitter, ScheduledFuture<?> 
heartbeatFuture) {
+        removeEmitter(emitter);
+        if (heartbeatFuture != null) {
+            heartbeatFuture.cancel(true);
+        }
+    }
+
+    public void removeEmitter(SseEmitter emitter) {
+        activeEmitters.remove(emitter);
+    }
+
+    public int getActiveStreamCount() {
+        return activeEmitters.size();
+    }
+
+    public void stopAllStreams() {
+        activeEmitters.forEach(emitter -> {
+            try {
+                emitter.complete();
+            } catch (Exception e) {
+                log.error("Error completing emitter", e);
+            }
+        });
+        activeEmitters.clear();
+    }
+}
\ No newline at end of file
diff --git a/hertzbeat-manager/pom.xml b/hertzbeat-manager/pom.xml
index 8b459bb86..50f2c176f 100644
--- a/hertzbeat-manager/pom.xml
+++ b/hertzbeat-manager/pom.xml
@@ -84,6 +84,11 @@
             <groupId>org.apache.hertzbeat</groupId>
             <artifactId>hertzbeat-grafana</artifactId>
         </dependency>
+        <!-- log -->
+        <dependency>
+            <groupId>org.apache.hertzbeat</groupId>
+            <artifactId>hertzbeat-log</artifactId>
+        </dependency>
         <!-- spring -->
         <dependency>
             <groupId>org.springframework.boot</groupId>
@@ -164,11 +169,6 @@
             <groupId>com.usthe.sureness</groupId>
             <artifactId>spring-boot3-starter-sureness</artifactId>
         </dependency>
-        <!-- okhttp -->
-        <dependency>
-            <groupId>com.squareup.okhttp3</groupId>
-            <artifactId>okhttp</artifactId>
-        </dependency>
         <!-- h2 database-->
         <dependency>
             <groupId>com.h2database</groupId>
diff --git a/hertzbeat-manager/src/main/resources/application.yml 
b/hertzbeat-manager/src/main/resources/application.yml
index 1d0d878d8..02e3ace06 100644
--- a/hertzbeat-manager/src/main/resources/application.yml
+++ b/hertzbeat-manager/src/main/resources/application.yml
@@ -211,6 +211,11 @@ grafana:
   username: admin
   password: admin
 
+log:
+  victoria-logs:
+    enabled: true
+    url: http://localhost:9428
+
 # See the documentation for details : 
https://hertzbeat.apache.org/zh-cn/docs/help/aiConfig
 ai:
   # AI Type:zhiPu、alibabaAi、kimiAi、sparkDesk
diff --git a/hertzbeat-manager/src/main/resources/sureness.yml 
b/hertzbeat-manager/src/main/resources/sureness.yml
index 9ebd316af..e1fdd2a4f 100644
--- a/hertzbeat-manager/src/main/resources/sureness.yml
+++ b/hertzbeat-manager/src/main/resources/sureness.yml
@@ -71,6 +71,7 @@ resourceRole:
 # rule: api===method 
 # eg: /api/v1/source3===get means /api/v1/source3===get can be access by 
anyone, no need auth.
 excludedResource:
+  - /api/logs/**===*
   - /api/alerts/report/**===*
   - /api/account/auth/**===*
   - /api/i18n/**===get
diff --git a/pom.xml b/pom.xml
index 613360716..118f0f9ca 100644
--- a/pom.xml
+++ b/pom.xml
@@ -88,6 +88,7 @@
         <module>hertzbeat-push</module>
         <module>hertzbeat-plugin</module>
         <module>hertzbeat-grafana</module>
+        <module>hertzbeat-log</module>
         <module>hertzbeat-e2e</module>
     </modules>
 
@@ -222,6 +223,12 @@
                 <artifactId>hertzbeat-grafana</artifactId>
                 <version>${hertzbeat.version}</version>
             </dependency>
+            <!-- log -->
+            <dependency>
+                <groupId>org.apache.hertzbeat</groupId>
+                <artifactId>hertzbeat-log</artifactId>
+                <version>${hertzbeat.version}</version>
+            </dependency>
             <!-- collector-basic -->
             <dependency>
                 <groupId>org.apache.hertzbeat</groupId>


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to