This is an automated email from the ASF dual-hosted git repository.

zhengqiwei pushed a commit to branch refactor_collector
in repository https://gitbox.apache.org/repos/asf/hertzbeat.git


The following commit(s) were added to refs/heads/refactor_collector by this 
push:
     new d0b3c2dba [refactor] optimize collector module
d0b3c2dba is described below

commit d0b3c2dba21fc602258d9115a0e484ff2c74c7c9
Author: Calvin <[email protected]>
AuthorDate: Sat Sep 6 14:55:23 2025 +0800

    [refactor] optimize collector module
---
 .../prometheus/PrometheusProxyCollectImpl.java     | 308 --------------------
 .../dispatch/CollectTaskTimeoutMonitor.java        | 116 ++++++++
 .../collector/dispatch/CommonDispatcher.java       | 313 +++++++++-----------
 .../collector/dispatch/MetricsCollect.java         | 321 +--------------------
 .../handler/CollectMetricsDataHandler.java         | 110 +++++++
 .../DynamicSubTaskCollectMetricsDataHandler.java   |  17 ++
 .../CalculateFieldsListener.java}                  | 261 +----------------
 .../listener/CommonMetricsDataListener.java        |  21 ++
 .../listener/MetricsDataDeliveryListener.java      |  52 ++++
 .../listener/RemoveTimeoutMonitorListener.java     |  26 ++
 .../hertzbeat/collector/listener/RerunHandler.java |  38 +++
 .../listener/ResponseJobDataListener.java          |  44 +++
 .../listener/ValidateResponseListener.java         |  36 +++
 .../hertzbeat/collector/constants/ContextKey.java  |  24 ++
 .../collector/constants/ContextStatus.java         |   8 +
 .../hertzbeat/collector/constants/HandlerType.java |   8 +
 .../collector/context/AbstractInmemoryContext.java |  56 ++++
 .../hertzbeat/collector/context/Context.java       |  12 +
 .../collector/context/ContextOperation.java        |  17 ++
 .../hertzbeat/collector/context/ContextView.java   |  12 +
 .../collector/context/ExceptionStrategy.java       |   7 +
 .../collector/context/impl/DefaultContext.java     |  15 +
 .../collector/dispatch/CollectDataDispatch.java    |   9 -
 .../collector/dispatch/unit/UnitConverter.java     |  15 +
 .../collector/handler/ChainBootstrap.java          |  96 ++++++
 .../collector/handler/ContextBoundHandler.java     |  12 +
 .../collector/handler/ContextBoundListener.java    |   9 +
 .../hertzbeat/collector/handler/TaskChain.java     |  15 +
 .../impl/AbstractBatchDataBoundHandler.java        |  22 ++
 .../impl/AbstractContextBoundTaskChain.java        |  27 ++
 .../handler/impl/AbstractListenerBoundHandler.java |  73 +++++
 .../handler/impl/BatchExecuteTaskChain.java        |  48 +++
 .../common/constants/CommonConstants.java          |   2 +
 .../common/entity/collector/CollectorMetaData.java |  21 ++
 .../hertzbeat/common/entity/job/MetricsSource.java |  13 +
 35 files changed, 1124 insertions(+), 1060 deletions(-)

diff --git 
a/hertzbeat-collector/hertzbeat-collector-basic/src/main/java/org/apache/hertzbeat/collector/collect/prometheus/PrometheusProxyCollectImpl.java
 
b/hertzbeat-collector/hertzbeat-collector-basic/src/main/java/org/apache/hertzbeat/collector/collect/prometheus/PrometheusProxyCollectImpl.java
deleted file mode 100644
index 2177710d0..000000000
--- 
a/hertzbeat-collector/hertzbeat-collector-basic/src/main/java/org/apache/hertzbeat/collector/collect/prometheus/PrometheusProxyCollectImpl.java
+++ /dev/null
@@ -1,308 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hertzbeat.collector.collect.prometheus;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.hertzbeat.collector.collect.common.http.CommonHttpClient;
-import org.apache.hertzbeat.collector.dispatch.DispatchConstants;
-import org.apache.hertzbeat.collector.util.CollectUtil;
-import org.apache.hertzbeat.common.constants.CommonConstants;
-import org.apache.hertzbeat.common.constants.NetworkConstants;
-import org.apache.hertzbeat.common.constants.SignConstants;
-import org.apache.hertzbeat.common.entity.job.Metrics;
-import org.apache.hertzbeat.common.entity.job.protocol.PrometheusProtocol;
-import org.apache.hertzbeat.common.entity.message.CollectRep;
-import org.apache.hertzbeat.common.util.Base64Util;
-import org.apache.hertzbeat.common.util.CommonUtil;
-import org.apache.hertzbeat.common.util.IpDomainUtil;
-import org.apache.http.HttpHeaders;
-import org.apache.http.HttpHost;
-import org.apache.http.HttpStatus;
-import org.apache.http.auth.AuthScope;
-import org.apache.http.auth.UsernamePasswordCredentials;
-import org.apache.http.client.AuthCache;
-import org.apache.http.client.ClientProtocolException;
-import org.apache.http.client.CredentialsProvider;
-import org.apache.http.client.config.RequestConfig;
-import org.apache.http.client.methods.CloseableHttpResponse;
-import org.apache.http.client.methods.HttpUriRequest;
-import org.apache.http.client.methods.RequestBuilder;
-import org.apache.http.client.protocol.HttpClientContext;
-import org.apache.http.entity.StringEntity;
-import org.apache.http.impl.auth.DigestScheme;
-import org.apache.http.impl.client.BasicAuthCache;
-import org.apache.http.impl.client.BasicCredentialsProvider;
-import org.apache.http.protocol.HttpContext;
-import org.apache.http.util.EntityUtils;
-import org.springframework.http.MediaType;
-import org.springframework.util.StringUtils;
-
-import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.net.ConnectException;
-import java.net.UnknownHostException;
-import java.nio.charset.StandardCharsets;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-import javax.net.ssl.SSLException;
-
-import static org.apache.hertzbeat.common.constants.SignConstants.RIGHT_DASH;
-
-
-@Slf4j
-public class PrometheusProxyCollectImpl implements PrometheusCollect {
-
-    private final Set<Integer> defaultSuccessStatusCodes = 
Stream.of(HttpStatus.SC_OK, HttpStatus.SC_CREATED,
-            HttpStatus.SC_ACCEPTED, HttpStatus.SC_MULTIPLE_CHOICES, 
HttpStatus.SC_MOVED_PERMANENTLY,
-            HttpStatus.SC_MOVED_TEMPORARILY).collect(Collectors.toSet());
-
-    public static final String RAW_TEXT_CONTENT_FIELD_NAME = 
"raw_text_content";
-
-    @Override
-    public List<CollectRep.MetricsData> collect(CollectRep.MetricsData.Builder 
builder, Metrics metrics) {
-        PrometheusProtocol prometheusProtocol = metrics.getPrometheus();
-        HttpUriRequest request;
-        try {
-            validateParams(metrics);
-        } catch (Exception e) {
-            builder.setCode(CollectRep.Code.FAIL);
-            builder.setMsg(e.getMessage());
-            return Collections.singletonList(builder.build());
-        }
-
-        HttpContext httpContext = createHttpContext(prometheusProtocol);
-        request = createHttpRequest(prometheusProtocol);
-
-        try (CloseableHttpResponse response = 
CommonHttpClient.getHttpClient().execute(request, httpContext)) {
-            int statusCode = response.getStatusLine().getStatusCode();
-            log.debug("Prometheus proxy collect, response status: {}", 
statusCode);
-
-            if (!defaultSuccessStatusCodes.contains(statusCode)) {
-                builder.setCode(CollectRep.Code.FAIL);
-                builder.setMsg(NetworkConstants.STATUS_CODE + 
SignConstants.BLANK + statusCode);
-                return Collections.singletonList(builder.build());
-            }
-
-            String rawTextContent = EntityUtils.toString(response.getEntity(), 
StandardCharsets.UTF_8);
-
-            builder.clearFields();
-            builder.clearValues();
-
-            CollectRep.Field rawDataField = CollectRep.Field.newBuilder()
-                    .setName(RAW_TEXT_CONTENT_FIELD_NAME)
-                    .setType(CommonConstants.TYPE_STRING)
-                    .build();
-            builder.addField(rawDataField);
-
-            CollectRep.ValueRow.Builder valueRowBuilder = 
CollectRep.ValueRow.newBuilder();
-            valueRowBuilder.addColumn(rawTextContent);
-            builder.addValueRow(valueRowBuilder.build());
-
-            builder.setCode(CollectRep.Code.SUCCESS);
-        } catch (ClientProtocolException e1) {
-            String errorMsg = CommonUtil.getMessageFromThrowable(e1);
-            log.error("Prometheus proxy collect error: {}. Host: {}, Port: 
{}", errorMsg, prometheusProtocol.getHost(), prometheusProtocol.getPort(), e1);
-            builder.setCode(CollectRep.Code.UN_CONNECTABLE);
-            builder.setMsg(errorMsg);
-        } catch (UnknownHostException e2) {
-            String errorMsg = CommonUtil.getMessageFromThrowable(e2);
-            log.info("Prometheus proxy collect unknown host: {}. Host: {}", 
errorMsg, prometheusProtocol.getHost(), e2);
-            builder.setCode(CollectRep.Code.UN_REACHABLE);
-            builder.setMsg("unknown host:" + errorMsg);
-        } catch (InterruptedIOException | ConnectException | SSLException e3) {
-            String errorMsg = CommonUtil.getMessageFromThrowable(e3);
-            log.info("Prometheus proxy collect connect error: {}. Host: {}, 
Port: {}", errorMsg, prometheusProtocol.getHost(), 
prometheusProtocol.getPort(), e3);
-            builder.setCode(CollectRep.Code.UN_CONNECTABLE);
-            builder.setMsg(errorMsg);
-        } catch (IOException e4) {
-            String errorMsg = CommonUtil.getMessageFromThrowable(e4);
-            log.info("Prometheus proxy collect IO error: {}. Host: {}, Port: 
{}", errorMsg, prometheusProtocol.getHost(), prometheusProtocol.getPort(), e4);
-            builder.setCode(CollectRep.Code.FAIL);
-            builder.setMsg(errorMsg);
-        } catch (Exception e) {
-            String errorMsg = CommonUtil.getMessageFromThrowable(e);
-            log.error("Prometheus proxy collect unknown error: {}. Host: {}, 
Port: {}", errorMsg, prometheusProtocol.getHost(), 
prometheusProtocol.getPort(), e);
-            builder.setCode(CollectRep.Code.FAIL);
-            builder.setMsg(errorMsg);
-        } finally {
-            if (request != null) {
-                request.abort();
-            }
-        }
-        return Collections.singletonList(builder.build());
-    }
-
-    @Override
-    public String supportProtocol() {
-        return DispatchConstants.PROTOCOL_PROMETHEUS;
-    }
-
-    private void validateParams(Metrics metrics) throws Exception {
-        if (metrics == null || metrics.getPrometheus() == null) {
-            throw new Exception("Prometheus collect must has prometheus 
params");
-        }
-        PrometheusProtocol protocol = metrics.getPrometheus();
-        if (!StringUtils.hasText(protocol.getHost())
-            || !StringUtils.hasText(protocol.getPort())) {
-            throw new Exception("Prometheus collect must has host and port 
params");
-        }
-        if (protocol.getPath() == null
-                || !StringUtils.hasText(protocol.getPath())
-                || !protocol.getPath().startsWith(RIGHT_DASH)) {
-            protocol.setPath(protocol.getPath() == null ? RIGHT_DASH : 
RIGHT_DASH + protocol.getPath().trim());
-        }
-    }
-    
-    /**
-     * create httpContext
-     * This method is adapted from PrometheusAutoCollectImpl
-     * @param protocol prometheus protocol
-     * @return context
-     */
-    public HttpContext createHttpContext(PrometheusProtocol protocol) {
-        PrometheusProtocol.Authorization auth = protocol.getAuthorization();
-        if (auth != null && 
DispatchConstants.DIGEST_AUTH.equals(auth.getType())) {
-            HttpClientContext clientContext = new HttpClientContext();
-            if (StringUtils.hasText(auth.getDigestAuthUsername())
-                        && StringUtils.hasText(auth.getDigestAuthPassword())) {
-                CredentialsProvider provider = new BasicCredentialsProvider();
-                UsernamePasswordCredentials credentials =
-                        new 
UsernamePasswordCredentials(auth.getDigestAuthUsername(), 
auth.getDigestAuthPassword());
-                provider.setCredentials(AuthScope.ANY, credentials);
-                AuthCache authCache = new BasicAuthCache();
-                HttpHost targetHost = new HttpHost(protocol.getHost(), 
Integer.parseInt(protocol.getPort()));
-                authCache.put(targetHost, new DigestScheme());
-                clientContext.setCredentialsProvider(provider);
-                clientContext.setAuthCache(authCache);
-                return clientContext;
-            }
-        }
-        return null;
-    }
-
-    /**
-     * create http request
-     * This method is adapted from PrometheusAutoCollectImpl
-     * @param protocol http params
-     * @return http uri request
-     */
-    public HttpUriRequest createHttpRequest(PrometheusProtocol protocol) {
-        RequestBuilder requestBuilder = RequestBuilder.get();
-        // params
-        Map<String, String> params = protocol.getParams();
-        if (params != null && !params.isEmpty()) {
-            for (Map.Entry<String, String> param : params.entrySet()) {
-                if (StringUtils.hasText(param.getValue())) {
-                    requestBuilder.addParameter(param.getKey(), 
param.getValue());
-                }
-            }
-        }
-        requestBuilder.addHeader(HttpHeaders.CONNECTION, 
NetworkConstants.KEEP_ALIVE);
-        requestBuilder.addHeader(HttpHeaders.USER_AGENT, 
NetworkConstants.USER_AGENT);
-        // headers  The custom request header is overwritten here
-        Map<String, String> headers = protocol.getHeaders();
-        if (headers != null && !headers.isEmpty()) {
-            for (Map.Entry<String, String> header : headers.entrySet()) {
-                if (StringUtils.hasText(header.getValue())) {
-                    
requestBuilder.addHeader(CollectUtil.replaceUriSpecialChar(header.getKey()),
-                            
CollectUtil.replaceUriSpecialChar(header.getValue()));
-                }
-            }
-        }
-        if (headers == null || 
headers.keySet().stream().noneMatch(HttpHeaders.ACCEPT::equalsIgnoreCase)) {
-            requestBuilder.addHeader(HttpHeaders.ACCEPT, 
MediaType.TEXT_PLAIN_VALUE + ";version=0.0.4,*/*;q=0.1");
-        }
-        
-        if (protocol.getAuthorization() != null) {
-            PrometheusProtocol.Authorization authorization = 
protocol.getAuthorization();
-            if 
(DispatchConstants.BEARER_TOKEN.equalsIgnoreCase(authorization.getType())) {
-                if (StringUtils.hasText(authorization.getBearerTokenToken())) {
-                    String value = DispatchConstants.BEARER + " " + 
authorization.getBearerTokenToken();
-                    requestBuilder.addHeader(HttpHeaders.AUTHORIZATION, value);
-                }
-            } else if 
(DispatchConstants.BASIC_AUTH.equals(authorization.getType())) {
-                if (StringUtils.hasText(authorization.getBasicAuthUsername())
-                            && 
StringUtils.hasText(authorization.getBasicAuthPassword())) {
-                    String authStr = authorization.getBasicAuthUsername() + 
":" + authorization.getBasicAuthPassword();
-                    String encodedAuth = Base64Util.encode(authStr);
-                    requestBuilder.addHeader(HttpHeaders.AUTHORIZATION, 
DispatchConstants.BASIC + " " + encodedAuth);
-                }
-            }
-        }
-
-        if (StringUtils.hasLength(protocol.getPayload())) {
-            requestBuilder.setEntity(new StringEntity(protocol.getPayload(), 
StandardCharsets.UTF_8));
-            if (headers == null || 
headers.keySet().stream().noneMatch(HttpHeaders.CONTENT_TYPE::equalsIgnoreCase))
 {
-                requestBuilder.setHeader(HttpHeaders.CONTENT_TYPE, 
MediaType.TEXT_PLAIN_VALUE);
-            }
-        }
-        
-        String uriPath = CollectUtil.replaceUriSpecialChar(protocol.getPath());
-        if (IpDomainUtil.isHasSchema(protocol.getHost())) {
-            requestBuilder.setUri(protocol.getHost() + 
SignConstants.DOUBLE_MARK + protocol.getPort() + uriPath);
-        } else {
-            String ipAddressType = 
IpDomainUtil.checkIpAddressType(protocol.getHost());
-            String baseUri = NetworkConstants.IPV6.equals(ipAddressType)
-                                     ? String.format("[%s]:%s%s", 
protocol.getHost(), protocol.getPort(), uriPath)
-                                     : String.format("%s:%s%s", 
protocol.getHost(), protocol.getPort(), uriPath);
-            boolean ssl = Boolean.parseBoolean(protocol.getSsl());
-            if (ssl) {
-                requestBuilder.setUri(NetworkConstants.HTTPS_HEADER + baseUri);
-            } else {
-                requestBuilder.setUri(NetworkConstants.HTTP_HEADER + baseUri);
-            }
-        }
-        
-        // custom timeout
-        int timeout = CollectUtil.getTimeout(protocol.getTimeout());
-        if (timeout > 0) {
-            RequestConfig requestConfig = RequestConfig.custom()
-                                                  .setConnectTimeout(timeout)
-                                                  .setSocketTimeout(timeout)
-                                                  
.setConnectionRequestTimeout(timeout)
-                                                  .setRedirectsEnabled(true)
-                                                  .build();
-            requestBuilder.setConfig(requestConfig);
-        } else {
-            RequestConfig requestConfig = RequestConfig.custom()
-                                                  .setRedirectsEnabled(true)
-                                                  .build();
-            requestBuilder.setConfig(requestConfig);
-        }
-        return requestBuilder.build();
-    }
-
-    /**
-     * get collect instance
-     * @return instance
-     */
-    public static PrometheusProxyCollectImpl getInstance() {
-        return PrometheusProxyCollectImpl.SingleInstance.INSTANCE;
-    }
-
-    /**
-     * static instance
-     */
-    private static class SingleInstance {
-        private static final PrometheusProxyCollectImpl INSTANCE = new 
PrometheusProxyCollectImpl();
-    }
-}
diff --git 
a/hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/dispatch/CollectTaskTimeoutMonitor.java
 
b/hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/dispatch/CollectTaskTimeoutMonitor.java
new file mode 100644
index 000000000..6a97ab5e3
--- /dev/null
+++ 
b/hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/dispatch/CollectTaskTimeoutMonitor.java
@@ -0,0 +1,116 @@
+package org.apache.hertzbeat.collector.dispatch;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.hertzbeat.collector.metrics.HertzBeatMetricsCollector;
+import org.apache.hertzbeat.collector.timer.WheelTimerTask;
+import org.apache.hertzbeat.common.constants.CommonConstants;
+import org.apache.hertzbeat.common.entity.job.Job;
+import org.apache.hertzbeat.common.entity.job.Metrics;
+import org.apache.hertzbeat.common.entity.message.CollectRep;
+import org.apache.hertzbeat.common.timer.Timeout;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
+/**
+ *
+ */
+@Slf4j
+@Component
+public class CollectTaskTimeoutMonitor {
+    /**
+     * Collection task timeout value
+     */
+    private static final long DURATION_TIME = 240_000L;
+    /**
+     * Metrics task and start time mapping map
+     */
+    private final Map<String, MetricsTime> metricsTimeoutMonitorMap = new 
ConcurrentHashMap<>(16);
+
+    @Autowired
+    private HertzBeatMetricsCollector metricsCollector;
+    private CommonDispatcher commonDispatcher;
+
+    public void start(CommonDispatcher commonDispatcher) {
+        this.commonDispatcher = commonDispatcher;
+
+        // monitoring metrics collection task execution timeout
+        ThreadFactory threadFactory = new ThreadFactoryBuilder()
+                .setNameFormat("metrics-task-timeout-monitor-%d")
+                .setDaemon(true)
+                .build();
+        ScheduledThreadPoolExecutor scheduledExecutor = new 
ScheduledThreadPoolExecutor(1, threadFactory);
+        
scheduledExecutor.scheduleWithFixedDelay(this::monitorCollectTaskTimeout, 2, 
20, TimeUnit.SECONDS);
+    }
+
+    public void putMetrics(String key, MetricsTime value) {
+        this.metricsTimeoutMonitorMap.put(key, value);
+    }
+
+    public MetricsTime removeMetrics(String key) {
+        return this.metricsTimeoutMonitorMap.remove(key);
+    }
+
+    private void monitorCollectTaskTimeout() {
+        try {
+            // Detect whether the collection unit of each metrics has timed 
out for 4 minutes,
+            // and if it times out, it will be discarded and an exception will 
be returned.
+            long deadline = System.currentTimeMillis() - DURATION_TIME;
+            for (Map.Entry<String, MetricsTime> entry : 
metricsTimeoutMonitorMap.entrySet()) {
+                MetricsTime metricsTime = entry.getValue();
+                if (metricsTime.getStartTime() < deadline) {
+                    // Metrics collection timeout
+                    MetricsTime removedMetricsTime = 
metricsTimeoutMonitorMap.remove(entry.getKey());
+                    if (removedMetricsTime == null) {
+                        continue;
+                    }
+                    WheelTimerTask timerJob = (WheelTimerTask) 
metricsTime.getTimeout().task();
+                    Job job = timerJob.getJob();
+                    // timeout metrics
+                    if (metricsCollector != null) {
+                        long duration = System.currentTimeMillis() - 
removedMetricsTime.getStartTime();
+                        metricsCollector.recordCollectMetrics(job, duration, 
"timeout");
+                    }
+
+                    CollectRep.MetricsData metricsData = 
CollectRep.MetricsData.newBuilder()
+                            .setId(job.getMonitorId())
+                            .setTenantId(job.getTenantId())
+                            .setApp(job.getApp())
+                            .setMetrics(metricsTime.getMetrics().getName())
+                            
.setPriority(metricsTime.getMetrics().getPriority())
+                            .setTime(System.currentTimeMillis())
+                            .setCode(CollectRep.Code.TIMEOUT)
+                            .setMsg("collect timeout")
+                            .build();
+                    log.error("[Collect Timeout]: \n{}", metricsData);
+                    if (metricsData.getPriority() == 
CommonConstants.AVAILABLE_METRICS) {
+                        //todo 使用chain bootstrap
+//                        
commonDispatcher.dispatchCollectData(metricsTime.timeout, 
metricsTime.getMetrics(), metricsData);
+                    }
+                }
+            }
+        } catch (Exception e) {
+            log.error("[Task Timeout Monitor]-{}.", e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Metrics times.
+     */
+    @Data
+    @AllArgsConstructor
+    public static class MetricsTime {
+        private long startTime;
+        private Metrics metrics;
+        private Timeout timeout;
+    }
+}
diff --git 
a/hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/dispatch/CommonDispatcher.java
 
b/hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/dispatch/CommonDispatcher.java
index 6e6875624..213bb434a 100644
--- 
a/hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/dispatch/CommonDispatcher.java
+++ 
b/hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/dispatch/CommonDispatcher.java
@@ -17,13 +17,24 @@
 
 package org.apache.hertzbeat.collector.dispatch;
 
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.google.gson.Gson;
-import lombok.AllArgsConstructor;
-import lombok.Data;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.hertzbeat.collector.constants.ContextKey;
+import org.apache.hertzbeat.collector.context.impl.DefaultContext;
 import 
org.apache.hertzbeat.collector.dispatch.entrance.internal.CollectJobService;
+import org.apache.hertzbeat.collector.dispatch.unit.UnitConverter;
+import org.apache.hertzbeat.collector.handler.ChainBootstrap;
+import org.apache.hertzbeat.collector.handler.CollectMetricsDataHandler;
+import org.apache.hertzbeat.collector.listener.CalculateFieldsListener;
+import org.apache.hertzbeat.collector.listener.MetricsDataDeliveryListener;
+import org.apache.hertzbeat.collector.listener.RemoveTimeoutMonitorListener;
+import org.apache.hertzbeat.collector.listener.RerunHandler;
+import org.apache.hertzbeat.collector.listener.ResponseJobDataListener;
+import org.apache.hertzbeat.collector.listener.ValidateResponseListener;
 import org.apache.hertzbeat.collector.metrics.HertzBeatMetricsCollector;
+import org.apache.hertzbeat.collector.handler.impl.BatchExecuteTaskChain;
+import org.apache.hertzbeat.common.constants.CommonConstants;
+import org.apache.hertzbeat.common.entity.collector.CollectorMetaData;
 import org.apache.hertzbeat.common.timer.Timeout;
 import org.apache.hertzbeat.collector.timer.TimerDispatch;
 import org.apache.hertzbeat.collector.timer.WheelTimerTask;
@@ -37,17 +48,18 @@ import org.apache.hertzbeat.common.queue.CommonDataQueue;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
+import java.util.Collections;
+import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
 
 /**
  * Collection task and response data scheduler
@@ -56,10 +68,6 @@ import java.util.concurrent.atomic.AtomicReference;
 @Slf4j
 public class CommonDispatcher implements MetricsTaskDispatch, 
CollectDataDispatch {
 
-    /**
-     * Collection task timeout value
-     */
-    private static final long DURATION_TIME = 240_000L;
     /**
      * Trigger sub task max num
      */
@@ -81,16 +89,14 @@ public class CommonDispatcher implements 
MetricsTaskDispatch, CollectDataDispatc
      * collection data exporter
      */
     private final CommonDataQueue commonDataQueue;
-    /**
-     * Metrics task and start time mapping map
-     */
-    private final Map<String, MetricsTime> metricsTimeoutMonitorMap;
 
     private final List<UnitConvert> unitConvertList;
 
     private final WorkerPool workerPool;
 
-    private final String collectorIdentity;
+    private final CollectorMetaData metaData;
+
+    private final CollectTaskTimeoutMonitor collectTaskTimeoutMonitor;
 
     @Autowired
     private HertzBeatMetricsCollector metricsCollector;
@@ -100,17 +106,24 @@ public class CommonDispatcher implements 
MetricsTaskDispatch, CollectDataDispatc
                             CommonDataQueue commonDataQueue,
                             WorkerPool workerPool,
                             CollectJobService collectJobService,
-                            List<UnitConvert> unitConvertList) {
+                            List<UnitConvert> unitConvertList,
+                            CollectTaskTimeoutMonitor 
collectTaskTimeoutMonitor) {
         this.commonDataQueue = commonDataQueue;
         this.jobRequestQueue = jobRequestQueue;
         this.timerDispatch = timerDispatch;
         this.unitConvertList = unitConvertList;
         this.workerPool = workerPool;
-        this.collectorIdentity = collectJobService.getCollectorIdentity();
-        this.metricsTimeoutMonitorMap = new ConcurrentHashMap<>(16);
-        this.start();
+        this.metaData = CollectorMetaData.builder()
+                .identity(collectJobService.getCollectorIdentity())
+                .mode(collectJobService.getCollectorMode())
+                .startTime(new Date())
+                .build();
+        this.collectTaskTimeoutMonitor = collectTaskTimeoutMonitor;
+//        this.start();
+        this.collectTaskTimeoutMonitor.start(this);
     }
 
+    @Deprecated
     public void start() {
         try {
             // Pull the collection task from the task queue and put it into 
the thread pool for execution
@@ -138,91 +151,104 @@ public class CommonDispatcher implements 
MetricsTaskDispatch, CollectDataDispatc
                 }
                 log.info("Thread Interrupted, Shutdown the 
[metrics-task-dispatcher]");
             });
-            // monitoring metrics collection task execution timeout
-            ThreadFactory threadFactory = new ThreadFactoryBuilder()
-                    .setNameFormat("metrics-task-timeout-monitor-%d")
-                    .setDaemon(true)
-                    .build();
-            ScheduledThreadPoolExecutor scheduledExecutor = new 
ScheduledThreadPoolExecutor(1, threadFactory);
-            
scheduledExecutor.scheduleWithFixedDelay(this::monitorCollectTaskTimeout, 2, 
20, TimeUnit.SECONDS);
         } catch (Exception e) {
             log.error("Common Dispatcher error: {}.", e.getMessage(), e);
         }
     }
 
-    private void monitorCollectTaskTimeout() {
-        try {
-            // Detect whether the collection unit of each metrics has timed 
out for 4 minutes,
-            // and if it times out, it will be discarded and an exception will 
be returned.
-            long deadline = System.currentTimeMillis() - DURATION_TIME;
-            for (Map.Entry<String, MetricsTime> entry : 
metricsTimeoutMonitorMap.entrySet()) {
-                MetricsTime metricsTime = entry.getValue();
-                if (metricsTime.getStartTime() < deadline) {
-                    // Metrics collection timeout
-                    MetricsTime removedMetricsTime = 
metricsTimeoutMonitorMap.remove(entry.getKey());
-                    if (removedMetricsTime == null) {
-                        continue;
-                    }
-                    WheelTimerTask timerJob = (WheelTimerTask) 
metricsTime.getTimeout().task();
-                    Job job = timerJob.getJob();
-                    // timeout metrics
-                    if (metricsCollector != null) {
-                        long duration = System.currentTimeMillis() - 
removedMetricsTime.getStartTime();
-                        metricsCollector.recordCollectMetrics(job, duration, 
"timeout");
-                    }
-
-                    CollectRep.MetricsData metricsData = 
CollectRep.MetricsData.newBuilder()
-                            .setId(job.getMonitorId())
-                            .setTenantId(job.getTenantId())
-                            .setApp(job.getApp())
-                            .setMetrics(metricsTime.getMetrics().getName())
-                            
.setPriority(metricsTime.getMetrics().getPriority())
-                            .setTime(System.currentTimeMillis())
-                            .setCode(CollectRep.Code.TIMEOUT).setMsg("collect 
timeout").build();
-                    log.error("[Collect Timeout]: \n{}", metricsData);
-                    if (metricsData.getPriority() == 0) {
-                        dispatchCollectData(metricsTime.timeout, 
metricsTime.getMetrics(), metricsData);
-                    }
-                }
-            }
-        } catch (Exception e) {
-            log.error("[Task Timeout Monitor]-{}.", e.getMessage(), e);
-        }
-    }
-
     @Override
     public void dispatchMetricsTask(Timeout timeout) {
         // Divide the collection task of a single application into 
corresponding collection tasks of the metrics under it.
         // Put each collect task into the thread pool for scheduling
         WheelTimerTask timerTask = (WheelTimerTask) timeout.task();
         Job job = timerTask.getJob();
-        job.constructPriorMetrics();
-        Set<Metrics> metricsSet = job.getNextCollectMetrics(null, true);
-        metricsSet.forEach(metrics -> {
-            MetricsCollect metricsCollect = new MetricsCollect(metrics, 
timeout, this,
-                    collectorIdentity, unitConvertList);
-            jobRequestQueue.addJob(metricsCollect);
-            if (metrics.getPrometheus() != null) {
-                metricsTimeoutMonitorMap.put(String.valueOf(job.getId()),
-                        new MetricsTime(System.currentTimeMillis(), metrics, 
timeout));
+
+        ChainBootstrap bootstrap = constructMetricsCollectTaskChain(job);
+        //todo context需要划分作用域
+        bootstrap.addContext(ContextKey.META_DATA, metaData)
+                .addContext(ContextKey.JOB, job)
+                .addContext(ContextKey.TIMEOUT, timeout)
+                .addListener(new CalculateFieldsListener(new 
UnitConverter(unitConvertList)))
+                .addListener(new ValidateResponseListener())
+                .addOnCompleteListener(new 
RemoveTimeoutMonitorListener(collectTaskTimeoutMonitor));
+
+        if (job.isCyclic()) {
+            bootstrap.withWorkerPool(workerPool)
+                    .addListener(new 
MetricsDataDeliveryListener(commonDataQueue))
+                    .onComplete(new RerunHandler(timerDispatch));
+        } else {
+            bootstrap.addListener(new ResponseJobDataListener(timerDispatch));
+        }
+
+
+        bootstrap.start();
+    }
+
+    private ChainBootstrap constructMetricsCollectTaskChain(Job job) {
+        long now = System.currentTimeMillis();
+        Map<Byte, List<Metrics>> currentCollectMetrics = 
job.getMetrics().stream()
+                .filter(metrics -> (now >= metrics.getCollectTime() + 
metrics.getInterval() * 1000L))
+                .peek(metric -> {
+                    metric.setCollectTime(now);
+                    // Determine whether to configure aliasFields If not, 
configure the default
+                    if ((metric.getAliasFields() == null || 
metric.getAliasFields().isEmpty()) && metric.getFields() != null) {
+                        
metric.setAliasFields(metric.getFields().stream().map(Metrics.Field::getField).collect(Collectors.toList()));
+                    }
+                    // Set the default metrics execution priority, if not 
filled, the default last priority
+                    if (metric.getPriority() == null) {
+                        metric.setPriority(Byte.MAX_VALUE);
+                    }
+                })
+                .collect(Collectors.groupingBy(Metrics::getPriority));
+
+        // the current collect metrics can not empty, if empty, add a default 
availability metrics
+        // due the metric collect is trigger by the previous metric collect
+        if (currentCollectMetrics.isEmpty()) {
+            Optional<Metrics> defaultMetricOption = job.getMetrics().stream()
+                    .filter(metric -> metric.getPriority() == 
CommonConstants.AVAILABLE_METRICS).findFirst();
+            if (defaultMetricOption.isPresent()) {
+                Metrics defaultMetric = defaultMetricOption.get();
+                defaultMetric.setCollectTime(now);
+                currentCollectMetrics.put(CommonConstants.AVAILABLE_METRICS, 
Collections.singletonList(defaultMetric));
             } else {
-                metricsTimeoutMonitorMap.put(job.getId() + "-" + 
metrics.getName(),
-                        new MetricsTime(System.currentTimeMillis(), metrics, 
timeout));
+                log.error("metrics must has one priority 0 metrics at least.");
             }
-        });
+        }
+
+        ChainBootstrap chainBootstrap = 
ChainBootstrap.withContext(DefaultContext.newInstance())
+                .withChain(new BatchExecuteTaskChain<Metrics>());
+        // order by priority
+        currentCollectMetrics.keySet().stream()
+                .sorted()
+                .forEach(priority -> {
+                    if (job.isCyclic() || 
isOneTimeJobAndIsAvailableMetrics(job, priority)) {
+                        List<Metrics> metricsList = 
currentCollectMetrics.get(priority);
+                        CollectMetricsDataHandler collectHandler = 
CollectMetricsDataHandler.builder()
+                                
.collectTaskTimeoutMonitor(collectTaskTimeoutMonitor)
+                                .build();
+                        collectHandler.setSourceDataList(metricsList);
+
+                        chainBootstrap.handler(collectHandler);
+                    }
+                });
+
+        return chainBootstrap;
+    }
+
+    private boolean isOneTimeJobAndIsAvailableMetrics(Job job, byte priority) {
+        return (!job.isCyclic()) && priority == 
CommonConstants.AVAILABLE_METRICS;
     }
 
+    @Deprecated
     @Override
     public void dispatchCollectData(Timeout timeout, Metrics metrics, 
CollectRep.MetricsData metricsData) {
         WheelTimerTask timerJob = (WheelTimerTask) timeout.task();
         Job job = timerJob.getJob();
-        String monitorKey;
+        String monitorKey = job.getId() + "-" + metrics.getName();
         if (metrics.isHasSubTask()) {
-            monitorKey = job.getId() + "-" + metrics.getName() + "-sub-" + 
metrics.getSubTaskId();
-        } else {
-            monitorKey = job.getId() + "-" + metrics.getName();
+            monitorKey = monitorKey + "-sub-" + metrics.getSubTaskId();
         }
-        MetricsTime metricsTime = metricsTimeoutMonitorMap.remove(monitorKey);
+        CollectTaskTimeoutMonitor.MetricsTime metricsTime = 
this.collectTaskTimeoutMonitor.removeMetrics(monitorKey);
 
         // job completed metrics
         if (metricsTime != null && metricsCollector != null) {
@@ -238,31 +264,28 @@ public class CommonDispatcher implements 
MetricsTaskDispatch, CollectDataDispatc
                 return;
             }
         }
+
+
         Set<Metrics> metricsSet = job.getNextCollectMetrics(metrics, false);
         if (job.isCyclic()) {
-            if (log.isDebugEnabled()) {
-                log.debug("Cyclic Job: {} - {} - {}", job.getMonitorId(), 
job.getApp(), metricsData.getMetrics());
-                for (CollectRep.ValueRow valueRow : metricsData.getValues()) {
-                    for (CollectRep.Field field : metricsData.getFields()) {
-                        log.debug("Field-->{},Value-->{}", field.getName(), 
valueRow.getColumns(metricsData.getFields().indexOf(field)));
-                    }
-                }
-            }
+            cyclicJobDebugLog(job, metricsData);
+
             // If metricsSet is null, it means that the execution is completed 
or whether the priority of the collection metrics is 0, that is, the 
availability collection metrics.
             // If the availability collection fails, the next metrics 
scheduling will be cancelled and the next round of scheduling will be entered 
directly.
             boolean isAvailableCollectFailed = metricsSet != null && 
!metricsSet.isEmpty()
-                    && metrics.getPriority() == (byte) 0 && 
metricsData.getCode() != CollectRep.Code.SUCCESS;
+                    && metrics.getPriority() == 
CommonConstants.AVAILABLE_METRICS && metricsData.getCode() != 
CollectRep.Code.SUCCESS;
             if (metricsSet == null || isAvailableCollectFailed || job.isSd()) {
                 // The collection and execution task of this job are completed.
                 // The periodic task pushes the task to the time wheel again.
                 // First, determine the execution time of the task and the 
task collection interval.
                 if (!timeout.isCancelled()) {
                     long spendTime = System.currentTimeMillis() - 
job.getDispatchTime();
-                    long interval = job.getInterval() - spendTime / 1000;
+                    long interval = job.getInterval() - spendTime / 1000L;
                     interval = interval <= 0 ? 0 : interval;
                     timerDispatch.cyclicJob(timerJob, interval, 
TimeUnit.SECONDS);
                 }
-            } else if (!metricsSet.isEmpty()) {
+            }
+            else if (!metricsSet.isEmpty()) {
                 // The execution of the current level metrics is completed, 
and the execution of the next level metrics starts
                 // use pre collect metrics data to replace next metrics config 
params
                 List<Map<String, Configmap>> configmapList = 
CollectUtil.getConfigmapFromPreCollectData(metricsData);
@@ -273,12 +296,15 @@ public class CommonDispatcher implements 
MetricsTaskDispatch, CollectDataDispatc
                     Set<String> cryPlaceholderFields = 
CollectUtil.matchCryPlaceholderField(GSON.toJsonTree(metricItem));
                     if (cryPlaceholderFields.isEmpty()) {
                         MetricsCollect metricsCollect = new 
MetricsCollect(metricItem, timeout, this,
-                                collectorIdentity, unitConvertList);
+                                metaData.getIdentity(), unitConvertList);
                         jobRequestQueue.addJob(metricsCollect);
-                        metricsTimeoutMonitorMap.put(job.getId() + "-" + 
metricItem.getName(),
-                                new MetricsTime(System.currentTimeMillis(), 
metricItem, timeout));
+
+                        this.collectTaskTimeoutMonitor.putMetrics(job.getId() 
+ "-" + metricItem.getName(),
+                                new 
CollectTaskTimeoutMonitor.MetricsTime(System.currentTimeMillis(), metricItem, 
timeout));
                         continue;
                     }
+
+
                     boolean isSubTask = configmapList.stream().anyMatch(map -> 
map.keySet().stream().anyMatch(cryPlaceholderFields::contains));
                     int subTaskNum = isSubTask ? 
Math.min(configmapList.size(), MAX_SUB_TASK_NUM) : 1;
                     AtomicInteger subTaskNumAtomic = new 
AtomicInteger(subTaskNum);
@@ -294,95 +320,30 @@ public class CommonDispatcher implements 
MetricsTaskDispatch, CollectDataDispatc
                         metric.setSubTaskId(index);
                         metric.setSubTaskDataRef(metricsDataReference);
                         MetricsCollect metricsCollect = new 
MetricsCollect(metric, timeout, this,
-                                collectorIdentity, unitConvertList);
+                                metaData.getIdentity(), unitConvertList);
                         jobRequestQueue.addJob(metricsCollect);
-                        metricsTimeoutMonitorMap.put(job.getId() + "-" + 
metric.getName() + "-sub-" + index,
-                                new MetricsTime(System.currentTimeMillis(), 
metric, timeout));
-                    }
 
-                }
-            } else {
-                // The list of metrics at the current execution level has not 
been fully executed.
-                // It needs to wait for the execution of other metrics task of 
the same level to complete the execution and enter the next level for execution.
-            }
-            // If it is an asynchronous periodic cyclic task, directly 
response the collected data
-            if (job.isSd()) {
-                CollectRep.MetricsData sdMetricsData = 
CollectRep.MetricsData.newBuilder(metricsData).build();
-                commonDataQueue.sendServiceDiscoveryData(sdMetricsData);
-            }
-            commonDataQueue.sendMetricsData(metricsData);
-        } else {
-            // If it is a temporary one-time task, you need to wait for the 
collected data of all metrics task to be packaged and returned.
-            // Insert the current metrics data into the job for unified 
assembly
-            job.addCollectMetricsData(metricsData);
-            if (log.isDebugEnabled()) {
-                log.debug("One-time Job: {}", metricsData.getMetrics());
-                for (CollectRep.ValueRow valueRow : metricsData.getValues()) {
-                    for (CollectRep.Field field : metricsData.getFields()) {
-                        log.debug("Field-->{},Value-->{}", field.getName(), 
valueRow.getColumns(metricsData.getFields().indexOf(field)));
+                        this.collectTaskTimeoutMonitor.putMetrics(job.getId() 
+ "-" + metric.getName() + "-sub-" + index,
+                                new 
CollectTaskTimeoutMonitor.MetricsTime(System.currentTimeMillis(), metric, 
timeout));
                     }
-                }
-            }
 
-            if (job.isSd() || metricsSet == null) {
-                // The collection and execution of all metrics of this job are 
completed
-                // and the result listener is notified of the combination of 
all metrics data
-                timerDispatch.responseSyncJobData(job.getId(), 
job.getResponseDataTemp());
-            } else if (!metricsSet.isEmpty()) {
-                // The execution of the current level metrics is completed, 
and the execution of the next level metrics starts
-                metricsSet.forEach(metricItem -> {
-                    MetricsCollect metricsCollect = new 
MetricsCollect(metricItem, timeout, this,
-                            collectorIdentity, unitConvertList);
-                    jobRequestQueue.addJob(metricsCollect);
-                    metricsTimeoutMonitorMap.put(job.getId() + "-" + 
metricItem.getName(),
-                            new MetricsTime(System.currentTimeMillis(), 
metricItem, timeout));
-                });
-            } else {
-                // The list of metrics task at the current execution level has 
not been fully executed.
-                // It needs to wait for the execution of other metrics task of 
the same level to complete the execution and enter the next level for execution.
+                }
             }
         }
     }
 
-    @Override
-    public void dispatchCollectData(Timeout timeout, Metrics metrics, 
List<CollectRep.MetricsData> metricsDataList) {
-        WheelTimerTask timerJob = (WheelTimerTask) timeout.task();
-        Job job = timerJob.getJob();
-        MetricsTime metricsTime = 
metricsTimeoutMonitorMap.remove(String.valueOf(job.getId()));
-        if (metricsTime != null && metricsCollector != null) {
-            long duration = System.currentTimeMillis() - 
metricsTime.getStartTime();
-            // For a list, we consider it a success if at least one item is 
successful.
-            boolean isSuccess = metricsDataList.stream().anyMatch(item -> 
item.getCode() == CollectRep.Code.SUCCESS);
-            metricsCollector.recordCollectMetrics(job, duration, isSuccess ? 
"success" : "fail");
-        }
-        if (job.isCyclic()) {
-            // The collection and execution of all task of this job are 
completed.
-            // The periodic task pushes the task to the time wheel again.
-            // First, determine the execution time of the task and the task 
collection interval.
-            if (!timeout.isCancelled()) {
-                long spendTime = System.currentTimeMillis() - 
job.getDispatchTime();
-                long interval = job.getInterval() - spendTime / 1000;
-                interval = interval <= 0 ? 0 : interval;
-                timerDispatch.cyclicJob(timerJob, interval, TimeUnit.SECONDS);
-            }
-            // it is an asynchronous periodic cyclic task, directly response 
the collected data
-            metricsDataList.forEach(commonDataQueue::sendMetricsData);
-        } else {
-            // The collection and execution of all metrics of this job are 
completed
-            // and the result listener is notified of the combination of all 
metrics data
-            timerDispatch.responseSyncJobData(job.getId(), metricsDataList);
+    private void cyclicJobDebugLog(Job job, CollectRep.MetricsData 
metricsData) {
+        if (log.isDebugEnabled()) {
+            log.debug("Cyclic Job: {} - {} - {}", job.getMonitorId(), 
job.getApp(), metricsData.getMetrics());
+            metricsDataDebugLog(metricsData);
         }
     }
 
-
-    /**
-     * Metrics times.
-     */
-    @Data
-    @AllArgsConstructor
-    protected static class MetricsTime {
-        private long startTime;
-        private Metrics metrics;
-        private Timeout timeout;
+    private void metricsDataDebugLog(CollectRep.MetricsData metricsData) {
+        for (CollectRep.ValueRow valueRow : metricsData.getValues()) {
+            for (CollectRep.Field field : metricsData.getFields()) {
+                log.debug("Field-->{},Value-->{}", field.getName(), 
valueRow.getColumns(metricsData.getFields().indexOf(field)));
+            }
+        }
     }
 }
\ No newline at end of file
diff --git 
a/hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/dispatch/MetricsCollect.java
 
b/hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/dispatch/MetricsCollect.java
index 94b202259..c1c7822f7 100644
--- 
a/hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/dispatch/MetricsCollect.java
+++ 
b/hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/dispatch/MetricsCollect.java
@@ -129,7 +129,6 @@ public class MetricsCollect implements Runnable, 
Comparable<MetricsCollect> {
         this.newTime = System.currentTimeMillis();
         this.timeout = timeout;
         this.metrics = metrics;
-        this.collectorIdentity = collectorIdentity;
         WheelTimerTask timerJob = (WheelTimerTask) timeout.task();
         Job job = timerJob.getJob();
         this.id = job.getMonitorId();
@@ -153,327 +152,9 @@ public class MetricsCollect implements Runnable, 
Comparable<MetricsCollect> {
 
     @Override
     public void run() {
-        this.startTime = System.currentTimeMillis();
-        setNewThreadName(id, app, startTime, metrics);
-        CollectRep.MetricsData.Builder response = 
CollectRep.MetricsData.newBuilder();
-        response.setApp(app).setId(id).setTenantId(tenantId)
-                
.setLabels(labels).setAnnotations(annotations).addMetadataAll(metadata);
-        // for prometheus auto or proxy mode
-        if 
(DispatchConstants.PROTOCOL_PROMETHEUS.equalsIgnoreCase(metrics.getProtocol())) 
{
-            List<CollectRep.MetricsData> metricsData;
-
-            // TODO: Refactor Prometheus metrics collection logic.
-            // The current implementation for proxy mode and auto mode needs 
review and potential simplification.
-            // Consider a more unified approach or clarify the conditions for 
each mode.
-            /*
-            // TODO USE PROXY MODE
-            if (prometheusProxyMode) {
-                List<CollectRep.MetricsData> proxyData = 
PrometheusProxyCollectImpl.getInstance().collect(response, metrics);
-                List<CollectRep.MetricsData> autoData = 
PrometheusAutoCollectImpl.getInstance().collect(response, metrics);
-                metricsData = new LinkedList<>();
-                if (proxyData != null) {
-                    metricsData.addAll(proxyData);
-                }
-                if (autoData != null) {
-                    metricsData.addAll(autoData);
-                }
-            } else {
-                metricsData = 
PrometheusAutoCollectImpl.getInstance().collect(response, metrics);
-            }
-            */
-            metricsData = 
PrometheusAutoCollectImpl.getInstance().collect(response, metrics);
-            validateResponse(metricsData == null ? null : 
metricsData.stream().findFirst().orElse(null));
-            collectDataDispatch.dispatchCollectData(timeout, metrics, 
metricsData);
-            return;
-        }
-        response.setMetrics(metrics.getName());
-        // According to the metrics collection protocol, application type, 
etc.,
-        // dispatch to the real application metrics collection implementation 
class
-        AbstractCollect abstractCollect = 
CollectStrategyFactory.invoke(metrics.getProtocol());
-        if (abstractCollect == null) {
-            log.error("[Dispatcher] - not support this: app: {}, metrics: {}, 
protocol: {}.",
-                    app, metrics.getName(), metrics.getProtocol());
-            response.setCode(CollectRep.Code.FAIL);
-            response.setMsg("not support " + app + ", "
-                    + metrics.getName() + ", " + metrics.getProtocol());
-        } else {
-            try {
-                abstractCollect.preCheck(metrics);
-                abstractCollect.collect(response, metrics);
-            } catch (Exception e) {
-                String msg = e.getMessage();
-                if (msg == null && e.getCause() != null) {
-                    msg = e.getCause().getMessage();
-                }
-                if (e instanceof IllegalArgumentException) {
-                    log.error("[Metrics PreCheck]: {}.", msg, e);
-                } else {
-                    log.error("[Metrics Collect]: {}.", msg, e);
-                }
-                response.setCode(CollectRep.Code.FAIL);
-                if (msg != null) {
-                    response.setMsg(msg);
-                }
-            }
-        }
-        // Alias attribute expression replacement calculation
-        if (fastFailed()) {
-            return;
-        }
-        calculateFields(metrics, response);
-        CollectRep.MetricsData metricsData = validateResponse(response);
-        collectDataDispatch.dispatchCollectData(timeout, metrics, metricsData);
-    }
-
-    /**
-     * Calculate the real metrics value according to the calculates and 
aliasFields configuration
-     *
-     * @param metrics     Metrics configuration
-     * @param collectData Data collection
-     */
-    public void calculateFields(Metrics metrics, 
CollectRep.MetricsData.Builder collectData) {
-        collectData.setPriority(metrics.getPriority());
-        List<CollectRep.Field> fieldList = new LinkedList<>();
-        for (Metrics.Field field : metrics.getFields()) {
-            CollectRep.Field.Builder fieldBuilder = 
CollectRep.Field.newBuilder();
-            
fieldBuilder.setName(field.getField()).setType(field.getType()).setLabel(field.isLabel());
-            if (field.getUnit() != null) {
-                fieldBuilder.setUnit(field.getUnit());
-            }
-            fieldList.add(fieldBuilder.build());
-        }
-        collectData.addAllFields(fieldList);
-        List<CollectRep.ValueRow> aliasRowList = collectData.getValuesList();
-        if (aliasRowList == null || aliasRowList.isEmpty()) {
-            return;
-        }
-        collectData.clearValues();
-        // Preprocess calculates first
-        if (metrics.getCalculates() == null) {
-            metrics.setCalculates(Collections.emptyList());
-        }
-        // eg: database_pages=Database pages unconventional mapping
-        Map<String, String> fieldAliasMap = new HashMap<>(8);
-        Map<String, JexlExpression> fieldExpressionMap = 
metrics.getCalculates()
-                .stream()
-                .map(cal -> transformCal(cal, fieldAliasMap))
-                .filter(Objects::nonNull)
-                .collect(Collectors.toMap(arr -> (String) arr[0], arr -> 
(JexlExpression) arr[1], (oldValue, newValue) -> newValue));
-
-        if (metrics.getUnits() == null) {
-            metrics.setUnits(Collections.emptyList());
-        }
-        Map<String, Pair<String, String>> fieldUnitMap = metrics.getUnits()
-                .stream()
-                .map(this::transformUnit)
-                .filter(Objects::nonNull)
-                .collect(Collectors.toMap(arr -> (String) arr[0], arr -> 
(Pair<String, String>) arr[1], (oldValue, newValue) -> newValue));
-
-        List<Metrics.Field> fields = metrics.getFields();
-        List<String> aliasFields = 
Optional.ofNullable(metrics.getAliasFields()).orElseGet(Collections::emptyList);
-        Map<String, String> aliasFieldValueMap = new HashMap<>(8);
-        Map<String, Object> fieldValueMap = new HashMap<>(8);
-        Map<String, Object> stringTypefieldValueMap = new HashMap<>(8);
-        Map<String, String> aliasFieldUnitMap = new HashMap<>(8);
-        CollectRep.ValueRow.Builder realValueRowBuilder = 
CollectRep.ValueRow.newBuilder();
-        for (CollectRep.ValueRow aliasRow : aliasRowList) {
-            for (int aliasIndex = 0; aliasIndex < aliasFields.size(); 
aliasIndex++) {
-                String aliasFieldValue = aliasRow.getColumns(aliasIndex);
-                String aliasField = aliasFields.get(aliasIndex);
-                if (!CommonConstants.NULL_VALUE.equals(aliasFieldValue)) {
-                    aliasFieldValueMap.put(aliasField, aliasFieldValue);
-                    // whether the alias field is a number
-                    CollectUtil.DoubleAndUnit doubleAndUnit = CollectUtil
-                            .extractDoubleAndUnitFromStr(aliasFieldValue);
-                    if (doubleAndUnit != null && doubleAndUnit.getValue() != 
null) {
-                        fieldValueMap.put(aliasField, 
doubleAndUnit.getValue());
-                        if (doubleAndUnit.getUnit() != null) {
-                            aliasFieldUnitMap.put(aliasField, 
doubleAndUnit.getUnit());
-                        }
-                    } else {
-                        fieldValueMap.put(aliasField, aliasFieldValue);
-                    }
-                    stringTypefieldValueMap.put(aliasField, aliasFieldValue);
-                } else {
-                    fieldValueMap.put(aliasField, null);
-                    stringTypefieldValueMap.put(aliasField, null);
-                }
-            }
-
-            for (Metrics.Field field : fields) {
-                String realField = field.getField();
-                JexlExpression expression = fieldExpressionMap.get(realField);
-                String value = null;
-                String aliasFieldUnit = null;
-                if (expression != null) {
-                    try {
-                        Map<String, Object> context;
-                        if (CommonConstants.TYPE_STRING == field.getType()) {
-                            context = stringTypefieldValueMap;
-                        } else {
-                            for (Map.Entry<String, String> unitEntry : 
aliasFieldUnitMap.entrySet()) {
-                                if 
(expression.getSourceText().contains(unitEntry.getKey())) {
-                                    aliasFieldUnit = unitEntry.getValue();
-                                    break;
-                                }
-                            }
-                            context = fieldValueMap;
-                        }
-
-                        // Also executed when valueList is empty, covering 
pure string assignment expressions
-                        Object objValue = 
JexlExpressionRunner.evaluate(expression, context);
-
-                        if (objValue != null) {
-                            value = String.valueOf(objValue);
-                        }
-                    } catch (Exception e) {
-                        log.warn("[calculates execute warning, use original 
value.] {}", e.getMessage());
-                        value = 
Optional.ofNullable(fieldValueMap.get(expression.getSourceText()))
-                                .map(String::valueOf)
-                                .orElse(null);
-                    }
-                } else {
-                    // does not exist then map the alias value
-                    String aliasField = fieldAliasMap.get(realField);
-                    if (aliasField != null) {
-                        value = aliasFieldValueMap.get(aliasField);
-                    } else {
-                        value = aliasFieldValueMap.get(realField);
-                    }
-
-                    if (value != null) {
-                        final byte fieldType = field.getType();
-                        if (fieldType == CommonConstants.TYPE_NUMBER) {
-                            CollectUtil.DoubleAndUnit doubleAndUnit = 
CollectUtil
-                                    .extractDoubleAndUnitFromStr(value);
-                            final Double tempValue = doubleAndUnit == null ? 
null : doubleAndUnit.getValue();
-                            value = tempValue == null ? null : 
String.valueOf(tempValue);
-                            aliasFieldUnit = doubleAndUnit == null ? null : 
doubleAndUnit.getUnit();
-                        } else if (fieldType == CommonConstants.TYPE_TIME) {
-                            final int tempValue;
-                            value = (tempValue = 
CommonUtil.parseTimeStrToSecond(value)) == -1 ? null : 
String.valueOf(tempValue);
-                        }
-                    }
-                }
-
-                Pair<String, String> unitPair = fieldUnitMap.get(realField);
-                if (aliasFieldUnit != null) {
-                    if (unitPair != null) {
-                        unitPair.setLeft(aliasFieldUnit);
-                    } else if (field.getUnit() != null && 
!aliasFieldUnit.equalsIgnoreCase(field.getUnit())) {
-                        unitPair = Pair.of(aliasFieldUnit, field.getUnit());
-                    }
-                }
-                if (value != null && unitPair != null) {
-                    for (UnitConvert unitConvert : unitConvertList) {
-                        if (unitConvert.checkUnit(unitPair.getLeft()) && 
unitConvert.checkUnit(unitPair.getRight())) {
-                            value = unitConvert.convert(value, 
unitPair.getLeft(), unitPair.getRight());
-                        }
-                    }
-                }
-                // Handle metrics values that may have units such as 34%, 
34Mb, and limit values to 4 decimal places
-                if (CommonConstants.TYPE_NUMBER == field.getType()) {
-                    value = CommonUtil.parseDoubleStr(value, field.getUnit());
-                }
-                if (value == null) {
-                    value = CommonConstants.NULL_VALUE;
-                }
-                realValueRowBuilder.addColumn(value);
-            }
-            aliasFieldValueMap.clear();
-            fieldValueMap.clear();
-            aliasFieldUnitMap.clear();
-            stringTypefieldValueMap.clear();
-            CollectRep.ValueRow realValueRow = realValueRowBuilder.build();
-            realValueRowBuilder.clear();
-            // apply filter calculation to the real value row
-            if (!CollectionUtils.isEmpty(metrics.getFilters())) {
-                Map<String, Object> contextMap = new HashMap<>(8);
-                for (int i = 0; i < fields.size(); i++) {
-                    Metrics.Field field = fields.get(i);
-                    String value = realValueRow.getColumns(i);
-                    contextMap.put(field.getField(), value);
-                }
-                boolean isMatch = false;
-                for (String filterExpr : metrics.getFilters()) {
-                    try {
-                        JexlExpression expression = 
JexlExpressionRunner.compile(filterExpr);
-                        if ((Boolean) 
JexlExpressionRunner.evaluate(expression, contextMap)) {
-                            isMatch = true;
-                            break;
-                        }
-                    } catch (Exception e) {
-                        log.warn("[metrics data row filters execute warning] 
{}.", e.getMessage());
-                    }
-                }
-                if (!isMatch) {
-                    // ignore this data row
-                    continue;
-                }
-            }
-            collectData.addValueRow(realValueRow);
-        }
+//        collectDataDispatch.dispatchCollectData(timeout, metrics, 
metricsData);
     }
 
-    /**
-     * @param cal           cal
-     * @param fieldAliasMap field alias map
-     * @return expr
-     */
-    private Object[] transformCal(String cal, Map<String, String> 
fieldAliasMap) {
-        int splitIndex = cal.indexOf("=");
-        if (splitIndex < 0) {
-            return null;
-        }
-        String field = cal.substring(0, splitIndex).trim();
-        String expressionStr = cal.substring(splitIndex + 
1).trim().replace("\\#", "#");
-        JexlExpression expression;
-        try {
-            expression = JexlExpressionRunner.compile(expressionStr);
-        } catch (Exception e) {
-            fieldAliasMap.put(field, expressionStr);
-            return null;
-        }
-        return new Object[]{field, expression};
-    }
-
-    /**
-     * transform unit
-     *
-     * @param unit unit
-     * @return units
-     */
-    private Object[] transformUnit(String unit) {
-        int equalIndex = unit.indexOf("=");
-        int arrowIndex = unit.indexOf("->");
-        if (equalIndex < 0 || arrowIndex < 0) {
-            return null;
-        }
-        String field = unit.substring(0, equalIndex).trim();
-        String originUnit = unit.substring(equalIndex + 1, arrowIndex).trim();
-        String newUnit = unit.substring(arrowIndex + 2).trim();
-        return new Object[]{field, Pair.of(originUnit, newUnit)};
-    }
-
-    private boolean fastFailed() {
-        return this.timeout == null || this.timeout.isCancelled();
-    }
-
-    private CollectRep.MetricsData 
validateResponse(CollectRep.MetricsData.Builder builder) {
-        long endTime = System.currentTimeMillis();
-        builder.setTime(endTime);
-        long runningTime = endTime - startTime;
-        long allTime = endTime - newTime;
-        if (startTime - newTime >= WARN_DISPATCH_TIME) {
-            log.warn("[Collector Dispatch Warn, Dispatch Use {}ms.", startTime 
- newTime);
-        }
-        if (builder.getCode() != CollectRep.Code.SUCCESS) {
-            log.info("[Collect Failed, Run {}ms, All {}ms] Reason: {}", 
runningTime, allTime, builder.getMsg());
-        } else {
-            log.info("[Collect Success, Run {}ms, All {}ms].", runningTime, 
allTime);
-        }
-        return builder.build();
-    }
 
     private void validateResponse(CollectRep.MetricsData metricsData) {
         if (metricsData == null) {
diff --git 
a/hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/handler/CollectMetricsDataHandler.java
 
b/hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/handler/CollectMetricsDataHandler.java
new file mode 100644
index 000000000..f3cca9d8d
--- /dev/null
+++ 
b/hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/handler/CollectMetricsDataHandler.java
@@ -0,0 +1,110 @@
+package org.apache.hertzbeat.collector.handler;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.NoArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.hertzbeat.collector.collect.AbstractCollect;
+import org.apache.hertzbeat.collector.collect.strategy.CollectStrategyFactory;
+import org.apache.hertzbeat.collector.constants.ContextKey;
+import org.apache.hertzbeat.collector.constants.ContextStatus;
+import org.apache.hertzbeat.collector.context.Context;
+import org.apache.hertzbeat.collector.dispatch.CollectTaskTimeoutMonitor;
+import 
org.apache.hertzbeat.collector.handler.impl.AbstractBatchDataBoundHandler;
+import org.apache.hertzbeat.common.constants.CommonConstants;
+import org.apache.hertzbeat.common.entity.job.Job;
+import org.apache.hertzbeat.common.entity.job.Metrics;
+import org.apache.hertzbeat.common.entity.message.CollectRep;
+import org.apache.hertzbeat.common.timer.Timeout;
+
+
+@Data
+@Slf4j
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+@EqualsAndHashCode(callSuper = true)
+public class CollectMetricsDataHandler extends 
AbstractBatchDataBoundHandler<Metrics, CollectRep.MetricsData.Builder> {
+    private CollectTaskTimeoutMonitor collectTaskTimeoutMonitor;
+
+    @Override
+    public CollectRep.MetricsData.Builder executeWithResponse(Context context, 
Metrics data) {
+        // preset start info
+        context.put(ContextKey.METRICS, data);
+        Job job = context.get(ContextKey.JOB);
+        long startTime = context.get(ContextKey.METRICS_COLLECT_START_TIME);
+        setNewThreadName(job.getMonitorId(), job.getApp(), startTime, data);
+
+
+        Timeout timeout = context.get(ContextKey.TIMEOUT);
+        String key = data.getPrometheus() != null ? 
String.valueOf(job.getId()) : job.getId() + "-" + data.getName();
+        context.put(ContextKey.METRICS_KEY, key);
+        this.collectTaskTimeoutMonitor.putMetrics(key, new 
CollectTaskTimeoutMonitor.MetricsTime(startTime, data, timeout));
+
+
+        CollectRep.MetricsData.Builder fetchedData = this.fetchData(job, data);
+        if (fetchedData.getCode() != CollectRep.Code.SUCCESS && 
CommonConstants.AVAILABLE_METRICS == data.getPriority()) {
+            context.setStatus(ContextStatus.TRUNCATE_HANDLER);
+        }
+        return fetchedData;
+    }
+
+    private void setNewThreadName(long monitorId, String app, long startTime, 
Metrics metrics) {
+        String builder = monitorId + "-" + app + "-" + metrics.getName() + "-" 
+ String.valueOf(startTime).substring(9);
+        Thread.currentThread().setName(builder);
+    }
+
+    private CollectRep.MetricsData.Builder fetchData(Job job, Metrics metrics) 
{
+        CollectRep.MetricsData.Builder response = 
CollectRep.MetricsData.newBuilder();
+        response.setApp(job.getApp())
+                .setId(job.getMonitorId())
+                .setTenantId(job.getTenantId())
+                .setLabels(job.getLabels())
+                .setAnnotations(job.getAnnotations())
+                .addMetadataAll(job.getMetadata());
+
+        //todo transcribe Prometheus to different chain
+        // for prometheus auto or proxy mode
+//        if 
(DispatchConstants.PROTOCOL_PROMETHEUS.equalsIgnoreCase(metrics.getProtocol())) 
{
+//            List<CollectRep.MetricsData> metricsData = 
PrometheusAutoCollectImpl.getInstance().collect(response, metrics);
+//            validateResponse(metricsData == null ? null : 
metricsData.stream().findFirst().orElse(null));
+//            collectDataDispatch.dispatchCollectData(timeout, metrics, 
metricsData);
+//            return null;
+//        }
+
+        response.setMetrics(metrics.getName());
+        // According to the metrics collection protocol, application type, 
etc.,
+        // dispatch to the real application metrics collection implementation 
class
+        AbstractCollect abstractCollect = 
CollectStrategyFactory.invoke(metrics.getProtocol());
+        if (abstractCollect == null) {
+            log.error("[Dispatcher] - not support this: app: {}, metrics: {}, 
protocol: {}.", job.getApp(), metrics.getName(), metrics.getProtocol());
+            response.setCode(CollectRep.Code.FAIL);
+            response.setMsg("not support " + job.getApp() + ", " + 
metrics.getName() + ", " + metrics.getProtocol());
+
+            return response;
+        }
+
+        try {
+            abstractCollect.preCheck(metrics);
+            abstractCollect.collect(response, metrics);
+        } catch (Exception e) {
+            String msg = e.getMessage();
+            if (msg == null && e.getCause() != null) {
+                msg = e.getCause().getMessage();
+            }
+            if (e instanceof IllegalArgumentException) {
+                log.error("[Metrics PreCheck]: {}.", msg, e);
+            } else {
+                log.error("[Metrics Collect]: {}.", msg, e);
+            }
+            response.setCode(CollectRep.Code.FAIL);
+            if (msg != null) {
+                response.setMsg(msg);
+            }
+        }
+
+        return response;
+    }
+}
diff --git 
a/hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/handler/DynamicSubTaskCollectMetricsDataHandler.java
 
b/hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/handler/DynamicSubTaskCollectMetricsDataHandler.java
new file mode 100644
index 000000000..74cbea66c
--- /dev/null
+++ 
b/hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/handler/DynamicSubTaskCollectMetricsDataHandler.java
@@ -0,0 +1,17 @@
+package org.apache.hertzbeat.collector.handler;
+
+import org.apache.hertzbeat.collector.context.Context;
+import 
org.apache.hertzbeat.collector.handler.impl.AbstractBatchDataBoundHandler;
+import org.apache.hertzbeat.common.entity.job.Metrics;
+import org.apache.hertzbeat.common.entity.message.CollectRep;
+
+/**
+ *
+ */
+public class DynamicSubTaskCollectMetricsDataHandler extends 
AbstractBatchDataBoundHandler<Metrics, CollectRep.MetricsData.Builder> {
+    @Override
+    public CollectRep.MetricsData.Builder executeWithResponse(Context context, 
Metrics data) {
+        //todo 动态拆分
+        return null;
+    }
+}
diff --git 
a/hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/dispatch/MetricsCollect.java
 
b/hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/listener/CalculateFieldsListener.java
similarity index 54%
copy from 
hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/dispatch/MetricsCollect.java
copy to 
hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/listener/CalculateFieldsListener.java
index 94b202259..0a2a8b333 100644
--- 
a/hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/dispatch/MetricsCollect.java
+++ 
b/hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/listener/CalculateFieldsListener.java
@@ -1,34 +1,15 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
+package org.apache.hertzbeat.collector.listener;
 
-package org.apache.hertzbeat.collector.dispatch;
-
-import lombok.Data;
+import lombok.AllArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.jexl3.JexlExpression;
-import org.apache.hertzbeat.collector.collect.AbstractCollect;
-import 
org.apache.hertzbeat.collector.collect.prometheus.PrometheusAutoCollectImpl;
-import org.apache.hertzbeat.collector.collect.strategy.CollectStrategyFactory;
-import org.apache.hertzbeat.common.timer.Timeout;
-import org.apache.hertzbeat.collector.timer.WheelTimerTask;
+import org.apache.hertzbeat.collector.constants.ContextKey;
+import org.apache.hertzbeat.collector.context.Context;
 import org.apache.hertzbeat.collector.dispatch.unit.UnitConvert;
+import org.apache.hertzbeat.collector.dispatch.unit.UnitConverter;
+import org.apache.hertzbeat.collector.handler.ContextBoundListener;
 import org.apache.hertzbeat.collector.util.CollectUtil;
 import org.apache.hertzbeat.common.constants.CommonConstants;
-import org.apache.hertzbeat.common.entity.job.Job;
 import org.apache.hertzbeat.common.entity.job.Metrics;
 import org.apache.hertzbeat.common.entity.message.CollectRep;
 import org.apache.hertzbeat.common.util.CommonUtil;
@@ -46,183 +27,18 @@ import java.util.Optional;
 import java.util.stream.Collectors;
 
 /**
- * metrics collection
+ *
  */
 @Slf4j
-@Data
-public class MetricsCollect implements Runnable, Comparable<MetricsCollect> {
-    /**
-     * Scheduling alarm threshold time 100ms
-     */
-    private static final long WARN_DISPATCH_TIME = 100;
-    /**
-     * collector identity
-     */
-    protected String collectorIdentity;
-    /**
-     * tenant id
-     */
-    protected long tenantId;
-    /**
-     * task id
-     */
-    protected long id;
-    /**
-     * app type name
-     */
-    protected String app;
-    /**
-     * metrics configuration
-     */
-    protected Metrics metrics;
-    /**
-     * metadata
-     */
-    protected Map<String, String> metadata;
-    /**
-     * labels
-     */
-    protected Map<String, String> labels;
-    /**
-     * annotations
-     */
-    protected Map<String, String> annotations;
-    /**
-     * time wheel timeout
-     */
-    protected Timeout timeout;
-    /**
-     * Task and Data Scheduling
-     */
-    protected CollectDataDispatch collectDataDispatch;
-    /**
-     * task execution priority
-     */
-    protected byte runPriority;
-    /**
-     * Periodic collection or one-time collection true-periodic false-one-time
-     */
-    protected boolean isCyclic;
-    /**
-     * Time for creating collection task
-     */
-    protected long newTime;
-    /**
-     * Start time of the collection task
-     */
-    protected long startTime;
-    /**
-     * Whether it is a service discovery job, true is yes, false is no
-     */
-    protected boolean isSd;
-    /**
-     * Whether to use the Prometheus proxy
-     */
-    protected boolean prometheusProxyMode;
-
-    protected List<UnitConvert> unitConvertList;
-
-    public MetricsCollect(Metrics metrics, Timeout timeout,
-                          CollectDataDispatch collectDataDispatch,
-                          String collectorIdentity,
-                          List<UnitConvert> unitConvertList) {
-        this.newTime = System.currentTimeMillis();
-        this.timeout = timeout;
-        this.metrics = metrics;
-        this.collectorIdentity = collectorIdentity;
-        WheelTimerTask timerJob = (WheelTimerTask) timeout.task();
-        Job job = timerJob.getJob();
-        this.id = job.getMonitorId();
-        this.tenantId = job.getTenantId();
-        this.app = job.getApp();
-        this.metadata = job.getMetadata();
-        this.labels = job.getLabels();
-        this.annotations = job.getAnnotations();
-        this.collectDataDispatch = collectDataDispatch;
-        this.isCyclic = job.isCyclic();
-        this.isSd = job.isSd();
-        this.prometheusProxyMode = job.isPrometheusProxyMode();
-        this.unitConvertList = unitConvertList;
-        // Temporary one-time tasks are executed with high priority
-        if (isCyclic) {
-            runPriority = (byte) -1;
-        } else {
-            runPriority = (byte) 1;
-        }
-    }
+@AllArgsConstructor
+public class CalculateFieldsListener implements 
ContextBoundListener<CollectRep.MetricsData.Builder> {
+    private UnitConverter unitConverter;
 
     @Override
-    public void run() {
-        this.startTime = System.currentTimeMillis();
-        setNewThreadName(id, app, startTime, metrics);
-        CollectRep.MetricsData.Builder response = 
CollectRep.MetricsData.newBuilder();
-        response.setApp(app).setId(id).setTenantId(tenantId)
-                
.setLabels(labels).setAnnotations(annotations).addMetadataAll(metadata);
-        // for prometheus auto or proxy mode
-        if 
(DispatchConstants.PROTOCOL_PROMETHEUS.equalsIgnoreCase(metrics.getProtocol())) 
{
-            List<CollectRep.MetricsData> metricsData;
+    public void execute(Context context, CollectRep.MetricsData.Builder data) {
+        Metrics metrics = context.get(ContextKey.METRICS);
 
-            // TODO: Refactor Prometheus metrics collection logic.
-            // The current implementation for proxy mode and auto mode needs 
review and potential simplification.
-            // Consider a more unified approach or clarify the conditions for 
each mode.
-            /*
-            // TODO USE PROXY MODE
-            if (prometheusProxyMode) {
-                List<CollectRep.MetricsData> proxyData = 
PrometheusProxyCollectImpl.getInstance().collect(response, metrics);
-                List<CollectRep.MetricsData> autoData = 
PrometheusAutoCollectImpl.getInstance().collect(response, metrics);
-                metricsData = new LinkedList<>();
-                if (proxyData != null) {
-                    metricsData.addAll(proxyData);
-                }
-                if (autoData != null) {
-                    metricsData.addAll(autoData);
-                }
-            } else {
-                metricsData = 
PrometheusAutoCollectImpl.getInstance().collect(response, metrics);
-            }
-            */
-            metricsData = 
PrometheusAutoCollectImpl.getInstance().collect(response, metrics);
-            validateResponse(metricsData == null ? null : 
metricsData.stream().findFirst().orElse(null));
-            collectDataDispatch.dispatchCollectData(timeout, metrics, 
metricsData);
-            return;
-        }
-        response.setMetrics(metrics.getName());
-        // According to the metrics collection protocol, application type, 
etc.,
-        // dispatch to the real application metrics collection implementation 
class
-        AbstractCollect abstractCollect = 
CollectStrategyFactory.invoke(metrics.getProtocol());
-        if (abstractCollect == null) {
-            log.error("[Dispatcher] - not support this: app: {}, metrics: {}, 
protocol: {}.",
-                    app, metrics.getName(), metrics.getProtocol());
-            response.setCode(CollectRep.Code.FAIL);
-            response.setMsg("not support " + app + ", "
-                    + metrics.getName() + ", " + metrics.getProtocol());
-        } else {
-            try {
-                abstractCollect.preCheck(metrics);
-                abstractCollect.collect(response, metrics);
-            } catch (Exception e) {
-                String msg = e.getMessage();
-                if (msg == null && e.getCause() != null) {
-                    msg = e.getCause().getMessage();
-                }
-                if (e instanceof IllegalArgumentException) {
-                    log.error("[Metrics PreCheck]: {}.", msg, e);
-                } else {
-                    log.error("[Metrics Collect]: {}.", msg, e);
-                }
-                response.setCode(CollectRep.Code.FAIL);
-                if (msg != null) {
-                    response.setMsg(msg);
-                }
-            }
-        }
-        // Alias attribute expression replacement calculation
-        if (fastFailed()) {
-            return;
-        }
-        calculateFields(metrics, response);
-        CollectRep.MetricsData metricsData = validateResponse(response);
-        collectDataDispatch.dispatchCollectData(timeout, metrics, metricsData);
+        this.calculateFields(metrics, unitConverter.getUnitConvertList(), 
data);
     }
 
     /**
@@ -231,7 +47,7 @@ public class MetricsCollect implements Runnable, 
Comparable<MetricsCollect> {
      * @param metrics     Metrics configuration
      * @param collectData Data collection
      */
-    public void calculateFields(Metrics metrics, 
CollectRep.MetricsData.Builder collectData) {
+    private void calculateFields(Metrics metrics, List<UnitConvert> 
unitConvertList, CollectRep.MetricsData.Builder collectData) {
         collectData.setPriority(metrics.getPriority());
         List<CollectRep.Field> fieldList = new LinkedList<>();
         for (Metrics.Field field : metrics.getFields()) {
@@ -454,53 +270,4 @@ public class MetricsCollect implements Runnable, 
Comparable<MetricsCollect> {
         String newUnit = unit.substring(arrowIndex + 2).trim();
         return new Object[]{field, Pair.of(originUnit, newUnit)};
     }
-
-    private boolean fastFailed() {
-        return this.timeout == null || this.timeout.isCancelled();
-    }
-
-    private CollectRep.MetricsData 
validateResponse(CollectRep.MetricsData.Builder builder) {
-        long endTime = System.currentTimeMillis();
-        builder.setTime(endTime);
-        long runningTime = endTime - startTime;
-        long allTime = endTime - newTime;
-        if (startTime - newTime >= WARN_DISPATCH_TIME) {
-            log.warn("[Collector Dispatch Warn, Dispatch Use {}ms.", startTime 
- newTime);
-        }
-        if (builder.getCode() != CollectRep.Code.SUCCESS) {
-            log.info("[Collect Failed, Run {}ms, All {}ms] Reason: {}", 
runningTime, allTime, builder.getMsg());
-        } else {
-            log.info("[Collect Success, Run {}ms, All {}ms].", runningTime, 
allTime);
-        }
-        return builder.build();
-    }
-
-    private void validateResponse(CollectRep.MetricsData metricsData) {
-        if (metricsData == null) {
-            log.error("[Collect Failed] Response metrics data is null.");
-            return;
-        }
-        long endTime = System.currentTimeMillis();
-        long runningTime = endTime - startTime;
-        long allTime = endTime - newTime;
-        if (startTime - newTime >= WARN_DISPATCH_TIME) {
-            log.warn("[Collector Dispatch Warn, Dispatch Use {}ms.", startTime 
- newTime);
-        }
-        if (metricsData.getCode() != CollectRep.Code.SUCCESS) {
-            log.info("[Collect Failed, Run {}ms, All {}ms] Reason: {}", 
runningTime, allTime, metricsData.getMsg());
-        } else {
-            log.info("[Collect Success, Run {}ms, All {}ms].", runningTime, 
allTime);
-        }
-    }
-
-    private void setNewThreadName(long monitorId, String app, long startTime, 
Metrics metrics) {
-        String builder = monitorId + "-" + app + "-" + metrics.getName()
-                + "-" + String.valueOf(startTime).substring(9);
-        Thread.currentThread().setName(builder);
-    }
-
-    @Override
-    public int compareTo(MetricsCollect collect) {
-        return collect.runPriority - this.runPriority;
-    }
 }
diff --git 
a/hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/listener/CommonMetricsDataListener.java
 
b/hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/listener/CommonMetricsDataListener.java
new file mode 100644
index 000000000..fb07fea97
--- /dev/null
+++ 
b/hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/listener/CommonMetricsDataListener.java
@@ -0,0 +1,21 @@
+package org.apache.hertzbeat.collector.listener;
+
+import org.apache.hertzbeat.collector.constants.ContextKey;
+import org.apache.hertzbeat.collector.constants.ContextStatus;
+import org.apache.hertzbeat.collector.context.Context;
+import org.apache.hertzbeat.collector.handler.ContextBoundListener;
+import org.apache.hertzbeat.common.timer.Timeout;
+
+/**
+ *
+ */
+public class CommonMetricsDataListener<T> implements ContextBoundListener<T> {
+    @Override
+    public void execute(Context context, T data) {
+        Timeout timeout = context.get(ContextKey.TIMEOUT);
+
+        if (timeout == null || timeout.isCancelled()) {
+            context.setStatus(ContextStatus.STOP);
+        }
+    }
+}
diff --git 
a/hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/listener/MetricsDataDeliveryListener.java
 
b/hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/listener/MetricsDataDeliveryListener.java
new file mode 100644
index 000000000..6da70fcef
--- /dev/null
+++ 
b/hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/listener/MetricsDataDeliveryListener.java
@@ -0,0 +1,52 @@
+package org.apache.hertzbeat.collector.listener;
+
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.hertzbeat.collector.constants.ContextKey;
+import org.apache.hertzbeat.collector.context.Context;
+import org.apache.hertzbeat.collector.handler.ContextBoundListener;
+import org.apache.hertzbeat.common.entity.job.Job;
+import org.apache.hertzbeat.common.entity.message.CollectRep;
+import org.apache.hertzbeat.common.queue.CommonDataQueue;
+
+/**
+ * 周期任务专用
+ */
+@Slf4j
+@AllArgsConstructor
+public class MetricsDataDeliveryListener implements 
ContextBoundListener<CollectRep.MetricsData.Builder> {
+    private CommonDataQueue commonDataQueue;
+
+    @Override
+    public void execute(Context context, CollectRep.MetricsData.Builder data) {
+        Job job = context.get(ContextKey.JOB);
+        CollectRep.MetricsData metricsData = data.build();
+
+        cyclicJobDebugLog(job, metricsData);
+
+        sendToQueue(job, metricsData);
+    }
+
+    private void sendToQueue(Job job, CollectRep.MetricsData metricsData) {
+        if (job.isSd()) {
+            CollectRep.MetricsData sdMetricsData = 
CollectRep.MetricsData.newBuilder(metricsData).build();
+            commonDataQueue.sendServiceDiscoveryData(sdMetricsData);
+        }
+        commonDataQueue.sendMetricsData(metricsData);
+    }
+
+    private void cyclicJobDebugLog(Job job, CollectRep.MetricsData 
metricsData) {
+        if (log.isDebugEnabled()) {
+            log.debug("Cyclic Job: {} - {} - {}", job.getMonitorId(), 
job.getApp(), metricsData.getMetrics());
+            metricsDataDebugLog(metricsData);
+        }
+    }
+
+    private void metricsDataDebugLog(CollectRep.MetricsData metricsData) {
+        for (CollectRep.ValueRow valueRow : metricsData.getValues()) {
+            for (CollectRep.Field field : metricsData.getFields()) {
+                log.debug("Field-->{},Value-->{}", field.getName(), 
valueRow.getColumns(metricsData.getFields().indexOf(field)));
+            }
+        }
+    }
+}
diff --git 
a/hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/listener/RemoveTimeoutMonitorListener.java
 
b/hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/listener/RemoveTimeoutMonitorListener.java
new file mode 100644
index 000000000..516d6798b
--- /dev/null
+++ 
b/hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/listener/RemoveTimeoutMonitorListener.java
@@ -0,0 +1,26 @@
+package org.apache.hertzbeat.collector.listener;
+
+import lombok.AllArgsConstructor;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hertzbeat.collector.constants.ContextKey;
+import org.apache.hertzbeat.collector.context.Context;
+import org.apache.hertzbeat.collector.dispatch.CollectTaskTimeoutMonitor;
+import org.apache.hertzbeat.collector.handler.ContextBoundListener;
+
+/**
+ *
+ */
+@AllArgsConstructor
+public class RemoveTimeoutMonitorListener implements 
ContextBoundListener<Object> {
+    private CollectTaskTimeoutMonitor collectTaskTimeoutMonitor;
+
+    @Override
+    public void execute(Context context, Object data) {
+        String metricsKey = context.get(ContextKey.METRICS_KEY);
+        if (StringUtils.isBlank(metricsKey)) {
+            return;
+        }
+
+        collectTaskTimeoutMonitor.removeMetrics(metricsKey);
+    }
+}
diff --git 
a/hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/listener/RerunHandler.java
 
b/hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/listener/RerunHandler.java
new file mode 100644
index 000000000..750f44476
--- /dev/null
+++ 
b/hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/listener/RerunHandler.java
@@ -0,0 +1,38 @@
+package org.apache.hertzbeat.collector.listener;
+
+import lombok.AllArgsConstructor;
+import org.apache.hertzbeat.collector.constants.ContextKey;
+import org.apache.hertzbeat.collector.context.Context;
+import org.apache.hertzbeat.collector.handler.ContextBoundHandler;
+import org.apache.hertzbeat.collector.timer.TimerDispatch;
+import org.apache.hertzbeat.collector.timer.WheelTimerTask;
+import org.apache.hertzbeat.common.entity.job.Job;
+import org.apache.hertzbeat.common.timer.Timeout;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * 周期任务专用
+ */
+@AllArgsConstructor
+public class RerunHandler implements ContextBoundHandler<Object> {
+    private TimerDispatch timerDispatch;
+
+    @Override
+    public void execute(Context context, Object data) {
+        Job job = context.get(ContextKey.JOB);
+        Timeout timeout = context.get(ContextKey.TIMEOUT);
+
+        if (!timeout.isCancelled()) {
+            long spendTime = System.currentTimeMillis() - 
job.getDispatchTime();
+            long interval = job.getInterval() - spendTime / 1000L;
+            interval = interval <= 0 ? 0 : interval;
+            timerDispatch.cyclicJob((WheelTimerTask) timeout.task(), interval, 
TimeUnit.SECONDS);
+        }
+    }
+
+    @Override
+    public void whenException(Context context, Object data, Throwable 
throwable) {
+
+    }
+}
diff --git 
a/hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/listener/ResponseJobDataListener.java
 
b/hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/listener/ResponseJobDataListener.java
new file mode 100644
index 000000000..6e0fe261d
--- /dev/null
+++ 
b/hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/listener/ResponseJobDataListener.java
@@ -0,0 +1,44 @@
+package org.apache.hertzbeat.collector.listener;
+
+import com.google.common.collect.Lists;
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.hertzbeat.collector.constants.ContextKey;
+import org.apache.hertzbeat.collector.context.Context;
+import org.apache.hertzbeat.collector.handler.ContextBoundListener;
+import org.apache.hertzbeat.collector.timer.TimerDispatch;
+import org.apache.hertzbeat.common.entity.job.Job;
+import org.apache.hertzbeat.common.entity.message.CollectRep;
+
+/**
+ * 一次性任务专用
+ */
+@Slf4j
+@AllArgsConstructor
+public class ResponseJobDataListener implements 
ContextBoundListener<CollectRep.MetricsData.Builder> {
+    private TimerDispatch timerDispatch;
+
+    @Override
+    public void execute(Context context, CollectRep.MetricsData.Builder data) {
+        Job job = context.get(ContextKey.JOB);
+        CollectRep.MetricsData metricsData = data.build();
+
+        oneTimeJobDebugLog(metricsData);
+        timerDispatch.responseSyncJobData(job.getId(), 
Lists.newArrayList(metricsData));
+    }
+
+    private void oneTimeJobDebugLog(CollectRep.MetricsData metricsData) {
+        if (log.isDebugEnabled()) {
+            log.debug("One-time Job: {}", metricsData.getMetrics());
+            metricsDataDebugLog(metricsData);
+        }
+    }
+
+    private void metricsDataDebugLog(CollectRep.MetricsData metricsData) {
+        for (CollectRep.ValueRow valueRow : metricsData.getValues()) {
+            for (CollectRep.Field field : metricsData.getFields()) {
+                log.debug("Field-->{},Value-->{}", field.getName(), 
valueRow.getColumns(metricsData.getFields().indexOf(field)));
+            }
+        }
+    }
+}
diff --git 
a/hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/listener/ValidateResponseListener.java
 
b/hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/listener/ValidateResponseListener.java
new file mode 100644
index 000000000..3fa8acda2
--- /dev/null
+++ 
b/hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/listener/ValidateResponseListener.java
@@ -0,0 +1,36 @@
+package org.apache.hertzbeat.collector.listener;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.hertzbeat.collector.constants.ContextKey;
+import org.apache.hertzbeat.collector.context.Context;
+import org.apache.hertzbeat.collector.handler.ContextBoundListener;
+import org.apache.hertzbeat.common.entity.job.Metrics;
+import org.apache.hertzbeat.common.entity.message.CollectRep;
+
+/**
+ *
+ * @author Calvin
+ * @date 9/1/2025
+ */
+@Slf4j
+public class ValidateResponseListener implements 
ContextBoundListener<CollectRep.MetricsData.Builder> {
+
+    @Override
+    public void execute(Context context, CollectRep.MetricsData.Builder data) {
+        long startTime = context.get(ContextKey.METRICS_COLLECT_START_TIME);
+        Metrics metrics = context.get(ContextKey.METRICS);
+
+        this.validateResponse(startTime, metrics, data);
+    }
+
+    private void validateResponse(long startTime, Metrics metrics, 
CollectRep.MetricsData.Builder builder) {
+        long endTime = System.currentTimeMillis();
+        builder.setTime(endTime);
+        long allTime = endTime - startTime;
+        if (builder.getCode() != CollectRep.Code.SUCCESS) {
+            log.info("[Metrics: {}][Collect Failed, Run {}ms] Reason: {}", 
metrics.getName(), allTime, builder.getMsg());
+        } else {
+            log.info("[Metrics: {}][Collect Success, Run {}ms].", 
metrics.getName(), allTime);
+        }
+    }
+}
diff --git 
a/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/constants/ContextKey.java
 
b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/constants/ContextKey.java
new file mode 100644
index 000000000..e142c5d08
--- /dev/null
+++ 
b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/constants/ContextKey.java
@@ -0,0 +1,24 @@
+package org.apache.hertzbeat.collector.constants;
+
+import lombok.Getter;
+import org.apache.hertzbeat.common.entity.collector.CollectorMetaData;
+import org.apache.hertzbeat.common.entity.job.Job;
+import org.apache.hertzbeat.common.entity.job.Metrics;
+import org.apache.hertzbeat.common.timer.Timeout;
+
+public enum ContextKey {
+    META_DATA(CollectorMetaData.class),
+    JOB(Job.class),
+    TIMEOUT(Timeout.class),
+    METRICS_COLLECT_START_TIME(Long.class),
+    METRICS(Metrics.class),
+    METRICS_KEY(String.class),
+    ;
+
+    @Getter
+    private final Class<?> clazz;
+
+    ContextKey(Class<?> clazz) {
+        this.clazz = clazz;
+    }
+}
\ No newline at end of file
diff --git 
a/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/constants/ContextStatus.java
 
b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/constants/ContextStatus.java
new file mode 100644
index 000000000..c1e5106aa
--- /dev/null
+++ 
b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/constants/ContextStatus.java
@@ -0,0 +1,8 @@
+package org.apache.hertzbeat.collector.constants;
+
+/**
+ *
+ */
+public enum ContextStatus {
+    WAITING, RUNNING, STOP, TRUNCATE_HANDLER
+}
diff --git 
a/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/constants/HandlerType.java
 
b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/constants/HandlerType.java
new file mode 100644
index 000000000..3e774399d
--- /dev/null
+++ 
b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/constants/HandlerType.java
@@ -0,0 +1,8 @@
+package org.apache.hertzbeat.collector.constants;
+
+/**
+ *
+ */
+public enum HandlerType {
+    NORMAL, ON_COMPLETE
+}
diff --git 
a/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/context/AbstractInmemoryContext.java
 
b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/context/AbstractInmemoryContext.java
new file mode 100644
index 000000000..38144df32
--- /dev/null
+++ 
b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/context/AbstractInmemoryContext.java
@@ -0,0 +1,56 @@
+package org.apache.hertzbeat.collector.context;
+
+import lombok.Getter;
+import lombok.Setter;
+import org.apache.hertzbeat.collector.constants.ContextStatus;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ *
+ */
+public abstract class AbstractInmemoryContext implements Context {
+    protected final AtomicReference<ContextStatus> contextStatus = new 
AtomicReference<>(ContextStatus.WAITING);
+    protected final Map<Object, Object> map = new ConcurrentHashMap<>();
+    @Getter
+    @Setter
+    private Throwable error;
+
+
+    @Override
+    public <T> void put(Object key, T value) {
+        this.map.put(key, value);
+    }
+
+    @Override
+    public <T> T remove(Object key) {
+        return (T) this.map.remove(key);
+    }
+
+    @Override
+    public <T> T get(Object key) {
+        return (T) this.map.get(key);
+    }
+
+    @Override
+    public <T> T getOrDefault(Object key, T defaultValue) {
+        return (T) this.map.getOrDefault(key, defaultValue);
+    }
+
+    @Override
+    public boolean hasKey(Object key) {
+        return this.map.containsKey(key);
+    }
+
+    @Override
+    public ContextStatus getStatus() {
+        return contextStatus.get();
+    }
+
+    @Override
+    public void setStatus(ContextStatus status) {
+        contextStatus.set(status);
+    }
+}
diff --git 
a/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/context/Context.java
 
b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/context/Context.java
new file mode 100644
index 000000000..9dbbf5df6
--- /dev/null
+++ 
b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/context/Context.java
@@ -0,0 +1,12 @@
+package org.apache.hertzbeat.collector.context;
+
+import org.apache.hertzbeat.collector.constants.ContextStatus;
+
+/**
+ * 只维护与上下文的元数据
+ */
+public interface Context extends ContextView, ContextOperation {
+    ContextStatus getStatus();
+
+    void setStatus(ContextStatus status);
+}
diff --git 
a/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/context/ContextOperation.java
 
b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/context/ContextOperation.java
new file mode 100644
index 000000000..58685acbb
--- /dev/null
+++ 
b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/context/ContextOperation.java
@@ -0,0 +1,17 @@
+package org.apache.hertzbeat.collector.context;
+
+import io.micrometer.common.lang.Nullable;
+
+/**
+ *
+ */
+public interface ContextOperation {
+    <T> void put(Object key, T value);
+
+    <T> T remove(Object key);
+
+    @Nullable
+    Throwable getError();
+
+    void setError(Throwable error);
+}
diff --git 
a/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/context/ContextView.java
 
b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/context/ContextView.java
new file mode 100644
index 000000000..92a2dea66
--- /dev/null
+++ 
b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/context/ContextView.java
@@ -0,0 +1,12 @@
+package org.apache.hertzbeat.collector.context;
+
+/**
+ * 对上下文内容的查询操作
+ */
+public interface ContextView {
+    <T> T get(Object key);
+
+    <T> T getOrDefault(Object key, T defaultValue);
+
+    boolean hasKey(Object key);
+}
diff --git 
a/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/context/ExceptionStrategy.java
 
b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/context/ExceptionStrategy.java
new file mode 100644
index 000000000..9579eeb11
--- /dev/null
+++ 
b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/context/ExceptionStrategy.java
@@ -0,0 +1,7 @@
+package org.apache.hertzbeat.collector.context;
+
+/**
+ *
+ */
+public interface ExceptionStrategy {
+}
diff --git 
a/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/context/impl/DefaultContext.java
 
b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/context/impl/DefaultContext.java
new file mode 100644
index 000000000..c8b62f6ec
--- /dev/null
+++ 
b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/context/impl/DefaultContext.java
@@ -0,0 +1,15 @@
+package org.apache.hertzbeat.collector.context.impl;
+
+import org.apache.hertzbeat.collector.context.AbstractInmemoryContext;
+
+/**
+ *
+ */
+public class DefaultContext extends AbstractInmemoryContext {
+    private DefaultContext() {
+    }
+
+    public static DefaultContext newInstance() {
+        return new DefaultContext();
+    }
+}
diff --git 
a/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/dispatch/CollectDataDispatch.java
 
b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/dispatch/CollectDataDispatch.java
index f4f4e14e0..7c2e1f41d 100644
--- 
a/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/dispatch/CollectDataDispatch.java
+++ 
b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/dispatch/CollectDataDispatch.java
@@ -36,13 +36,4 @@ public interface CollectDataDispatch {
      */
     void dispatchCollectData(Timeout timeout, Metrics metrics, 
CollectRep.MetricsData metricsData);
 
-    /**
-     * Processing and distributing collection result data
-     *
-     * @param timeout     time wheel timeout        
-     * @param metrics     The following metrics collection tasks    
-     * @param metricsDataList Collect result data       
-     */
-    void dispatchCollectData(Timeout timeout, Metrics metrics, 
List<CollectRep.MetricsData> metricsDataList);
-
 }
diff --git 
a/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/dispatch/unit/UnitConverter.java
 
b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/dispatch/unit/UnitConverter.java
new file mode 100644
index 000000000..16cb38ba0
--- /dev/null
+++ 
b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/dispatch/unit/UnitConverter.java
@@ -0,0 +1,15 @@
+package org.apache.hertzbeat.collector.dispatch.unit;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+import java.util.List;
+
+/**
+ *
+ */
+@Data
+@AllArgsConstructor
+public class UnitConverter {
+    private List<UnitConvert> unitConvertList;
+}
diff --git 
a/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/handler/ChainBootstrap.java
 
b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/handler/ChainBootstrap.java
new file mode 100644
index 000000000..eed03a450
--- /dev/null
+++ 
b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/handler/ChainBootstrap.java
@@ -0,0 +1,96 @@
+package org.apache.hertzbeat.collector.handler;
+
+import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.hertzbeat.collector.constants.HandlerType;
+import org.apache.hertzbeat.collector.context.Context;
+import org.apache.hertzbeat.collector.dispatch.WorkerPool;
+import 
org.apache.hertzbeat.collector.handler.impl.AbstractListenerBoundHandler;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ *
+ */
+@Slf4j
+public class ChainBootstrap {
+    @Setter
+    private Context context;
+    private TaskChain<?> taskChain;
+    private WorkerPool workerPool;
+    private final List<ContextBoundHandler> contextBoundHandlerList = new 
ArrayList<>();
+    private final List<ContextBoundHandler> onCompleteContextBoundHandlerList 
= new ArrayList<>();
+    private final List<ContextBoundListener> dataListenerList = new 
ArrayList<>();
+    private final List<ContextBoundListener> onCompleteListenerList = new 
ArrayList<>();
+
+    public static ChainBootstrap withContext(Context context) {
+        ChainBootstrap bootstrap = new ChainBootstrap();
+        bootstrap.setContext(context);
+        return bootstrap;
+    }
+
+    public ChainBootstrap withChain(TaskChain<?> taskChain) {
+        this.taskChain = taskChain;
+        return this;
+    }
+
+    public ChainBootstrap withWorkerPool(WorkerPool workerPool) {
+        this.workerPool = workerPool;
+        return this;
+    }
+
+    public <T> ChainBootstrap addContext(Object key, T value) {
+        context.put(key, value);
+        return this;
+    }
+
+    public ChainBootstrap handler(ContextBoundHandler contextBoundHandler) {
+        contextBoundHandlerList.add(contextBoundHandler);
+        return this;
+    }
+
+    public ChainBootstrap onComplete(ContextBoundHandler contextBoundHandler) {
+        onCompleteContextBoundHandlerList.add(contextBoundHandler);
+        return this;
+    }
+
+    public ChainBootstrap addListener(ContextBoundListener dataListener) {
+        dataListenerList.add(dataListener);
+        return this;
+    }
+
+    public ChainBootstrap addOnCompleteListener(ContextBoundListener 
dataListener) {
+        onCompleteListenerList.add(dataListener);
+        return this;
+    }
+
+    public void start() {
+        if (taskChain == null || context == null) {
+            log.error("Failed to start chain boostrap due to null value of 
Context or TaskChain");
+            return;
+        }
+
+        for (ContextBoundHandler contextBoundHandler : 
contextBoundHandlerList) {
+            if (contextBoundHandler instanceof AbstractListenerBoundHandler 
listenerBoundHandler) {
+                if (CollectionUtils.isNotEmpty(dataListenerList)) {
+                    
listenerBoundHandler.getDataListenerList().addAll(dataListenerList);
+                }
+                if (CollectionUtils.isNotEmpty(onCompleteListenerList)) {
+                    
listenerBoundHandler.getOnCompleteListenerList().addAll(onCompleteListenerList);
+                }
+            }
+
+            taskChain.addLast(HandlerType.NORMAL, contextBoundHandler);
+        }
+
+        onCompleteContextBoundHandlerList.forEach(handler -> 
taskChain.addLast(HandlerType.ON_COMPLETE, handler));
+
+        if (workerPool != null) {
+            workerPool.executeJob(() -> taskChain.execute(context));
+        } else {
+            taskChain.execute(context);
+        }
+    }
+}
diff --git 
a/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/handler/ContextBoundHandler.java
 
b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/handler/ContextBoundHandler.java
new file mode 100644
index 000000000..542eb2778
--- /dev/null
+++ 
b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/handler/ContextBoundHandler.java
@@ -0,0 +1,12 @@
+package org.apache.hertzbeat.collector.handler;
+
+import org.apache.hertzbeat.collector.context.Context;
+
+/**
+ *
+ */
+public interface ContextBoundHandler<T> {
+    void execute(Context context, T data);
+
+    void whenException(Context context, T data, Throwable throwable);
+}
diff --git 
a/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/handler/ContextBoundListener.java
 
b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/handler/ContextBoundListener.java
new file mode 100644
index 000000000..c9965a157
--- /dev/null
+++ 
b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/handler/ContextBoundListener.java
@@ -0,0 +1,9 @@
+package org.apache.hertzbeat.collector.handler;
+
+import org.apache.hertzbeat.collector.context.Context;
+
+/**
+ */
+public interface ContextBoundListener<T> {
+    void execute(Context context, T data);
+}
diff --git 
a/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/handler/TaskChain.java
 
b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/handler/TaskChain.java
new file mode 100644
index 000000000..769cf60c3
--- /dev/null
+++ 
b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/handler/TaskChain.java
@@ -0,0 +1,15 @@
+package org.apache.hertzbeat.collector.handler;
+
+import org.apache.hertzbeat.collector.constants.HandlerType;
+import org.apache.hertzbeat.collector.context.Context;
+
+/**
+ *
+ */
+public interface TaskChain<T> {
+    void execute(Context context);
+
+    void execute(Context context, T data);
+
+    void addLast(HandlerType handlerType, ContextBoundHandler<T> handler);
+}
diff --git 
a/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/handler/impl/AbstractBatchDataBoundHandler.java
 
b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/handler/impl/AbstractBatchDataBoundHandler.java
new file mode 100644
index 000000000..f63708872
--- /dev/null
+++ 
b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/handler/impl/AbstractBatchDataBoundHandler.java
@@ -0,0 +1,22 @@
+package org.apache.hertzbeat.collector.handler.impl;
+
+import lombok.Setter;
+import org.apache.hertzbeat.collector.constants.ContextKey;
+import org.apache.hertzbeat.collector.context.Context;
+
+import java.util.List;
+
+/**
+ *
+ */
+public abstract class AbstractBatchDataBoundHandler<T, R> extends 
AbstractListenerBoundHandler<T, R> {
+    @Setter
+    protected List<T> sourceDataList;
+
+    @Override
+    public void execute(Context context, T data) {
+        for (T t : sourceDataList) {
+            super.execute(context, t);
+        }
+    }
+}
diff --git 
a/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/handler/impl/AbstractContextBoundTaskChain.java
 
b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/handler/impl/AbstractContextBoundTaskChain.java
new file mode 100644
index 000000000..ec88492ec
--- /dev/null
+++ 
b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/handler/impl/AbstractContextBoundTaskChain.java
@@ -0,0 +1,27 @@
+package org.apache.hertzbeat.collector.handler.impl;
+
+import org.apache.hertzbeat.collector.constants.HandlerType;
+import org.apache.hertzbeat.collector.handler.ContextBoundHandler;
+import org.apache.hertzbeat.collector.handler.TaskChain;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ *
+ */
+public abstract class AbstractContextBoundTaskChain<T> implements TaskChain<T> 
{
+    protected final Map<HandlerType, List<ContextBoundHandler<T>>> 
contextBoundHandlerMap = new HashMap<>();
+
+    @Override
+    public void addLast(HandlerType handlerType, ContextBoundHandler<T> 
handler) {
+        if (!contextBoundHandlerMap.containsKey(handlerType)) {
+            contextBoundHandlerMap.put(handlerType, new ArrayList<>());
+        }
+
+        contextBoundHandlerMap.get(handlerType).add(handler);
+    }
+}
diff --git 
a/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/handler/impl/AbstractListenerBoundHandler.java
 
b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/handler/impl/AbstractListenerBoundHandler.java
new file mode 100644
index 000000000..78acdbe54
--- /dev/null
+++ 
b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/handler/impl/AbstractListenerBoundHandler.java
@@ -0,0 +1,73 @@
+package org.apache.hertzbeat.collector.handler.impl;
+
+import lombok.Getter;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.hertzbeat.collector.constants.ContextKey;
+import org.apache.hertzbeat.collector.constants.ContextStatus;
+import org.apache.hertzbeat.collector.context.Context;
+import org.apache.hertzbeat.collector.handler.ContextBoundHandler;
+import org.apache.hertzbeat.collector.handler.ContextBoundListener;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ *
+ */
+public abstract class AbstractListenerBoundHandler<T, R> implements 
ContextBoundHandler<T> {
+    @Getter
+    private final List<? extends ContextBoundListener<R>> dataListenerList = 
new ArrayList<>();
+    @Getter
+    private final List<? extends ContextBoundListener<R>> 
onCompleteListenerList = new ArrayList<>();
+
+    @Override
+    public void execute(Context context, T data) {
+        long startTime = System.currentTimeMillis();
+        context.put(ContextKey.METRICS_COLLECT_START_TIME, startTime);
+
+        R executeResult = executeWithResponse(context, data);
+
+        runListener(context, executeResult);
+
+        runOnCompleteListener(context, executeResult);
+    }
+
+    public R executeWithResponse(Context context, T data) {
+        // no-op
+        return null;
+    }
+
+    @Override
+    public void whenException(Context context, T data, Throwable throwable) {
+        // no-op
+    }
+
+    private void runListener(Context context, R executeResult) {
+        if (CollectionUtils.isEmpty(dataListenerList)) {
+            return;
+        }
+
+        if (ContextStatus.STOP.equals(context.getStatus())) {
+            return;
+        }
+
+        //todo 异常处理
+        for (ContextBoundListener<R> listener : dataListenerList) {
+            listener.execute(context, executeResult);
+
+            if (ContextStatus.STOP.equals(context.getStatus())) {
+                break;
+            }
+        }
+    }
+
+    private void runOnCompleteListener(Context context, R executeResult) {
+        if (CollectionUtils.isEmpty(onCompleteListenerList)) {
+            return;
+        }
+
+        for (ContextBoundListener<R> listener : onCompleteListenerList) {
+            listener.execute(context, executeResult);
+        }
+    }
+}
diff --git 
a/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/handler/impl/BatchExecuteTaskChain.java
 
b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/handler/impl/BatchExecuteTaskChain.java
new file mode 100644
index 000000000..b659eac29
--- /dev/null
+++ 
b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/handler/impl/BatchExecuteTaskChain.java
@@ -0,0 +1,48 @@
+package org.apache.hertzbeat.collector.handler.impl;
+
+import org.apache.hertzbeat.collector.constants.ContextStatus;
+import org.apache.hertzbeat.collector.constants.HandlerType;
+import org.apache.hertzbeat.collector.context.Context;
+import org.apache.hertzbeat.collector.handler.ContextBoundHandler;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+
+/**
+ *
+ */
+public class BatchExecuteTaskChain<T> extends AbstractContextBoundTaskChain<T> 
{
+    @Override
+    public void execute(Context context) {
+        this.execute(context, null);
+    }
+
+    @Override
+    public void execute(Context context, T data) {
+        context.setStatus(ContextStatus.RUNNING);
+
+        for (ContextBoundHandler<T> contextBoundHandler : 
contextBoundHandlerMap.getOrDefault(HandlerType.NORMAL, new ArrayList<>())) {
+            runHandler(context, data, contextBoundHandler);
+
+            if (ContextStatus.TRUNCATE_HANDLER.equals(context.getStatus()) || 
ContextStatus.STOP.equals(context.getStatus())) {
+                break;
+            }
+
+            // in order to init error info for the next loop
+            context.setError(null);
+        }
+
+        contextBoundHandlerMap.getOrDefault(HandlerType.ON_COMPLETE, new 
ArrayList<>()).forEach(handler -> runHandler(context, data, handler));
+    }
+
+    private static <T> void runHandler(Context context, T data, 
ContextBoundHandler<T> contextBoundHandler) {
+        try {
+            contextBoundHandler.execute(context, data);
+        } catch (Exception exception) {
+            context.setError(exception);
+            contextBoundHandler.whenException(context, data, exception);
+        }
+    }
+}
diff --git 
a/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/constants/CommonConstants.java
 
b/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/constants/CommonConstants.java
index 592d31b73..509699a14 100644
--- 
a/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/constants/CommonConstants.java
+++ 
b/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/constants/CommonConstants.java
@@ -382,4 +382,6 @@ public interface CommonConstants {
      * JEXL custom function `json`
      */
     String JEXL_CUSTOM_JSON_FUNCTION = "json";
+
+    byte AVAILABLE_METRICS = 0;
 }
diff --git 
a/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/entity/collector/CollectorMetaData.java
 
b/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/entity/collector/CollectorMetaData.java
new file mode 100644
index 000000000..c0f3c403c
--- /dev/null
+++ 
b/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/entity/collector/CollectorMetaData.java
@@ -0,0 +1,21 @@
+package org.apache.hertzbeat.common.entity.collector;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.util.Date;
+
+/**
+ *
+ */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class CollectorMetaData {
+    private String identity;
+    private String mode;
+    private Date startTime;
+}
diff --git 
a/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/entity/job/MetricsSource.java
 
b/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/entity/job/MetricsSource.java
new file mode 100644
index 000000000..d9e2c8f07
--- /dev/null
+++ 
b/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/entity/job/MetricsSource.java
@@ -0,0 +1,13 @@
+package org.apache.hertzbeat.common.entity.job;
+
+import lombok.Data;
+
+import java.util.List;
+
+/**
+ *
+ */
+@Data
+public class MetricsSource {
+    private List<Metrics> metricsList;
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to