This is an automated email from the ASF dual-hosted git repository. zhaoqingran pushed a commit to branch log-oltp in repository https://gitbox.apache.org/repos/asf/hertzbeat.git
commit 7019c9e90bf69b477f1cbb2d2032c603e10b21c4 Author: Logic <[email protected]> AuthorDate: Mon Nov 18 21:44:29 2024 +0800 log init --- hertzbeat-common/pom.xml | 5 + .../common/constants/ConfigConstants.java | 2 + .../client/VictoriaLogsQueryClient.java | 205 +++++++++++++++++++++ .../config/VictoriaLogsConfiguration.java | 52 ++++++ .../config/VictoriaLogsProperties.java | 66 +++++++ .../controller/LogAnalysisController.java | 145 +++++++++++++++ .../exception/VictoriaLogsQueryException.java | 32 ++++ .../log/victorialogs/model/LogConstants.java | 6 + .../log/victorialogs/model/LogQueryRequest.java | 73 ++++++++ .../log/victorialogs/model/LogQueryResponse.java | 116 ++++++++++++ .../victorialogs/service/LogAnalysisService.java | 197 ++++++++++++++++++++ hertzbeat-manager/pom.xml | 10 +- .../src/main/resources/application.yml | 5 + hertzbeat-manager/src/main/resources/sureness.yml | 1 + pom.xml | 7 + 15 files changed, 917 insertions(+), 5 deletions(-) diff --git a/hertzbeat-common/pom.xml b/hertzbeat-common/pom.xml index bfa3faa7c..29442a921 100644 --- a/hertzbeat-common/pom.xml +++ b/hertzbeat-common/pom.xml @@ -66,6 +66,11 @@ <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-validation</artifactId> </dependency> + <!-- okhttp --> + <dependency> + <groupId>com.squareup.okhttp3</groupId> + <artifactId>okhttp</artifactId> + </dependency> <!-- jackson --> <dependency> <groupId>com.fasterxml.jackson.core</groupId> diff --git a/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/constants/ConfigConstants.java b/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/constants/ConfigConstants.java index 727d9d7d1..d71e7dc2e 100644 --- a/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/constants/ConfigConstants.java +++ b/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/constants/ConfigConstants.java @@ -65,6 +65,8 @@ public interface ConfigConstants { String INFO = "info"; String GRAFANA = "grafana"; + + String LOG = "log"; } } diff --git a/hertzbeat-log/src/main/java/org/apache/hertzbeat/log/victorialogs/client/VictoriaLogsQueryClient.java b/hertzbeat-log/src/main/java/org/apache/hertzbeat/log/victorialogs/client/VictoriaLogsQueryClient.java new file mode 100644 index 000000000..a4d42b46b --- /dev/null +++ b/hertzbeat-log/src/main/java/org/apache/hertzbeat/log/victorialogs/client/VictoriaLogsQueryClient.java @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hertzbeat.log.victorialogs.client; + +import com.fasterxml.jackson.databind.ObjectMapper; +import lombok.extern.slf4j.Slf4j; +import org.apache.hertzbeat.log.victorialogs.config.VictoriaLogsProperties; +import org.apache.hertzbeat.log.victorialogs.model.LogQueryResponse; +import org.apache.hertzbeat.log.victorialogs.model.LogQueryRequest; +import org.apache.hertzbeat.log.victorialogs.exception.VictoriaLogsQueryException; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.web.client.RestTemplateBuilder; +import org.springframework.http.HttpMethod; +import org.springframework.stereotype.Component; +import org.springframework.util.StringUtils; +import org.springframework.web.client.RestTemplate; +import org.springframework.web.util.UriComponentsBuilder; + +import java.io.BufferedReader; +import java.io.InputStreamReader; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.function.Consumer; + +@Slf4j +@Component +public class VictoriaLogsQueryClient { + + private static final String QUERY = "/select/logsql/query"; + private static final String TAIL = "/select/logsql/tail"; + private static final String HITS = "/select/logsql/hits"; + private static final String STATS_QUERY = "/select/logsql/stats_query"; + private static final String STATS_QUERY_RANGE = "/select/logsql/stats_query_range"; + private static final String STREAMS_IDS = "/select/logsql/streams_ids"; + private static final String STREAMS = "/select/logsql/streams"; + private static final String STREAM_FIELD_NAMES = "/select/logsql/stream_field_names"; + private static final String STREAM_FIELD_VALUES = "/select/logsql/stream_field_values"; + private static final String FIELD_NAMES = "/select/logsql/field_names"; + private static final String FIELD_VALUES = "/select/logsql/field_values"; + + private final RestTemplate restTemplate; + private final VictoriaLogsProperties properties; + private final ObjectMapper objectMapper; + + @Autowired + public VictoriaLogsQueryClient(RestTemplateBuilder restTemplateBuilder, + VictoriaLogsProperties properties, + ObjectMapper objectMapper) { + this.properties = properties; + this.objectMapper = objectMapper; + this.restTemplate = restTemplateBuilder + .setConnectTimeout(Duration.ofSeconds(5)) + .setReadTimeout(Duration.ZERO) + .build(); + } + + /** + * Execute a regular query + * @param request Query request containing search parameters + * @return List of log entries matching the query + */ + public List<LogQueryResponse> query(LogQueryRequest request) { + String url = buildQueryUrl(request); + log.debug("Executing query: {}", url); + + try { + String response = restTemplate.getForObject(url, String.class); + List<LogQueryResponse> entries = new ArrayList<>(); + if (response != null) { + String[] lines = response.split("\n"); + for (String line : lines) { + if (!StringUtils.hasText(line)) { + continue; + } + entries.add(objectMapper.readValue(line, LogQueryResponse.class)); + } + } + return entries; + } catch (Exception e) { + log.error("Failed to execute query: {}", url, e); + throw new VictoriaLogsQueryException("Failed to execute query", e); + } + } + + /** + * Count log entries matching the query + * @param request Query request containing search parameters + * @return Number of matching log entries + */ + public long count(LogQueryRequest request) { + String url = buildCountUrl(request); + log.debug("Executing count query: {}", url); + + try { + String response = restTemplate.getForObject(url, String.class); + if (StringUtils.hasText(response)) { + return Long.parseLong(response.trim()); + } + return 0; + } catch (Exception e) { + log.error("Failed to execute count query: {}", url, e); + throw new VictoriaLogsQueryException("Failed to execute count query", e); + } + } + + /** + * Execute a streaming query for real-time log tailing + * @param query Query expression + * @param handler Callback function to handle each log entry + */ + public void tail(String query, Consumer<LogQueryResponse> handler) { + String url = buildTailUrl(query); + log.debug("Starting log tail stream: {}", url); + + try { + restTemplate.execute(url, HttpMethod.GET, null, response -> { + try (BufferedReader reader = new BufferedReader( + new InputStreamReader(response.getBody()))) { + String line; + while ((line = reader.readLine()) != null) { + if (!StringUtils.hasText(line)) { + continue; + } + LogQueryResponse entry = objectMapper.readValue(line, LogQueryResponse.class); + handler.accept(entry); + } + } + return null; + }); + } catch (Exception e) { + log.error("Error in log tail stream: {}", query, e); + throw new VictoriaLogsQueryException("Failed to tail logs", e); + } + } + + /** + * Build URL for regular queries + */ + private String buildQueryUrl(LogQueryRequest request) { + UriComponentsBuilder builder = UriComponentsBuilder + .fromHttpUrl(properties.url()) + .path(QUERY) + .queryParam("query", request.getQuery()); + + if (StringUtils.hasText(request.getStart())) { + builder.queryParam("start", request.getStart()); + } + if (StringUtils.hasText(request.getEnd())) { + builder.queryParam("end", request.getEnd()); + } + if (request.getLimit() != null) { + builder.queryParam("limit", request.getLimit()); + } + + return builder.build().encode().toUriString(); + } + + /** + * Build URL for count queries + */ + private String buildCountUrl(LogQueryRequest request) { + UriComponentsBuilder builder = UriComponentsBuilder + .fromHttpUrl(properties.url()) + .path("/api/v1/query/count") + .queryParam("query", request.getQuery()); + + if (StringUtils.hasText(request.getStart())) { + builder.queryParam("start", request.getStart()); + } + if (StringUtils.hasText(request.getEnd())) { + builder.queryParam("end", request.getEnd()); + } + + return builder.build().encode().toUriString(); + } + + /** + * Build URL for streaming queries + */ + private String buildTailUrl(String query) { + return UriComponentsBuilder + .fromHttpUrl(properties.url()) + .path(TAIL) + .queryParam("query", query) + .build() + .encode() + .toUriString(); + } +} \ No newline at end of file diff --git a/hertzbeat-log/src/main/java/org/apache/hertzbeat/log/victorialogs/config/VictoriaLogsConfiguration.java b/hertzbeat-log/src/main/java/org/apache/hertzbeat/log/victorialogs/config/VictoriaLogsConfiguration.java new file mode 100644 index 000000000..776837b87 --- /dev/null +++ b/hertzbeat-log/src/main/java/org/apache/hertzbeat/log/victorialogs/config/VictoriaLogsConfiguration.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hertzbeat.log.victorialogs.config; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.hertzbeat.common.constants.ConfigConstants; +import org.apache.hertzbeat.common.constants.SignConstants; +import org.apache.hertzbeat.log.victorialogs.client.VictoriaLogsQueryClient; +import org.apache.hertzbeat.log.victorialogs.model.LogConstants; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.boot.web.client.RestTemplateBuilder; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.http.client.SimpleClientHttpRequestFactory; +import org.springframework.web.client.RestTemplate; + +/** + * VictoriaLogs configuration class + */ +@Configuration +@EnableConfigurationProperties(VictoriaLogsProperties.class) +public class VictoriaLogsConfiguration { + /** + * Creates VictoriaLogs query client + */ + @Bean + @ConditionalOnProperty(prefix = ConfigConstants.FunctionModuleConstants.LOG + + SignConstants.DOT + + LogConstants.VICTORIA_LOGS, name = "enabled", havingValue = "true") + public VictoriaLogsQueryClient victoriaLogsQueryClient( + VictoriaLogsProperties properties, + RestTemplateBuilder victoriaLogsRestTemplatebuilder, + ObjectMapper objectMapper) { + return new VictoriaLogsQueryClient(victoriaLogsRestTemplatebuilder, properties, objectMapper); + } +} \ No newline at end of file diff --git a/hertzbeat-log/src/main/java/org/apache/hertzbeat/log/victorialogs/config/VictoriaLogsProperties.java b/hertzbeat-log/src/main/java/org/apache/hertzbeat/log/victorialogs/config/VictoriaLogsProperties.java new file mode 100644 index 000000000..b05caf446 --- /dev/null +++ b/hertzbeat-log/src/main/java/org/apache/hertzbeat/log/victorialogs/config/VictoriaLogsProperties.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hertzbeat.log.victorialogs.config; + +import jakarta.validation.Valid; +import jakarta.validation.constraints.NotNull; +import org.apache.hertzbeat.common.constants.ConfigConstants; +import org.apache.hertzbeat.common.constants.SignConstants; +import org.apache.hertzbeat.log.victorialogs.model.LogConstants; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.boot.context.properties.bind.DefaultValue; + +/** + * VictoriaLogs Configuration Properties + * + */ +@ConfigurationProperties(prefix = ConfigConstants.FunctionModuleConstants.LOG + + SignConstants.DOT + + LogConstants.VICTORIA_LOGS) +public record VictoriaLogsProperties( + /** + * Whether to enable VictoriaLogs integration + */ + @DefaultValue("false") + boolean enabled, + + /** + * VictoriaLogs server URL + */ + @DefaultValue("http://localhost:9428") + @NotNull + String url + +) { + @Valid + public VictoriaLogsProperties { + if (enabled && !isValidUrl(url)) { + throw new IllegalArgumentException("Invalid VictoriaLogs URL: " + url); + } + } + + /** + * Validate URL format + * + * @param url URL to validate + * @return true if valid, false otherwise + */ + private boolean isValidUrl(String url) { + return url != null && (url.startsWith("http://") || url.startsWith("https://")); + } +} \ No newline at end of file diff --git a/hertzbeat-log/src/main/java/org/apache/hertzbeat/log/victorialogs/controller/LogAnalysisController.java b/hertzbeat-log/src/main/java/org/apache/hertzbeat/log/victorialogs/controller/LogAnalysisController.java new file mode 100644 index 000000000..aba4ea7cc --- /dev/null +++ b/hertzbeat-log/src/main/java/org/apache/hertzbeat/log/victorialogs/controller/LogAnalysisController.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hertzbeat.log.victorialogs.controller; + +import static org.apache.hertzbeat.common.constants.CommonConstants.FAIL_CODE; +import static org.springframework.http.MediaType.APPLICATION_JSON_VALUE; + +import io.swagger.v3.oas.annotations.Operation; +import io.swagger.v3.oas.annotations.Parameter; +import io.swagger.v3.oas.annotations.tags.Tag; +import jakarta.validation.constraints.NotBlank; +import lombok.extern.slf4j.Slf4j; +import org.apache.hertzbeat.common.entity.dto.Message; +import org.apache.hertzbeat.log.victorialogs.model.LogQueryResponse; +import org.apache.hertzbeat.log.victorialogs.model.LogQueryRequest; +import org.apache.hertzbeat.log.victorialogs.service.LogAnalysisService; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.http.MediaType; +import org.springframework.http.ResponseEntity; +import org.springframework.validation.annotation.Validated; +import org.springframework.web.bind.annotation.*; +import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; + +import java.util.List; + +@Tag(name = "Log Analysis API") +@RestController +@RequestMapping(path = "/api/logs") +@Validated +@Slf4j +public class LogAnalysisController { + + @Autowired + private LogAnalysisService logAnalysisService; + + @Operation(summary = "Search logs", description = "Search logs with query parameters") + @GetMapping(value = "/search", produces = APPLICATION_JSON_VALUE) + public ResponseEntity<Message<List<LogQueryResponse>>> searchLogs( + @Parameter(description = "Log query expression", required = true) + @RequestParam @NotBlank String query, + @Parameter(description = "Start time for the query") + @RequestParam(required = false) String start, + @Parameter(description = "End time for the query") + @RequestParam(required = false) String end, + @Parameter(description = "Maximum number of results to return") + @RequestParam(required = false) Integer limit) { + try { + LogQueryRequest request = LogQueryRequest.builder() + .query(query) + .start(start) + .end(end) + .limit(limit) + .build(); + List<LogQueryResponse> entries = logAnalysisService.searchLogs(request); + return ResponseEntity.ok(Message.success(entries)); + } catch (Exception e) { + log.error("search logs error", e); + return ResponseEntity.ok(Message.fail(FAIL_CODE, e.getMessage())); + } + } + + @Operation(summary = "Query log entries by keyword", description = "Search log entries containing specific keyword") + @GetMapping(value = "/query/keyword", produces = APPLICATION_JSON_VALUE) + public ResponseEntity<Message<List<LogQueryResponse>>> queryByKeyword( + @Parameter(description = "Keyword to search for", required = true) + @RequestParam @NotBlank String keyword, + @Parameter(description = "Start time for the query") + @RequestParam(required = false) String start, + @Parameter(description = "End time for the query") + @RequestParam(required = false) String end, + @Parameter(description = "Maximum number of results to return") + @RequestParam(required = false) Integer limit) { + try { + List<LogQueryResponse> entries = logAnalysisService.queryByKeyword(keyword, start, end, limit); + return ResponseEntity.ok(Message.success(entries)); + } catch (Exception e) { + log.error("query by keyword error", e); + return ResponseEntity.ok(Message.fail(FAIL_CODE, e.getMessage())); + } + } + + @Operation(summary = "Count keyword occurrences", description = "Count occurrences of a keyword in logs") + @GetMapping(value = "/count/keyword", produces = APPLICATION_JSON_VALUE) + public ResponseEntity<Message<Long>> countKeywordOccurrences( + @Parameter(description = "Keyword to count", required = true) + @RequestParam @NotBlank String keyword, + @Parameter(description = "Start time for the query") + @RequestParam(required = false) String start, + @Parameter(description = "End time for the query") + @RequestParam(required = false) String end) { + try { + long count = logAnalysisService.countKeywordOccurrences(keyword, start, end); + return ResponseEntity.ok(Message.success(count)); + } catch (Exception e) { + log.error("count keyword occurrences error", e); + return ResponseEntity.ok(Message.fail(FAIL_CODE, e.getMessage())); + } + } + + @GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE) + public SseEmitter streamLogs(@RequestParam String query) { + SseEmitter emitter = new SseEmitter(180000L); // 3 minutes timeout + logAnalysisService.streamLogs(query, emitter); + return emitter; + } + + @Operation(summary = "Stop all streams", description = "Stop all active log streams") + @PostMapping(value = "/stream/stop", produces = APPLICATION_JSON_VALUE) + public ResponseEntity<Message<String>> stopAllStreams() { + try { + logAnalysisService.stopAllStreams(); + return ResponseEntity.ok(Message.success("Successfully stopped all streams")); + } catch (Exception e) { + log.error("Failed to stop streams", e); + return ResponseEntity.ok(Message.fail(FAIL_CODE, "Failed to stop streams")); + } + } + + @Operation(summary = "Get active stream count", description = "Get the number of active log streams") + @GetMapping(value = "/stream/count", produces = APPLICATION_JSON_VALUE) + public ResponseEntity<Message<Integer>> getActiveStreamCount() { + try { + int count = logAnalysisService.getActiveStreamCount(); + return ResponseEntity.ok(Message.success(count)); + } catch (Exception e) { + log.error("Failed to get stream count", e); + return ResponseEntity.ok(Message.fail(FAIL_CODE, "Failed to get stream count")); + } + } +} \ No newline at end of file diff --git a/hertzbeat-log/src/main/java/org/apache/hertzbeat/log/victorialogs/exception/VictoriaLogsQueryException.java b/hertzbeat-log/src/main/java/org/apache/hertzbeat/log/victorialogs/exception/VictoriaLogsQueryException.java new file mode 100644 index 000000000..44a2bf6ba --- /dev/null +++ b/hertzbeat-log/src/main/java/org/apache/hertzbeat/log/victorialogs/exception/VictoriaLogsQueryException.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hertzbeat.log.victorialogs.exception; + +/** + * Custom exception for VictoriaLogs query errors + */ +public class VictoriaLogsQueryException extends RuntimeException { + + public VictoriaLogsQueryException(String message) { + super(message); + } + + public VictoriaLogsQueryException(String message, Throwable cause) { + super(message, cause); + } +} \ No newline at end of file diff --git a/hertzbeat-log/src/main/java/org/apache/hertzbeat/log/victorialogs/model/LogConstants.java b/hertzbeat-log/src/main/java/org/apache/hertzbeat/log/victorialogs/model/LogConstants.java new file mode 100644 index 000000000..76bc3b1c5 --- /dev/null +++ b/hertzbeat-log/src/main/java/org/apache/hertzbeat/log/victorialogs/model/LogConstants.java @@ -0,0 +1,6 @@ +package org.apache.hertzbeat.log.victorialogs.model; + +public interface LogConstants { + + String VICTORIA_LOGS = "victoria-logs"; +} diff --git a/hertzbeat-log/src/main/java/org/apache/hertzbeat/log/victorialogs/model/LogQueryRequest.java b/hertzbeat-log/src/main/java/org/apache/hertzbeat/log/victorialogs/model/LogQueryRequest.java new file mode 100644 index 000000000..f7e37b567 --- /dev/null +++ b/hertzbeat-log/src/main/java/org/apache/hertzbeat/log/victorialogs/model/LogQueryRequest.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hertzbeat.log.victorialogs.model; + +import jakarta.validation.constraints.Max; +import jakarta.validation.constraints.Min; +import jakarta.validation.constraints.NotBlank; +import java.util.Set; +import lombok.Builder; +import lombok.Data; + + +/** + * Request parameters for log query + */ +@Data +@Builder +public class LogQueryRequest { + /** + * Query string for log search + */ + @NotBlank(message = "Query cannot be empty") + private String query; + + /** + * Start time for log search + * Format: ISO-8601 or relative time like "-1h", "-30m" + */ + private String start; + + /** + * End time for log search + * Format: ISO-8601 or relative time like "now" + */ + private String end; + + /** + * Maximum number of results to return + */ + @Min(value = 1, message = "Limit must be greater than 0") + @Max(value = 10000, message = "Limit cannot exceed 10000") + private Integer limit; + + /** + * Field to order results by + */ + private String orderBy; + + /** + * Sort order direction + */ + private Boolean ascending; + + /** + * Specific fields to return in the response + */ + private Set<String> fields; +} diff --git a/hertzbeat-log/src/main/java/org/apache/hertzbeat/log/victorialogs/model/LogQueryResponse.java b/hertzbeat-log/src/main/java/org/apache/hertzbeat/log/victorialogs/model/LogQueryResponse.java new file mode 100644 index 000000000..16e28701e --- /dev/null +++ b/hertzbeat-log/src/main/java/org/apache/hertzbeat/log/victorialogs/model/LogQueryResponse.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hertzbeat.log.victorialogs.model; + +import com.fasterxml.jackson.annotation.JsonAnyGetter; +import com.fasterxml.jackson.annotation.JsonAnySetter; +import com.fasterxml.jackson.annotation.JsonProperty; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.util.HashMap; +import java.util.Map; + +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class LogQueryResponse { + + /** + * Timestamp of the log entry + */ + @JsonProperty("_time") + private String timestamp; + + /** + * Log message content + */ + @JsonProperty("_msg") + private String message; + + /** + * Unique identifier for the log stream + */ + @JsonProperty("_stream_id") + private String streamId; + + /** + * Stream metadata in JSON format + */ + @JsonProperty("_stream") + private String stream; + + /** + * Log level (e.g., INFO, ERROR) + */ + @JsonProperty("severity") + private String level; + + /** + * Name of the service generating the log + */ + @JsonProperty("service.name") + private String serviceName; + + /** + * Additional properties not covered by standard fields + */ + @Builder.Default + private Map<String, Object> additionalProperties = new HashMap<>(); + + /** + * Getter for additional properties + * + * @return Map of additional properties + */ + @JsonAnyGetter + public Map<String, Object> getAdditionalProperties() { + return additionalProperties; + } + + /** + * Setter for additional properties. Captures any unmapped JSON properties. + * + * @param key Property key + * @param value Property value + */ + @JsonAnySetter + public void addAdditionalProperty(String key, Object value) { + if (!isStandardField(key)) { + additionalProperties.put(key, value); + } + } + + /** + * Check if a field is one of the standard mapped fields + * + * @param fieldName Name of the field to check + * @return true if field is standard, false otherwise + */ + private boolean isStandardField(String fieldName) { + return fieldName.equals("_time") || + fieldName.equals("_msg") || + fieldName.equals("_stream_id") || + fieldName.equals("_stream") || + fieldName.equals("severity") || + fieldName.equals("service.name"); + } +} \ No newline at end of file diff --git a/hertzbeat-log/src/main/java/org/apache/hertzbeat/log/victorialogs/service/LogAnalysisService.java b/hertzbeat-log/src/main/java/org/apache/hertzbeat/log/victorialogs/service/LogAnalysisService.java new file mode 100644 index 000000000..e1085369f --- /dev/null +++ b/hertzbeat-log/src/main/java/org/apache/hertzbeat/log/victorialogs/service/LogAnalysisService.java @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hertzbeat.log.victorialogs.service; + +import lombok.extern.slf4j.Slf4j; +import org.apache.hertzbeat.log.victorialogs.client.VictoriaLogsQueryClient; +import org.apache.hertzbeat.log.victorialogs.model.LogQueryResponse; +import org.apache.hertzbeat.log.victorialogs.model.LogQueryRequest; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; +import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; + +import javax.annotation.PreDestroy; +import java.io.IOException; +import java.util.List; +import java.util.Set; +import java.util.concurrent.*; + +@Service +@Slf4j +public class LogAnalysisService { + + private final Set<SseEmitter> activeEmitters = ConcurrentHashMap.newKeySet(); + private final VictoriaLogsQueryClient victoriaLogsQueryClient; + private final ScheduledExecutorService heartbeatExecutor = Executors.newSingleThreadScheduledExecutor(); + + private static final long HEARTBEAT_DELAY = 15; // seconds + + @Autowired + public LogAnalysisService(VictoriaLogsQueryClient victoriaLogsQueryClient) { + this.victoriaLogsQueryClient = victoriaLogsQueryClient; + } + + @PreDestroy + public void destroy() { + stopAllStreams(); + heartbeatExecutor.shutdown(); + try { + if (!heartbeatExecutor.awaitTermination(5, TimeUnit.SECONDS)) { + heartbeatExecutor.shutdownNow(); + } + } catch (InterruptedException e) { + heartbeatExecutor.shutdownNow(); + Thread.currentThread().interrupt(); + } + } + + public List<LogQueryResponse> searchLogs(LogQueryRequest request) { + return victoriaLogsQueryClient.query(request); + } + + public List<LogQueryResponse> queryByKeyword(String keyword, String start, String end, Integer limit) { + LogQueryRequest request = LogQueryRequest.builder() + .query(buildKeywordQuery(keyword)) + .start(start) + .end(end) + .limit(limit) + .build(); + return victoriaLogsQueryClient.query(request); + } + + public long countKeywordOccurrences(String keyword, String start, String end) { + LogQueryRequest request = LogQueryRequest.builder() + .query(buildKeywordQuery(keyword)) + .start(start) + .end(end) + .build(); + return victoriaLogsQueryClient.count(request); + } + + private String buildKeywordQuery(String keyword) { + String escapedKeyword = keyword.replace("\"", "\\\""); + return String.format("message ILIKE \"%%%s%%\"", escapedKeyword); + } + + public void streamLogs(String query, SseEmitter emitter) { + activeEmitters.add(emitter); + + // Start heartbeat for this emitter + ScheduledFuture<?> heartbeatFuture = startHeartbeat(emitter); + + CompletableFuture.runAsync(() -> { + try { + // Send initial connection established event + sendEvent(emitter, "open", "Connection established", null); + + victoriaLogsQueryClient.tail(query, logQueryResponse -> { + try { + if (activeEmitters.contains(emitter)) { + sendEvent(emitter, "log", logQueryResponse, String.valueOf(System.currentTimeMillis())); + } + } catch (IOException e) { + handleEmitterError(emitter, heartbeatFuture, e); + } + }); + } catch (Exception e) { + handleEmitterError(emitter, heartbeatFuture, e); + } + }); + + // Set up completion callback + emitter.onCompletion(() -> { + log.debug("SSE completed"); + cleanup(emitter, heartbeatFuture); + }); + + // Set up timeout callback + emitter.onTimeout(() -> { + log.debug("SSE timeout"); + cleanup(emitter, heartbeatFuture); + }); + + // Set up error callback + emitter.onError(ex -> { + log.error("SSE error", ex); + cleanup(emitter, heartbeatFuture); + }); + } + + private ScheduledFuture<?> startHeartbeat(SseEmitter emitter) { + return heartbeatExecutor.scheduleAtFixedRate(() -> { + try { + if (activeEmitters.contains(emitter)) { + emitter.send(SseEmitter.event() + .comment("heartbeat") + .build()); + } + } catch (IOException e) { + log.warn("Failed to send heartbeat, closing connection", e); + removeEmitter(emitter); + } + }, HEARTBEAT_DELAY, HEARTBEAT_DELAY, TimeUnit.SECONDS); + } + + private void sendEvent(SseEmitter emitter, String eventName, Object data, String id) throws IOException { + SseEmitter.SseEventBuilder builder = SseEmitter.event() + .name(eventName) + .data(data); + + if (id != null) { + builder.id(id); + } + + emitter.send(builder.build()); + } + + private void handleEmitterError(SseEmitter emitter, ScheduledFuture<?> heartbeatFuture, Exception e) { + log.error("Error in log streaming", e); + cleanup(emitter, heartbeatFuture); + try { + sendEvent(emitter, "error", e.getMessage(), null); + } catch (IOException ex) { + log.error("Failed to send error message", ex); + } + } + + private void cleanup(SseEmitter emitter, ScheduledFuture<?> heartbeatFuture) { + removeEmitter(emitter); + if (heartbeatFuture != null) { + heartbeatFuture.cancel(true); + } + } + + public void removeEmitter(SseEmitter emitter) { + activeEmitters.remove(emitter); + } + + public int getActiveStreamCount() { + return activeEmitters.size(); + } + + public void stopAllStreams() { + activeEmitters.forEach(emitter -> { + try { + emitter.complete(); + } catch (Exception e) { + log.error("Error completing emitter", e); + } + }); + activeEmitters.clear(); + } +} \ No newline at end of file diff --git a/hertzbeat-manager/pom.xml b/hertzbeat-manager/pom.xml index 8b459bb86..50f2c176f 100644 --- a/hertzbeat-manager/pom.xml +++ b/hertzbeat-manager/pom.xml @@ -84,6 +84,11 @@ <groupId>org.apache.hertzbeat</groupId> <artifactId>hertzbeat-grafana</artifactId> </dependency> + <!-- log --> + <dependency> + <groupId>org.apache.hertzbeat</groupId> + <artifactId>hertzbeat-log</artifactId> + </dependency> <!-- spring --> <dependency> <groupId>org.springframework.boot</groupId> @@ -164,11 +169,6 @@ <groupId>com.usthe.sureness</groupId> <artifactId>spring-boot3-starter-sureness</artifactId> </dependency> - <!-- okhttp --> - <dependency> - <groupId>com.squareup.okhttp3</groupId> - <artifactId>okhttp</artifactId> - </dependency> <!-- h2 database--> <dependency> <groupId>com.h2database</groupId> diff --git a/hertzbeat-manager/src/main/resources/application.yml b/hertzbeat-manager/src/main/resources/application.yml index 1d0d878d8..02e3ace06 100644 --- a/hertzbeat-manager/src/main/resources/application.yml +++ b/hertzbeat-manager/src/main/resources/application.yml @@ -211,6 +211,11 @@ grafana: username: admin password: admin +log: + victoria-logs: + enabled: true + url: http://localhost:9428 + # See the documentation for details : https://hertzbeat.apache.org/zh-cn/docs/help/aiConfig ai: # AI Type:zhiPu、alibabaAi、kimiAi、sparkDesk diff --git a/hertzbeat-manager/src/main/resources/sureness.yml b/hertzbeat-manager/src/main/resources/sureness.yml index 9ebd316af..e1fdd2a4f 100644 --- a/hertzbeat-manager/src/main/resources/sureness.yml +++ b/hertzbeat-manager/src/main/resources/sureness.yml @@ -71,6 +71,7 @@ resourceRole: # rule: api===method # eg: /api/v1/source3===get means /api/v1/source3===get can be access by anyone, no need auth. excludedResource: + - /api/logs/**===* - /api/alerts/report/**===* - /api/account/auth/**===* - /api/i18n/**===get diff --git a/pom.xml b/pom.xml index 613360716..118f0f9ca 100644 --- a/pom.xml +++ b/pom.xml @@ -88,6 +88,7 @@ <module>hertzbeat-push</module> <module>hertzbeat-plugin</module> <module>hertzbeat-grafana</module> + <module>hertzbeat-log</module> <module>hertzbeat-e2e</module> </modules> @@ -222,6 +223,12 @@ <artifactId>hertzbeat-grafana</artifactId> <version>${hertzbeat.version}</version> </dependency> + <!-- log --> + <dependency> + <groupId>org.apache.hertzbeat</groupId> + <artifactId>hertzbeat-log</artifactId> + <version>${hertzbeat.version}</version> + </dependency> <!-- collector-basic --> <dependency> <groupId>org.apache.hertzbeat</groupId> --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
