This is an automated email from the ASF dual-hosted git repository.
rabreu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git
The following commit(s) were added to refs/heads/master by this push:
new 14efb2e89 fix(HeartbeatCache): Falsely timing out alive executors when
heartbeat TIME_SECS does not advance (#8420)
14efb2e89 is described below
commit 14efb2e893ce3e68f9f4cccc8da1ac3dba821236
Author: Diogo Filipe Pinto Pereira <[email protected]>
AuthorDate: Fri Mar 6 11:13:59 2026 +0000
fix(HeartbeatCache): Falsely timing out alive executors when heartbeat
TIME_SECS does not advance (#8420)
---
.../apache/storm/daemon/nimbus/HeartbeatCache.java | 40 +++--
.../storm/daemon/nimbus/HeartbeatCacheTest.java | 195 +++++++++++++++++++++
2 files changed, 224 insertions(+), 11 deletions(-)
diff --git
a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/HeartbeatCache.java
b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/HeartbeatCache.java
index bb6449c91..c69aa2c25 100644
---
a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/HeartbeatCache.java
+++
b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/HeartbeatCache.java
@@ -27,6 +27,7 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
+
import org.apache.storm.generated.Assignment;
import org.apache.storm.generated.ExecutorInfo;
import org.apache.storm.generated.SupervisorWorkerHeartbeat;
@@ -72,7 +73,16 @@ public class HeartbeatCache {
isTimedOut = Time.deltaSecs(getNimbusTimeSecs()) >= timeout;
}
- public synchronized void updateFromHb(Integer timeout, Map<String,
Object> newBeat) {
+ // Used for RPC heartbeats: nimbusTimeSecs is set to the current time on every
RPC heartbeat, regardless of the heartbeat's contents, so that
+ // idle-but-alive executors (whose stats TIME_SECS may not advance)
are not falsely timed out. The worker-reported TIME_SECS is not consulted on this path; updateTimeout(timeout) is then applied to the refreshed time. NOTE(review): liveness here rests on heartbeat arrival alone — confirm the sender stops reporting executors of hung workers.
+ public synchronized void updateFromRpcHb(Integer timeout) {
+ nimbusTimeSecs = Time.currentTimeSecs();
+ updateTimeout(timeout);
+ }
+
+ // Used for ZK heartbeats: nimbusTimeSecs is only refreshed when the
executor's stats
+ // TIME_SECS advances, preserving zombie detection for legacy
topologies.
+ public synchronized void updateFromZkHb(Integer timeout, Map<String,
Object> newBeat) {
if (newBeat != null) {
Integer newReportedTime = (Integer)
newBeat.getOrDefault(ClientStatsUtil.TIME_SECS, 0);
if (!newReportedTime.equals(executorReportedTimeSecs)) {
@@ -96,6 +106,7 @@ public class HeartbeatCache {
/**
* Add an empty topology to the cache for testing purposes.
+ *
* @param topoId the id of the topology to add.
*/
@VisibleForTesting
@@ -105,6 +116,7 @@ public class HeartbeatCache {
/**
* Get the number of topologies with cached heartbeats.
+ *
* @return the number of topologies with cached heartbeats.
*/
@VisibleForTesting
@@ -114,6 +126,7 @@ public class HeartbeatCache {
/**
* Get the topology ids with cached heartbeats.
+ *
* @return the set of topology ids with cached heartbeats.
*/
@VisibleForTesting
@@ -123,6 +136,7 @@ public class HeartbeatCache {
/**
* Remove a specific topology from the cache.
+ *
* @param topoId the id of the topology to remove.
*/
public void removeTopo(String topoId) {
@@ -131,7 +145,8 @@ public class HeartbeatCache {
/**
* Go through all executors and time them out if needed.
- * @param topoId the id of the topology to look at.
+ *
+ * @param topoId the id of the topology to look at.
* @param taskTimeoutSecs the timeout to know if they are too old.
*/
public void timeoutOldHeartbeats(String topoId, Integer taskTimeoutSecs) {
@@ -143,10 +158,11 @@ public class HeartbeatCache {
/**
* Update the cache with heartbeats from a worker through zookeeper.
- * @param topoId the id to the topology.
+ *
+ * @param topoId the id to the topology.
* @param executorBeats the HB data.
- * @param allExecutors the executors.
- * @param timeout the timeout.
+ * @param allExecutors the executors.
+ * @param timeout the timeout.
*/
public void updateFromZkHeartbeat(String topoId, Map<List<Integer>,
Map<String, Object>> executorBeats,
Set<List<Integer>> allExecutors, Integer
timeout) {
@@ -158,12 +174,13 @@ public class HeartbeatCache {
for (List<Integer> executor : allExecutors) {
final Map<String, Object> newBeat = executorBeats.get(executor);
ExecutorCache currBeat = topoCache.computeIfAbsent(executor, (k)
-> new ExecutorCache(newBeat));
- currBeat.updateFromHb(timeout, newBeat);
+ currBeat.updateFromZkHb(timeout, newBeat);
}
}
/**
* Update the heartbeats for a given worker.
+ *
* @param workerHeartbeat the heartbeats from the worker.
* @param taskTimeoutSecs the timeout we should be looking at.
*/
@@ -176,22 +193,23 @@ public class HeartbeatCache {
List<Integer> executor =
Arrays.asList(executorInfo.get_task_start(), executorInfo.get_task_end());
final Map<String, Object> newBeat = executorBeats.get(executor);
ExecutorCache currBeat = topoCache.computeIfAbsent(executor, (k)
-> new ExecutorCache(newBeat));
- currBeat.updateFromHb(taskTimeoutSecs, newBeat);
+ currBeat.updateFromRpcHb(taskTimeoutSecs);
}
}
/**
* Get all of the alive executors for a given topology.
- * @param topoId the id of the topology we are looking for.
- * @param allExecutors all of the executors for this topology.
- * @param assignment the current topology assignment.
+ *
+ * @param topoId the id of the topology we are looking for.
+ * @param allExecutors all of the executors for this topology.
+ * @param assignment the current topology assignment.
* @param taskLaunchSecs timeout for right after a worker is launched.
* @return the set of tasks that are alive.
*/
public Set<List<Integer>> getAliveExecutors(String topoId,
Set<List<Integer>> allExecutors, Assignment assignment, int taskLaunchSecs) {
Map<List<Integer>, ExecutorCache> topoCache =
cache.computeIfAbsent(topoId, MAKE_MAP);
LOG.debug("Computing alive executors for {}\nExecutors:
{}\nAssignment: {}\nHeartbeat cache: {}",
- topoId, allExecutors, assignment, topoCache);
+ topoId, allExecutors, assignment, topoCache);
Set<List<Integer>> ret = new HashSet<>();
Map<List<Long>, Long> execToStartTimes =
assignment.get_executor_start_time_secs();
diff --git
a/storm-server/src/test/java/org/apache/storm/daemon/nimbus/HeartbeatCacheTest.java
b/storm-server/src/test/java/org/apache/storm/daemon/nimbus/HeartbeatCacheTest.java
new file mode 100644
index 000000000..5ae1ee315
--- /dev/null
+++
b/storm-server/src/test/java/org/apache/storm/daemon/nimbus/HeartbeatCacheTest.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.daemon.nimbus;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.storm.generated.Assignment;
+import org.apache.storm.generated.ExecutorInfo;
+import org.apache.storm.generated.NodeInfo;
+import org.apache.storm.generated.SupervisorWorkerHeartbeat;
+import org.apache.storm.stats.ClientStatsUtil;
+import org.apache.storm.utils.Time;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class HeartbeatCacheTest { // HeartbeatCache timeout tests for the RPC (updateHeartbeat) and ZK (updateFromZkHeartbeat) paths
+ private static final String TOPO_ID = "test-topology-1";
+ private static final int TIMEOUT_SECS = 30; // also passed as taskLaunchSecs to getAliveExecutors
+
+ @Test
+ void testExecutorRemainsAliveWhenHeartbeatTimestampDoesNotAdvance() { // RPC path, regression for #8420
+ try (Time.SimulatedTime ignored = new Time.SimulatedTime()) {
+ HeartbeatCache cache = new HeartbeatCache();
+ Set<List<Integer>> allExecutors =
Collections.singleton(Arrays.asList(1, 1));
+ Assignment assignment = mkAssignment(Time.currentTimeSecs(), 1, 1);
+
+ // First RPC heartbeat at t=0 with worker-reported TIME_SECS=100
+ cache.updateHeartbeat(mkWorkerHeartbeat(TOPO_ID, 100, 1, 1),
TIMEOUT_SECS);
+
+ // Advance to t=TIMEOUT_SECS-1, just before the first heartbeat would time out
+ Time.advanceTimeSecs(TIMEOUT_SECS - 1);
+
+ // Second heartbeat arrives but TIME_SECS is still 100 (same
second, stats not rotated); the RPC path must still refresh the executor's last-seen time
+ cache.updateHeartbeat(mkWorkerHeartbeat(TOPO_ID, 100, 1, 1),
TIMEOUT_SECS);
+
+ // Advance just 2 more seconds: now t = TIMEOUT_SECS + 1, which is
past the original
+ // timeout window (rooted at t=0) but well within the refreshed
window (rooted at t=TIMEOUT_SECS-1).
+ Time.advanceTimeSecs(2);
+
+ // Simulate the scheduling cycle timeout check
+ cache.timeoutOldHeartbeats(TOPO_ID, TIMEOUT_SECS);
+
+ // Executor should still be alive because a fresh heartbeat was
received at t=(TIMEOUT_SECS-1)
+ Set<List<Integer>> alive = cache.getAliveExecutors(TOPO_ID,
allExecutors, assignment, TIMEOUT_SECS);
+ assertFalse(alive.isEmpty(), "Executor should be alive after
receiving a recent heartbeat even if TIME_SECS did not advance");
+ }
+ }
+
+ @Test
+ void testExecutorTimesOutWhenNoHeartbeatReceived() { // RPC path: silence after one heartbeat must still time out
+ try (Time.SimulatedTime ignored = new Time.SimulatedTime()) {
+ HeartbeatCache cache = new HeartbeatCache();
+ Set<List<Integer>> allExecutors =
Collections.singleton(Arrays.asList(1, 1));
+ Assignment assignment = mkAssignment(Time.currentTimeSecs(), 1, 1);
+
+ // Single heartbeat at t=0
+ cache.updateHeartbeat(mkWorkerHeartbeat(TOPO_ID, 100, 1, 1),
TIMEOUT_SECS);
+
+ // No more heartbeats — advance to t=TIMEOUT_SECS+1, past the timeout
+ Time.advanceTimeSecs(TIMEOUT_SECS + 1);
+
+ // Simulate the scheduling cycle timeout check
+ cache.timeoutOldHeartbeats(TOPO_ID, TIMEOUT_SECS);
+
+ Set<List<Integer>> alive = cache.getAliveExecutors(TOPO_ID,
allExecutors, assignment, TIMEOUT_SECS);
+ assertTrue(alive.isEmpty(), "Executor should be timed out after no
heartbeat for longer than timeout");
+ }
+ }
+
+ @Test
+ void testExecutorAliveWithRegularHeartbeats() { // RPC path with TIME_SECS tracking the simulated clock
+ try (Time.SimulatedTime ignored = new Time.SimulatedTime()) {
+ HeartbeatCache cache = new HeartbeatCache();
+ Set<List<Integer>> allExecutors =
Collections.singleton(Arrays.asList(1, 1));
+ Assignment assignment = mkAssignment(Time.currentTimeSecs(), 1, 1);
+
+ // Send one heartbeat per simulated second for 60 seconds (TIME_SECS = current simulated time)
+ for (int t = 0; t < 60; t++) {
+ cache.updateHeartbeat(mkWorkerHeartbeat(TOPO_ID,
Time.currentTimeSecs(), 1, 1), TIMEOUT_SECS);
+ Time.advanceTimeSecs(1);
+ }
+
+ // Simulate the scheduling cycle timeout check
+ cache.timeoutOldHeartbeats(TOPO_ID, TIMEOUT_SECS);
+
+ Set<List<Integer>> alive = cache.getAliveExecutors(TOPO_ID,
allExecutors, assignment, TIMEOUT_SECS);
+ assertFalse(alive.isEmpty(), "Executor should be alive when
receiving regular heartbeats");
+ }
+ }
+
+ @Test
+ void testZkExecutorTimesOutWhenTimeSecsStopsAdvancing() { // ZK path keeps TIME_SECS-based zombie detection
+ try (Time.SimulatedTime ignored = new Time.SimulatedTime()) {
+ HeartbeatCache cache = new HeartbeatCache();
+ Set<List<Integer>> allExecutors =
Collections.singleton(Arrays.asList(1, 1));
+ Assignment assignment = mkAssignment(Time.currentTimeSecs(), 1, 1);
+
+ // Heartbeats with advancing TIME_SECS (0, 10, 20, 30, 40) — executor is healthy
+ for (int t = 0; t < 5; t++) {
+ cache.updateFromZkHeartbeat(TOPO_ID, mkZkExecutorBeats(1, 1, t
* 10), allExecutors, TIMEOUT_SECS);
+ Time.advanceTimeSecs(1);
+ }
+
+ // TIME_SECS freezes at 40, its last reported value — zombie executor keeps sending heartbeats
but stats are stuck, so the ZK path never refreshes its last-seen time again
+ int frozenTimeSecs = 40;
+ for (int t = 0; t < TIMEOUT_SECS + 1; t++) {
+ cache.updateFromZkHeartbeat(TOPO_ID, mkZkExecutorBeats(1, 1,
frozenTimeSecs), allExecutors, TIMEOUT_SECS);
+ Time.advanceTimeSecs(1);
+ }
+
+ cache.timeoutOldHeartbeats(TOPO_ID, TIMEOUT_SECS);
+
+ Set<List<Integer>> alive = cache.getAliveExecutors(TOPO_ID,
allExecutors, assignment, TIMEOUT_SECS);
+ assertTrue(alive.isEmpty(), "ZK executor should be timed out when
TIME_SECS stops advancing (zombie detection)");
+ }
+ }
+
+ @Test
+ void testZkExecutorAliveWhenTimeSecsAdvances() { // ZK path: advancing TIME_SECS keeps the executor alive
+ try (Time.SimulatedTime ignored = new Time.SimulatedTime()) {
+ HeartbeatCache cache = new HeartbeatCache();
+ Set<List<Integer>> allExecutors =
Collections.singleton(Arrays.asList(1, 1));
+ Assignment assignment = mkAssignment(Time.currentTimeSecs(), 1, 1);
+
+ // Heartbeats with advancing TIME_SECS every second
+ for (int t = 0; t < 60; t++) {
+ cache.updateFromZkHeartbeat(TOPO_ID, mkZkExecutorBeats(1, 1,
t), allExecutors, TIMEOUT_SECS);
+ Time.advanceTimeSecs(1);
+ }
+
+ cache.timeoutOldHeartbeats(TOPO_ID, TIMEOUT_SECS);
+
+ Set<List<Integer>> alive = cache.getAliveExecutors(TOPO_ID,
allExecutors, assignment, TIMEOUT_SECS);
+ assertFalse(alive.isEmpty(), "ZK executor should be alive when
TIME_SECS advances regularly");
+ }
+ }
+
+ private SupervisorWorkerHeartbeat mkWorkerHeartbeat(String topoId, int
timeSecs, int... executors) { // executors: flattened (taskStart, taskEnd) pairs; a trailing unpaired value is ignored
+ SupervisorWorkerHeartbeat hb = new SupervisorWorkerHeartbeat();
+ hb.set_storm_id(topoId);
+ hb.set_time_secs(timeSecs);
+ for (int i = 0; i < executors.length - 1; i += 2) {
+ ExecutorInfo info = new ExecutorInfo();
+ info.set_task_start(executors[i]);
+ info.set_task_end(executors[i + 1]);
+ hb.add_to_executors(info);
+ }
+ return hb;
+ }
+
+
+ private Map<List<Integer>, Map<String, Object>> mkZkExecutorBeats(int
taskStart, int taskEnd, int timeSecs) { // one executor whose ZK beat carries only TIME_SECS
+ Map<String, Object> beat = new HashMap<>();
+ beat.put(ClientStatsUtil.TIME_SECS, timeSecs);
+ return Collections.singletonMap(Arrays.asList(taskStart, taskEnd),
beat);
+ }
+
+ private Assignment mkAssignment(int startTimeSecs, int... executors) { // executors: flattened (taskStart, taskEnd) pairs, all on node1:6700 starting at startTimeSecs
+ Assignment assignment = new Assignment();
+ Map<List<Long>, Long> execToStartTime = new HashMap<>();
+ Map<List<Long>, NodeInfo> execToNodePort = new HashMap<>();
+ NodeInfo nodeInfo = new NodeInfo("node1",
Collections.singleton(6700L));
+ for (int i = 0; i < executors.length - 1; i += 2) {
+ List<Long> exec = Arrays.asList((long) executors[i], (long)
executors[i + 1]);
+ execToStartTime.put(exec, (long) startTimeSecs);
+ execToNodePort.put(exec, nodeInfo);
+ }
+ assignment.set_executor_start_time_secs(execToStartTime);
+ assignment.set_executor_node_port(execToNodePort);
+ return assignment;
+ }
+}