This is an automated email from the ASF dual-hosted git repository.

zhaoqingran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hertzbeat.git


The following commit(s) were added to refs/heads/master by this push:
     new bc098aa4d2 [bugfix]manager: Avoid NPE when metrics data queue is empty 
(#3848)
bc098aa4d2 is described below

commit bc098aa4d25518d2f0ac22edaf1dd9bb97e4fa7f
Author: starryCoder <[email protected]>
AuthorDate: Fri Nov 14 16:09:49 2025 +0800

    [bugfix]manager: Avoid NPE when metrics data queue is empty (#3848)
    
    Co-authored-by: Duansg <[email protected]>
---
 .../realtime/MetricsRealTimeAlertCalculator.java   |  3 ++
 .../common/queue/impl/RedisCommonDataQueue.java    | 58 ++++++++++------------
 .../queue/impl/RedisCommonDataQueueTest.java       | 16 +++---
 .../component/sd/ServiceDiscoveryWorker.java       |  3 ++
 4 files changed, 43 insertions(+), 37 deletions(-)

diff --git 
a/hertzbeat-alerter/src/main/java/org/apache/hertzbeat/alert/calculate/realtime/MetricsRealTimeAlertCalculator.java
 
b/hertzbeat-alerter/src/main/java/org/apache/hertzbeat/alert/calculate/realtime/MetricsRealTimeAlertCalculator.java
index 8dd4401418..5fe19edd04 100644
--- 
a/hertzbeat-alerter/src/main/java/org/apache/hertzbeat/alert/calculate/realtime/MetricsRealTimeAlertCalculator.java
+++ 
b/hertzbeat-alerter/src/main/java/org/apache/hertzbeat/alert/calculate/realtime/MetricsRealTimeAlertCalculator.java
@@ -125,6 +125,9 @@ public class MetricsRealTimeAlertCalculator {
             while (!Thread.currentThread().isInterrupted()) {
                 try {
                     CollectRep.MetricsData metricsData = 
dataQueue.pollMetricsDataToAlerter();
+                    if (metricsData == null) {
+                        continue;
+                    }
                     calculate(metricsData);
                     dataQueue.sendMetricsDataToStorage(metricsData);
                 } catch (InterruptedException ignored) {
diff --git 
a/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/queue/impl/RedisCommonDataQueue.java
 
b/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/queue/impl/RedisCommonDataQueue.java
index 52073d03c1..ba69b25242 100644
--- 
a/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/queue/impl/RedisCommonDataQueue.java
+++ 
b/hertzbeat-common/src/main/java/org/apache/hertzbeat/common/queue/impl/RedisCommonDataQueue.java
@@ -17,6 +17,7 @@
 
 package org.apache.hertzbeat.common.queue.impl;
 
+import io.lettuce.core.KeyValue;
 import io.lettuce.core.RedisClient;
 import io.lettuce.core.RedisURI;
 import io.lettuce.core.api.StatefulRedisConnection;
@@ -86,33 +87,19 @@ public class RedisCommonDataQueue implements 
CommonDataQueue, DisposableBean {
     }
 
     @Override
-    public CollectRep.MetricsData pollMetricsDataToAlerter() {
-        try {
-            return syncCommands.rpop(metricsDataQueueNameToAlerter);
-        } catch (Exception e) {
-            log.error(e.getMessage());
-            return null;
-        }
+    public CollectRep.MetricsData pollMetricsDataToAlerter() throws 
InterruptedException {
+        return genericBlockingPollFunction(metricsDataQueueNameToAlerter, 
syncCommands);
     }
 
     @Override
     public CollectRep.MetricsData pollMetricsDataToStorage() throws 
InterruptedException {
-        try {
-            return syncCommands.rpop(metricsDataQueueNameToStorage);
-        } catch (Exception e) {
-            log.error(e.getMessage());
-            return null;
-        }
+        return genericBlockingPollFunction(metricsDataQueueNameToStorage, 
syncCommands);
+
     }
 
     @Override
     public CollectRep.MetricsData pollServiceDiscoveryData() throws 
InterruptedException {
-        try {
-            return syncCommands.rpop(metricsDataQueueNameForServiceDiscovery);
-        } catch (Exception e) {
-            log.error(e.getMessage());
-            return null;
-        }
+        return 
genericBlockingPollFunction(metricsDataQueueNameForServiceDiscovery, 
syncCommands);
     }
 
     @Override
@@ -153,12 +140,7 @@ public class RedisCommonDataQueue implements 
CommonDataQueue, DisposableBean {
 
     @Override
     public LogEntry pollLogEntry() throws InterruptedException {
-        try {
-            return logEntrySyncCommands.rpop(logEntryQueueName);
-        } catch (Exception e) {
-            log.error("Failed to poll LogEntry from Redis: {}", 
e.getMessage());
-            throw new InterruptedException("Failed to poll LogEntry from 
Redis");
-        }
+        return genericBlockingPollFunction(logEntryQueueName, 
logEntrySyncCommands);
     }
 
     @Override
@@ -172,12 +154,7 @@ public class RedisCommonDataQueue implements 
CommonDataQueue, DisposableBean {
 
     @Override
     public LogEntry pollLogEntryToStorage() throws InterruptedException {
-        try {
-            return logEntrySyncCommands.rpop(logEntryToStorageQueueName);
-        } catch (Exception e) {
-            log.error("Failed to poll LogEntry from storage via Redis: {}", 
e.getMessage());
-            throw new InterruptedException("Failed to poll LogEntry from 
storage via Redis");
-        }
+        return genericBlockingPollFunction(logEntryToStorageQueueName, 
logEntrySyncCommands);
     }
 
     @Override
@@ -187,4 +164,23 @@ public class RedisCommonDataQueue implements 
CommonDataQueue, DisposableBean {
         redisClient.shutdown();
     }
 
+
+
+    private <T> T genericBlockingPollFunction(String key, 
RedisCommands<String, T> commands) throws InterruptedException {
+        try {
+            // Use BRPOP for blocking pop with the configured timeout.
+            // If data arrives, it returns immediately; if it times out, it 
returns null.
+            KeyValue<String, T> keyData = commands.brpop(1L, key);
+            if (keyData != null) {
+                return keyData.getValue();
+            } else {
+                // Returns null on timeout
+                return null;
+            }
+        } catch (Exception e) {
+            log.error("Redis BRPOP failed: {}", e.getMessage());
+            return null;
+        }
+    }
+
 }
diff --git 
a/hertzbeat-common/src/test/java/org/apache/hertzbeat/common/queue/impl/RedisCommonDataQueueTest.java
 
b/hertzbeat-common/src/test/java/org/apache/hertzbeat/common/queue/impl/RedisCommonDataQueueTest.java
index c6cf873955..affb9a37ea 100644
--- 
a/hertzbeat-common/src/test/java/org/apache/hertzbeat/common/queue/impl/RedisCommonDataQueueTest.java
+++ 
b/hertzbeat-common/src/test/java/org/apache/hertzbeat/common/queue/impl/RedisCommonDataQueueTest.java
@@ -27,6 +27,7 @@ import io.lettuce.core.RedisClient;
 import io.lettuce.core.RedisURI;
 import io.lettuce.core.api.StatefulRedisConnection;
 import io.lettuce.core.api.sync.RedisCommands;
+import io.lettuce.core.KeyValue;
 import java.time.Instant;
 import java.util.HashMap;
 import java.util.Map;
@@ -95,11 +96,13 @@ class RedisCommonDataQueueTest {
         CollectRep.MetricsData metricsData = 
CollectRep.MetricsData.newBuilder()
                 .setMetrics("test metrics")
                 .build();
+        String queueName = "metricsDataQueueToAlerter";
 
-        
when(syncCommands.rpop("metricsDataQueueToAlerter")).thenReturn(metricsData);
+        when(syncCommands.brpop(1L, 
queueName)).thenReturn(KeyValue.just(queueName, metricsData));
 
         CollectRep.MetricsData actualMetricsData = 
redisCommonDataQueue.pollMetricsDataToAlerter();
         assertEquals(metricsData, actualMetricsData);
+        verify(syncCommands).brpop(1L, queueName);
     }
 
     @Test
@@ -163,13 +166,14 @@ class RedisCommonDataQueueTest {
                 .severityText("WARN")
                 .body("Test warning log message")
                 .build();
+        String queueName = "logEntryQueue";
 
-        
when(logEntrySyncCommands.rpop("logEntryQueue")).thenReturn(expectedLogEntry);
+        when(logEntrySyncCommands.brpop(1L, 
queueName)).thenReturn(KeyValue.just(queueName, expectedLogEntry));
 
         LogEntry result = redisCommonDataQueue.pollLogEntry();
         
         assertEquals(expectedLogEntry, result);
-        verify(logEntrySyncCommands).rpop("logEntryQueue");
+        verify(logEntrySyncCommands).brpop(1L, "logEntryQueue");
     }
 
     @Test
@@ -181,13 +185,13 @@ class RedisCommonDataQueueTest {
                 .severityText("FATAL")
                 .body("Critical error log for storage")
                 .build();
+        String queueName = "logEntryToStorageQueue";
 
-        
when(logEntrySyncCommands.rpop("logEntryToStorageQueue")).thenReturn(expectedLogEntry);
-
+        when(logEntrySyncCommands.brpop(1L, 
queueName)).thenReturn(KeyValue.just(queueName, expectedLogEntry));
         LogEntry result = redisCommonDataQueue.pollLogEntryToStorage();
         
         assertEquals(expectedLogEntry, result);
-        verify(logEntrySyncCommands).rpop("logEntryToStorageQueue");
+        verify(logEntrySyncCommands).brpop(1L, "logEntryToStorageQueue");
     }
 
     @Test
diff --git 
a/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/component/sd/ServiceDiscoveryWorker.java
 
b/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/component/sd/ServiceDiscoveryWorker.java
index 740f4277a6..1ab255d72f 100644
--- 
a/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/component/sd/ServiceDiscoveryWorker.java
+++ 
b/hertzbeat-manager/src/main/java/org/apache/hertzbeat/manager/component/sd/ServiceDiscoveryWorker.java
@@ -82,6 +82,9 @@ public class ServiceDiscoveryWorker implements 
InitializingBean {
         public void run() {
             while (!Thread.currentThread().isInterrupted()) {
                 try (final CollectRep.MetricsData metricsData = 
dataQueue.pollServiceDiscoveryData()) {
+                    if (metricsData == null) {
+                        continue;
+                    }
                     Long monitorId = metricsData.getId();
                     final Monitor mainMonitor = 
monitorDao.findById(monitorId).orElse(null);
                     if (mainMonitor == null) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to