HDFS-11461. DataNode Disk Outlier Detection. Contributed by Hanisha Koneru.
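This change generalizes the DataNode slow peer detection into a reusable OutlierDetector, renames dfs.datanode.slow.peers.report.interval to dfs.datanode.outliers.report.interval, and adds a DataNodeDiskMetrics daemon that periodically flags disk volumes whose metadata, read, or write latencies are statistical outliers. Disk outlier detection is gated on DFS_DATANODE_ENABLE_FILEIO_PROFILING_KEY; a minimal sketch of enabling it follows, with the caveat that this diff only shows the constant name, so the "dfs.datanode.enable.fileio.profiling" string below is an assumed value rather than one confirmed by the patch:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.HdfsConfiguration;

    // Sketch: a DataNode configuration that turns on file IO profiling
    // (assumed key string) so DataNodeDiskMetrics starts, and sets the
    // renamed outlier reporting interval to its default of 30 minutes.
    Configuration conf = new HdfsConfiguration();
    conf.setBoolean("dfs.datanode.enable.fileio.profiling", true); // assumed key
    conf.set("dfs.datanode.outliers.report.interval", "30m");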
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/b3ec531f Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/b3ec531f Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/b3ec531f Branch: refs/heads/HDFS-7240 Commit: b3ec531f400dd0a6506dc71233d38ae57b764a43 Parents: 747bafa Author: Arpit Agarwal <a...@apache.org> Authored: Thu Mar 2 12:45:48 2017 -0800 Committer: Arpit Agarwal <a...@apache.org> Committed: Thu Mar 2 12:45:48 2017 -0800 ---------------------------------------------------------------------- .../org/apache/hadoop/hdfs/DFSConfigKeys.java | 6 +- .../server/blockmanagement/SlowPeerTracker.java | 4 +- .../hdfs/server/datanode/BPServiceActor.java | 2 +- .../hadoop/hdfs/server/datanode/DNConf.java | 16 +- .../hadoop/hdfs/server/datanode/DataNode.java | 10 + .../datanode/metrics/DataNodeDiskMetrics.java | 181 +++++++++++++++++ .../datanode/metrics/DataNodePeerMetrics.java | 6 +- .../datanode/metrics/OutlierDetector.java | 182 +++++++++++++++++ .../datanode/metrics/SlowNodeDetector.java | 194 ------------------- .../src/main/resources/hdfs-default.xml | 2 +- .../TestDataNodeOutlierDetectionViaMetrics.java | 6 +- .../datanode/metrics/TestSlowNodeDetector.java | 30 +-- 12 files changed, 412 insertions(+), 227 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/b3ec531f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index 68cef36..be20829 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -677,9 +677,9 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final String DFS_BLOCK_MISREPLICATION_PROCESSING_LIMIT = "dfs.block.misreplication.processing.limit"; public static final int DFS_BLOCK_MISREPLICATION_PROCESSING_LIMIT_DEFAULT = 10000; - public static final String DFS_DATANODE_SLOW_PEERS_REPORT_INTERVAL_KEY = - "dfs.datanode.slow.peers.report.interval"; - public static final String DFS_DATANODE_SLOW_PEERS_REPORT_INTERVAL_DEFAULT = + public static final String DFS_DATANODE_OUTLIERS_REPORT_INTERVAL_KEY = + "dfs.datanode.outliers.report.interval"; + public static final String DFS_DATANODE_OUTLIERS_REPORT_INTERVAL_DEFAULT = "30m"; // property for fsimage compression http://git-wip-us.apache.org/repos/asf/hadoop/blob/b3ec531f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/SlowPeerTracker.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/SlowPeerTracker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/SlowPeerTracker.java index cf3a20c..c72a621 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/SlowPeerTracker.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/SlowPeerTracker.java @@ -94,8 +94,8 @@ public class SlowPeerTracker { this.timer 
= timer; this.allReports = new ConcurrentHashMap<>(); this.reportValidityMs = conf.getTimeDuration( - DFSConfigKeys.DFS_DATANODE_SLOW_PEERS_REPORT_INTERVAL_KEY, - DFSConfigKeys.DFS_DATANODE_SLOW_PEERS_REPORT_INTERVAL_DEFAULT, + DFSConfigKeys.DFS_DATANODE_OUTLIERS_REPORT_INTERVAL_KEY, + DFSConfigKeys.DFS_DATANODE_OUTLIERS_REPORT_INTERVAL_DEFAULT, TimeUnit.MILLISECONDS) * 3; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/b3ec531f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java index 644a8ab..a0ba627 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java @@ -129,7 +129,7 @@ class BPServiceActor implements Runnable { prevBlockReportId = ThreadLocalRandom.current().nextLong(); scheduler = new Scheduler(dnConf.heartBeatInterval, dnConf.getLifelineIntervalMs(), dnConf.blockReportInterval, - dnConf.slowPeersReportIntervalMs); + dnConf.outliersReportIntervalMs); // get the value of maxDataLength. this.maxDataLength = dnConf.getMaxDataLength(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/b3ec531f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java index 2723677..3275ba8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java @@ -30,8 +30,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_LIFELINE_INTERVAL_SECONDS_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_NON_LOCAL_LAZY_PERSIST; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_NON_LOCAL_LAZY_PERSIST_DEFAULT; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SLOW_PEERS_REPORT_INTERVAL_DEFAULT; -import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SLOW_PEERS_REPORT_INTERVAL_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_OUTLIERS_REPORT_INTERVAL_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_OUTLIERS_REPORT_INTERVAL_KEY; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY; @@ -95,7 +95,8 @@ public class DNConf { final long blockReportInterval; final long blockReportSplitThreshold; final boolean peerStatsEnabled; - final long slowPeersReportIntervalMs; + final boolean diskStatsEnabled; + final long outliersReportIntervalMs; final long ibrInterval; final long initialBlockReportDelayMs; final long cacheReportInterval; @@ 
-173,9 +174,12 @@ public class DNConf { this.peerStatsEnabled = getConf().getBoolean( DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_KEY, DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_DEFAULT); - this.slowPeersReportIntervalMs = getConf().getTimeDuration( - DFS_DATANODE_SLOW_PEERS_REPORT_INTERVAL_KEY, - DFS_DATANODE_SLOW_PEERS_REPORT_INTERVAL_DEFAULT, + this.diskStatsEnabled = getConf().getBoolean( + DFSConfigKeys.DFS_DATANODE_ENABLE_FILEIO_PROFILING_KEY, + DFSConfigKeys.DFS_DATANODE_ENABLE_FILEIO_PROFILING_DEFAULT); + this.outliersReportIntervalMs = getConf().getTimeDuration( + DFS_DATANODE_OUTLIERS_REPORT_INTERVAL_KEY, + DFS_DATANODE_OUTLIERS_REPORT_INTERVAL_DEFAULT, TimeUnit.MILLISECONDS); this.ibrInterval = getConf().getLong( DFSConfigKeys.DFS_BLOCKREPORT_INCREMENTAL_INTERVAL_MSEC_KEY, http://git-wip-us.apache.org/repos/asf/hadoop/blob/b3ec531f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index 5db41bd..6f24858 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -164,6 +164,7 @@ import org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter.SecureResour import org.apache.hadoop.hdfs.server.datanode.erasurecode.ErasureCodingWorker; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; +import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeDiskMetrics; import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics; import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodePeerMetrics; import org.apache.hadoop.hdfs.server.datanode.web.DatanodeHttpServer; @@ -336,6 +337,7 @@ public class DataNode extends ReconfigurableBase DataNodeMetrics metrics; @Nullable private DataNodePeerMetrics peerMetrics; + private DataNodeDiskMetrics diskMetrics; private InetSocketAddress streamingAddr; // See the note below in incrDatanodeNetworkErrors re: concurrency. 
@@ -1390,6 +1392,11 @@ public class DataNode extends ReconfigurableBase dnConf.saslPropsResolver, dnConf.trustedChannelResolver); saslServer = new SaslDataTransferServer(dnConf, blockPoolTokenSecretManager); startMetricsLogger(); + + if (dnConf.diskStatsEnabled) { + diskMetrics = new DataNodeDiskMetrics(this, + dnConf.outliersReportIntervalMs); + } } /** @@ -2046,6 +2053,9 @@ public class DataNode extends ReconfigurableBase if (metrics != null) { metrics.shutdown(); } + if (diskMetrics != null) { + diskMetrics.shutdownAndWait(); + } if (dataNodeInfoBeanName != null) { MBeans.unregister(dataNodeInfoBeanName); dataNodeInfoBeanName = null; http://git-wip-us.apache.org/repos/asf/hadoop/blob/b3ec531f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeDiskMetrics.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeDiskMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeDiskMetrics.java new file mode 100644 index 0000000..85e2bd9 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeDiskMetrics.java @@ -0,0 +1,181 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.datanode.metrics; + +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hdfs.server.datanode.DataNode; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.DataNodeVolumeMetrics; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; +import org.apache.hadoop.util.Daemon; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; + +/** + * This class detects and maintains DataNode disk outliers and their + * latencies for different ops (metadata, read, write). 
+ */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +public class DataNodeDiskMetrics { + + public static final Logger LOG = LoggerFactory.getLogger( + DataNodeDiskMetrics.class); + + private DataNode dn; + private final long MIN_OUTLIER_DETECTION_DISKS = 5; + private final long SLOW_DISK_LOW_THRESHOLD_MS = 20; + private final long detectionInterval; + private volatile boolean shouldRun; + private OutlierDetector slowDiskDetector; + private Daemon slowDiskDetectionDaemon; + private volatile Map<String, Map<DiskOutlierDetectionOp, Double>> diskOutliersStats; + + public DataNodeDiskMetrics(DataNode dn, long diskOutlierDetectionIntervalMs) { + this.dn = dn; + this.detectionInterval = diskOutlierDetectionIntervalMs; + slowDiskDetector = new OutlierDetector(MIN_OUTLIER_DETECTION_DISKS, + SLOW_DISK_LOW_THRESHOLD_MS); + shouldRun = true; + startDiskOutlierDetectionThread(); + } + + private void startDiskOutlierDetectionThread() { + slowDiskDetectionDaemon = new Daemon(new Runnable() { + @Override + public void run() { + while (shouldRun) { + Map<String, Double> metadataOpStats = Maps.newHashMap(); + Map<String, Double> readIoStats = Maps.newHashMap(); + Map<String, Double> writeIoStats = Maps.newHashMap(); + FsDatasetSpi.FsVolumeReferences fsVolumeReferences = null; + try { + fsVolumeReferences = dn.getFSDataset().getFsVolumeReferences(); + Iterator<FsVolumeSpi> volumeIterator = fsVolumeReferences + .iterator(); + while (volumeIterator.hasNext()) { + FsVolumeSpi volume = volumeIterator.next(); + DataNodeVolumeMetrics metrics = volume.getMetrics(); + String volumeName = volume.getBaseURI().getPath(); + + metadataOpStats.put(volumeName, + metrics.getMetadataOperationMean()); + readIoStats.put(volumeName, metrics.getReadIoMean()); + writeIoStats.put(volumeName, metrics.getWriteIoMean()); + } + } finally { + if (fsVolumeReferences != null) { + try { + fsVolumeReferences.close(); + } catch (IOException e) { + LOG.error("Error in releasing FS Volume references", e); + } + } + } + if (metadataOpStats.isEmpty() && readIoStats.isEmpty() && + writeIoStats.isEmpty()) { + LOG.debug("No disk stats available for detecting outliers."); + return; + } + + detectAndUpdateDiskOutliers(metadataOpStats, readIoStats, + writeIoStats); + + try { + Thread.sleep(detectionInterval); + } catch (InterruptedException e) { + LOG.error("Disk Outlier Detection thread interrupted", e); + Thread.currentThread().interrupt(); + } + } + } + }); + slowDiskDetectionDaemon.start(); + } + + private void detectAndUpdateDiskOutliers(Map<String, Double> metadataOpStats, + Map<String, Double> readIoStats, Map<String, Double> writeIoStats) { + Set<String> diskOutliersSet = Sets.newHashSet(); + + // Get MetadataOp Outliers + Map<String, Double> metadataOpOutliers = slowDiskDetector + .getOutliers(metadataOpStats); + if (!metadataOpOutliers.isEmpty()) { + diskOutliersSet.addAll(metadataOpOutliers.keySet()); + } + + // Get ReadIo Outliers + Map<String, Double> readIoOutliers = slowDiskDetector + .getOutliers(readIoStats); + if (!readIoOutliers.isEmpty()) { + diskOutliersSet.addAll(readIoOutliers.keySet()); + } + + // Get WriteIo Outliers + Map<String, Double> writeIoOutliers = slowDiskDetector + .getOutliers(writeIoStats); + if (!writeIoOutliers.isEmpty()) { + diskOutliersSet.addAll(writeIoOutliers.keySet()); + } + + Map<String, Map<DiskOutlierDetectionOp, Double>> diskStats = + Maps.newHashMap(); + for (String disk : diskOutliersSet) { + Map<DiskOutlierDetectionOp, Double> diskStat = Maps.newHashMap(); +
diskStat.put(DiskOutlierDetectionOp.METADATA, metadataOpStats.get(disk)); + diskStat.put(DiskOutlierDetectionOp.READ, readIoStats.get(disk)); + diskStat.put(DiskOutlierDetectionOp.WRITE, writeIoStats.get(disk)); + diskStats.put(disk, diskStat); + } + + diskOutliersStats = diskStats; + LOG.debug("Updated disk outliers."); + } + + /** + * Lists the types of operations on which disk latencies are measured. + */ + public enum DiskOutlierDetectionOp { + METADATA, + READ, + WRITE + } + + public Map<String, + Map<DiskOutlierDetectionOp, Double>> getDiskOutliersStats() { + return diskOutliersStats; + } + + public void shutdownAndWait() { + shouldRun = false; + slowDiskDetectionDaemon.interrupt(); + try { + slowDiskDetectionDaemon.join(); + } catch (InterruptedException e) { + LOG.error("Disk Outlier Detection daemon did not shutdown", e); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/b3ec531f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodePeerMetrics.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodePeerMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodePeerMetrics.java index 5241c78..827bdd2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodePeerMetrics.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodePeerMetrics.java @@ -52,8 +52,9 @@ public class DataNodePeerMetrics { * Threshold in milliseconds below which a DataNode is definitely not slow. */ private static final long LOW_THRESHOLD_MS = 5; + private static final long MIN_OUTLIER_DETECTION_NODES = 10; - private final SlowNodeDetector slowNodeDetector; + private final OutlierDetector slowNodeDetector; /** * Minimum number of packet send samples which are required to qualify @@ -68,7 +69,8 @@ public class DataNodePeerMetrics { final long windowSizeMs, final int numWindows) { this.name = name; - this.slowNodeDetector = new SlowNodeDetector(LOW_THRESHOLD_MS); + this.slowNodeDetector = new OutlierDetector(MIN_OUTLIER_DETECTION_NODES, + LOW_THRESHOLD_MS); sendPacketDownstreamRollingAvgerages = new RollingAverages( windowSizeMs, numWindows); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/b3ec531f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/OutlierDetector.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/OutlierDetector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/OutlierDetector.java new file mode 100644 index 0000000..771a17b --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/OutlierDetector.java @@ -0,0 +1,182 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.server.datanode.metrics; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableMap; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + + +/** + * A utility class to help detect resources (nodes/ disks) whose aggregate + * latency is an outlier within a given set. + * + * We use the median absolute deviation for outlier detection as + * described in the following publication: + * + * Leys, C., et al., Detecting outliers: Do not use standard deviation + * around the mean, use absolute deviation around the median. + * http://dx.doi.org/10.1016/j.jesp.2013.03.013 + * + * We augment the above scheme with the following heuristics to be even + * more conservative: + * + * 1. Skip outlier detection if the sample size is too small. + * 2. Never flag resources whose aggregate latency is below a low threshold. + * 3. Never flag resources whose aggregate latency is less than a small + * multiple of the median. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +public class OutlierDetector { + public static final Logger LOG = + LoggerFactory.getLogger(OutlierDetector.class); + + /** + * Minimum number of resources to run outlier detection. + */ + private final long minNumResources; + + /** + * The multiplier is from Leys, C. et al. + */ + private static final double MAD_MULTIPLIER = (double) 1.4826; + + /** + * Threshold in milliseconds below which a node/ disk is definitely not slow. + */ + private final long lowThresholdMs; + + /** + * Deviation multiplier. A sample is considered to be an outlier if it + * exceeds the median by (multiplier * median abs. deviation). 3 is a + * conservative choice. + */ + private static final int DEVIATION_MULTIPLIER = 3; + + /** + * If most of the samples are clustered together, the MAD can be + * low. The median multiplier introduces another safeguard to avoid + * overaggressive outlier detection. + */ + @VisibleForTesting + static final int MEDIAN_MULTIPLIER = 3; + + public OutlierDetector(long minNumResources, long lowThresholdMs) { + this.minNumResources = minNumResources; + this.lowThresholdMs = lowThresholdMs; + } + + /** + * Return a set of nodes/ disks whose latency is much higher than + * their counterparts. The input is a map of (resource -> aggregate latency) + * entries. + * + * The aggregate may be an arithmetic mean or a percentile e.g. + * 90th percentile. Percentiles are a better choice than median + * since latency is usually not a normal distribution. + * + * This method allocates temporary memory O(n) and + * has run time O(n.log(n)), where n = stats.size(). 
+ * + * @return + */ + public Map<String, Double> getOutliers(Map<String, Double> stats) { + if (stats.size() < minNumResources) { + LOG.debug("Skipping statistical outlier detection as we don't have " + + "latency data for enough resources. Have {}, need at least {}", + stats.size(), minNumResources); + return ImmutableMap.of(); + } + // Compute the median absolute deviation of the aggregates. + final List<Double> sorted = new ArrayList<>(stats.values()); + Collections.sort(sorted); + final Double median = computeMedian(sorted); + final Double mad = computeMad(sorted); + Double upperLimitLatency = Math.max( + lowThresholdMs, median * MEDIAN_MULTIPLIER); + upperLimitLatency = Math.max( + upperLimitLatency, median + (DEVIATION_MULTIPLIER * mad)); + + final Map<String, Double> slowResources = new HashMap<>(); + + LOG.trace("getOutliers: List={}, MedianLatency={}, " + + "MedianAbsoluteDeviation={}, upperLimitLatency={}", + sorted, median, mad, upperLimitLatency); + + // Find resources whose latency exceeds the threshold. + for (Map.Entry<String, Double> entry : stats.entrySet()) { + if (entry.getValue() > upperLimitLatency) { + slowResources.put(entry.getKey(), entry.getValue()); + } + } + + return slowResources; + } + + /** + * Compute the Median Absolute Deviation of a sorted list. + */ + public static Double computeMad(List<Double> sortedValues) { + if (sortedValues.size() == 0) { + throw new IllegalArgumentException( + "Cannot compute the Median Absolute Deviation " + + "of an empty list."); + } + + // First get the median of the values. + Double median = computeMedian(sortedValues); + List<Double> deviations = new ArrayList<>(sortedValues); + + // Then update the list to store deviation from the median. + for (int i = 0; i < sortedValues.size(); ++i) { + deviations.set(i, Math.abs(sortedValues.get(i) - median)); + } + + // Finally get the median absolute deviation. + Collections.sort(deviations); + return computeMedian(deviations) * MAD_MULTIPLIER; + } + + /** + * Compute the median of a sorted list. + */ + public static Double computeMedian(List<Double> sortedValues) { + if (sortedValues.size() == 0) { + throw new IllegalArgumentException( + "Cannot compute the median of an empty list."); + } + + Double median = sortedValues.get(sortedValues.size() / 2); + if (sortedValues.size() % 2 == 0) { + median += sortedValues.get((sortedValues.size() / 2) - 1); + median /= 2; + } + return median; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/b3ec531f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/SlowNodeDetector.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/SlowNodeDetector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/SlowNodeDetector.java deleted file mode 100644 index b6278ce..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/SlowNodeDetector.java +++ /dev/null @@ -1,194 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * <p> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hdfs.server.datanode.metrics; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.ImmutableMap; -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - - -/** - * A utility class to help detect nodes whose aggregate latency - * is an outlier within a given set. - * - * We use the median absolute deviation for outlier detection as - * described in the following publication: - * - * Leys, C., et al., Detecting outliers: Do not use standard deviation - * around the mean, use absolute deviation around the median. - * http://dx.doi.org/10.1016/j.jesp.2013.03.013 - * - * We augment the above scheme with the following heuristics to be even - * more conservative: - * - * 1. Skip outlier detection if the sample size is too small. - * 2. Never flag nodes whose aggregate latency is below a low threshold. - * 3. Never flag nodes whose aggregate latency is less than a small - * multiple of the median. - */ -@InterfaceAudience.Private -@InterfaceStability.Unstable -public class SlowNodeDetector { - public static final Logger LOG = - LoggerFactory.getLogger(SlowNodeDetector.class); - - /** - * Minimum number of peers to run outlier detection. - */ - private static long minOutlierDetectionPeers = 10; - - /** - * The multiplier is from Leys, C. et al. - */ - private static final double MAD_MULTIPLIER = (double) 1.4826; - - /** - * Threshold in milliseconds below which a DataNode is definitely not slow. - */ - private final long lowThresholdMs; - - /** - * Deviation multiplier. A sample is considered to be an outlier if it - * exceeds the median by (multiplier * median abs. deviation). 3 is a - * conservative choice. - */ - private static final int DEVIATION_MULTIPLIER = 3; - - /** - * If most of the samples are clustered together, the MAD can be - * low. The median multiplier introduces another safeguard to avoid - * overaggressive outlier detection. - */ - @VisibleForTesting - static final int MEDIAN_MULTIPLIER = 3; - - public SlowNodeDetector(long lowThresholdMs) { - this.lowThresholdMs = lowThresholdMs; - } - - /** - * Return a set of DataNodes whose latency is much higher than - * their peers. The input is a map of (node -> aggregate latency) - * entries. - * - * The aggregate may be an arithmetic mean or a percentile e.g. - * 90th percentile. Percentiles are a better choice than median - * since latency is usually not a normal distribution. - * - * This method allocates temporary memory O(n) and - * has run time O(n.log(n)), where n = stats.size(). - * - * @return - */ - public Map<String, Double> getOutliers(Map<String, Double> stats) { - if (stats.size() < minOutlierDetectionPeers) { - LOG.debug("Skipping statistical outlier detection as we don't have " + - "latency data for enough peers. 
Have {}, need at least {}", - stats.size(), minOutlierDetectionPeers); - return ImmutableMap.of(); - } - // Compute the median absolute deviation of the aggregates. - final List<Double> sorted = new ArrayList<>(stats.values()); - Collections.sort(sorted); - final Double median = computeMedian(sorted); - final Double mad = computeMad(sorted); - Double upperLimitLatency = Math.max( - lowThresholdMs, median * MEDIAN_MULTIPLIER); - upperLimitLatency = Math.max( - upperLimitLatency, median + (DEVIATION_MULTIPLIER * mad)); - - final Map<String, Double> slowNodes = new HashMap<>(); - - LOG.trace("getOutliers: List={}, MedianLatency={}, " + - "MedianAbsoluteDeviation={}, upperLimitLatency={}", - sorted, median, mad, upperLimitLatency); - - // Find nodes whose latency exceeds the threshold. - for (Map.Entry<String, Double> entry : stats.entrySet()) { - if (entry.getValue() > upperLimitLatency) { - slowNodes.put(entry.getKey(), entry.getValue()); - } - } - - return slowNodes; - } - - /** - * Compute the Median Absolute Deviation of a sorted list. - */ - public static Double computeMad(List<Double> sortedValues) { - if (sortedValues.size() == 0) { - throw new IllegalArgumentException( - "Cannot compute the Median Absolute Deviation " + - "of an empty list."); - } - - // First get the median of the values. - Double median = computeMedian(sortedValues); - List<Double> deviations = new ArrayList<>(sortedValues); - - // Then update the list to store deviation from the median. - for (int i = 0; i < sortedValues.size(); ++i) { - deviations.set(i, Math.abs(sortedValues.get(i) - median)); - } - - // Finally get the median absolute deviation. - Collections.sort(deviations); - return computeMedian(deviations) * MAD_MULTIPLIER; - } - - /** - * Compute the median of a sorted list. - */ - public static Double computeMedian(List<Double> sortedValues) { - if (sortedValues.size() == 0) { - throw new IllegalArgumentException( - "Cannot compute the median of an empty list."); - } - - Double median = sortedValues.get(sortedValues.size() / 2); - if (sortedValues.size() % 2 == 0) { - median += sortedValues.get((sortedValues.size() / 2) - 1); - median /= 2; - } - return median; - } - - /** - * This method *must not* be used outside of unit tests. 
- */ - @VisibleForTesting - static void setMinOutlierDetectionPeers(long minOutlierDetectionPeers) { - SlowNodeDetector.minOutlierDetectionPeers = minOutlierDetectionPeers; - } - - @VisibleForTesting - static long getMinOutlierDetectionPeers() { - return minOutlierDetectionPeers; - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/b3ec531f/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index c220025..36b93b7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -2009,7 +2009,7 @@ </property> <property> - <name>dfs.datanode.slow.peers.report.interval</name> + <name>dfs.datanode.outliers.report.interval</name> <value>30m</value> <description> This setting controls how frequently DataNodes will report their peer http://git-wip-us.apache.org/repos/asf/hadoop/blob/b3ec531f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/metrics/TestDataNodeOutlierDetectionViaMetrics.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/metrics/TestDataNodeOutlierDetectionViaMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/metrics/TestDataNodeOutlierDetectionViaMetrics.java index 34e15e5..eb7769e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/metrics/TestDataNodeOutlierDetectionViaMetrics.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/metrics/TestDataNodeOutlierDetectionViaMetrics.java @@ -54,13 +54,14 @@ public class TestDataNodeOutlierDetectionViaMetrics { private static final int ROLLING_AVERAGE_WINDOWS = 10; private static final int SLOW_NODE_LATENCY_MS = 20_000; private static final int FAST_NODE_MAX_LATENCY_MS = 5; + private static final long MIN_OUTLIER_DETECTION_PEERS = 10; private Random random = new Random(System.currentTimeMillis()); @Before public void setup() { GenericTestUtils.setLogLevel(DataNodePeerMetrics.LOG, Level.ALL); - GenericTestUtils.setLogLevel(SlowNodeDetector.LOG, Level.ALL); + GenericTestUtils.setLogLevel(OutlierDetector.LOG, Level.ALL); } /** @@ -111,8 +112,7 @@ public class TestDataNodeOutlierDetectionViaMetrics { */ public void injectFastNodesSamples(DataNodePeerMetrics peerMetrics) { for (int nodeIndex = 0; - nodeIndex < SlowNodeDetector.getMinOutlierDetectionPeers(); - ++nodeIndex) { + nodeIndex < MIN_OUTLIER_DETECTION_PEERS; ++nodeIndex) { final String nodeName = "FastNode-" + nodeIndex; LOG.info("Generating stats for node {}", nodeName); for (int i = 0; http://git-wip-us.apache.org/repos/asf/hadoop/blob/b3ec531f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/metrics/TestSlowNodeDetector.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/metrics/TestSlowNodeDetector.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/metrics/TestSlowNodeDetector.java index 7b368c4..f06a87a 100644 --- 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/metrics/TestSlowNodeDetector.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/metrics/TestSlowNodeDetector.java @@ -40,7 +40,7 @@ import java.util.Set; import static org.junit.Assert.assertTrue; /** - * Unit tests for {@link SlowNodeDetector}. + * Unit tests for {@link OutlierDetector}. */ public class TestSlowNodeDetector { public static final Logger LOG = @@ -183,7 +183,7 @@ public class TestSlowNodeDetector { .put(ImmutableMap.of( "n1", LOW_THRESHOLD + 0.1, "n2", LOW_THRESHOLD + 0.1, - "n3", LOW_THRESHOLD * SlowNodeDetector.MEDIAN_MULTIPLIER - 0.1), + "n3", LOW_THRESHOLD * OutlierDetector.MEDIAN_MULTIPLIER - 0.1), ImmutableSet.of()) // A statistical outlier must be returned if it is outside a @@ -192,7 +192,7 @@ public class TestSlowNodeDetector { "n1", LOW_THRESHOLD + 0.1, "n2", LOW_THRESHOLD + 0.1, "n3", (LOW_THRESHOLD + 0.1) * - SlowNodeDetector.MEDIAN_MULTIPLIER + 0.1), + OutlierDetector.MEDIAN_MULTIPLIER + 0.1), ImmutableSet.of("n3")) // Only the statistical outliers n3 and n11 should be returned. @@ -233,13 +233,13 @@ public class TestSlowNodeDetector { .build(); - private SlowNodeDetector slowNodeDetector; + private OutlierDetector slowNodeDetector; @Before public void setup() { - slowNodeDetector = new SlowNodeDetector((long) LOW_THRESHOLD); - SlowNodeDetector.setMinOutlierDetectionPeers(MIN_OUTLIER_DETECTION_PEERS); - GenericTestUtils.setLogLevel(SlowNodeDetector.LOG, Level.ALL); + slowNodeDetector = new OutlierDetector(MIN_OUTLIER_DETECTION_PEERS, + (long) LOW_THRESHOLD); + GenericTestUtils.setLogLevel(OutlierDetector.LOG, Level.ALL); } @Test @@ -259,7 +259,7 @@ public class TestSlowNodeDetector { } /** - * Unit test for {@link SlowNodeDetector#computeMedian(List)}. + * Unit test for {@link OutlierDetector#computeMedian(List)}. */ @Test public void testMediansFromTestMatrix() { @@ -267,7 +267,7 @@ public class TestSlowNodeDetector { medianTestMatrix.entrySet()) { final List<Double> inputList = new ArrayList<>(entry.getKey()); Collections.sort(inputList); - final Double median = SlowNodeDetector.computeMedian(inputList); + final Double median = OutlierDetector.computeMedian(inputList); final Double expectedMedian = entry.getValue().getLeft(); // Ensure that the median is within 0.001% of expected. @@ -283,7 +283,7 @@ public class TestSlowNodeDetector { } /** - * Unit test for {@link SlowNodeDetector#computeMad(List)}. + * Unit test for {@link OutlierDetector#computeMad(List)}. */ @Test public void testMadsFromTestMatrix() { @@ -291,7 +291,7 @@ public class TestSlowNodeDetector { medianTestMatrix.entrySet()) { final List<Double> inputList = new ArrayList<>(entry.getKey()); Collections.sort(inputList); - final Double mad = SlowNodeDetector.computeMad(inputList); + final Double mad = OutlierDetector.computeMad(inputList); final Double expectedMad = entry.getValue().getRight(); // Ensure that the MAD is within 0.001% of expected. @@ -316,20 +316,20 @@ public class TestSlowNodeDetector { } /** - * Verify that {@link SlowNodeDetector#computeMedian(List)} throws when + * Verify that {@link OutlierDetector#computeMedian(List)} throws when * passed an empty list. 
*/ @Test(expected=IllegalArgumentException.class) public void testMedianOfEmptyList() { - SlowNodeDetector.computeMedian(Collections.emptyList()); + OutlierDetector.computeMedian(Collections.emptyList()); } /** - * Verify that {@link SlowNodeDetector#computeMad(List)} throws when + * Verify that {@link OutlierDetector#computeMad(List)} throws when * passed an empty list. */ @Test(expected=IllegalArgumentException.class) public void testMadOfEmptyList() { - SlowNodeDetector.computeMedian(Collections.emptyList()); + OutlierDetector.computeMad(Collections.emptyList()); } }
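As a worked illustration of the detector this patch introduces, the following self-contained sketch exercises the public OutlierDetector constructor and getOutliers(Map) exactly as declared above; the volume names and latency numbers are made up for the example:

    import java.util.Map;
    import com.google.common.collect.ImmutableMap;
    import org.apache.hadoop.hdfs.server.datanode.metrics.OutlierDetector;

    public class OutlierDetectorExample {
      public static void main(String[] args) {
        // The same thresholds DataNodeDiskMetrics uses for disks:
        // require at least 5 resources, never flag latency under 20 ms.
        OutlierDetector detector = new OutlierDetector(5, 20);

        // Hypothetical per-volume mean write latencies in milliseconds.
        Map<String, Double> writeIoStats = ImmutableMap.of(
            "/data/1", 8.0,
            "/data/2", 8.5,
            "/data/3", 9.0,
            "/data/4", 9.5,
            "/data/5", 150.0);

        // Sorted values have median 9.0; sorted absolute deviations are
        // [0.0, 0.5, 0.5, 1.0, 141.0], so MAD = 0.5 * 1.4826 = 0.7413.
        // The cutoff is max(lowThresholdMs, median * MEDIAN_MULTIPLIER,
        // median + DEVIATION_MULTIPLIER * MAD) = max(20, 27, 11.22) = 27,
        // so only /data/5 is reported.
        System.out.println(detector.getOutliers(writeIoStats));
        // Prints: {/data/5=150.0}
      }
    }

The two max() guards are the conservatism the class javadoc describes: even when the MAD is tiny because samples cluster tightly, nothing under three times the median, or under the absolute low threshold, is ever flagged.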