This is an automated email from the ASF dual-hosted git repository.

zhaoqingran pushed a commit to branch log-oltp
in repository https://gitbox.apache.org/repos/asf/hertzbeat.git


The following commit(s) were added to refs/heads/log-oltp by this push:
     new 207a2dc20 modify api endpoints
207a2dc20 is described below

commit 207a2dc20b421357dde5efd6d895bfbefb186360
Author: Logic <[email protected]>
AuthorDate: Tue Nov 19 22:56:50 2024 +0800

    modify api endpoints
---
 hertzbeat-log/pom.xml                              |  61 +++++++
 .../client/VictoriaLogsQueryClient.java            | 184 +++++++++++----------
 .../config/VictoriaLogsConfiguration.java          |   6 +-
 .../config/VictoriaLogsProperties.java             |   8 +-
 .../log/victorialogs/config/WebClientConfig.java   |  51 ++++++
 .../controller/LogAnalysisController.java          |  86 +++-------
 .../log/victorialogs/model/LogQueryRequest.java    |   6 +-
 .../victorialogs/service/LogAnalysisService.java   | 121 +-------------
 8 files changed, 253 insertions(+), 270 deletions(-)

diff --git a/hertzbeat-log/pom.xml b/hertzbeat-log/pom.xml
new file mode 100644
index 000000000..ec05e0d92
--- /dev/null
+++ b/hertzbeat-log/pom.xml
@@ -0,0 +1,61 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0";
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.hertzbeat</groupId>
+        <artifactId>hertzbeat</artifactId>
+        <version>2.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>hertzbeat-log</artifactId>
+    <name>${project.artifactId}</name>
+
+    <properties>
+        <maven.compiler.source>17</maven.compiler.source>
+        <maven.compiler.target>17</maven.compiler.target>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    </properties>
+
+
+    <dependencies>
+        <!-- common -->
+        <dependency>
+            <groupId>org.apache.hertzbeat</groupId>
+            <artifactId>hertzbeat-common</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <!-- hertzbeat warehouse -->
+        <dependency>
+            <groupId>org.apache.hertzbeat</groupId>
+            <artifactId>hertzbeat-warehouse</artifactId>
+        </dependency>
+        <!-- swagger -->
+        <dependency>
+            <groupId>org.springdoc</groupId>
+            <artifactId>springdoc-openapi-starter-webmvc-ui</artifactId>
+        </dependency>
+        <!--webflux-->
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-webflux</artifactId>
+        </dependency>
+    </dependencies>
+</project>
\ No newline at end of file
diff --git 
a/hertzbeat-log/src/main/java/org/apache/hertzbeat/log/victorialogs/client/VictoriaLogsQueryClient.java
 
b/hertzbeat-log/src/main/java/org/apache/hertzbeat/log/victorialogs/client/VictoriaLogsQueryClient.java
index 60ab26eb1..d22fcff90 100644
--- 
a/hertzbeat-log/src/main/java/org/apache/hertzbeat/log/victorialogs/client/VictoriaLogsQueryClient.java
+++ 
b/hertzbeat-log/src/main/java/org/apache/hertzbeat/log/victorialogs/client/VictoriaLogsQueryClient.java
@@ -17,9 +17,12 @@
 
 package org.apache.hertzbeat.log.victorialogs.client;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import java.net.URLEncoder;
 import java.nio.charset.StandardCharsets;
+import java.util.Objects;
+import java.util.stream.Collectors;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.hertzbeat.log.victorialogs.config.VictoriaLogsProperties;
 import org.apache.hertzbeat.log.victorialogs.model.LogQueryResponse;
@@ -27,10 +30,19 @@ import 
org.apache.hertzbeat.log.victorialogs.model.LogQueryRequest;
 import 
org.apache.hertzbeat.log.victorialogs.exception.VictoriaLogsQueryException;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.web.client.RestTemplateBuilder;
+import org.springframework.http.HttpEntity;
+import org.springframework.http.HttpHeaders;
 import org.springframework.http.HttpMethod;
+import org.springframework.http.MediaType;
+import org.springframework.http.codec.ServerSentEvent;
 import org.springframework.stereotype.Component;
+import org.springframework.util.LinkedMultiValueMap;
+import org.springframework.util.MultiValueMap;
+import org.springframework.util.StreamUtils;
 import org.springframework.util.StringUtils;
 import org.springframework.web.client.RestTemplate;
+import org.springframework.web.reactive.function.BodyInserters;
+import org.springframework.web.reactive.function.client.WebClient;
 import org.springframework.web.util.UriComponentsBuilder;
 
 import java.io.BufferedReader;
@@ -40,6 +52,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.function.Consumer;
 import org.springframework.web.util.UriUtils;
+import reactor.core.publisher.Flux;
 
 @Slf4j
 @Component
@@ -60,17 +73,20 @@ public class VictoriaLogsQueryClient {
     private final RestTemplate restTemplate;
     private final VictoriaLogsProperties properties;
     private final ObjectMapper objectMapper;
+    private final WebClient webClient;
 
     @Autowired
     public VictoriaLogsQueryClient(RestTemplateBuilder restTemplateBuilder,
                                    VictoriaLogsProperties properties,
-                                   ObjectMapper objectMapper) {
+                                   ObjectMapper objectMapper,
+                                   WebClient webClient) {
         this.properties = properties;
         this.objectMapper = objectMapper;
         this.restTemplate = restTemplateBuilder
                 .setConnectTimeout(Duration.ofSeconds(5))
                 .setReadTimeout(Duration.ZERO)
                 .build();
+        this.webClient = webClient;
     }
 
     /**
@@ -79,11 +95,26 @@ public class VictoriaLogsQueryClient {
      * @return List of log entries matching the query
      */
     public List<LogQueryResponse> query(LogQueryRequest request) {
-        String url = buildQueryUrl(request);
+        String url = properties.url() + QUERY;
         log.info("Executing query: {}", url);
 
         try {
-            String response = restTemplate.getForObject(url, String.class);
+            HttpHeaders headers = new HttpHeaders();
+            headers.setContentType(MediaType.APPLICATION_FORM_URLENCODED);
+
+            MultiValueMap<String, String> params = new LinkedMultiValueMap<>();
+            params.add("query", request.getQuery());
+            if (StringUtils.hasText(request.getStart())) {
+                params.add("start", request.getStart());
+            }
+            if (StringUtils.hasText(request.getEnd())) {
+                params.add("end", request.getEnd());
+            }
+            params.add("limit", String.valueOf(request.getLimit() != null ? 
request.getLimit() : 500));
+
+            HttpEntity<MultiValueMap<String, String>> requestEntity = new 
HttpEntity<>(params, headers);
+            String response = restTemplate.postForObject(url, requestEntity, 
String.class);
+
             List<LogQueryResponse> entries = new ArrayList<>();
             if (response != null) {
                 String[] lines = response.split("\n");
@@ -107,11 +138,25 @@ public class VictoriaLogsQueryClient {
      * @return Number of matching log entries
      */
     public long count(LogQueryRequest request) {
-        String url = buildCountUrl(request);
+        String url = properties.url() + "/api/v1/query/count";
         log.debug("Executing count query: {}", url);
 
         try {
-            String response = restTemplate.getForObject(url, String.class);
+            HttpHeaders headers = new HttpHeaders();
+            headers.setContentType(MediaType.APPLICATION_FORM_URLENCODED);
+
+            MultiValueMap<String, String> params = new LinkedMultiValueMap<>();
+            params.add("query", request.getQuery());
+            if (StringUtils.hasText(request.getStart())) {
+                params.add("start", request.getStart());
+            }
+            if (StringUtils.hasText(request.getEnd())) {
+                params.add("end", request.getEnd());
+            }
+
+            HttpEntity<MultiValueMap<String, String>> requestEntity = new 
HttpEntity<>(params, headers);
+            String response = restTemplate.postForObject(url, requestEntity, 
String.class);
+
             if (StringUtils.hasText(response)) {
                 return Long.parseLong(response.trim());
             }
@@ -125,91 +170,62 @@ public class VictoriaLogsQueryClient {
     /**
      * Execute a streaming query for real-time log tailing
      * @param query Query expression
-     * @param handler Callback function to handle each log entry
      */
-    public void tail(String query, Consumer<LogQueryResponse> handler) {
-        String url = buildTailUrl(query);
-        log.debug("Starting log tail stream: {}", url);
-
-        try {
-            restTemplate.execute(url, HttpMethod.GET, null, response -> {
-                try (BufferedReader reader = new BufferedReader(
-                        new InputStreamReader(response.getBody()))) {
-                    String line;
-                    while ((line = reader.readLine()) != null) {
-                        if (!StringUtils.hasText(line)) {
-                            continue;
-                        }
-                        LogQueryResponse entry = objectMapper.readValue(line, 
LogQueryResponse.class);
-                        handler.accept(entry);
+    public Flux<LogQueryResponse> tail(String query) {
+        String url = properties.url() + TAIL;
+
+        return webClient
+                .post()
+                .uri(url)
+                .contentType(MediaType.APPLICATION_FORM_URLENCODED)
+                .body(BodyInserters.fromFormData("query", query))
+                .retrieve()
+                .bodyToFlux(String.class)
+                .doOnNext(response -> log.debug("Received response: {}", 
response))
+                .filter(StringUtils::hasText)
+                .<LogQueryResponse>handle((line, sink) -> {
+                    try {
+                        sink.next(objectMapper.readValue(line, 
LogQueryResponse.class));
+                    } catch (JsonProcessingException e) {
+                        log.error("Failed to parse log entry: {}", line, e);
+                        sink.error(new VictoriaLogsQueryException("Failed to 
parse log entry", e));
                     }
-                }
-                return null;
-            });
-        } catch (Exception e) {
-            log.error("Error in log tail stream: {}", query, e);
-            throw new VictoriaLogsQueryException("Failed to tail logs", e);
-        }
-    }
-
-    /**
-     * Build URL for regular queries
-     */
-    private String buildQueryUrl(LogQueryRequest request) {
-        // 不使用 UriComponentsBuilder,因为它可能会导致多次编码
-        StringBuilder urlBuilder = new StringBuilder(properties.url())
-                .append("/select/logsql/query?query=");
-
-        // 只编码一次查询参数
-        urlBuilder.append(URLEncoder.encode(request.getQuery(), 
StandardCharsets.UTF_8));
-
-        // 添加其他参数
-        if (StringUtils.hasText(request.getStart())) {
-            
urlBuilder.append("&start=").append(URLEncoder.encode(request.getStart(), 
StandardCharsets.UTF_8));
-        }
-        if (StringUtils.hasText(request.getEnd())) {
-            
urlBuilder.append("&end=").append(URLEncoder.encode(request.getEnd(), 
StandardCharsets.UTF_8));
-        }
-        if (request.getLimit() != null) {
-            urlBuilder.append("&limit=").append(request.getLimit());
-        } else {
-            urlBuilder.append("&limit=500");
-        }
-
-        return urlBuilder.toString();
+                })
+                .doOnSubscribe(subscription -> log.debug("Starting 
subscription to log stream"))
+                .doOnError(error -> log.error("Error in log tail stream: {}", 
error.getMessage(), error))
+                .doOnComplete(() -> log.debug("Log stream completed"))
+                .onErrorMap(e -> new VictoriaLogsQueryException("Failed to 
tail logs", e));
     }
 
-
-
     /**
-     * Build URL for count queries
+     * Execute a streaming query for real-time log tailing with heartbeat 
mechanism
+     * @param query Query expression
      */
-    private String buildCountUrl(LogQueryRequest request) {
-        UriComponentsBuilder builder = UriComponentsBuilder
-                .fromHttpUrl(properties.url())
-                .path("/api/v1/query/count")
-                .queryParam("query", request.getQuery());
-
-        if (StringUtils.hasText(request.getStart())) {
-            builder.queryParam("start", request.getStart());
-        }
-        if (StringUtils.hasText(request.getEnd())) {
-            builder.queryParam("end", request.getEnd());
-        }
-
-        return builder.build().encode().toUriString();
+    public Flux<ServerSentEvent<LogQueryResponse>> tailAsSSE(String query) {
+        // Create a heartbeat event flux that emits every 15 seconds
+        Flux<ServerSentEvent<LogQueryResponse>> heartbeat = 
Flux.interval(Duration.ZERO, Duration.ofSeconds(15))
+                .map(i -> ServerSentEvent.<LogQueryResponse>builder()
+                        .event("heartbeat")
+                        .data(null)
+                        .build());
+
+        // Create the initial connection event
+        Flux<ServerSentEvent<LogQueryResponse>> connectionEvent = Flux.just(
+                ServerSentEvent.<LogQueryResponse>builder()
+                        .event("connected")
+                        .data(null)
+                        .build()
+        );
+
+        // Combine the data stream with heartbeat events
+        return Flux.merge(
+                connectionEvent,
+                heartbeat,
+                tail(query).map(logResponse -> 
ServerSentEvent.<LogQueryResponse>builder()
+                        .event("log")
+                        .data(logResponse)
+                        .build())
+        );
     }
 
-    /**
-     * Build URL for streaming queries
-     */
-    private String buildTailUrl(String query) {
-        return UriComponentsBuilder
-                .fromHttpUrl(properties.url())
-                .path(TAIL)
-                .queryParam("query", query)
-                .build()
-                .encode()
-                .toUriString();
-    }
 }
\ No newline at end of file
diff --git 
a/hertzbeat-log/src/main/java/org/apache/hertzbeat/log/victorialogs/config/VictoriaLogsConfiguration.java
 
b/hertzbeat-log/src/main/java/org/apache/hertzbeat/log/victorialogs/config/VictoriaLogsConfiguration.java
index 776837b87..3cfa5adc7 100644
--- 
a/hertzbeat-log/src/main/java/org/apache/hertzbeat/log/victorialogs/config/VictoriaLogsConfiguration.java
+++ 
b/hertzbeat-log/src/main/java/org/apache/hertzbeat/log/victorialogs/config/VictoriaLogsConfiguration.java
@@ -29,6 +29,7 @@ import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 import org.springframework.http.client.SimpleClientHttpRequestFactory;
 import org.springframework.web.client.RestTemplate;
+import org.springframework.web.reactive.function.client.WebClient;
 
 /**
  * VictoriaLogs configuration class
@@ -46,7 +47,8 @@ public class VictoriaLogsConfiguration {
     public VictoriaLogsQueryClient victoriaLogsQueryClient(
             VictoriaLogsProperties properties,
             RestTemplateBuilder victoriaLogsRestTemplatebuilder,
-            ObjectMapper objectMapper) {
-        return new VictoriaLogsQueryClient(victoriaLogsRestTemplatebuilder, 
properties, objectMapper);
+            ObjectMapper objectMapper,
+            WebClient webClient) {
+        return new VictoriaLogsQueryClient(victoriaLogsRestTemplatebuilder, 
properties, objectMapper, webClient);
     }
 }
\ No newline at end of file
diff --git 
a/hertzbeat-log/src/main/java/org/apache/hertzbeat/log/victorialogs/config/VictoriaLogsProperties.java
 
b/hertzbeat-log/src/main/java/org/apache/hertzbeat/log/victorialogs/config/VictoriaLogsProperties.java
index b05caf446..9ce4ec151 100644
--- 
a/hertzbeat-log/src/main/java/org/apache/hertzbeat/log/victorialogs/config/VictoriaLogsProperties.java
+++ 
b/hertzbeat-log/src/main/java/org/apache/hertzbeat/log/victorialogs/config/VictoriaLogsProperties.java
@@ -33,14 +33,14 @@ import 
org.springframework.boot.context.properties.bind.DefaultValue;
         + SignConstants.DOT
         + LogConstants.VICTORIA_LOGS)
 public record VictoriaLogsProperties(
-        /**
-         * Whether to enable VictoriaLogs integration
+        /*
+          Whether to enable VictoriaLogs integration
          */
         @DefaultValue("false")
         boolean enabled,
 
-        /**
-         * VictoriaLogs server URL
+        /*
+          VictoriaLogs server URL
          */
         @DefaultValue("http://localhost:9428";)
         @NotNull
diff --git 
a/hertzbeat-log/src/main/java/org/apache/hertzbeat/log/victorialogs/config/WebClientConfig.java
 
b/hertzbeat-log/src/main/java/org/apache/hertzbeat/log/victorialogs/config/WebClientConfig.java
new file mode 100644
index 000000000..eca0c9366
--- /dev/null
+++ 
b/hertzbeat-log/src/main/java/org/apache/hertzbeat/log/victorialogs/config/WebClientConfig.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hertzbeat.log.victorialogs.config;
+
+import io.netty.channel.ChannelOption;
+import java.time.Duration;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.http.HttpHeaders;
+import org.springframework.http.MediaType;
+import org.springframework.http.client.reactive.ReactorClientHttpConnector;
+import org.springframework.web.reactive.function.client.WebClient;
+import reactor.netty.http.client.HttpClient;
+
+/**
+ * WebClient Configuration
+ */
+@Configuration
+public class WebClientConfig {
+
+    /**
+     * Creates WebClient
+     */
+    @Bean
+    public WebClient webClient(WebClient.Builder builder) {
+        return builder
+                .clientConnector(new ReactorClientHttpConnector(
+                        HttpClient.create()
+                                .headers(headers -> 
headers.add(HttpHeaders.CONTENT_TYPE, 
MediaType.APPLICATION_FORM_URLENCODED_VALUE))
+                                .responseTimeout(Duration.ZERO)
+                                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 
0)
+                                .keepAlive(true)
+                ))
+                .build();
+    }
+}
\ No newline at end of file
diff --git 
a/hertzbeat-log/src/main/java/org/apache/hertzbeat/log/victorialogs/controller/LogAnalysisController.java
 
b/hertzbeat-log/src/main/java/org/apache/hertzbeat/log/victorialogs/controller/LogAnalysisController.java
index 98982b2ba..33f577c4f 100644
--- 
a/hertzbeat-log/src/main/java/org/apache/hertzbeat/log/victorialogs/controller/LogAnalysisController.java
+++ 
b/hertzbeat-log/src/main/java/org/apache/hertzbeat/log/victorialogs/controller/LogAnalysisController.java
@@ -32,11 +32,13 @@ import 
org.apache.hertzbeat.log.victorialogs.service.LogAnalysisService;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.http.MediaType;
 import org.springframework.http.ResponseEntity;
+import org.springframework.http.codec.ServerSentEvent;
 import org.springframework.validation.annotation.Validated;
 import org.springframework.web.bind.annotation.*;
 import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
 
 import java.util.List;
+import reactor.core.publisher.Flux;
 
 @Tag(name = "Log Analysis API")
 @RestController
@@ -49,23 +51,11 @@ public class LogAnalysisController {
     private LogAnalysisService logAnalysisService;
 
     @Operation(summary = "Search logs", description = "Search logs with query 
parameters")
-    @GetMapping(value = "/query", produces = APPLICATION_JSON_VALUE)
+    @PostMapping(value = "/query", produces = APPLICATION_JSON_VALUE)
     public ResponseEntity<Message<List<LogQueryResponse>>> searchLogs(
-            @Parameter(description = "Log query expression", required = true)
-            @RequestParam @NotBlank String query,
-            @Parameter(description = "Start time for the query")
-            @RequestParam(required = false) String start,
-            @Parameter(description = "End time for the query")
-            @RequestParam(required = false) String end,
-            @Parameter(description = "Maximum number of results to return")
-            @RequestParam(required = false) Integer limit) {
+            @Parameter(description = "Log query parameters", required = true)
+            @RequestBody @Validated LogQueryRequest request) {
         try {
-            LogQueryRequest request = LogQueryRequest.builder()
-                    .query(query)
-                    .start(start)
-                    .end(end)
-                    .limit(limit)
-                    .build();
             List<LogQueryResponse> entries = 
logAnalysisService.searchLogs(request);
             return ResponseEntity.ok(Message.success(entries));
         } catch (Exception e) {
@@ -75,18 +65,16 @@ public class LogAnalysisController {
     }
 
     @Operation(summary = "Query log entries by keyword", description = "Search 
log entries containing specific keyword")
-    @GetMapping(value = "/query/keyword", produces = APPLICATION_JSON_VALUE)
+    @PostMapping(value = "/query/keyword", produces = APPLICATION_JSON_VALUE)
     public ResponseEntity<Message<List<LogQueryResponse>>> queryByKeyword(
-            @Parameter(description = "Keyword to search for", required = true)
-            @RequestParam @NotBlank String keyword,
-            @Parameter(description = "Start time for the query")
-            @RequestParam(required = false) String start,
-            @Parameter(description = "End time for the query")
-            @RequestParam(required = false) String end,
-            @Parameter(description = "Maximum number of results to return")
-            @RequestParam(required = false) Integer limit) {
+            @Parameter(description = "Log query parameters", required = true)
+            @RequestBody @Validated LogQueryRequest request) {
         try {
-            List<LogQueryResponse> entries = 
logAnalysisService.queryByKeyword(keyword, start, end, limit);
+            List<LogQueryResponse> entries = logAnalysisService.queryByKeyword(
+                    request.getQuery(),
+                    request.getStart(),
+                    request.getEnd(),
+                    request.getLimit());
             return ResponseEntity.ok(Message.success(entries));
         } catch (Exception e) {
             log.error("query by keyword error", e);
@@ -95,16 +83,15 @@ public class LogAnalysisController {
     }
 
     @Operation(summary = "Count keyword occurrences", description = "Count 
occurrences of a keyword in logs")
-    @GetMapping(value = "/count/keyword", produces = APPLICATION_JSON_VALUE)
+    @PostMapping(value = "/count/keyword", produces = APPLICATION_JSON_VALUE)
     public ResponseEntity<Message<Long>> countKeywordOccurrences(
-            @Parameter(description = "Keyword to count", required = true)
-            @RequestParam @NotBlank String keyword,
-            @Parameter(description = "Start time for the query")
-            @RequestParam(required = false) String start,
-            @Parameter(description = "End time for the query")
-            @RequestParam(required = false) String end) {
+            @Parameter(description = "Log query parameters", required = true)
+            @RequestBody @Validated LogQueryRequest request) {
         try {
-            long count = logAnalysisService.countKeywordOccurrences(keyword, 
start, end);
+            long count = logAnalysisService.countKeywordOccurrences(
+                    request.getQuery(),
+                    request.getStart(),
+                    request.getEnd());
             return ResponseEntity.ok(Message.success(count));
         } catch (Exception e) {
             log.error("count keyword occurrences error", e);
@@ -112,34 +99,11 @@ public class LogAnalysisController {
         }
     }
 
-    @GetMapping(value = "/tail", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
-    public SseEmitter streamLogs(@RequestParam String query) {
-        SseEmitter emitter = new SseEmitter(0L);
-        logAnalysisService.streamLogs(query, emitter);
-        return emitter;
+    @PostMapping(value = "/tail", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
+    public Flux<ServerSentEvent<LogQueryResponse>> streamLogs(
+            @Parameter(description = "Log query parameters", required = true)
+            @RequestBody @Validated LogQueryRequest request) {
+        return logAnalysisService.streamLogs(request.getQuery());
     }
 
-    @Operation(summary = "Stop all streams", description = "Stop all active 
log streams")
-    @PostMapping(value = "/tail/stop", produces = APPLICATION_JSON_VALUE)
-    public ResponseEntity<Message<String>> stopAllStreams() {
-        try {
-            logAnalysisService.stopAllStreams();
-            return ResponseEntity.ok(Message.success("Successfully stopped all 
streams"));
-        } catch (Exception e) {
-            log.error("Failed to stop streams", e);
-            return ResponseEntity.ok(Message.fail(FAIL_CODE, "Failed to stop 
streams"));
-        }
-    }
-
-    @Operation(summary = "Get active stream count", description = "Get the 
number of active log streams")
-    @GetMapping(value = "/tail/count", produces = APPLICATION_JSON_VALUE)
-    public ResponseEntity<Message<Integer>> getActiveStreamCount() {
-        try {
-            int count = logAnalysisService.getActiveStreamCount();
-            return ResponseEntity.ok(Message.success(count));
-        } catch (Exception e) {
-            log.error("Failed to get stream count", e);
-            return ResponseEntity.ok(Message.fail(FAIL_CODE, "Failed to get 
stream count"));
-        }
-    }
 }
\ No newline at end of file
diff --git 
a/hertzbeat-log/src/main/java/org/apache/hertzbeat/log/victorialogs/model/LogQueryRequest.java
 
b/hertzbeat-log/src/main/java/org/apache/hertzbeat/log/victorialogs/model/LogQueryRequest.java
index 7906ee2f5..f33b54457 100644
--- 
a/hertzbeat-log/src/main/java/org/apache/hertzbeat/log/victorialogs/model/LogQueryRequest.java
+++ 
b/hertzbeat-log/src/main/java/org/apache/hertzbeat/log/victorialogs/model/LogQueryRequest.java
@@ -21,10 +21,10 @@ import jakarta.validation.constraints.Max;
 import jakarta.validation.constraints.Min;
 import jakarta.validation.constraints.NotBlank;
 import java.util.Set;
+import lombok.AllArgsConstructor;
 import lombok.Builder;
 import lombok.Data;
-import lombok.Value;
-import org.springframework.boot.context.properties.bind.DefaultValue;
+import lombok.NoArgsConstructor;
 
 
 /**
@@ -32,6 +32,8 @@ import 
org.springframework.boot.context.properties.bind.DefaultValue;
  */
 @Data
 @Builder
+@NoArgsConstructor
+@AllArgsConstructor
 public class LogQueryRequest {
     /**
      * Query string for log search
diff --git 
a/hertzbeat-log/src/main/java/org/apache/hertzbeat/log/victorialogs/service/LogAnalysisService.java
 
b/hertzbeat-log/src/main/java/org/apache/hertzbeat/log/victorialogs/service/LogAnalysisService.java
index 88925e7f4..aa2bbb18c 100644
--- 
a/hertzbeat-log/src/main/java/org/apache/hertzbeat/log/victorialogs/service/LogAnalysisService.java
+++ 
b/hertzbeat-log/src/main/java/org/apache/hertzbeat/log/victorialogs/service/LogAnalysisService.java
@@ -22,6 +22,7 @@ import 
org.apache.hertzbeat.log.victorialogs.client.VictoriaLogsQueryClient;
 import org.apache.hertzbeat.log.victorialogs.model.LogQueryResponse;
 import org.apache.hertzbeat.log.victorialogs.model.LogQueryRequest;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.http.codec.ServerSentEvent;
 import org.springframework.stereotype.Service;
 import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
 
@@ -30,35 +31,19 @@ import java.io.IOException;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.*;
+import reactor.core.publisher.Flux;
 
 @Service
 @Slf4j
 public class LogAnalysisService {
 
-    private final Set<SseEmitter> activeEmitters = 
ConcurrentHashMap.newKeySet();
     private final VictoriaLogsQueryClient victoriaLogsQueryClient;
-    private final ScheduledExecutorService heartbeatExecutor = 
Executors.newSingleThreadScheduledExecutor();
-
-    private static final long HEARTBEAT_DELAY = 15; // seconds
 
     @Autowired
     public LogAnalysisService(VictoriaLogsQueryClient victoriaLogsQueryClient) 
{
         this.victoriaLogsQueryClient = victoriaLogsQueryClient;
     }
 
-    @PreDestroy
-    public void destroy() {
-        stopAllStreams();
-        heartbeatExecutor.shutdown();
-        try {
-            if (!heartbeatExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
-                heartbeatExecutor.shutdownNow();
-            }
-        } catch (InterruptedException e) {
-            heartbeatExecutor.shutdownNow();
-            Thread.currentThread().interrupt();
-        }
-    }
 
     public List<LogQueryResponse> searchLogs(LogQueryRequest request) {
         return victoriaLogsQueryClient.query(request);
@@ -88,107 +73,9 @@ public class LogAnalysisService {
         return String.format("message ILIKE \"%%%s%%\"", escapedKeyword);
     }
 
-    public void streamLogs(String query, SseEmitter emitter) {
-        activeEmitters.add(emitter);
-
-        // Start heartbeat for this emitter
-        ScheduledFuture<?> heartbeatFuture = startHeartbeat(emitter);
-
-        CompletableFuture.runAsync(() -> {
-            try {
-                // Send initial connection established event
-                sendEvent(emitter, "open", "Connection established", null);
-
-                victoriaLogsQueryClient.tail(query, logQueryResponse -> {
-                    try {
-                        if (activeEmitters.contains(emitter)) {
-                            sendEvent(emitter, "log", logQueryResponse, 
String.valueOf(System.currentTimeMillis()));
-                        }
-                    } catch (IOException e) {
-                        handleEmitterError(emitter, heartbeatFuture, e);
-                    }
-                });
-            } catch (Exception e) {
-                handleEmitterError(emitter, heartbeatFuture, e);
-            }
-        });
-
-        // Set up completion callback
-        emitter.onCompletion(() -> {
-            cleanup(emitter, heartbeatFuture);
-        });
-
-        // Set up timeout callback
-        emitter.onTimeout(() -> {
-            cleanup(emitter, heartbeatFuture);
-        });
-
-        // Set up error callback
-        emitter.onError(ex -> {
-            cleanup(emitter, heartbeatFuture);
-        });
-    }
-
-    private ScheduledFuture<?> startHeartbeat(SseEmitter emitter) {
-        return heartbeatExecutor.scheduleAtFixedRate(() -> {
-            try {
-                if (activeEmitters.contains(emitter)) {
-                    emitter.send(SseEmitter.event()
-                            .comment("heartbeat")
-                            .build());
-                }
-            } catch (IOException e) {
-                log.warn("Failed to send heartbeat, closing connection", e);
-                removeEmitter(emitter);
-            }
-        }, HEARTBEAT_DELAY, HEARTBEAT_DELAY, TimeUnit.SECONDS);
-    }
-
-    private void sendEvent(SseEmitter emitter, String eventName, Object data, 
String id) throws IOException {
-        SseEmitter.SseEventBuilder builder = SseEmitter.event()
-                .name(eventName)
-                .data(data);
-
-        if (id != null) {
-            builder.id(id);
-        }
-
-        emitter.send(builder.build());
-    }
-
-    private void handleEmitterError(SseEmitter emitter, ScheduledFuture<?> 
heartbeatFuture, Exception e) {
-        log.error("Error in log streaming", e);
-        cleanup(emitter, heartbeatFuture);
-        try {
-            sendEvent(emitter, "error", e.getMessage(), null);
-        } catch (IOException ex) {
-            log.error("Failed to send error message", ex);
-        }
+    public Flux<ServerSentEvent<LogQueryResponse>> streamLogs(String query) {
+        return victoriaLogsQueryClient.tailAsSSE(query);
     }
 
-    private void cleanup(SseEmitter emitter, ScheduledFuture<?> 
heartbeatFuture) {
-        removeEmitter(emitter);
-        if (heartbeatFuture != null) {
-            heartbeatFuture.cancel(true);
-        }
-    }
 
-    public void removeEmitter(SseEmitter emitter) {
-        activeEmitters.remove(emitter);
-    }
-
-    public int getActiveStreamCount() {
-        return activeEmitters.size();
-    }
-
-    public void stopAllStreams() {
-        activeEmitters.forEach(emitter -> {
-            try {
-                emitter.complete();
-            } catch (Exception e) {
-                log.error("Error completing emitter", e);
-            }
-        });
-        activeEmitters.clear();
-    }
 }
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to