This is an automated email from the ASF dual-hosted git repository.
zhengqiwei pushed a commit to branch refactor_collector
in repository https://gitbox.apache.org/repos/asf/hertzbeat.git
The following commit(s) were added to refs/heads/refactor_collector by this
push:
new d0b3c2dba [refactor] optimize collector module
d0b3c2dba is described below
commit d0b3c2dba21fc602258d9115a0e484ff2c74c7c9
Author: Calvin <[email protected]>
AuthorDate: Sat Sep 6 14:55:23 2025 +0800
[refactor] optimize collector module
---
.../prometheus/PrometheusProxyCollectImpl.java | 308 --------------------
.../dispatch/CollectTaskTimeoutMonitor.java | 116 ++++++++
.../collector/dispatch/CommonDispatcher.java | 313 +++++++++-----------
.../collector/dispatch/MetricsCollect.java | 321 +--------------------
.../handler/CollectMetricsDataHandler.java | 110 +++++++
.../DynamicSubTaskCollectMetricsDataHandler.java | 17 ++
.../CalculateFieldsListener.java} | 261 +----------------
.../listener/CommonMetricsDataListener.java | 21 ++
.../listener/MetricsDataDeliveryListener.java | 52 ++++
.../listener/RemoveTimeoutMonitorListener.java | 26 ++
.../hertzbeat/collector/listener/RerunHandler.java | 38 +++
.../listener/ResponseJobDataListener.java | 44 +++
.../listener/ValidateResponseListener.java | 36 +++
.../hertzbeat/collector/constants/ContextKey.java | 24 ++
.../collector/constants/ContextStatus.java | 8 +
.../hertzbeat/collector/constants/HandlerType.java | 8 +
.../collector/context/AbstractInmemoryContext.java | 56 ++++
.../hertzbeat/collector/context/Context.java | 12 +
.../collector/context/ContextOperation.java | 17 ++
.../hertzbeat/collector/context/ContextView.java | 12 +
.../collector/context/ExceptionStrategy.java | 7 +
.../collector/context/impl/DefaultContext.java | 15 +
.../collector/dispatch/CollectDataDispatch.java | 9 -
.../collector/dispatch/unit/UnitConverter.java | 15 +
.../collector/handler/ChainBootstrap.java | 96 ++++++
.../collector/handler/ContextBoundHandler.java | 12 +
.../collector/handler/ContextBoundListener.java | 9 +
.../hertzbeat/collector/handler/TaskChain.java | 15 +
.../impl/AbstractBatchDataBoundHandler.java | 22 ++
.../impl/AbstractContextBoundTaskChain.java | 27 ++
.../handler/impl/AbstractListenerBoundHandler.java | 73 +++++
.../handler/impl/BatchExecuteTaskChain.java | 48 +++
.../common/constants/CommonConstants.java | 2 +
.../common/entity/collector/CollectorMetaData.java | 21 ++
.../hertzbeat/common/entity/job/MetricsSource.java | 13 +
35 files changed, 1124 insertions(+), 1060 deletions(-)
diff --git
a/hertzbeat-collector/hertzbeat-collector-basic/src/main/java/org/apache/hertzbeat/collector/collect/prometheus/PrometheusProxyCollectImpl.java
b/hertzbeat-collector/hertzbeat-collector-basic/src/main/java/org/apache/hertzbeat/collector/collect/prometheus/PrometheusProxyCollectImpl.java
deleted file mode 100644
index 2177710d0..000000000
---
a/hertzbeat-collector/hertzbeat-collector-basic/src/main/java/org/apache/hertzbeat/collector/collect/prometheus/PrometheusProxyCollectImpl.java
+++ /dev/null
@@ -1,308 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hertzbeat.collector.collect.prometheus;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.hertzbeat.collector.collect.common.http.CommonHttpClient;
-import org.apache.hertzbeat.collector.dispatch.DispatchConstants;
-import org.apache.hertzbeat.collector.util.CollectUtil;
-import org.apache.hertzbeat.common.constants.CommonConstants;
-import org.apache.hertzbeat.common.constants.NetworkConstants;
-import org.apache.hertzbeat.common.constants.SignConstants;
-import org.apache.hertzbeat.common.entity.job.Metrics;
-import org.apache.hertzbeat.common.entity.job.protocol.PrometheusProtocol;
-import org.apache.hertzbeat.common.entity.message.CollectRep;
-import org.apache.hertzbeat.common.util.Base64Util;
-import org.apache.hertzbeat.common.util.CommonUtil;
-import org.apache.hertzbeat.common.util.IpDomainUtil;
-import org.apache.http.HttpHeaders;
-import org.apache.http.HttpHost;
-import org.apache.http.HttpStatus;
-import org.apache.http.auth.AuthScope;
-import org.apache.http.auth.UsernamePasswordCredentials;
-import org.apache.http.client.AuthCache;
-import org.apache.http.client.ClientProtocolException;
-import org.apache.http.client.CredentialsProvider;
-import org.apache.http.client.config.RequestConfig;
-import org.apache.http.client.methods.CloseableHttpResponse;
-import org.apache.http.client.methods.HttpUriRequest;
-import org.apache.http.client.methods.RequestBuilder;
-import org.apache.http.client.protocol.HttpClientContext;
-import org.apache.http.entity.StringEntity;
-import org.apache.http.impl.auth.DigestScheme;
-import org.apache.http.impl.client.BasicAuthCache;
-import org.apache.http.impl.client.BasicCredentialsProvider;
-import org.apache.http.protocol.HttpContext;
-import org.apache.http.util.EntityUtils;
-import org.springframework.http.MediaType;
-import org.springframework.util.StringUtils;
-
-import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.net.ConnectException;
-import java.net.UnknownHostException;
-import java.nio.charset.StandardCharsets;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-import javax.net.ssl.SSLException;
-
-import static org.apache.hertzbeat.common.constants.SignConstants.RIGHT_DASH;
-
-
-@Slf4j
-public class PrometheusProxyCollectImpl implements PrometheusCollect {
-
- private final Set<Integer> defaultSuccessStatusCodes =
Stream.of(HttpStatus.SC_OK, HttpStatus.SC_CREATED,
- HttpStatus.SC_ACCEPTED, HttpStatus.SC_MULTIPLE_CHOICES,
HttpStatus.SC_MOVED_PERMANENTLY,
- HttpStatus.SC_MOVED_TEMPORARILY).collect(Collectors.toSet());
-
- public static final String RAW_TEXT_CONTENT_FIELD_NAME =
"raw_text_content";
-
- @Override
- public List<CollectRep.MetricsData> collect(CollectRep.MetricsData.Builder
builder, Metrics metrics) {
- PrometheusProtocol prometheusProtocol = metrics.getPrometheus();
- HttpUriRequest request;
- try {
- validateParams(metrics);
- } catch (Exception e) {
- builder.setCode(CollectRep.Code.FAIL);
- builder.setMsg(e.getMessage());
- return Collections.singletonList(builder.build());
- }
-
- HttpContext httpContext = createHttpContext(prometheusProtocol);
- request = createHttpRequest(prometheusProtocol);
-
- try (CloseableHttpResponse response =
CommonHttpClient.getHttpClient().execute(request, httpContext)) {
- int statusCode = response.getStatusLine().getStatusCode();
- log.debug("Prometheus proxy collect, response status: {}",
statusCode);
-
- if (!defaultSuccessStatusCodes.contains(statusCode)) {
- builder.setCode(CollectRep.Code.FAIL);
- builder.setMsg(NetworkConstants.STATUS_CODE +
SignConstants.BLANK + statusCode);
- return Collections.singletonList(builder.build());
- }
-
- String rawTextContent = EntityUtils.toString(response.getEntity(),
StandardCharsets.UTF_8);
-
- builder.clearFields();
- builder.clearValues();
-
- CollectRep.Field rawDataField = CollectRep.Field.newBuilder()
- .setName(RAW_TEXT_CONTENT_FIELD_NAME)
- .setType(CommonConstants.TYPE_STRING)
- .build();
- builder.addField(rawDataField);
-
- CollectRep.ValueRow.Builder valueRowBuilder =
CollectRep.ValueRow.newBuilder();
- valueRowBuilder.addColumn(rawTextContent);
- builder.addValueRow(valueRowBuilder.build());
-
- builder.setCode(CollectRep.Code.SUCCESS);
- } catch (ClientProtocolException e1) {
- String errorMsg = CommonUtil.getMessageFromThrowable(e1);
- log.error("Prometheus proxy collect error: {}. Host: {}, Port:
{}", errorMsg, prometheusProtocol.getHost(), prometheusProtocol.getPort(), e1);
- builder.setCode(CollectRep.Code.UN_CONNECTABLE);
- builder.setMsg(errorMsg);
- } catch (UnknownHostException e2) {
- String errorMsg = CommonUtil.getMessageFromThrowable(e2);
- log.info("Prometheus proxy collect unknown host: {}. Host: {}",
errorMsg, prometheusProtocol.getHost(), e2);
- builder.setCode(CollectRep.Code.UN_REACHABLE);
- builder.setMsg("unknown host:" + errorMsg);
- } catch (InterruptedIOException | ConnectException | SSLException e3) {
- String errorMsg = CommonUtil.getMessageFromThrowable(e3);
- log.info("Prometheus proxy collect connect error: {}. Host: {},
Port: {}", errorMsg, prometheusProtocol.getHost(),
prometheusProtocol.getPort(), e3);
- builder.setCode(CollectRep.Code.UN_CONNECTABLE);
- builder.setMsg(errorMsg);
- } catch (IOException e4) {
- String errorMsg = CommonUtil.getMessageFromThrowable(e4);
- log.info("Prometheus proxy collect IO error: {}. Host: {}, Port:
{}", errorMsg, prometheusProtocol.getHost(), prometheusProtocol.getPort(), e4);
- builder.setCode(CollectRep.Code.FAIL);
- builder.setMsg(errorMsg);
- } catch (Exception e) {
- String errorMsg = CommonUtil.getMessageFromThrowable(e);
- log.error("Prometheus proxy collect unknown error: {}. Host: {},
Port: {}", errorMsg, prometheusProtocol.getHost(),
prometheusProtocol.getPort(), e);
- builder.setCode(CollectRep.Code.FAIL);
- builder.setMsg(errorMsg);
- } finally {
- if (request != null) {
- request.abort();
- }
- }
- return Collections.singletonList(builder.build());
- }
-
- @Override
- public String supportProtocol() {
- return DispatchConstants.PROTOCOL_PROMETHEUS;
- }
-
- private void validateParams(Metrics metrics) throws Exception {
- if (metrics == null || metrics.getPrometheus() == null) {
- throw new Exception("Prometheus collect must has prometheus
params");
- }
- PrometheusProtocol protocol = metrics.getPrometheus();
- if (!StringUtils.hasText(protocol.getHost())
- || !StringUtils.hasText(protocol.getPort())) {
- throw new Exception("Prometheus collect must has host and port
params");
- }
- if (protocol.getPath() == null
- || !StringUtils.hasText(protocol.getPath())
- || !protocol.getPath().startsWith(RIGHT_DASH)) {
- protocol.setPath(protocol.getPath() == null ? RIGHT_DASH :
RIGHT_DASH + protocol.getPath().trim());
- }
- }
-
- /**
- * create httpContext
- * This method is adapted from PrometheusAutoCollectImpl
- * @param protocol prometheus protocol
- * @return context
- */
- public HttpContext createHttpContext(PrometheusProtocol protocol) {
- PrometheusProtocol.Authorization auth = protocol.getAuthorization();
- if (auth != null &&
DispatchConstants.DIGEST_AUTH.equals(auth.getType())) {
- HttpClientContext clientContext = new HttpClientContext();
- if (StringUtils.hasText(auth.getDigestAuthUsername())
- && StringUtils.hasText(auth.getDigestAuthPassword())) {
- CredentialsProvider provider = new BasicCredentialsProvider();
- UsernamePasswordCredentials credentials =
- new
UsernamePasswordCredentials(auth.getDigestAuthUsername(),
auth.getDigestAuthPassword());
- provider.setCredentials(AuthScope.ANY, credentials);
- AuthCache authCache = new BasicAuthCache();
- HttpHost targetHost = new HttpHost(protocol.getHost(),
Integer.parseInt(protocol.getPort()));
- authCache.put(targetHost, new DigestScheme());
- clientContext.setCredentialsProvider(provider);
- clientContext.setAuthCache(authCache);
- return clientContext;
- }
- }
- return null;
- }
-
- /**
- * create http request
- * This method is adapted from PrometheusAutoCollectImpl
- * @param protocol http params
- * @return http uri request
- */
- public HttpUriRequest createHttpRequest(PrometheusProtocol protocol) {
- RequestBuilder requestBuilder = RequestBuilder.get();
- // params
- Map<String, String> params = protocol.getParams();
- if (params != null && !params.isEmpty()) {
- for (Map.Entry<String, String> param : params.entrySet()) {
- if (StringUtils.hasText(param.getValue())) {
- requestBuilder.addParameter(param.getKey(),
param.getValue());
- }
- }
- }
- requestBuilder.addHeader(HttpHeaders.CONNECTION,
NetworkConstants.KEEP_ALIVE);
- requestBuilder.addHeader(HttpHeaders.USER_AGENT,
NetworkConstants.USER_AGENT);
- // headers The custom request header is overwritten here
- Map<String, String> headers = protocol.getHeaders();
- if (headers != null && !headers.isEmpty()) {
- for (Map.Entry<String, String> header : headers.entrySet()) {
- if (StringUtils.hasText(header.getValue())) {
-
requestBuilder.addHeader(CollectUtil.replaceUriSpecialChar(header.getKey()),
-
CollectUtil.replaceUriSpecialChar(header.getValue()));
- }
- }
- }
- if (headers == null ||
headers.keySet().stream().noneMatch(HttpHeaders.ACCEPT::equalsIgnoreCase)) {
- requestBuilder.addHeader(HttpHeaders.ACCEPT,
MediaType.TEXT_PLAIN_VALUE + ";version=0.0.4,*/*;q=0.1");
- }
-
- if (protocol.getAuthorization() != null) {
- PrometheusProtocol.Authorization authorization =
protocol.getAuthorization();
- if
(DispatchConstants.BEARER_TOKEN.equalsIgnoreCase(authorization.getType())) {
- if (StringUtils.hasText(authorization.getBearerTokenToken())) {
- String value = DispatchConstants.BEARER + " " +
authorization.getBearerTokenToken();
- requestBuilder.addHeader(HttpHeaders.AUTHORIZATION, value);
- }
- } else if
(DispatchConstants.BASIC_AUTH.equals(authorization.getType())) {
- if (StringUtils.hasText(authorization.getBasicAuthUsername())
- &&
StringUtils.hasText(authorization.getBasicAuthPassword())) {
- String authStr = authorization.getBasicAuthUsername() +
":" + authorization.getBasicAuthPassword();
- String encodedAuth = Base64Util.encode(authStr);
- requestBuilder.addHeader(HttpHeaders.AUTHORIZATION,
DispatchConstants.BASIC + " " + encodedAuth);
- }
- }
- }
-
- if (StringUtils.hasLength(protocol.getPayload())) {
- requestBuilder.setEntity(new StringEntity(protocol.getPayload(),
StandardCharsets.UTF_8));
- if (headers == null ||
headers.keySet().stream().noneMatch(HttpHeaders.CONTENT_TYPE::equalsIgnoreCase))
{
- requestBuilder.setHeader(HttpHeaders.CONTENT_TYPE,
MediaType.TEXT_PLAIN_VALUE);
- }
- }
-
- String uriPath = CollectUtil.replaceUriSpecialChar(protocol.getPath());
- if (IpDomainUtil.isHasSchema(protocol.getHost())) {
- requestBuilder.setUri(protocol.getHost() +
SignConstants.DOUBLE_MARK + protocol.getPort() + uriPath);
- } else {
- String ipAddressType =
IpDomainUtil.checkIpAddressType(protocol.getHost());
- String baseUri = NetworkConstants.IPV6.equals(ipAddressType)
- ? String.format("[%s]:%s%s",
protocol.getHost(), protocol.getPort(), uriPath)
- : String.format("%s:%s%s",
protocol.getHost(), protocol.getPort(), uriPath);
- boolean ssl = Boolean.parseBoolean(protocol.getSsl());
- if (ssl) {
- requestBuilder.setUri(NetworkConstants.HTTPS_HEADER + baseUri);
- } else {
- requestBuilder.setUri(NetworkConstants.HTTP_HEADER + baseUri);
- }
- }
-
- // custom timeout
- int timeout = CollectUtil.getTimeout(protocol.getTimeout());
- if (timeout > 0) {
- RequestConfig requestConfig = RequestConfig.custom()
- .setConnectTimeout(timeout)
- .setSocketTimeout(timeout)
-
.setConnectionRequestTimeout(timeout)
- .setRedirectsEnabled(true)
- .build();
- requestBuilder.setConfig(requestConfig);
- } else {
- RequestConfig requestConfig = RequestConfig.custom()
- .setRedirectsEnabled(true)
- .build();
- requestBuilder.setConfig(requestConfig);
- }
- return requestBuilder.build();
- }
-
- /**
- * get collect instance
- * @return instance
- */
- public static PrometheusProxyCollectImpl getInstance() {
- return PrometheusProxyCollectImpl.SingleInstance.INSTANCE;
- }
-
- /**
- * static instance
- */
- private static class SingleInstance {
- private static final PrometheusProxyCollectImpl INSTANCE = new
PrometheusProxyCollectImpl();
- }
-}
diff --git
a/hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/dispatch/CollectTaskTimeoutMonitor.java
b/hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/dispatch/CollectTaskTimeoutMonitor.java
new file mode 100644
index 000000000..6a97ab5e3
--- /dev/null
+++
b/hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/dispatch/CollectTaskTimeoutMonitor.java
@@ -0,0 +1,116 @@
+package org.apache.hertzbeat.collector.dispatch;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.hertzbeat.collector.metrics.HertzBeatMetricsCollector;
+import org.apache.hertzbeat.collector.timer.WheelTimerTask;
+import org.apache.hertzbeat.common.constants.CommonConstants;
+import org.apache.hertzbeat.common.entity.job.Job;
+import org.apache.hertzbeat.common.entity.job.Metrics;
+import org.apache.hertzbeat.common.entity.message.CollectRep;
+import org.apache.hertzbeat.common.timer.Timeout;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
+/**
+ *
+ */
+@Slf4j
+@Component
+public class CollectTaskTimeoutMonitor {
+ /**
+ * Collection task timeout value
+ */
+ private static final long DURATION_TIME = 240_000L;
+ /**
+ * Metrics task and start time mapping map
+ */
+ private final Map<String, MetricsTime> metricsTimeoutMonitorMap = new
ConcurrentHashMap<>(16);
+
+ @Autowired
+ private HertzBeatMetricsCollector metricsCollector;
+ private CommonDispatcher commonDispatcher;
+
+ public void start(CommonDispatcher commonDispatcher) {
+ this.commonDispatcher = commonDispatcher;
+
+ // monitoring metrics collection task execution timeout
+ ThreadFactory threadFactory = new ThreadFactoryBuilder()
+ .setNameFormat("metrics-task-timeout-monitor-%d")
+ .setDaemon(true)
+ .build();
+ ScheduledThreadPoolExecutor scheduledExecutor = new
ScheduledThreadPoolExecutor(1, threadFactory);
+
scheduledExecutor.scheduleWithFixedDelay(this::monitorCollectTaskTimeout, 2,
20, TimeUnit.SECONDS);
+ }
+
+ public void putMetrics(String key, MetricsTime value) {
+ this.metricsTimeoutMonitorMap.put(key, value);
+ }
+
+ public MetricsTime removeMetrics(String key) {
+ return this.metricsTimeoutMonitorMap.remove(key);
+ }
+
+ private void monitorCollectTaskTimeout() {
+ try {
+ // Detect whether the collection unit of each metrics has timed
out for 4 minutes,
+ // and if it times out, it will be discarded and an exception will
be returned.
+ long deadline = System.currentTimeMillis() - DURATION_TIME;
+ for (Map.Entry<String, MetricsTime> entry :
metricsTimeoutMonitorMap.entrySet()) {
+ MetricsTime metricsTime = entry.getValue();
+ if (metricsTime.getStartTime() < deadline) {
+ // Metrics collection timeout
+ MetricsTime removedMetricsTime =
metricsTimeoutMonitorMap.remove(entry.getKey());
+ if (removedMetricsTime == null) {
+ continue;
+ }
+ WheelTimerTask timerJob = (WheelTimerTask)
metricsTime.getTimeout().task();
+ Job job = timerJob.getJob();
+ // timeout metrics
+ if (metricsCollector != null) {
+ long duration = System.currentTimeMillis() -
removedMetricsTime.getStartTime();
+ metricsCollector.recordCollectMetrics(job, duration,
"timeout");
+ }
+
+ CollectRep.MetricsData metricsData =
CollectRep.MetricsData.newBuilder()
+ .setId(job.getMonitorId())
+ .setTenantId(job.getTenantId())
+ .setApp(job.getApp())
+ .setMetrics(metricsTime.getMetrics().getName())
+
.setPriority(metricsTime.getMetrics().getPriority())
+ .setTime(System.currentTimeMillis())
+ .setCode(CollectRep.Code.TIMEOUT)
+ .setMsg("collect timeout")
+ .build();
+ log.error("[Collect Timeout]: \n{}", metricsData);
+ if (metricsData.getPriority() ==
CommonConstants.AVAILABLE_METRICS) {
+ //todo 使用chain bootstrap
+//
commonDispatcher.dispatchCollectData(metricsTime.timeout,
metricsTime.getMetrics(), metricsData);
+ }
+ }
+ }
+ } catch (Exception e) {
+ log.error("[Task Timeout Monitor]-{}.", e.getMessage(), e);
+ }
+ }
+
+ /**
+ * Metrics times.
+ */
+ @Data
+ @AllArgsConstructor
+ public static class MetricsTime {
+ private long startTime;
+ private Metrics metrics;
+ private Timeout timeout;
+ }
+}
diff --git
a/hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/dispatch/CommonDispatcher.java
b/hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/dispatch/CommonDispatcher.java
index 6e6875624..213bb434a 100644
---
a/hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/dispatch/CommonDispatcher.java
+++
b/hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/dispatch/CommonDispatcher.java
@@ -17,13 +17,24 @@
package org.apache.hertzbeat.collector.dispatch;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.gson.Gson;
-import lombok.AllArgsConstructor;
-import lombok.Data;
import lombok.extern.slf4j.Slf4j;
+import org.apache.hertzbeat.collector.constants.ContextKey;
+import org.apache.hertzbeat.collector.context.impl.DefaultContext;
import
org.apache.hertzbeat.collector.dispatch.entrance.internal.CollectJobService;
+import org.apache.hertzbeat.collector.dispatch.unit.UnitConverter;
+import org.apache.hertzbeat.collector.handler.ChainBootstrap;
+import org.apache.hertzbeat.collector.handler.CollectMetricsDataHandler;
+import org.apache.hertzbeat.collector.listener.CalculateFieldsListener;
+import org.apache.hertzbeat.collector.listener.MetricsDataDeliveryListener;
+import org.apache.hertzbeat.collector.listener.RemoveTimeoutMonitorListener;
+import org.apache.hertzbeat.collector.listener.RerunHandler;
+import org.apache.hertzbeat.collector.listener.ResponseJobDataListener;
+import org.apache.hertzbeat.collector.listener.ValidateResponseListener;
import org.apache.hertzbeat.collector.metrics.HertzBeatMetricsCollector;
+import org.apache.hertzbeat.collector.handler.impl.BatchExecuteTaskChain;
+import org.apache.hertzbeat.common.constants.CommonConstants;
+import org.apache.hertzbeat.common.entity.collector.CollectorMetaData;
import org.apache.hertzbeat.common.timer.Timeout;
import org.apache.hertzbeat.collector.timer.TimerDispatch;
import org.apache.hertzbeat.collector.timer.WheelTimerTask;
@@ -37,17 +48,18 @@ import org.apache.hertzbeat.common.queue.CommonDataQueue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
+import java.util.Collections;
+import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
/**
* Collection task and response data scheduler
@@ -56,10 +68,6 @@ import java.util.concurrent.atomic.AtomicReference;
@Slf4j
public class CommonDispatcher implements MetricsTaskDispatch,
CollectDataDispatch {
- /**
- * Collection task timeout value
- */
- private static final long DURATION_TIME = 240_000L;
/**
* Trigger sub task max num
*/
@@ -81,16 +89,14 @@ public class CommonDispatcher implements
MetricsTaskDispatch, CollectDataDispatc
* collection data exporter
*/
private final CommonDataQueue commonDataQueue;
- /**
- * Metrics task and start time mapping map
- */
- private final Map<String, MetricsTime> metricsTimeoutMonitorMap;
private final List<UnitConvert> unitConvertList;
private final WorkerPool workerPool;
- private final String collectorIdentity;
+ private final CollectorMetaData metaData;
+
+ private final CollectTaskTimeoutMonitor collectTaskTimeoutMonitor;
@Autowired
private HertzBeatMetricsCollector metricsCollector;
@@ -100,17 +106,24 @@ public class CommonDispatcher implements
MetricsTaskDispatch, CollectDataDispatc
CommonDataQueue commonDataQueue,
WorkerPool workerPool,
CollectJobService collectJobService,
- List<UnitConvert> unitConvertList) {
+ List<UnitConvert> unitConvertList,
+ CollectTaskTimeoutMonitor
collectTaskTimeoutMonitor) {
this.commonDataQueue = commonDataQueue;
this.jobRequestQueue = jobRequestQueue;
this.timerDispatch = timerDispatch;
this.unitConvertList = unitConvertList;
this.workerPool = workerPool;
- this.collectorIdentity = collectJobService.getCollectorIdentity();
- this.metricsTimeoutMonitorMap = new ConcurrentHashMap<>(16);
- this.start();
+ this.metaData = CollectorMetaData.builder()
+ .identity(collectJobService.getCollectorIdentity())
+ .mode(collectJobService.getCollectorMode())
+ .startTime(new Date())
+ .build();
+ this.collectTaskTimeoutMonitor = collectTaskTimeoutMonitor;
+// this.start();
+ this.collectTaskTimeoutMonitor.start(this);
}
+ @Deprecated
public void start() {
try {
// Pull the collection task from the task queue and put it into
the thread pool for execution
@@ -138,91 +151,104 @@ public class CommonDispatcher implements
MetricsTaskDispatch, CollectDataDispatc
}
log.info("Thread Interrupted, Shutdown the
[metrics-task-dispatcher]");
});
- // monitoring metrics collection task execution timeout
- ThreadFactory threadFactory = new ThreadFactoryBuilder()
- .setNameFormat("metrics-task-timeout-monitor-%d")
- .setDaemon(true)
- .build();
- ScheduledThreadPoolExecutor scheduledExecutor = new
ScheduledThreadPoolExecutor(1, threadFactory);
-
scheduledExecutor.scheduleWithFixedDelay(this::monitorCollectTaskTimeout, 2,
20, TimeUnit.SECONDS);
} catch (Exception e) {
log.error("Common Dispatcher error: {}.", e.getMessage(), e);
}
}
- private void monitorCollectTaskTimeout() {
- try {
- // Detect whether the collection unit of each metrics has timed
out for 4 minutes,
- // and if it times out, it will be discarded and an exception will
be returned.
- long deadline = System.currentTimeMillis() - DURATION_TIME;
- for (Map.Entry<String, MetricsTime> entry :
metricsTimeoutMonitorMap.entrySet()) {
- MetricsTime metricsTime = entry.getValue();
- if (metricsTime.getStartTime() < deadline) {
- // Metrics collection timeout
- MetricsTime removedMetricsTime =
metricsTimeoutMonitorMap.remove(entry.getKey());
- if (removedMetricsTime == null) {
- continue;
- }
- WheelTimerTask timerJob = (WheelTimerTask)
metricsTime.getTimeout().task();
- Job job = timerJob.getJob();
- // timeout metrics
- if (metricsCollector != null) {
- long duration = System.currentTimeMillis() -
removedMetricsTime.getStartTime();
- metricsCollector.recordCollectMetrics(job, duration,
"timeout");
- }
-
- CollectRep.MetricsData metricsData =
CollectRep.MetricsData.newBuilder()
- .setId(job.getMonitorId())
- .setTenantId(job.getTenantId())
- .setApp(job.getApp())
- .setMetrics(metricsTime.getMetrics().getName())
-
.setPriority(metricsTime.getMetrics().getPriority())
- .setTime(System.currentTimeMillis())
- .setCode(CollectRep.Code.TIMEOUT).setMsg("collect
timeout").build();
- log.error("[Collect Timeout]: \n{}", metricsData);
- if (metricsData.getPriority() == 0) {
- dispatchCollectData(metricsTime.timeout,
metricsTime.getMetrics(), metricsData);
- }
- }
- }
- } catch (Exception e) {
- log.error("[Task Timeout Monitor]-{}.", e.getMessage(), e);
- }
- }
-
@Override
public void dispatchMetricsTask(Timeout timeout) {
// Divide the collection task of a single application into
corresponding collection tasks of the metrics under it.
// Put each collect task into the thread pool for scheduling
WheelTimerTask timerTask = (WheelTimerTask) timeout.task();
Job job = timerTask.getJob();
- job.constructPriorMetrics();
- Set<Metrics> metricsSet = job.getNextCollectMetrics(null, true);
- metricsSet.forEach(metrics -> {
- MetricsCollect metricsCollect = new MetricsCollect(metrics,
timeout, this,
- collectorIdentity, unitConvertList);
- jobRequestQueue.addJob(metricsCollect);
- if (metrics.getPrometheus() != null) {
- metricsTimeoutMonitorMap.put(String.valueOf(job.getId()),
- new MetricsTime(System.currentTimeMillis(), metrics,
timeout));
+
+ ChainBootstrap bootstrap = constructMetricsCollectTaskChain(job);
+ //todo context需要划分作用域
+ bootstrap.addContext(ContextKey.META_DATA, metaData)
+ .addContext(ContextKey.JOB, job)
+ .addContext(ContextKey.TIMEOUT, timeout)
+ .addListener(new CalculateFieldsListener(new
UnitConverter(unitConvertList)))
+ .addListener(new ValidateResponseListener())
+ .addOnCompleteListener(new
RemoveTimeoutMonitorListener(collectTaskTimeoutMonitor));
+
+ if (job.isCyclic()) {
+ bootstrap.withWorkerPool(workerPool)
+ .addListener(new
MetricsDataDeliveryListener(commonDataQueue))
+ .onComplete(new RerunHandler(timerDispatch));
+ } else {
+ bootstrap.addListener(new ResponseJobDataListener(timerDispatch));
+ }
+
+
+ bootstrap.start();
+ }
+
+ private ChainBootstrap constructMetricsCollectTaskChain(Job job) {
+ long now = System.currentTimeMillis();
+ Map<Byte, List<Metrics>> currentCollectMetrics =
job.getMetrics().stream()
+ .filter(metrics -> (now >= metrics.getCollectTime() +
metrics.getInterval() * 1000L))
+ .peek(metric -> {
+ metric.setCollectTime(now);
+ // Determine whether to configure aliasFields If not,
configure the default
+ if ((metric.getAliasFields() == null ||
metric.getAliasFields().isEmpty()) && metric.getFields() != null) {
+
metric.setAliasFields(metric.getFields().stream().map(Metrics.Field::getField).collect(Collectors.toList()));
+ }
+ // Set the default metrics execution priority, if not
filled, the default last priority
+ if (metric.getPriority() == null) {
+ metric.setPriority(Byte.MAX_VALUE);
+ }
+ })
+ .collect(Collectors.groupingBy(Metrics::getPriority));
+
+ // the current collect metrics can not empty, if empty, add a default
availability metrics
+ // due the metric collect is trigger by the previous metric collect
+ if (currentCollectMetrics.isEmpty()) {
+ Optional<Metrics> defaultMetricOption = job.getMetrics().stream()
+ .filter(metric -> metric.getPriority() ==
CommonConstants.AVAILABLE_METRICS).findFirst();
+ if (defaultMetricOption.isPresent()) {
+ Metrics defaultMetric = defaultMetricOption.get();
+ defaultMetric.setCollectTime(now);
+ currentCollectMetrics.put(CommonConstants.AVAILABLE_METRICS,
Collections.singletonList(defaultMetric));
} else {
- metricsTimeoutMonitorMap.put(job.getId() + "-" +
metrics.getName(),
- new MetricsTime(System.currentTimeMillis(), metrics,
timeout));
+ log.error("metrics must has one priority 0 metrics at least.");
}
- });
+ }
+
+ ChainBootstrap chainBootstrap =
ChainBootstrap.withContext(DefaultContext.newInstance())
+ .withChain(new BatchExecuteTaskChain<Metrics>());
+ // order by priority
+ currentCollectMetrics.keySet().stream()
+ .sorted()
+ .forEach(priority -> {
+ if (job.isCyclic() ||
isOneTimeJobAndIsAvailableMetrics(job, priority)) {
+ List<Metrics> metricsList =
currentCollectMetrics.get(priority);
+ CollectMetricsDataHandler collectHandler =
CollectMetricsDataHandler.builder()
+
.collectTaskTimeoutMonitor(collectTaskTimeoutMonitor)
+ .build();
+ collectHandler.setSourceDataList(metricsList);
+
+ chainBootstrap.handler(collectHandler);
+ }
+ });
+
+ return chainBootstrap;
+ }
+
+ private boolean isOneTimeJobAndIsAvailableMetrics(Job job, byte priority) {
+ return (!job.isCyclic()) && priority ==
CommonConstants.AVAILABLE_METRICS;
}
+ @Deprecated
@Override
public void dispatchCollectData(Timeout timeout, Metrics metrics,
CollectRep.MetricsData metricsData) {
WheelTimerTask timerJob = (WheelTimerTask) timeout.task();
Job job = timerJob.getJob();
- String monitorKey;
+ String monitorKey = job.getId() + "-" + metrics.getName();
if (metrics.isHasSubTask()) {
- monitorKey = job.getId() + "-" + metrics.getName() + "-sub-" +
metrics.getSubTaskId();
- } else {
- monitorKey = job.getId() + "-" + metrics.getName();
+ monitorKey = monitorKey + "-sub-" + metrics.getSubTaskId();
}
- MetricsTime metricsTime = metricsTimeoutMonitorMap.remove(monitorKey);
+ CollectTaskTimeoutMonitor.MetricsTime metricsTime =
this.collectTaskTimeoutMonitor.removeMetrics(monitorKey);
// job completed metrics
if (metricsTime != null && metricsCollector != null) {
@@ -238,31 +264,28 @@ public class CommonDispatcher implements
MetricsTaskDispatch, CollectDataDispatc
return;
}
}
+
+
Set<Metrics> metricsSet = job.getNextCollectMetrics(metrics, false);
if (job.isCyclic()) {
- if (log.isDebugEnabled()) {
- log.debug("Cyclic Job: {} - {} - {}", job.getMonitorId(),
job.getApp(), metricsData.getMetrics());
- for (CollectRep.ValueRow valueRow : metricsData.getValues()) {
- for (CollectRep.Field field : metricsData.getFields()) {
- log.debug("Field-->{},Value-->{}", field.getName(),
valueRow.getColumns(metricsData.getFields().indexOf(field)));
- }
- }
- }
+ cyclicJobDebugLog(job, metricsData);
+
// If metricsSet is null, it means that the execution is completed
or whether the priority of the collection metrics is 0, that is, the
availability collection metrics.
// If the availability collection fails, the next metrics
scheduling will be cancelled and the next round of scheduling will be entered
directly.
boolean isAvailableCollectFailed = metricsSet != null &&
!metricsSet.isEmpty()
- && metrics.getPriority() == (byte) 0 &&
metricsData.getCode() != CollectRep.Code.SUCCESS;
+ && metrics.getPriority() ==
CommonConstants.AVAILABLE_METRICS && metricsData.getCode() !=
CollectRep.Code.SUCCESS;
if (metricsSet == null || isAvailableCollectFailed || job.isSd()) {
// The collection and execution task of this job are completed.
// The periodic task pushes the task to the time wheel again.
// First, determine the execution time of the task and the
task collection interval.
if (!timeout.isCancelled()) {
long spendTime = System.currentTimeMillis() -
job.getDispatchTime();
- long interval = job.getInterval() - spendTime / 1000;
+ long interval = job.getInterval() - spendTime / 1000L;
interval = interval <= 0 ? 0 : interval;
timerDispatch.cyclicJob(timerJob, interval,
TimeUnit.SECONDS);
}
- } else if (!metricsSet.isEmpty()) {
+ }
+ else if (!metricsSet.isEmpty()) {
// The execution of the current level metrics is completed,
and the execution of the next level metrics starts
// use pre collect metrics data to replace next metrics config
params
List<Map<String, Configmap>> configmapList =
CollectUtil.getConfigmapFromPreCollectData(metricsData);
@@ -273,12 +296,15 @@ public class CommonDispatcher implements
MetricsTaskDispatch, CollectDataDispatc
Set<String> cryPlaceholderFields =
CollectUtil.matchCryPlaceholderField(GSON.toJsonTree(metricItem));
if (cryPlaceholderFields.isEmpty()) {
MetricsCollect metricsCollect = new
MetricsCollect(metricItem, timeout, this,
- collectorIdentity, unitConvertList);
+ metaData.getIdentity(), unitConvertList);
jobRequestQueue.addJob(metricsCollect);
- metricsTimeoutMonitorMap.put(job.getId() + "-" +
metricItem.getName(),
- new MetricsTime(System.currentTimeMillis(),
metricItem, timeout));
+
+ this.collectTaskTimeoutMonitor.putMetrics(job.getId()
+ "-" + metricItem.getName(),
+ new
CollectTaskTimeoutMonitor.MetricsTime(System.currentTimeMillis(), metricItem,
timeout));
continue;
}
+
+
boolean isSubTask = configmapList.stream().anyMatch(map ->
map.keySet().stream().anyMatch(cryPlaceholderFields::contains));
int subTaskNum = isSubTask ?
Math.min(configmapList.size(), MAX_SUB_TASK_NUM) : 1;
AtomicInteger subTaskNumAtomic = new
AtomicInteger(subTaskNum);
@@ -294,95 +320,30 @@ public class CommonDispatcher implements
MetricsTaskDispatch, CollectDataDispatc
metric.setSubTaskId(index);
metric.setSubTaskDataRef(metricsDataReference);
MetricsCollect metricsCollect = new
MetricsCollect(metric, timeout, this,
- collectorIdentity, unitConvertList);
+ metaData.getIdentity(), unitConvertList);
jobRequestQueue.addJob(metricsCollect);
- metricsTimeoutMonitorMap.put(job.getId() + "-" +
metric.getName() + "-sub-" + index,
- new MetricsTime(System.currentTimeMillis(),
metric, timeout));
- }
- }
- } else {
- // The list of metrics at the current execution level has not
been fully executed.
- // It needs to wait for the execution of other metrics task of
the same level to complete the execution and enter the next level for execution.
- }
- // If it is an asynchronous periodic cyclic task, directly
response the collected data
- if (job.isSd()) {
- CollectRep.MetricsData sdMetricsData =
CollectRep.MetricsData.newBuilder(metricsData).build();
- commonDataQueue.sendServiceDiscoveryData(sdMetricsData);
- }
- commonDataQueue.sendMetricsData(metricsData);
- } else {
- // If it is a temporary one-time task, you need to wait for the
collected data of all metrics task to be packaged and returned.
- // Insert the current metrics data into the job for unified
assembly
- job.addCollectMetricsData(metricsData);
- if (log.isDebugEnabled()) {
- log.debug("One-time Job: {}", metricsData.getMetrics());
- for (CollectRep.ValueRow valueRow : metricsData.getValues()) {
- for (CollectRep.Field field : metricsData.getFields()) {
- log.debug("Field-->{},Value-->{}", field.getName(),
valueRow.getColumns(metricsData.getFields().indexOf(field)));
+ this.collectTaskTimeoutMonitor.putMetrics(job.getId()
+ "-" + metric.getName() + "-sub-" + index,
+ new
CollectTaskTimeoutMonitor.MetricsTime(System.currentTimeMillis(), metric,
timeout));
}
- }
- }
- if (job.isSd() || metricsSet == null) {
- // The collection and execution of all metrics of this job are
completed
- // and the result listener is notified of the combination of
all metrics data
- timerDispatch.responseSyncJobData(job.getId(),
job.getResponseDataTemp());
- } else if (!metricsSet.isEmpty()) {
- // The execution of the current level metrics is completed,
and the execution of the next level metrics starts
- metricsSet.forEach(metricItem -> {
- MetricsCollect metricsCollect = new
MetricsCollect(metricItem, timeout, this,
- collectorIdentity, unitConvertList);
- jobRequestQueue.addJob(metricsCollect);
- metricsTimeoutMonitorMap.put(job.getId() + "-" +
metricItem.getName(),
- new MetricsTime(System.currentTimeMillis(),
metricItem, timeout));
- });
- } else {
- // The list of metrics task at the current execution level has
not been fully executed.
- // It needs to wait for the execution of other metrics task of
the same level to complete the execution and enter the next level for execution.
+ }
}
}
}
- @Override
- public void dispatchCollectData(Timeout timeout, Metrics metrics,
List<CollectRep.MetricsData> metricsDataList) {
- WheelTimerTask timerJob = (WheelTimerTask) timeout.task();
- Job job = timerJob.getJob();
- MetricsTime metricsTime =
metricsTimeoutMonitorMap.remove(String.valueOf(job.getId()));
- if (metricsTime != null && metricsCollector != null) {
- long duration = System.currentTimeMillis() -
metricsTime.getStartTime();
- // For a list, we consider it a success if at least one item is
successful.
- boolean isSuccess = metricsDataList.stream().anyMatch(item ->
item.getCode() == CollectRep.Code.SUCCESS);
- metricsCollector.recordCollectMetrics(job, duration, isSuccess ?
"success" : "fail");
- }
- if (job.isCyclic()) {
- // The collection and execution of all task of this job are
completed.
- // The periodic task pushes the task to the time wheel again.
- // First, determine the execution time of the task and the task
collection interval.
- if (!timeout.isCancelled()) {
- long spendTime = System.currentTimeMillis() -
job.getDispatchTime();
- long interval = job.getInterval() - spendTime / 1000;
- interval = interval <= 0 ? 0 : interval;
- timerDispatch.cyclicJob(timerJob, interval, TimeUnit.SECONDS);
- }
- // it is an asynchronous periodic cyclic task, directly response
the collected data
- metricsDataList.forEach(commonDataQueue::sendMetricsData);
- } else {
- // The collection and execution of all metrics of this job are
completed
- // and the result listener is notified of the combination of all
metrics data
- timerDispatch.responseSyncJobData(job.getId(), metricsDataList);
+ private void cyclicJobDebugLog(Job job, CollectRep.MetricsData
metricsData) {
+ if (log.isDebugEnabled()) {
+ log.debug("Cyclic Job: {} - {} - {}", job.getMonitorId(),
job.getApp(), metricsData.getMetrics());
+ metricsDataDebugLog(metricsData);
}
}
-
- /**
- * Metrics times.
- */
- @Data
- @AllArgsConstructor
- protected static class MetricsTime {
- private long startTime;
- private Metrics metrics;
- private Timeout timeout;
+ private void metricsDataDebugLog(CollectRep.MetricsData metricsData) {
+ for (CollectRep.ValueRow valueRow : metricsData.getValues()) {
+ for (CollectRep.Field field : metricsData.getFields()) {
+ log.debug("Field-->{},Value-->{}", field.getName(),
valueRow.getColumns(metricsData.getFields().indexOf(field)));
+ }
+ }
}
}
\ No newline at end of file
diff --git
a/hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/dispatch/MetricsCollect.java
b/hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/dispatch/MetricsCollect.java
index 94b202259..c1c7822f7 100644
---
a/hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/dispatch/MetricsCollect.java
+++
b/hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/dispatch/MetricsCollect.java
@@ -129,7 +129,6 @@ public class MetricsCollect implements Runnable,
Comparable<MetricsCollect> {
this.newTime = System.currentTimeMillis();
this.timeout = timeout;
this.metrics = metrics;
- this.collectorIdentity = collectorIdentity;
WheelTimerTask timerJob = (WheelTimerTask) timeout.task();
Job job = timerJob.getJob();
this.id = job.getMonitorId();
@@ -153,327 +152,9 @@ public class MetricsCollect implements Runnable,
Comparable<MetricsCollect> {
@Override
public void run() {
- this.startTime = System.currentTimeMillis();
- setNewThreadName(id, app, startTime, metrics);
- CollectRep.MetricsData.Builder response =
CollectRep.MetricsData.newBuilder();
- response.setApp(app).setId(id).setTenantId(tenantId)
-
.setLabels(labels).setAnnotations(annotations).addMetadataAll(metadata);
- // for prometheus auto or proxy mode
- if
(DispatchConstants.PROTOCOL_PROMETHEUS.equalsIgnoreCase(metrics.getProtocol()))
{
- List<CollectRep.MetricsData> metricsData;
-
- // TODO: Refactor Prometheus metrics collection logic.
- // The current implementation for proxy mode and auto mode needs
review and potential simplification.
- // Consider a more unified approach or clarify the conditions for
each mode.
- /*
- // TODO USE PROXY MODE
- if (prometheusProxyMode) {
- List<CollectRep.MetricsData> proxyData =
PrometheusProxyCollectImpl.getInstance().collect(response, metrics);
- List<CollectRep.MetricsData> autoData =
PrometheusAutoCollectImpl.getInstance().collect(response, metrics);
- metricsData = new LinkedList<>();
- if (proxyData != null) {
- metricsData.addAll(proxyData);
- }
- if (autoData != null) {
- metricsData.addAll(autoData);
- }
- } else {
- metricsData =
PrometheusAutoCollectImpl.getInstance().collect(response, metrics);
- }
- */
- metricsData =
PrometheusAutoCollectImpl.getInstance().collect(response, metrics);
- validateResponse(metricsData == null ? null :
metricsData.stream().findFirst().orElse(null));
- collectDataDispatch.dispatchCollectData(timeout, metrics,
metricsData);
- return;
- }
- response.setMetrics(metrics.getName());
- // According to the metrics collection protocol, application type,
etc.,
- // dispatch to the real application metrics collection implementation
class
- AbstractCollect abstractCollect =
CollectStrategyFactory.invoke(metrics.getProtocol());
- if (abstractCollect == null) {
- log.error("[Dispatcher] - not support this: app: {}, metrics: {},
protocol: {}.",
- app, metrics.getName(), metrics.getProtocol());
- response.setCode(CollectRep.Code.FAIL);
- response.setMsg("not support " + app + ", "
- + metrics.getName() + ", " + metrics.getProtocol());
- } else {
- try {
- abstractCollect.preCheck(metrics);
- abstractCollect.collect(response, metrics);
- } catch (Exception e) {
- String msg = e.getMessage();
- if (msg == null && e.getCause() != null) {
- msg = e.getCause().getMessage();
- }
- if (e instanceof IllegalArgumentException) {
- log.error("[Metrics PreCheck]: {}.", msg, e);
- } else {
- log.error("[Metrics Collect]: {}.", msg, e);
- }
- response.setCode(CollectRep.Code.FAIL);
- if (msg != null) {
- response.setMsg(msg);
- }
- }
- }
- // Alias attribute expression replacement calculation
- if (fastFailed()) {
- return;
- }
- calculateFields(metrics, response);
- CollectRep.MetricsData metricsData = validateResponse(response);
- collectDataDispatch.dispatchCollectData(timeout, metrics, metricsData);
- }
-
- /**
- * Calculate the real metrics value according to the calculates and
aliasFields configuration
- *
- * @param metrics Metrics configuration
- * @param collectData Data collection
- */
- public void calculateFields(Metrics metrics,
CollectRep.MetricsData.Builder collectData) {
- collectData.setPriority(metrics.getPriority());
- List<CollectRep.Field> fieldList = new LinkedList<>();
- for (Metrics.Field field : metrics.getFields()) {
- CollectRep.Field.Builder fieldBuilder =
CollectRep.Field.newBuilder();
-
fieldBuilder.setName(field.getField()).setType(field.getType()).setLabel(field.isLabel());
- if (field.getUnit() != null) {
- fieldBuilder.setUnit(field.getUnit());
- }
- fieldList.add(fieldBuilder.build());
- }
- collectData.addAllFields(fieldList);
- List<CollectRep.ValueRow> aliasRowList = collectData.getValuesList();
- if (aliasRowList == null || aliasRowList.isEmpty()) {
- return;
- }
- collectData.clearValues();
- // Preprocess calculates first
- if (metrics.getCalculates() == null) {
- metrics.setCalculates(Collections.emptyList());
- }
- // eg: database_pages=Database pages unconventional mapping
- Map<String, String> fieldAliasMap = new HashMap<>(8);
- Map<String, JexlExpression> fieldExpressionMap =
metrics.getCalculates()
- .stream()
- .map(cal -> transformCal(cal, fieldAliasMap))
- .filter(Objects::nonNull)
- .collect(Collectors.toMap(arr -> (String) arr[0], arr ->
(JexlExpression) arr[1], (oldValue, newValue) -> newValue));
-
- if (metrics.getUnits() == null) {
- metrics.setUnits(Collections.emptyList());
- }
- Map<String, Pair<String, String>> fieldUnitMap = metrics.getUnits()
- .stream()
- .map(this::transformUnit)
- .filter(Objects::nonNull)
- .collect(Collectors.toMap(arr -> (String) arr[0], arr ->
(Pair<String, String>) arr[1], (oldValue, newValue) -> newValue));
-
- List<Metrics.Field> fields = metrics.getFields();
- List<String> aliasFields =
Optional.ofNullable(metrics.getAliasFields()).orElseGet(Collections::emptyList);
- Map<String, String> aliasFieldValueMap = new HashMap<>(8);
- Map<String, Object> fieldValueMap = new HashMap<>(8);
- Map<String, Object> stringTypefieldValueMap = new HashMap<>(8);
- Map<String, String> aliasFieldUnitMap = new HashMap<>(8);
- CollectRep.ValueRow.Builder realValueRowBuilder =
CollectRep.ValueRow.newBuilder();
- for (CollectRep.ValueRow aliasRow : aliasRowList) {
- for (int aliasIndex = 0; aliasIndex < aliasFields.size();
aliasIndex++) {
- String aliasFieldValue = aliasRow.getColumns(aliasIndex);
- String aliasField = aliasFields.get(aliasIndex);
- if (!CommonConstants.NULL_VALUE.equals(aliasFieldValue)) {
- aliasFieldValueMap.put(aliasField, aliasFieldValue);
- // whether the alias field is a number
- CollectUtil.DoubleAndUnit doubleAndUnit = CollectUtil
- .extractDoubleAndUnitFromStr(aliasFieldValue);
- if (doubleAndUnit != null && doubleAndUnit.getValue() !=
null) {
- fieldValueMap.put(aliasField,
doubleAndUnit.getValue());
- if (doubleAndUnit.getUnit() != null) {
- aliasFieldUnitMap.put(aliasField,
doubleAndUnit.getUnit());
- }
- } else {
- fieldValueMap.put(aliasField, aliasFieldValue);
- }
- stringTypefieldValueMap.put(aliasField, aliasFieldValue);
- } else {
- fieldValueMap.put(aliasField, null);
- stringTypefieldValueMap.put(aliasField, null);
- }
- }
-
- for (Metrics.Field field : fields) {
- String realField = field.getField();
- JexlExpression expression = fieldExpressionMap.get(realField);
- String value = null;
- String aliasFieldUnit = null;
- if (expression != null) {
- try {
- Map<String, Object> context;
- if (CommonConstants.TYPE_STRING == field.getType()) {
- context = stringTypefieldValueMap;
- } else {
- for (Map.Entry<String, String> unitEntry :
aliasFieldUnitMap.entrySet()) {
- if
(expression.getSourceText().contains(unitEntry.getKey())) {
- aliasFieldUnit = unitEntry.getValue();
- break;
- }
- }
- context = fieldValueMap;
- }
-
- // Also executed when valueList is empty, covering
pure string assignment expressions
- Object objValue =
JexlExpressionRunner.evaluate(expression, context);
-
- if (objValue != null) {
- value = String.valueOf(objValue);
- }
- } catch (Exception e) {
- log.warn("[calculates execute warning, use original
value.] {}", e.getMessage());
- value =
Optional.ofNullable(fieldValueMap.get(expression.getSourceText()))
- .map(String::valueOf)
- .orElse(null);
- }
- } else {
- // does not exist then map the alias value
- String aliasField = fieldAliasMap.get(realField);
- if (aliasField != null) {
- value = aliasFieldValueMap.get(aliasField);
- } else {
- value = aliasFieldValueMap.get(realField);
- }
-
- if (value != null) {
- final byte fieldType = field.getType();
- if (fieldType == CommonConstants.TYPE_NUMBER) {
- CollectUtil.DoubleAndUnit doubleAndUnit =
CollectUtil
- .extractDoubleAndUnitFromStr(value);
- final Double tempValue = doubleAndUnit == null ?
null : doubleAndUnit.getValue();
- value = tempValue == null ? null :
String.valueOf(tempValue);
- aliasFieldUnit = doubleAndUnit == null ? null :
doubleAndUnit.getUnit();
- } else if (fieldType == CommonConstants.TYPE_TIME) {
- final int tempValue;
- value = (tempValue =
CommonUtil.parseTimeStrToSecond(value)) == -1 ? null :
String.valueOf(tempValue);
- }
- }
- }
-
- Pair<String, String> unitPair = fieldUnitMap.get(realField);
- if (aliasFieldUnit != null) {
- if (unitPair != null) {
- unitPair.setLeft(aliasFieldUnit);
- } else if (field.getUnit() != null &&
!aliasFieldUnit.equalsIgnoreCase(field.getUnit())) {
- unitPair = Pair.of(aliasFieldUnit, field.getUnit());
- }
- }
- if (value != null && unitPair != null) {
- for (UnitConvert unitConvert : unitConvertList) {
- if (unitConvert.checkUnit(unitPair.getLeft()) &&
unitConvert.checkUnit(unitPair.getRight())) {
- value = unitConvert.convert(value,
unitPair.getLeft(), unitPair.getRight());
- }
- }
- }
- // Handle metrics values that may have units such as 34%,
34Mb, and limit values to 4 decimal places
- if (CommonConstants.TYPE_NUMBER == field.getType()) {
- value = CommonUtil.parseDoubleStr(value, field.getUnit());
- }
- if (value == null) {
- value = CommonConstants.NULL_VALUE;
- }
- realValueRowBuilder.addColumn(value);
- }
- aliasFieldValueMap.clear();
- fieldValueMap.clear();
- aliasFieldUnitMap.clear();
- stringTypefieldValueMap.clear();
- CollectRep.ValueRow realValueRow = realValueRowBuilder.build();
- realValueRowBuilder.clear();
- // apply filter calculation to the real value row
- if (!CollectionUtils.isEmpty(metrics.getFilters())) {
- Map<String, Object> contextMap = new HashMap<>(8);
- for (int i = 0; i < fields.size(); i++) {
- Metrics.Field field = fields.get(i);
- String value = realValueRow.getColumns(i);
- contextMap.put(field.getField(), value);
- }
- boolean isMatch = false;
- for (String filterExpr : metrics.getFilters()) {
- try {
- JexlExpression expression =
JexlExpressionRunner.compile(filterExpr);
- if ((Boolean)
JexlExpressionRunner.evaluate(expression, contextMap)) {
- isMatch = true;
- break;
- }
- } catch (Exception e) {
- log.warn("[metrics data row filters execute warning]
{}.", e.getMessage());
- }
- }
- if (!isMatch) {
- // ignore this data row
- continue;
- }
- }
- collectData.addValueRow(realValueRow);
- }
+// collectDataDispatch.dispatchCollectData(timeout, metrics,
metricsData);
}
- /**
- * @param cal cal
- * @param fieldAliasMap field alias map
- * @return expr
- */
- private Object[] transformCal(String cal, Map<String, String>
fieldAliasMap) {
- int splitIndex = cal.indexOf("=");
- if (splitIndex < 0) {
- return null;
- }
- String field = cal.substring(0, splitIndex).trim();
- String expressionStr = cal.substring(splitIndex +
1).trim().replace("\\#", "#");
- JexlExpression expression;
- try {
- expression = JexlExpressionRunner.compile(expressionStr);
- } catch (Exception e) {
- fieldAliasMap.put(field, expressionStr);
- return null;
- }
- return new Object[]{field, expression};
- }
-
- /**
- * transform unit
- *
- * @param unit unit
- * @return units
- */
- private Object[] transformUnit(String unit) {
- int equalIndex = unit.indexOf("=");
- int arrowIndex = unit.indexOf("->");
- if (equalIndex < 0 || arrowIndex < 0) {
- return null;
- }
- String field = unit.substring(0, equalIndex).trim();
- String originUnit = unit.substring(equalIndex + 1, arrowIndex).trim();
- String newUnit = unit.substring(arrowIndex + 2).trim();
- return new Object[]{field, Pair.of(originUnit, newUnit)};
- }
-
- private boolean fastFailed() {
- return this.timeout == null || this.timeout.isCancelled();
- }
-
- private CollectRep.MetricsData
validateResponse(CollectRep.MetricsData.Builder builder) {
- long endTime = System.currentTimeMillis();
- builder.setTime(endTime);
- long runningTime = endTime - startTime;
- long allTime = endTime - newTime;
- if (startTime - newTime >= WARN_DISPATCH_TIME) {
- log.warn("[Collector Dispatch Warn, Dispatch Use {}ms.", startTime
- newTime);
- }
- if (builder.getCode() != CollectRep.Code.SUCCESS) {
- log.info("[Collect Failed, Run {}ms, All {}ms] Reason: {}",
runningTime, allTime, builder.getMsg());
- } else {
- log.info("[Collect Success, Run {}ms, All {}ms].", runningTime,
allTime);
- }
- return builder.build();
- }
private void validateResponse(CollectRep.MetricsData metricsData) {
if (metricsData == null) {
diff --git
a/hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/handler/CollectMetricsDataHandler.java
b/hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/handler/CollectMetricsDataHandler.java
new file mode 100644
index 000000000..f3cca9d8d
--- /dev/null
+++
b/hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/handler/CollectMetricsDataHandler.java
@@ -0,0 +1,110 @@
+package org.apache.hertzbeat.collector.handler;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.NoArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.hertzbeat.collector.collect.AbstractCollect;
+import org.apache.hertzbeat.collector.collect.strategy.CollectStrategyFactory;
+import org.apache.hertzbeat.collector.constants.ContextKey;
+import org.apache.hertzbeat.collector.constants.ContextStatus;
+import org.apache.hertzbeat.collector.context.Context;
+import org.apache.hertzbeat.collector.dispatch.CollectTaskTimeoutMonitor;
+import
org.apache.hertzbeat.collector.handler.impl.AbstractBatchDataBoundHandler;
+import org.apache.hertzbeat.common.constants.CommonConstants;
+import org.apache.hertzbeat.common.entity.job.Job;
+import org.apache.hertzbeat.common.entity.job.Metrics;
+import org.apache.hertzbeat.common.entity.message.CollectRep;
+import org.apache.hertzbeat.common.timer.Timeout;
+
+
+@Data
+@Slf4j
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+@EqualsAndHashCode(callSuper = true)
+public class CollectMetricsDataHandler extends
AbstractBatchDataBoundHandler<Metrics, CollectRep.MetricsData.Builder> {
+ private CollectTaskTimeoutMonitor collectTaskTimeoutMonitor;
+
+ @Override
+ public CollectRep.MetricsData.Builder executeWithResponse(Context context,
Metrics data) {
+ // preset start info
+ context.put(ContextKey.METRICS, data);
+ Job job = context.get(ContextKey.JOB);
+ long startTime = context.get(ContextKey.METRICS_COLLECT_START_TIME);
+ setNewThreadName(job.getMonitorId(), job.getApp(), startTime, data);
+
+
+ Timeout timeout = context.get(ContextKey.TIMEOUT);
+ String key = data.getPrometheus() != null ?
String.valueOf(job.getId()) : job.getId() + "-" + data.getName();
+ context.put(ContextKey.METRICS_KEY, key);
+ this.collectTaskTimeoutMonitor.putMetrics(key, new
CollectTaskTimeoutMonitor.MetricsTime(startTime, data, timeout));
+
+
+ CollectRep.MetricsData.Builder fetchedData = this.fetchData(job, data);
+ if (fetchedData.getCode() != CollectRep.Code.SUCCESS &&
CommonConstants.AVAILABLE_METRICS == data.getPriority()) {
+ context.setStatus(ContextStatus.TRUNCATE_HANDLER);
+ }
+ return fetchedData;
+ }
+
+ private void setNewThreadName(long monitorId, String app, long startTime,
Metrics metrics) {
+ String builder = monitorId + "-" + app + "-" + metrics.getName() + "-"
+ String.valueOf(startTime).substring(9);
+ Thread.currentThread().setName(builder);
+ }
+
+ private CollectRep.MetricsData.Builder fetchData(Job job, Metrics metrics)
{
+ CollectRep.MetricsData.Builder response =
CollectRep.MetricsData.newBuilder();
+ response.setApp(job.getApp())
+ .setId(job.getMonitorId())
+ .setTenantId(job.getTenantId())
+ .setLabels(job.getLabels())
+ .setAnnotations(job.getAnnotations())
+ .addMetadataAll(job.getMetadata());
+
+ //todo transcribe Prometheus to different chain
+ // for prometheus auto or proxy mode
+// if
(DispatchConstants.PROTOCOL_PROMETHEUS.equalsIgnoreCase(metrics.getProtocol()))
{
+// List<CollectRep.MetricsData> metricsData =
PrometheusAutoCollectImpl.getInstance().collect(response, metrics);
+// validateResponse(metricsData == null ? null :
metricsData.stream().findFirst().orElse(null));
+// collectDataDispatch.dispatchCollectData(timeout, metrics,
metricsData);
+// return null;
+// }
+
+ response.setMetrics(metrics.getName());
+ // According to the metrics collection protocol, application type,
etc.,
+ // dispatch to the real application metrics collection implementation
class
+ AbstractCollect abstractCollect =
CollectStrategyFactory.invoke(metrics.getProtocol());
+ if (abstractCollect == null) {
+ log.error("[Dispatcher] - not support this: app: {}, metrics: {},
protocol: {}.", job.getApp(), metrics.getName(), metrics.getProtocol());
+ response.setCode(CollectRep.Code.FAIL);
+ response.setMsg("not support " + job.getApp() + ", " +
metrics.getName() + ", " + metrics.getProtocol());
+
+ return response;
+ }
+
+ try {
+ abstractCollect.preCheck(metrics);
+ abstractCollect.collect(response, metrics);
+ } catch (Exception e) {
+ String msg = e.getMessage();
+ if (msg == null && e.getCause() != null) {
+ msg = e.getCause().getMessage();
+ }
+ if (e instanceof IllegalArgumentException) {
+ log.error("[Metrics PreCheck]: {}.", msg, e);
+ } else {
+ log.error("[Metrics Collect]: {}.", msg, e);
+ }
+ response.setCode(CollectRep.Code.FAIL);
+ if (msg != null) {
+ response.setMsg(msg);
+ }
+ }
+
+ return response;
+ }
+}
diff --git
a/hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/handler/DynamicSubTaskCollectMetricsDataHandler.java
b/hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/handler/DynamicSubTaskCollectMetricsDataHandler.java
new file mode 100644
index 000000000..74cbea66c
--- /dev/null
+++
b/hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/handler/DynamicSubTaskCollectMetricsDataHandler.java
@@ -0,0 +1,17 @@
+package org.apache.hertzbeat.collector.handler;
+
+import org.apache.hertzbeat.collector.context.Context;
+import
org.apache.hertzbeat.collector.handler.impl.AbstractBatchDataBoundHandler;
+import org.apache.hertzbeat.common.entity.job.Metrics;
+import org.apache.hertzbeat.common.entity.message.CollectRep;
+
+/**
+ *
+ */
+public class DynamicSubTaskCollectMetricsDataHandler extends
AbstractBatchDataBoundHandler<Metrics, CollectRep.MetricsData.Builder> {
+ @Override
+ public CollectRep.MetricsData.Builder executeWithResponse(Context context,
Metrics data) {
+ //todo 动态拆分
+ return null;
+ }
+}
diff --git
a/hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/dispatch/MetricsCollect.java
b/hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/listener/CalculateFieldsListener.java
similarity index 54%
copy from
hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/dispatch/MetricsCollect.java
copy to
hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/listener/CalculateFieldsListener.java
index 94b202259..0a2a8b333 100644
---
a/hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/dispatch/MetricsCollect.java
+++
b/hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/listener/CalculateFieldsListener.java
@@ -1,34 +1,15 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
+package org.apache.hertzbeat.collector.listener;
-package org.apache.hertzbeat.collector.dispatch;
-
-import lombok.Data;
+import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.jexl3.JexlExpression;
-import org.apache.hertzbeat.collector.collect.AbstractCollect;
-import
org.apache.hertzbeat.collector.collect.prometheus.PrometheusAutoCollectImpl;
-import org.apache.hertzbeat.collector.collect.strategy.CollectStrategyFactory;
-import org.apache.hertzbeat.common.timer.Timeout;
-import org.apache.hertzbeat.collector.timer.WheelTimerTask;
+import org.apache.hertzbeat.collector.constants.ContextKey;
+import org.apache.hertzbeat.collector.context.Context;
import org.apache.hertzbeat.collector.dispatch.unit.UnitConvert;
+import org.apache.hertzbeat.collector.dispatch.unit.UnitConverter;
+import org.apache.hertzbeat.collector.handler.ContextBoundListener;
import org.apache.hertzbeat.collector.util.CollectUtil;
import org.apache.hertzbeat.common.constants.CommonConstants;
-import org.apache.hertzbeat.common.entity.job.Job;
import org.apache.hertzbeat.common.entity.job.Metrics;
import org.apache.hertzbeat.common.entity.message.CollectRep;
import org.apache.hertzbeat.common.util.CommonUtil;
@@ -46,183 +27,18 @@ import java.util.Optional;
import java.util.stream.Collectors;
/**
- * metrics collection
+ *
*/
@Slf4j
-@Data
-public class MetricsCollect implements Runnable, Comparable<MetricsCollect> {
- /**
- * Scheduling alarm threshold time 100ms
- */
- private static final long WARN_DISPATCH_TIME = 100;
- /**
- * collector identity
- */
- protected String collectorIdentity;
- /**
- * tenant id
- */
- protected long tenantId;
- /**
- * task id
- */
- protected long id;
- /**
- * app type name
- */
- protected String app;
- /**
- * metrics configuration
- */
- protected Metrics metrics;
- /**
- * metadata
- */
- protected Map<String, String> metadata;
- /**
- * labels
- */
- protected Map<String, String> labels;
- /**
- * annotations
- */
- protected Map<String, String> annotations;
- /**
- * time wheel timeout
- */
- protected Timeout timeout;
- /**
- * Task and Data Scheduling
- */
- protected CollectDataDispatch collectDataDispatch;
- /**
- * task execution priority
- */
- protected byte runPriority;
- /**
- * Periodic collection or one-time collection true-periodic false-one-time
- */
- protected boolean isCyclic;
- /**
- * Time for creating collection task
- */
- protected long newTime;
- /**
- * Start time of the collection task
- */
- protected long startTime;
- /**
- * Whether it is a service discovery job, true is yes, false is no
- */
- protected boolean isSd;
- /**
- * Whether to use the Prometheus proxy
- */
- protected boolean prometheusProxyMode;
-
- protected List<UnitConvert> unitConvertList;
-
- public MetricsCollect(Metrics metrics, Timeout timeout,
- CollectDataDispatch collectDataDispatch,
- String collectorIdentity,
- List<UnitConvert> unitConvertList) {
- this.newTime = System.currentTimeMillis();
- this.timeout = timeout;
- this.metrics = metrics;
- this.collectorIdentity = collectorIdentity;
- WheelTimerTask timerJob = (WheelTimerTask) timeout.task();
- Job job = timerJob.getJob();
- this.id = job.getMonitorId();
- this.tenantId = job.getTenantId();
- this.app = job.getApp();
- this.metadata = job.getMetadata();
- this.labels = job.getLabels();
- this.annotations = job.getAnnotations();
- this.collectDataDispatch = collectDataDispatch;
- this.isCyclic = job.isCyclic();
- this.isSd = job.isSd();
- this.prometheusProxyMode = job.isPrometheusProxyMode();
- this.unitConvertList = unitConvertList;
- // Temporary one-time tasks are executed with high priority
- if (isCyclic) {
- runPriority = (byte) -1;
- } else {
- runPriority = (byte) 1;
- }
- }
+@AllArgsConstructor
+public class CalculateFieldsListener implements
ContextBoundListener<CollectRep.MetricsData.Builder> {
+ private UnitConverter unitConverter;
@Override
- public void run() {
- this.startTime = System.currentTimeMillis();
- setNewThreadName(id, app, startTime, metrics);
- CollectRep.MetricsData.Builder response =
CollectRep.MetricsData.newBuilder();
- response.setApp(app).setId(id).setTenantId(tenantId)
-
.setLabels(labels).setAnnotations(annotations).addMetadataAll(metadata);
- // for prometheus auto or proxy mode
- if
(DispatchConstants.PROTOCOL_PROMETHEUS.equalsIgnoreCase(metrics.getProtocol()))
{
- List<CollectRep.MetricsData> metricsData;
+ public void execute(Context context, CollectRep.MetricsData.Builder data) {
+ Metrics metrics = context.get(ContextKey.METRICS);
- // TODO: Refactor Prometheus metrics collection logic.
- // The current implementation for proxy mode and auto mode needs
review and potential simplification.
- // Consider a more unified approach or clarify the conditions for
each mode.
- /*
- // TODO USE PROXY MODE
- if (prometheusProxyMode) {
- List<CollectRep.MetricsData> proxyData =
PrometheusProxyCollectImpl.getInstance().collect(response, metrics);
- List<CollectRep.MetricsData> autoData =
PrometheusAutoCollectImpl.getInstance().collect(response, metrics);
- metricsData = new LinkedList<>();
- if (proxyData != null) {
- metricsData.addAll(proxyData);
- }
- if (autoData != null) {
- metricsData.addAll(autoData);
- }
- } else {
- metricsData =
PrometheusAutoCollectImpl.getInstance().collect(response, metrics);
- }
- */
- metricsData =
PrometheusAutoCollectImpl.getInstance().collect(response, metrics);
- validateResponse(metricsData == null ? null :
metricsData.stream().findFirst().orElse(null));
- collectDataDispatch.dispatchCollectData(timeout, metrics,
metricsData);
- return;
- }
- response.setMetrics(metrics.getName());
- // According to the metrics collection protocol, application type,
etc.,
- // dispatch to the real application metrics collection implementation
class
- AbstractCollect abstractCollect =
CollectStrategyFactory.invoke(metrics.getProtocol());
- if (abstractCollect == null) {
- log.error("[Dispatcher] - not support this: app: {}, metrics: {},
protocol: {}.",
- app, metrics.getName(), metrics.getProtocol());
- response.setCode(CollectRep.Code.FAIL);
- response.setMsg("not support " + app + ", "
- + metrics.getName() + ", " + metrics.getProtocol());
- } else {
- try {
- abstractCollect.preCheck(metrics);
- abstractCollect.collect(response, metrics);
- } catch (Exception e) {
- String msg = e.getMessage();
- if (msg == null && e.getCause() != null) {
- msg = e.getCause().getMessage();
- }
- if (e instanceof IllegalArgumentException) {
- log.error("[Metrics PreCheck]: {}.", msg, e);
- } else {
- log.error("[Metrics Collect]: {}.", msg, e);
- }
- response.setCode(CollectRep.Code.FAIL);
- if (msg != null) {
- response.setMsg(msg);
- }
- }
- }
- // Alias attribute expression replacement calculation
- if (fastFailed()) {
- return;
- }
- calculateFields(metrics, response);
- CollectRep.MetricsData metricsData = validateResponse(response);
- collectDataDispatch.dispatchCollectData(timeout, metrics, metricsData);
+ this.calculateFields(metrics, unitConverter.getUnitConvertList(),
data);
}
/**
@@ -231,7 +47,7 @@ public class MetricsCollect implements Runnable,
Comparable<MetricsCollect> {
* @param metrics Metrics configuration
* @param collectData Data collection
*/
- public void calculateFields(Metrics metrics,
CollectRep.MetricsData.Builder collectData) {
+ private void calculateFields(Metrics metrics, List<UnitConvert>
unitConvertList, CollectRep.MetricsData.Builder collectData) {
collectData.setPriority(metrics.getPriority());
List<CollectRep.Field> fieldList = new LinkedList<>();
for (Metrics.Field field : metrics.getFields()) {
@@ -454,53 +270,4 @@ public class MetricsCollect implements Runnable,
Comparable<MetricsCollect> {
String newUnit = unit.substring(arrowIndex + 2).trim();
return new Object[]{field, Pair.of(originUnit, newUnit)};
}
-
- private boolean fastFailed() {
- return this.timeout == null || this.timeout.isCancelled();
- }
-
- private CollectRep.MetricsData
validateResponse(CollectRep.MetricsData.Builder builder) {
- long endTime = System.currentTimeMillis();
- builder.setTime(endTime);
- long runningTime = endTime - startTime;
- long allTime = endTime - newTime;
- if (startTime - newTime >= WARN_DISPATCH_TIME) {
- log.warn("[Collector Dispatch Warn, Dispatch Use {}ms.", startTime
- newTime);
- }
- if (builder.getCode() != CollectRep.Code.SUCCESS) {
- log.info("[Collect Failed, Run {}ms, All {}ms] Reason: {}",
runningTime, allTime, builder.getMsg());
- } else {
- log.info("[Collect Success, Run {}ms, All {}ms].", runningTime,
allTime);
- }
- return builder.build();
- }
-
- private void validateResponse(CollectRep.MetricsData metricsData) {
- if (metricsData == null) {
- log.error("[Collect Failed] Response metrics data is null.");
- return;
- }
- long endTime = System.currentTimeMillis();
- long runningTime = endTime - startTime;
- long allTime = endTime - newTime;
- if (startTime - newTime >= WARN_DISPATCH_TIME) {
- log.warn("[Collector Dispatch Warn, Dispatch Use {}ms.", startTime
- newTime);
- }
- if (metricsData.getCode() != CollectRep.Code.SUCCESS) {
- log.info("[Collect Failed, Run {}ms, All {}ms] Reason: {}",
runningTime, allTime, metricsData.getMsg());
- } else {
- log.info("[Collect Success, Run {}ms, All {}ms].", runningTime,
allTime);
- }
- }
-
- private void setNewThreadName(long monitorId, String app, long startTime,
Metrics metrics) {
- String builder = monitorId + "-" + app + "-" + metrics.getName()
- + "-" + String.valueOf(startTime).substring(9);
- Thread.currentThread().setName(builder);
- }
-
- @Override
- public int compareTo(MetricsCollect collect) {
- return collect.runPriority - this.runPriority;
- }
}
diff --git
a/hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/listener/CommonMetricsDataListener.java
b/hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/listener/CommonMetricsDataListener.java
new file mode 100644
index 000000000..fb07fea97
--- /dev/null
+++
b/hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/listener/CommonMetricsDataListener.java
@@ -0,0 +1,21 @@
+package org.apache.hertzbeat.collector.listener;
+
+import org.apache.hertzbeat.collector.constants.ContextKey;
+import org.apache.hertzbeat.collector.constants.ContextStatus;
+import org.apache.hertzbeat.collector.context.Context;
+import org.apache.hertzbeat.collector.handler.ContextBoundListener;
+import org.apache.hertzbeat.common.timer.Timeout;
+
+/**
+ *
+ */
+public class CommonMetricsDataListener<T> implements ContextBoundListener<T> {
+ @Override
+ public void execute(Context context, T data) {
+ Timeout timeout = context.get(ContextKey.TIMEOUT);
+
+ if (timeout == null || timeout.isCancelled()) {
+ context.setStatus(ContextStatus.STOP);
+ }
+ }
+}
diff --git
a/hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/listener/MetricsDataDeliveryListener.java
b/hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/listener/MetricsDataDeliveryListener.java
new file mode 100644
index 000000000..6da70fcef
--- /dev/null
+++
b/hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/listener/MetricsDataDeliveryListener.java
@@ -0,0 +1,52 @@
+package org.apache.hertzbeat.collector.listener;
+
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.hertzbeat.collector.constants.ContextKey;
+import org.apache.hertzbeat.collector.context.Context;
+import org.apache.hertzbeat.collector.handler.ContextBoundListener;
+import org.apache.hertzbeat.common.entity.job.Job;
+import org.apache.hertzbeat.common.entity.message.CollectRep;
+import org.apache.hertzbeat.common.queue.CommonDataQueue;
+
+/**
+ * 周期任务专用
+ */
+@Slf4j
+@AllArgsConstructor
+public class MetricsDataDeliveryListener implements
ContextBoundListener<CollectRep.MetricsData.Builder> {
+ private CommonDataQueue commonDataQueue;
+
+ @Override
+ public void execute(Context context, CollectRep.MetricsData.Builder data) {
+ Job job = context.get(ContextKey.JOB);
+ CollectRep.MetricsData metricsData = data.build();
+
+ cyclicJobDebugLog(job, metricsData);
+
+ sendToQueue(job, metricsData);
+ }
+
+ private void sendToQueue(Job job, CollectRep.MetricsData metricsData) {
+ if (job.isSd()) {
+ CollectRep.MetricsData sdMetricsData =
CollectRep.MetricsData.newBuilder(metricsData).build();
+ commonDataQueue.sendServiceDiscoveryData(sdMetricsData);
+ }
+ commonDataQueue.sendMetricsData(metricsData);
+ }
+
+ private void cyclicJobDebugLog(Job job, CollectRep.MetricsData
metricsData) {
+ if (log.isDebugEnabled()) {
+ log.debug("Cyclic Job: {} - {} - {}", job.getMonitorId(),
job.getApp(), metricsData.getMetrics());
+ metricsDataDebugLog(metricsData);
+ }
+ }
+
+ private void metricsDataDebugLog(CollectRep.MetricsData metricsData) {
+ for (CollectRep.ValueRow valueRow : metricsData.getValues()) {
+ for (CollectRep.Field field : metricsData.getFields()) {
+ log.debug("Field-->{},Value-->{}", field.getName(),
valueRow.getColumns(metricsData.getFields().indexOf(field)));
+ }
+ }
+ }
+}
diff --git
a/hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/listener/RemoveTimeoutMonitorListener.java
b/hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/listener/RemoveTimeoutMonitorListener.java
new file mode 100644
index 000000000..516d6798b
--- /dev/null
+++
b/hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/listener/RemoveTimeoutMonitorListener.java
@@ -0,0 +1,26 @@
+package org.apache.hertzbeat.collector.listener;
+
+import lombok.AllArgsConstructor;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hertzbeat.collector.constants.ContextKey;
+import org.apache.hertzbeat.collector.context.Context;
+import org.apache.hertzbeat.collector.dispatch.CollectTaskTimeoutMonitor;
+import org.apache.hertzbeat.collector.handler.ContextBoundListener;
+
+/**
+ *
+ */
+@AllArgsConstructor
+public class RemoveTimeoutMonitorListener implements
ContextBoundListener<Object> {
+ private CollectTaskTimeoutMonitor collectTaskTimeoutMonitor;
+
+ @Override
+ public void execute(Context context, Object data) {
+ String metricsKey = context.get(ContextKey.METRICS_KEY);
+ if (StringUtils.isBlank(metricsKey)) {
+ return;
+ }
+
+ collectTaskTimeoutMonitor.removeMetrics(metricsKey);
+ }
+}
diff --git
a/hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/listener/RerunHandler.java
b/hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/listener/RerunHandler.java
new file mode 100644
index 000000000..750f44476
--- /dev/null
+++
b/hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/listener/RerunHandler.java
@@ -0,0 +1,38 @@
+package org.apache.hertzbeat.collector.listener;
+
+import lombok.AllArgsConstructor;
+import org.apache.hertzbeat.collector.constants.ContextKey;
+import org.apache.hertzbeat.collector.context.Context;
+import org.apache.hertzbeat.collector.handler.ContextBoundHandler;
+import org.apache.hertzbeat.collector.timer.TimerDispatch;
+import org.apache.hertzbeat.collector.timer.WheelTimerTask;
+import org.apache.hertzbeat.common.entity.job.Job;
+import org.apache.hertzbeat.common.timer.Timeout;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * 周期任务专用
+ */
+@AllArgsConstructor
+public class RerunHandler implements ContextBoundHandler<Object> {
+ private TimerDispatch timerDispatch;
+
+ @Override
+ public void execute(Context context, Object data) {
+ Job job = context.get(ContextKey.JOB);
+ Timeout timeout = context.get(ContextKey.TIMEOUT);
+
+ if (!timeout.isCancelled()) {
+ long spendTime = System.currentTimeMillis() -
job.getDispatchTime();
+ long interval = job.getInterval() - spendTime / 1000L;
+ interval = interval <= 0 ? 0 : interval;
+ timerDispatch.cyclicJob((WheelTimerTask) timeout.task(), interval,
TimeUnit.SECONDS);
+ }
+ }
+
+ @Override
+ public void whenException(Context context, Object data, Throwable
throwable) {
+
+ }
+}
diff --git
a/hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/listener/ResponseJobDataListener.java
b/hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/listener/ResponseJobDataListener.java
new file mode 100644
index 000000000..6e0fe261d
--- /dev/null
+++
b/hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/listener/ResponseJobDataListener.java
@@ -0,0 +1,44 @@
+package org.apache.hertzbeat.collector.listener;
+
+import com.google.common.collect.Lists;
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.hertzbeat.collector.constants.ContextKey;
+import org.apache.hertzbeat.collector.context.Context;
+import org.apache.hertzbeat.collector.handler.ContextBoundListener;
+import org.apache.hertzbeat.collector.timer.TimerDispatch;
+import org.apache.hertzbeat.common.entity.job.Job;
+import org.apache.hertzbeat.common.entity.message.CollectRep;
+
+/**
+ * 一次性任务专用
+ */
+@Slf4j
+@AllArgsConstructor
+public class ResponseJobDataListener implements
ContextBoundListener<CollectRep.MetricsData.Builder> {
+ private TimerDispatch timerDispatch;
+
+ @Override
+ public void execute(Context context, CollectRep.MetricsData.Builder data) {
+ Job job = context.get(ContextKey.JOB);
+ CollectRep.MetricsData metricsData = data.build();
+
+ oneTimeJobDebugLog(metricsData);
+ timerDispatch.responseSyncJobData(job.getId(),
Lists.newArrayList(metricsData));
+ }
+
+ private void oneTimeJobDebugLog(CollectRep.MetricsData metricsData) {
+ if (log.isDebugEnabled()) {
+ log.debug("One-time Job: {}", metricsData.getMetrics());
+ metricsDataDebugLog(metricsData);
+ }
+ }
+
+ private void metricsDataDebugLog(CollectRep.MetricsData metricsData) {
+ for (CollectRep.ValueRow valueRow : metricsData.getValues()) {
+ for (CollectRep.Field field : metricsData.getFields()) {
+ log.debug("Field-->{},Value-->{}", field.getName(),
valueRow.getColumns(metricsData.getFields().indexOf(field)));
+ }
+ }
+ }
+}
diff --git
a/hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/listener/ValidateResponseListener.java
b/hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/listener/ValidateResponseListener.java
new file mode 100644
index 000000000..3fa8acda2
--- /dev/null
+++
b/hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/listener/ValidateResponseListener.java
@@ -0,0 +1,36 @@
+package org.apache.hertzbeat.collector.listener;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.hertzbeat.collector.constants.ContextKey;
+import org.apache.hertzbeat.collector.context.Context;
+import org.apache.hertzbeat.collector.handler.ContextBoundListener;
+import org.apache.hertzbeat.common.entity.job.Metrics;
+import org.apache.hertzbeat.common.entity.message.CollectRep;
+
+/**
+ *
+ * @author Calvin
+ * @date 9/1/2025
+ */
+@Slf4j
+public class ValidateResponseListener implements
ContextBoundListener<CollectRep.MetricsData.Builder> {
+
+ @Override
+ public void execute(Context context, CollectRep.MetricsData.Builder data) {
+ long startTime = context.get(ContextKey.METRICS_COLLECT_START_TIME);
+ Metrics metrics = context.get(ContextKey.METRICS);
+
+ this.validateResponse(startTime, metrics, data);
+ }
+
+ private void validateResponse(long startTime, Metrics metrics,
CollectRep.MetricsData.Builder builder) {
+ long endTime = System.currentTimeMillis();
+ builder.setTime(endTime);
+ long allTime = endTime - startTime;
+ if (builder.getCode() != CollectRep.Code.SUCCESS) {
+ log.info("[Metrics: {}][Collect Failed, Run {}ms] Reason: {}",
metrics.getName(), allTime, builder.getMsg());
+ } else {
+ log.info("[Metrics: {}][Collect Success, Run {}ms].",
metrics.getName(), allTime);
+ }
+ }
+}
diff --git
a/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/constants/ContextKey.java
b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/constants/ContextKey.java
new file mode 100644
index 000000000..e142c5d08
--- /dev/null
+++
b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/constants/ContextKey.java
@@ -0,0 +1,24 @@
+package org.apache.hertzbeat.collector.constants;
+
+import lombok.Getter;
+import org.apache.hertzbeat.common.entity.collector.CollectorMetaData;
+import org.apache.hertzbeat.common.entity.job.Job;
+import org.apache.hertzbeat.common.entity.job.Metrics;
+import org.apache.hertzbeat.common.timer.Timeout;
+
+public enum ContextKey {
+ META_DATA(CollectorMetaData.class),
+ JOB(Job.class),
+ TIMEOUT(Timeout.class),
+ METRICS_COLLECT_START_TIME(Long.class),
+ METRICS(Metrics.class),
+ METRICS_KEY(String.class),
+ ;
+
+ @Getter
+ private final Class<?> clazz;
+
+ ContextKey(Class<?> clazz) {
+ this.clazz = clazz;
+ }
+}
\ No newline at end of file
diff --git
a/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/constants/ContextStatus.java
b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/constants/ContextStatus.java
new file mode 100644
index 000000000..c1e5106aa
--- /dev/null
+++
b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/constants/ContextStatus.java
@@ -0,0 +1,8 @@
+package org.apache.hertzbeat.collector.constants;
+
+/**
+ *
+ */
+public enum ContextStatus {
+ WAITING, RUNNING, STOP, TRUNCATE_HANDLER
+}
diff --git
a/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/constants/HandlerType.java
b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/constants/HandlerType.java
new file mode 100644
index 000000000..3e774399d
--- /dev/null
+++
b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/constants/HandlerType.java
@@ -0,0 +1,8 @@
+package org.apache.hertzbeat.collector.constants;
+
+/**
+ *
+ */
+public enum HandlerType {
+ NORMAL, ON_COMPLETE
+}
diff --git
a/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/context/AbstractInmemoryContext.java
b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/context/AbstractInmemoryContext.java
new file mode 100644
index 000000000..38144df32
--- /dev/null
+++
b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/context/AbstractInmemoryContext.java
@@ -0,0 +1,56 @@
+package org.apache.hertzbeat.collector.context;
+
+import lombok.Getter;
+import lombok.Setter;
+import org.apache.hertzbeat.collector.constants.ContextStatus;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ *
+ */
+public abstract class AbstractInmemoryContext implements Context {
+ protected final AtomicReference<ContextStatus> contextStatus = new
AtomicReference<>(ContextStatus.WAITING);
+ protected final Map<Object, Object> map = new ConcurrentHashMap<>();
+ @Getter
+ @Setter
+ private Throwable error;
+
+
+ @Override
+ public <T> void put(Object key, T value) {
+ this.map.put(key, value);
+ }
+
+ @Override
+ public <T> T remove(Object key) {
+ return (T) this.map.remove(key);
+ }
+
+ @Override
+ public <T> T get(Object key) {
+ return (T) this.map.get(key);
+ }
+
+ @Override
+ public <T> T getOrDefault(Object key, T defaultValue) {
+ return (T) this.map.getOrDefault(key, defaultValue);
+ }
+
+ @Override
+ public boolean hasKey(Object key) {
+ return this.map.containsKey(key);
+ }
+
+ @Override
+ public ContextStatus getStatus() {
+ return contextStatus.get();
+ }
+
+ @Override
+ public void setStatus(ContextStatus status) {
+ contextStatus.set(status);
+ }
+}
diff --git
a/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/context/Context.java
b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/context/Context.java
new file mode 100644
index 000000000..9dbbf5df6
--- /dev/null
+++
b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/context/Context.java
@@ -0,0 +1,12 @@
+package org.apache.hertzbeat.collector.context;
+
+import org.apache.hertzbeat.collector.constants.ContextStatus;
+
+/**
+ * 只维护与上下文的元数据
+ */
+public interface Context extends ContextView, ContextOperation {
+ ContextStatus getStatus();
+
+ void setStatus(ContextStatus status);
+}
diff --git
a/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/context/ContextOperation.java
b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/context/ContextOperation.java
new file mode 100644
index 000000000..58685acbb
--- /dev/null
+++
b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/context/ContextOperation.java
@@ -0,0 +1,17 @@
+package org.apache.hertzbeat.collector.context;
+
+import io.micrometer.common.lang.Nullable;
+
+/**
+ *
+ */
+public interface ContextOperation {
+ <T> void put(Object key, T value);
+
+ <T> T remove(Object key);
+
+ @Nullable
+ Throwable getError();
+
+ void setError(Throwable error);
+}
diff --git
a/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/context/ContextView.java
b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/context/ContextView.java
new file mode 100644
index 000000000..92a2dea66
--- /dev/null
+++
b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/context/ContextView.java
@@ -0,0 +1,12 @@
+package org.apache.hertzbeat.collector.context;
+
+/**
+ * 对上下文内容的查询操作
+ */
+public interface ContextView {
+ <T> T get(Object key);
+
+ <T> T getOrDefault(Object key, T defaultValue);
+
+ boolean hasKey(Object key);
+}
diff --git
a/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/context/ExceptionStrategy.java
b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/context/ExceptionStrategy.java
new file mode 100644
index 000000000..9579eeb11
--- /dev/null
+++
b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/context/ExceptionStrategy.java
@@ -0,0 +1,7 @@
+package org.apache.hertzbeat.collector.context;
+
+/**
+ *
+ */
+public interface ExceptionStrategy {
+}
diff --git
a/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/context/impl/DefaultContext.java
b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/context/impl/DefaultContext.java
new file mode 100644
index 000000000..c8b62f6ec
--- /dev/null
+++
b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/context/impl/DefaultContext.java
@@ -0,0 +1,15 @@
+package org.apache.hertzbeat.collector.context.impl;
+
+import org.apache.hertzbeat.collector.context.AbstractInmemoryContext;
+
+/**
+ *
+ */
+public class DefaultContext extends AbstractInmemoryContext {
+ private DefaultContext() {
+ }
+
+ public static DefaultContext newInstance() {
+ return new DefaultContext();
+ }
+}
diff --git
a/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/dispatch/CollectDataDispatch.java
b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/dispatch/CollectDataDispatch.java
index f4f4e14e0..7c2e1f41d 100644
---
a/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/dispatch/CollectDataDispatch.java
+++
b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/dispatch/CollectDataDispatch.java
@@ -36,13 +36,4 @@ public interface CollectDataDispatch {
*/
void dispatchCollectData(Timeout timeout, Metrics metrics,
CollectRep.MetricsData metricsData);
- /**
- * Processing and distributing collection result data
- *
- * @param timeout time wheel timeout
- * @param metrics The following metrics collection tasks
- * @param metricsDataList Collect result data
- */
- void dispatchCollectData(Timeout timeout, Metrics metrics,
List<CollectRep.MetricsData> metricsDataList);
-
}
diff --git
a/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/dispatch/unit/UnitConverter.java
b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/dispatch/unit/UnitConverter.java
new file mode 100644
index 000000000..16cb38ba0
--- /dev/null
+++
b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/dispatch/unit/UnitConverter.java
@@ -0,0 +1,15 @@
+package org.apache.hertzbeat.collector.dispatch.unit;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+import java.util.List;
+
+/**
+ *
+ */
+@Data
+@AllArgsConstructor
+public class UnitConverter {
+ private List<UnitConvert> unitConvertList;
+}
diff --git
a/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/handler/ChainBootstrap.java
b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/handler/ChainBootstrap.java
new file mode 100644
index 000000000..eed03a450
--- /dev/null
+++
b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/handler/ChainBootstrap.java
@@ -0,0 +1,96 @@
+package org.apache.hertzbeat.collector.handler;
+
+import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.hertzbeat.collector.constants.HandlerType;
+import org.apache.hertzbeat.collector.context.Context;
+import org.apache.hertzbeat.collector.dispatch.WorkerPool;
+import
org.apache.hertzbeat.collector.handler.impl.AbstractListenerBoundHandler;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ *
+ */
+@Slf4j
+public class ChainBootstrap {
+ @Setter
+ private Context context;
+ private TaskChain<?> taskChain;
+ private WorkerPool workerPool;
+ private final List<ContextBoundHandler> contextBoundHandlerList = new
ArrayList<>();
+ private final List<ContextBoundHandler> onCompleteContextBoundHandlerList
= new ArrayList<>();
+ private final List<ContextBoundListener> dataListenerList = new
ArrayList<>();
+ private final List<ContextBoundListener> onCompleteListenerList = new
ArrayList<>();
+
+ public static ChainBootstrap withContext(Context context) {
+ ChainBootstrap bootstrap = new ChainBootstrap();
+ bootstrap.setContext(context);
+ return bootstrap;
+ }
+
+ public ChainBootstrap withChain(TaskChain<?> taskChain) {
+ this.taskChain = taskChain;
+ return this;
+ }
+
+ public ChainBootstrap withWorkerPool(WorkerPool workerPool) {
+ this.workerPool = workerPool;
+ return this;
+ }
+
+ public <T> ChainBootstrap addContext(Object key, T value) {
+ context.put(key, value);
+ return this;
+ }
+
+ public ChainBootstrap handler(ContextBoundHandler contextBoundHandler) {
+ contextBoundHandlerList.add(contextBoundHandler);
+ return this;
+ }
+
+ public ChainBootstrap onComplete(ContextBoundHandler contextBoundHandler) {
+ onCompleteContextBoundHandlerList.add(contextBoundHandler);
+ return this;
+ }
+
+ public ChainBootstrap addListener(ContextBoundListener dataListener) {
+ dataListenerList.add(dataListener);
+ return this;
+ }
+
+ public ChainBootstrap addOnCompleteListener(ContextBoundListener
dataListener) {
+ onCompleteListenerList.add(dataListener);
+ return this;
+ }
+
+ public void start() {
+ if (taskChain == null || context == null) {
+ log.error("Failed to start chain boostrap due to null value of
Context or TaskChain");
+ return;
+ }
+
+ for (ContextBoundHandler contextBoundHandler :
contextBoundHandlerList) {
+ if (contextBoundHandler instanceof AbstractListenerBoundHandler
listenerBoundHandler) {
+ if (CollectionUtils.isNotEmpty(dataListenerList)) {
+
listenerBoundHandler.getDataListenerList().addAll(dataListenerList);
+ }
+ if (CollectionUtils.isNotEmpty(onCompleteListenerList)) {
+
listenerBoundHandler.getOnCompleteListenerList().addAll(onCompleteListenerList);
+ }
+ }
+
+ taskChain.addLast(HandlerType.NORMAL, contextBoundHandler);
+ }
+
+ onCompleteContextBoundHandlerList.forEach(handler ->
taskChain.addLast(HandlerType.ON_COMPLETE, handler));
+
+ if (workerPool != null) {
+ workerPool.executeJob(() -> taskChain.execute(context));
+ } else {
+ taskChain.execute(context);
+ }
+ }
+}
diff --git
a/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/handler/ContextBoundHandler.java
b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/handler/ContextBoundHandler.java
new file mode 100644
index 000000000..542eb2778
--- /dev/null
+++
b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/handler/ContextBoundHandler.java
@@ -0,0 +1,12 @@
+package org.apache.hertzbeat.collector.handler;
+
+import org.apache.hertzbeat.collector.context.Context;
+
+/**
+ *
+ */
+public interface ContextBoundHandler<T> {
+ void execute(Context context, T data);
+
+ void whenException(Context context, T data, Throwable throwable);
+}
diff --git
a/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/handler/ContextBoundListener.java
b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/handler/ContextBoundListener.java
new file mode 100644
index 000000000..c9965a157
--- /dev/null
+++
b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/handler/ContextBoundListener.java
@@ -0,0 +1,9 @@
+package org.apache.hertzbeat.collector.handler;
+
+import org.apache.hertzbeat.collector.context.Context;
+
+/**
+ */
+public interface ContextBoundListener<T> {
+ void execute(Context context, T data);
+}
diff --git
a/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/handler/TaskChain.java
b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/handler/TaskChain.java
new file mode 100644
index 000000000..769cf60c3
--- /dev/null
+++
b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/handler/TaskChain.java
@@ -0,0 +1,15 @@
+package org.apache.hertzbeat.collector.handler;
+
+import org.apache.hertzbeat.collector.constants.HandlerType;
+import org.apache.hertzbeat.collector.context.Context;
+
+/**
+ *
+ */
+public interface TaskChain<T> {
+ void execute(Context context);
+
+ void execute(Context context, T data);
+
+ void addLast(HandlerType handlerType, ContextBoundHandler<T> handler);
+}
diff --git
a/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/handler/impl/AbstractBatchDataBoundHandler.java
b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/handler/impl/AbstractBatchDataBoundHandler.java
new file mode 100644
index 000000000..f63708872
--- /dev/null
+++
b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/handler/impl/AbstractBatchDataBoundHandler.java
@@ -0,0 +1,22 @@
+package org.apache.hertzbeat.collector.handler.impl;
+
+import lombok.Setter;
+import org.apache.hertzbeat.collector.constants.ContextKey;
+import org.apache.hertzbeat.collector.context.Context;
+
+import java.util.List;
+
+/**
+ *
+ */
+public abstract class AbstractBatchDataBoundHandler<T, R> extends
AbstractListenerBoundHandler<T, R> {
+ @Setter
+ protected List<T> sourceDataList;
+
+ @Override
+ public void execute(Context context, T data) {
+ for (T t : sourceDataList) {
+ super.execute(context, t);
+ }
+ }
+}
diff --git
a/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/handler/impl/AbstractContextBoundTaskChain.java
b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/handler/impl/AbstractContextBoundTaskChain.java
new file mode 100644
index 000000000..ec88492ec
--- /dev/null
+++
b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/handler/impl/AbstractContextBoundTaskChain.java
@@ -0,0 +1,27 @@
+package org.apache.hertzbeat.collector.handler.impl;
+
+import org.apache.hertzbeat.collector.constants.HandlerType;
+import org.apache.hertzbeat.collector.handler.ContextBoundHandler;
+import org.apache.hertzbeat.collector.handler.TaskChain;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ *
+ */
+public abstract class AbstractContextBoundTaskChain<T> implements TaskChain<T>
{
+ protected final Map<HandlerType, List<ContextBoundHandler<T>>>
contextBoundHandlerMap = new HashMap<>();
+
+ @Override
+ public void addLast(HandlerType handlerType, ContextBoundHandler<T>
handler) {
+ if (!contextBoundHandlerMap.containsKey(handlerType)) {
+ contextBoundHandlerMap.put(handlerType, new ArrayList<>());
+ }
+
+ contextBoundHandlerMap.get(handlerType).add(handler);
+ }
+}
diff --git
a/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/handler/impl/AbstractListenerBoundHandler.java
b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/handler/impl/AbstractListenerBoundHandler.java
new file mode 100644
index 000000000..78acdbe54
--- /dev/null
+++
b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/handler/impl/AbstractListenerBoundHandler.java
@@ -0,0 +1,73 @@
+package org.apache.hertzbeat.collector.handler.impl;
+
+import lombok.Getter;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.hertzbeat.collector.constants.ContextKey;
+import org.apache.hertzbeat.collector.constants.ContextStatus;
+import org.apache.hertzbeat.collector.context.Context;
+import org.apache.hertzbeat.collector.handler.ContextBoundHandler;
+import org.apache.hertzbeat.collector.handler.ContextBoundListener;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ *
+ */
+public abstract class AbstractListenerBoundHandler<T, R> implements
ContextBoundHandler<T> {
+ @Getter
+ private final List<? extends ContextBoundListener<R>> dataListenerList =
new ArrayList<>();
+ @Getter
+ private final List<? extends ContextBoundListener<R>>
onCompleteListenerList = new ArrayList<>();
+
+ @Override
+ public void execute(Context context, T data) {
+ long startTime = System.currentTimeMillis();
+ context.put(ContextKey.METRICS_COLLECT_START_TIME, startTime);
+
+ R executeResult = executeWithResponse(context, data);
+
+ runListener(context, executeResult);
+
+ runOnCompleteListener(context, executeResult);
+ }
+
+ public R executeWithResponse(Context context, T data) {
+ // no-op
+ return null;
+ }
+
+ @Override
+ public void whenException(Context context, T data, Throwable throwable) {
+ // no-op
+ }
+
+ private void runListener(Context context, R executeResult) {
+ if (CollectionUtils.isEmpty(dataListenerList)) {
+ return;
+ }
+
+ if (ContextStatus.STOP.equals(context.getStatus())) {
+ return;
+ }
+
+ //todo 异常处理
+ for (ContextBoundListener<R> listener : dataListenerList) {
+ listener.execute(context, executeResult);
+
+ if (ContextStatus.STOP.equals(context.getStatus())) {
+ break;
+ }
+ }
+ }
+
+ private void runOnCompleteListener(Context context, R executeResult) {
+ if (CollectionUtils.isEmpty(onCompleteListenerList)) {
+ return;
+ }
+
+ for (ContextBoundListener<R> listener : onCompleteListenerList) {
+ listener.execute(context, executeResult);
+ }
+ }
+}
diff --git
a/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/handler/impl/BatchExecuteTaskChain.java
b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/handler/impl/BatchExecuteTaskChain.java
new file mode 100644
index 000000000..b659eac29
--- /dev/null
+++
b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/handler/impl/BatchExecuteTaskChain.java
@@ -0,0 +1,48 @@
+package org.apache.hertzbeat.collector.handler.impl;
+
+import org.apache.hertzbeat.collector.constants.ContextStatus;
+import org.apache.hertzbeat.collector.constants.HandlerType;
+import org.apache.hertzbeat.collector.context.Context;
+import org.apache.hertzbeat.collector.handler.ContextBoundHandler;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+
+/**
+ *
+ */
+public class BatchExecuteTaskChain<T> extends AbstractContextBoundTaskChain<T>
{
+ @Override
+ public void execute(Context context) {
+ this.execute(context, null);
+ }
+
+ @Override
+ public void execute(Context context, T data) {
+ context.setStatus(ContextStatus.RUNNING);
+
+ for (ContextBoundHandler<T> contextBoundHandler :
contextBoundHandlerMap.getOrDefault(HandlerType.NORMAL, new ArrayList<>())) {
+ runHandler(context, data, contextBoundHandler);
+
+ if (ContextStatus.TRUNCATE_HANDLER.equals(context.getStatus()) ||
ContextStatus.STOP.equals(context.getStatus())) {
+ break;
+ }
+
+ // in order to init error info for the next loop
+ context.setError(null);
+ }
+
+ contextBoundHandlerMap.getOrDefault(HandlerType.ON_COMPLETE, new
ArrayList<>()).forEach(handler -> runHandler(context, data, handler));
+ }
+
+ private static <T> void runHandler(Context context, T data,
ContextBoundHandler<T> contextBoundHandler) {
+ try {
+ contextBoundHandler.execute(context, data);
+ } catch (Exception exception) {
+ context.setError(exception);
+ contextBoundHandler.whenException(context, data, exception);
+ }
+ }
+}
diff --git
a/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/constants/CommonConstants.java
b/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/constants/CommonConstants.java
index 592d31b73..509699a14 100644
---
a/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/constants/CommonConstants.java
+++
b/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/constants/CommonConstants.java
@@ -382,4 +382,6 @@ public interface CommonConstants {
* JEXL custom function `json`
*/
String JEXL_CUSTOM_JSON_FUNCTION = "json";
+
+ byte AVAILABLE_METRICS = 0;
}
diff --git
a/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/entity/collector/CollectorMetaData.java
b/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/entity/collector/CollectorMetaData.java
new file mode 100644
index 000000000..c0f3c403c
--- /dev/null
+++
b/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/entity/collector/CollectorMetaData.java
@@ -0,0 +1,21 @@
+package org.apache.hertzbeat.common.entity.collector;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.util.Date;
+
+/**
+ *
+ */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class CollectorMetaData {
+ private String identity;
+ private String mode;
+ private Date startTime;
+}
diff --git
a/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/entity/job/MetricsSource.java
b/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/entity/job/MetricsSource.java
new file mode 100644
index 000000000..d9e2c8f07
--- /dev/null
+++
b/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/entity/job/MetricsSource.java
@@ -0,0 +1,13 @@
+package org.apache.hertzbeat.common.entity.job;
+
+import lombok.Data;
+
+import java.util.List;
+
+/**
+ *
+ */
+@Data
+public class MetricsSource {
+ private List<Metrics> metricsList;
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]