This is an automated email from the ASF dual-hosted git repository.

zhaoqingran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hertzbeat.git


The following commit(s) were added to refs/heads/master by this push:
     new d0daa2eec [improve] Optimize Gretimedb time-series statistics. (#3776)
d0daa2eec is described below

commit d0daa2eec5d0ed63ef582700ba2d54338bf76c26
Author: 会功夫的李白 <[email protected]>
AuthorDate: Thu Sep 25 22:56:13 2025 +0800

    [improve] Optimize Gretimedb time-series statistics. (#3776)
    
    Co-authored-by: Calvin <[email protected]>
---
 .../tsdb/greptime/GreptimeDbDataStorage.java       | 314 ++++++++++++++++-----
 1 file changed, 244 insertions(+), 70 deletions(-)

diff --git 
a/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/tsdb/greptime/GreptimeDbDataStorage.java
 
b/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/tsdb/greptime/GreptimeDbDataStorage.java
index a82411178..adf411125 100644
--- 
a/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/tsdb/greptime/GreptimeDbDataStorage.java
+++ 
b/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/tsdb/greptime/GreptimeDbDataStorage.java
@@ -45,6 +45,8 @@ import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiConsumer;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 
 import lombok.extern.slf4j.Slf4j;
@@ -68,8 +70,10 @@ import org.springframework.http.HttpMethod;
 import org.springframework.http.MediaType;
 import org.springframework.http.ResponseEntity;
 import org.springframework.stereotype.Component;
+import org.springframework.util.MultiValueMap;
 import org.springframework.util.StringUtils;
 import org.springframework.web.client.RestTemplate;
+import org.springframework.web.util.UriComponents;
 import org.springframework.web.util.UriComponentsBuilder;
 
 /**
@@ -86,6 +90,8 @@ public class GreptimeDbDataStorage extends 
AbstractHistoryDataStorage {
     private static final String LABEL_KEY_FIELD = "__field__";
     private static final String LABEL_KEY_INSTANCE = "instance";
     private static final String LOG_TABLE_NAME = "hertzbeat_logs";
+    private static final String LABEL_KEY_START_TIME = "start";
+    private static final String LABEL_KEY_END_TIME = "end";
 
     private GreptimeDB greptimeDb;
 
@@ -202,60 +208,148 @@ public class GreptimeDbDataStorage extends 
AbstractHistoryDataStorage {
     @Override
     public Map<String, List<Value>> getHistoryMetricData(Long monitorId, 
String app, String metrics, String metric,
                                                          String label, String 
history) {
+        Map<String, Long> timeRange = getTimeRange(history);
+        Long start = timeRange.get(LABEL_KEY_START_TIME);
+        Long end = timeRange.get(LABEL_KEY_END_TIME);
+
+        String step = getTimeStep(start, end);
+
+        return getHistoryData(start, end, step, monitorId, app, metrics, 
metric);
+    }
+
+    private String getTableName(String metrics) {
+        return metrics;
+    }
+
+    @Override
+    public Map<String, List<Value>> getHistoryIntervalMetricData(Long 
monitorId, String app, String metrics,
+                                                                 String 
metric, String label, String history) {
+        Map<String, Long> timeRange = getTimeRange(history);
+        Long start = timeRange.get(LABEL_KEY_START_TIME);
+        Long end = timeRange.get(LABEL_KEY_END_TIME);
+
+        String step = getTimeStep(start, end);
+
+        Map<String, List<Value>> instanceValuesMap = getHistoryData(start, 
end, step, monitorId, app, metrics, metric);
+
+        // Queries below this point may yield inconsistent results due to 
exceeding the valid data range.
+        // Therefore, we restrict the valid range by obtaining the post-query 
timeframe.
+        // Since `gretime`'s `end` excludes the specified time, we add 4 hours.
+        List<Value> values = 
instanceValuesMap.get(instanceValuesMap.keySet().stream().toList().get(0));
+        // effective time
+        long effectiveStart = values.get(0).getTime() / 1000;
+        long effectiveEnd = values.get(values.size() - 1).getTime() / 1000 + 
Duration.ofHours(4).getSeconds();
+
+        String name = getTableName(metrics);
+        String timeSeriesSelector = name + "{" + LABEL_KEY_INSTANCE + "=\"" + 
monitorId + "\"";
+        if (!CommonConstants.PROMETHEUS.equals(app)) {
+            timeSeriesSelector = timeSeriesSelector + "," + LABEL_KEY_FIELD + 
"=\"" + metric + "\"";
+        }
+        timeSeriesSelector = timeSeriesSelector + "}";
+
+        try {
+            // max
+            String finalTimeSeriesSelector = timeSeriesSelector;
+            URI uri = getUri(effectiveStart, effectiveEnd, step, uriComponents 
-> "max_over_time(" + finalTimeSeriesSelector + "[" + step + "])");
+            requestIntervalMetricAndPutValue(uri, instanceValuesMap, 
Value::setMax);
+            // min
+            uri = getUri(effectiveStart, effectiveEnd, step, uriComponents -> 
"min_over_time(" + finalTimeSeriesSelector + "[" + step + "])");
+            requestIntervalMetricAndPutValue(uri, instanceValuesMap, 
Value::setMin);
+            // avg
+            uri = getUri(effectiveStart, effectiveEnd, step, uriComponents -> 
"avg_over_time(" + finalTimeSeriesSelector + "[" + step + "])");
+            requestIntervalMetricAndPutValue(uri, instanceValuesMap, 
Value::setMean);
+        } catch (Exception e) {
+            log.error("query interval metrics data from greptime error. {}", 
e.getMessage(), e);
+        }
+
+        return instanceValuesMap;
+    }
+
+    /**
+     * Get time range
+     *
+     * @param history history range
+     * @return time range
+     */
+    private Map<String, Long> getTimeRange(String history) {
+        // Build start and end times
+        Instant now = Instant.now();
+        long start;
+        try {
+            if (NumberUtils.isParsable(history)) {
+                start = NumberUtils.toLong(history);
+                start = (ZonedDateTime.now().toEpochSecond() - start);
+            } else {
+                TemporalAmount temporalAmount = 
TimePeriodUtil.parseTokenTime(history);
+                assert temporalAmount != null;
+                Instant dateTime = now.minus(temporalAmount);
+                start = dateTime.getEpochSecond();
+            }
+        } catch (Exception e) {
+            log.error("history time error: {}. use default: 6h", 
e.getMessage());
+            start = now.minus(6, ChronoUnit.HOURS).getEpochSecond();
+        }
+        long end = now.getEpochSecond();
+        return Map.of("start", start, "end", end);
+    }
+
+    /**
+     * Get time step
+     *
+     * @param start start time
+     * @param end   end time
+     * @return step
+     */
+    private String getTimeStep(long start, long end) {
+        // get step
+        String step = "60s";
+        if (end - start < Duration.ofDays(7).getSeconds() && end - start > 
Duration.ofDays(1).getSeconds()) {
+            step = "1h";
+        } else if (end - start >= Duration.ofDays(7).getSeconds()) {
+            step = "4h";
+        }
+        return step;
+    }
+
+    /**
+     * Get history metric data
+     *
+     * @param start     start time
+     * @param end       end time
+     * @param step      step
+     * @param monitorId monitor id
+     * @param app       monitor type
+     * @param metrics   metrics
+     * @param metric    metric
+     * @return history metric data
+     */
+    private Map<String, List<Value>> getHistoryData(long start, long end, 
String step, Long monitorId, String app, String metrics, String metric) {
         String name = getTableName(metrics);
         String timeSeriesSelector = LABEL_KEY_NAME + "=\"" + name + "\""
                 + "," + LABEL_KEY_INSTANCE + "=\"" + monitorId + "\"";
         if (!CommonConstants.PROMETHEUS.equals(app)) {
             timeSeriesSelector = timeSeriesSelector + "," + LABEL_KEY_FIELD + 
"=\"" + metric + "\"";
         }
+
         Map<String, List<Value>> instanceValuesMap = new HashMap<>(8);
         try {
-            HttpHeaders headers = new HttpHeaders();
-            headers.setContentType(MediaType.APPLICATION_JSON);
-            headers.setAccept(List.of(MediaType.APPLICATION_JSON));
-            if (StringUtils.hasText(greptimeProperties.username())
-                    && StringUtils.hasText(greptimeProperties.password())) {
-                String authStr = greptimeProperties.username() + ":" + 
greptimeProperties.password();
-                String encodedAuth = Base64Util.encode(authStr);
-                headers.add(HttpHeaders.AUTHORIZATION, BASIC + " " + 
encodedAuth);
-            }
-            Instant now = Instant.now();
-            long start;
-            try {
-                if (NumberUtils.isParsable(history)) {
-                    start = NumberUtils.toLong(history);
-                    start = (ZonedDateTime.now().toEpochSecond() - start);
-                } else {
-                    TemporalAmount temporalAmount = 
TimePeriodUtil.parseTokenTime(history);
-                    assert temporalAmount != null;
-                    Instant dateTime = now.minus(temporalAmount);
-                    start = dateTime.getEpochSecond();
+            HttpEntity<Void> httpEntity = getHttpEntity();
+
+            String finalTimeSeriesSelector = timeSeriesSelector;
+            URI uri = getUri(start, end, step, uriComponents -> {
+                MultiValueMap<String, String> queryParams = 
uriComponents.getQueryParams();
+                if (!queryParams.isEmpty()) {
+                    return "{" + finalTimeSeriesSelector + "}";
                 }
-            } catch (Exception e) {
-                log.error("history time error: {}. use default: 6h", 
e.getMessage());
-                start = now.minus(6, ChronoUnit.HOURS).getEpochSecond();
-            }
+                return null;
+            });
 
-            long end = now.getEpochSecond();
-            String step = "60s";
-            if (end - start < Duration.ofDays(7).getSeconds() && end - start > 
Duration.ofDays(1).getSeconds()) {
-                step = "1h";
-            } else if (end - start >= Duration.ofDays(7).getSeconds()) {
-                step = "4h";
+            ResponseEntity<PromQlQueryContent> responseEntity = null;
+            if (uri != null) {
+                responseEntity = restTemplate.exchange(uri,
+                        HttpMethod.GET, httpEntity, PromQlQueryContent.class);
             }
-
-            HttpEntity<Void> httpEntity = new HttpEntity<>(headers);
-            URI uri = 
UriComponentsBuilder.fromUriString(greptimeProperties.httpEndpoint() + 
QUERY_RANGE_PATH)
-                    .queryParam(URLEncoder.encode("query", 
StandardCharsets.UTF_8), URLEncoder.encode("{" + timeSeriesSelector + "}", 
StandardCharsets.UTF_8))
-                    .queryParam("start", start)
-                    .queryParam("end", end)
-                    .queryParam("step", step)
-                    .queryParam("db", greptimeProperties.database())
-                    .build(true).toUri();
-
-            ResponseEntity<PromQlQueryContent> responseEntity = 
restTemplate.exchange(uri,
-                    HttpMethod.GET, httpEntity, PromQlQueryContent.class);
-            if (responseEntity.getStatusCode().is2xxSuccessful()) {
+            if (responseEntity != null && 
responseEntity.getStatusCode().is2xxSuccessful()) {
                 log.debug("query metrics data from greptime success. {}", uri);
                 if (responseEntity.getBody() != null && 
responseEntity.getBody().getData() != null
                         && responseEntity.getBody().getData().getResult() != 
null) {
@@ -279,21 +373,101 @@ public class GreptimeDbDataStorage extends 
AbstractHistoryDataStorage {
                 log.error("query metrics data from greptime failed. {}", 
responseEntity);
             }
         } catch (Exception e) {
-            log.error(e.getMessage(), e);
+            log.error("query metrics data from greptime error. {}", 
e.getMessage(), e);
         }
         return instanceValuesMap;
     }
 
-    private String getTableName(String metrics) {
-        return metrics;
+    /**
+     * Get HTTP instance
+     *
+     * @return HTTP instance
+     */
+    private HttpEntity<Void> getHttpEntity() {
+        HttpHeaders headers = new HttpHeaders();
+        headers.setContentType(MediaType.APPLICATION_JSON);
+        headers.setAccept(List.of(MediaType.APPLICATION_JSON));
+        if (StringUtils.hasText(greptimeProperties.username())
+                && StringUtils.hasText(greptimeProperties.password())) {
+            String authStr = greptimeProperties.username() + ":" + 
greptimeProperties.password();
+            String encodedAuth = Base64Util.encode(authStr);
+            headers.add(HttpHeaders.AUTHORIZATION, BASIC + " " + encodedAuth);
+        }
+        return new HttpEntity<>(headers);
     }
 
-    @Override
-    public Map<String, List<Value>> getHistoryIntervalMetricData(Long 
monitorId, String app, String metrics,
-                                                                 String 
metric, String label, String history) {
-        return getHistoryMetricData(monitorId, app, metrics, metric, label, 
history);
+    /**
+     * Get Request URI
+     *
+     * @param start         start time
+     * @param end           end time
+     * @param step          interval
+     * @param queryFunction request parameters
+     * @return URI
+     */
+    private URI getUri(long start, long end, String step, 
Function<UriComponents, String> queryFunction) {
+        UriComponentsBuilder uriComponentsBuilder = 
UriComponentsBuilder.fromUriString(greptimeProperties.httpEndpoint() + 
QUERY_RANGE_PATH)
+                .queryParam("start", start)
+                .queryParam("end", end)
+                .queryParam("step", step)
+                .queryParam("db", greptimeProperties.database());
+        UriComponents cloneUriComponents = 
uriComponentsBuilder.cloneBuilder().build(true);
+        String queryValue = queryFunction.apply(cloneUriComponents);
+        if (!StringUtils.hasText(queryValue)) {
+            return null;
+        }
+        UriComponents uriComponents = uriComponentsBuilder
+                .queryParam(
+                        URLEncoder.encode("query", StandardCharsets.UTF_8),
+                        URLEncoder.encode(queryValue, StandardCharsets.UTF_8)
+                ).build(true);
+        return uriComponents.toUri();
     }
-    
+
+    /**
+     * Request greptime and assign a value
+     *
+     * @param uri               request URI
+     * @param instanceValuesMap metrics data
+     * @param valueConsumer     consumer used for assigning values
+     */
+    private void requestIntervalMetricAndPutValue(URI uri, Map<String, 
List<Value>> instanceValuesMap, BiConsumer<Value, String> valueConsumer) {
+        if (uri == null) {
+            return;
+        }
+        HttpEntity<Void> httpEntity = getHttpEntity();
+        ResponseEntity<PromQlQueryContent> responseEntity = 
restTemplate.exchange(uri,
+                HttpMethod.GET, httpEntity, PromQlQueryContent.class);
+        if (!responseEntity.getStatusCode().is2xxSuccessful()) {
+            log.error("query interval metrics data from greptime failed. {}", 
responseEntity);
+            return;
+        }
+        log.debug("query interval metrics data from greptime success. {}", 
uri);
+        PromQlQueryContent body = responseEntity.getBody();
+        if (body == null || body.getData() == null || 
body.getData().getResult() == null) {
+            return;
+        }
+        List<PromQlQueryContent.ContentData.Content> contents = 
body.getData().getResult();
+        for (PromQlQueryContent.ContentData.Content content : contents) {
+            Map<String, String> labels = content.getMetric();
+            labels.remove(LABEL_KEY_NAME);
+            labels.remove(LABEL_KEY_INSTANCE);
+            String labelStr = JsonUtil.toJson(labels);
+            if (content.getValues() == null || content.getValues().isEmpty()) {
+                continue;
+            }
+            List<Value> valueList = 
instanceValuesMap.computeIfAbsent(labelStr, k -> new LinkedList<>());
+            if (valueList.size() == content.getValues().size()) {
+                for (int timestampIndex = 0; timestampIndex < 
valueList.size(); timestampIndex++) {
+                    Value value = valueList.get(timestampIndex);
+                    Object[] valueArr = 
content.getValues().get(timestampIndex);
+                    String avgValue = new 
BigDecimal(String.valueOf(valueArr[1])).setScale(4, 
RoundingMode.HALF_UP).stripTrailingZeros().toPlainString();
+                    valueConsumer.accept(value, avgValue);
+                }
+            }
+        }
+    }
+
     @Override
     public void destroy() {
         if (this.greptimeDb != null) {
@@ -359,14 +533,14 @@ public class GreptimeDbDataStorage extends 
AbstractHistoryDataStorage {
     }
 
     @Override
-    public List<LogEntry> queryLogsByMultipleConditions(Long startTime, Long 
endTime, String traceId, 
-                                                        String spanId, Integer 
severityNumber, 
+    public List<LogEntry> queryLogsByMultipleConditions(Long startTime, Long 
endTime, String traceId,
+                                                        String spanId, Integer 
severityNumber,
                                                         String severityText) {
         try {
             StringBuilder sql = new StringBuilder("SELECT * FROM 
").append(LOG_TABLE_NAME);
             buildWhereConditions(sql, startTime, endTime, traceId, spanId, 
severityNumber, severityText);
             sql.append(" ORDER BY time_unix_nano DESC");
-            
+
             List<Map<String, Object>> rows = 
greptimeSqlQueryExecutor.execute(sql.toString());
             return mapRowsToLogEntries(rows);
         } catch (Exception e) {
@@ -376,14 +550,14 @@ public class GreptimeDbDataStorage extends 
AbstractHistoryDataStorage {
     }
 
     @Override
-    public List<LogEntry> queryLogsByMultipleConditionsWithPagination(Long 
startTime, Long endTime, String traceId, 
-                                                                      String 
spanId, Integer severityNumber, 
+    public List<LogEntry> queryLogsByMultipleConditionsWithPagination(Long 
startTime, Long endTime, String traceId,
+                                                                      String 
spanId, Integer severityNumber,
                                                                       String 
severityText, Integer offset, Integer limit) {
         try {
             StringBuilder sql = new StringBuilder("SELECT * FROM 
").append(LOG_TABLE_NAME);
             buildWhereConditions(sql, startTime, endTime, traceId, spanId, 
severityNumber, severityText);
             sql.append(" ORDER BY time_unix_nano DESC");
-            
+
             // Add pagination
             if (limit != null && limit > 0) {
                 sql.append(" LIMIT ").append(limit);
@@ -391,7 +565,7 @@ public class GreptimeDbDataStorage extends 
AbstractHistoryDataStorage {
                     sql.append(" OFFSET ").append(offset);
                 }
             }
-            
+
             List<Map<String, Object>> rows = 
greptimeSqlQueryExecutor.execute(sql.toString());
             return mapRowsToLogEntries(rows);
         } catch (Exception e) {
@@ -401,13 +575,13 @@ public class GreptimeDbDataStorage extends 
AbstractHistoryDataStorage {
     }
 
     @Override
-    public long countLogsByMultipleConditions(Long startTime, Long endTime, 
String traceId, 
-                                             String spanId, Integer 
severityNumber, 
+    public long countLogsByMultipleConditions(Long startTime, Long endTime, 
String traceId,
+                                             String spanId, Integer 
severityNumber,
                                              String severityText) {
         try {
             StringBuilder sql = new StringBuilder("SELECT COUNT(*) as count 
FROM ").append(LOG_TABLE_NAME);
             buildWhereConditions(sql, startTime, endTime, traceId, spanId, 
severityNumber, severityText);
-            
+
             List<Map<String, Object>> rows = 
greptimeSqlQueryExecutor.execute(sql.toString());
             if (rows != null && !rows.isEmpty()) {
                 Object countObj = rows.get(0).get("count");
@@ -442,35 +616,35 @@ public class GreptimeDbDataStorage extends 
AbstractHistoryDataStorage {
      * @param spanId span id
      * @param severityNumber severity number
      */
-    private void buildWhereConditions(StringBuilder sql, Long startTime, Long 
endTime, String traceId, 
+    private void buildWhereConditions(StringBuilder sql, Long startTime, Long 
endTime, String traceId,
                                      String spanId, Integer severityNumber, 
String severityText) {
         List<String> conditions = new ArrayList<>();
-        
+
         // Time range condition
         if (startTime != null && endTime != null) {
             conditions.add("time_unix_nano >= " + msToNs(startTime) + " AND 
time_unix_nano <= " + msToNs(endTime));
         }
-        
+
         // TraceId condition
         if (StringUtils.hasText(traceId)) {
             conditions.add("trace_id = '" + safeString(traceId) + "'");
         }
-        
+
         // SpanId condition
         if (StringUtils.hasText(spanId)) {
             conditions.add("span_id = '" + safeString(spanId) + "'");
         }
-        
+
         // Severity condition
         if (severityNumber != null) {
             conditions.add("severity_number = " + severityNumber);
         }
-        
+
         // SeverityText condition
         if (StringUtils.hasText(severityText)) {
             conditions.add("severity_text = '" + safeString(severityText) + 
"'");
         }
-        
+
         // Add WHERE clause if there are conditions
         if (!conditions.isEmpty()) {
             sql.append(" WHERE ").append(String.join(" AND ", conditions));
@@ -582,11 +756,11 @@ public class GreptimeDbDataStorage extends 
AbstractHistoryDataStorage {
                     .map(String::valueOf)
                     .collect(Collectors.joining(", ")));
             sql.append(")");
-            
+
             greptimeSqlQueryExecutor.execute(sql.toString());
             log.info("[warehouse greptime-log] Batch delete executed 
successfully for {} logs", timeUnixNanos.size());
             return true;
-            
+
         } catch (Exception e) {
             log.error("[warehouse greptime-log] batchDeleteLogs error: {}", 
e.getMessage(), e);
             return false;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to