This is an automated email from the ASF dual-hosted git repository.
zhaoqingran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hertzbeat.git
The following commit(s) were added to refs/heads/master by this push:
new bc098aa4d2 [bugfix]manager: Avoid NPE when metrics data queue is empty
(#3848)
bc098aa4d2 is described below
commit bc098aa4d25518d2f0ac22edaf1dd9bb97e4fa7f
Author: starryCoder <[email protected]>
AuthorDate: Fri Nov 14 16:09:49 2025 +0800
[bugfix]manager: Avoid NPE when metrics data queue is empty (#3848)
Co-authored-by: Duansg <[email protected]>
---
.../realtime/MetricsRealTimeAlertCalculator.java | 3 ++
.../common/queue/impl/RedisCommonDataQueue.java | 58 ++++++++++------------
.../queue/impl/RedisCommonDataQueueTest.java | 16 +++---
.../component/sd/ServiceDiscoveryWorker.java | 3 ++
4 files changed, 43 insertions(+), 37 deletions(-)
diff --git
a/hertzbeat-alerter/src/main/java/org/apache/hertzbeat/alert/calculate/realtime/MetricsRealTimeAlertCalculator.java
b/hertzbeat-alerter/src/main/java/org/apache/hertzbeat/alert/calculate/realtime/MetricsRealTimeAlertCalculator.java
index 8dd4401418..5fe19edd04 100644
---
a/hertzbeat-alerter/src/main/java/org/apache/hertzbeat/alert/calculate/realtime/MetricsRealTimeAlertCalculator.java
+++
b/hertzbeat-alerter/src/main/java/org/apache/hertzbeat/alert/calculate/realtime/MetricsRealTimeAlertCalculator.java
@@ -125,6 +125,9 @@ public class MetricsRealTimeAlertCalculator {
while (!Thread.currentThread().isInterrupted()) {
try {
CollectRep.MetricsData metricsData =
dataQueue.pollMetricsDataToAlerter();
+ if (metricsData == null) {
+ continue;
+ }
calculate(metricsData);
dataQueue.sendMetricsDataToStorage(metricsData);
} catch (InterruptedException ignored) {
diff --git
a/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/queue/impl/RedisCommonDataQueue.java
b/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/queue/impl/RedisCommonDataQueue.java
index 52073d03c1..ba69b25242 100644
---
a/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/queue/impl/RedisCommonDataQueue.java
+++
b/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/queue/impl/RedisCommonDataQueue.java
@@ -17,6 +17,7 @@
package org.apache.hertzbeat.common.queue.impl;
+import io.lettuce.core.KeyValue;
import io.lettuce.core.RedisClient;
import io.lettuce.core.RedisURI;
import io.lettuce.core.api.StatefulRedisConnection;
@@ -86,33 +87,19 @@ public class RedisCommonDataQueue implements
CommonDataQueue, DisposableBean {
}
@Override
- public CollectRep.MetricsData pollMetricsDataToAlerter() {
- try {
- return syncCommands.rpop(metricsDataQueueNameToAlerter);
- } catch (Exception e) {
- log.error(e.getMessage());
- return null;
- }
+ public CollectRep.MetricsData pollMetricsDataToAlerter() throws
InterruptedException {
+ return genericBlockingPollFunction(metricsDataQueueNameToAlerter,
syncCommands);
}
@Override
public CollectRep.MetricsData pollMetricsDataToStorage() throws
InterruptedException {
- try {
- return syncCommands.rpop(metricsDataQueueNameToStorage);
- } catch (Exception e) {
- log.error(e.getMessage());
- return null;
- }
+ return genericBlockingPollFunction(metricsDataQueueNameToStorage,
syncCommands);
+
}
@Override
public CollectRep.MetricsData pollServiceDiscoveryData() throws
InterruptedException {
- try {
- return syncCommands.rpop(metricsDataQueueNameForServiceDiscovery);
- } catch (Exception e) {
- log.error(e.getMessage());
- return null;
- }
+ return
genericBlockingPollFunction(metricsDataQueueNameForServiceDiscovery,
syncCommands);
}
@Override
@@ -153,12 +140,7 @@ public class RedisCommonDataQueue implements
CommonDataQueue, DisposableBean {
@Override
public LogEntry pollLogEntry() throws InterruptedException {
- try {
- return logEntrySyncCommands.rpop(logEntryQueueName);
- } catch (Exception e) {
- log.error("Failed to poll LogEntry from Redis: {}",
e.getMessage());
- throw new InterruptedException("Failed to poll LogEntry from
Redis");
- }
+ return genericBlockingPollFunction(logEntryQueueName,
logEntrySyncCommands);
}
@Override
@@ -172,12 +154,7 @@ public class RedisCommonDataQueue implements
CommonDataQueue, DisposableBean {
@Override
public LogEntry pollLogEntryToStorage() throws InterruptedException {
- try {
- return logEntrySyncCommands.rpop(logEntryToStorageQueueName);
- } catch (Exception e) {
- log.error("Failed to poll LogEntry from storage via Redis: {}",
e.getMessage());
- throw new InterruptedException("Failed to poll LogEntry from
storage via Redis");
- }
+ return genericBlockingPollFunction(logEntryToStorageQueueName,
logEntrySyncCommands);
}
@Override
@@ -187,4 +164,23 @@ public class RedisCommonDataQueue implements
CommonDataQueue, DisposableBean {
redisClient.shutdown();
}
+
+
+ private <T> T genericBlockingPollFunction(String key,
RedisCommands<String, T> commands) throws InterruptedException {
+ try {
+ // Use BRPOP for a blocking pop with a fixed 1-second timeout.
+ // If data arrives, it returns immediately; if it times out, it returns null.
+ KeyValue<String, T> keyData = commands.brpop(1L, key);
+ if (keyData != null) {
+ return keyData.getValue();
+ } else {
+ // Returns null on timeout
+ return null;
+ }
+ } catch (Exception e) {
+ log.error("Redis BRPOP failed: {}", e.getMessage());
+ return null;
+ }
+ }
+
}
diff --git
a/hertzbeat-common/src/test/java/org/apache/hertzbeat/common/queue/impl/RedisCommonDataQueueTest.java
b/hertzbeat-common/src/test/java/org/apache/hertzbeat/common/queue/impl/RedisCommonDataQueueTest.java
index c6cf873955..affb9a37ea 100644
---
a/hertzbeat-common/src/test/java/org/apache/hertzbeat/common/queue/impl/RedisCommonDataQueueTest.java
+++
b/hertzbeat-common/src/test/java/org/apache/hertzbeat/common/queue/impl/RedisCommonDataQueueTest.java
@@ -27,6 +27,7 @@ import io.lettuce.core.RedisClient;
import io.lettuce.core.RedisURI;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.sync.RedisCommands;
+import io.lettuce.core.KeyValue;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
@@ -95,11 +96,13 @@ class RedisCommonDataQueueTest {
CollectRep.MetricsData metricsData =
CollectRep.MetricsData.newBuilder()
.setMetrics("test metrics")
.build();
+ String queueName = "metricsDataQueueToAlerter";
-
when(syncCommands.rpop("metricsDataQueueToAlerter")).thenReturn(metricsData);
+ when(syncCommands.brpop(1L,
queueName)).thenReturn(KeyValue.just(queueName, metricsData));
CollectRep.MetricsData actualMetricsData =
redisCommonDataQueue.pollMetricsDataToAlerter();
assertEquals(metricsData, actualMetricsData);
+ verify(syncCommands).brpop(1L, queueName);
}
@Test
@@ -163,13 +166,14 @@ class RedisCommonDataQueueTest {
.severityText("WARN")
.body("Test warning log message")
.build();
+ String queueName = "logEntryQueue";
-
when(logEntrySyncCommands.rpop("logEntryQueue")).thenReturn(expectedLogEntry);
+ when(logEntrySyncCommands.brpop(1L,
queueName)).thenReturn(KeyValue.just(queueName, expectedLogEntry));
LogEntry result = redisCommonDataQueue.pollLogEntry();
assertEquals(expectedLogEntry, result);
- verify(logEntrySyncCommands).rpop("logEntryQueue");
+ verify(logEntrySyncCommands).brpop(1L, "logEntryQueue");
}
@Test
@@ -181,13 +185,13 @@ class RedisCommonDataQueueTest {
.severityText("FATAL")
.body("Critical error log for storage")
.build();
+ String queueName = "logEntryToStorageQueue";
-
when(logEntrySyncCommands.rpop("logEntryToStorageQueue")).thenReturn(expectedLogEntry);
-
+ when(logEntrySyncCommands.brpop(1L,
queueName)).thenReturn(KeyValue.just(queueName, expectedLogEntry));
LogEntry result = redisCommonDataQueue.pollLogEntryToStorage();
assertEquals(expectedLogEntry, result);
- verify(logEntrySyncCommands).rpop("logEntryToStorageQueue");
+ verify(logEntrySyncCommands).brpop(1L, "logEntryToStorageQueue");
}
@Test
diff --git
a/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/component/sd/ServiceDiscoveryWorker.java
b/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/component/sd/ServiceDiscoveryWorker.java
index 740f4277a6..1ab255d72f 100644
---
a/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/component/sd/ServiceDiscoveryWorker.java
+++
b/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/component/sd/ServiceDiscoveryWorker.java
@@ -82,6 +82,9 @@ public class ServiceDiscoveryWorker implements
InitializingBean {
public void run() {
while (!Thread.currentThread().isInterrupted()) {
try (final CollectRep.MetricsData metricsData =
dataQueue.pollServiceDiscoveryData()) {
+ if (metricsData == null) {
+ continue;
+ }
Long monitorId = metricsData.getId();
final Monitor mainMonitor =
monitorDao.findById(monitorId).orElse(null);
if (mainMonitor == null) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]