This is an automated email from the ASF dual-hosted git repository.

zhengqiwei pushed a commit to branch refactor_collector
in repository https://gitbox.apache.org/repos/asf/hertzbeat.git


The following commit(s) were added to refs/heads/refactor_collector by this 
push:
     new 195e7e95f [refactor] rename handler to data stream and delete 
UnitConverter.java
195e7e95f is described below

commit 195e7e95fa958256d8f7ce2caff9cbf2ca9b3394
Author: Calvin <[email protected]>
AuthorDate: Sat Sep 6 15:45:33 2025 +0800

    [refactor] rename handler to data stream and delete UnitConverter.java
---
 .../collector/dispatch/CommonDispatcher.java       | 15 +++++++-------
 ...dler.java => CollectMetricsDataDataStream.java} |  4 ++--
 ...ynamicSubTaskCollectMetricsDataDataStream.java} |  4 ++--
 .../listener/CalculateFieldsListener.java          |  5 ++---
 .../{RerunHandler.java => RerunDataStream.java}    |  4 ++--
 .../collector/dispatch/unit/UnitConverter.java     | 15 --------------
 .../collector/handler/ChainBootstrap.java          | 24 +++++++++++-----------
 ...undHandler.java => ContextBoundDataStream.java} |  2 +-
 .../hertzbeat/collector/handler/TaskChain.java     |  2 +-
 ...ndHandler.java => AbstractBatchDataStream.java} |  3 +--
 .../impl/AbstractContextBoundTaskChain.java        |  7 +++----
 ...r.java => AbstractListenerBoundDataStream.java} |  4 ++--
 .../handler/impl/BatchExecuteTaskChain.java        | 14 ++++++-------
 13 files changed, 41 insertions(+), 62 deletions(-)

diff --git 
a/hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/dispatch/CommonDispatcher.java
 
b/hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/dispatch/CommonDispatcher.java
index 213bb434a..0c94d8cfb 100644
--- 
a/hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/dispatch/CommonDispatcher.java
+++ 
b/hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/dispatch/CommonDispatcher.java
@@ -22,13 +22,12 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.hertzbeat.collector.constants.ContextKey;
 import org.apache.hertzbeat.collector.context.impl.DefaultContext;
 import 
org.apache.hertzbeat.collector.dispatch.entrance.internal.CollectJobService;
-import org.apache.hertzbeat.collector.dispatch.unit.UnitConverter;
 import org.apache.hertzbeat.collector.handler.ChainBootstrap;
-import org.apache.hertzbeat.collector.handler.CollectMetricsDataHandler;
+import org.apache.hertzbeat.collector.handler.CollectMetricsDataDataStream;
 import org.apache.hertzbeat.collector.listener.CalculateFieldsListener;
 import org.apache.hertzbeat.collector.listener.MetricsDataDeliveryListener;
 import org.apache.hertzbeat.collector.listener.RemoveTimeoutMonitorListener;
-import org.apache.hertzbeat.collector.listener.RerunHandler;
+import org.apache.hertzbeat.collector.listener.RerunDataStream;
 import org.apache.hertzbeat.collector.listener.ResponseJobDataListener;
 import org.apache.hertzbeat.collector.listener.ValidateResponseListener;
 import org.apache.hertzbeat.collector.metrics.HertzBeatMetricsCollector;
@@ -168,14 +167,14 @@ public class CommonDispatcher implements 
MetricsTaskDispatch, CollectDataDispatc
         bootstrap.addContext(ContextKey.META_DATA, metaData)
                 .addContext(ContextKey.JOB, job)
                 .addContext(ContextKey.TIMEOUT, timeout)
-                .addListener(new CalculateFieldsListener(new 
UnitConverter(unitConvertList)))
+                .addListener(new CalculateFieldsListener(unitConvertList))
                 .addListener(new ValidateResponseListener())
-                .addOnCompleteListener(new 
RemoveTimeoutMonitorListener(collectTaskTimeoutMonitor));
+                .onEachDataStreamComplete(new 
RemoveTimeoutMonitorListener(collectTaskTimeoutMonitor));
 
         if (job.isCyclic()) {
             bootstrap.withWorkerPool(workerPool)
                     .addListener(new 
MetricsDataDeliveryListener(commonDataQueue))
-                    .onComplete(new RerunHandler(timerDispatch));
+                    .onComplete(new RerunDataStream(timerDispatch));
         } else {
             bootstrap.addListener(new ResponseJobDataListener(timerDispatch));
         }
@@ -223,12 +222,12 @@ public class CommonDispatcher implements 
MetricsTaskDispatch, CollectDataDispatc
                 .forEach(priority -> {
                     if (job.isCyclic() || 
isOneTimeJobAndIsAvailableMetrics(job, priority)) {
                         List<Metrics> metricsList = 
currentCollectMetrics.get(priority);
-                        CollectMetricsDataHandler collectHandler = 
CollectMetricsDataHandler.builder()
+                        CollectMetricsDataDataStream collectHandler = 
CollectMetricsDataDataStream.builder()
                                 
.collectTaskTimeoutMonitor(collectTaskTimeoutMonitor)
                                 .build();
                         collectHandler.setSourceDataList(metricsList);
 
-                        chainBootstrap.handler(collectHandler);
+                        chainBootstrap.addDataStream(collectHandler);
                     }
                 });
 
diff --git 
a/hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/handler/CollectMetricsDataHandler.java
 
b/hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/handler/CollectMetricsDataDataStream.java
similarity index 97%
rename from 
hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/handler/CollectMetricsDataHandler.java
rename to 
hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/handler/CollectMetricsDataDataStream.java
index f3cca9d8d..d941f9a5d 100644
--- 
a/hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/handler/CollectMetricsDataHandler.java
+++ 
b/hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/handler/CollectMetricsDataDataStream.java
@@ -12,7 +12,7 @@ import org.apache.hertzbeat.collector.constants.ContextKey;
 import org.apache.hertzbeat.collector.constants.ContextStatus;
 import org.apache.hertzbeat.collector.context.Context;
 import org.apache.hertzbeat.collector.dispatch.CollectTaskTimeoutMonitor;
-import 
org.apache.hertzbeat.collector.handler.impl.AbstractBatchDataBoundHandler;
+import org.apache.hertzbeat.collector.handler.impl.AbstractBatchDataStream;
 import org.apache.hertzbeat.common.constants.CommonConstants;
 import org.apache.hertzbeat.common.entity.job.Job;
 import org.apache.hertzbeat.common.entity.job.Metrics;
@@ -26,7 +26,7 @@ import org.apache.hertzbeat.common.timer.Timeout;
 @NoArgsConstructor
 @AllArgsConstructor
 @EqualsAndHashCode(callSuper = true)
-public class CollectMetricsDataHandler extends 
AbstractBatchDataBoundHandler<Metrics, CollectRep.MetricsData.Builder> {
+public class CollectMetricsDataDataStream extends 
AbstractBatchDataStream<Metrics, CollectRep.MetricsData.Builder> {
     private CollectTaskTimeoutMonitor collectTaskTimeoutMonitor;
 
     @Override
diff --git 
a/hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/handler/DynamicSubTaskCollectMetricsDataHandler.java
 
b/hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/handler/DynamicSubTaskCollectMetricsDataDataStream.java
similarity index 75%
rename from 
hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/handler/DynamicSubTaskCollectMetricsDataHandler.java
rename to 
hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/handler/DynamicSubTaskCollectMetricsDataDataStream.java
index 74cbea66c..3a8194665 100644
--- 
a/hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/handler/DynamicSubTaskCollectMetricsDataHandler.java
+++ 
b/hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/handler/DynamicSubTaskCollectMetricsDataDataStream.java
@@ -1,14 +1,14 @@
 package org.apache.hertzbeat.collector.handler;
 
 import org.apache.hertzbeat.collector.context.Context;
-import 
org.apache.hertzbeat.collector.handler.impl.AbstractBatchDataBoundHandler;
+import org.apache.hertzbeat.collector.handler.impl.AbstractBatchDataStream;
 import org.apache.hertzbeat.common.entity.job.Metrics;
 import org.apache.hertzbeat.common.entity.message.CollectRep;
 
 /**
  *
  */
-public class DynamicSubTaskCollectMetricsDataHandler extends 
AbstractBatchDataBoundHandler<Metrics, CollectRep.MetricsData.Builder> {
+public class DynamicSubTaskCollectMetricsDataDataStream extends 
AbstractBatchDataStream<Metrics, CollectRep.MetricsData.Builder> {
     @Override
     public CollectRep.MetricsData.Builder executeWithResponse(Context context, 
Metrics data) {
         //todo 动态拆分
diff --git 
a/hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/listener/CalculateFieldsListener.java
 
b/hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/listener/CalculateFieldsListener.java
index 0a2a8b333..163c6420f 100644
--- 
a/hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/listener/CalculateFieldsListener.java
+++ 
b/hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/listener/CalculateFieldsListener.java
@@ -6,7 +6,6 @@ import org.apache.commons.jexl3.JexlExpression;
 import org.apache.hertzbeat.collector.constants.ContextKey;
 import org.apache.hertzbeat.collector.context.Context;
 import org.apache.hertzbeat.collector.dispatch.unit.UnitConvert;
-import org.apache.hertzbeat.collector.dispatch.unit.UnitConverter;
 import org.apache.hertzbeat.collector.handler.ContextBoundListener;
 import org.apache.hertzbeat.collector.util.CollectUtil;
 import org.apache.hertzbeat.common.constants.CommonConstants;
@@ -32,13 +31,13 @@ import java.util.stream.Collectors;
 @Slf4j
 @AllArgsConstructor
 public class CalculateFieldsListener implements 
ContextBoundListener<CollectRep.MetricsData.Builder> {
-    private UnitConverter unitConverter;
+    private List<UnitConvert> unitConvertList;
 
     @Override
     public void execute(Context context, CollectRep.MetricsData.Builder data) {
         Metrics metrics = context.get(ContextKey.METRICS);
 
-        this.calculateFields(metrics, unitConverter.getUnitConvertList(), 
data);
+        this.calculateFields(metrics, unitConvertList, data);
     }
 
     /**
diff --git 
a/hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/listener/RerunHandler.java
 
b/hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/listener/RerunDataStream.java
similarity index 89%
rename from 
hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/listener/RerunHandler.java
rename to 
hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/listener/RerunDataStream.java
index 750f44476..de416c42e 100644
--- 
a/hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/listener/RerunHandler.java
+++ 
b/hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/listener/RerunDataStream.java
@@ -3,7 +3,7 @@ package org.apache.hertzbeat.collector.listener;
 import lombok.AllArgsConstructor;
 import org.apache.hertzbeat.collector.constants.ContextKey;
 import org.apache.hertzbeat.collector.context.Context;
-import org.apache.hertzbeat.collector.handler.ContextBoundHandler;
+import org.apache.hertzbeat.collector.handler.ContextBoundDataStream;
 import org.apache.hertzbeat.collector.timer.TimerDispatch;
 import org.apache.hertzbeat.collector.timer.WheelTimerTask;
 import org.apache.hertzbeat.common.entity.job.Job;
@@ -15,7 +15,7 @@ import java.util.concurrent.TimeUnit;
  * 周期任务专用
  */
 @AllArgsConstructor
-public class RerunHandler implements ContextBoundHandler<Object> {
+public class RerunDataStream implements ContextBoundDataStream<Object> {
     private TimerDispatch timerDispatch;
 
     @Override
diff --git 
a/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/dispatch/unit/UnitConverter.java
 
b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/dispatch/unit/UnitConverter.java
deleted file mode 100644
index 16cb38ba0..000000000
--- 
a/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/dispatch/unit/UnitConverter.java
+++ /dev/null
@@ -1,15 +0,0 @@
-package org.apache.hertzbeat.collector.dispatch.unit;
-
-import lombok.AllArgsConstructor;
-import lombok.Data;
-
-import java.util.List;
-
-/**
- *
- */
-@Data
-@AllArgsConstructor
-public class UnitConverter {
-    private List<UnitConvert> unitConvertList;
-}
diff --git 
a/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/handler/ChainBootstrap.java
 
b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/handler/ChainBootstrap.java
index eed03a450..246a4ef96 100644
--- 
a/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/handler/ChainBootstrap.java
+++ 
b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/handler/ChainBootstrap.java
@@ -6,7 +6,7 @@ import org.apache.commons.collections4.CollectionUtils;
 import org.apache.hertzbeat.collector.constants.HandlerType;
 import org.apache.hertzbeat.collector.context.Context;
 import org.apache.hertzbeat.collector.dispatch.WorkerPool;
-import 
org.apache.hertzbeat.collector.handler.impl.AbstractListenerBoundHandler;
+import 
org.apache.hertzbeat.collector.handler.impl.AbstractListenerBoundDataStream;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -20,8 +20,8 @@ public class ChainBootstrap {
     private Context context;
     private TaskChain<?> taskChain;
     private WorkerPool workerPool;
-    private final List<ContextBoundHandler> contextBoundHandlerList = new 
ArrayList<>();
-    private final List<ContextBoundHandler> onCompleteContextBoundHandlerList 
= new ArrayList<>();
+    private final List<ContextBoundDataStream> contextBoundDataStreamList = 
new ArrayList<>();
+    private final List<ContextBoundDataStream> 
onCompleteContextBoundDataStreamList = new ArrayList<>();
     private final List<ContextBoundListener> dataListenerList = new 
ArrayList<>();
     private final List<ContextBoundListener> onCompleteListenerList = new 
ArrayList<>();
 
@@ -46,13 +46,13 @@ public class ChainBootstrap {
         return this;
     }
 
-    public ChainBootstrap handler(ContextBoundHandler contextBoundHandler) {
-        contextBoundHandlerList.add(contextBoundHandler);
+    public ChainBootstrap addDataStream(ContextBoundDataStream 
contextBoundDataStream) {
+        contextBoundDataStreamList.add(contextBoundDataStream);
         return this;
     }
 
-    public ChainBootstrap onComplete(ContextBoundHandler contextBoundHandler) {
-        onCompleteContextBoundHandlerList.add(contextBoundHandler);
+    public ChainBootstrap onComplete(ContextBoundDataStream 
contextBoundDataStream) {
+        onCompleteContextBoundDataStreamList.add(contextBoundDataStream);
         return this;
     }
 
@@ -61,7 +61,7 @@ public class ChainBootstrap {
         return this;
     }
 
-    public ChainBootstrap addOnCompleteListener(ContextBoundListener 
dataListener) {
+    public ChainBootstrap onEachDataStreamComplete(ContextBoundListener 
dataListener) {
         onCompleteListenerList.add(dataListener);
         return this;
     }
@@ -72,8 +72,8 @@ public class ChainBootstrap {
             return;
         }
 
-        for (ContextBoundHandler contextBoundHandler : 
contextBoundHandlerList) {
-            if (contextBoundHandler instanceof AbstractListenerBoundHandler 
listenerBoundHandler) {
+        for (ContextBoundDataStream contextBoundDataStream : 
contextBoundDataStreamList) {
+            if (contextBoundDataStream instanceof 
AbstractListenerBoundDataStream listenerBoundHandler) {
                 if (CollectionUtils.isNotEmpty(dataListenerList)) {
                     
listenerBoundHandler.getDataListenerList().addAll(dataListenerList);
                 }
@@ -82,10 +82,10 @@ public class ChainBootstrap {
                 }
             }
 
-            taskChain.addLast(HandlerType.NORMAL, contextBoundHandler);
+            taskChain.addLast(HandlerType.NORMAL, contextBoundDataStream);
         }
 
-        onCompleteContextBoundHandlerList.forEach(handler -> 
taskChain.addLast(HandlerType.ON_COMPLETE, handler));
+        onCompleteContextBoundDataStreamList.forEach(handler -> 
taskChain.addLast(HandlerType.ON_COMPLETE, handler));
 
         if (workerPool != null) {
             workerPool.executeJob(() -> taskChain.execute(context));
diff --git 
a/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/handler/ContextBoundHandler.java
 
b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/handler/ContextBoundDataStream.java
similarity index 83%
rename from 
hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/handler/ContextBoundHandler.java
rename to 
hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/handler/ContextBoundDataStream.java
index 542eb2778..f93261681 100644
--- 
a/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/handler/ContextBoundHandler.java
+++ 
b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/handler/ContextBoundDataStream.java
@@ -5,7 +5,7 @@ import org.apache.hertzbeat.collector.context.Context;
 /**
  *
  */
-public interface ContextBoundHandler<T> {
+public interface ContextBoundDataStream<T> {
     void execute(Context context, T data);
 
     void whenException(Context context, T data, Throwable throwable);
diff --git 
a/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/handler/TaskChain.java
 
b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/handler/TaskChain.java
index 769cf60c3..80bb8a0a0 100644
--- 
a/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/handler/TaskChain.java
+++ 
b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/handler/TaskChain.java
@@ -11,5 +11,5 @@ public interface TaskChain<T> {
 
     void execute(Context context, T data);
 
-    void addLast(HandlerType handlerType, ContextBoundHandler<T> handler);
+    void addLast(HandlerType handlerType, ContextBoundDataStream<T> handler);
 }
diff --git 
a/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/handler/impl/AbstractBatchDataBoundHandler.java
 
b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/handler/impl/AbstractBatchDataStream.java
similarity index 69%
rename from 
hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/handler/impl/AbstractBatchDataBoundHandler.java
rename to 
hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/handler/impl/AbstractBatchDataStream.java
index f63708872..9e6da3330 100644
--- 
a/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/handler/impl/AbstractBatchDataBoundHandler.java
+++ 
b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/handler/impl/AbstractBatchDataStream.java
@@ -1,7 +1,6 @@
 package org.apache.hertzbeat.collector.handler.impl;
 
 import lombok.Setter;
-import org.apache.hertzbeat.collector.constants.ContextKey;
 import org.apache.hertzbeat.collector.context.Context;
 
 import java.util.List;
@@ -9,7 +8,7 @@ import java.util.List;
 /**
  *
  */
-public abstract class AbstractBatchDataBoundHandler<T, R> extends 
AbstractListenerBoundHandler<T, R> {
+public abstract class AbstractBatchDataStream<T, R> extends 
AbstractListenerBoundDataStream<T, R> {
     @Setter
     protected List<T> sourceDataList;
 
diff --git 
a/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/handler/impl/AbstractContextBoundTaskChain.java
 
b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/handler/impl/AbstractContextBoundTaskChain.java
index ec88492ec..67f931e9e 100644
--- 
a/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/handler/impl/AbstractContextBoundTaskChain.java
+++ 
b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/handler/impl/AbstractContextBoundTaskChain.java
@@ -1,12 +1,11 @@
 package org.apache.hertzbeat.collector.handler.impl;
 
 import org.apache.hertzbeat.collector.constants.HandlerType;
-import org.apache.hertzbeat.collector.handler.ContextBoundHandler;
+import org.apache.hertzbeat.collector.handler.ContextBoundDataStream;
 import org.apache.hertzbeat.collector.handler.TaskChain;
 
 import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -14,10 +13,10 @@ import java.util.Map;
  *
  */
 public abstract class AbstractContextBoundTaskChain<T> implements TaskChain<T> 
{
-    protected final Map<HandlerType, List<ContextBoundHandler<T>>> 
contextBoundHandlerMap = new HashMap<>();
+    protected final Map<HandlerType, List<ContextBoundDataStream<T>>> 
contextBoundHandlerMap = new HashMap<>();
 
     @Override
-    public void addLast(HandlerType handlerType, ContextBoundHandler<T> 
handler) {
+    public void addLast(HandlerType handlerType, ContextBoundDataStream<T> 
handler) {
         if (!contextBoundHandlerMap.containsKey(handlerType)) {
             contextBoundHandlerMap.put(handlerType, new ArrayList<>());
         }
diff --git 
a/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/handler/impl/AbstractListenerBoundHandler.java
 
b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/handler/impl/AbstractListenerBoundDataStream.java
similarity index 92%
rename from 
hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/handler/impl/AbstractListenerBoundHandler.java
rename to 
hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/handler/impl/AbstractListenerBoundDataStream.java
index 78acdbe54..3e7872ffa 100644
--- 
a/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/handler/impl/AbstractListenerBoundHandler.java
+++ 
b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/handler/impl/AbstractListenerBoundDataStream.java
@@ -5,7 +5,7 @@ import org.apache.commons.collections4.CollectionUtils;
 import org.apache.hertzbeat.collector.constants.ContextKey;
 import org.apache.hertzbeat.collector.constants.ContextStatus;
 import org.apache.hertzbeat.collector.context.Context;
-import org.apache.hertzbeat.collector.handler.ContextBoundHandler;
+import org.apache.hertzbeat.collector.handler.ContextBoundDataStream;
 import org.apache.hertzbeat.collector.handler.ContextBoundListener;
 
 import java.util.ArrayList;
@@ -14,7 +14,7 @@ import java.util.List;
 /**
  *
  */
-public abstract class AbstractListenerBoundHandler<T, R> implements 
ContextBoundHandler<T> {
+public abstract class AbstractListenerBoundDataStream<T, R> implements 
ContextBoundDataStream<T> {
     @Getter
     private final List<? extends ContextBoundListener<R>> dataListenerList = 
new ArrayList<>();
     @Getter
diff --git 
a/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/handler/impl/BatchExecuteTaskChain.java
 
b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/handler/impl/BatchExecuteTaskChain.java
index b659eac29..10d759b33 100644
--- 
a/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/handler/impl/BatchExecuteTaskChain.java
+++ 
b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/handler/impl/BatchExecuteTaskChain.java
@@ -3,11 +3,9 @@ package org.apache.hertzbeat.collector.handler.impl;
 import org.apache.hertzbeat.collector.constants.ContextStatus;
 import org.apache.hertzbeat.collector.constants.HandlerType;
 import org.apache.hertzbeat.collector.context.Context;
-import org.apache.hertzbeat.collector.handler.ContextBoundHandler;
+import org.apache.hertzbeat.collector.handler.ContextBoundDataStream;
 
 import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
 
 
 /**
@@ -23,8 +21,8 @@ public class BatchExecuteTaskChain<T> extends 
AbstractContextBoundTaskChain<T> {
     public void execute(Context context, T data) {
         context.setStatus(ContextStatus.RUNNING);
 
-        for (ContextBoundHandler<T> contextBoundHandler : 
contextBoundHandlerMap.getOrDefault(HandlerType.NORMAL, new ArrayList<>())) {
-            runHandler(context, data, contextBoundHandler);
+        for (ContextBoundDataStream<T> contextBoundDataStream : 
contextBoundHandlerMap.getOrDefault(HandlerType.NORMAL, new ArrayList<>())) {
+            runHandler(context, data, contextBoundDataStream);
 
             if (ContextStatus.TRUNCATE_HANDLER.equals(context.getStatus()) || 
ContextStatus.STOP.equals(context.getStatus())) {
                 break;
@@ -37,12 +35,12 @@ public class BatchExecuteTaskChain<T> extends 
AbstractContextBoundTaskChain<T> {
         contextBoundHandlerMap.getOrDefault(HandlerType.ON_COMPLETE, new 
ArrayList<>()).forEach(handler -> runHandler(context, data, handler));
     }
 
-    private static <T> void runHandler(Context context, T data, 
ContextBoundHandler<T> contextBoundHandler) {
+    private static <T> void runHandler(Context context, T data, 
ContextBoundDataStream<T> contextBoundDataStream) {
         try {
-            contextBoundHandler.execute(context, data);
+            contextBoundDataStream.execute(context, data);
         } catch (Exception exception) {
             context.setError(exception);
-            contextBoundHandler.whenException(context, data, exception);
+            contextBoundDataStream.whenException(context, data, exception);
         }
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to