Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2836#discussion_r218173746 --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/HeartbeatCache.java --- @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm.daemon.nimbus; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Function; +import org.apache.storm.generated.Assignment; +import org.apache.storm.generated.ExecutorInfo; +import org.apache.storm.generated.SupervisorWorkerHeartbeat; +import org.apache.storm.shade.com.google.common.annotations.VisibleForTesting; +import org.apache.storm.stats.ClientStatsUtil; +import org.apache.storm.stats.StatsUtil; +import org.apache.storm.utils.Time; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Holds a cache of heartbeats from the workers. 
+ */ +public class HeartbeatCache { + private static final Logger LOG = LoggerFactory.getLogger(HeartbeatCache.class); + private static final Function<String, ConcurrentHashMap<List<Integer>, ExecutorCache>> MAKE_MAP = (k) -> new ConcurrentHashMap<>(); + + private static class ExecutorCache { + private Boolean isTimedOut; + private Integer nimbusTime; + private Integer executorReportedTime; + + public ExecutorCache(Map<String, Object> newBeat) { + if (newBeat != null) { + executorReportedTime = (Integer) newBeat.getOrDefault(ClientStatsUtil.TIME_SECS, 0); + } else { + executorReportedTime = 0; + } + + nimbusTime = Time.currentTimeSecs(); + isTimedOut = false; + } + + public ExecutorCache(boolean isTimedOut, Integer nimbusTime, Integer executorReportedTime) { + this.isTimedOut = isTimedOut; + this.nimbusTime = nimbusTime; + this.executorReportedTime = executorReportedTime; + } + + public synchronized Boolean isTimedOut() { + return isTimedOut; + } + + public synchronized Integer getNimbusTime() { + return nimbusTime; + } + + public synchronized Integer getExecutorReportedTime() { + return executorReportedTime; + } + + public synchronized void updateTimeout(Integer timeout) { + isTimedOut = Time.deltaSecs(getNimbusTime()) >= timeout; + } + + public synchronized void updateFromHb(Integer timeout, Map<String,Object> newBeat) { + if (newBeat != null) { + Integer newReportedTime = (Integer) newBeat.getOrDefault(ClientStatsUtil.TIME_SECS, 0); + if (!newReportedTime.equals(executorReportedTime)) { + nimbusTime = Time.currentTimeSecs(); + } + executorReportedTime = newReportedTime; + } + updateTimeout(timeout); + } + } + + //Topology Id -> executor ids -> component -> stats(...) + private final ConcurrentHashMap<String, ConcurrentHashMap<List<Integer>, ExecutorCache>> cache; + + /** + * Create an empty cache. + */ + public HeartbeatCache() { + this.cache = new ConcurrentHashMap<>(); + } + + /** + * Add an empty topology to the cache for testing purposes. 
+ * @param topoId the id of the topology to add. + */ + @VisibleForTesting + public void addEmptyTopoForTests(String topoId) { + cache.put(topoId, new ConcurrentHashMap<>()); + } + + /** + * Get the number of topologies with cached heartbeats. + * @return the number of topologies with cached heartbeats. + */ + @VisibleForTesting + public int getNumToposCached() { + return cache.size(); + } + + /** + * Get the topology ids with cached heartbeats. + * @return the set of topology ids with cached heartbeats. + */ + @VisibleForTesting + public Set<String> getTopologyIds() { + return cache.keySet(); + } + + /** + * Remove a specific topology from the cache. + * @param topoId the id of the topology to remove. + */ + public void removeTopo(String topoId) { + cache.remove(topoId); + } + + /** + * Update the heartbeats for a topology with no heartbeats that came in. + * @param topoId the id of the topology to look at. + * @param taskTimeoutSecs the timeout to know if they are too old. + */ + public void timeoutOldHeartbeats(String topoId, Integer taskTimeoutSecs) { + Map<List<Integer>, ExecutorCache> topoCache = cache.computeIfAbsent(topoId, MAKE_MAP); + //if not executor beats, refresh is-timed-out of the cache which is done by master --- End diff -- That code comment was carried over from the original code while I was refactoring. I'll reword it to make it clearer.
---