This is an automated email from the ASF dual-hosted git repository.
yinyijun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hertzbeat.git
The following commit(s) were added to refs/heads/master by this push:
new 3eb3f926f5 [fix] Update query parameter encoding and error time parser
in VictoriaMetricsClusterDataStorage (#3989)
3eb3f926f5 is described below
commit 3eb3f926f5e6144ffc46a8ec626aea895f34e6c0
Author: Logic <[email protected]>
AuthorDate: Sat Jan 24 20:37:08 2026 +0800
[fix] Update query parameter encoding and error time parser in
VictoriaMetricsClusterDataStorage (#3989)
Co-authored-by: Yang Chen <[email protected]>
Co-authored-by: aias00 <[email protected]>
Co-authored-by: Duansg <[email protected]>
---
.../tsdb/vm/VictoriaMetricsClusterDataStorage.java | 162 ++++++++++-----------
.../tsdb/vm/VictoriaMetricsDataStorage.java | 127 ++++++++--------
2 files changed, 149 insertions(+), 140 deletions(-)
diff --git a/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/tsdb/vm/VictoriaMetricsClusterDataStorage.java b/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/tsdb/vm/VictoriaMetricsClusterDataStorage.java
index 7b25d95472..5371a26e1c 100644
--- a/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/tsdb/vm/VictoriaMetricsClusterDataStorage.java
+++ b/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/tsdb/vm/VictoriaMetricsClusterDataStorage.java
@@ -6,7 +6,7 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -22,10 +22,7 @@ import com.fasterxml.jackson.databind.JsonNode;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.net.URI;
-import java.net.URLEncoder;
-import java.nio.charset.StandardCharsets;
import java.time.Duration;
-import java.time.Instant;
import java.time.ZonedDateTime;
import java.time.temporal.TemporalAmount;
import java.util.ArrayList;
@@ -150,17 +147,17 @@ public class VictoriaMetricsClusterDataStorage extends
AbstractHistoryDataStorag
String selectNodeStatusUrl = vmClusterProps.select().url() +
VM_SELECT_BASE_PATH.formatted(vmClusterProps.accountID(), STATUS_PATH);
HttpHeaders headers = new HttpHeaders();
if (StringUtils.hasText(vmInsertProps.username())
- && StringUtils.hasText(vmInsertProps.password())) {
+ && StringUtils.hasText(vmInsertProps.password())) {
String authStr = vmInsertProps.username() + ":" +
vmInsertProps.password();
String encodedAuth = Base64Util.encode(authStr);
headers.add(HttpHeaders.AUTHORIZATION, NetworkConstants.BASIC
+ SignConstants.BLANK + encodedAuth);
}
HttpEntity<Void> requestEntity = new HttpEntity<>(headers);
ResponseEntity<String> responseEntity = restTemplate.exchange(
- selectNodeStatusUrl,
- HttpMethod.GET,
- requestEntity,
- String.class
+ selectNodeStatusUrl,
+ HttpMethod.GET,
+ requestEntity,
+ String.class
);
String result = responseEntity.getBody();
@@ -186,7 +183,7 @@ public class VictoriaMetricsClusterDataStorage extends
AbstractHistoryDataStorag
}
if (metricsData.getValues().isEmpty()) {
log.info("[warehouse victoria-metrics] flush metrics data {} {} {}
is null, ignore.",
- metricsData.getId(), metricsData.getApp(),
metricsData.getMetrics());
+ metricsData.getId(), metricsData.getApp(),
metricsData.getMetrics());
return;
}
Map<String, String> defaultLabels = Maps.newHashMapWithExpectedSize(8);
@@ -196,7 +193,7 @@ public class VictoriaMetricsClusterDataStorage extends
AbstractHistoryDataStorag
isPrometheusAuto = true;
defaultLabels.remove(MONITOR_METRICS_KEY);
defaultLabels.put(LABEL_KEY_JOB, metricsData.getApp()
-
.substring(CommonConstants.PROMETHEUS_APP_PREFIX.length()));
+ .substring(CommonConstants.PROMETHEUS_APP_PREFIX.length()));
} else {
isPrometheusAuto = false;
defaultLabels.put(LABEL_KEY_JOB, metricsData.getApp());
@@ -239,7 +236,7 @@ public class VictoriaMetricsClusterDataStorage extends
AbstractHistoryDataStorag
try {
labels.putAll(defaultLabels);
String labelName = isPrometheusAuto ?
metricsData.getMetrics()
- : metricsData.getMetrics() + SPILT +
entry.getKey();
+ : metricsData.getMetrics() + SPILT +
entry.getKey();
labels.put(LABEL_KEY_NAME, labelName);
if (!isPrometheusAuto) {
labels.put(MONITOR_METRIC_KEY,
entry.getKey());
@@ -251,10 +248,10 @@ public class VictoriaMetricsClusterDataStorage extends
AbstractHistoryDataStorag
labels.putAll(customizedLabels);
}
VictoriaMetricsDataStorage.VictoriaMetricsContent content =
VictoriaMetricsDataStorage.VictoriaMetricsContent.builder()
- .metric(new HashMap<>(labels))
- .values(new Double[]{entry.getValue()})
- .timestamps(timestamp)
- .build();
+ .metric(new HashMap<>(labels))
+ .values(new Double[]{entry.getValue()})
+ .timestamps(timestamp)
+ .build();
contentList.add(content);
} catch (Exception e) {
log.error("combine metrics data error: {}.",
e.getMessage(), e);
@@ -294,9 +291,9 @@ public class VictoriaMetricsClusterDataStorage extends
AbstractHistoryDataStorag
labelName = metrics;
}
String timeSeriesSelector = Stream.of(
- LABEL_KEY_NAME + "=\"" + labelName + "\"",
- LABEL_KEY_INSTANCE + "=\"" + instance + "\"",
- CommonConstants.PROMETHEUS.equals(app) ? null :
MONITOR_METRIC_KEY + "=\"" + metric + "\""
+ LABEL_KEY_NAME + "=\"" + labelName + "\"",
+ LABEL_KEY_INSTANCE + "=\"" + instance + "\"",
+ CommonConstants.PROMETHEUS.equals(app) ? null : MONITOR_METRIC_KEY
+ "=\"" + metric + "\""
).filter(Objects::nonNull).collect(Collectors.joining(","));
Map<String, List<Value>> instanceValuesMap = new HashMap<>(8);
try {
@@ -307,26 +304,25 @@ public class VictoriaMetricsClusterDataStorage extends
AbstractHistoryDataStorag
String authStr = vmSelectProps.username() + ":" +
vmSelectProps.password();
String encodedAuth = Base64Util.encode(authStr);
headers.add(HttpHeaders.AUTHORIZATION, NetworkConstants.BASIC
- + SignConstants.BLANK + encodedAuth);
+ + SignConstants.BLANK + encodedAuth);
}
HttpEntity<Void> httpEntity = new HttpEntity<>(headers);
- Instant end = Instant.now();
- Duration duration =
Duration.ofHours(Long.parseLong(history.replace("h", "")));
- Instant start = end.minus(duration);
- String exportUrl = vmClusterProps.select().url() +
VM_SELECT_BASE_PATH.formatted(vmClusterProps.accountID(), EXPORT_PATH);
+ String exportUrl = vmClusterProps.select().url() +
VM_SELECT_BASE_PATH.formatted(vmClusterProps.accountID(), EXPORT_PATH);
URI uri = UriComponentsBuilder.fromUriString(exportUrl)
- .queryParam("match", URLEncoder.encode("{" +
timeSeriesSelector + "}", StandardCharsets.UTF_8))
- .queryParam("start",
String.valueOf(start.getEpochSecond()))
- .queryParam("end", String.valueOf(end.getEpochSecond()))
- .build(true).toUri();
+ .queryParam("match[]", "{" + timeSeriesSelector + "}")
+ .queryParam("start", "now-" + history)
+ .queryParam("end", "now")
+ .build()
+ .encode()
+ .toUri();
ResponseEntity<String> responseEntity = restTemplate.exchange(uri,
HttpMethod.GET, httpEntity,
- String.class);
+ String.class);
if (responseEntity.getStatusCode().is2xxSuccessful()) {
log.debug("query metrics data from victoria-metrics success.
{}", uri);
if (StringUtils.hasText(responseEntity.getBody())) {
String[] contentJsonArr =
responseEntity.getBody().split("\n");
List<VictoriaMetricsContent> contents =
Arrays.stream(contentJsonArr)
- .map(item -> JsonUtil.fromJson(item,
VictoriaMetricsContent.class)).toList();
+ .map(item -> JsonUtil.fromJson(item,
VictoriaMetricsContent.class)).toList();
for (VictoriaMetricsContent content : contents) {
Map<String, String> labels = content.getMetric();
labels.remove(LABEL_KEY_NAME);
@@ -338,7 +334,7 @@ public class VictoriaMetricsClusterDataStorage extends
AbstractHistoryDataStorag
String labelStr = JsonUtil.toJson(labels);
if (content.getValues() != null &&
content.getTimestamps() != null) {
List<Value> valueList =
instanceValuesMap.computeIfAbsent(labelStr,
- k -> new LinkedList<>());
+ k -> new LinkedList<>());
if (content.getValues().length !=
content.getTimestamps().length) {
log.error("content.getValues().length !=
content.getTimestamps().length");
continue;
@@ -347,7 +343,7 @@ public class VictoriaMetricsClusterDataStorage extends
AbstractHistoryDataStorag
Long[] timestamps = content.getTimestamps();
for (int index = 0; index <
content.getValues().length; index++) {
String strValue =
BigDecimal.valueOf(values[index]).setScale(4, RoundingMode.HALF_UP)
- .stripTrailingZeros().toPlainString();
+ .stripTrailingZeros().toPlainString();
// read timestamp here is ms unit
valueList.add(new Value(strValue,
timestamps[index]));
}
@@ -368,7 +364,7 @@ public class VictoriaMetricsClusterDataStorage extends
AbstractHistoryDataStorag
String
metric, String history) {
if (!serverAvailable) {
log.error("""
-
+
\t---------------VictoriaMetrics Init Failed---------------
\t--------------Please Config VictoriaMetrics--------------
\t----------Can Not Use Metric History Now----------
@@ -396,9 +392,9 @@ public class VictoriaMetricsClusterDataStorage extends
AbstractHistoryDataStorag
labelName = metrics;
}
String timeSeriesSelector = Stream.of(
- LABEL_KEY_NAME + "=\"" + labelName + "\"",
- LABEL_KEY_INSTANCE + "=\"" + instance + "\"",
- CommonConstants.PROMETHEUS.equals(app) ? null :
MONITOR_METRIC_KEY + "=\"" + metric + "\""
+ LABEL_KEY_NAME + "=\"" + labelName + "\"",
+ LABEL_KEY_INSTANCE + "=\"" + instance + "\"",
+ CommonConstants.PROMETHEUS.equals(app) ? null : MONITOR_METRIC_KEY
+ "=\"" + metric + "\""
).filter(Objects::nonNull).collect(Collectors.joining(","));
Map<String, List<Value>> instanceValuesMap = new HashMap<>(8);
try {
@@ -409,25 +405,26 @@ public class VictoriaMetricsClusterDataStorage extends
AbstractHistoryDataStorag
String authStr = vmSelectProps.username() + ":" +
vmSelectProps.password();
String encodedAuth = Base64Util.encode(authStr);
headers.add(HttpHeaders.AUTHORIZATION, NetworkConstants.BASIC
- + SignConstants.BLANK + encodedAuth);
+ + SignConstants.BLANK + encodedAuth);
}
HttpEntity<Void> httpEntity = new HttpEntity<>(headers);
- String rangeUrl =
VM_SELECT_BASE_PATH.formatted(vmClusterProps.accountID(), QUERY_RANGE_PATH);
+ String rangeUrl = vmClusterProps.select().url() +
VM_SELECT_BASE_PATH.formatted(vmClusterProps.accountID(), QUERY_RANGE_PATH);
URI uri = UriComponentsBuilder.fromUriString(rangeUrl)
- .queryParam("query", URLEncoder.encode("{" +
timeSeriesSelector + "}", StandardCharsets.UTF_8))
- .queryParam("step", "4h")
- .queryParam("start", startTime)
- .queryParam("end", endTime)
- .build(true)
- .toUri();
+ .queryParam("query", "{" + timeSeriesSelector + "}")
+ .queryParam("step", "4h")
+ .queryParam("start", startTime)
+ .queryParam("end", endTime)
+ .build()
+ .encode()
+ .toUri();
ResponseEntity<PromQlQueryContent> responseEntity =
restTemplate.exchange(uri, HttpMethod.GET,
- httpEntity, PromQlQueryContent.class);
+ httpEntity, PromQlQueryContent.class);
if (responseEntity.getStatusCode().is2xxSuccessful()) {
log.debug("query metrics data from victoria-metrics success.
{}", uri);
if (responseEntity.getBody() != null &&
responseEntity.getBody().getData() != null
- && responseEntity.getBody().getData().getResult() !=
null) {
+ && responseEntity.getBody().getData().getResult() != null)
{
List<PromQlQueryContent.ContentData.Content> contents =
responseEntity.getBody().getData()
- .getResult();
+ .getResult();
for (PromQlQueryContent.ContentData.Content content :
contents) {
Map<String, String> labels = content.getMetric();
labels.remove(LABEL_KEY_NAME);
@@ -439,11 +436,11 @@ public class VictoriaMetricsClusterDataStorage extends
AbstractHistoryDataStorag
String labelStr = JsonUtil.toJson(labels);
if (content.getValues() != null &&
!content.getValues().isEmpty()) {
List<Value> valueList =
instanceValuesMap.computeIfAbsent(labelStr,
- k -> new LinkedList<>());
+ k -> new LinkedList<>());
for (Object[] valueArr : content.getValues()) {
long timestamp =
Long.parseLong(String.valueOf(valueArr[0]));
String value = new
BigDecimal(String.valueOf(valueArr[1])).setScale(4,
-
RoundingMode.HALF_UP).stripTrailingZeros().toPlainString();
+
RoundingMode.HALF_UP).stripTrailingZeros().toPlainString();
// read timestamp here is s unit
valueList.add(new Value(value, timestamp *
1000));
}
@@ -455,18 +452,19 @@ public class VictoriaMetricsClusterDataStorage extends
AbstractHistoryDataStorag
}
// max
uri = UriComponentsBuilder.fromUriString(rangeUrl)
- .queryParam("query", URLEncoder.encode("max_over_time({" +
timeSeriesSelector + "})", StandardCharsets.UTF_8))
- .queryParam("step", "4h")
- .queryParam("start", startTime)
- .queryParam("end", endTime)
- .build(true)
- .toUri();
+ .queryParam("query", "max_over_time({" + timeSeriesSelector +
"})")
+ .queryParam("step", "4h")
+ .queryParam("start", startTime)
+ .queryParam("end", endTime)
+ .build()
+ .encode()
+ .toUri();
responseEntity = restTemplate.exchange(uri, HttpMethod.GET,
httpEntity, PromQlQueryContent.class);
if (responseEntity.getStatusCode().is2xxSuccessful()) {
if (responseEntity.getBody() != null &&
responseEntity.getBody().getData() != null
- && responseEntity.getBody().getData().getResult() !=
null) {
+ && responseEntity.getBody().getData().getResult() != null)
{
List<PromQlQueryContent.ContentData.Content> contents =
responseEntity.getBody().getData()
- .getResult();
+ .getResult();
for (PromQlQueryContent.ContentData.Content content :
contents) {
Map<String, String> labels = content.getMetric();
labels.remove(LABEL_KEY_NAME);
@@ -478,13 +476,13 @@ public class VictoriaMetricsClusterDataStorage extends
AbstractHistoryDataStorag
String labelStr = JsonUtil.toJson(labels);
if (content.getValues() != null &&
!content.getValues().isEmpty()) {
List<Value> valueList =
instanceValuesMap.computeIfAbsent(labelStr,
- k -> new LinkedList<>());
+ k -> new LinkedList<>());
if (valueList.size() ==
content.getValues().size()) {
for (int timestampIndex = 0; timestampIndex <
valueList.size(); timestampIndex++) {
Value value =
valueList.get(timestampIndex);
Object[] valueArr =
content.getValues().get(timestampIndex);
String maxValue = new
BigDecimal(String.valueOf(valueArr[1])).setScale(4,
-
RoundingMode.HALF_UP).stripTrailingZeros().toPlainString();
+
RoundingMode.HALF_UP).stripTrailingZeros().toPlainString();
value.setMax(maxValue);
}
}
@@ -494,18 +492,19 @@ public class VictoriaMetricsClusterDataStorage extends
AbstractHistoryDataStorag
}
// min
uri = UriComponentsBuilder.fromUriString(rangeUrl)
- .queryParam("query", URLEncoder.encode("min_over_time({" +
timeSeriesSelector + "})", StandardCharsets.UTF_8))
- .queryParam("step", "4h")
- .queryParam("start", startTime)
- .queryParam("end", endTime)
- .build(true)
- .toUri();
+ .queryParam("query", "min_over_time({" + timeSeriesSelector +
"})")
+ .queryParam("step", "4h")
+ .queryParam("start", startTime)
+ .queryParam("end", endTime)
+ .build()
+ .encode()
+ .toUri();
responseEntity = restTemplate.exchange(uri, HttpMethod.GET,
httpEntity, PromQlQueryContent.class);
if (responseEntity.getStatusCode().is2xxSuccessful()) {
if (responseEntity.getBody() != null &&
responseEntity.getBody().getData() != null
- && responseEntity.getBody().getData().getResult() !=
null) {
+ && responseEntity.getBody().getData().getResult() != null)
{
List<PromQlQueryContent.ContentData.Content> contents =
responseEntity.getBody().getData()
- .getResult();
+ .getResult();
for (PromQlQueryContent.ContentData.Content content :
contents) {
Map<String, String> labels = content.getMetric();
labels.remove(LABEL_KEY_NAME);
@@ -517,13 +516,13 @@ public class VictoriaMetricsClusterDataStorage extends
AbstractHistoryDataStorag
String labelStr = JsonUtil.toJson(labels);
if (content.getValues() != null &&
!content.getValues().isEmpty()) {
List<Value> valueList =
instanceValuesMap.computeIfAbsent(labelStr,
- k -> new LinkedList<>());
+ k -> new LinkedList<>());
if (valueList.size() ==
content.getValues().size()) {
for (int timestampIndex = 0; timestampIndex <
valueList.size(); timestampIndex++) {
Value value =
valueList.get(timestampIndex);
Object[] valueArr =
content.getValues().get(timestampIndex);
String minValue = new
BigDecimal(String.valueOf(valueArr[1])).setScale(4,
-
RoundingMode.HALF_UP).stripTrailingZeros().toPlainString();
+
RoundingMode.HALF_UP).stripTrailingZeros().toPlainString();
value.setMin(minValue);
}
}
@@ -533,18 +532,19 @@ public class VictoriaMetricsClusterDataStorage extends
AbstractHistoryDataStorag
}
// avg
uri = UriComponentsBuilder.fromUriString(rangeUrl)
- .queryParam("query", URLEncoder.encode("avg_over_time({" +
timeSeriesSelector + "})", StandardCharsets.UTF_8))
- .queryParam("step", "4h")
- .queryParam("start", startTime)
- .queryParam("end", endTime)
- .build(true)
- .toUri();
+ .queryParam("query", "avg_over_time({" + timeSeriesSelector +
"})")
+ .queryParam("step", "4h")
+ .queryParam("start", startTime)
+ .queryParam("end", endTime)
+ .build()
+ .encode()
+ .toUri();
responseEntity = restTemplate.exchange(uri, HttpMethod.GET,
httpEntity, PromQlQueryContent.class);
if (responseEntity.getStatusCode().is2xxSuccessful()) {
if (responseEntity.getBody() != null &&
responseEntity.getBody().getData() != null
- && responseEntity.getBody().getData().getResult() !=
null) {
+ && responseEntity.getBody().getData().getResult() != null)
{
List<PromQlQueryContent.ContentData.Content> contents =
responseEntity.getBody().getData()
- .getResult();
+ .getResult();
for (PromQlQueryContent.ContentData.Content content :
contents) {
Map<String, String> labels = content.getMetric();
labels.remove(LABEL_KEY_NAME);
@@ -556,13 +556,13 @@ public class VictoriaMetricsClusterDataStorage extends
AbstractHistoryDataStorag
String labelStr = JsonUtil.toJson(labels);
if (content.getValues() != null &&
!content.getValues().isEmpty()) {
List<Value> valueList =
instanceValuesMap.computeIfAbsent(labelStr,
- k -> new LinkedList<>());
+ k -> new LinkedList<>());
if (valueList.size() ==
content.getValues().size()) {
for (int timestampIndex = 0; timestampIndex <
valueList.size(); timestampIndex++) {
Value value =
valueList.get(timestampIndex);
Object[] valueArr =
content.getValues().get(timestampIndex);
String avgValue = new
BigDecimal(String.valueOf(valueArr[1])).setScale(4,
-
RoundingMode.HALF_UP).stripTrailingZeros().toPlainString();
+
RoundingMode.HALF_UP).stripTrailingZeros().toPlainString();
value.setMean(avgValue);
}
}
@@ -584,7 +584,7 @@ public class VictoriaMetricsClusterDataStorage extends
AbstractHistoryDataStorag
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_JSON);
if (StringUtils.hasText(vmInsertProps.username())
- && StringUtils.hasText(vmInsertProps.password())) {
+ && StringUtils.hasText(vmInsertProps.password())) {
String authStr = vmInsertProps.username() + ":" +
vmInsertProps.password();
String encodedAuth = Base64Util.encode(authStr);
headers.add(HttpHeaders.AUTHORIZATION, NetworkConstants.BASIC
+ SignConstants.BLANK + encodedAuth);
@@ -596,7 +596,7 @@ public class VictoriaMetricsClusterDataStorage extends
AbstractHistoryDataStorag
HttpEntity<String> httpEntity = new
HttpEntity<>(stringBuilder.toString(), headers);
String importUrl = vmClusterProps.insert().url() +
VM_INSERT_BASE_PATH.formatted(vmClusterProps.accountID(), IMPORT_PATH);
ResponseEntity<String> responseEntity =
restTemplate.postForEntity(importUrl,
- httpEntity, String.class);
+ httpEntity, String.class);
if (responseEntity.getStatusCode().is2xxSuccessful()) {
log.debug("insert metrics data to victoria-metrics success.");
} else {
diff --git a/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/tsdb/vm/VictoriaMetricsDataStorage.java b/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/tsdb/vm/VictoriaMetricsDataStorage.java
index 2219678457..5267e9fe9c 100644
--- a/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/tsdb/vm/VictoriaMetricsDataStorage.java
+++ b/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/tsdb/vm/VictoriaMetricsDataStorage.java
@@ -6,7 +6,7 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -21,7 +21,6 @@ import java.io.ByteArrayOutputStream;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.net.URI;
-import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.ZonedDateTime;
@@ -86,7 +85,7 @@ public class VictoriaMetricsDataStorage extends
AbstractHistoryDataStorage {
private static final String IMPORT_PATH = "/api/v1/import";
/**
* <a
href="https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#how-to-export-data-in-json-line-format">
- *
https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#how-to-export-data-in-json-line-format
+ *
https://docs.victoriametrics.com/Single-server-VictoriaMetrics.html#how-to-export-data-in-json-line-format
* </a>
*/
private static final String EXPORT_PATH = "/api/v1/export";
@@ -105,7 +104,7 @@ public class VictoriaMetricsDataStorage extends
AbstractHistoryDataStorage {
private final VictoriaMetricsProperties victoriaMetricsProp;
private final RestTemplate restTemplate;
private final
BlockingQueue<VictoriaMetricsDataStorage.VictoriaMetricsContent>
metricsBufferQueue;
-
+
private HashedWheelTimer metricsFlushTimer = null;
private final VictoriaMetricsProperties.InsertConfig insertConfig;
private final AtomicBoolean draining = new AtomicBoolean(false);
@@ -119,7 +118,7 @@ public class VictoriaMetricsDataStorage extends
AbstractHistoryDataStorage {
victoriaMetricsProp = victoriaMetricsProperties;
serverAvailable = checkVictoriaMetricsDatasourceAvailable();
insertConfig = victoriaMetricsProperties.insert() == null ? new
VictoriaMetricsProperties.InsertConfig(100, 3,
- new VictoriaMetricsProperties.Compression(false)) :
victoriaMetricsProperties.insert();
+ new VictoriaMetricsProperties.Compression(false)) :
victoriaMetricsProperties.insert();
metricsBufferQueue = new
LinkedBlockingQueue<>(insertConfig.bufferSize());
initializeFlushTimer();
}
@@ -139,14 +138,14 @@ public class VictoriaMetricsDataStorage extends
AbstractHistoryDataStorage {
try {
HttpHeaders headers = new HttpHeaders();
if (StringUtils.hasText(victoriaMetricsProp.username())
- && StringUtils.hasText(victoriaMetricsProp.password())) {
+ && StringUtils.hasText(victoriaMetricsProp.password())) {
String authStr = victoriaMetricsProp.username() +
SignConstants.DOUBLE_MARK + victoriaMetricsProp.password();
String encodedAuth = Base64Util.encode(authStr);
headers.add(HttpHeaders.AUTHORIZATION, NetworkConstants.BASIC
+ " " + encodedAuth);
}
HttpEntity<Void> httpEntity = new HttpEntity<>(headers);
ResponseEntity<String> responseEntity =
restTemplate.exchange(victoriaMetricsProp.url() + STATUS_PATH,
- HttpMethod.GET, httpEntity, String.class);
+ HttpMethod.GET, httpEntity, String.class);
if (responseEntity.getStatusCode().is2xxSuccessful()) {
log.info("check victoria metrics server status success.");
return true;
@@ -168,7 +167,7 @@ public class VictoriaMetricsDataStorage extends
AbstractHistoryDataStorage {
}
if (metricsData.getValues().isEmpty()) {
log.info("[warehouse victoria-metrics] flush metrics data {} {} {}
is null, ignore.",
- metricsData.getId(), metricsData.getApp(),
metricsData.getMetrics());
+ metricsData.getId(), metricsData.getApp(),
metricsData.getMetrics());
return;
}
Map<String, String> defaultLabels = Maps.newHashMapWithExpectedSize(8);
@@ -178,7 +177,7 @@ public class VictoriaMetricsDataStorage extends
AbstractHistoryDataStorage {
isPrometheusAuto = true;
defaultLabels.remove(MONITOR_METRICS_KEY);
defaultLabels.put(LABEL_KEY_JOB, metricsData.getApp()
-
.substring(CommonConstants.PROMETHEUS_APP_PREFIX.length()));
+ .substring(CommonConstants.PROMETHEUS_APP_PREFIX.length()));
} else {
defaultLabels.put(LABEL_KEY_JOB, metricsData.getApp());
}
@@ -220,7 +219,7 @@ public class VictoriaMetricsDataStorage extends
AbstractHistoryDataStorage {
try {
labels.putAll(defaultLabels);
String labelName = isPrometheusAuto ?
metricsData.getMetrics()
- : metricsData.getMetrics() + SPILT +
entry.getKey();
+ : metricsData.getMetrics() + SPILT +
entry.getKey();
labels.put(LABEL_KEY_NAME, labelName);
if (!isPrometheusAuto) {
labels.put(MONITOR_METRIC_KEY, entry.getKey());
@@ -232,10 +231,10 @@ public class VictoriaMetricsDataStorage extends
AbstractHistoryDataStorage {
labels.putAll(customizedLabels);
}
VictoriaMetricsContent content =
VictoriaMetricsContent.builder()
- .metric(new HashMap<>(labels))
- .values(new Double[]{entry.getValue()})
- .timestamps(timestamp)
- .build();
+ .metric(new HashMap<>(labels))
+ .values(new Double[]{entry.getValue()})
+ .timestamps(timestamp)
+ .build();
contentList.add(content);
} catch (Exception e) {
log.error("combine metrics data error: {}.",
e.getMessage(), e);
@@ -270,33 +269,35 @@ public class VictoriaMetricsDataStorage extends
AbstractHistoryDataStorage {
labelName = metrics;
}
String timeSeriesSelector = LABEL_KEY_NAME + "=\"" + labelName + "\""
- + "," + LABEL_KEY_INSTANCE + "=\"" + instance + "\""
- + (CommonConstants.PROMETHEUS.equals(app) ? "" : "," +
MONITOR_METRIC_KEY + "=\"" + metric + "\"");
+ + "," + LABEL_KEY_INSTANCE + "=\"" + instance + "\""
+ + (CommonConstants.PROMETHEUS.equals(app) ? "" : "," +
MONITOR_METRIC_KEY + "=\"" + metric + "\"");
Map<String, List<Value>> instanceValuesMap = new HashMap<>(8);
try {
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_JSON);
headers.setAccept(List.of(MediaType.APPLICATION_JSON));
if (StringUtils.hasText(victoriaMetricsProp.username())
- && StringUtils.hasText(victoriaMetricsProp.password())) {
+ && StringUtils.hasText(victoriaMetricsProp.password())) {
String authStr = victoriaMetricsProp.username() + ":" +
victoriaMetricsProp.password();
String encodedAuth = Base64Util.encode(authStr);
headers.add(HttpHeaders.AUTHORIZATION, NetworkConstants.BASIC
+ SignConstants.BLANK + encodedAuth);
}
HttpEntity<Void> httpEntity = new HttpEntity<>(headers);
URI uri =
UriComponentsBuilder.fromUriString(victoriaMetricsProp.url() + EXPORT_PATH)
- .queryParam(URLEncoder.encode("match[]",
StandardCharsets.UTF_8), URLEncoder.encode("{" + timeSeriesSelector + "}",
StandardCharsets.UTF_8))
- .queryParam("start", URLEncoder.encode("now-" + history,
StandardCharsets.UTF_8))
- .queryParam("end", "now")
- .build(true).toUri();
+ .queryParam("match[]", "{" + timeSeriesSelector + "}")
+ .queryParam("start", "now-" + history)
+ .queryParam("end", "now")
+ .build()
+ .encode()
+ .toUri();
ResponseEntity<String> responseEntity = restTemplate.exchange(uri,
- HttpMethod.GET, httpEntity, String.class);
+ HttpMethod.GET, httpEntity, String.class);
if (responseEntity.getStatusCode().is2xxSuccessful()) {
log.debug("query metrics data from victoria-metrics success.
{}", uri);
if (StringUtils.hasText(responseEntity.getBody())) {
String[] contentJsonArr =
responseEntity.getBody().split("\n");
List<VictoriaMetricsContent> contents =
Arrays.stream(contentJsonArr).map(
- item -> JsonUtil.fromJson(item,
VictoriaMetricsContent.class)
+ item -> JsonUtil.fromJson(item,
VictoriaMetricsContent.class)
).toList();
for (VictoriaMetricsContent content : contents) {
Map<String, String> labels = content.getMetric();
@@ -337,7 +338,7 @@ public class VictoriaMetricsDataStorage extends
AbstractHistoryDataStorage {
String
metric, String history) {
if (!serverAvailable) {
log.error("""
-
+
\t---------------VictoriaMetrics Init Failed---------------
\t--------------Please Config VictoriaMetrics--------------
\t----------Can Not Use Metric History Now----------
@@ -365,33 +366,35 @@ public class VictoriaMetricsDataStorage extends
AbstractHistoryDataStorage {
labelName = metrics;
}
String timeSeriesSelector = LABEL_KEY_NAME + "=\"" + labelName + "\""
- + "," + LABEL_KEY_INSTANCE + "=\"" + instance + "\""
- + (CommonConstants.PROMETHEUS.equals(app) ? "" : "," +
MONITOR_METRIC_KEY + "=\"" + metric + "\"");
+ + "," + LABEL_KEY_INSTANCE + "=\"" + instance + "\""
+ + (CommonConstants.PROMETHEUS.equals(app) ? "" : "," +
MONITOR_METRIC_KEY + "=\"" + metric + "\"");
Map<String, List<Value>> instanceValuesMap = new HashMap<>(8);
try {
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_JSON);
headers.setAccept(List.of(MediaType.APPLICATION_JSON));
if (StringUtils.hasText(victoriaMetricsProp.username())
- && StringUtils.hasText(victoriaMetricsProp.password())) {
+ && StringUtils.hasText(victoriaMetricsProp.password())) {
String authStr = victoriaMetricsProp.username() + ":" +
victoriaMetricsProp.password();
String encodedAuth = Base64Util.encode(authStr);
headers.add(HttpHeaders.AUTHORIZATION, NetworkConstants.BASIC
- + SignConstants.BLANK + encodedAuth);
+ + SignConstants.BLANK + encodedAuth);
}
HttpEntity<Void> httpEntity = new HttpEntity<>(headers);
URI uri =
UriComponentsBuilder.fromUriString(victoriaMetricsProp.url() + QUERY_RANGE_PATH)
- .queryParam(URLEncoder.encode("query",
StandardCharsets.UTF_8), URLEncoder.encode("{" + timeSeriesSelector + "}",
StandardCharsets.UTF_8))
- .queryParam("step", "4h")
- .queryParam("start", startTime)
- .queryParam("end", endTime)
- .build(true).toUri();
+ .queryParam("query", "{" + timeSeriesSelector + "}")
+ .queryParam("step", "4h")
+ .queryParam("start", startTime)
+ .queryParam("end", endTime)
+ .build()
+ .encode()
+ .toUri();
ResponseEntity<PromQlQueryContent> responseEntity =
restTemplate.exchange(uri,
- HttpMethod.GET, httpEntity, PromQlQueryContent.class);
+ HttpMethod.GET, httpEntity, PromQlQueryContent.class);
if (responseEntity.getStatusCode().is2xxSuccessful()) {
log.debug("query metrics data from victoria-metrics success.
{}", uri);
if (responseEntity.getBody() != null &&
responseEntity.getBody().getData() != null
- && responseEntity.getBody().getData().getResult() !=
null) {
+ && responseEntity.getBody().getData().getResult() != null)
{
List<PromQlQueryContent.ContentData.Content> contents =
responseEntity.getBody().getData().getResult();
for (PromQlQueryContent.ContentData.Content content :
contents) {
Map<String, String> labels = content.getMetric();
@@ -418,16 +421,18 @@ public class VictoriaMetricsDataStorage extends
AbstractHistoryDataStorage {
}
// max
uri = UriComponentsBuilder.fromUriString(victoriaMetricsProp.url()
+ QUERY_RANGE_PATH)
- .queryParam(URLEncoder.encode("query",
StandardCharsets.UTF_8), URLEncoder.encode("max_over_time({" +
timeSeriesSelector + "})", StandardCharsets.UTF_8))
- .queryParam("step", "4h")
- .queryParam("start", startTime)
- .queryParam("end", endTime)
- .build(true).toUri();
+ .queryParam("query", "max_over_time({" + timeSeriesSelector +
"})")
+ .queryParam("step", "4h")
+ .queryParam("start", startTime)
+ .queryParam("end", endTime)
+ .build()
+ .encode()
+ .toUri();
responseEntity = restTemplate.exchange(uri,
- HttpMethod.GET, httpEntity, PromQlQueryContent.class);
+ HttpMethod.GET, httpEntity, PromQlQueryContent.class);
if (responseEntity.getStatusCode().is2xxSuccessful()) {
if (responseEntity.getBody() != null &&
responseEntity.getBody().getData() != null
- && responseEntity.getBody().getData().getResult() !=
null) {
+ && responseEntity.getBody().getData().getResult() != null)
{
List<PromQlQueryContent.ContentData.Content> contents =
responseEntity.getBody().getData().getResult();
for (PromQlQueryContent.ContentData.Content content :
contents) {
Map<String, String> labels = content.getMetric();
@@ -454,16 +459,18 @@ public class VictoriaMetricsDataStorage extends AbstractHistoryDataStorage {
}
// min
uri = UriComponentsBuilder.fromUriString(victoriaMetricsProp.url()
+ QUERY_RANGE_PATH)
- .queryParam(URLEncoder.encode("query",
StandardCharsets.UTF_8), URLEncoder.encode("min_over_time({" +
timeSeriesSelector + "})", StandardCharsets.UTF_8))
- .queryParam("step", "4h")
- .queryParam("start", startTime)
- .queryParam("end", endTime)
- .build(true).toUri();
+ .queryParam("query", "min_over_time({" + timeSeriesSelector +
"})")
+ .queryParam("step", "4h")
+ .queryParam("start", startTime)
+ .queryParam("end", endTime)
+ .build()
+ .encode()
+ .toUri();
responseEntity = restTemplate.exchange(uri,
- HttpMethod.GET, httpEntity, PromQlQueryContent.class);
+ HttpMethod.GET, httpEntity, PromQlQueryContent.class);
if (responseEntity.getStatusCode().is2xxSuccessful()) {
if (responseEntity.getBody() != null &&
responseEntity.getBody().getData() != null
- && responseEntity.getBody().getData().getResult() !=
null) {
+ && responseEntity.getBody().getData().getResult() != null)
{
List<PromQlQueryContent.ContentData.Content> contents =
responseEntity.getBody().getData().getResult();
for (PromQlQueryContent.ContentData.Content content :
contents) {
Map<String, String> labels = content.getMetric();
@@ -490,16 +497,18 @@ public class VictoriaMetricsDataStorage extends AbstractHistoryDataStorage {
}
// avg
uri = UriComponentsBuilder.fromUriString(victoriaMetricsProp.url()
+ QUERY_RANGE_PATH)
- .queryParam(URLEncoder.encode("query",
StandardCharsets.UTF_8), URLEncoder.encode("avg_over_time({" +
timeSeriesSelector + "})", StandardCharsets.UTF_8))
- .queryParam("step", "4h")
- .queryParam("start", startTime)
- .queryParam("end", endTime)
- .build(true).toUri();
+ .queryParam("query", "avg_over_time({" + timeSeriesSelector +
"})")
+ .queryParam("step", "4h")
+ .queryParam("start", startTime)
+ .queryParam("end", endTime)
+ .build()
+ .encode()
+ .toUri();
responseEntity = restTemplate.exchange(uri,
- HttpMethod.GET, httpEntity, PromQlQueryContent.class);
+ HttpMethod.GET, httpEntity, PromQlQueryContent.class);
if (responseEntity.getStatusCode().is2xxSuccessful()) {
if (responseEntity.getBody() != null &&
responseEntity.getBody().getData() != null
- && responseEntity.getBody().getData().getResult() !=
null) {
+ && responseEntity.getBody().getData().getResult() != null)
{
List<PromQlQueryContent.ContentData.Content> contents =
responseEntity.getBody().getData().getResult();
for (PromQlQueryContent.ContentData.Content content :
contents) {
Map<String, String> labels = content.getMetric();
@@ -598,7 +607,7 @@ public class VictoriaMetricsDataStorage extends AbstractHistoryDataStorage {
}
// Refresh in advance to avoid waiting
if (metricsBufferQueue.size() >= insertConfig.bufferSize() * 0.8
- && draining.compareAndSet(false, true)) {
+ && draining.compareAndSet(false, true)) {
triggerImmediateFlush();
}
}
@@ -664,7 +673,7 @@ public class VictoriaMetricsDataStorage extends AbstractHistoryDataStorage {
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_JSON);
if (StringUtils.hasText(victoriaMetricsProp.username())
- && StringUtils.hasText(victoriaMetricsProp.password())) {
+ && StringUtils.hasText(victoriaMetricsProp.password())) {
String authStr = victoriaMetricsProp.username() + ":" +
victoriaMetricsProp.password();
String encodedAuth = Base64Util.encode(authStr);
headers.add(HttpHeaders.AUTHORIZATION, NetworkConstants.BASIC
+ SignConstants.BLANK + encodedAuth);
@@ -693,7 +702,7 @@ public class VictoriaMetricsDataStorage extends AbstractHistoryDataStorage {
}
ResponseEntity<String> responseEntity =
restTemplate.postForEntity(victoriaMetricsProp.url() + IMPORT_PATH,
- httpEntity, String.class);
+ httpEntity, String.class);
if (responseEntity.getStatusCode().is2xxSuccessful()) {
log.debug("insert metrics data to victoria-metrics success.");
} else {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]