GitHub user srdo commented on a diff in the pull request:
https://github.com/apache/storm/pull/2800#discussion_r210639246
--- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java ---
@@ -1660,26 +1658,18 @@ private TopologyDetails readTopologyDetails(String topoId, StormBase base) throw
     private void updateHeartbeatsFromZkHeartbeat(String topoId, Set<List<Integer>> allExecutors, Assignment existingAssignment) {
         LOG.debug("Updating heartbeats for {} {} (from ZK heartbeat)", topoId, allExecutors);
-        IStormClusterState state = stormClusterState;
         Map<List<Integer>, Map<String, Object>> executorBeats =
-            StatsUtil.convertExecutorBeats(state.executorBeats(topoId, existingAssignment.get_executor_node_port()));
-        Map<List<Integer>, Map<String, Object>> cache = StatsUtil.updateHeartbeatCacheFromZkHeartbeat(heartbeatsCache.get().get(topoId),
-            executorBeats, allExecutors,
-            ObjectReader.getInt(conf.get(
-                DaemonConfig
-                    .NIMBUS_TASK_TIMEOUT_SECS)));
-        heartbeatsCache.getAndUpdate(new Assoc<>(topoId, cache));
+            StatsUtil.convertExecutorBeats(stormClusterState.executorBeats(topoId, existingAssignment.get_executor_node_port()));
+        heartbeatsCache.compute(topoId, (k, v) ->
+            //Guaranteed side-effect-free
--- End diff --
We should probably document this requirement in comments on the two
StatsUtil methods rather than here, so that they stay side-effect free.
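
To illustrate the suggestion, here is a rough sketch of what such a comment could look like on a helper that gets called from inside `compute`. The class `HeartbeatCacheSketch` and the method `mergeBeats` are hypothetical stand-ins, not the actual StatsUtil methods, and the map types are simplified for the example.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class HeartbeatCacheSketch {

    /**
     * Hypothetical stand-in for a StatsUtil helper.
     *
     * NOTE: This method must remain side-effect free. Callers invoke it from
     * inside ConcurrentHashMap.compute, so it must not mutate its arguments,
     * touch shared state, or update the map that compute is operating on.
     */
    public static Map<String, Integer> mergeBeats(Map<String, Integer> cached,
                                                  Map<String, Integer> beats) {
        // Copy instead of mutating the cached value in place.
        Map<String, Integer> merged =
            (cached == null) ? new HashMap<>() : new HashMap<>(cached);
        merged.putAll(beats);
        return merged;
    }

    public static void main(String[] args) {
        ConcurrentHashMap<String, Map<String, Integer>> cache = new ConcurrentHashMap<>();
        Map<String, Integer> beats = new HashMap<>();
        beats.put("executor-1", 42);

        // The call site no longer needs its own "side-effect-free" comment;
        // the contract is documented on the helper itself.
        cache.compute("topo-1", (k, v) -> mergeBeats(v, beats));
        System.out.println(cache);
    }
}
```

With the requirement stated on the helper, anyone changing it later sees the constraint at the place where it could be broken, rather than at one of possibly several call sites.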
---