Github user srdo commented on a diff in the pull request:
https://github.com/apache/storm/pull/2836#discussion_r218140315
--- Diff:
storm-server/src/main/java/org/apache/storm/daemon/nimbus/HeartbeatCache.java
---
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.daemon.nimbus;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Function;
+import org.apache.storm.generated.Assignment;
+import org.apache.storm.generated.ExecutorInfo;
+import org.apache.storm.generated.SupervisorWorkerHeartbeat;
+import
org.apache.storm.shade.com.google.common.annotations.VisibleForTesting;
+import org.apache.storm.stats.ClientStatsUtil;
+import org.apache.storm.stats.StatsUtil;
+import org.apache.storm.utils.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Holds a cache of heartbeats from the workers.
+ */
+public class HeartbeatCache {
+ private static final Logger LOG = LoggerFactory.getLogger(HeartbeatCache.class);
+ // Factory producing a fresh, empty per-topology executor map; the key argument is ignored.
+ private static final Function<String, ConcurrentHashMap<List<Integer>, ExecutorCache>> MAKE_MAP =
+     ignoredTopoId -> new ConcurrentHashMap<>();
+
+ /**
+  * Heartbeat bookkeeping for a single executor: the time the executor last reported,
+  * the Nimbus-side time recorded when that reported value last changed, and the
+  * timed-out flag derived from the Nimbus-side time.
+  */
+ private static class ExecutorCache {
+     private Boolean isTimedOut;
+     private Integer nimbusTime;
+     private Integer executorReportedTime;
+
+     /**
+      * Start tracking an executor from a heartbeat.
+      * @param newBeat the heartbeat map, or null if there is none. The reported time is
+      *     read from the {@code ClientStatsUtil.TIME_SECS} entry and is 0 when the map
+      *     is null or has no such entry.
+      */
+     public ExecutorCache(Map<String, Object> newBeat) {
+         // Both branches are Integer-typed on purpose so that nothing gets unboxed here.
+         this.executorReportedTime = (newBeat == null) ? Integer.valueOf(0) : reportedTimeOf(newBeat);
+         this.nimbusTime = Time.currentTimeSecs();
+         this.isTimedOut = false;
+     }
+
+     /**
+      * Create an entry with every field supplied explicitly.
+      * @param isTimedOut the initial timed-out flag.
+      * @param nimbusTime the initial Nimbus-side time.
+      * @param executorReportedTime the initial executor-reported time.
+      */
+     public ExecutorCache(boolean isTimedOut, Integer nimbusTime, Integer executorReportedTime) {
+         this.executorReportedTime = executorReportedTime;
+         this.nimbusTime = nimbusTime;
+         this.isTimedOut = isTimedOut;
+     }
+
+     /** Read the reported time out of a non-null heartbeat map, defaulting to 0. */
+     private static Integer reportedTimeOf(Map<String, Object> beat) {
+         return (Integer) beat.getOrDefault(ClientStatsUtil.TIME_SECS, 0);
+     }
+
+     /** Get the timed-out flag as of the most recent update. */
+     public synchronized Boolean isTimedOut() {
+         return this.isTimedOut;
+     }
+
+     /** Get the Nimbus-side time recorded when the reported time last changed. */
+     public synchronized Integer getNimbusTime() {
+         return this.nimbusTime;
+     }
+
+     /** Get the time most recently reported by the executor. */
+     public synchronized Integer getExecutorReportedTime() {
+         return this.executorReportedTime;
+     }
+
+     /**
+      * Recompute the timed-out flag from the stored Nimbus-side time.
+      * @param timeout the threshold; the entry is timed out once
+      *     {@code Time.deltaSecs(nimbusTime)} reaches or exceeds it.
+      */
+     public synchronized void updateTimeout(Integer timeout) {
+         this.isTimedOut = Time.deltaSecs(this.nimbusTime) >= timeout;
+     }
+
+     /**
+      * Fold a heartbeat into this entry, then recompute the timed-out flag.
+      * @param timeout the threshold passed on to {@link #updateTimeout(Integer)}.
+      * @param newBeat the heartbeat map, or null to only recompute the timed-out flag.
+      */
+     public synchronized void updateFromHb(Integer timeout, Map<String, Object> newBeat) {
+         if (newBeat != null) {
+             final Integer reported = reportedTimeOf(newBeat);
+             // Only a changed reported time refreshes the Nimbus-side time.
+             final boolean changed = !reported.equals(this.executorReportedTime);
+             if (changed) {
+                 this.nimbusTime = Time.currentTimeSecs();
+             }
+             this.executorReportedTime = reported;
+         }
+         updateTimeout(timeout);
+     }
+ }
+
+ // Topology id -> executor ids (List<Integer>) -> cached heartbeat state (ExecutorCache)
+ private final ConcurrentHashMap<String,
ConcurrentHashMap<List<Integer>, ExecutorCache>> cache;
+
+ /**
+  * Construct a cache that holds no topologies yet.
+  */
+ public HeartbeatCache() {
+     cache = new ConcurrentHashMap<>();
+ }
+
+ /**
+  * Register a topology that has no executor entries; intended for tests only.
+  * Any entries already cached under the same id are replaced by the empty map.
+  * @param topoId the id of the topology to add.
+  */
+ @VisibleForTesting
+ public void addEmptyTopoForTests(String topoId) {
+     final ConcurrentHashMap<List<Integer>, ExecutorCache> noExecutors = new ConcurrentHashMap<>();
+     this.cache.put(topoId, noExecutors);
+ }
+
+ /**
+  * Count the topologies that currently have an entry in the cache.
+  * @return how many topology ids are present in the cache.
+  */
+ @VisibleForTesting
+ public int getNumToposCached() {
+     final int topoCount = this.cache.size();
+     return topoCount;
+ }
+
+ /**
+  * Get the topology ids with cached heartbeats.
+  * @return a read-only view of the topology ids with cached heartbeats. The view
+  *     reflects later changes to the cache but cannot be used to modify it.
+  */
+ @VisibleForTesting
+ public Set<String> getTopologyIds() {
+     // cache.keySet() is a live, mutable view: removing an id from it would evict that
+     // topology from the cache, so hand out a read-only wrapper instead.
+     return java.util.Collections.unmodifiableSet(cache.keySet());
+ }
+
+ /**
+  * Discard every cached heartbeat entry that belongs to one topology.
+  * @param topoId the id of the topology whose entries should be dropped.
+  */
+ public void removeTopo(String topoId) {
+     this.cache.remove(topoId);
+ }
+
+ /**
+ * Update the heartbeats for a topology with no heartbeats that came
in.
--- End diff --
I don't understand what this is saying.
---