This is an automated email from the ASF dual-hosted git repository.
zhengqiwei pushed a commit to branch refactor_collector
in repository https://gitbox.apache.org/repos/asf/hertzbeat.git
The following commit(s) were added to refs/heads/refactor_collector by this push:
new 195e7e95f [refactor] rename handler to data stream and delete UnitConverter.java
195e7e95f is described below
commit 195e7e95fa958256d8f7ce2caff9cbf2ca9b3394
Author: Calvin <[email protected]>
AuthorDate: Sat Sep 6 15:45:33 2025 +0800
[refactor] rename handler to data stream and delete UnitConverter.java
---
.../collector/dispatch/CommonDispatcher.java | 15 +++++++-------
...dler.java => CollectMetricsDataDataStream.java} | 4 ++--
...ynamicSubTaskCollectMetricsDataDataStream.java} | 4 ++--
.../listener/CalculateFieldsListener.java | 5 ++---
.../{RerunHandler.java => RerunDataStream.java} | 4 ++--
.../collector/dispatch/unit/UnitConverter.java | 15 --------------
.../collector/handler/ChainBootstrap.java | 24 +++++++++++-----------
...undHandler.java => ContextBoundDataStream.java} | 2 +-
.../hertzbeat/collector/handler/TaskChain.java | 2 +-
...ndHandler.java => AbstractBatchDataStream.java} | 3 +--
.../impl/AbstractContextBoundTaskChain.java | 7 +++----
...r.java => AbstractListenerBoundDataStream.java} | 4 ++--
.../handler/impl/BatchExecuteTaskChain.java | 14 ++++++-------
13 files changed, 41 insertions(+), 62 deletions(-)
diff --git
a/hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/dispatch/CommonDispatcher.java
b/hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/dispatch/CommonDispatcher.java
index 213bb434a..0c94d8cfb 100644
---
a/hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/dispatch/CommonDispatcher.java
+++
b/hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/dispatch/CommonDispatcher.java
@@ -22,13 +22,12 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.hertzbeat.collector.constants.ContextKey;
import org.apache.hertzbeat.collector.context.impl.DefaultContext;
import
org.apache.hertzbeat.collector.dispatch.entrance.internal.CollectJobService;
-import org.apache.hertzbeat.collector.dispatch.unit.UnitConverter;
import org.apache.hertzbeat.collector.handler.ChainBootstrap;
-import org.apache.hertzbeat.collector.handler.CollectMetricsDataHandler;
+import org.apache.hertzbeat.collector.handler.CollectMetricsDataDataStream;
import org.apache.hertzbeat.collector.listener.CalculateFieldsListener;
import org.apache.hertzbeat.collector.listener.MetricsDataDeliveryListener;
import org.apache.hertzbeat.collector.listener.RemoveTimeoutMonitorListener;
-import org.apache.hertzbeat.collector.listener.RerunHandler;
+import org.apache.hertzbeat.collector.listener.RerunDataStream;
import org.apache.hertzbeat.collector.listener.ResponseJobDataListener;
import org.apache.hertzbeat.collector.listener.ValidateResponseListener;
import org.apache.hertzbeat.collector.metrics.HertzBeatMetricsCollector;
@@ -168,14 +167,14 @@ public class CommonDispatcher implements
MetricsTaskDispatch, CollectDataDispatc
bootstrap.addContext(ContextKey.META_DATA, metaData)
.addContext(ContextKey.JOB, job)
.addContext(ContextKey.TIMEOUT, timeout)
- .addListener(new CalculateFieldsListener(new
UnitConverter(unitConvertList)))
+ .addListener(new CalculateFieldsListener(unitConvertList))
.addListener(new ValidateResponseListener())
- .addOnCompleteListener(new
RemoveTimeoutMonitorListener(collectTaskTimeoutMonitor));
+ .onEachDataStreamComplete(new
RemoveTimeoutMonitorListener(collectTaskTimeoutMonitor));
if (job.isCyclic()) {
bootstrap.withWorkerPool(workerPool)
.addListener(new
MetricsDataDeliveryListener(commonDataQueue))
- .onComplete(new RerunHandler(timerDispatch));
+ .onComplete(new RerunDataStream(timerDispatch));
} else {
bootstrap.addListener(new ResponseJobDataListener(timerDispatch));
}
@@ -223,12 +222,12 @@ public class CommonDispatcher implements
MetricsTaskDispatch, CollectDataDispatc
.forEach(priority -> {
if (job.isCyclic() ||
isOneTimeJobAndIsAvailableMetrics(job, priority)) {
List<Metrics> metricsList =
currentCollectMetrics.get(priority);
- CollectMetricsDataHandler collectHandler =
CollectMetricsDataHandler.builder()
+ CollectMetricsDataDataStream collectHandler =
CollectMetricsDataDataStream.builder()
.collectTaskTimeoutMonitor(collectTaskTimeoutMonitor)
.build();
collectHandler.setSourceDataList(metricsList);
- chainBootstrap.handler(collectHandler);
+ chainBootstrap.addDataStream(collectHandler);
}
});
diff --git
a/hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/handler/CollectMetricsDataHandler.java
b/hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/handler/CollectMetricsDataDataStream.java
similarity index 97%
rename from
hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/handler/CollectMetricsDataHandler.java
rename to
hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/handler/CollectMetricsDataDataStream.java
index f3cca9d8d..d941f9a5d 100644
---
a/hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/handler/CollectMetricsDataHandler.java
+++
b/hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/handler/CollectMetricsDataDataStream.java
@@ -12,7 +12,7 @@ import org.apache.hertzbeat.collector.constants.ContextKey;
import org.apache.hertzbeat.collector.constants.ContextStatus;
import org.apache.hertzbeat.collector.context.Context;
import org.apache.hertzbeat.collector.dispatch.CollectTaskTimeoutMonitor;
-import
org.apache.hertzbeat.collector.handler.impl.AbstractBatchDataBoundHandler;
+import org.apache.hertzbeat.collector.handler.impl.AbstractBatchDataStream;
import org.apache.hertzbeat.common.constants.CommonConstants;
import org.apache.hertzbeat.common.entity.job.Job;
import org.apache.hertzbeat.common.entity.job.Metrics;
@@ -26,7 +26,7 @@ import org.apache.hertzbeat.common.timer.Timeout;
@NoArgsConstructor
@AllArgsConstructor
@EqualsAndHashCode(callSuper = true)
-public class CollectMetricsDataHandler extends
AbstractBatchDataBoundHandler<Metrics, CollectRep.MetricsData.Builder> {
+public class CollectMetricsDataDataStream extends
AbstractBatchDataStream<Metrics, CollectRep.MetricsData.Builder> {
private CollectTaskTimeoutMonitor collectTaskTimeoutMonitor;
@Override
diff --git
a/hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/handler/DynamicSubTaskCollectMetricsDataHandler.java
b/hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/handler/DynamicSubTaskCollectMetricsDataDataStream.java
similarity index 75%
rename from
hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/handler/DynamicSubTaskCollectMetricsDataHandler.java
rename to
hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/handler/DynamicSubTaskCollectMetricsDataDataStream.java
index 74cbea66c..3a8194665 100644
---
a/hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/handler/DynamicSubTaskCollectMetricsDataHandler.java
+++
b/hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/handler/DynamicSubTaskCollectMetricsDataDataStream.java
@@ -1,14 +1,14 @@
package org.apache.hertzbeat.collector.handler;
import org.apache.hertzbeat.collector.context.Context;
-import
org.apache.hertzbeat.collector.handler.impl.AbstractBatchDataBoundHandler;
+import org.apache.hertzbeat.collector.handler.impl.AbstractBatchDataStream;
import org.apache.hertzbeat.common.entity.job.Metrics;
import org.apache.hertzbeat.common.entity.message.CollectRep;
/**
*
*/
-public class DynamicSubTaskCollectMetricsDataHandler extends
AbstractBatchDataBoundHandler<Metrics, CollectRep.MetricsData.Builder> {
+public class DynamicSubTaskCollectMetricsDataDataStream extends
AbstractBatchDataStream<Metrics, CollectRep.MetricsData.Builder> {
@Override
public CollectRep.MetricsData.Builder executeWithResponse(Context context,
Metrics data) {
//todo 动态拆分
diff --git
a/hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/listener/CalculateFieldsListener.java
b/hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/listener/CalculateFieldsListener.java
index 0a2a8b333..163c6420f 100644
---
a/hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/listener/CalculateFieldsListener.java
+++
b/hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/listener/CalculateFieldsListener.java
@@ -6,7 +6,6 @@ import org.apache.commons.jexl3.JexlExpression;
import org.apache.hertzbeat.collector.constants.ContextKey;
import org.apache.hertzbeat.collector.context.Context;
import org.apache.hertzbeat.collector.dispatch.unit.UnitConvert;
-import org.apache.hertzbeat.collector.dispatch.unit.UnitConverter;
import org.apache.hertzbeat.collector.handler.ContextBoundListener;
import org.apache.hertzbeat.collector.util.CollectUtil;
import org.apache.hertzbeat.common.constants.CommonConstants;
@@ -32,13 +31,13 @@ import java.util.stream.Collectors;
@Slf4j
@AllArgsConstructor
public class CalculateFieldsListener implements
ContextBoundListener<CollectRep.MetricsData.Builder> {
- private UnitConverter unitConverter;
+ private List<UnitConvert> unitConvertList;
@Override
public void execute(Context context, CollectRep.MetricsData.Builder data) {
Metrics metrics = context.get(ContextKey.METRICS);
- this.calculateFields(metrics, unitConverter.getUnitConvertList(),
data);
+ this.calculateFields(metrics, unitConvertList, data);
}
/**
diff --git
a/hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/listener/RerunHandler.java
b/hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/listener/RerunDataStream.java
similarity index 89%
rename from
hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/listener/RerunHandler.java
rename to
hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/listener/RerunDataStream.java
index 750f44476..de416c42e 100644
---
a/hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/listener/RerunHandler.java
+++
b/hertzbeat-collector/hertzbeat-collector-collector/src/main/java/org/apache/hertzbeat/collector/listener/RerunDataStream.java
@@ -3,7 +3,7 @@ package org.apache.hertzbeat.collector.listener;
import lombok.AllArgsConstructor;
import org.apache.hertzbeat.collector.constants.ContextKey;
import org.apache.hertzbeat.collector.context.Context;
-import org.apache.hertzbeat.collector.handler.ContextBoundHandler;
+import org.apache.hertzbeat.collector.handler.ContextBoundDataStream;
import org.apache.hertzbeat.collector.timer.TimerDispatch;
import org.apache.hertzbeat.collector.timer.WheelTimerTask;
import org.apache.hertzbeat.common.entity.job.Job;
@@ -15,7 +15,7 @@ import java.util.concurrent.TimeUnit;
* 周期任务专用
*/
@AllArgsConstructor
-public class RerunHandler implements ContextBoundHandler<Object> {
+public class RerunDataStream implements ContextBoundDataStream<Object> {
private TimerDispatch timerDispatch;
@Override
diff --git
a/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/dispatch/unit/UnitConverter.java
b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/dispatch/unit/UnitConverter.java
deleted file mode 100644
index 16cb38ba0..000000000
---
a/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/dispatch/unit/UnitConverter.java
+++ /dev/null
@@ -1,15 +0,0 @@
-package org.apache.hertzbeat.collector.dispatch.unit;
-
-import lombok.AllArgsConstructor;
-import lombok.Data;
-
-import java.util.List;
-
-/**
- *
- */
-@Data
-@AllArgsConstructor
-public class UnitConverter {
- private List<UnitConvert> unitConvertList;
-}
diff --git
a/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/handler/ChainBootstrap.java
b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/handler/ChainBootstrap.java
index eed03a450..246a4ef96 100644
---
a/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/handler/ChainBootstrap.java
+++
b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/handler/ChainBootstrap.java
@@ -6,7 +6,7 @@ import org.apache.commons.collections4.CollectionUtils;
import org.apache.hertzbeat.collector.constants.HandlerType;
import org.apache.hertzbeat.collector.context.Context;
import org.apache.hertzbeat.collector.dispatch.WorkerPool;
-import
org.apache.hertzbeat.collector.handler.impl.AbstractListenerBoundHandler;
+import
org.apache.hertzbeat.collector.handler.impl.AbstractListenerBoundDataStream;
import java.util.ArrayList;
import java.util.List;
@@ -20,8 +20,8 @@ public class ChainBootstrap {
private Context context;
private TaskChain<?> taskChain;
private WorkerPool workerPool;
- private final List<ContextBoundHandler> contextBoundHandlerList = new
ArrayList<>();
- private final List<ContextBoundHandler> onCompleteContextBoundHandlerList
= new ArrayList<>();
+ private final List<ContextBoundDataStream> contextBoundDataStreamList =
new ArrayList<>();
+ private final List<ContextBoundDataStream>
onCompleteContextBoundDataStreamList = new ArrayList<>();
private final List<ContextBoundListener> dataListenerList = new
ArrayList<>();
private final List<ContextBoundListener> onCompleteListenerList = new
ArrayList<>();
@@ -46,13 +46,13 @@ public class ChainBootstrap {
return this;
}
- public ChainBootstrap handler(ContextBoundHandler contextBoundHandler) {
- contextBoundHandlerList.add(contextBoundHandler);
+ public ChainBootstrap addDataStream(ContextBoundDataStream
contextBoundDataStream) {
+ contextBoundDataStreamList.add(contextBoundDataStream);
return this;
}
- public ChainBootstrap onComplete(ContextBoundHandler contextBoundHandler) {
- onCompleteContextBoundHandlerList.add(contextBoundHandler);
+ public ChainBootstrap onComplete(ContextBoundDataStream
contextBoundDataStream) {
+ onCompleteContextBoundDataStreamList.add(contextBoundDataStream);
return this;
}
@@ -61,7 +61,7 @@ public class ChainBootstrap {
return this;
}
- public ChainBootstrap addOnCompleteListener(ContextBoundListener
dataListener) {
+ public ChainBootstrap onEachDataStreamComplete(ContextBoundListener
dataListener) {
onCompleteListenerList.add(dataListener);
return this;
}
@@ -72,8 +72,8 @@ public class ChainBootstrap {
return;
}
- for (ContextBoundHandler contextBoundHandler :
contextBoundHandlerList) {
- if (contextBoundHandler instanceof AbstractListenerBoundHandler
listenerBoundHandler) {
+ for (ContextBoundDataStream contextBoundDataStream :
contextBoundDataStreamList) {
+ if (contextBoundDataStream instanceof
AbstractListenerBoundDataStream listenerBoundHandler) {
if (CollectionUtils.isNotEmpty(dataListenerList)) {
listenerBoundHandler.getDataListenerList().addAll(dataListenerList);
}
@@ -82,10 +82,10 @@ public class ChainBootstrap {
}
}
- taskChain.addLast(HandlerType.NORMAL, contextBoundHandler);
+ taskChain.addLast(HandlerType.NORMAL, contextBoundDataStream);
}
- onCompleteContextBoundHandlerList.forEach(handler ->
taskChain.addLast(HandlerType.ON_COMPLETE, handler));
+ onCompleteContextBoundDataStreamList.forEach(handler ->
taskChain.addLast(HandlerType.ON_COMPLETE, handler));
if (workerPool != null) {
workerPool.executeJob(() -> taskChain.execute(context));
diff --git
a/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/handler/ContextBoundHandler.java
b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/handler/ContextBoundDataStream.java
similarity index 83%
rename from
hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/handler/ContextBoundHandler.java
rename to
hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/handler/ContextBoundDataStream.java
index 542eb2778..f93261681 100644
---
a/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/handler/ContextBoundHandler.java
+++
b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/handler/ContextBoundDataStream.java
@@ -5,7 +5,7 @@ import org.apache.hertzbeat.collector.context.Context;
/**
*
*/
-public interface ContextBoundHandler<T> {
+public interface ContextBoundDataStream<T> {
void execute(Context context, T data);
void whenException(Context context, T data, Throwable throwable);
diff --git
a/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/handler/TaskChain.java
b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/handler/TaskChain.java
index 769cf60c3..80bb8a0a0 100644
---
a/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/handler/TaskChain.java
+++
b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/handler/TaskChain.java
@@ -11,5 +11,5 @@ public interface TaskChain<T> {
void execute(Context context, T data);
- void addLast(HandlerType handlerType, ContextBoundHandler<T> handler);
+ void addLast(HandlerType handlerType, ContextBoundDataStream<T> handler);
}
diff --git
a/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/handler/impl/AbstractBatchDataBoundHandler.java
b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/handler/impl/AbstractBatchDataStream.java
similarity index 69%
rename from
hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/handler/impl/AbstractBatchDataBoundHandler.java
rename to
hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/handler/impl/AbstractBatchDataStream.java
index f63708872..9e6da3330 100644
---
a/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/handler/impl/AbstractBatchDataBoundHandler.java
+++
b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/handler/impl/AbstractBatchDataStream.java
@@ -1,7 +1,6 @@
package org.apache.hertzbeat.collector.handler.impl;
import lombok.Setter;
-import org.apache.hertzbeat.collector.constants.ContextKey;
import org.apache.hertzbeat.collector.context.Context;
import java.util.List;
@@ -9,7 +8,7 @@ import java.util.List;
/**
*
*/
-public abstract class AbstractBatchDataBoundHandler<T, R> extends
AbstractListenerBoundHandler<T, R> {
+public abstract class AbstractBatchDataStream<T, R> extends
AbstractListenerBoundDataStream<T, R> {
@Setter
protected List<T> sourceDataList;
diff --git
a/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/handler/impl/AbstractContextBoundTaskChain.java
b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/handler/impl/AbstractContextBoundTaskChain.java
index ec88492ec..67f931e9e 100644
---
a/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/handler/impl/AbstractContextBoundTaskChain.java
+++
b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/handler/impl/AbstractContextBoundTaskChain.java
@@ -1,12 +1,11 @@
package org.apache.hertzbeat.collector.handler.impl;
import org.apache.hertzbeat.collector.constants.HandlerType;
-import org.apache.hertzbeat.collector.handler.ContextBoundHandler;
+import org.apache.hertzbeat.collector.handler.ContextBoundDataStream;
import org.apache.hertzbeat.collector.handler.TaskChain;
import java.util.ArrayList;
import java.util.HashMap;
-import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -14,10 +13,10 @@ import java.util.Map;
*
*/
public abstract class AbstractContextBoundTaskChain<T> implements TaskChain<T>
{
- protected final Map<HandlerType, List<ContextBoundHandler<T>>>
contextBoundHandlerMap = new HashMap<>();
+ protected final Map<HandlerType, List<ContextBoundDataStream<T>>>
contextBoundHandlerMap = new HashMap<>();
@Override
- public void addLast(HandlerType handlerType, ContextBoundHandler<T>
handler) {
+ public void addLast(HandlerType handlerType, ContextBoundDataStream<T>
handler) {
if (!contextBoundHandlerMap.containsKey(handlerType)) {
contextBoundHandlerMap.put(handlerType, new ArrayList<>());
}
diff --git
a/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/handler/impl/AbstractListenerBoundHandler.java
b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/handler/impl/AbstractListenerBoundDataStream.java
similarity index 92%
rename from
hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/handler/impl/AbstractListenerBoundHandler.java
rename to
hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/handler/impl/AbstractListenerBoundDataStream.java
index 78acdbe54..3e7872ffa 100644
---
a/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/handler/impl/AbstractListenerBoundHandler.java
+++
b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/handler/impl/AbstractListenerBoundDataStream.java
@@ -5,7 +5,7 @@ import org.apache.commons.collections4.CollectionUtils;
import org.apache.hertzbeat.collector.constants.ContextKey;
import org.apache.hertzbeat.collector.constants.ContextStatus;
import org.apache.hertzbeat.collector.context.Context;
-import org.apache.hertzbeat.collector.handler.ContextBoundHandler;
+import org.apache.hertzbeat.collector.handler.ContextBoundDataStream;
import org.apache.hertzbeat.collector.handler.ContextBoundListener;
import java.util.ArrayList;
@@ -14,7 +14,7 @@ import java.util.List;
/**
*
*/
-public abstract class AbstractListenerBoundHandler<T, R> implements
ContextBoundHandler<T> {
+public abstract class AbstractListenerBoundDataStream<T, R> implements
ContextBoundDataStream<T> {
@Getter
private final List<? extends ContextBoundListener<R>> dataListenerList =
new ArrayList<>();
@Getter
diff --git
a/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/handler/impl/BatchExecuteTaskChain.java
b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/handler/impl/BatchExecuteTaskChain.java
index b659eac29..10d759b33 100644
---
a/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/handler/impl/BatchExecuteTaskChain.java
+++
b/hertzbeat-collector/hertzbeat-collector-common/src/main/java/org/apache/hertzbeat/collector/handler/impl/BatchExecuteTaskChain.java
@@ -3,11 +3,9 @@ package org.apache.hertzbeat.collector.handler.impl;
import org.apache.hertzbeat.collector.constants.ContextStatus;
import org.apache.hertzbeat.collector.constants.HandlerType;
import org.apache.hertzbeat.collector.context.Context;
-import org.apache.hertzbeat.collector.handler.ContextBoundHandler;
+import org.apache.hertzbeat.collector.handler.ContextBoundDataStream;
import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
/**
@@ -23,8 +21,8 @@ public class BatchExecuteTaskChain<T> extends
AbstractContextBoundTaskChain<T> {
public void execute(Context context, T data) {
context.setStatus(ContextStatus.RUNNING);
- for (ContextBoundHandler<T> contextBoundHandler :
contextBoundHandlerMap.getOrDefault(HandlerType.NORMAL, new ArrayList<>())) {
- runHandler(context, data, contextBoundHandler);
+ for (ContextBoundDataStream<T> contextBoundDataStream :
contextBoundHandlerMap.getOrDefault(HandlerType.NORMAL, new ArrayList<>())) {
+ runHandler(context, data, contextBoundDataStream);
if (ContextStatus.TRUNCATE_HANDLER.equals(context.getStatus()) ||
ContextStatus.STOP.equals(context.getStatus())) {
break;
@@ -37,12 +35,12 @@ public class BatchExecuteTaskChain<T> extends
AbstractContextBoundTaskChain<T> {
contextBoundHandlerMap.getOrDefault(HandlerType.ON_COMPLETE, new
ArrayList<>()).forEach(handler -> runHandler(context, data, handler));
}
- private static <T> void runHandler(Context context, T data,
ContextBoundHandler<T> contextBoundHandler) {
+ private static <T> void runHandler(Context context, T data,
ContextBoundDataStream<T> contextBoundDataStream) {
try {
- contextBoundHandler.execute(context, data);
+ contextBoundDataStream.execute(context, data);
} catch (Exception exception) {
context.setError(exception);
- contextBoundHandler.whenException(context, data, exception);
+ contextBoundDataStream.whenException(context, data, exception);
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]