This is an automated email from the ASF dual-hosted git repository. zhaoqingran pushed a commit to branch greptime-grafana in repository https://gitbox.apache.org/repos/asf/hertzbeat.git
commit 3ab4aa193c9946559cff01190af7fb1c4bdb530a Author: Logic <[email protected]> AuthorDate: Mon May 19 00:11:29 2025 +0800 feat(prometheus): add proxy mode for Prometheus collector - Implement PrometheusProxyCollectImpl for proxy-based collection - Add PrometheusCollect interface and update PrometheusAutoCollectImpl - Introduce PrometheusCollectorFactory to choose collector implementation - Update MetricsCollect to use the new collector factory - Add prometheusProxyMode flag in Job entity --- .../prometheus/PrometheusAutoCollectImpl.java | 4 +- .../collect/prometheus/PrometheusCollect.java | 23 +++ ...ctImpl.java => PrometheusProxyCollectImpl.java} | 210 ++++++++++----------- .../collector/dispatch/MetricsCollect.java | 2 + .../apache/hertzbeat/common/entity/job/Job.java | 5 + .../prometheus/PrometheusCollectorFactory.java | 91 +++++++++ .../manager/scheduler/CollectorJobScheduler.java | 4 + 7 files changed, 224 insertions(+), 115 deletions(-) diff --git a/hertzbeat-collector/hertzbeat-collector-basic/src/main/java/org/apache/hertzbeat/collector/collect/prometheus/PrometheusAutoCollectImpl.java b/hertzbeat-collector/hertzbeat-collector-basic/src/main/java/org/apache/hertzbeat/collector/collect/prometheus/PrometheusAutoCollectImpl.java index eae58d914f..1fe0f832ae 100644 --- a/hertzbeat-collector/hertzbeat-collector-basic/src/main/java/org/apache/hertzbeat/collector/collect/prometheus/PrometheusAutoCollectImpl.java +++ b/hertzbeat-collector/hertzbeat-collector-basic/src/main/java/org/apache/hertzbeat/collector/collect/prometheus/PrometheusAutoCollectImpl.java @@ -72,12 +72,13 @@ import org.springframework.util.StringUtils; * prometheus auto collect */ @Slf4j -public class PrometheusAutoCollectImpl { +public class PrometheusAutoCollectImpl implements PrometheusCollect { private final Set<Integer> defaultSuccessStatusCodes = Stream.of(HttpStatus.SC_OK, HttpStatus.SC_CREATED, HttpStatus.SC_ACCEPTED, HttpStatus.SC_MULTIPLE_CHOICES, HttpStatus.SC_MOVED_PERMANENTLY, 
HttpStatus.SC_MOVED_TEMPORARILY).collect(Collectors.toSet()); + @Override public List<CollectRep.MetricsData> collect(CollectRep.MetricsData.Builder builder, Metrics metrics) { try { @@ -139,6 +140,7 @@ public class PrometheusAutoCollectImpl { return Collections.singletonList(builder.build()); } + @Override public String supportProtocol() { return DispatchConstants.PROTOCOL_PROMETHEUS; } diff --git a/hertzbeat-collector/hertzbeat-collector-basic/src/main/java/org/apache/hertzbeat/collector/collect/prometheus/PrometheusCollect.java b/hertzbeat-collector/hertzbeat-collector-basic/src/main/java/org/apache/hertzbeat/collector/collect/prometheus/PrometheusCollect.java new file mode 100644 index 0000000000..68e61a8ffb --- /dev/null +++ b/hertzbeat-collector/hertzbeat-collector-basic/src/main/java/org/apache/hertzbeat/collector/collect/prometheus/PrometheusCollect.java @@ -0,0 +1,23 @@ +package org.apache.hertzbeat.collector.collect.prometheus; + +import org.apache.hertzbeat.common.entity.job.Metrics; +import org.apache.hertzbeat.common.entity.message.CollectRep; + +import java.util.List; + +public interface PrometheusCollect { + + /** + * Collect prometheus metrics data + * @param builder metrics data builder + * @param metrics metrics config + * @return list of metrics data + */ + List<CollectRep.MetricsData> collect(CollectRep.MetricsData.Builder builder, Metrics metrics); + + /** + * Get the protocol name this collector supported + * @return protocol name + */ + String supportProtocol(); +} diff --git a/hertzbeat-collector/hertzbeat-collector-basic/src/main/java/org/apache/hertzbeat/collector/collect/prometheus/PrometheusAutoCollectImpl.java b/hertzbeat-collector/hertzbeat-collector-basic/src/main/java/org/apache/hertzbeat/collector/collect/prometheus/PrometheusProxyCollectImpl.java similarity index 68% copy from hertzbeat-collector/hertzbeat-collector-basic/src/main/java/org/apache/hertzbeat/collector/collect/prometheus/PrometheusAutoCollectImpl.java copy to 
hertzbeat-collector/hertzbeat-collector-basic/src/main/java/org/apache/hertzbeat/collector/collect/prometheus/PrometheusProxyCollectImpl.java index eae58d914f..c54834d950 100644 --- a/hertzbeat-collector/hertzbeat-collector-basic/src/main/java/org/apache/hertzbeat/collector/collect/prometheus/PrometheusAutoCollectImpl.java +++ b/hertzbeat-collector/hertzbeat-collector-basic/src/main/java/org/apache/hertzbeat/collector/collect/prometheus/PrometheusProxyCollectImpl.java @@ -17,24 +17,8 @@ package org.apache.hertzbeat.collector.collect.prometheus; -import static org.apache.hertzbeat.common.constants.SignConstants.RIGHT_DASH; -import java.io.IOException; -import java.io.InputStream; -import java.io.InterruptedIOException; -import java.net.ConnectException; -import java.net.UnknownHostException; -import java.nio.charset.StandardCharsets; -import java.util.Collections; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.stream.Collectors; -import java.util.stream.Stream; -import javax.net.ssl.SSLException; import lombok.extern.slf4j.Slf4j; import org.apache.hertzbeat.collector.collect.common.http.CommonHttpClient; -import org.apache.hertzbeat.collector.collect.prometheus.parser.MetricFamily; import org.apache.hertzbeat.collector.dispatch.DispatchConstants; import org.apache.hertzbeat.collector.util.CollectUtil; import org.apache.hertzbeat.common.constants.CommonConstants; @@ -46,7 +30,6 @@ import org.apache.hertzbeat.common.entity.message.CollectRep; import org.apache.hertzbeat.common.util.Base64Util; import org.apache.hertzbeat.common.util.CommonUtil; import org.apache.hertzbeat.common.util.IpDomainUtil; -import org.apache.hertzbeat.collector.collect.prometheus.parser.OnlineParser; import org.apache.http.HttpHeaders; import org.apache.http.HttpHost; import org.apache.http.HttpStatus; @@ -65,70 +48,99 @@ import org.apache.http.impl.auth.DigestScheme; import org.apache.http.impl.client.BasicAuthCache; 
import org.apache.http.impl.client.BasicCredentialsProvider; import org.apache.http.protocol.HttpContext; +import org.apache.http.util.EntityUtils; import org.springframework.http.MediaType; import org.springframework.util.StringUtils; -/** - * prometheus auto collect - */ +import java.io.IOException; +import java.io.InterruptedIOException; +import java.net.ConnectException; +import java.net.UnknownHostException; +import java.nio.charset.StandardCharsets; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import javax.net.ssl.SSLException; + +import static org.apache.hertzbeat.common.constants.SignConstants.RIGHT_DASH; + + @Slf4j -public class PrometheusAutoCollectImpl { - +public class PrometheusProxyCollectImpl implements PrometheusCollect { + private final Set<Integer> defaultSuccessStatusCodes = Stream.of(HttpStatus.SC_OK, HttpStatus.SC_CREATED, HttpStatus.SC_ACCEPTED, HttpStatus.SC_MULTIPLE_CHOICES, HttpStatus.SC_MOVED_PERMANENTLY, HttpStatus.SC_MOVED_TEMPORARILY).collect(Collectors.toSet()); - - public List<CollectRep.MetricsData> collect(CollectRep.MetricsData.Builder builder, - Metrics metrics) { + + public static final String RAW_TEXT_CONTENT_FIELD_NAME = "raw_text_content"; + + @Override + public List<CollectRep.MetricsData> collect(CollectRep.MetricsData.Builder builder, Metrics metrics) { + PrometheusProtocol prometheusProtocol = metrics.getPrometheus(); + HttpUriRequest request; try { validateParams(metrics); } catch (Exception e) { builder.setCode(CollectRep.Code.FAIL); builder.setMsg(e.getMessage()); - return null; + return Collections.singletonList(builder.build()); } - HttpContext httpContext = createHttpContext(metrics.getPrometheus()); - HttpUriRequest request = createHttpRequest(metrics.getPrometheus()); - try (CloseableHttpResponse response = - CommonHttpClient.getHttpClient().execute(request, httpContext)) { + + HttpContext 
httpContext = createHttpContext(prometheusProtocol); + request = createHttpRequest(prometheusProtocol); + + try (CloseableHttpResponse response = CommonHttpClient.getHttpClient().execute(request, httpContext)) { int statusCode = response.getStatusLine().getStatusCode(); - boolean isSuccessInvoke = defaultSuccessStatusCodes.contains(statusCode); - log.debug("http response status: {}", statusCode); - if (!isSuccessInvoke) { + log.debug("Prometheus proxy collect, response status: {}", statusCode); + + if (!defaultSuccessStatusCodes.contains(statusCode)) { builder.setCode(CollectRep.Code.FAIL); builder.setMsg(NetworkConstants.STATUS_CODE + SignConstants.BLANK + statusCode); - return null; - } - try { - return parseResponseByPrometheusExporter(response.getEntity().getContent(), builder); - } catch (Exception e) { - log.info("parse error: {}.", e.getMessage(), e); - builder.setCode(CollectRep.Code.FAIL); - builder.setMsg("parse response data error:" + e.getMessage()); + return Collections.singletonList(builder.build()); } + + String rawTextContent = EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8); + + builder.clearFields(); + builder.clearValues(); + + CollectRep.Field rawDataField = CollectRep.Field.newBuilder() + .setName(RAW_TEXT_CONTENT_FIELD_NAME) + .setType(CommonConstants.TYPE_STRING) + .build(); + builder.addField(rawDataField); + + CollectRep.ValueRow.Builder valueRowBuilder = CollectRep.ValueRow.newBuilder(); + valueRowBuilder.addColumn(rawTextContent); + builder.addValueRow(valueRowBuilder.build()); + + builder.setCode(CollectRep.Code.SUCCESS); } catch (ClientProtocolException e1) { String errorMsg = CommonUtil.getMessageFromThrowable(e1); - log.error(errorMsg); + log.error("Prometheus proxy collect error: {}. 
Host: {}, Port: {}", errorMsg, prometheusProtocol.getHost(), prometheusProtocol.getPort(), e1); builder.setCode(CollectRep.Code.UN_CONNECTABLE); builder.setMsg(errorMsg); } catch (UnknownHostException e2) { String errorMsg = CommonUtil.getMessageFromThrowable(e2); - log.info(errorMsg); + log.info("Prometheus proxy collect unknown host: {}. Host: {}", errorMsg, prometheusProtocol.getHost(), e2); builder.setCode(CollectRep.Code.UN_REACHABLE); builder.setMsg("unknown host:" + errorMsg); } catch (InterruptedIOException | ConnectException | SSLException e3) { String errorMsg = CommonUtil.getMessageFromThrowable(e3); - log.info(errorMsg); + log.info("Prometheus proxy collect connect error: {}. Host: {}, Port: {}", errorMsg, prometheusProtocol.getHost(), prometheusProtocol.getPort(), e3); builder.setCode(CollectRep.Code.UN_CONNECTABLE); builder.setMsg(errorMsg); } catch (IOException e4) { String errorMsg = CommonUtil.getMessageFromThrowable(e4); - log.info(errorMsg); + log.info("Prometheus proxy collect IO error: {}. Host: {}, Port: {}", errorMsg, prometheusProtocol.getHost(), prometheusProtocol.getPort(), e4); builder.setCode(CollectRep.Code.FAIL); builder.setMsg(errorMsg); } catch (Exception e) { String errorMsg = CommonUtil.getMessageFromThrowable(e); - log.error(errorMsg, e); + log.error("Prometheus proxy collect unknown error: {}. 
Host: {}, Port: {}", errorMsg, prometheusProtocol.getHost(), prometheusProtocol.getPort(), e); builder.setCode(CollectRep.Code.FAIL); builder.setMsg(errorMsg); } finally { @@ -138,70 +150,31 @@ public class PrometheusAutoCollectImpl { } return Collections.singletonList(builder.build()); } - + + @Override public String supportProtocol() { return DispatchConstants.PROTOCOL_PROMETHEUS; } - + private void validateParams(Metrics metrics) throws Exception { if (metrics == null || metrics.getPrometheus() == null) { throw new Exception("Prometheus collect must has prometheus params"); } PrometheusProtocol protocol = metrics.getPrometheus(); + if (!StringUtils.hasText(protocol.getHost()) + || !StringUtils.hasText(protocol.getPort())) { + throw new Exception("Prometheus collect must has host and port params"); + } if (protocol.getPath() == null - || !StringUtils.hasText(protocol.getPath()) - || !protocol.getPath().startsWith(RIGHT_DASH)) { + || !StringUtils.hasText(protocol.getPath()) + || !protocol.getPath().startsWith(RIGHT_DASH)) { protocol.setPath(protocol.getPath() == null ? 
RIGHT_DASH : RIGHT_DASH + protocol.getPath().trim()); } } - private List<CollectRep.MetricsData> parseResponseByPrometheusExporter(InputStream inputStream, CollectRep.MetricsData.Builder builder) throws IOException { - long endTime = System.currentTimeMillis(); - builder.setTime(endTime); - Map<String, MetricFamily> metricFamilyMap = OnlineParser.parseMetrics(inputStream); - List<CollectRep.MetricsData> metricsDataList = new LinkedList<>(); - if (metricFamilyMap == null) { - return metricsDataList; - } - for (Map.Entry<String, MetricFamily> entry : metricFamilyMap.entrySet()) { - builder.clearFields(); - builder.clearValues(); - String metricsName = entry.getKey(); - builder.setMetrics(metricsName); - MetricFamily metricFamily = entry.getValue(); - if (!metricFamily.getMetricList().isEmpty()) { - List<String> metricsFields = new LinkedList<>(); - for (int index = 0; index < metricFamily.getMetricList().size(); index++) { - MetricFamily.Metric metric = metricFamily.getMetricList().get(index); - if (index == 0) { - metric.getLabels().forEach(label -> { - metricsFields.add(label.getName()); - builder.addField(CollectRep.Field.newBuilder().setName(label.getName()) - .setType(CommonConstants.TYPE_STRING).setLabel(true).build()); - }); - builder.addField(CollectRep.Field.newBuilder().setName("value") - .setType(CommonConstants.TYPE_NUMBER).setLabel(false).build()); - } - Map<String, String> labelMap = metric.getLabels() - .stream() - .collect(Collectors.toMap(MetricFamily.Label::getName, MetricFamily.Label::getValue)); - CollectRep.ValueRow.Builder valueRowBuilder = CollectRep.ValueRow.newBuilder(); - for (String field : metricsFields) { - String fieldValue = labelMap.get(field); - valueRowBuilder.addColumn(fieldValue == null ? 
CommonConstants.NULL_VALUE : fieldValue); - } - valueRowBuilder.addColumn(String.valueOf(metric.getValue())); - builder.addValueRow(valueRowBuilder.build()); - } - metricsDataList.add(builder.build()); - } - } - return metricsDataList; - } - /** * create httpContext - * + * This method is adapted from PrometheusAutoCollectImpl * @param protocol prometheus protocol * @return context */ @@ -216,7 +189,8 @@ public class PrometheusAutoCollectImpl { new UsernamePasswordCredentials(auth.getDigestAuthUsername(), auth.getDigestAuthPassword()); provider.setCredentials(AuthScope.ANY, credentials); AuthCache authCache = new BasicAuthCache(); - authCache.put(new HttpHost(protocol.getHost(), Integer.parseInt(protocol.getPort())), new DigestScheme()); + HttpHost targetHost = new HttpHost(protocol.getHost(), Integer.parseInt(protocol.getPort())); + authCache.put(targetHost, new DigestScheme()); clientContext.setCredentialsProvider(provider); clientContext.setAuthCache(authCache); return clientContext; @@ -227,6 +201,7 @@ public class PrometheusAutoCollectImpl { /** * create http request + * This method is adapted from PrometheusAutoCollectImpl * @param protocol http params * @return http uri request */ @@ -241,8 +216,6 @@ public class PrometheusAutoCollectImpl { } } } - // The default request header can be overridden if customized - // keep-alive requestBuilder.addHeader(HttpHeaders.CONNECTION, NetworkConstants.KEEP_ALIVE); requestBuilder.addHeader(HttpHeaders.USER_AGENT, NetworkConstants.USER_AGENT); // headers The custom request header is overwritten here @@ -255,14 +228,17 @@ public class PrometheusAutoCollectImpl { } } } - // add accept - requestBuilder.addHeader(HttpHeaders.ACCEPT, MediaType.TEXT_PLAIN_VALUE); + if (headers == null || headers.keySet().stream().noneMatch(HttpHeaders.ACCEPT::equalsIgnoreCase)) { + requestBuilder.addHeader(HttpHeaders.ACCEPT, MediaType.TEXT_PLAIN_VALUE + ";version=0.0.4,*/*;q=0.1"); + } if (protocol.getAuthorization() != null) { 
PrometheusProtocol.Authorization authorization = protocol.getAuthorization(); if (DispatchConstants.BEARER_TOKEN.equalsIgnoreCase(authorization.getType())) { - String value = DispatchConstants.BEARER + " " + authorization.getBearerTokenToken(); - requestBuilder.addHeader(HttpHeaders.AUTHORIZATION, value); + if (StringUtils.hasText(authorization.getBearerTokenToken())) { + String value = DispatchConstants.BEARER + " " + authorization.getBearerTokenToken(); + requestBuilder.addHeader(HttpHeaders.AUTHORIZATION, value); + } } else if (DispatchConstants.BASIC_AUTH.equals(authorization.getType())) { if (StringUtils.hasText(authorization.getBasicAuthUsername()) && StringUtils.hasText(authorization.getBasicAuthPassword())) { @@ -273,21 +249,21 @@ public class PrometheusAutoCollectImpl { } } - // if it has payload, would override post params if (StringUtils.hasLength(protocol.getPayload())) { requestBuilder.setEntity(new StringEntity(protocol.getPayload(), StandardCharsets.UTF_8)); + if (headers == null || headers.keySet().stream().noneMatch(HttpHeaders.CONTENT_TYPE::equalsIgnoreCase)) { + requestBuilder.setHeader(HttpHeaders.CONTENT_TYPE, MediaType.TEXT_PLAIN_VALUE); + } } - // uri - String uri = CollectUtil.replaceUriSpecialChar(protocol.getPath()); + String uriPath = CollectUtil.replaceUriSpecialChar(protocol.getPath()); if (IpDomainUtil.isHasSchema(protocol.getHost())) { - - requestBuilder.setUri(protocol.getHost() + SignConstants.DOUBLE_MARK + protocol.getPort() + uri); + requestBuilder.setUri(protocol.getHost() + SignConstants.DOUBLE_MARK + protocol.getPort() + uriPath); } else { String ipAddressType = IpDomainUtil.checkIpAddressType(protocol.getHost()); String baseUri = NetworkConstants.IPV6.equals(ipAddressType) - ? String.format("[%s]:%s%s", protocol.getHost(), protocol.getPort(), uri) - : String.format("%s:%s%s", protocol.getHost(), protocol.getPort(), uri); + ? 
String.format("[%s]:%s%s", protocol.getHost(), protocol.getPort(), uriPath) + : String.format("%s:%s%s", protocol.getHost(), protocol.getPort(), uriPath); boolean ssl = Boolean.parseBoolean(protocol.getSsl()); if (ssl) { requestBuilder.setUri(NetworkConstants.HTTPS_HEADER + baseUri); @@ -297,11 +273,17 @@ public class PrometheusAutoCollectImpl { } // custom timeout - int timeout = CollectUtil.getTimeout(protocol.getTimeout(), 0); + int timeout = CollectUtil.getTimeout(protocol.getTimeout()); if (timeout > 0) { RequestConfig requestConfig = RequestConfig.custom() .setConnectTimeout(timeout) .setSocketTimeout(timeout) + .setConnectionRequestTimeout(timeout) + .setRedirectsEnabled(true) + .build(); + requestBuilder.setConfig(requestConfig); + } else { + RequestConfig requestConfig = RequestConfig.custom() .setRedirectsEnabled(true) .build(); requestBuilder.setConfig(requestConfig); @@ -313,14 +295,14 @@ public class PrometheusAutoCollectImpl { * get collect instance * @return instance */ - public static PrometheusAutoCollectImpl getInstance() { - return PrometheusAutoCollectImpl.SingleInstance.INSTANCE; + public static PrometheusProxyCollectImpl getInstance() { + return PrometheusProxyCollectImpl.SingleInstance.INSTANCE; } - + /** * static instance */ private static class SingleInstance { - private static final PrometheusAutoCollectImpl INSTANCE = new PrometheusAutoCollectImpl(); + private static final PrometheusProxyCollectImpl INSTANCE = new PrometheusProxyCollectImpl(); } } diff --git a/hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/dispatch/MetricsCollect.java b/hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/dispatch/MetricsCollect.java index 254af29710..3213c1072b 100644 --- a/hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/dispatch/MetricsCollect.java +++ 
b/hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/dispatch/MetricsCollect.java @@ -155,6 +155,8 @@ public class MetricsCollect implements Runnable, Comparable<MetricsCollect> { .setLabels(labels).setAnnotations(annotations).addMetadataAll(metadata); // for prometheus auto if (DispatchConstants.PROTOCOL_PROMETHEUS.equalsIgnoreCase(metrics.getProtocol())) { + PrometheusCollectorFactory + .getCollector(metrics).collect(response, metrics); List<CollectRep.MetricsData> metricsData = PrometheusAutoCollectImpl .getInstance().collect(response, metrics); validateResponse(metricsData.stream().findFirst().orElse(null)); diff --git a/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/entity/job/Job.java b/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/entity/job/Job.java index 67bc2978de..6ab4e10b0d 100644 --- a/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/entity/job/Job.java +++ b/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/entity/job/Job.java @@ -138,6 +138,11 @@ public class Job { */ private boolean isSd = false; + /** + * Whether to use the Prometheus proxy + */ + private boolean prometheusProxyMode = false; + /** * the collect data response metrics as env configmap for other collect use. ^o^xxx^o^ */ diff --git a/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/component/prometheus/PrometheusCollectorFactory.java b/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/component/prometheus/PrometheusCollectorFactory.java new file mode 100644 index 0000000000..513f354e59 --- /dev/null +++ b/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/component/prometheus/PrometheusCollectorFactory.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hertzbeat.manager.component.prometheus; + +import org.apache.hertzbeat.collector.collect.prometheus.PrometheusAutoCollectImpl; +import org.apache.hertzbeat.collector.collect.prometheus.PrometheusCollect; +import org.apache.hertzbeat.collector.collect.prometheus.PrometheusProxyCollectImpl; +import org.apache.hertzbeat.common.entity.job.Metrics; +import org.apache.hertzbeat.warehouse.store.history.tsdb.greptime.GreptimeProperties; +import org.apache.hertzbeat.warehouse.store.history.tsdb.vm.VictoriaMetricsProperties; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; +import jakarta.annotation.PostConstruct; + +/** + * Factory class to get the appropriate Prometheus collector implementation. + * It decides based on whether GreptimeDB or VictoriaMetrics is enabled in the warehouse configuration. + */ +@Component +public class PrometheusCollectorFactory { + + private static GreptimeProperties staticGreptimeProperties; + private static VictoriaMetricsProperties staticVictoriaMetricsProperties; + + private final GreptimeProperties greptimeProperties; + private final VictoriaMetricsProperties victoriaMetricsProperties; + + /** + * Constructs the factory and injects warehouse properties. 
+ * Uses @Autowired(required = false) to allow these properties to be optional, + * in case they are not configured or enabled in the warehouse module. + * + * @param greptimeProperties GreptimeDB configuration properties. + * @param victoriaMetricsProperties VictoriaMetrics configuration properties. + */ + @Autowired + public PrometheusCollectorFactory( + @Autowired(required = false) GreptimeProperties greptimeProperties, + @Autowired(required = false) VictoriaMetricsProperties victoriaMetricsProperties) { + this.greptimeProperties = greptimeProperties; + this.victoriaMetricsProperties = victoriaMetricsProperties; + } + + /** + * Initializes static fields with the injected properties after construction. + * This allows the static getCollector method to access these configurations. + */ + @PostConstruct + private void initStatic() { + staticGreptimeProperties = this.greptimeProperties; + staticVictoriaMetricsProperties = this.victoriaMetricsProperties; + } + + /** + * Gets the appropriate Prometheus collector. + * If GreptimeDB or VictoriaMetrics is enabled (checked in that order), + * it returns an instance of {@link PrometheusProxyCollectImpl}. + * Otherwise, it defaults to {@link PrometheusAutoCollectImpl}. + * + * @param metrics The metrics configuration (currently not used for collector type decision but kept for API consistency). + * @return An instance of {@link PrometheusCollect}. 
+ */ + public static PrometheusCollect getCollector(Metrics metrics) { + boolean useProxyImpl = staticGreptimeProperties != null && staticGreptimeProperties.enabled(); + + if (!useProxyImpl && staticVictoriaMetricsProperties != null && staticVictoriaMetricsProperties.enabled()) { + useProxyImpl = true; + } + + if (useProxyImpl) { + return PrometheusProxyCollectImpl.getInstance(); + } + + return new PrometheusAutoCollectImpl(); + } +} \ No newline at end of file diff --git a/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/scheduler/CollectorJobScheduler.java b/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/scheduler/CollectorJobScheduler.java index 1072f407b8..94667cbed1 100644 --- a/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/scheduler/CollectorJobScheduler.java +++ b/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/scheduler/CollectorJobScheduler.java @@ -47,6 +47,7 @@ import org.apache.hertzbeat.common.entity.message.ClusterMsg; import org.apache.hertzbeat.common.entity.message.CollectRep; import org.apache.hertzbeat.common.util.JsonUtil; import org.apache.hertzbeat.common.util.SnowFlakeIdGenerator; +import org.apache.hertzbeat.manager.component.prometheus.PrometheusCollectorFactory; import org.apache.hertzbeat.manager.dao.CollectorDao; import org.apache.hertzbeat.manager.dao.CollectorMonitorBindDao; import org.apache.hertzbeat.manager.dao.MonitorDao; @@ -90,6 +91,9 @@ public class CollectorJobScheduler implements CollectorScheduling, CollectJobSch @Autowired private ParamDao paramDao; + @Autowired + private PrometheusCollectorFactory prometheusCollectorFactory; + private ManageServer manageServer; @Override --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
