This is an automated email from the ASF dual-hosted git repository. weichiu pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push: new 25591ef51bb HDFS-16595. Slow peer metrics - add median, mad and upper latency limits (#4357) 25591ef51bb is described below commit 25591ef51bb7ded0ee920b4dd53978bb9c4d198b Author: Viraj Jasani <vjas...@apache.org> AuthorDate: Fri Jun 3 15:10:14 2022 -0700 HDFS-16595. Slow peer metrics - add median, mad and upper latency limits (#4357) Reviewed-by: Tao Li <toms...@apache.org> Signed-off-by: Wei-Chiu Chuang <weic...@apache.org> --- .../hdfs/server/protocol/OutlierMetrics.java | 90 ++++++++++++++++++++++ .../hdfs/server/protocol/SlowPeerReports.java | 8 +- .../apache/hadoop/hdfs/protocolPB/PBHelper.java | 25 ++++-- .../server/blockmanagement/DatanodeManager.java | 8 +- .../blockmanagement/SlowPeerDisabledTracker.java | 3 +- .../SlowPeerLatencyWithReportingNode.java | 38 ++++++++- .../server/blockmanagement/SlowPeerTracker.java | 19 +++-- .../datanode/metrics/DataNodePeerMetrics.java | 9 ++- .../server/datanode/metrics/OutlierDetector.java | 35 +++++++-- .../src/main/proto/DatanodeProtocol.proto | 3 + .../apache/hadoop/hdfs/TestSlowDatanodeReport.java | 18 ++++- .../hadoop/hdfs/protocolPB/TestPBHelper.java | 9 ++- .../TestReplicationPolicyExcludeSlowNodes.java | 20 +++-- .../blockmanagement/TestSlowPeerTracker.java | 71 +++++++++-------- .../TestDataNodeOutlierDetectionViaMetrics.java | 3 +- 15 files changed, 282 insertions(+), 77 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/protocol/OutlierMetrics.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/protocol/OutlierMetrics.java new file mode 100644 index 00000000000..452885d0c05 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/protocol/OutlierMetrics.java @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.server.protocol; + +import org.apache.commons.lang3.builder.EqualsBuilder; +import org.apache.commons.lang3.builder.HashCodeBuilder; +import org.apache.hadoop.classification.InterfaceAudience; + +/** + * Outlier detection metrics - median, median absolute deviation, upper latency limit, + * actual latency etc. + */ +@InterfaceAudience.Private +public class OutlierMetrics { + + private final Double median; + private final Double mad; + private final Double upperLimitLatency; + private final Double actualLatency; + + public OutlierMetrics(Double median, Double mad, Double upperLimitLatency, + Double actualLatency) { + this.median = median; + this.mad = mad; + this.upperLimitLatency = upperLimitLatency; + this.actualLatency = actualLatency; + } + + public Double getMedian() { + return median; + } + + public Double getMad() { + return mad; + } + + public Double getUpperLimitLatency() { + return upperLimitLatency; + } + + public Double getActualLatency() { + return actualLatency; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + + if (o == null || getClass() != o.getClass()) { + return false; + } + + OutlierMetrics that = (OutlierMetrics) o; + + return new EqualsBuilder() + .append(median, that.median) + .append(mad, that.mad) + .append(upperLimitLatency, that.upperLimitLatency) + .append(actualLatency, that.actualLatency) + .isEquals(); + } + + @Override + public int hashCode() { + return new HashCodeBuilder(17, 37) + .append(median) + .append(mad) + .append(upperLimitLatency) + .append(actualLatency) + .toHashCode(); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/protocol/SlowPeerReports.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/protocol/SlowPeerReports.java index a3b3445d5a7..586e2f4931e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/protocol/SlowPeerReports.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/protocol/SlowPeerReports.java @@ -51,7 +51,7 @@ public final class SlowPeerReports { * meaningful and must be avoided. */ @Nonnull - private final Map<String, Double> slowPeers; + private final Map<String, OutlierMetrics> slowPeers; /** * An object representing a SlowPeerReports with no entries. Should @@ -61,19 +61,19 @@ public final class SlowPeerReports { public static final SlowPeerReports EMPTY_REPORT = new SlowPeerReports(ImmutableMap.of()); - private SlowPeerReports(Map<String, Double> slowPeers) { + private SlowPeerReports(Map<String, OutlierMetrics> slowPeers) { this.slowPeers = slowPeers; } public static SlowPeerReports create( - @Nullable Map<String, Double> slowPeers) { + @Nullable Map<String, OutlierMetrics> slowPeers) { if (slowPeers == null || slowPeers.isEmpty()) { return EMPTY_REPORT; } return new SlowPeerReports(slowPeers); } - public Map<String, Double> getSlowPeers() { + public Map<String, OutlierMetrics> getSlowPeers() { return slowPeers; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java index 690ad0c2790..ce27342729b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java @@ -112,6 +112,7 @@ import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat; import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand; import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; +import org.apache.hadoop.hdfs.server.protocol.OutlierMetrics; import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo; import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStatus; import org.apache.hadoop.hdfs.server.protocol.RegisterCommand; @@ -853,11 +854,15 @@ public class PBHelper { List<SlowPeerReportProto> slowPeerInfoProtos = new ArrayList<>(slowPeers.getSlowPeers().size()); - for (Map.Entry<String, Double> entry : - slowPeers.getSlowPeers().entrySet()) { - slowPeerInfoProtos.add(SlowPeerReportProto.newBuilder() + for (Map.Entry<String, OutlierMetrics> entry : slowPeers.getSlowPeers().entrySet()) { + OutlierMetrics outlierMetrics = entry.getValue(); + slowPeerInfoProtos.add( + SlowPeerReportProto.newBuilder() .setDataNodeId(entry.getKey()) - .setAggregateLatency(entry.getValue()) + .setAggregateLatency(outlierMetrics.getActualLatency()) + .setMedian(outlierMetrics.getMedian()) + .setMad(outlierMetrics.getMad()) + .setUpperLimitLatency(outlierMetrics.getUpperLimitLatency()) .build()); } return slowPeerInfoProtos; @@ -871,15 +876,19 @@ public class PBHelper { return SlowPeerReports.EMPTY_REPORT; } - Map<String, Double> slowPeersMap = new HashMap<>(slowPeerProtos.size()); + Map<String, OutlierMetrics> slowPeersMap = new HashMap<>(slowPeerProtos.size()); for (SlowPeerReportProto proto : slowPeerProtos) { if (!proto.hasDataNodeId()) { // The DataNodeId should be reported. continue; } - slowPeersMap.put( - proto.getDataNodeId(), - proto.hasAggregateLatency() ? proto.getAggregateLatency() : 0.0); + Double aggregateLatency = proto.hasAggregateLatency() ? proto.getAggregateLatency() : 0.0; + Double medianLatency = proto.hasMedian() ? proto.getMedian() : 0.0; + Double madLatency = proto.hasMad() ? proto.getMad() : 0.0; + Double upperLimitLatency = proto.hasUpperLimitLatency() ? proto.getUpperLimitLatency() : 0.0; + OutlierMetrics outlierMetrics = + new OutlierMetrics(medianLatency, madLatency, upperLimitLatency, aggregateLatency); + slowPeersMap.put(proto.getDataNodeId(), outlierMetrics); } return SlowPeerReports.create(slowPeersMap); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java index 84053839248..e75caeffef2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java @@ -1896,14 +1896,14 @@ public class DatanodeManager { Preconditions.checkNotNull(slowPeerTracker, "slowPeerTracker should not be un-assigned"); if (slowPeerTracker.isSlowPeerTrackerEnabled()) { - final Map<String, Double> slowPeersMap = slowPeers.getSlowPeers(); + final Map<String, OutlierMetrics> slowPeersMap = slowPeers.getSlowPeers(); if (!slowPeersMap.isEmpty()) { if (LOG.isDebugEnabled()) { LOG.debug("DataNode " + nodeReg + " reported slow peers: " + slowPeersMap); } - for (Map.Entry<String, Double> slowNodeId : slowPeersMap.entrySet()) { - slowPeerTracker.addReport(slowNodeId.getKey(), nodeReg.getIpcAddr(false), - slowNodeId.getValue()); + for (Map.Entry<String, OutlierMetrics> slowNodeEntry : slowPeersMap.entrySet()) { + slowPeerTracker.addReport(slowNodeEntry.getKey(), nodeReg.getIpcAddr(false), + slowNodeEntry.getValue()); } } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/SlowPeerDisabledTracker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/SlowPeerDisabledTracker.java index 567984204a2..ac109e0c90a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/SlowPeerDisabledTracker.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/SlowPeerDisabledTracker.java @@ -32,6 +32,7 @@ import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.server.protocol.OutlierMetrics; import org.apache.hadoop.util.Preconditions; import org.apache.hadoop.util.Timer; @@ -58,7 +59,7 @@ public class SlowPeerDisabledTracker extends SlowPeerTracker { } @Override - public void addReport(String slowNode, String reportingNode, Double slowNodeLatency) { + public void addReport(String slowNode, String reportingNode, OutlierMetrics slowNodeMetrics) { LOG.trace("Adding slow peer report is disabled. To enable it, please enable config {}.", DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_KEY); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/SlowPeerLatencyWithReportingNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/SlowPeerLatencyWithReportingNode.java index a3f90062600..b90f809f1ed 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/SlowPeerLatencyWithReportingNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/SlowPeerLatencyWithReportingNode.java @@ -38,13 +38,31 @@ final class SlowPeerLatencyWithReportingNode @JsonProperty("ReportedLatency") private final Double reportedLatency; + @JsonProperty("MedianLatency") + private final Double medianLatency; + + @JsonProperty("MadLatency") + private final Double madLatency; + + @JsonProperty("UpperLimitLatency") + private final Double upperLimitLatency; + SlowPeerLatencyWithReportingNode( @JsonProperty("ReportingNode") String reportingNode, @JsonProperty("ReportedLatency") - Double reportedLatency) { + Double reportedLatency, + @JsonProperty("MedianLatency") + Double medianLatency, + @JsonProperty("MadLatency") + Double madLatency, + @JsonProperty("UpperLimitLatency") + Double upperLimitLatency) { this.reportingNode = reportingNode; this.reportedLatency = reportedLatency; + this.medianLatency = medianLatency; + this.madLatency = madLatency; + this.upperLimitLatency = upperLimitLatency; } public String getReportingNode() { @@ -55,6 +73,18 @@ final class SlowPeerLatencyWithReportingNode return reportedLatency; } + public Double getMedianLatency() { + return medianLatency; + } + + public Double getMadLatency() { + return madLatency; + } + + public Double getUpperLimitLatency() { + return upperLimitLatency; + } + @Override public int compareTo(SlowPeerLatencyWithReportingNode o) { return this.reportingNode.compareTo(o.getReportingNode()); @@ -75,6 +105,9 @@ final class SlowPeerLatencyWithReportingNode return new EqualsBuilder() .append(reportingNode, that.reportingNode) .append(reportedLatency, that.reportedLatency) + .append(medianLatency, that.medianLatency) + .append(madLatency, that.madLatency) + .append(upperLimitLatency, that.upperLimitLatency) .isEquals(); } @@ -83,6 +116,9 @@ final class SlowPeerLatencyWithReportingNode return new HashCodeBuilder(17, 37) .append(reportingNode) .append(reportedLatency) + .append(medianLatency) + .append(madLatency) + .append(upperLimitLatency) .toHashCode(); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/SlowPeerTracker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/SlowPeerTracker.java index e771d2f89f1..ec47b6941ef 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/SlowPeerTracker.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/SlowPeerTracker.java @@ -28,6 +28,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.server.protocol.OutlierMetrics; import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports; import org.apache.hadoop.util.Timer; import org.slf4j.Logger; @@ -123,9 +124,10 @@ public class SlowPeerTracker { * * @param slowNode DataNodeId of the peer suspected to be slow. * @param reportingNode DataNodeId of the node reporting on its peer. - * @param slowNodeLatency Aggregate latency of slownode as reported by the reporting node. + * @param slowNodeMetrics Aggregate latency metrics of slownode as reported by the + * reporting node. */ - public void addReport(String slowNode, String reportingNode, Double slowNodeLatency) { + public void addReport(String slowNode, String reportingNode, OutlierMetrics slowNodeMetrics) { ConcurrentMap<String, LatencyWithLastReportTime> nodeEntries = allReports.get(slowNode); if (nodeEntries == null) { @@ -136,7 +138,7 @@ public class SlowPeerTracker { // Replace the existing entry from this node, if any. nodeEntries.put(reportingNode, - new LatencyWithLastReportTime(timer.monotonicNow(), slowNodeLatency)); + new LatencyWithLastReportTime(timer.monotonicNow(), slowNodeMetrics)); } /** @@ -195,8 +197,11 @@ public class SlowPeerTracker { for (Map.Entry<String, LatencyWithLastReportTime> entry : reports.entrySet()) { if (now - entry.getValue().getTime() < reportValidityMs) { + OutlierMetrics outlierMetrics = entry.getValue().getLatency(); validReports.add( - new SlowPeerLatencyWithReportingNode(entry.getKey(), entry.getValue().getLatency())); + new SlowPeerLatencyWithReportingNode(entry.getKey(), outlierMetrics.getActualLatency(), + outlierMetrics.getMedian(), outlierMetrics.getMad(), + outlierMetrics.getUpperLimitLatency())); } } return validReports; @@ -279,9 +284,9 @@ public class SlowPeerTracker { private static class LatencyWithLastReportTime { private final Long time; - private final Double latency; + private final OutlierMetrics latency; - LatencyWithLastReportTime(Long time, Double latency) { + LatencyWithLastReportTime(Long time, OutlierMetrics latency) { this.time = time; this.latency = latency; } @@ -290,7 +295,7 @@ public class SlowPeerTracker { return time; } - public Double getLatency() { + public OutlierMetrics getLatency() { return latency; } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodePeerMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodePeerMetrics.java index 2e456b67ca1..a77c3ba0643 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodePeerMetrics.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodePeerMetrics.java @@ -23,6 +23,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.VisibleForTesting; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.server.protocol.OutlierMetrics; import org.apache.hadoop.metrics2.MetricsJsonBuilder; import org.apache.hadoop.metrics2.lib.MutableRollingAverages; import org.apache.hadoop.util.Preconditions; @@ -55,7 +56,7 @@ public class DataNodePeerMetrics { private final String name; // Strictly to be used by test code only. Source code is not supposed to use this. - private Map<String, Double> testOutlier = null; + private Map<String, OutlierMetrics> testOutlier = null; private final OutlierDetector slowNodeDetector; @@ -143,7 +144,7 @@ public class DataNodePeerMetrics { * Retrieve the set of dataNodes that look significantly slower * than their peers. */ - public Map<String, Double> getOutliers() { + public Map<String, OutlierMetrics> getOutliers() { // outlier must be null for source code. if (testOutlier == null) { // This maps the metric name to the aggregate latency. @@ -151,7 +152,7 @@ public class DataNodePeerMetrics { final Map<String, Double> stats = sendPacketDownstreamRollingAverages.getStats(minOutlierDetectionSamples); LOG.trace("DataNodePeerMetrics: Got stats: {}", stats); - return slowNodeDetector.getOutliers(stats); + return slowNodeDetector.getOutlierMetrics(stats); } else { // this happens only for test code. return testOutlier; @@ -164,7 +165,7 @@ public class DataNodePeerMetrics { * * @param outlier outlier directly set by tests. */ - public void setTestOutliers(Map<String, Double> outlier) { + public void setTestOutliers(Map<String, OutlierMetrics> outlier) { this.testOutlier = outlier; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/OutlierDetector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/OutlierDetector.java index 39feca03d66..e13cf275178 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/OutlierDetector.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/OutlierDetector.java @@ -22,6 +22,8 @@ import org.apache.hadoop.classification.VisibleForTesting; import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableMap; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hdfs.server.protocol.OutlierMetrics; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -108,6 +110,26 @@ public class OutlierDetector { * @return */ public Map<String, Double> getOutliers(Map<String, Double> stats) { + final Map<String, Double> slowResources = new HashMap<>(); + Map<String, OutlierMetrics> slowResourceMetrics = getOutlierMetrics(stats); + slowResourceMetrics.forEach( + (node, outlierMetrics) -> slowResources.put(node, outlierMetrics.getActualLatency())); + return slowResources; + } + + /** + * Return a set of nodes whose latency is much higher than + * their counterparts. The input is a map of (resource {@literal ->} aggregate + * latency) entries. + * + * The aggregate may be an arithmetic mean or a percentile e.g. + * 90th percentile. Percentiles are a better choice than median + * since latency is usually not a normal distribution. + * + * @param stats map of aggregate latency entries. + * @return map of outlier nodes to outlier metrics. + */ + public Map<String, OutlierMetrics> getOutlierMetrics(Map<String, Double> stats) { if (stats.size() < minNumResources) { LOG.debug("Skipping statistical outlier detection as we don't have " + "latency data for enough resources. Have {}, need at least {}", @@ -124,19 +146,20 @@ public class OutlierDetector { upperLimitLatency = Math.max( upperLimitLatency, median + (DEVIATION_MULTIPLIER * mad)); - final Map<String, Double> slowResources = new HashMap<>(); + final Map<String, OutlierMetrics> slowResources = new HashMap<>(); - LOG.trace("getOutliers: List={}, MedianLatency={}, " + - "MedianAbsoluteDeviation={}, upperLimitLatency={}", - sorted, median, mad, upperLimitLatency); + LOG.trace("getOutliers: List={}, MedianLatency={}, " + + "MedianAbsoluteDeviation={}, upperLimitLatency={}", sorted, median, mad, + upperLimitLatency); // Find resources whose latency exceeds the threshold. for (Map.Entry<String, Double> entry : stats.entrySet()) { if (entry.getValue() > upperLimitLatency) { - slowResources.put(entry.getKey(), entry.getValue()); + OutlierMetrics outlierMetrics = + new OutlierMetrics(median, mad, upperLimitLatency, entry.getValue()); + slowResources.put(entry.getKey(), outlierMetrics); } } - return slowResources; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto index 48a3855c038..c537ce3ae49 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto @@ -408,6 +408,9 @@ message CommitBlockSynchronizationResponseProto { message SlowPeerReportProto { optional string dataNodeId = 1; optional double aggregateLatency = 2; + optional double median = 3; + optional double mad = 4; + optional double upperLimitLatency = 5; } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestSlowDatanodeReport.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestSlowDatanodeReport.java index d6c728c68ac..187919c39b9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestSlowDatanodeReport.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestSlowDatanodeReport.java @@ -33,6 +33,7 @@ import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.server.datanode.DataNode; +import org.apache.hadoop.hdfs.server.protocol.OutlierMetrics; import org.apache.hadoop.test.GenericTestUtils; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MIN_OUTLIER_DETECTION_NODES_KEY; @@ -72,8 +73,10 @@ public class TestSlowDatanodeReport { public void testSingleNodeReport() throws Exception { List<DataNode> dataNodes = cluster.getDataNodes(); DataNode slowNode = dataNodes.get(1); + OutlierMetrics outlierMetrics = new OutlierMetrics(1.245, 2.69375, 4.5667, 15.5); dataNodes.get(0).getPeerMetrics().setTestOutliers( - ImmutableMap.of(slowNode.getDatanodeHostname() + ":" + slowNode.getIpcPort(), 15.5)); + ImmutableMap.of(slowNode.getDatanodeHostname() + ":" + slowNode.getIpcPort(), + outlierMetrics)); DistributedFileSystem distributedFileSystem = cluster.getFileSystem(); Assert.assertEquals(3, distributedFileSystem.getDataNodeStats().length); GenericTestUtils.waitFor(() -> { @@ -91,15 +94,22 @@ public class TestSlowDatanodeReport { Assert.assertTrue( cluster.getNameNode().getSlowPeersReport().contains(slowNode.getDatanodeHostname())); Assert.assertTrue(cluster.getNameNode().getSlowPeersReport().contains("15.5")); + Assert.assertTrue(cluster.getNameNode().getSlowPeersReport().contains("1.245")); + Assert.assertTrue(cluster.getNameNode().getSlowPeersReport().contains("2.69375")); + Assert.assertTrue(cluster.getNameNode().getSlowPeersReport().contains("4.5667")); } @Test public void testMultiNodesReport() throws Exception { List<DataNode> dataNodes = cluster.getDataNodes(); + OutlierMetrics outlierMetrics1 = new OutlierMetrics(2.498237, 19.2495, 23.568204, 14.5); + OutlierMetrics outlierMetrics2 = new OutlierMetrics(3.2535, 22.4945, 44.5667, 18.7); dataNodes.get(0).getPeerMetrics().setTestOutliers(ImmutableMap.of( - dataNodes.get(1).getDatanodeHostname() + ":" + dataNodes.get(1).getIpcPort(), 14.5)); + dataNodes.get(1).getDatanodeHostname() + ":" + dataNodes.get(1).getIpcPort(), + outlierMetrics1)); dataNodes.get(1).getPeerMetrics().setTestOutliers(ImmutableMap.of( - dataNodes.get(2).getDatanodeHostname() + ":" + dataNodes.get(2).getIpcPort(), 18.7)); + dataNodes.get(2).getDatanodeHostname() + ":" + dataNodes.get(2).getIpcPort(), + outlierMetrics2)); DistributedFileSystem distributedFileSystem = cluster.getFileSystem(); Assert.assertEquals(3, distributedFileSystem.getDataNodeStats().length); GenericTestUtils.waitFor(() -> { @@ -120,6 +130,8 @@ public class TestSlowDatanodeReport { .contains(dataNodes.get(2).getDatanodeHostname())); Assert.assertTrue(cluster.getNameNode().getSlowPeersReport().contains("14.5")); Assert.assertTrue(cluster.getNameNode().getSlowPeersReport().contains("18.7")); + Assert.assertTrue(cluster.getNameNode().getSlowPeersReport().contains("23.568204")); + Assert.assertTrue(cluster.getNameNode().getSlowPeersReport().contains("22.4945")); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java index e9bcef52708..a1b2e633edd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java @@ -21,6 +21,7 @@ package org.apache.hadoop.hdfs.protocolPB; import org.apache.hadoop.thirdparty.protobuf.UninitializedMessageException; import org.apache.hadoop.hdfs.protocol.AddErasureCodingPolicyResponse; import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies; +import org.apache.hadoop.hdfs.server.protocol.OutlierMetrics; import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports; import static org.hamcrest.CoreMatchers.is; @@ -805,8 +806,14 @@ public class TestPBHelper { @Test public void testSlowPeerInfoPBHelper() { // Test with a map that has a few slow peer entries. + OutlierMetrics outlierMetrics1 = new OutlierMetrics(0.0, 0.0, 0.0, 0.0); + OutlierMetrics outlierMetrics2 = new OutlierMetrics(0.0, 0.0, 0.0, 1.0); + OutlierMetrics outlierMetrics3 = new OutlierMetrics(0.0, 0.0, 0.0, 2.0); final SlowPeerReports slowPeers = SlowPeerReports.create( - ImmutableMap.of("peer1", 0.0, "peer2", 1.0, "peer3", 2.0)); + ImmutableMap.of( + "peer1", outlierMetrics1, + "peer2", outlierMetrics2, + "peer3", outlierMetrics3)); SlowPeerReports slowPeersConverted1 = PBHelper.convertSlowPeerInfo( PBHelper.convertSlowPeerInfo(slowPeers)); assertTrue( diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyExcludeSlowNodes.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyExcludeSlowNodes.java index c29bd0d41f4..05af3f9bb56 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyExcludeSlowNodes.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyExcludeSlowNodes.java @@ -22,6 +22,8 @@ import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.TestBlockStoragePolicy; import org.apache.hadoop.hdfs.server.namenode.NameNode; +import org.apache.hadoop.hdfs.server.protocol.OutlierMetrics; + import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -88,12 +90,18 @@ public class TestReplicationPolicyExcludeSlowNodes // mock slow nodes SlowPeerTracker tracker = dnManager.getSlowPeerTracker(); - tracker.addReport(dataNodes[0].getInfoAddr(), dataNodes[3].getInfoAddr(), 1.29463); - tracker.addReport(dataNodes[0].getInfoAddr(), dataNodes[4].getInfoAddr(), 2.9576); - tracker.addReport(dataNodes[1].getInfoAddr(), dataNodes[4].getInfoAddr(), 3.59674); - tracker.addReport(dataNodes[1].getInfoAddr(), dataNodes[5].getInfoAddr(), 4.238456); - tracker.addReport(dataNodes[2].getInfoAddr(), dataNodes[3].getInfoAddr(), 5.18375); - tracker.addReport(dataNodes[2].getInfoAddr(), dataNodes[5].getInfoAddr(), 6.39576); + OutlierMetrics outlierMetrics1 = new OutlierMetrics(0.0, 0.0, 0.0, 1.29463); + tracker.addReport(dataNodes[0].getInfoAddr(), dataNodes[3].getInfoAddr(), outlierMetrics1); + OutlierMetrics outlierMetrics2 = new OutlierMetrics(0.0, 0.0, 0.0, 2.9576); + tracker.addReport(dataNodes[0].getInfoAddr(), dataNodes[4].getInfoAddr(), outlierMetrics2); + OutlierMetrics outlierMetrics3 = new OutlierMetrics(0.0, 0.0, 0.0, 3.59674); + tracker.addReport(dataNodes[1].getInfoAddr(), dataNodes[4].getInfoAddr(), outlierMetrics3); + OutlierMetrics outlierMetrics4 = new OutlierMetrics(0.0, 0.0, 0.0, 4.238456); + tracker.addReport(dataNodes[1].getInfoAddr(), dataNodes[5].getInfoAddr(), outlierMetrics4); + OutlierMetrics outlierMetrics5 = new OutlierMetrics(0.0, 0.0, 0.0, 5.18375); + tracker.addReport(dataNodes[2].getInfoAddr(), dataNodes[3].getInfoAddr(), outlierMetrics5); + OutlierMetrics outlierMetrics6 = new OutlierMetrics(0.0, 0.0, 0.0, 6.39576); + tracker.addReport(dataNodes[2].getInfoAddr(), dataNodes[5].getInfoAddr(), outlierMetrics6); // waiting for slow nodes collector run Thread.sleep(3000); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestSlowPeerTracker.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestSlowPeerTracker.java index 67a212f21ec..67fb5a6a48c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestSlowPeerTracker.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestSlowPeerTracker.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectReader; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.server.protocol.OutlierMetrics; import org.apache.hadoop.util.FakeTimer; import org.junit.Before; import org.junit.Rule; @@ -44,8 +45,7 @@ import static org.junit.Assert.assertTrue; * Tests for {@link SlowPeerTracker}. */ public class TestSlowPeerTracker { - public static final Logger LOG = LoggerFactory.getLogger( - TestSlowPeerTracker.class); + private static final Logger LOG = LoggerFactory.getLogger(TestSlowPeerTracker.class); /** * Set a timeout for every test case. @@ -79,9 +79,9 @@ public class TestSlowPeerTracker { @Test public void testReportsAreRetrieved() { - tracker.addReport("node2", "node1", 1.2); - tracker.addReport("node3", "node1", 2.1); - tracker.addReport("node3", "node2", 1.22); + tracker.addReport("node2", "node1", new OutlierMetrics(0.0, 0.0, 0.0, 1.2)); + tracker.addReport("node3", "node1", new OutlierMetrics(0.0, 0.0, 0.0, 2.1)); + tracker.addReport("node3", "node2", new OutlierMetrics(0.0, 0.0, 0.0, 1.22)); assertThat(tracker.getReportsForAllDataNodes().size(), is(2)); assertThat(tracker.getReportsForNode("node2").size(), is(1)); @@ -94,9 +94,9 @@ public class TestSlowPeerTracker { */ @Test public void testAllReportsAreExpired() { - tracker.addReport("node2", "node1", 0.123); - tracker.addReport("node3", "node2", 0.2334); - tracker.addReport("node1", "node3", 1.234); + tracker.addReport("node2", "node1", new OutlierMetrics(0.0, 0.0, 0.0, 0.123)); + tracker.addReport("node3", "node2", new OutlierMetrics(0.0, 0.0, 0.0, 0.2334)); + tracker.addReport("node1", "node3", new OutlierMetrics(0.0, 0.0, 0.0, 1.234)); // No reports should expire after 1ms. timer.advance(1); @@ -116,10 +116,10 @@ public class TestSlowPeerTracker { */ @Test public void testSomeReportsAreExpired() { - tracker.addReport("node3", "node1", 1.234); - tracker.addReport("node3", "node2", 1.222); + tracker.addReport("node3", "node1", new OutlierMetrics(0.0, 0.0, 0.0, 1.234)); + tracker.addReport("node3", "node2", new OutlierMetrics(0.0, 0.0, 0.0, 1.222)); timer.advance(reportValidityMs); - tracker.addReport("node3", "node4", 1.20); + tracker.addReport("node3", "node4", new OutlierMetrics(0.0, 0.0, 0.0, 1.20)); assertThat(tracker.getReportsForAllDataNodes().size(), is(1)); assertThat(tracker.getReportsForNode("node3").size(), is(1)); assertEquals(1, tracker.getReportsForNode("node3").stream() @@ -131,22 +131,28 @@ public class TestSlowPeerTracker { */ @Test public void testReplacement() { - tracker.addReport("node2", "node1", 2.1); + OutlierMetrics outlierMetrics1 = new OutlierMetrics(0.0, 0.0, 0.0, 2.1); + tracker.addReport("node2", "node1", outlierMetrics1); timer.advance(reportValidityMs); // Expire the report. assertThat(tracker.getReportsForAllDataNodes().size(), is(0)); // This should replace the expired report with a newer valid one. - tracker.addReport("node2", "node1", 0.001); + OutlierMetrics outlierMetrics2 = new OutlierMetrics(0.0, 0.0, 0.0, 0.001); + tracker.addReport("node2", "node1", outlierMetrics2); assertThat(tracker.getReportsForAllDataNodes().size(), is(1)); assertThat(tracker.getReportsForNode("node2").size(), is(1)); } @Test public void testGetJson() throws IOException { - tracker.addReport("node1", "node2", 1.1); - tracker.addReport("node2", "node3", 1.23); - tracker.addReport("node2", "node1", 2.13); - tracker.addReport("node4", "node1", 1.244); + OutlierMetrics outlierMetrics1 = new OutlierMetrics(0.0, 0.0, 0.0, 1.1); + tracker.addReport("node1", "node2", outlierMetrics1); + OutlierMetrics outlierMetrics2 = new OutlierMetrics(0.0, 0.0, 0.0, 1.23); + tracker.addReport("node2", "node3", outlierMetrics2); + OutlierMetrics outlierMetrics3 = new OutlierMetrics(0.0, 0.0, 0.0, 2.13); + tracker.addReport("node2", "node1", outlierMetrics3); + OutlierMetrics outlierMetrics4 = new OutlierMetrics(0.0, 0.0, 0.0, 1.244); + tracker.addReport("node4", "node1", outlierMetrics4); final Set<SlowPeerJsonReport> reports = getAndDeserializeJson(); @@ -161,17 +167,17 @@ public class TestSlowPeerTracker { @Test public void testGetJsonSizeIsLimited() throws IOException { - tracker.addReport("node1", "node2", 1.634); - tracker.addReport("node1", "node3", 2.3566); - tracker.addReport("node2", "node3", 3.869); - tracker.addReport("node2", "node4", 4.1356); - tracker.addReport("node3", "node4", 1.73057); - tracker.addReport("node3", "node5", 2.4956730); - tracker.addReport("node4", "node6", 3.29847); - tracker.addReport("node5", "node6", 4.13444); - tracker.addReport("node5", "node7", 5.10845); - tracker.addReport("node6", "node8", 2.37464); - tracker.addReport("node6", "node7", 1.29475656); + tracker.addReport("node1", "node2", new OutlierMetrics(0.0, 0.0, 0.0, 1.634)); + tracker.addReport("node1", "node3", new OutlierMetrics(0.0, 0.0, 0.0, 2.3566)); + tracker.addReport("node2", "node3", new OutlierMetrics(0.0, 0.0, 0.0, 3.869)); + tracker.addReport("node2", "node4", new OutlierMetrics(0.0, 0.0, 0.0, 4.1356)); + tracker.addReport("node3", "node4", new OutlierMetrics(0.0, 0.0, 0.0, 1.73057)); + tracker.addReport("node3", "node5", new OutlierMetrics(0.0, 0.0, 0.0, 2.4956730)); + tracker.addReport("node4", "node6", new OutlierMetrics(0.0, 0.0, 0.0, 3.29847)); + tracker.addReport("node5", "node6", new OutlierMetrics(0.0, 0.0, 0.0, 4.13444)); + tracker.addReport("node5", "node7", new OutlierMetrics(0.0, 0.0, 0.0, 5.10845)); + tracker.addReport("node6", "node8", new OutlierMetrics(0.0, 0.0, 0.0, 2.37464)); + tracker.addReport("node6", "node7", new OutlierMetrics(0.0, 0.0, 0.0, 1.29475656)); final Set<SlowPeerJsonReport> reports = getAndDeserializeJson(); @@ -215,13 +221,16 @@ public class TestSlowPeerTracker { public void testLowRankedElementsIgnored() throws IOException { // Insert 5 nodes with 2 peer reports each. for (int i = 0; i < 5; ++i) { - tracker.addReport("node" + i, "reporter1", 1.295673); - tracker.addReport("node" + i, "reporter2", 2.38560); + OutlierMetrics outlierMetrics1 = new OutlierMetrics(0.0, 0.0, 0.0, 1.295673); + tracker.addReport("node" + i, "reporter1", outlierMetrics1); + OutlierMetrics outlierMetrics2 = new OutlierMetrics(0.0, 0.0, 0.0, 2.38560); + tracker.addReport("node" + i, "reporter2", outlierMetrics2); } // Insert 10 nodes with 1 peer report each. for (int i = 10; i < 20; ++i) { - tracker.addReport("node" + i, "reporter1", 3.4957); + OutlierMetrics outlierMetrics = new OutlierMetrics(0.0, 0.0, 0.0, 3.4957); + tracker.addReport("node" + i, "reporter1", outlierMetrics); } final Set<SlowPeerJsonReport> reports = getAndDeserializeJson(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/metrics/TestDataNodeOutlierDetectionViaMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/metrics/TestDataNodeOutlierDetectionViaMetrics.java index 1faddb362ec..0042bcb0426 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/metrics/TestDataNodeOutlierDetectionViaMetrics.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/metrics/TestDataNodeOutlierDetectionViaMetrics.java @@ -21,6 +21,7 @@ package org.apache.hadoop.hdfs.server.datanode.metrics; import java.util.function.Supplier; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.server.protocol.OutlierMetrics; import org.apache.hadoop.metrics2.lib.MetricsTestHelper; import org.apache.hadoop.test.GenericTestUtils; import org.junit.Before; @@ -100,7 +101,7 @@ public class TestDataNodeOutlierDetectionViaMetrics { } }, 500, 100_000); - final Map<String, Double> outliers = peerMetrics.getOutliers(); + final Map<String, OutlierMetrics> outliers = peerMetrics.getOutliers(); LOG.info("Got back outlier nodes: {}", outliers); assertThat(outliers.size(), is(1)); assertTrue(outliers.containsKey(slowNodeName)); --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org