This is an automated email from the ASF dual-hosted git repository.
zhaoqingran pushed a commit to branch log-oltp
in repository https://gitbox.apache.org/repos/asf/hertzbeat.git
The following commit(s) were added to refs/heads/log-oltp by this push:
new 207a2dc20 modify api endpoints
207a2dc20 is described below
commit 207a2dc20b421357dde5efd6d895bfbefb186360
Author: Logic <[email protected]>
AuthorDate: Tue Nov 19 22:56:50 2024 +0800
modify api endpoints
---
hertzbeat-log/pom.xml | 61 +++++++
.../client/VictoriaLogsQueryClient.java | 184 +++++++++++----------
.../config/VictoriaLogsConfiguration.java | 6 +-
.../config/VictoriaLogsProperties.java | 8 +-
.../log/victorialogs/config/WebClientConfig.java | 51 ++++++
.../controller/LogAnalysisController.java | 86 +++-------
.../log/victorialogs/model/LogQueryRequest.java | 6 +-
.../victorialogs/service/LogAnalysisService.java | 121 +-------------
8 files changed, 253 insertions(+), 270 deletions(-)
diff --git a/hertzbeat-log/pom.xml b/hertzbeat-log/pom.xml
new file mode 100644
index 000000000..ec05e0d92
--- /dev/null
+++ b/hertzbeat-log/pom.xml
@@ -0,0 +1,61 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.hertzbeat</groupId>
+ <artifactId>hertzbeat</artifactId>
+ <version>2.0-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>hertzbeat-log</artifactId>
+ <name>${project.artifactId}</name>
+
+ <properties>
+ <maven.compiler.source>17</maven.compiler.source>
+ <maven.compiler.target>17</maven.compiler.target>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ </properties>
+
+
+ <dependencies>
+ <!-- common -->
+ <dependency>
+ <groupId>org.apache.hertzbeat</groupId>
+ <artifactId>hertzbeat-common</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <!-- hertzbeat warehouse -->
+ <dependency>
+ <groupId>org.apache.hertzbeat</groupId>
+ <artifactId>hertzbeat-warehouse</artifactId>
+ </dependency>
+ <!-- swagger -->
+ <dependency>
+ <groupId>org.springdoc</groupId>
+ <artifactId>springdoc-openapi-starter-webmvc-ui</artifactId>
+ </dependency>
+ <!--webflux-->
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-webflux</artifactId>
+ </dependency>
+ </dependencies>
+</project>
\ No newline at end of file
diff --git
a/hertzbeat-log/src/main/java/org/apache/hertzbeat/log/victorialogs/client/VictoriaLogsQueryClient.java
b/hertzbeat-log/src/main/java/org/apache/hertzbeat/log/victorialogs/client/VictoriaLogsQueryClient.java
index 60ab26eb1..d22fcff90 100644
---
a/hertzbeat-log/src/main/java/org/apache/hertzbeat/log/victorialogs/client/VictoriaLogsQueryClient.java
+++
b/hertzbeat-log/src/main/java/org/apache/hertzbeat/log/victorialogs/client/VictoriaLogsQueryClient.java
@@ -17,9 +17,12 @@
package org.apache.hertzbeat.log.victorialogs.client;
+import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
+import java.util.Objects;
+import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.hertzbeat.log.victorialogs.config.VictoriaLogsProperties;
import org.apache.hertzbeat.log.victorialogs.model.LogQueryResponse;
@@ -27,10 +30,19 @@ import
org.apache.hertzbeat.log.victorialogs.model.LogQueryRequest;
import
org.apache.hertzbeat.log.victorialogs.exception.VictoriaLogsQueryException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.web.client.RestTemplateBuilder;
+import org.springframework.http.HttpEntity;
+import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
+import org.springframework.http.MediaType;
+import org.springframework.http.codec.ServerSentEvent;
import org.springframework.stereotype.Component;
+import org.springframework.util.LinkedMultiValueMap;
+import org.springframework.util.MultiValueMap;
+import org.springframework.util.StreamUtils;
import org.springframework.util.StringUtils;
import org.springframework.web.client.RestTemplate;
+import org.springframework.web.reactive.function.BodyInserters;
+import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.util.UriComponentsBuilder;
import java.io.BufferedReader;
@@ -40,6 +52,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import org.springframework.web.util.UriUtils;
+import reactor.core.publisher.Flux;
@Slf4j
@Component
@@ -60,17 +73,20 @@ public class VictoriaLogsQueryClient {
private final RestTemplate restTemplate;
private final VictoriaLogsProperties properties;
private final ObjectMapper objectMapper;
+ private final WebClient webClient;
@Autowired
public VictoriaLogsQueryClient(RestTemplateBuilder restTemplateBuilder,
VictoriaLogsProperties properties,
- ObjectMapper objectMapper) {
+ ObjectMapper objectMapper,
+ WebClient webClient) {
this.properties = properties;
this.objectMapper = objectMapper;
this.restTemplate = restTemplateBuilder
.setConnectTimeout(Duration.ofSeconds(5))
.setReadTimeout(Duration.ZERO)
.build();
+ this.webClient = webClient;
}
/**
@@ -79,11 +95,26 @@ public class VictoriaLogsQueryClient {
* @return List of log entries matching the query
*/
public List<LogQueryResponse> query(LogQueryRequest request) {
- String url = buildQueryUrl(request);
+ String url = properties.url() + QUERY;
log.info("Executing query: {}", url);
try {
- String response = restTemplate.getForObject(url, String.class);
+ HttpHeaders headers = new HttpHeaders();
+ headers.setContentType(MediaType.APPLICATION_FORM_URLENCODED);
+
+ MultiValueMap<String, String> params = new LinkedMultiValueMap<>();
+ params.add("query", request.getQuery());
+ if (StringUtils.hasText(request.getStart())) {
+ params.add("start", request.getStart());
+ }
+ if (StringUtils.hasText(request.getEnd())) {
+ params.add("end", request.getEnd());
+ }
+ params.add("limit", String.valueOf(request.getLimit() != null ?
request.getLimit() : 500));
+
+ HttpEntity<MultiValueMap<String, String>> requestEntity = new
HttpEntity<>(params, headers);
+ String response = restTemplate.postForObject(url, requestEntity,
String.class);
+
List<LogQueryResponse> entries = new ArrayList<>();
if (response != null) {
String[] lines = response.split("\n");
@@ -107,11 +138,25 @@ public class VictoriaLogsQueryClient {
* @return Number of matching log entries
*/
public long count(LogQueryRequest request) {
- String url = buildCountUrl(request);
+ String url = properties.url() + "/api/v1/query/count";
log.debug("Executing count query: {}", url);
try {
- String response = restTemplate.getForObject(url, String.class);
+ HttpHeaders headers = new HttpHeaders();
+ headers.setContentType(MediaType.APPLICATION_FORM_URLENCODED);
+
+ MultiValueMap<String, String> params = new LinkedMultiValueMap<>();
+ params.add("query", request.getQuery());
+ if (StringUtils.hasText(request.getStart())) {
+ params.add("start", request.getStart());
+ }
+ if (StringUtils.hasText(request.getEnd())) {
+ params.add("end", request.getEnd());
+ }
+
+ HttpEntity<MultiValueMap<String, String>> requestEntity = new
HttpEntity<>(params, headers);
+ String response = restTemplate.postForObject(url, requestEntity,
String.class);
+
if (StringUtils.hasText(response)) {
return Long.parseLong(response.trim());
}
@@ -125,91 +170,62 @@ public class VictoriaLogsQueryClient {
/**
* Execute a streaming query for real-time log tailing
* @param query Query expression
- * @param handler Callback function to handle each log entry
*/
- public void tail(String query, Consumer<LogQueryResponse> handler) {
- String url = buildTailUrl(query);
- log.debug("Starting log tail stream: {}", url);
-
- try {
- restTemplate.execute(url, HttpMethod.GET, null, response -> {
- try (BufferedReader reader = new BufferedReader(
- new InputStreamReader(response.getBody()))) {
- String line;
- while ((line = reader.readLine()) != null) {
- if (!StringUtils.hasText(line)) {
- continue;
- }
- LogQueryResponse entry = objectMapper.readValue(line,
LogQueryResponse.class);
- handler.accept(entry);
+ public Flux<LogQueryResponse> tail(String query) {
+ String url = properties.url() + TAIL;
+
+ return webClient
+ .post()
+ .uri(url)
+ .contentType(MediaType.APPLICATION_FORM_URLENCODED)
+ .body(BodyInserters.fromFormData("query", query))
+ .retrieve()
+ .bodyToFlux(String.class)
+ .doOnNext(response -> log.debug("Received response: {}",
response))
+ .filter(StringUtils::hasText)
+ .<LogQueryResponse>handle((line, sink) -> {
+ try {
+ sink.next(objectMapper.readValue(line,
LogQueryResponse.class));
+ } catch (JsonProcessingException e) {
+ log.error("Failed to parse log entry: {}", line, e);
+ sink.error(new VictoriaLogsQueryException("Failed to
parse log entry", e));
}
- }
- return null;
- });
- } catch (Exception e) {
- log.error("Error in log tail stream: {}", query, e);
- throw new VictoriaLogsQueryException("Failed to tail logs", e);
- }
- }
-
- /**
- * Build URL for regular queries
- */
- private String buildQueryUrl(LogQueryRequest request) {
- // 不使用 UriComponentsBuilder,因为它可能会导致多次编码
- StringBuilder urlBuilder = new StringBuilder(properties.url())
- .append("/select/logsql/query?query=");
-
- // 只编码一次查询参数
- urlBuilder.append(URLEncoder.encode(request.getQuery(),
StandardCharsets.UTF_8));
-
- // 添加其他参数
- if (StringUtils.hasText(request.getStart())) {
-
urlBuilder.append("&start=").append(URLEncoder.encode(request.getStart(),
StandardCharsets.UTF_8));
- }
- if (StringUtils.hasText(request.getEnd())) {
-
urlBuilder.append("&end=").append(URLEncoder.encode(request.getEnd(),
StandardCharsets.UTF_8));
- }
- if (request.getLimit() != null) {
- urlBuilder.append("&limit=").append(request.getLimit());
- } else {
- urlBuilder.append("&limit=500");
- }
-
- return urlBuilder.toString();
+ })
+ .doOnSubscribe(subscription -> log.debug("Starting
subscription to log stream"))
+ .doOnError(error -> log.error("Error in log tail stream: {}",
error.getMessage(), error))
+ .doOnComplete(() -> log.debug("Log stream completed"))
+ .onErrorMap(e -> new VictoriaLogsQueryException("Failed to
tail logs", e));
}
-
-
/**
- * Build URL for count queries
+ * Execute a streaming query for real-time log tailing with heartbeat
mechanism
+ * @param query Query expression
*/
- private String buildCountUrl(LogQueryRequest request) {
- UriComponentsBuilder builder = UriComponentsBuilder
- .fromHttpUrl(properties.url())
- .path("/api/v1/query/count")
- .queryParam("query", request.getQuery());
-
- if (StringUtils.hasText(request.getStart())) {
- builder.queryParam("start", request.getStart());
- }
- if (StringUtils.hasText(request.getEnd())) {
- builder.queryParam("end", request.getEnd());
- }
-
- return builder.build().encode().toUriString();
+ public Flux<ServerSentEvent<LogQueryResponse>> tailAsSSE(String query) {
+ // Create a heartbeat event flux that emits every 15 seconds
+ Flux<ServerSentEvent<LogQueryResponse>> heartbeat =
Flux.interval(Duration.ZERO, Duration.ofSeconds(15))
+ .map(i -> ServerSentEvent.<LogQueryResponse>builder()
+ .event("heartbeat")
+ .data(null)
+ .build());
+
+ // Create the initial connection event
+ Flux<ServerSentEvent<LogQueryResponse>> connectionEvent = Flux.just(
+ ServerSentEvent.<LogQueryResponse>builder()
+ .event("connected")
+ .data(null)
+ .build()
+ );
+
+ // Combine the data stream with heartbeat events
+ return Flux.merge(
+ connectionEvent,
+ heartbeat,
+ tail(query).map(logResponse ->
ServerSentEvent.<LogQueryResponse>builder()
+ .event("log")
+ .data(logResponse)
+ .build())
+ );
}
- /**
- * Build URL for streaming queries
- */
- private String buildTailUrl(String query) {
- return UriComponentsBuilder
- .fromHttpUrl(properties.url())
- .path(TAIL)
- .queryParam("query", query)
- .build()
- .encode()
- .toUriString();
- }
}
\ No newline at end of file
diff --git
a/hertzbeat-log/src/main/java/org/apache/hertzbeat/log/victorialogs/config/VictoriaLogsConfiguration.java
b/hertzbeat-log/src/main/java/org/apache/hertzbeat/log/victorialogs/config/VictoriaLogsConfiguration.java
index 776837b87..3cfa5adc7 100644
---
a/hertzbeat-log/src/main/java/org/apache/hertzbeat/log/victorialogs/config/VictoriaLogsConfiguration.java
+++
b/hertzbeat-log/src/main/java/org/apache/hertzbeat/log/victorialogs/config/VictoriaLogsConfiguration.java
@@ -29,6 +29,7 @@ import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.client.SimpleClientHttpRequestFactory;
import org.springframework.web.client.RestTemplate;
+import org.springframework.web.reactive.function.client.WebClient;
/**
* VictoriaLogs configuration class
@@ -46,7 +47,8 @@ public class VictoriaLogsConfiguration {
public VictoriaLogsQueryClient victoriaLogsQueryClient(
VictoriaLogsProperties properties,
RestTemplateBuilder victoriaLogsRestTemplatebuilder,
- ObjectMapper objectMapper) {
- return new VictoriaLogsQueryClient(victoriaLogsRestTemplatebuilder,
properties, objectMapper);
+ ObjectMapper objectMapper,
+ WebClient webClient) {
+ return new VictoriaLogsQueryClient(victoriaLogsRestTemplatebuilder,
properties, objectMapper, webClient);
}
}
\ No newline at end of file
diff --git
a/hertzbeat-log/src/main/java/org/apache/hertzbeat/log/victorialogs/config/VictoriaLogsProperties.java
b/hertzbeat-log/src/main/java/org/apache/hertzbeat/log/victorialogs/config/VictoriaLogsProperties.java
index b05caf446..9ce4ec151 100644
---
a/hertzbeat-log/src/main/java/org/apache/hertzbeat/log/victorialogs/config/VictoriaLogsProperties.java
+++
b/hertzbeat-log/src/main/java/org/apache/hertzbeat/log/victorialogs/config/VictoriaLogsProperties.java
@@ -33,14 +33,14 @@ import
org.springframework.boot.context.properties.bind.DefaultValue;
+ SignConstants.DOT
+ LogConstants.VICTORIA_LOGS)
public record VictoriaLogsProperties(
- /**
- * Whether to enable VictoriaLogs integration
+ /*
+ Whether to enable VictoriaLogs integration
*/
@DefaultValue("false")
boolean enabled,
- /**
- * VictoriaLogs server URL
+ /*
+ VictoriaLogs server URL
*/
@DefaultValue("http://localhost:9428")
@NotNull
diff --git
a/hertzbeat-log/src/main/java/org/apache/hertzbeat/log/victorialogs/config/WebClientConfig.java
b/hertzbeat-log/src/main/java/org/apache/hertzbeat/log/victorialogs/config/WebClientConfig.java
new file mode 100644
index 000000000..eca0c9366
--- /dev/null
+++
b/hertzbeat-log/src/main/java/org/apache/hertzbeat/log/victorialogs/config/WebClientConfig.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hertzbeat.log.victorialogs.config;
+
+import io.netty.channel.ChannelOption;
+import java.time.Duration;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.http.HttpHeaders;
+import org.springframework.http.MediaType;
+import org.springframework.http.client.reactive.ReactorClientHttpConnector;
+import org.springframework.web.reactive.function.client.WebClient;
+import reactor.netty.http.client.HttpClient;
+
+/**
+ * WebClient Configuration
+ */
+@Configuration
+public class WebClientConfig {
+
+ /**
+ * Creates WebClient
+ */
+ @Bean
+ public WebClient webClient(WebClient.Builder builder) {
+ return builder
+ .clientConnector(new ReactorClientHttpConnector(
+ HttpClient.create()
+ .headers(headers ->
headers.add(HttpHeaders.CONTENT_TYPE,
MediaType.APPLICATION_FORM_URLENCODED_VALUE))
+ .responseTimeout(Duration.ZERO)
+ .option(ChannelOption.CONNECT_TIMEOUT_MILLIS,
0)
+ .keepAlive(true)
+ ))
+ .build();
+ }
+}
\ No newline at end of file
diff --git
a/hertzbeat-log/src/main/java/org/apache/hertzbeat/log/victorialogs/controller/LogAnalysisController.java
b/hertzbeat-log/src/main/java/org/apache/hertzbeat/log/victorialogs/controller/LogAnalysisController.java
index 98982b2ba..33f577c4f 100644
---
a/hertzbeat-log/src/main/java/org/apache/hertzbeat/log/victorialogs/controller/LogAnalysisController.java
+++
b/hertzbeat-log/src/main/java/org/apache/hertzbeat/log/victorialogs/controller/LogAnalysisController.java
@@ -32,11 +32,13 @@ import
org.apache.hertzbeat.log.victorialogs.service.LogAnalysisService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
+import org.springframework.http.codec.ServerSentEvent;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.util.List;
+import reactor.core.publisher.Flux;
@Tag(name = "Log Analysis API")
@RestController
@@ -49,23 +51,11 @@ public class LogAnalysisController {
private LogAnalysisService logAnalysisService;
@Operation(summary = "Search logs", description = "Search logs with query
parameters")
- @GetMapping(value = "/query", produces = APPLICATION_JSON_VALUE)
+ @PostMapping(value = "/query", produces = APPLICATION_JSON_VALUE)
public ResponseEntity<Message<List<LogQueryResponse>>> searchLogs(
- @Parameter(description = "Log query expression", required = true)
- @RequestParam @NotBlank String query,
- @Parameter(description = "Start time for the query")
- @RequestParam(required = false) String start,
- @Parameter(description = "End time for the query")
- @RequestParam(required = false) String end,
- @Parameter(description = "Maximum number of results to return")
- @RequestParam(required = false) Integer limit) {
+ @Parameter(description = "Log query parameters", required = true)
+ @RequestBody @Validated LogQueryRequest request) {
try {
- LogQueryRequest request = LogQueryRequest.builder()
- .query(query)
- .start(start)
- .end(end)
- .limit(limit)
- .build();
List<LogQueryResponse> entries =
logAnalysisService.searchLogs(request);
return ResponseEntity.ok(Message.success(entries));
} catch (Exception e) {
@@ -75,18 +65,16 @@ public class LogAnalysisController {
}
@Operation(summary = "Query log entries by keyword", description = "Search
log entries containing specific keyword")
- @GetMapping(value = "/query/keyword", produces = APPLICATION_JSON_VALUE)
+ @PostMapping(value = "/query/keyword", produces = APPLICATION_JSON_VALUE)
public ResponseEntity<Message<List<LogQueryResponse>>> queryByKeyword(
- @Parameter(description = "Keyword to search for", required = true)
- @RequestParam @NotBlank String keyword,
- @Parameter(description = "Start time for the query")
- @RequestParam(required = false) String start,
- @Parameter(description = "End time for the query")
- @RequestParam(required = false) String end,
- @Parameter(description = "Maximum number of results to return")
- @RequestParam(required = false) Integer limit) {
+ @Parameter(description = "Log query parameters", required = true)
+ @RequestBody @Validated LogQueryRequest request) {
try {
- List<LogQueryResponse> entries =
logAnalysisService.queryByKeyword(keyword, start, end, limit);
+ List<LogQueryResponse> entries = logAnalysisService.queryByKeyword(
+ request.getQuery(),
+ request.getStart(),
+ request.getEnd(),
+ request.getLimit());
return ResponseEntity.ok(Message.success(entries));
} catch (Exception e) {
log.error("query by keyword error", e);
@@ -95,16 +83,15 @@ public class LogAnalysisController {
}
@Operation(summary = "Count keyword occurrences", description = "Count
occurrences of a keyword in logs")
- @GetMapping(value = "/count/keyword", produces = APPLICATION_JSON_VALUE)
+ @PostMapping(value = "/count/keyword", produces = APPLICATION_JSON_VALUE)
public ResponseEntity<Message<Long>> countKeywordOccurrences(
- @Parameter(description = "Keyword to count", required = true)
- @RequestParam @NotBlank String keyword,
- @Parameter(description = "Start time for the query")
- @RequestParam(required = false) String start,
- @Parameter(description = "End time for the query")
- @RequestParam(required = false) String end) {
+ @Parameter(description = "Log query parameters", required = true)
+ @RequestBody @Validated LogQueryRequest request) {
try {
- long count = logAnalysisService.countKeywordOccurrences(keyword,
start, end);
+ long count = logAnalysisService.countKeywordOccurrences(
+ request.getQuery(),
+ request.getStart(),
+ request.getEnd());
return ResponseEntity.ok(Message.success(count));
} catch (Exception e) {
log.error("count keyword occurrences error", e);
@@ -112,34 +99,11 @@ public class LogAnalysisController {
}
}
- @GetMapping(value = "/tail", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
- public SseEmitter streamLogs(@RequestParam String query) {
- SseEmitter emitter = new SseEmitter(0L);
- logAnalysisService.streamLogs(query, emitter);
- return emitter;
+ @PostMapping(value = "/tail", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
+ public Flux<ServerSentEvent<LogQueryResponse>> streamLogs(
+ @Parameter(description = "Log query parameters", required = true)
+ @RequestBody @Validated LogQueryRequest request) {
+ return logAnalysisService.streamLogs(request.getQuery());
}
- @Operation(summary = "Stop all streams", description = "Stop all active
log streams")
- @PostMapping(value = "/tail/stop", produces = APPLICATION_JSON_VALUE)
- public ResponseEntity<Message<String>> stopAllStreams() {
- try {
- logAnalysisService.stopAllStreams();
- return ResponseEntity.ok(Message.success("Successfully stopped all
streams"));
- } catch (Exception e) {
- log.error("Failed to stop streams", e);
- return ResponseEntity.ok(Message.fail(FAIL_CODE, "Failed to stop
streams"));
- }
- }
-
- @Operation(summary = "Get active stream count", description = "Get the
number of active log streams")
- @GetMapping(value = "/tail/count", produces = APPLICATION_JSON_VALUE)
- public ResponseEntity<Message<Integer>> getActiveStreamCount() {
- try {
- int count = logAnalysisService.getActiveStreamCount();
- return ResponseEntity.ok(Message.success(count));
- } catch (Exception e) {
- log.error("Failed to get stream count", e);
- return ResponseEntity.ok(Message.fail(FAIL_CODE, "Failed to get
stream count"));
- }
- }
}
\ No newline at end of file
diff --git
a/hertzbeat-log/src/main/java/org/apache/hertzbeat/log/victorialogs/model/LogQueryRequest.java
b/hertzbeat-log/src/main/java/org/apache/hertzbeat/log/victorialogs/model/LogQueryRequest.java
index 7906ee2f5..f33b54457 100644
---
a/hertzbeat-log/src/main/java/org/apache/hertzbeat/log/victorialogs/model/LogQueryRequest.java
+++
b/hertzbeat-log/src/main/java/org/apache/hertzbeat/log/victorialogs/model/LogQueryRequest.java
@@ -21,10 +21,10 @@ import jakarta.validation.constraints.Max;
import jakarta.validation.constraints.Min;
import jakarta.validation.constraints.NotBlank;
import java.util.Set;
+import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
-import lombok.Value;
-import org.springframework.boot.context.properties.bind.DefaultValue;
+import lombok.NoArgsConstructor;
/**
@@ -32,6 +32,8 @@ import
org.springframework.boot.context.properties.bind.DefaultValue;
*/
@Data
@Builder
+@NoArgsConstructor
+@AllArgsConstructor
public class LogQueryRequest {
/**
* Query string for log search
diff --git
a/hertzbeat-log/src/main/java/org/apache/hertzbeat/log/victorialogs/service/LogAnalysisService.java
b/hertzbeat-log/src/main/java/org/apache/hertzbeat/log/victorialogs/service/LogAnalysisService.java
index 88925e7f4..aa2bbb18c 100644
---
a/hertzbeat-log/src/main/java/org/apache/hertzbeat/log/victorialogs/service/LogAnalysisService.java
+++
b/hertzbeat-log/src/main/java/org/apache/hertzbeat/log/victorialogs/service/LogAnalysisService.java
@@ -22,6 +22,7 @@ import
org.apache.hertzbeat.log.victorialogs.client.VictoriaLogsQueryClient;
import org.apache.hertzbeat.log.victorialogs.model.LogQueryResponse;
import org.apache.hertzbeat.log.victorialogs.model.LogQueryRequest;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.http.codec.ServerSentEvent;
import org.springframework.stereotype.Service;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
@@ -30,35 +31,19 @@ import java.io.IOException;
import java.util.List;
import java.util.Set;
import java.util.concurrent.*;
+import reactor.core.publisher.Flux;
@Service
@Slf4j
public class LogAnalysisService {
- private final Set<SseEmitter> activeEmitters =
ConcurrentHashMap.newKeySet();
private final VictoriaLogsQueryClient victoriaLogsQueryClient;
- private final ScheduledExecutorService heartbeatExecutor =
Executors.newSingleThreadScheduledExecutor();
-
- private static final long HEARTBEAT_DELAY = 15; // seconds
@Autowired
public LogAnalysisService(VictoriaLogsQueryClient victoriaLogsQueryClient)
{
this.victoriaLogsQueryClient = victoriaLogsQueryClient;
}
- @PreDestroy
- public void destroy() {
- stopAllStreams();
- heartbeatExecutor.shutdown();
- try {
- if (!heartbeatExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
- heartbeatExecutor.shutdownNow();
- }
- } catch (InterruptedException e) {
- heartbeatExecutor.shutdownNow();
- Thread.currentThread().interrupt();
- }
- }
public List<LogQueryResponse> searchLogs(LogQueryRequest request) {
return victoriaLogsQueryClient.query(request);
@@ -88,107 +73,9 @@ public class LogAnalysisService {
return String.format("message ILIKE \"%%%s%%\"", escapedKeyword);
}
- public void streamLogs(String query, SseEmitter emitter) {
- activeEmitters.add(emitter);
-
- // Start heartbeat for this emitter
- ScheduledFuture<?> heartbeatFuture = startHeartbeat(emitter);
-
- CompletableFuture.runAsync(() -> {
- try {
- // Send initial connection established event
- sendEvent(emitter, "open", "Connection established", null);
-
- victoriaLogsQueryClient.tail(query, logQueryResponse -> {
- try {
- if (activeEmitters.contains(emitter)) {
- sendEvent(emitter, "log", logQueryResponse,
String.valueOf(System.currentTimeMillis()));
- }
- } catch (IOException e) {
- handleEmitterError(emitter, heartbeatFuture, e);
- }
- });
- } catch (Exception e) {
- handleEmitterError(emitter, heartbeatFuture, e);
- }
- });
-
- // Set up completion callback
- emitter.onCompletion(() -> {
- cleanup(emitter, heartbeatFuture);
- });
-
- // Set up timeout callback
- emitter.onTimeout(() -> {
- cleanup(emitter, heartbeatFuture);
- });
-
- // Set up error callback
- emitter.onError(ex -> {
- cleanup(emitter, heartbeatFuture);
- });
- }
-
- private ScheduledFuture<?> startHeartbeat(SseEmitter emitter) {
- return heartbeatExecutor.scheduleAtFixedRate(() -> {
- try {
- if (activeEmitters.contains(emitter)) {
- emitter.send(SseEmitter.event()
- .comment("heartbeat")
- .build());
- }
- } catch (IOException e) {
- log.warn("Failed to send heartbeat, closing connection", e);
- removeEmitter(emitter);
- }
- }, HEARTBEAT_DELAY, HEARTBEAT_DELAY, TimeUnit.SECONDS);
- }
-
- private void sendEvent(SseEmitter emitter, String eventName, Object data,
String id) throws IOException {
- SseEmitter.SseEventBuilder builder = SseEmitter.event()
- .name(eventName)
- .data(data);
-
- if (id != null) {
- builder.id(id);
- }
-
- emitter.send(builder.build());
- }
-
- private void handleEmitterError(SseEmitter emitter, ScheduledFuture<?>
heartbeatFuture, Exception e) {
- log.error("Error in log streaming", e);
- cleanup(emitter, heartbeatFuture);
- try {
- sendEvent(emitter, "error", e.getMessage(), null);
- } catch (IOException ex) {
- log.error("Failed to send error message", ex);
- }
+ public Flux<ServerSentEvent<LogQueryResponse>> streamLogs(String query) {
+ return victoriaLogsQueryClient.tailAsSSE(query);
}
- private void cleanup(SseEmitter emitter, ScheduledFuture<?>
heartbeatFuture) {
- removeEmitter(emitter);
- if (heartbeatFuture != null) {
- heartbeatFuture.cancel(true);
- }
- }
- public void removeEmitter(SseEmitter emitter) {
- activeEmitters.remove(emitter);
- }
-
- public int getActiveStreamCount() {
- return activeEmitters.size();
- }
-
- public void stopAllStreams() {
- activeEmitters.forEach(emitter -> {
- try {
- emitter.complete();
- } catch (Exception e) {
- log.error("Error completing emitter", e);
- }
- });
- activeEmitters.clear();
- }
}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]