This is an automated email from the ASF dual-hosted git repository.

rabreu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git


The following commit(s) were added to refs/heads/master by this push:
     new 14efb2e89 fix(HeartbeatCache): Falsely timing out alive executors when heartbeat TIME_SECS does not advance (#8420)
14efb2e89 is described below

commit 14efb2e893ce3e68f9f4cccc8da1ac3dba821236
Author: Diogo Filipe Pinto Pereira <[email protected]>
AuthorDate: Fri Mar 6 11:13:59 2026 +0000

    fix(HeartbeatCache): Falsely timing out alive executors when heartbeat TIME_SECS does not advance (#8420)
---
 .../apache/storm/daemon/nimbus/HeartbeatCache.java |  40 +++--
 .../storm/daemon/nimbus/HeartbeatCacheTest.java    | 195 +++++++++++++++++++++
 2 files changed, 224 insertions(+), 11 deletions(-)

diff --git a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/HeartbeatCache.java b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/HeartbeatCache.java
index bb6449c91..c69aa2c25 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/HeartbeatCache.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/HeartbeatCache.java
@@ -27,6 +27,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.Function;
+
 import org.apache.storm.generated.Assignment;
 import org.apache.storm.generated.ExecutorInfo;
 import org.apache.storm.generated.SupervisorWorkerHeartbeat;
@@ -72,7 +73,16 @@ public class HeartbeatCache {
             isTimedOut = Time.deltaSecs(getNimbusTimeSecs()) >= timeout;
         }
 
-        public synchronized void updateFromHb(Integer timeout, Map<String, Object> newBeat) {
+        // Used for RPC heartbeats: nimbusTimeSecs is refreshed on every heartbeat so that
+        // idle-but-alive executors (whose stats TIME_SECS may not advance) are not falsely timed out.
+        public synchronized void updateFromRpcHb(Integer timeout) {
+            nimbusTimeSecs = Time.currentTimeSecs();
+            updateTimeout(timeout);
+        }
+
+        // Used for ZK heartbeats: nimbusTimeSecs is only refreshed when the executor's stats
+        // TIME_SECS advances, preserving zombie detection for legacy topologies.
+        public synchronized void updateFromZkHb(Integer timeout, Map<String, Object> newBeat) {
             if (newBeat != null) {
                 Integer newReportedTime = (Integer) newBeat.getOrDefault(ClientStatsUtil.TIME_SECS, 0);
                 if (!newReportedTime.equals(executorReportedTimeSecs)) {
@@ -96,6 +106,7 @@ public class HeartbeatCache {
 
     /**
      * Add an empty topology to the cache for testing purposes.
+     *
      * @param topoId the id of the topology to add.
      */
     @VisibleForTesting
@@ -105,6 +116,7 @@ public class HeartbeatCache {
 
     /**
      * Get the number of topologies with cached heartbeats.
+     *
      * @return the number of topologies with cached heartbeats.
      */
     @VisibleForTesting
@@ -114,6 +126,7 @@ public class HeartbeatCache {
 
     /**
      * Get the topology ids with cached heartbeats.
+     *
      * @return the set of topology ids with cached heartbeats.
      */
     @VisibleForTesting
@@ -123,6 +136,7 @@ public class HeartbeatCache {
 
     /**
      * Remove a specific topology from the cache.
+     *
      * @param topoId the id of the topology to remove.
      */
     public void removeTopo(String topoId) {
@@ -131,7 +145,8 @@ public class HeartbeatCache {
 
     /**
      * Go through all executors and time them out if needed.
-     * @param topoId the id of the topology to look at.
+     *
+     * @param topoId          the id of the topology to look at.
      * @param taskTimeoutSecs the timeout to know if they are too old.
      */
     public void timeoutOldHeartbeats(String topoId, Integer taskTimeoutSecs) {
@@ -143,10 +158,11 @@ public class HeartbeatCache {
 
     /**
      * Update the cache with heartbeats from a worker through zookeeper.
-     * @param topoId the id to the topology.
+     *
+     * @param topoId        the id to the topology.
      * @param executorBeats the HB data.
-     * @param allExecutors the executors.
-     * @param timeout the timeout.
+     * @param allExecutors  the executors.
+     * @param timeout       the timeout.
      */
     public void updateFromZkHeartbeat(String topoId, Map<List<Integer>, Map<String, Object>> executorBeats,
                                       Set<List<Integer>> allExecutors, Integer timeout) {
@@ -158,12 +174,13 @@ public class HeartbeatCache {
         for (List<Integer> executor : allExecutors) {
             final Map<String, Object> newBeat = executorBeats.get(executor);
             ExecutorCache currBeat = topoCache.computeIfAbsent(executor, (k) -> new ExecutorCache(newBeat));
-            currBeat.updateFromHb(timeout, newBeat);
+            currBeat.updateFromZkHb(timeout, newBeat);
         }
     }
 
     /**
      * Update the heartbeats for a given worker.
+     *
      * @param workerHeartbeat the heartbeats from the worker.
      * @param taskTimeoutSecs the timeout we should be looking at.
      */
@@ -176,22 +193,23 @@ public class HeartbeatCache {
             List<Integer> executor = Arrays.asList(executorInfo.get_task_start(), executorInfo.get_task_end());
             final Map<String, Object> newBeat = executorBeats.get(executor);
             ExecutorCache currBeat = topoCache.computeIfAbsent(executor, (k) -> new ExecutorCache(newBeat));
-            currBeat.updateFromHb(taskTimeoutSecs, newBeat);
+            currBeat.updateFromRpcHb(taskTimeoutSecs);
         }
     }
 
     /**
      * Get all of the alive executors for a given topology.
-     * @param topoId the id of the topology we are looking for.
-     * @param allExecutors all of the executors for this topology.
-     * @param assignment the current topology assignment.
+     *
+     * @param topoId         the id of the topology we are looking for.
+     * @param allExecutors   all of the executors for this topology.
+     * @param assignment     the current topology assignment.
      * @param taskLaunchSecs timeout for right after a worker is launched.
      * @return the set of tasks that are alive.
      */
     public Set<List<Integer>> getAliveExecutors(String topoId, Set<List<Integer>> allExecutors, Assignment assignment, int taskLaunchSecs) {
         Map<List<Integer>, ExecutorCache> topoCache = cache.computeIfAbsent(topoId, MAKE_MAP);
         LOG.debug("Computing alive executors for {}\nExecutors: {}\nAssignment: {}\nHeartbeat cache: {}",
-            topoId, allExecutors, assignment, topoCache);
+                topoId, allExecutors, assignment, topoCache);
 
         Set<List<Integer>> ret = new HashSet<>();
         Map<List<Long>, Long> execToStartTimes = assignment.get_executor_start_time_secs();
diff --git a/storm-server/src/test/java/org/apache/storm/daemon/nimbus/HeartbeatCacheTest.java b/storm-server/src/test/java/org/apache/storm/daemon/nimbus/HeartbeatCacheTest.java
new file mode 100644
index 000000000..5ae1ee315
--- /dev/null
+++ b/storm-server/src/test/java/org/apache/storm/daemon/nimbus/HeartbeatCacheTest.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.daemon.nimbus;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.storm.generated.Assignment;
+import org.apache.storm.generated.ExecutorInfo;
+import org.apache.storm.generated.NodeInfo;
+import org.apache.storm.generated.SupervisorWorkerHeartbeat;
+import org.apache.storm.stats.ClientStatsUtil;
+import org.apache.storm.utils.Time;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class HeartbeatCacheTest {
+    private static final String TOPO_ID = "test-topology-1";
+    private static final int TIMEOUT_SECS = 30;
+
+    @Test
+    void testExecutorRemainsAliveWhenHeartbeatTimestampDoesNotAdvance() {
+        try (Time.SimulatedTime ignored = new Time.SimulatedTime()) {
+            HeartbeatCache cache = new HeartbeatCache();
+            Set<List<Integer>> allExecutors = Collections.singleton(Arrays.asList(1, 1));
+            Assignment assignment = mkAssignment(Time.currentTimeSecs(), 1, 1);
+
+            // First heartbeat at t=0 with TIME_SECS=100
+            cache.updateHeartbeat(mkWorkerHeartbeat(TOPO_ID, 100, 1, 1), TIMEOUT_SECS);
+
+            // Advance time to just before timeout
+            Time.advanceTimeSecs(TIMEOUT_SECS - 1);
+
+            // Second heartbeat arrives but TIME_SECS is still 100 (same second, stats not rotated)
+            cache.updateHeartbeat(mkWorkerHeartbeat(TOPO_ID, 100, 1, 1), TIMEOUT_SECS);
+
+            // Advance just 2 more seconds: now t = TIMEOUT_SECS + 1, which is past the original
+            // timeout window (rooted at t=0) but well within the refreshed window (rooted at t=TIMEOUT_SECS-1).
+            Time.advanceTimeSecs(2);
+
+            // Simulate the scheduling cycle timeout check
+            cache.timeoutOldHeartbeats(TOPO_ID, TIMEOUT_SECS);
+
+            // Executor should still be alive because a fresh heartbeat was received at t=(TIMEOUT_SECS-1)
+            Set<List<Integer>> alive = cache.getAliveExecutors(TOPO_ID, allExecutors, assignment, TIMEOUT_SECS);
+            assertFalse(alive.isEmpty(), "Executor should be alive after receiving a recent heartbeat even if TIME_SECS did not advance");
+        }
+    }
+
+    @Test
+    void testExecutorTimesOutWhenNoHeartbeatReceived() {
+        try (Time.SimulatedTime ignored = new Time.SimulatedTime()) {
+            HeartbeatCache cache = new HeartbeatCache();
+            Set<List<Integer>> allExecutors = Collections.singleton(Arrays.asList(1, 1));
+            Assignment assignment = mkAssignment(Time.currentTimeSecs(), 1, 1);
+
+            // Single heartbeat at t=0
+            cache.updateHeartbeat(mkWorkerHeartbeat(TOPO_ID, 100, 1, 1), TIMEOUT_SECS);
+
+            // No more heartbeats — advance time past the timeout
+            Time.advanceTimeSecs(TIMEOUT_SECS + 1);
+
+            // Simulate the scheduling cycle timeout check
+            cache.timeoutOldHeartbeats(TOPO_ID, TIMEOUT_SECS);
+
+            Set<List<Integer>> alive = cache.getAliveExecutors(TOPO_ID, allExecutors, assignment, TIMEOUT_SECS);
+            assertTrue(alive.isEmpty(), "Executor should be timed out after no heartbeat for longer than timeout");
+        }
+    }
+
+    @Test
+    void testExecutorAliveWithRegularHeartbeats() {
+        try (Time.SimulatedTime ignored = new Time.SimulatedTime()) {
+            HeartbeatCache cache = new HeartbeatCache();
+            Set<List<Integer>> allExecutors = Collections.singleton(Arrays.asList(1, 1));
+            Assignment assignment = mkAssignment(Time.currentTimeSecs(), 1, 1);
+
+            // Send heartbeats every second for 60 seconds
+            for (int t = 0; t < 60; t++) {
+                cache.updateHeartbeat(mkWorkerHeartbeat(TOPO_ID, Time.currentTimeSecs(), 1, 1), TIMEOUT_SECS);
+                Time.advanceTimeSecs(1);
+            }
+
+            // Simulate the scheduling cycle timeout check
+            cache.timeoutOldHeartbeats(TOPO_ID, TIMEOUT_SECS);
+
+            Set<List<Integer>> alive = cache.getAliveExecutors(TOPO_ID, allExecutors, assignment, TIMEOUT_SECS);
+            assertFalse(alive.isEmpty(), "Executor should be alive when receiving regular heartbeats");
+        }
+    }
+
+    @Test
+    void testZkExecutorTimesOutWhenTimeSecsStopsAdvancing() {
+        try (Time.SimulatedTime ignored = new Time.SimulatedTime()) {
+            HeartbeatCache cache = new HeartbeatCache();
+            Set<List<Integer>> allExecutors = Collections.singleton(Arrays.asList(1, 1));
+            Assignment assignment = mkAssignment(Time.currentTimeSecs(), 1, 1);
+
+            // Heartbeats with advancing TIME_SECS — executor is healthy
+            for (int t = 0; t < 5; t++) {
+                cache.updateFromZkHeartbeat(TOPO_ID, mkZkExecutorBeats(1, 1, t * 10), allExecutors, TIMEOUT_SECS);
+                Time.advanceTimeSecs(1);
+            }
+
+            // TIME_SECS freezes — zombie executor keeps sending heartbeats but stats are stuck
+            int frozenTimeSecs = 40;
+            for (int t = 0; t < TIMEOUT_SECS + 1; t++) {
+                cache.updateFromZkHeartbeat(TOPO_ID, mkZkExecutorBeats(1, 1, frozenTimeSecs), allExecutors, TIMEOUT_SECS);
+                Time.advanceTimeSecs(1);
+            }
+
+            cache.timeoutOldHeartbeats(TOPO_ID, TIMEOUT_SECS);
+
+            Set<List<Integer>> alive = cache.getAliveExecutors(TOPO_ID, allExecutors, assignment, TIMEOUT_SECS);
+            assertTrue(alive.isEmpty(), "ZK executor should be timed out when TIME_SECS stops advancing (zombie detection)");
+        }
+    }
+
+    @Test
+    void testZkExecutorAliveWhenTimeSecsAdvances() {
+        try (Time.SimulatedTime ignored = new Time.SimulatedTime()) {
+            HeartbeatCache cache = new HeartbeatCache();
+            Set<List<Integer>> allExecutors = Collections.singleton(Arrays.asList(1, 1));
+            Assignment assignment = mkAssignment(Time.currentTimeSecs(), 1, 1);
+
+            // Heartbeats with advancing TIME_SECS every second
+            for (int t = 0; t < 60; t++) {
+                cache.updateFromZkHeartbeat(TOPO_ID, mkZkExecutorBeats(1, 1, t), allExecutors, TIMEOUT_SECS);
+                Time.advanceTimeSecs(1);
+            }
+
+            cache.timeoutOldHeartbeats(TOPO_ID, TIMEOUT_SECS);
+
+            Set<List<Integer>> alive = cache.getAliveExecutors(TOPO_ID, allExecutors, assignment, TIMEOUT_SECS);
+            assertFalse(alive.isEmpty(), "ZK executor should be alive when TIME_SECS advances regularly");
+        }
+    }
+
+    private SupervisorWorkerHeartbeat mkWorkerHeartbeat(String topoId, int timeSecs, int... executors) {
+        SupervisorWorkerHeartbeat hb = new SupervisorWorkerHeartbeat();
+        hb.set_storm_id(topoId);
+        hb.set_time_secs(timeSecs);
+        for (int i = 0; i < executors.length - 1; i += 2) {
+            ExecutorInfo info = new ExecutorInfo();
+            info.set_task_start(executors[i]);
+            info.set_task_end(executors[i + 1]);
+            hb.add_to_executors(info);
+        }
+        return hb;
+    }
+
+
+    private Map<List<Integer>, Map<String, Object>> mkZkExecutorBeats(int taskStart, int taskEnd, int timeSecs) {
+        Map<String, Object> beat = new HashMap<>();
+        beat.put(ClientStatsUtil.TIME_SECS, timeSecs);
+        return Collections.singletonMap(Arrays.asList(taskStart, taskEnd), beat);
+    }
+
+    private Assignment mkAssignment(int startTimeSecs, int... executors) {
+        Assignment assignment = new Assignment();
+        Map<List<Long>, Long> execToStartTime = new HashMap<>();
+        Map<List<Long>, NodeInfo> execToNodePort = new HashMap<>();
+        NodeInfo nodeInfo = new NodeInfo("node1", Collections.singleton(6700L));
+        for (int i = 0; i < executors.length - 1; i += 2) {
+            List<Long> exec = Arrays.asList((long) executors[i], (long) executors[i + 1]);
+            execToStartTime.put(exec, (long) startTimeSecs);
+            execToNodePort.put(exec, nodeInfo);
+        }
+        assignment.set_executor_start_time_secs(execToStartTime);
+        assignment.set_executor_node_port(execToNodePort);
+        return assignment;
+    }
+}

Reply via email to