Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2343#discussion_r141188696 --- Diff: storm-server/src/main/java/org/apache/storm/scheduler/blacklist/BlacklistScheduler.java --- @@ -0,0 +1,248 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.scheduler.blacklist; + +import com.google.common.collect.EvictingQueue; +import org.apache.storm.DaemonConfig; +import org.apache.storm.metric.StormMetricsRegistry; +import org.apache.storm.scheduler.Cluster; +import org.apache.storm.scheduler.IScheduler; +import org.apache.storm.scheduler.SupervisorDetails; +import org.apache.storm.scheduler.Topologies; +import org.apache.storm.scheduler.WorkerSlot; +import org.apache.storm.scheduler.blacklist.reporters.IReporter; +import org.apache.storm.scheduler.blacklist.reporters.LogReporter; +import org.apache.storm.scheduler.blacklist.strategies.DefaultBlacklistStrategy; +import org.apache.storm.scheduler.blacklist.strategies.IBlacklistStrategy; +import org.apache.storm.utils.ObjectReader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Callable; + +public class BlacklistScheduler implements IScheduler { + private static final Logger LOG = LoggerFactory.getLogger(BlacklistScheduler.class); + + public static final int DEFAULT_BLACKLIST_SCHEDULER_RESUME_TIME = 1800; + public static final int DEFAULT_BLACKLIST_SCHEDULER_TOLERANCE_COUNT = 3; + public static final int DEFAULT_BLACKLIST_SCHEDULER_TOLERANCE_TIME = 300; + + private final IScheduler underlyingScheduler; + @SuppressWarnings("rawtypes") + private Map _conf; + + protected int toleranceTime; + protected int toleranceCount; + protected int resumeTime; + protected IReporter reporter; + protected IBlacklistStrategy blacklistStrategy; + + protected int nimbusMonitorFreqSecs; + + protected Map<String, Set<Integer>> cachedSupervisors; + + //key is supervisor key ,value is supervisor ports + protected EvictingQueue<HashMap<String, Set<Integer>>> badSupervisorsToleranceSlidingWindow; + protected int windowSize; + protected Set<String> blacklistHost; + + public 
BlacklistScheduler(IScheduler underlyingScheduler) { + this.underlyingScheduler = underlyingScheduler; + } + + @Override + public void prepare(Map conf) { + LOG.info("Preparing black list scheduler"); + underlyingScheduler.prepare(conf); + _conf = conf; + + toleranceTime = ObjectReader.getInt(_conf.get(DaemonConfig.BLACKLIST_SCHEDULER_TOLERANCE_TIME), DEFAULT_BLACKLIST_SCHEDULER_TOLERANCE_TIME); + toleranceCount = ObjectReader.getInt( _conf.get(DaemonConfig.BLACKLIST_SCHEDULER_TOLERANCE_COUNT), DEFAULT_BLACKLIST_SCHEDULER_TOLERANCE_COUNT); + resumeTime = ObjectReader.getInt( _conf.get(DaemonConfig.BLACKLIST_SCHEDULER_RESUME_TIME), DEFAULT_BLACKLIST_SCHEDULER_RESUME_TIME); + + String reporterClassName = ObjectReader.getString(_conf.get(DaemonConfig.BLACKLIST_SCHEDULER_REPORTER), + LogReporter.class.getName()); + reporter = (IReporter) initializeInstance(reporterClassName, "blacklist reporter"); + + String strategyClassName = ObjectReader.getString(_conf.get(DaemonConfig.BLACKLIST_SCHEDULER_STRATEGY), + DefaultBlacklistStrategy.class.getName()); + blacklistStrategy = (IBlacklistStrategy) initializeInstance(strategyClassName, "blacklist strategy"); + + nimbusMonitorFreqSecs = ObjectReader.getInt( _conf.get(DaemonConfig.NIMBUS_MONITOR_FREQ_SECS)); + blacklistStrategy.prepare(_conf); + + windowSize = toleranceTime / nimbusMonitorFreqSecs; + badSupervisorsToleranceSlidingWindow = EvictingQueue.create(windowSize); + cachedSupervisors = new HashMap<>(); + blacklistHost = new HashSet<>(); + + StormMetricsRegistry.registerGauge("nimbus:num-blacklisted-supervisor", new Callable<Integer>() { + @Override + public Integer call() throws Exception { + //nimbus:num-blacklisted-supervisor + none blacklisted supervisor = nimbus:num-supervisors + return blacklistHost.size(); + } + }); + } + + @Override + public void schedule(Topologies topologies, Cluster cluster) { + LOG.debug("running Black List scheduler"); + Map<String, SupervisorDetails> supervisors = cluster.getSupervisors(); + 
LOG.debug("AssignableSlots: {}", cluster.getAssignableSlots()); + LOG.debug("AvailableSlots: {}", cluster.getAvailableSlots()); + LOG.debug("UsedSlots: {}", cluster.getUsedSlots()); + + blacklistStrategy.resumeFromBlacklist(); + badSupervisors(supervisors); + Set<String> blacklistHosts = getBlacklistHosts(cluster, topologies); + this.blacklistHost = blacklistHosts; + cluster.setBlacklistedHosts(blacklistHosts); + removeLongTimeDisappearFromCache(); + + underlyingScheduler.schedule(topologies, cluster); + } + + @Override + public Map<String, Object> config() { + return underlyingScheduler.config(); + } + + private void badSupervisors(Map<String, SupervisorDetails> supervisors) { + Set<String> cachedSupervisorsKeySet = cachedSupervisors.keySet(); + Set<String> supervisorsKeySet = supervisors.keySet(); + + Set<String> badSupervisorKeys = Sets.difference(cachedSupervisorsKeySet, supervisorsKeySet); //cached supervisor doesn't show up + HashMap<String, Set<Integer>> badSupervisors = new HashMap<>(); + for (String key : badSupervisorKeys) { + badSupervisors.put(key, cachedSupervisors.get(key)); + } + + for (Map.Entry<String, SupervisorDetails> entry : supervisors.entrySet()) { + String key = entry.getKey(); + SupervisorDetails supervisorDetails = entry.getValue(); + if (cachedSupervisors.containsKey(key)) { + Set<Integer> badSlots = badSlots(supervisorDetails, key); + if (badSlots.size() > 0) { //supervisor contains bad slots + badSupervisors.put(key, badSlots); + } + } else { + cachedSupervisors.put(key, supervisorDetails.getAllPorts()); //new supervisor to cache + } + } + + badSupervisorsToleranceSlidingWindow.add(badSupervisors); + } + + private Set<Integer> badSlots(SupervisorDetails supervisor, String supervisorKey) { + Set<Integer> cachedSupervisorPorts = cachedSupervisors.get(supervisorKey); + Set<Integer> supervisorPorts = supervisor.getAllPorts(); + + Set<Integer> newPorts = Sets.difference(supervisorPorts, cachedSupervisorPorts); + if (newPorts.size() > 0) { + 
//add new ports to cached supervisor + cachedSupervisors.put(supervisorKey, Sets.union(newPorts, cachedSupervisorPorts)); + } + + Set<Integer> badSlots = Sets.difference(cachedSupervisorPorts, supervisorPorts); + return badSlots; + } + + private Set<String> getBlacklistHosts(Cluster cluster, Topologies topologies) { + Set<String> blacklistSet = blacklistStrategy.getBlacklist(new ArrayList<>(badSupervisorsToleranceSlidingWindow), cluster, topologies); + Set<String> blacklistHostSet = new HashSet<>(); + for (String supervisor : blacklistSet) { + String host = cluster.getHost(supervisor); + if (host != null) { + blacklistHostSet.add(host); + } else { + LOG.info("supervisor {} is not alive, do not need to add to blacklist.", supervisor); + } + } + return blacklistHostSet; + } + + /** + * supervisor or port never exits once in tolerance time will be removed from cache. + */ + private void removeLongTimeDisappearFromCache() { + + Map<String, Integer> supervisorCountMap = new HashMap<String, Integer>(); + Map<WorkerSlot, Integer> slotCountMap = new HashMap<WorkerSlot, Integer>(); + + for (Map<String, Set<Integer>> item : badSupervisorsToleranceSlidingWindow) { + Set<String> supervisors = item.keySet(); + for (String supervisor : supervisors) { + int supervisorCount = supervisorCountMap.getOrDefault(supervisor, 0); + Set<Integer> slots = item.get(supervisor); + if (slots.equals(cachedSupervisors.get(supervisor))) { // treat supervisor as bad only if all slots are bad + supervisorCountMap.put(supervisor, supervisorCount + 1); + } + for (Integer slot : slots) { + WorkerSlot workerSlot = new WorkerSlot(supervisor, slot); + int slotCount = slotCountMap.getOrDefault(workerSlot, 0); + slotCountMap.put(workerSlot, slotCount + 1); + } + } + } + + for (Map.Entry<String, Integer> entry : supervisorCountMap.entrySet()) { + String key = entry.getKey(); + int value = entry.getValue(); + if (value == windowSize) { // supervisor which was never back to normal in tolerance period will be 
removed from cache + cachedSupervisors.remove(key); + LOG.info("Supervisor {} was never back to normal during tolerance period, probably dead. Will remove from cache.", key); + } + } + + for (Map.Entry<WorkerSlot, Integer> entry : slotCountMap.entrySet()) { + WorkerSlot workerSlot = entry.getKey(); + String supervisorKey = workerSlot.getNodeId(); + Integer slot = workerSlot.getPort(); + int value = entry.getValue(); + if (value == windowSize) { // worker slot which was never back to normal in tolerance period will be removed from cache + Set<Integer> slots = cachedSupervisors.get(supervisorKey); + if (slots != null) { // slots will be null while supervisor has been removed from cached supervisors + slots.remove(slot); + cachedSupervisors.put(supervisorKey, slots); + } + LOG.info("Worker slot {} was never back to normal during tolerance period, probably dead. Will be removed from cache.", workerSlot); + } + } + } + + private Object initializeInstance(String className, String representation) { --- End diff -- I think there is a function in Utils or ServerUtils that can do this for us.
---