This is an automated email from the ASF dual-hosted git repository.

zhaoqingran pushed a commit to branch greptime-grafana
in repository https://gitbox.apache.org/repos/asf/hertzbeat.git

commit 3ab4aa193c9946559cff01190af7fb1c4bdb530a
Author: Logic <[email protected]>
AuthorDate: Mon May 19 00:11:29 2025 +0800

    feat(prometheus): add proxy mode for Prometheus collector
    
    - Implement PrometheusProxyCollectImpl for proxy-based collection
    - Add PrometheusCollect interface and update PrometheusAutoCollectImpl
    - Introduce PrometheusCollectorFactory to choose collector implementation
    - Update MetricsCollect to use the new collector factory- Add 
prometheusProxyMode flag in Job entity
---
 .../prometheus/PrometheusAutoCollectImpl.java      |   4 +-
 .../collect/prometheus/PrometheusCollect.java      |  23 +++
 ...ctImpl.java => PrometheusProxyCollectImpl.java} | 210 ++++++++++-----------
 .../collector/dispatch/MetricsCollect.java         |   2 +
 .../apache/hertzbeat/common/entity/job/Job.java    |   5 +
 .../prometheus/PrometheusCollectorFactory.java     |  91 +++++++++
 .../manager/scheduler/CollectorJobScheduler.java   |   4 +
 7 files changed, 224 insertions(+), 115 deletions(-)

diff --git 
a/hertzbeat-collector/hertzbeat-collector-basic/src/main/java/org/apache/hertzbeat/collector/collect/prometheus/PrometheusAutoCollectImpl.java
 
b/hertzbeat-collector/hertzbeat-collector-basic/src/main/java/org/apache/hertzbeat/collector/collect/prometheus/PrometheusAutoCollectImpl.java
index eae58d914f..1fe0f832ae 100644
--- 
a/hertzbeat-collector/hertzbeat-collector-basic/src/main/java/org/apache/hertzbeat/collector/collect/prometheus/PrometheusAutoCollectImpl.java
+++ 
b/hertzbeat-collector/hertzbeat-collector-basic/src/main/java/org/apache/hertzbeat/collector/collect/prometheus/PrometheusAutoCollectImpl.java
@@ -72,12 +72,13 @@ import org.springframework.util.StringUtils;
  * prometheus auto collect
  */
 @Slf4j
-public class PrometheusAutoCollectImpl {
+public class PrometheusAutoCollectImpl implements PrometheusCollect {
     
     private final Set<Integer> defaultSuccessStatusCodes = 
Stream.of(HttpStatus.SC_OK, HttpStatus.SC_CREATED,
             HttpStatus.SC_ACCEPTED, HttpStatus.SC_MULTIPLE_CHOICES, 
HttpStatus.SC_MOVED_PERMANENTLY,
             HttpStatus.SC_MOVED_TEMPORARILY).collect(Collectors.toSet());
     
+    @Override
     public List<CollectRep.MetricsData> collect(CollectRep.MetricsData.Builder 
builder,
                                                 Metrics metrics) {
         try {
@@ -139,6 +140,7 @@ public class PrometheusAutoCollectImpl {
         return Collections.singletonList(builder.build());
     }
     
+    @Override
     public String supportProtocol() {
         return DispatchConstants.PROTOCOL_PROMETHEUS;
     }
diff --git 
a/hertzbeat-collector/hertzbeat-collector-basic/src/main/java/org/apache/hertzbeat/collector/collect/prometheus/PrometheusCollect.java
 
b/hertzbeat-collector/hertzbeat-collector-basic/src/main/java/org/apache/hertzbeat/collector/collect/prometheus/PrometheusCollect.java
new file mode 100644
index 0000000000..68e61a8ffb
--- /dev/null
+++ 
b/hertzbeat-collector/hertzbeat-collector-basic/src/main/java/org/apache/hertzbeat/collector/collect/prometheus/PrometheusCollect.java
@@ -0,0 +1,23 @@
+package org.apache.hertzbeat.collector.collect.prometheus;
+
+import org.apache.hertzbeat.common.entity.job.Metrics;
+import org.apache.hertzbeat.common.entity.message.CollectRep;
+
+import java.util.List;
+
+public interface PrometheusCollect {
+
+    /**
+     * Collect prometheus metrics data
+     * @param builder metrics data builder
+     * @param metrics metrics config
+     * @return list of metrics data
+     */
+    List<CollectRep.MetricsData> collect(CollectRep.MetricsData.Builder 
builder, Metrics metrics);
+
+    /**
+     * Get the protocol name this collector supported
+     * @return protocol name
+     */
+    String supportProtocol();
+}
diff --git 
a/hertzbeat-collector/hertzbeat-collector-basic/src/main/java/org/apache/hertzbeat/collector/collect/prometheus/PrometheusAutoCollectImpl.java
 
b/hertzbeat-collector/hertzbeat-collector-basic/src/main/java/org/apache/hertzbeat/collector/collect/prometheus/PrometheusProxyCollectImpl.java
similarity index 68%
copy from 
hertzbeat-collector/hertzbeat-collector-basic/src/main/java/org/apache/hertzbeat/collector/collect/prometheus/PrometheusAutoCollectImpl.java
copy to 
hertzbeat-collector/hertzbeat-collector-basic/src/main/java/org/apache/hertzbeat/collector/collect/prometheus/PrometheusProxyCollectImpl.java
index eae58d914f..c54834d950 100644
--- 
a/hertzbeat-collector/hertzbeat-collector-basic/src/main/java/org/apache/hertzbeat/collector/collect/prometheus/PrometheusAutoCollectImpl.java
+++ 
b/hertzbeat-collector/hertzbeat-collector-basic/src/main/java/org/apache/hertzbeat/collector/collect/prometheus/PrometheusProxyCollectImpl.java
@@ -17,24 +17,8 @@
 
 package org.apache.hertzbeat.collector.collect.prometheus;
 
-import static org.apache.hertzbeat.common.constants.SignConstants.RIGHT_DASH;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InterruptedIOException;
-import java.net.ConnectException;
-import java.net.UnknownHostException;
-import java.nio.charset.StandardCharsets;
-import java.util.Collections;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-import javax.net.ssl.SSLException;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.hertzbeat.collector.collect.common.http.CommonHttpClient;
-import org.apache.hertzbeat.collector.collect.prometheus.parser.MetricFamily;
 import org.apache.hertzbeat.collector.dispatch.DispatchConstants;
 import org.apache.hertzbeat.collector.util.CollectUtil;
 import org.apache.hertzbeat.common.constants.CommonConstants;
@@ -46,7 +30,6 @@ import org.apache.hertzbeat.common.entity.message.CollectRep;
 import org.apache.hertzbeat.common.util.Base64Util;
 import org.apache.hertzbeat.common.util.CommonUtil;
 import org.apache.hertzbeat.common.util.IpDomainUtil;
-import org.apache.hertzbeat.collector.collect.prometheus.parser.OnlineParser;
 import org.apache.http.HttpHeaders;
 import org.apache.http.HttpHost;
 import org.apache.http.HttpStatus;
@@ -65,70 +48,99 @@ import org.apache.http.impl.auth.DigestScheme;
 import org.apache.http.impl.client.BasicAuthCache;
 import org.apache.http.impl.client.BasicCredentialsProvider;
 import org.apache.http.protocol.HttpContext;
+import org.apache.http.util.EntityUtils;
 import org.springframework.http.MediaType;
 import org.springframework.util.StringUtils;
 
-/**
- * prometheus auto collect
- */
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.net.ConnectException;
+import java.net.UnknownHostException;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import javax.net.ssl.SSLException;
+
+import static org.apache.hertzbeat.common.constants.SignConstants.RIGHT_DASH;
+
+
 @Slf4j
-public class PrometheusAutoCollectImpl {
-    
+public class PrometheusProxyCollectImpl implements PrometheusCollect {
+
     private final Set<Integer> defaultSuccessStatusCodes = 
Stream.of(HttpStatus.SC_OK, HttpStatus.SC_CREATED,
             HttpStatus.SC_ACCEPTED, HttpStatus.SC_MULTIPLE_CHOICES, 
HttpStatus.SC_MOVED_PERMANENTLY,
             HttpStatus.SC_MOVED_TEMPORARILY).collect(Collectors.toSet());
-    
-    public List<CollectRep.MetricsData> collect(CollectRep.MetricsData.Builder 
builder,
-                                                Metrics metrics) {
+
+    public static final String RAW_TEXT_CONTENT_FIELD_NAME = 
"raw_text_content";
+
+    @Override
+    public List<CollectRep.MetricsData> collect(CollectRep.MetricsData.Builder 
builder, Metrics metrics) {
+        PrometheusProtocol prometheusProtocol = metrics.getPrometheus();
+        HttpUriRequest request;
         try {
             validateParams(metrics);
         } catch (Exception e) {
             builder.setCode(CollectRep.Code.FAIL);
             builder.setMsg(e.getMessage());
-            return null;
+            return Collections.singletonList(builder.build());
         }
-        HttpContext httpContext = createHttpContext(metrics.getPrometheus());
-        HttpUriRequest request = createHttpRequest(metrics.getPrometheus());
-        try (CloseableHttpResponse response =
-             CommonHttpClient.getHttpClient().execute(request, httpContext)) {
+
+        HttpContext httpContext = createHttpContext(prometheusProtocol);
+        request = createHttpRequest(prometheusProtocol);
+
+        try (CloseableHttpResponse response = 
CommonHttpClient.getHttpClient().execute(request, httpContext)) {
             int statusCode = response.getStatusLine().getStatusCode();
-            boolean isSuccessInvoke = 
defaultSuccessStatusCodes.contains(statusCode);
-            log.debug("http response status: {}", statusCode);
-            if (!isSuccessInvoke) {
+            log.debug("Prometheus proxy collect, response status: {}", 
statusCode);
+
+            if (!defaultSuccessStatusCodes.contains(statusCode)) {
                 builder.setCode(CollectRep.Code.FAIL);
                 builder.setMsg(NetworkConstants.STATUS_CODE + 
SignConstants.BLANK + statusCode);
-                return null;
-            }
-            try {
-                return 
parseResponseByPrometheusExporter(response.getEntity().getContent(), builder);
-            } catch (Exception e) {
-                log.info("parse error: {}.", e.getMessage(), e);
-                builder.setCode(CollectRep.Code.FAIL);
-                builder.setMsg("parse response data error:" + e.getMessage());
+                return Collections.singletonList(builder.build());
             }
+
+            String rawTextContent = EntityUtils.toString(response.getEntity(), 
StandardCharsets.UTF_8);
+
+            builder.clearFields();
+            builder.clearValues();
+
+            CollectRep.Field rawDataField = CollectRep.Field.newBuilder()
+                    .setName(RAW_TEXT_CONTENT_FIELD_NAME)
+                    .setType(CommonConstants.TYPE_STRING)
+                    .build();
+            builder.addField(rawDataField);
+
+            CollectRep.ValueRow.Builder valueRowBuilder = 
CollectRep.ValueRow.newBuilder();
+            valueRowBuilder.addColumn(rawTextContent);
+            builder.addValueRow(valueRowBuilder.build());
+
+            builder.setCode(CollectRep.Code.SUCCESS);
         } catch (ClientProtocolException e1) {
             String errorMsg = CommonUtil.getMessageFromThrowable(e1);
-            log.error(errorMsg);
+            log.error("Prometheus proxy collect error: {}. Host: {}, Port: 
{}", errorMsg, prometheusProtocol.getHost(), prometheusProtocol.getPort(), e1);
             builder.setCode(CollectRep.Code.UN_CONNECTABLE);
             builder.setMsg(errorMsg);
         } catch (UnknownHostException e2) {
             String errorMsg = CommonUtil.getMessageFromThrowable(e2);
-            log.info(errorMsg);
+            log.info("Prometheus proxy collect unknown host: {}. Host: {}", 
errorMsg, prometheusProtocol.getHost(), e2);
             builder.setCode(CollectRep.Code.UN_REACHABLE);
             builder.setMsg("unknown host:" + errorMsg);
         } catch (InterruptedIOException | ConnectException | SSLException e3) {
             String errorMsg = CommonUtil.getMessageFromThrowable(e3);
-            log.info(errorMsg);
+            log.info("Prometheus proxy collect connect error: {}. Host: {}, 
Port: {}", errorMsg, prometheusProtocol.getHost(), 
prometheusProtocol.getPort(), e3);
             builder.setCode(CollectRep.Code.UN_CONNECTABLE);
             builder.setMsg(errorMsg);
         } catch (IOException e4) {
             String errorMsg = CommonUtil.getMessageFromThrowable(e4);
-            log.info(errorMsg);
+            log.info("Prometheus proxy collect IO error: {}. Host: {}, Port: 
{}", errorMsg, prometheusProtocol.getHost(), prometheusProtocol.getPort(), e4);
             builder.setCode(CollectRep.Code.FAIL);
             builder.setMsg(errorMsg);
         } catch (Exception e) {
             String errorMsg = CommonUtil.getMessageFromThrowable(e);
-            log.error(errorMsg, e);
+            log.error("Prometheus proxy collect unknown error: {}. Host: {}, 
Port: {}", errorMsg, prometheusProtocol.getHost(), 
prometheusProtocol.getPort(), e);
             builder.setCode(CollectRep.Code.FAIL);
             builder.setMsg(errorMsg);
         } finally {
@@ -138,70 +150,31 @@ public class PrometheusAutoCollectImpl {
         }
         return Collections.singletonList(builder.build());
     }
-    
+
+    @Override
     public String supportProtocol() {
         return DispatchConstants.PROTOCOL_PROMETHEUS;
     }
-    
+
     private void validateParams(Metrics metrics) throws Exception {
         if (metrics == null || metrics.getPrometheus() == null) {
             throw new Exception("Prometheus collect must has prometheus 
params");
         }
         PrometheusProtocol protocol = metrics.getPrometheus();
+        if (!StringUtils.hasText(protocol.getHost())
+            || !StringUtils.hasText(protocol.getPort())) {
+            throw new Exception("Prometheus collect must has host and port 
params");
+        }
         if (protocol.getPath() == null
-                    || !StringUtils.hasText(protocol.getPath())
-                    || !protocol.getPath().startsWith(RIGHT_DASH)) {
+                || !StringUtils.hasText(protocol.getPath())
+                || !protocol.getPath().startsWith(RIGHT_DASH)) {
             protocol.setPath(protocol.getPath() == null ? RIGHT_DASH : 
RIGHT_DASH + protocol.getPath().trim());
         }
     }
     
-    private List<CollectRep.MetricsData> 
parseResponseByPrometheusExporter(InputStream inputStream, 
CollectRep.MetricsData.Builder builder) throws IOException {
-        long endTime = System.currentTimeMillis();
-        builder.setTime(endTime);
-        Map<String, MetricFamily> metricFamilyMap = 
OnlineParser.parseMetrics(inputStream);
-        List<CollectRep.MetricsData> metricsDataList = new LinkedList<>();
-        if (metricFamilyMap == null) {
-            return metricsDataList;
-        }
-        for (Map.Entry<String, MetricFamily> entry : 
metricFamilyMap.entrySet()) {
-            builder.clearFields();
-            builder.clearValues();
-            String metricsName = entry.getKey();
-            builder.setMetrics(metricsName);
-            MetricFamily metricFamily = entry.getValue();
-            if (!metricFamily.getMetricList().isEmpty()) {
-                List<String> metricsFields = new LinkedList<>();
-                for (int index = 0; index < 
metricFamily.getMetricList().size(); index++) {
-                    MetricFamily.Metric metric = 
metricFamily.getMetricList().get(index);
-                    if (index == 0) {
-                        metric.getLabels().forEach(label -> {
-                            metricsFields.add(label.getName());
-                            
builder.addField(CollectRep.Field.newBuilder().setName(label.getName())
-                                    
.setType(CommonConstants.TYPE_STRING).setLabel(true).build());
-                        });
-                        
builder.addField(CollectRep.Field.newBuilder().setName("value")
-                                
.setType(CommonConstants.TYPE_NUMBER).setLabel(false).build());
-                    }
-                    Map<String, String> labelMap = metric.getLabels()
-                            .stream()
-                            
.collect(Collectors.toMap(MetricFamily.Label::getName, 
MetricFamily.Label::getValue));
-                    CollectRep.ValueRow.Builder valueRowBuilder = 
CollectRep.ValueRow.newBuilder();
-                    for (String field : metricsFields) {
-                        String fieldValue = labelMap.get(field);
-                        valueRowBuilder.addColumn(fieldValue == null ? 
CommonConstants.NULL_VALUE : fieldValue);
-                    }
-                    
valueRowBuilder.addColumn(String.valueOf(metric.getValue()));
-                    builder.addValueRow(valueRowBuilder.build());
-                }
-                metricsDataList.add(builder.build());
-            }
-        }
-        return metricsDataList;
-    }
-    
     /**
      * create httpContext
-     *
+     * This method is adapted from PrometheusAutoCollectImpl
      * @param protocol prometheus protocol
      * @return context
      */
@@ -216,7 +189,8 @@ public class PrometheusAutoCollectImpl {
                         new 
UsernamePasswordCredentials(auth.getDigestAuthUsername(), 
auth.getDigestAuthPassword());
                 provider.setCredentials(AuthScope.ANY, credentials);
                 AuthCache authCache = new BasicAuthCache();
-                authCache.put(new HttpHost(protocol.getHost(), 
Integer.parseInt(protocol.getPort())), new DigestScheme());
+                HttpHost targetHost = new HttpHost(protocol.getHost(), 
Integer.parseInt(protocol.getPort()));
+                authCache.put(targetHost, new DigestScheme());
                 clientContext.setCredentialsProvider(provider);
                 clientContext.setAuthCache(authCache);
                 return clientContext;
@@ -227,6 +201,7 @@ public class PrometheusAutoCollectImpl {
 
     /**
      * create http request
+     * This method is adapted from PrometheusAutoCollectImpl
      * @param protocol http params
      * @return http uri request
      */
@@ -241,8 +216,6 @@ public class PrometheusAutoCollectImpl {
                 }
             }
         }
-        // The default request header can be overridden if customized
-        // keep-alive
         requestBuilder.addHeader(HttpHeaders.CONNECTION, 
NetworkConstants.KEEP_ALIVE);
         requestBuilder.addHeader(HttpHeaders.USER_AGENT, 
NetworkConstants.USER_AGENT);
         // headers  The custom request header is overwritten here
@@ -255,14 +228,17 @@ public class PrometheusAutoCollectImpl {
                 }
             }
         }
-        // add accept
-        requestBuilder.addHeader(HttpHeaders.ACCEPT, 
MediaType.TEXT_PLAIN_VALUE);
+        if (headers == null || 
headers.keySet().stream().noneMatch(HttpHeaders.ACCEPT::equalsIgnoreCase)) {
+            requestBuilder.addHeader(HttpHeaders.ACCEPT, 
MediaType.TEXT_PLAIN_VALUE + ";version=0.0.4,*/*;q=0.1");
+        }
         
         if (protocol.getAuthorization() != null) {
             PrometheusProtocol.Authorization authorization = 
protocol.getAuthorization();
             if 
(DispatchConstants.BEARER_TOKEN.equalsIgnoreCase(authorization.getType())) {
-                String value = DispatchConstants.BEARER + " " + 
authorization.getBearerTokenToken();
-                requestBuilder.addHeader(HttpHeaders.AUTHORIZATION, value);
+                if (StringUtils.hasText(authorization.getBearerTokenToken())) {
+                    String value = DispatchConstants.BEARER + " " + 
authorization.getBearerTokenToken();
+                    requestBuilder.addHeader(HttpHeaders.AUTHORIZATION, value);
+                }
             } else if 
(DispatchConstants.BASIC_AUTH.equals(authorization.getType())) {
                 if (StringUtils.hasText(authorization.getBasicAuthUsername())
                             && 
StringUtils.hasText(authorization.getBasicAuthPassword())) {
@@ -273,21 +249,21 @@ public class PrometheusAutoCollectImpl {
             }
         }
 
-        // if it has payload, would override post params
         if (StringUtils.hasLength(protocol.getPayload())) {
             requestBuilder.setEntity(new StringEntity(protocol.getPayload(), 
StandardCharsets.UTF_8));
+            if (headers == null || 
headers.keySet().stream().noneMatch(HttpHeaders.CONTENT_TYPE::equalsIgnoreCase))
 {
+                requestBuilder.setHeader(HttpHeaders.CONTENT_TYPE, 
MediaType.TEXT_PLAIN_VALUE);
+            }
         }
         
-        // uri
-        String uri = CollectUtil.replaceUriSpecialChar(protocol.getPath());
+        String uriPath = CollectUtil.replaceUriSpecialChar(protocol.getPath());
         if (IpDomainUtil.isHasSchema(protocol.getHost())) {
-            
-            requestBuilder.setUri(protocol.getHost() + 
SignConstants.DOUBLE_MARK + protocol.getPort() + uri);
+            requestBuilder.setUri(protocol.getHost() + 
SignConstants.DOUBLE_MARK + protocol.getPort() + uriPath);
         } else {
             String ipAddressType = 
IpDomainUtil.checkIpAddressType(protocol.getHost());
             String baseUri = NetworkConstants.IPV6.equals(ipAddressType)
-                                     ? String.format("[%s]:%s%s", 
protocol.getHost(), protocol.getPort(), uri)
-                                     : String.format("%s:%s%s", 
protocol.getHost(), protocol.getPort(), uri);
+                                     ? String.format("[%s]:%s%s", 
protocol.getHost(), protocol.getPort(), uriPath)
+                                     : String.format("%s:%s%s", 
protocol.getHost(), protocol.getPort(), uriPath);
             boolean ssl = Boolean.parseBoolean(protocol.getSsl());
             if (ssl) {
                 requestBuilder.setUri(NetworkConstants.HTTPS_HEADER + baseUri);
@@ -297,11 +273,17 @@ public class PrometheusAutoCollectImpl {
         }
         
         // custom timeout
-        int timeout = CollectUtil.getTimeout(protocol.getTimeout(), 0);
+        int timeout = CollectUtil.getTimeout(protocol.getTimeout());
         if (timeout > 0) {
             RequestConfig requestConfig = RequestConfig.custom()
                                                   .setConnectTimeout(timeout)
                                                   .setSocketTimeout(timeout)
+                                                  
.setConnectionRequestTimeout(timeout)
+                                                  .setRedirectsEnabled(true)
+                                                  .build();
+            requestBuilder.setConfig(requestConfig);
+        } else {
+             RequestConfig requestConfig = RequestConfig.custom()
                                                   .setRedirectsEnabled(true)
                                                   .build();
             requestBuilder.setConfig(requestConfig);
@@ -313,14 +295,14 @@ public class PrometheusAutoCollectImpl {
      * get collect instance
      * @return instance
      */
-    public static PrometheusAutoCollectImpl getInstance() {
-        return PrometheusAutoCollectImpl.SingleInstance.INSTANCE;
+    public static PrometheusProxyCollectImpl getInstance() {
+        return PrometheusProxyCollectImpl.SingleInstance.INSTANCE;
     }
-    
+
     /**
      * static instance
      */
     private static class SingleInstance {
-        private static final PrometheusAutoCollectImpl INSTANCE = new 
PrometheusAutoCollectImpl();
+        private static final PrometheusProxyCollectImpl INSTANCE = new 
PrometheusProxyCollectImpl();
     }
 }
diff --git 
a/hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/dispatch/MetricsCollect.java
 
b/hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/dispatch/MetricsCollect.java
index 254af29710..3213c1072b 100644
--- 
a/hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/dispatch/MetricsCollect.java
+++ 
b/hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/dispatch/MetricsCollect.java
@@ -155,6 +155,8 @@ public class MetricsCollect implements Runnable, 
Comparable<MetricsCollect> {
                 
.setLabels(labels).setAnnotations(annotations).addMetadataAll(metadata);
         // for prometheus auto
         if 
(DispatchConstants.PROTOCOL_PROMETHEUS.equalsIgnoreCase(metrics.getProtocol())) 
{
+            PrometheusCollectorFactory
+                    .getCollector(metrics).collect(response, metrics);
             List<CollectRep.MetricsData> metricsData = 
PrometheusAutoCollectImpl
                     .getInstance().collect(response, metrics);
             validateResponse(metricsData.stream().findFirst().orElse(null));
diff --git 
a/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/entity/job/Job.java
 
b/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/entity/job/Job.java
index 67bc2978de..6ab4e10b0d 100644
--- 
a/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/entity/job/Job.java
+++ 
b/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/entity/job/Job.java
@@ -138,6 +138,11 @@ public class Job {
      */
     private boolean isSd = false;
 
+    /**
+     * Whether to use the Prometheus proxy
+     */
+    private boolean prometheusProxyMode = false;
+
     /**
      * the collect data response metrics as env configmap for other collect 
use. ^o^xxx^o^
      */
diff --git 
a/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/component/prometheus/PrometheusCollectorFactory.java
 
b/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/component/prometheus/PrometheusCollectorFactory.java
new file mode 100644
index 0000000000..513f354e59
--- /dev/null
+++ 
b/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/component/prometheus/PrometheusCollectorFactory.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hertzbeat.manager.component.prometheus;
+
+import 
org.apache.hertzbeat.collector.collect.prometheus.PrometheusAutoCollectImpl;
+import org.apache.hertzbeat.collector.collect.prometheus.PrometheusCollect;
+import 
org.apache.hertzbeat.collector.collect.prometheus.PrometheusProxyCollectImpl;
+import org.apache.hertzbeat.common.entity.job.Metrics;
+import 
org.apache.hertzbeat.warehouse.store.history.tsdb.greptime.GreptimeProperties;
+import 
org.apache.hertzbeat.warehouse.store.history.tsdb.vm.VictoriaMetricsProperties;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+import jakarta.annotation.PostConstruct;
+
+/**
+ * Factory class to get the appropriate Prometheus collector implementation.
+ * It decides based on whether GreptimeDB or VictoriaMetrics is enabled in the 
warehouse configuration.
+ */
+@Component
+public class PrometheusCollectorFactory {
+
+    private static GreptimeProperties staticGreptimeProperties;
+    private static VictoriaMetricsProperties staticVictoriaMetricsProperties;
+
+    private final GreptimeProperties greptimeProperties;
+    private final VictoriaMetricsProperties victoriaMetricsProperties;
+
+    /**
+     * Constructs the factory and injects warehouse properties.
+     * Uses @Autowired(required = false) to allow these properties to be 
optional,
+     * in case they are not configured or enabled in the warehouse module.
+     *
+     * @param greptimeProperties GreptimeDB configuration properties.
+     * @param victoriaMetricsProperties VictoriaMetrics configuration 
properties.
+     */
+    @Autowired
+    public PrometheusCollectorFactory(
+            @Autowired(required = false) GreptimeProperties greptimeProperties,
+            @Autowired(required = false) VictoriaMetricsProperties 
victoriaMetricsProperties) {
+        this.greptimeProperties = greptimeProperties;
+        this.victoriaMetricsProperties = victoriaMetricsProperties;
+    }
+
+    /**
+     * Initializes static fields with the injected properties after 
construction.
+     * This allows the static getCollector method to access these 
configurations.
+     */
+    @PostConstruct
+    private void initStatic() {
+        staticGreptimeProperties = this.greptimeProperties;
+        staticVictoriaMetricsProperties = this.victoriaMetricsProperties;
+    }
+
+    /**
+     * Gets the appropriate Prometheus collector.
+     * If GreptimeDB or VictoriaMetrics is enabled (checked in that order),
+     * it returns an instance of {@link PrometheusProxyCollectImpl}.
+     * Otherwise, it defaults to {@link PrometheusAutoCollectImpl}.
+     *
+     * @param metrics The metrics configuration (currently not used for 
collector type decision but kept for API consistency).
+     * @return An instance of {@link PrometheusCollect}.
+     */
+    public static PrometheusCollect getCollector(Metrics metrics) {
+        boolean useProxyImpl = staticGreptimeProperties != null && 
staticGreptimeProperties.enabled();
+
+        if (!useProxyImpl && staticVictoriaMetricsProperties != null && 
staticVictoriaMetricsProperties.enabled()) {
+            useProxyImpl = true;
+        }
+
+        if (useProxyImpl) {
+            return PrometheusProxyCollectImpl.getInstance();
+        }
+        
+        return new PrometheusAutoCollectImpl();
+    }
+}
\ No newline at end of file
diff --git 
a/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/scheduler/CollectorJobScheduler.java
 
b/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/scheduler/CollectorJobScheduler.java
index 1072f407b8..94667cbed1 100644
--- 
a/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/scheduler/CollectorJobScheduler.java
+++ 
b/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/scheduler/CollectorJobScheduler.java
@@ -47,6 +47,7 @@ import org.apache.hertzbeat.common.entity.message.ClusterMsg;
 import org.apache.hertzbeat.common.entity.message.CollectRep;
 import org.apache.hertzbeat.common.util.JsonUtil;
 import org.apache.hertzbeat.common.util.SnowFlakeIdGenerator;
+import 
org.apache.hertzbeat.manager.component.prometheus.PrometheusCollectorFactory;
 import org.apache.hertzbeat.manager.dao.CollectorDao;
 import org.apache.hertzbeat.manager.dao.CollectorMonitorBindDao;
 import org.apache.hertzbeat.manager.dao.MonitorDao;
@@ -90,6 +91,9 @@ public class CollectorJobScheduler implements 
CollectorScheduling, CollectJobSch
     @Autowired
     private ParamDao paramDao;
 
+    @Autowired
+    private PrometheusCollectorFactory prometheusCollectorFactory;
+
     private ManageServer manageServer;
 
     @Override


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to