This is an automated email from the ASF dual-hosted git repository.
gongchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hertzbeat.git
The following commit(s) were added to refs/heads/master by this push:
new 78b0a428ff [feature] support collector alarm and refactor alarm (#2693)
78b0a428ff is described below
commit 78b0a428ffd1f1c13e05bedb6ddccc62c4c9eab1
Author: kangli <[email protected]>
AuthorDate: Sat Apr 19 13:07:41 2025 +0800
[feature] support collector alarm and refactor alarm (#2693)
Co-authored-by: Calvin <[email protected]>
Co-authored-by: aias00 <[email protected]>
Co-authored-by: shown <[email protected]>
Co-authored-by: aias00 <[email protected]>
Co-authored-by: tomsun28 <[email protected]>
---
.../alert/calculate/AlarmCacheManager.java | 81 +++++++++++++
.../alert/calculate/CollectorAlertHandler.java | 129 +++++++++++++++++++++
.../alert/calculate/PeriodicAlertCalculator.java | 47 +++-----
.../alert/calculate/RealTimeAlertCalculator.java | 51 +++-----
.../hertzbeat/alert/dao/AlertCollectorDao.java | 23 ++--
.../org/apache/hertzbeat/alert/util/AlertUtil.java | 30 ++---
.../src/main/resources/alerter_en_US.properties | 2 +
.../src/main/resources/alerter_zh_CN.properties | 2 +
.../src/main/resources/alerter_zh_TW.properties | 2 +
.../common/support/event/MonitorDeletedEvent.java | 2 +-
.../apache/hertzbeat/manager/dao/CollectorDao.java | 10 +-
.../manager/scheduler/netty/ManageServer.java | 9 +-
.../manager/service/impl/CollectorServiceImpl.java | 2 +-
.../hertzbeat/manager/dao/CollectorDaoTest.java | 8 +-
.../manager/service/CollectorServiceTest.java | 4 +
15 files changed, 296 insertions(+), 106 deletions(-)
diff --git
a/hertzbeat-alerter/src/main/java/org/apache/hertzbeat/alert/calculate/AlarmCacheManager.java
b/hertzbeat-alerter/src/main/java/org/apache/hertzbeat/alert/calculate/AlarmCacheManager.java
new file mode 100644
index 0000000000..803d0305ae
--- /dev/null
+++
b/hertzbeat-alerter/src/main/java/org/apache/hertzbeat/alert/calculate/AlarmCacheManager.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hertzbeat.alert.calculate;
+
+import org.apache.hertzbeat.alert.dao.SingleAlertDao;
+import org.apache.hertzbeat.alert.util.AlertUtil;
+import org.apache.hertzbeat.common.constants.CommonConstants;
+import org.apache.hertzbeat.common.entity.alerter.SingleAlert;
+import org.springframework.stereotype.Component;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * alert cache manager
+ */
+@Component
+public class AlarmCacheManager {
+
+ /**
+ * The alarm in the process is triggered
+ * key - labels fingerprint
+ */
+ private final Map<String, SingleAlert> pendingAlertMap;
+ /**
+ * The not recover alert
+ * key - labels fingerprint
+ */
+ private final Map<String, SingleAlert> firingAlertMap;
+
+ public AlarmCacheManager(SingleAlertDao singleAlertDao) {
+ this.pendingAlertMap = new ConcurrentHashMap<>(8);
+ this.firingAlertMap = new ConcurrentHashMap<>(8);
+ List<SingleAlert> singleAlerts =
singleAlertDao.querySingleAlertsByStatus(CommonConstants.ALERT_STATUS_FIRING);
+ for (SingleAlert singleAlert : singleAlerts) {
+ String fingerprint =
AlertUtil.calculateFingerprint(singleAlert.getLabels());
+ singleAlert.setId(null);
+ this.firingAlertMap.put(fingerprint, singleAlert);
+ }
+ }
+
+ public void putPending(String fingerPrint, SingleAlert alert) {
+ this.pendingAlertMap.put(fingerPrint, alert);
+ }
+
+ public SingleAlert getPending(String fingerPrint) {
+ return this.pendingAlertMap.get(fingerPrint);
+ }
+
+ public SingleAlert removePending(String fingerPrint) {
+ return this.pendingAlertMap.remove(fingerPrint);
+ }
+
+ public void putFiring(String fingerPrint, SingleAlert alert) {
+ this.firingAlertMap.put(fingerPrint, alert);
+ }
+
+ public SingleAlert getFiring(String fingerPrint) {
+ return this.firingAlertMap.get(fingerPrint);
+ }
+
+ public SingleAlert removeFiring(String fingerPrint) {
+ return this.firingAlertMap.remove(fingerPrint);
+ }
+}
diff --git
a/hertzbeat-alerter/src/main/java/org/apache/hertzbeat/alert/calculate/CollectorAlertHandler.java
b/hertzbeat-alerter/src/main/java/org/apache/hertzbeat/alert/calculate/CollectorAlertHandler.java
new file mode 100644
index 0000000000..e25bbb2ba1
--- /dev/null
+++
b/hertzbeat-alerter/src/main/java/org/apache/hertzbeat/alert/calculate/CollectorAlertHandler.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hertzbeat.alert.calculate;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.hertzbeat.alert.dao.AlertCollectorDao;
+import org.apache.hertzbeat.alert.reduce.AlarmCommonReduce;
+import org.apache.hertzbeat.alert.util.AlertUtil;
+import org.apache.hertzbeat.common.constants.CommonConstants;
+import org.apache.hertzbeat.common.entity.alerter.SingleAlert;
+import org.apache.hertzbeat.common.entity.manager.Collector;
+import org.apache.hertzbeat.common.support.event.SystemConfigChangeEvent;
+import org.apache.hertzbeat.common.util.ResourceBundleUtil;
+import org.springframework.context.event.EventListener;
+import org.springframework.stereotype.Component;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.ResourceBundle;
+
+/**
+ * handle collector alarm
+ */
+@Component
+@Slf4j
+public class CollectorAlertHandler {
+
+ private static final String KEY_COLLECTOR_NAME = "collectorName";
+ private static final String KEY_COLLECTOR_VERSION = "collectorVersion";
+ private static final String KEY_COLLECTOR_HOST = "collectorHost";
+
+ private final AlertCollectorDao alertCollectorDao;
+
+ private final AlarmCommonReduce alarmCommonReduce;
+
+ private final AlarmCacheManager alarmCacheManager;
+
+ private ResourceBundle bundle;
+
+
+ public CollectorAlertHandler(AlarmCommonReduce alarmCommonReduce,
AlertCollectorDao alertCollectorDao,
+ AlarmCacheManager alarmCacheManager) {
+ this.alarmCommonReduce = alarmCommonReduce;
+ this.alertCollectorDao = alertCollectorDao;
+ this.alarmCacheManager = alarmCacheManager;
+ this.bundle = ResourceBundleUtil.getBundle("alerter");
+ }
+
+ /**
+ * handle collector online
+ *
+ * @param identity collector name
+ */
+ public void online(final String identity) {
+ Collector collector = alertCollectorDao.findCollectorByName(identity);
+ if (collector == null) {
+ return;
+ }
+ Map<String, String> fingerPrints = new HashMap<>(8);
+ fingerPrints.put(KEY_COLLECTOR_NAME, collector.getName());
+ fingerPrints.put(KEY_COLLECTOR_VERSION, collector.getVersion());
+ fingerPrints.put(KEY_COLLECTOR_HOST, collector.getIp());
+ String fingerprint = AlertUtil.calculateFingerprint(fingerPrints);
+ SingleAlert firingAlert = alarmCacheManager.getFiring(fingerprint);
+ if (firingAlert != null) {
+ firingAlert.setTriggerTimes(1);
+ firingAlert.setEndAt(System.currentTimeMillis());
+ firingAlert.setStatus(CommonConstants.ALERT_STATUS_RESOLVED);
+ alarmCommonReduce.reduceAndSendAlarm(firingAlert.clone());
+ }
+ }
+
+
+ /**
+ * handle collector offline
+ *
+ * @param identity collector name
+ */
+ public void offline(final String identity) {
+ Collector collector = alertCollectorDao.findCollectorByName(identity);
+ if (collector == null) {
+ return;
+ }
+ long currentTimeMill = System.currentTimeMillis();
+ Map<String, String> fingerPrints = new HashMap<>(8);
+ fingerPrints.put(KEY_COLLECTOR_NAME, collector.getName());
+ fingerPrints.put(KEY_COLLECTOR_VERSION, collector.getVersion());
+ fingerPrints.put(KEY_COLLECTOR_HOST, collector.getIp());
+ String fingerprint = AlertUtil.calculateFingerprint(fingerPrints);
+ SingleAlert existingAlert = alarmCacheManager.getFiring(fingerprint);
+ if (existingAlert == null) {
+ SingleAlert newAlert = SingleAlert.builder()
+ .labels(fingerPrints)
+ .annotations(fingerPrints)
+
.content(this.bundle.getString("alerter.availability.collector.offline"))
+ .status(CommonConstants.ALERT_STATUS_FIRING)
+ .triggerTimes(1)
+ .startAt(currentTimeMill)
+ .activeAt(currentTimeMill)
+ .build();
+ alarmCacheManager.putFiring(fingerprint, newAlert);
+ alarmCommonReduce.reduceAndSendAlarm(newAlert.clone());
+ }
+
+ }
+
+
+ @EventListener(SystemConfigChangeEvent.class)
+ public void onSystemConfigChangeEvent(SystemConfigChangeEvent event) {
+ log.info("calculate alarm receive system config change event: {}.",
event.getSource());
+ this.bundle = ResourceBundleUtil.getBundle("alerter");
+ }
+
+}
diff --git
a/hertzbeat-alerter/src/main/java/org/apache/hertzbeat/alert/calculate/PeriodicAlertCalculator.java
b/hertzbeat-alerter/src/main/java/org/apache/hertzbeat/alert/calculate/PeriodicAlertCalculator.java
index 82bbd85cf9..eb6ebf5590 100644
---
a/hertzbeat-alerter/src/main/java/org/apache/hertzbeat/alert/calculate/PeriodicAlertCalculator.java
+++
b/hertzbeat-alerter/src/main/java/org/apache/hertzbeat/alert/calculate/PeriodicAlertCalculator.java
@@ -17,19 +17,17 @@
package org.apache.hertzbeat.alert.calculate;
-import java.util.Arrays;
import java.util.HashMap;
-import java.util.Objects;
import lombok.extern.slf4j.Slf4j;
import org.apache.hertzbeat.alert.reduce.AlarmCommonReduce;
import org.apache.hertzbeat.alert.service.DataSourceService;
import org.apache.hertzbeat.alert.util.AlertTemplateUtil;
+import org.apache.hertzbeat.alert.util.AlertUtil;
import org.apache.hertzbeat.common.constants.CommonConstants;
import org.apache.hertzbeat.common.entity.alerter.AlertDefine;
import org.apache.hertzbeat.common.entity.alerter.SingleAlert;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.collections4.CollectionUtils;
import org.springframework.stereotype.Component;
@@ -47,22 +45,13 @@ public class PeriodicAlertCalculator {
private final DataSourceService dataSourceService;
private final AlarmCommonReduce alarmCommonReduce;
- /**
- * The alarm in the process is triggered
- * key - labels fingerprint
- */
- private final Map<String, SingleAlert> pendingAlertMap;
- /**
- * The not recover alert
- * key - labels fingerprint
- */
- private final Map<String, SingleAlert> firingAlertMap;
-
- public PeriodicAlertCalculator(DataSourceService dataSourceService,
AlarmCommonReduce alarmCommonReduce) {
+ private final AlarmCacheManager alarmCacheManager;
+
+ public PeriodicAlertCalculator(DataSourceService dataSourceService,
AlarmCommonReduce alarmCommonReduce,
+ AlarmCacheManager alarmCacheManager) {
this.dataSourceService = dataSourceService;
this.alarmCommonReduce = alarmCommonReduce;
- this.pendingAlertMap = new ConcurrentHashMap<>(8);
- this.firingAlertMap = new ConcurrentHashMap<>(8);
+ this.alarmCacheManager = alarmCacheManager;
}
public void calculate(AlertDefine rule) {
@@ -122,8 +111,8 @@ public class PeriodicAlertCalculator {
private void afterThresholdRuleMatch(long currentTimeMilli, Map<String,
String> fingerPrints,
Map<String, Object> fieldValueMap,
AlertDefine define) {
- String fingerprint = calculateFingerprint(fingerPrints);
- SingleAlert existingAlert = pendingAlertMap.get(fingerprint);
+ String fingerprint = AlertUtil.calculateFingerprint(fingerPrints);
+ SingleAlert existingAlert = alarmCacheManager.getPending(fingerprint);
Map<String, String> labels = new HashMap<>(8);
fieldValueMap.putAll(define.getLabels());
labels.putAll(fingerPrints);
@@ -144,11 +133,11 @@ public class PeriodicAlertCalculator {
// If required trigger times is 1, set to firing status directly
if (requiredTimes <= 1) {
newAlert.setStatus(CommonConstants.ALERT_STATUS_FIRING);
- firingAlertMap.put(fingerprint, newAlert);
+ alarmCacheManager.putFiring(fingerprint, newAlert);
alarmCommonReduce.reduceAndSendAlarm(newAlert.clone());
} else {
// Otherwise put into pending queue first
- pendingAlertMap.put(fingerprint, newAlert);
+ alarmCacheManager.putPending(fingerprint, newAlert);
}
} else {
// Update existing alert
@@ -158,17 +147,17 @@ public class PeriodicAlertCalculator {
// Check if required trigger times reached
if
(existingAlert.getStatus().equals(CommonConstants.ALERT_STATUS_PENDING) &&
existingAlert.getTriggerTimes() >= requiredTimes) {
// Reached trigger times threshold, change to firing status
- pendingAlertMap.remove(fingerprint);
+ alarmCacheManager.removePending(fingerprint);
existingAlert.setStatus(CommonConstants.ALERT_STATUS_FIRING);
- firingAlertMap.put(fingerprint, existingAlert);
+ alarmCacheManager.putFiring(fingerprint, existingAlert);
alarmCommonReduce.reduceAndSendAlarm(existingAlert.clone());
}
}
}
private void handleRecoveredAlert(Map<String, String> fingerprints) {
- String fingerprint = calculateFingerprint(fingerprints);
- SingleAlert firingAlert = firingAlertMap.remove(fingerprint);
+ String fingerprint = AlertUtil.calculateFingerprint(fingerprints);
+ SingleAlert firingAlert = alarmCacheManager.removeFiring(fingerprint);
if (firingAlert != null) {
// todo consider multi times to tig for resolved alert
firingAlert.setTriggerTimes(1);
@@ -176,13 +165,7 @@ public class PeriodicAlertCalculator {
firingAlert.setStatus(CommonConstants.ALERT_STATUS_RESOLVED);
alarmCommonReduce.reduceAndSendAlarm(firingAlert.clone());
}
- pendingAlertMap.remove(fingerprint);
+ alarmCacheManager.removePending(fingerprint);
}
- private String calculateFingerprint(Map<String, String> fingerPrints) {
- List<String> keyList =
fingerPrints.keySet().stream().filter(Objects::nonNull).sorted().toList();
- List<String> valueList =
fingerPrints.values().stream().filter(Objects::nonNull).sorted().toList();
- return Arrays.hashCode(keyList.toArray(new String[0])) + "-"
- + Arrays.hashCode(valueList.toArray(new String[0]));
- }
}
diff --git
a/hertzbeat-alerter/src/main/java/org/apache/hertzbeat/alert/calculate/RealTimeAlertCalculator.java
b/hertzbeat-alerter/src/main/java/org/apache/hertzbeat/alert/calculate/RealTimeAlertCalculator.java
index ac08a63d4b..35b0334270 100644
---
a/hertzbeat-alerter/src/main/java/org/apache/hertzbeat/alert/calculate/RealTimeAlertCalculator.java
+++
b/hertzbeat-alerter/src/main/java/org/apache/hertzbeat/alert/calculate/RealTimeAlertCalculator.java
@@ -18,14 +18,12 @@
package org.apache.hertzbeat.alert.calculate;
-import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
@@ -38,6 +36,7 @@ import org.apache.hertzbeat.alert.dao.SingleAlertDao;
import org.apache.hertzbeat.alert.reduce.AlarmCommonReduce;
import org.apache.hertzbeat.alert.service.AlertDefineService;
import org.apache.hertzbeat.alert.util.AlertTemplateUtil;
+import org.apache.hertzbeat.alert.util.AlertUtil;
import org.apache.hertzbeat.common.constants.CommonConstants;
import org.apache.hertzbeat.common.entity.alerter.AlertDefine;
import org.apache.hertzbeat.common.entity.alerter.SingleAlert;
@@ -76,37 +75,20 @@ public class RealTimeAlertCalculator {
private static final Pattern INSTANCE_PATTERN =
Pattern.compile("equals\\(__instance__,\\s\"(\\d+)\"\\)");
private static final Pattern METRICS_PATTERN =
Pattern.compile("equals\\(__metrics__,\"([^\"]+)\"\\)");
- /**
- * The alarm in the process is triggered
- * key - labels fingerprint
- */
- private final Map<String, SingleAlert> pendingAlertMap;
- /**
- * The not recover alert
- * key - labels fingerprint
- */
- private final Map<String, SingleAlert> firingAlertMap;
private final AlerterWorkerPool workerPool;
private final CommonDataQueue dataQueue;
private final AlertDefineService alertDefineService;
private final AlarmCommonReduce alarmCommonReduce;
+ private final AlarmCacheManager alarmCacheManager;
public RealTimeAlertCalculator(AlerterWorkerPool workerPool,
CommonDataQueue dataQueue,
AlertDefineService alertDefineService,
SingleAlertDao singleAlertDao,
- AlarmCommonReduce alarmCommonReduce) {
+ AlarmCommonReduce alarmCommonReduce,
AlarmCacheManager alarmCacheManager) {
this.workerPool = workerPool;
this.dataQueue = dataQueue;
this.alarmCommonReduce = alarmCommonReduce;
this.alertDefineService = alertDefineService;
- this.pendingAlertMap = new ConcurrentHashMap<>(8);
- this.firingAlertMap = new ConcurrentHashMap<>(8);
- // Initialize firing stateAlertMap
- List<SingleAlert> singleAlerts =
singleAlertDao.querySingleAlertsByStatus(CommonConstants.ALERT_STATUS_FIRING);
- for (SingleAlert singleAlert : singleAlerts) {
- String fingerprint = calculateFingerprint(singleAlert.getLabels());
- singleAlert.setId(null);
- firingAlertMap.put(fingerprint, singleAlert);
- }
+ this.alarmCacheManager = alarmCacheManager;
startCalculate();
}
@@ -329,8 +311,8 @@ public class RealTimeAlertCalculator {
}
private void handleRecoveredAlert(Map<String, String> fingerprints) {
- String fingerprint = calculateFingerprint(fingerprints);
- SingleAlert firingAlert = firingAlertMap.remove(fingerprint);
+ String fingerprint = AlertUtil.calculateFingerprint(fingerprints);
+ SingleAlert firingAlert = alarmCacheManager.removeFiring(fingerprint);
if (firingAlert != null) {
// todo consider multi times to tig for resolved alert
firingAlert.setTriggerTimes(1);
@@ -338,13 +320,13 @@ public class RealTimeAlertCalculator {
firingAlert.setStatus(CommonConstants.ALERT_STATUS_RESOLVED);
alarmCommonReduce.reduceAndSendAlarm(firingAlert.clone());
}
- pendingAlertMap.remove(fingerprint);
+ alarmCacheManager.removePending(fingerprint);
}
private void afterThresholdRuleMatch(long currentTimeMilli, Map<String,
String> fingerPrints,
Map<String, Object> fieldValueMap,
AlertDefine define, Map<String, String> annotations) {
- String fingerprint = calculateFingerprint(fingerPrints);
- SingleAlert existingAlert = pendingAlertMap.get(fingerprint);
+ String fingerprint = AlertUtil.calculateFingerprint(fingerPrints);
+ SingleAlert existingAlert = alarmCacheManager.getPending(fingerprint);
fieldValueMap.putAll(define.getLabels());
int requiredTimes = define.getTimes() == null ? 1 : define.getTimes();
if (existingAlert == null) {
@@ -376,11 +358,11 @@ public class RealTimeAlertCalculator {
// If required trigger times is 1, set to firing status directly
if (requiredTimes <= 1) {
newAlert.setStatus(CommonConstants.ALERT_STATUS_FIRING);
- firingAlertMap.put(fingerprint, newAlert);
+ alarmCacheManager.putFiring(fingerprint, newAlert);
alarmCommonReduce.reduceAndSendAlarm(newAlert.clone());
} else {
// Otherwise put into pending queue first
- pendingAlertMap.put(fingerprint, newAlert);
+ alarmCacheManager.putPending(fingerprint, newAlert);
}
} else {
// Update existing alert
@@ -390,9 +372,9 @@ public class RealTimeAlertCalculator {
// Check if required trigger times reached
if
(existingAlert.getStatus().equals(CommonConstants.ALERT_STATUS_PENDING) &&
existingAlert.getTriggerTimes() >= requiredTimes) {
// Reached trigger times threshold, change to firing status
- pendingAlertMap.remove(fingerprint);
+ alarmCacheManager.removePending(fingerprint);
existingAlert.setStatus(CommonConstants.ALERT_STATUS_FIRING);
- firingAlertMap.put(fingerprint, existingAlert);
+ alarmCacheManager.putFiring(fingerprint, existingAlert);
alarmCommonReduce.reduceAndSendAlarm(existingAlert.clone());
}
}
@@ -426,13 +408,6 @@ public class RealTimeAlertCalculator {
}
return match != null && match;
}
-
- private String calculateFingerprint(Map<String, String> fingerPrints) {
- List<String> keyList =
fingerPrints.keySet().stream().filter(Objects::nonNull).sorted().toList();
- List<String> valueList =
fingerPrints.values().stream().filter(Objects::nonNull).sorted().toList();
- return Arrays.hashCode(keyList.toArray(new String[0])) + "-"
- + Arrays.hashCode(valueList.toArray(new String[0]));
- }
private Set<String> kvLabelsToKvStringSet(Map<String, String> labels) {
if (labels == null || labels.isEmpty()) {
diff --git
a/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/dao/CollectorDao.java
b/hertzbeat-alerter/src/main/java/org/apache/hertzbeat/alert/dao/AlertCollectorDao.java
similarity index 66%
copy from
hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/dao/CollectorDao.java
copy to
hertzbeat-alerter/src/main/java/org/apache/hertzbeat/alert/dao/AlertCollectorDao.java
index 4aefaa2e79..c76a06e6dc 100644
---
a/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/dao/CollectorDao.java
+++
b/hertzbeat-alerter/src/main/java/org/apache/hertzbeat/alert/dao/AlertCollectorDao.java
@@ -15,30 +15,21 @@
* limitations under the License.
*/
-package org.apache.hertzbeat.manager.dao;
+package org.apache.hertzbeat.alert.dao;
-import java.util.Optional;
import org.apache.hertzbeat.common.entity.manager.Collector;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.JpaSpecificationExecutor;
-import org.springframework.data.jpa.repository.Modifying;
/**
- * Collector repository
+ * Alert Collector Dao
*/
-public interface CollectorDao extends JpaRepository<Collector, Long>,
JpaSpecificationExecutor<Collector> {
-
+public interface AlertCollectorDao extends JpaRepository<Collector, Long>,
JpaSpecificationExecutor<Collector> {
+
/**
- * find collector by name
- * @param name name
+ * Query collector by name
+ * @param name collector name
* @return collector
*/
- Optional<Collector> findCollectorByName(String name);
-
- /**
- * delete collector by name
- * @param collector collector name
- */
- @Modifying
- void deleteCollectorByName(String collector);
+ Collector findCollectorByName(String name);
}
diff --git
a/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/support/event/MonitorDeletedEvent.java
b/hertzbeat-alerter/src/main/java/org/apache/hertzbeat/alert/util/AlertUtil.java
similarity index 55%
copy from
hertzbeat-common/src/main/java/org/apache/hertzbeat/common/support/event/MonitorDeletedEvent.java
copy to
hertzbeat-alerter/src/main/java/org/apache/hertzbeat/alert/util/AlertUtil.java
index 5049a46b5d..a1ed180c3f 100644
---
a/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/support/event/MonitorDeletedEvent.java
+++
b/hertzbeat-alerter/src/main/java/org/apache/hertzbeat/alert/util/AlertUtil.java
@@ -15,26 +15,26 @@
* limitations under the License.
*/
-package org.apache.hertzbeat.common.support.event;
+package org.apache.hertzbeat.alert.util;
-import org.springframework.context.ApplicationEvent;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
/**
- * the event for system config change
+ * alert util
*/
-public class MonitorDeletedEvent extends ApplicationEvent {
-
+public class AlertUtil {
+
/**
- * monitoring id
+ * calculate fingerprint
+ * @param fingerPrints finger prints
*/
- private final Long monitorId;
-
- public MonitorDeletedEvent(Object source, Long monitorId) {
- super(source);
- this.monitorId = monitorId;
- }
-
- public Long getMonitorId() {
- return monitorId;
+ public static String calculateFingerprint(Map<String, String>
fingerPrints) {
+ List<String> keyList =
fingerPrints.keySet().stream().filter(Objects::nonNull).sorted().toList();
+ List<String> valueList =
fingerPrints.values().stream().filter(Objects::nonNull).sorted().toList();
+ return Arrays.hashCode(keyList.toArray(new String[0])) + "-"
+ + Arrays.hashCode(valueList.toArray(new String[0]));
}
}
diff --git a/hertzbeat-alerter/src/main/resources/alerter_en_US.properties
b/hertzbeat-alerter/src/main/resources/alerter_en_US.properties
index 938da98ac3..e05324b2d0 100644
--- a/hertzbeat-alerter/src/main/resources/alerter_en_US.properties
+++ b/hertzbeat-alerter/src/main/resources/alerter_en_US.properties
@@ -14,6 +14,8 @@
# limitations under the License.
alerter.availability.recover = Availability Alert Resolved, Monitor Status
Normal Now
+alerter.availability.collector.recover = Collector Availability Alert
Resolved, The collector is online
+alerter.availability.collector.offline = Collector Availability Alert Notify,
The collector is offline
alerter.alarm.recover = Alert Resolved Notice
alerter.notify.title = HertzBeat Alert Notify
alerter.notify.target = Monitor Target
diff --git a/hertzbeat-alerter/src/main/resources/alerter_zh_CN.properties
b/hertzbeat-alerter/src/main/resources/alerter_zh_CN.properties
index b1f11e284d..33f4e88d52 100644
--- a/hertzbeat-alerter/src/main/resources/alerter_zh_CN.properties
+++ b/hertzbeat-alerter/src/main/resources/alerter_zh_CN.properties
@@ -14,6 +14,8 @@
# limitations under the License.
alerter.availability.recover = 可用性告警恢复通知, 任务状态已恢复正常
+alerter.availability.collector.recover = 采集器可用性恢复通知,采集器已上线
+alerter.availability.collector.offline = 采集器可用性告警通知,采集器已下线
alerter.alarm.recover = 告警恢复通知
alerter.notify.title = HertzBeat告警通知
alerter.notify.target = 告警目标对象
diff --git a/hertzbeat-alerter/src/main/resources/alerter_zh_TW.properties
b/hertzbeat-alerter/src/main/resources/alerter_zh_TW.properties
index a038fc9ec8..bc9331430b 100644
--- a/hertzbeat-alerter/src/main/resources/alerter_zh_TW.properties
+++ b/hertzbeat-alerter/src/main/resources/alerter_zh_TW.properties
@@ -15,6 +15,8 @@
alerter.availability.recover = 可用性警報已解決,監視狀態現在正常
alerter.alarm.recover = 警報解決通知
+alerter.availability.collector.recover = 采集器可用性恢復通知,采集器已上線
+alerter.availability.collector.offline = 采集器可用性告警通知,采集器已下線
alerter.notify.title = HertzBeat 警報通知
alerter.notify.target = 監視目標
alerter.notify.monitorId = 監視 ID
diff --git
a/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/support/event/MonitorDeletedEvent.java
b/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/support/event/MonitorDeletedEvent.java
index 5049a46b5d..a451aa91ba 100644
---
a/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/support/event/MonitorDeletedEvent.java
+++
b/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/support/event/MonitorDeletedEvent.java
@@ -20,7 +20,7 @@ package org.apache.hertzbeat.common.support.event;
import org.springframework.context.ApplicationEvent;
/**
- * the event for system config change
+ * the event for monitor delete
*/
public class MonitorDeletedEvent extends ApplicationEvent {
diff --git
a/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/dao/CollectorDao.java
b/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/dao/CollectorDao.java
index 4aefaa2e79..8affacd3ae 100644
---
a/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/dao/CollectorDao.java
+++
b/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/dao/CollectorDao.java
@@ -17,6 +17,7 @@
package org.apache.hertzbeat.manager.dao;
+import java.util.List;
import java.util.Optional;
import org.apache.hertzbeat.common.entity.manager.Collector;
import org.springframework.data.jpa.repository.JpaRepository;
@@ -34,7 +35,14 @@ public interface CollectorDao extends
JpaRepository<Collector, Long>, JpaSpecifi
* @return collector
*/
Optional<Collector> findCollectorByName(String name);
-
+
+ /**
+ * find collectors by names
+ * @param names collector name list
+ * @return collector list
+ */
+ List<Collector> findCollectorsByNameIn(List<String> names);
+
/**
* delete collector by name
* @param collector collector name
diff --git
a/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/scheduler/netty/ManageServer.java
b/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/scheduler/netty/ManageServer.java
index 68513fd5c5..352415b65f 100644
---
a/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/scheduler/netty/ManageServer.java
+++
b/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/scheduler/netty/ManageServer.java
@@ -24,6 +24,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
+import org.apache.hertzbeat.alert.calculate.CollectorAlertHandler;
import org.apache.hertzbeat.common.entity.message.ClusterMsg;
import org.apache.hertzbeat.common.support.CommonThreadPool;
import org.apache.hertzbeat.manager.scheduler.CollectorJobScheduler;
@@ -56,6 +57,8 @@ public class ManageServer implements CommandLineRunner {
private final CollectorJobScheduler collectorJobScheduler;
+ private final CollectorAlertHandler collectorAlertHandler;
+
private ScheduledExecutorService channelSchedule;
private RemotingServer remotingServer;
@@ -64,9 +67,11 @@ public class ManageServer implements CommandLineRunner {
public ManageServer(final SchedulerProperties schedulerProperties,
final CollectorJobScheduler collectorJobScheduler,
- final CommonThreadPool threadPool) {
+ final CommonThreadPool threadPool,
+ final CollectorAlertHandler collectorAlertHandler) {
this.collectorJobScheduler = collectorJobScheduler;
this.collectorJobScheduler.setManageServer(this);
+ this.collectorAlertHandler = collectorAlertHandler;
this.init(schedulerProperties, threadPool);
}
@@ -98,6 +103,7 @@ public class ManageServer implements CommandLineRunner {
channel.closeFuture();
this.clientChannelTable.remove(collector);
this.collectorJobScheduler.collectorGoOffline(collector);
+ this.collectorAlertHandler.offline(collector);
}
});
} catch (Exception e) {
@@ -131,6 +137,7 @@ public class ManageServer implements CommandLineRunner {
preChannel.close();
}
this.clientChannelTable.put(identity, channel);
+ this.collectorAlertHandler.online(identity);
}
public void closeChannel(final String identity) {
diff --git
a/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/service/impl/CollectorServiceImpl.java
b/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/service/impl/CollectorServiceImpl.java
index 4be9c982b3..cafa02f291 100644
---
a/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/service/impl/CollectorServiceImpl.java
+++
b/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/service/impl/CollectorServiceImpl.java
@@ -61,7 +61,7 @@ public class CollectorServiceImpl implements CollectorService
{
@Autowired(required = false)
private ManageServer manageServer;
-
+
@Override
@Transactional(readOnly = true)
public Page<CollectorSummary> getCollectors(String name, int pageIndex,
Integer pageSize) {
diff --git
a/hertzbeat-manager/src/test/java/org/apache/hertzbeat/manager/dao/CollectorDaoTest.java
b/hertzbeat-manager/src/test/java/org/apache/hertzbeat/manager/dao/CollectorDaoTest.java
index fab0126250..997ebeaa56 100644
---
a/hertzbeat-manager/src/test/java/org/apache/hertzbeat/manager/dao/CollectorDaoTest.java
+++
b/hertzbeat-manager/src/test/java/org/apache/hertzbeat/manager/dao/CollectorDaoTest.java
@@ -19,6 +19,7 @@ package org.apache.hertzbeat.manager.dao;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertFalse;
import javax.annotation.Resource;
import org.apache.hertzbeat.common.entity.manager.Collector;
import org.apache.hertzbeat.manager.AbstractSpringIntegrationTest;
@@ -27,6 +28,8 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.transaction.annotation.Transactional;
+import java.util.List;
+
/**
* Test case for {@link CollectorDao}
*/
@@ -66,5 +69,8 @@ public class CollectorDaoTest extends
AbstractSpringIntegrationTest {
assertTrue(collectorDao.findCollectorByName("test").isPresent());
}
-
+ @Test
+ public void findCollectorsByNameIn(){
+
assertFalse(collectorDao.findCollectorsByNameIn(List.of("test")).isEmpty());
+ }
}
diff --git
a/hertzbeat-manager/src/test/java/org/apache/hertzbeat/manager/service/CollectorServiceTest.java
b/hertzbeat-manager/src/test/java/org/apache/hertzbeat/manager/service/CollectorServiceTest.java
index c31ec4c8b6..f5dc2095b7 100644
---
a/hertzbeat-manager/src/test/java/org/apache/hertzbeat/manager/service/CollectorServiceTest.java
+++
b/hertzbeat-manager/src/test/java/org/apache/hertzbeat/manager/service/CollectorServiceTest.java
@@ -41,6 +41,7 @@ import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Spy;
import org.mockito.junit.jupiter.MockitoExtension;
+import org.springframework.context.ApplicationContext;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.jpa.domain.Specification;
@@ -67,6 +68,9 @@ public class CollectorServiceTest {
@Mock
private ManageServer manageServer;
+ @Mock
+ private ApplicationContext applicationContext;
+
@Test
public void getCollectors() {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]