Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2836#discussion_r218173746
  
    --- Diff: storm-server/src/main/java/org/apache/storm/daemon/nimbus/HeartbeatCache.java ---
    @@ -0,0 +1,229 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.storm.daemon.nimbus;
    +
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.ConcurrentHashMap;
    +import java.util.function.Function;
    +import org.apache.storm.generated.Assignment;
    +import org.apache.storm.generated.ExecutorInfo;
    +import org.apache.storm.generated.SupervisorWorkerHeartbeat;
    +import org.apache.storm.shade.com.google.common.annotations.VisibleForTesting;
    +import org.apache.storm.stats.ClientStatsUtil;
    +import org.apache.storm.stats.StatsUtil;
    +import org.apache.storm.utils.Time;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +/**
    + * Holds a cache of heartbeats from the workers.
    + */
    +public class HeartbeatCache {
    +    private static final Logger LOG = LoggerFactory.getLogger(HeartbeatCache.class);
    +    private static final Function<String, ConcurrentHashMap<List<Integer>, ExecutorCache>> MAKE_MAP = (k) -> new ConcurrentHashMap<>();
    +
    +    private static class ExecutorCache {
    +        private Boolean isTimedOut;
    +        private Integer nimbusTime;
    +        private Integer executorReportedTime;
    +
    +        public ExecutorCache(Map<String, Object> newBeat) {
    +            if (newBeat != null) {
    +                executorReportedTime = (Integer) newBeat.getOrDefault(ClientStatsUtil.TIME_SECS, 0);
    +            } else {
    +                executorReportedTime = 0;
    +            }
    +
    +            nimbusTime = Time.currentTimeSecs();
    +            isTimedOut = false;
    +        }
    +
    +        public ExecutorCache(boolean isTimedOut, Integer nimbusTime, Integer executorReportedTime) {
    +            this.isTimedOut = isTimedOut;
    +            this.nimbusTime = nimbusTime;
    +            this.executorReportedTime = executorReportedTime;
    +        }
    +
    +        public synchronized Boolean isTimedOut() {
    +            return isTimedOut;
    +        }
    +
    +        public synchronized Integer getNimbusTime() {
    +            return nimbusTime;
    +        }
    +
    +        public synchronized Integer getExecutorReportedTime() {
    +            return executorReportedTime;
    +        }
    +
    +        public synchronized void updateTimeout(Integer timeout) {
    +            isTimedOut = Time.deltaSecs(getNimbusTime()) >= timeout;
    +        }
    +
    +        public synchronized void updateFromHb(Integer timeout, Map<String,Object> newBeat) {
    +            if (newBeat != null) {
    +                Integer newReportedTime = (Integer) newBeat.getOrDefault(ClientStatsUtil.TIME_SECS, 0);
    +                if (!newReportedTime.equals(executorReportedTime)) {
    +                    nimbusTime = Time.currentTimeSecs();
    +                }
    +                executorReportedTime = newReportedTime;
    +            }
    +            updateTimeout(timeout);
    +        }
    +    }
    +
    +    //Topology Id -> executor ids -> ExecutorCache
    +    private final ConcurrentHashMap<String, ConcurrentHashMap<List<Integer>, ExecutorCache>> cache;
    +
    +    /**
    +     * Create an empty cache.
    +     */
    +    public HeartbeatCache() {
    +        this.cache = new ConcurrentHashMap<>();
    +    }
    +
    +    /**
    +     * Add an empty topology to the cache for testing purposes.
    +     * @param topoId the id of the topology to add.
    +     */
    +    @VisibleForTesting
    +    public void addEmptyTopoForTests(String topoId) {
    +        cache.put(topoId, new ConcurrentHashMap<>());
    +    }
    +
    +    /**
    +     * Get the number of topologies with cached heartbeats.
    +     * @return the number of topologies with cached heartbeats.
    +     */
    +    @VisibleForTesting
    +    public int getNumToposCached() {
    +        return cache.size();
    +    }
    +
    +    /**
    +     * Get the topology ids with cached heartbeats.
    +     * @return the set of topology ids with cached heartbeats.
    +     */
    +    @VisibleForTesting
    +    public Set<String> getTopologyIds() {
    +        return cache.keySet();
    +    }
    +
    +    /**
    +     * Remove a specific topology from the cache.
    +     * @param topoId the id of the topology to remove.
    +     */
    +    public void removeTopo(String topoId) {
    +        cache.remove(topoId);
    +    }
    +
    +    /**
    +     * Update the cached heartbeats for a topology that had no new heartbeats come in, marking any that are too old as timed out.
    +     * @param topoId the id of the topology to look at.
    +     * @param taskTimeoutSecs the timeout used to decide if a heartbeat is too old.
    +     */
    +    public void timeoutOldHeartbeats(String topoId, Integer taskTimeoutSecs) {
    +        Map<List<Integer>, ExecutorCache> topoCache = cache.computeIfAbsent(topoId, MAKE_MAP);
    +        //if not executor beats, refresh is-timed-out of the cache which is done by master
    --- End diff --
    
    The comment came from the original code while I was refactoring. I'll make it better.
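
    For context, the path that comment sits on presumably just refreshes the timed-out flag of every cached executor when no beats arrived for the topology. A minimal sketch using the names from the diff above (only the first two lines appear in the quoted hunk; the loop body is my reading of the code, not necessarily the exact implementation in the PR):

        public void timeoutOldHeartbeats(String topoId, Integer taskTimeoutSecs) {
            Map<List<Integer>, ExecutorCache> topoCache = cache.computeIfAbsent(topoId, MAKE_MAP);
            // No beats came in for this topology, so Nimbus (the master) refreshes
            // the is-timed-out flag of every cached executor itself.
            for (ExecutorCache ec : topoCache.values()) {
                ec.updateTimeout(taskTimeoutSecs);
            }
        }

    That is, when the workers go silent it is the master, not the workers, that ages out the cached entries.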

