This is an automated email from the ASF dual-hosted git repository.

tasanuma pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new db67952  HDFS-16396. Reconfig slow peer parameters for datanode (#3827)
db67952 is described below

commit db67952f9f38305ca263c033ed13fe772f79f300
Author: litao <tomlees...@gmail.com>
AuthorDate: Tue Feb 15 12:40:46 2022 +0800

    HDFS-16396. Reconfig slow peer parameters for datanode (#3827)
    
    Reviewed-by: Ayush Saxena <ayushsax...@apache.org>
    (cherry picked from commit 0c194f2157ff4473ea95dff1d7d40c386398f4a4)
---
 .../hdfs/server/datanode/BPServiceActor.java       |  2 +-
 .../hadoop/hdfs/server/datanode/BlockReceiver.java |  4 +-
 .../apache/hadoop/hdfs/server/datanode/DNConf.java |  6 +-
 .../hadoop/hdfs/server/datanode/DataNode.java      | 70 ++++++++++++++++-
 .../hadoop/hdfs/server/datanode/DataXceiver.java   |  2 +-
 .../datanode/metrics/DataNodePeerMetrics.java      | 57 +++++++++++---
 .../server/datanode/metrics/OutlierDetector.java   | 20 ++++-
 .../datanode/TestDataNodeReconfiguration.java      | 87 ++++++++++++++++++++++
 .../org/apache/hadoop/hdfs/tools/TestDFSAdmin.java |  2 +-
 9 files changed, 230 insertions(+), 20 deletions(-)

diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
index 8f0286c..c8f18fa 100755
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
@@ -534,7 +534,7 @@ class BPServiceActor implements Runnable {
         volumeFailureSummary.getFailedStorageLocations().length : 0;
     final boolean outliersReportDue = scheduler.isOutliersReportDue(now);
     final SlowPeerReports slowPeers =
-        outliersReportDue && dn.getPeerMetrics() != null ?
+        outliersReportDue && dnConf.peerStatsEnabled && dn.getPeerMetrics() != 
null ?
             SlowPeerReports.create(dn.getPeerMetrics().getOutliers()) :
             SlowPeerReports.EMPTY_REPORT;
     final SlowDiskReports slowDisks =
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
index 315914a..e20f437 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
@@ -878,7 +878,7 @@ class BlockReceiver implements Closeable {
    */
   private void trackSendPacketToLastNodeInPipeline(final long elapsedMs) {
     final DataNodePeerMetrics peerMetrics = datanode.getPeerMetrics();
-    if (peerMetrics != null && isPenultimateNode) {
+    if (datanode.getDnConf().peerStatsEnabled && peerMetrics != null && 
isPenultimateNode) {
       peerMetrics.addSendPacketDownstream(mirrorNameForMetrics, elapsedMs);
     }
   }
@@ -1093,7 +1093,7 @@ class BlockReceiver implements Closeable {
     if (downstreams != null && downstreams.length > 0) {
       downstreamDNs = downstreams;
       isPenultimateNode = (downstreams.length == 1);
-      if (isPenultimateNode && datanode.getPeerMetrics() != null) {
+      if (isPenultimateNode && datanode.getDnConf().peerStatsEnabled) {
         mirrorNameForMetrics = (downstreams[0].getInfoSecurePort() != 0 ?
             downstreams[0].getInfoSecureAddr() : downstreams[0].getInfoAddr());
         LOG.debug("Will collect peer metrics for downstream node {}",
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
index 40d0df3..563fbde 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
@@ -108,7 +108,7 @@ public class DNConf {
   private final long lifelineIntervalMs;
   volatile long blockReportInterval;
   volatile long blockReportSplitThreshold;
-  final boolean peerStatsEnabled;
+  volatile boolean peerStatsEnabled;
   final boolean diskStatsEnabled;
   final long outliersReportIntervalMs;
   final long ibrInterval;
@@ -507,4 +507,8 @@ public class DNConf {
     dn.getConf().set(DFS_BLOCKREPORT_INITIAL_DELAY_KEY, delayMs);
     initBlockReportDelay();
   }
+
+  void setPeerStatsEnabled(boolean enablePeerStats) {
+    peerStatsEnabled = enablePeerStats;
+  }
 }
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index e37dce6..d6eb51b 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -44,11 +44,19 @@ import static 
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY;
 import static 
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY;
 import static 
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_RECEIVER_THREADS_DEFAULT;
 import static 
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_RECEIVER_THREADS_KEY;
+import static 
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MIN_OUTLIER_DETECTION_NODES_DEFAULT;
+import static 
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MIN_OUTLIER_DETECTION_NODES_KEY;
 import static 
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_NETWORK_COUNTS_CACHE_MAX_SIZE_DEFAULT;
 import static 
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_NETWORK_COUNTS_CACHE_MAX_SIZE_KEY;
 import static 
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_OOB_TIMEOUT_DEFAULT;
 import static 
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_OOB_TIMEOUT_KEY;
+import static 
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PEER_METRICS_MIN_OUTLIER_DETECTION_SAMPLES_DEFAULT;
+import static 
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PEER_METRICS_MIN_OUTLIER_DETECTION_SAMPLES_KEY;
+import static 
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_DEFAULT;
+import static 
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PLUGINS_KEY;
+import static 
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SLOWPEER_LOW_THRESHOLD_MS_DEFAULT;
+import static 
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SLOWPEER_LOW_THRESHOLD_MS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_STARTUP_KEY;
 import static 
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY;
 import static 
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT;
@@ -319,7 +327,11 @@ public class DataNode extends ReconfigurableBase
               DFS_BLOCKREPORT_SPLIT_THRESHOLD_KEY,
               DFS_BLOCKREPORT_INITIAL_DELAY_KEY,
               DFS_DATANODE_MAX_RECEIVER_THREADS_KEY,
-              DFS_CACHEREPORT_INTERVAL_MSEC_KEY));
+              DFS_CACHEREPORT_INTERVAL_MSEC_KEY,
+              DFS_DATANODE_PEER_STATS_ENABLED_KEY,
+              DFS_DATANODE_MIN_OUTLIER_DETECTION_NODES_KEY,
+              DFS_DATANODE_SLOWPEER_LOW_THRESHOLD_MS_KEY,
+              DFS_DATANODE_PEER_METRICS_MIN_OUTLIER_DETECTION_SAMPLES_KEY));
 
   public static final Log METRICS_LOG = 
LogFactory.getLog("DataNodeMetricsLog");
 
@@ -361,7 +373,7 @@ public class DataNode extends ReconfigurableBase
 
   DataNodeMetrics metrics;
   @Nullable
-  private DataNodePeerMetrics peerMetrics;
+  private volatile DataNodePeerMetrics peerMetrics;
   private DataNodeDiskMetrics diskMetrics;
   private InetSocketAddress streamingAddr;
 
@@ -633,6 +645,11 @@ public class DataNode extends ReconfigurableBase
       return reconfDataXceiverParameters(property, newVal);
     case DFS_CACHEREPORT_INTERVAL_MSEC_KEY:
       return reconfCacheReportParameters(property, newVal);
+    case DFS_DATANODE_PEER_STATS_ENABLED_KEY:
+    case DFS_DATANODE_MIN_OUTLIER_DETECTION_NODES_KEY:
+    case DFS_DATANODE_SLOWPEER_LOW_THRESHOLD_MS_KEY:
+    case DFS_DATANODE_PEER_METRICS_MIN_OUTLIER_DETECTION_SAMPLES_KEY:
+      return reconfSlowPeerParameters(property, newVal);
     default:
       break;
     }
@@ -712,6 +729,53 @@ public class DataNode extends ReconfigurableBase
     }
   }
 
+  private String reconfSlowPeerParameters(String property, String newVal)
+      throws ReconfigurationException {
+    String result = null;
+    try {
+      LOG.info("Reconfiguring {} to {}", property, newVal);
+      if (property.equals(DFS_DATANODE_PEER_STATS_ENABLED_KEY)) {
+        Preconditions.checkNotNull(dnConf, "DNConf has not been initialized.");
+        if (newVal != null && !newVal.equalsIgnoreCase("true")
+            && !newVal.equalsIgnoreCase("false")) {
+          throw new IllegalArgumentException("Not a valid Boolean value for " 
+ property +
+              " in reconfSlowPeerParameters");
+        }
+        boolean enable = (newVal == null ? 
DFS_DATANODE_PEER_STATS_ENABLED_DEFAULT :
+            Boolean.parseBoolean(newVal));
+        result = Boolean.toString(enable);
+        dnConf.setPeerStatsEnabled(enable);
+        if (enable) {
+          // Create if it doesn't exist, overwrite if it does.
+          peerMetrics = DataNodePeerMetrics.create(getDisplayName(), 
getConf());
+        }
+      } else if 
(property.equals(DFS_DATANODE_MIN_OUTLIER_DETECTION_NODES_KEY)) {
+        Preconditions.checkNotNull(peerMetrics, "DataNode peer stats may be 
disabled.");
+        long minNodes = (newVal == null ? 
DFS_DATANODE_MIN_OUTLIER_DETECTION_NODES_DEFAULT :
+            Long.parseLong(newVal));
+        result = Long.toString(minNodes);
+        peerMetrics.setMinOutlierDetectionNodes(minNodes);
+      } else if (property.equals(DFS_DATANODE_SLOWPEER_LOW_THRESHOLD_MS_KEY)) {
+        Preconditions.checkNotNull(peerMetrics, "DataNode peer stats may be 
disabled.");
+        long threshold = (newVal == null ? 
DFS_DATANODE_SLOWPEER_LOW_THRESHOLD_MS_DEFAULT :
+            Long.parseLong(newVal));
+        result = Long.toString(threshold);
+        peerMetrics.setLowThresholdMs(threshold);
+      } else if 
(property.equals(DFS_DATANODE_PEER_METRICS_MIN_OUTLIER_DETECTION_SAMPLES_KEY)) {
+        Preconditions.checkNotNull(peerMetrics, "DataNode peer stats may be 
disabled.");
+        long minSamples = (newVal == null ?
+            DFS_DATANODE_PEER_METRICS_MIN_OUTLIER_DETECTION_SAMPLES_DEFAULT :
+            Long.parseLong(newVal));
+        result = Long.toString(minSamples);
+        peerMetrics.setMinOutlierDetectionSamples(minSamples);
+      }
+      LOG.info("RECONFIGURE* changed {} to {}", property, newVal);
+      return result;
+    } catch (IllegalArgumentException e) {
+      throw new ReconfigurationException(property, newVal, 
getConf().get(property), e);
+    }
+  }
+
   /**
    * Get a list of the keys of the re-configurable properties in configuration.
    */
@@ -3764,7 +3828,7 @@ public class DataNode extends ReconfigurableBase
 
   @Override // DataNodeMXBean
   public String getSendPacketDownstreamAvgInfo() {
-    return peerMetrics != null ?
+    return dnConf.peerStatsEnabled && peerMetrics != null ?
         peerMetrics.dumpSendPacketDownstreamAvgInfoAsJson() : null;
   }
 
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
index b63b8cc..a511e81 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
@@ -341,7 +341,7 @@ class DataXceiver extends Receiver implements Runnable {
    * the thread dies away.
    */
   private void collectThreadLocalStates() {
-    if (datanode.getPeerMetrics() != null) {
+    if (datanode.getDnConf().peerStatsEnabled && datanode.getPeerMetrics() != 
null) {
       datanode.getPeerMetrics().collectThreadLocalStates();
     }
   }
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodePeerMetrics.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodePeerMetrics.java
index 750e53d..f62a7b5 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodePeerMetrics.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodePeerMetrics.java
@@ -21,18 +21,23 @@ package org.apache.hadoop.hdfs.server.datanode.metrics;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.classification.VisibleForTesting;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.metrics2.MetricsJsonBuilder;
 import org.apache.hadoop.metrics2.lib.MutableRollingAverages;
+import org.apache.hadoop.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Map;
 import java.util.concurrent.ThreadLocalRandom;
 
+import static 
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MIN_OUTLIER_DETECTION_NODES_DEFAULT;
+import static 
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MIN_OUTLIER_DETECTION_NODES_KEY;
 import static 
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PEER_METRICS_MIN_OUTLIER_DETECTION_SAMPLES_DEFAULT;
 import static 
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PEER_METRICS_MIN_OUTLIER_DETECTION_SAMPLES_KEY;
+import static 
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SLOWPEER_LOW_THRESHOLD_MS_DEFAULT;
+import static 
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SLOWPEER_LOW_THRESHOLD_MS_KEY;
 
 /**
  * This class maintains DataNode peer metrics (e.g. numOps, AvgTime, etc.) for
@@ -57,15 +62,15 @@ public class DataNodePeerMetrics {
    * for outlier detection. If the number of samples is below this then
    * outlier detection is skipped.
    */
-  private final long minOutlierDetectionSamples;
+  private volatile long minOutlierDetectionSamples;
   /**
    * Threshold in milliseconds below which a DataNode is definitely not slow.
    */
-  private final long lowThresholdMs;
+  private volatile long lowThresholdMs;
   /**
    * Minimum number of nodes to run outlier detection.
    */
-  private final long minOutlierDetectionNodes;
+  private volatile long minOutlierDetectionNodes;
 
   public DataNodePeerMetrics(final String name, Configuration conf) {
     this.name = name;
@@ -73,11 +78,11 @@ public class DataNodePeerMetrics {
         DFS_DATANODE_PEER_METRICS_MIN_OUTLIER_DETECTION_SAMPLES_KEY,
         DFS_DATANODE_PEER_METRICS_MIN_OUTLIER_DETECTION_SAMPLES_DEFAULT);
     lowThresholdMs =
-        conf.getLong(DFSConfigKeys.DFS_DATANODE_SLOWPEER_LOW_THRESHOLD_MS_KEY,
-            DFSConfigKeys.DFS_DATANODE_SLOWPEER_LOW_THRESHOLD_MS_DEFAULT);
+        conf.getLong(DFS_DATANODE_SLOWPEER_LOW_THRESHOLD_MS_KEY,
+            DFS_DATANODE_SLOWPEER_LOW_THRESHOLD_MS_DEFAULT);
     minOutlierDetectionNodes =
-        
conf.getLong(DFSConfigKeys.DFS_DATANODE_MIN_OUTLIER_DETECTION_NODES_KEY,
-            DFSConfigKeys.DFS_DATANODE_MIN_OUTLIER_DETECTION_NODES_DEFAULT);
+        conf.getLong(DFS_DATANODE_MIN_OUTLIER_DETECTION_NODES_KEY,
+            DFS_DATANODE_MIN_OUTLIER_DETECTION_NODES_DEFAULT);
     this.slowNodeDetector =
         new OutlierDetector(minOutlierDetectionNodes, lowThresholdMs);
     sendPacketDownstreamRollingAverages = new MutableRollingAverages("Time");
@@ -87,7 +92,7 @@ public class DataNodePeerMetrics {
     return name;
   }
 
-  long getMinOutlierDetectionSamples() {
+  public long getMinOutlierDetectionSamples() {
     return minOutlierDetectionSamples;
   }
 
@@ -150,4 +155,38 @@ public class DataNodePeerMetrics {
   public MutableRollingAverages getSendPacketDownstreamRollingAverages() {
     return sendPacketDownstreamRollingAverages;
   }
+
+  public void setMinOutlierDetectionNodes(long minNodes) {
+    Preconditions.checkArgument(minNodes > 0,
+        DFS_DATANODE_MIN_OUTLIER_DETECTION_NODES_KEY + " should be larger than 
0");
+    minOutlierDetectionNodes = minNodes;
+    this.slowNodeDetector.setMinNumResources(minNodes);
+  }
+
+  public long getMinOutlierDetectionNodes() {
+    return minOutlierDetectionNodes;
+  }
+
+  public void setLowThresholdMs(long thresholdMs) {
+    Preconditions.checkArgument(thresholdMs > 0,
+        DFS_DATANODE_SLOWPEER_LOW_THRESHOLD_MS_KEY + " should be larger than 
0");
+    lowThresholdMs = thresholdMs;
+    this.slowNodeDetector.setLowThresholdMs(thresholdMs);
+  }
+
+  public long getLowThresholdMs() {
+    return lowThresholdMs;
+  }
+
+  public void setMinOutlierDetectionSamples(long minSamples) {
+    Preconditions.checkArgument(minSamples > 0,
+        DFS_DATANODE_PEER_METRICS_MIN_OUTLIER_DETECTION_SAMPLES_KEY +
+            " should be larger than 0");
+    minOutlierDetectionSamples = minSamples;
+  }
+
+  @VisibleForTesting
+  public OutlierDetector getSlowNodeDetector() {
+    return this.slowNodeDetector;
+  }
 }
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/OutlierDetector.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/OutlierDetector.java
index a30baa1..e7466fd 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/OutlierDetector.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/OutlierDetector.java
@@ -60,7 +60,7 @@ public class OutlierDetector {
   /**
    * Minimum number of resources to run outlier detection.
    */
-  private final long minNumResources;
+  private volatile long minNumResources;
 
   /**
    * The multiplier is from Leys, C. et al.
@@ -70,7 +70,7 @@ public class OutlierDetector {
   /**
    * Threshold in milliseconds below which a node/ disk is definitely not slow.
    */
-  private final long lowThresholdMs;
+  private volatile long lowThresholdMs;
 
   /**
    * Deviation multiplier. A sample is considered to be an outlier if it
@@ -180,4 +180,20 @@ public class OutlierDetector {
     }
     return median;
   }
+
+  public void setMinNumResources(long minNodes) {
+    minNumResources = minNodes;
+  }
+
+  public long getMinOutlierDetectionNodes() {
+    return minNumResources;
+  }
+
+  public void setLowThresholdMs(long thresholdMs) {
+    lowThresholdMs = thresholdMs;
+  }
+
+  public long getLowThresholdMs() {
+    return lowThresholdMs;
+  }
 }
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeReconfiguration.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeReconfiguration.java
index 75f1ee9..2150ea0 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeReconfiguration.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeReconfiguration.java
@@ -28,7 +28,14 @@ import static 
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_
 import static 
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT;
 import static 
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_RECEIVER_THREADS_DEFAULT;
 import static 
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_RECEIVER_THREADS_KEY;
+import static 
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MIN_OUTLIER_DETECTION_NODES_DEFAULT;
+import static 
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MIN_OUTLIER_DETECTION_NODES_KEY;
+import static 
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PEER_METRICS_MIN_OUTLIER_DETECTION_SAMPLES_KEY;
+import static 
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_KEY;
+import static 
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SLOWPEER_LOW_THRESHOLD_MS_DEFAULT;
+import static 
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SLOWPEER_LOW_THRESHOLD_MS_KEY;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -45,6 +52,7 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.MiniDFSNNTopology;
+import org.apache.hadoop.test.LambdaTestUtils;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -84,6 +92,7 @@ public class TestDataNodeReconfiguration {
   private void startDFSCluster(int numNameNodes, int numDataNodes)
       throws IOException {
     Configuration conf = new Configuration();
+    conf.setBoolean(DFS_DATANODE_PEER_STATS_ENABLED_KEY, true);
 
     MiniDFSNNTopology nnTopology = MiniDFSNNTopology
         .simpleFederatedTopology(numNameNodes);
@@ -467,4 +476,82 @@ public class TestDataNodeReconfiguration {
           dn.getConf().get(DFS_CACHEREPORT_INTERVAL_MSEC_KEY));
     }
   }
+
+  @Test
+  public void testSlowPeerParameters() throws Exception {
+    String[] slowPeersParameters = {
+        DFS_DATANODE_MIN_OUTLIER_DETECTION_NODES_KEY,
+        DFS_DATANODE_SLOWPEER_LOW_THRESHOLD_MS_KEY,
+        DFS_DATANODE_PEER_METRICS_MIN_OUTLIER_DETECTION_SAMPLES_KEY};
+
+    for (int i = 0; i < NUM_DATA_NODE; i++) {
+      DataNode dn = cluster.getDataNodes().get(i);
+
+      // Try invalid values.
+      LambdaTestUtils.intercept(ReconfigurationException.class,
+          "Could not change property dfs.datanode.peer.stats.enabled from 
'true' to 'text'",
+          () -> dn.reconfigureProperty(DFS_DATANODE_PEER_STATS_ENABLED_KEY, 
"text"));
+
+      for (String parameter : slowPeersParameters) {
+        try {
+          dn.reconfigureProperty(parameter, "text");
+          fail("ReconfigurationException expected");
+        } catch (ReconfigurationException expected) {
+          assertTrue("expecting NumberFormatException",
+              expected.getCause() instanceof NumberFormatException);
+        }
+
+        try {
+          dn.reconfigureProperty(parameter, String.valueOf(-1));
+          fail("ReconfigurationException expected");
+        } catch (ReconfigurationException expected) {
+          assertTrue("expecting IllegalArgumentException",
+              expected.getCause() instanceof IllegalArgumentException);
+        }
+      }
+
+      // Change and verify properties.
+      dn.reconfigureProperty(DFS_DATANODE_PEER_STATS_ENABLED_KEY, "false");
+      assertFalse(dn.getDnConf().peerStatsEnabled);
+
+      // Reset DFS_DATANODE_PEER_STATS_ENABLED_KEY to true.
+      dn.reconfigureProperty(DFS_DATANODE_PEER_STATS_ENABLED_KEY, "true");
+      for (String parameter : slowPeersParameters) {
+        dn.reconfigureProperty(parameter, "123");
+      }
+      assertEquals(123, dn.getPeerMetrics().getMinOutlierDetectionNodes());
+      assertEquals(123, dn.getPeerMetrics().getLowThresholdMs());
+      assertEquals(123, dn.getPeerMetrics().getMinOutlierDetectionSamples());
+      assertEquals(123,
+          
dn.getPeerMetrics().getSlowNodeDetector().getMinOutlierDetectionNodes());
+      assertEquals(123,
+          dn.getPeerMetrics().getSlowNodeDetector().getLowThresholdMs());
+
+      // Revert to default and verify.
+      dn.reconfigureProperty(DFS_DATANODE_PEER_STATS_ENABLED_KEY, null);
+      assertEquals(String.format("expect %s is not configured",
+          DFS_DATANODE_PEER_STATS_ENABLED_KEY), null,
+          dn.getConf().get(DFS_DATANODE_PEER_STATS_ENABLED_KEY));
+
+      // Reset DFS_DATANODE_PEER_STATS_ENABLED_KEY to true.
+      dn.reconfigureProperty(DFS_DATANODE_PEER_STATS_ENABLED_KEY, "true");
+
+      for (String parameter : slowPeersParameters) {
+        dn.reconfigureProperty(parameter, null);
+      }
+      assertEquals(String.format("expect %s is not configured",
+          DFS_DATANODE_MIN_OUTLIER_DETECTION_NODES_KEY), null,
+          dn.getConf().get(DFS_DATANODE_MIN_OUTLIER_DETECTION_NODES_KEY));
+      assertEquals(String.format("expect %s is not configured",
+          DFS_DATANODE_SLOWPEER_LOW_THRESHOLD_MS_KEY), null,
+          dn.getConf().get(DFS_DATANODE_SLOWPEER_LOW_THRESHOLD_MS_KEY));
+      assertEquals(String.format("expect %s is not configured",
+          DFS_DATANODE_PEER_METRICS_MIN_OUTLIER_DETECTION_SAMPLES_KEY), null,
+          
dn.getConf().get(DFS_DATANODE_PEER_METRICS_MIN_OUTLIER_DETECTION_SAMPLES_KEY));
+      
assertEquals(dn.getPeerMetrics().getSlowNodeDetector().getMinOutlierDetectionNodes(),
+          DFS_DATANODE_MIN_OUTLIER_DETECTION_NODES_DEFAULT);
+      
assertEquals(dn.getPeerMetrics().getSlowNodeDetector().getLowThresholdMs(),
+          DFS_DATANODE_SLOWPEER_LOW_THRESHOLD_MS_DEFAULT);
+    }
+  }
 }
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java
index 0f16fd7..12ae6e7 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java
@@ -330,7 +330,7 @@ public class TestDFSAdmin {
     final List<String> outs = Lists.newArrayList();
     final List<String> errs = Lists.newArrayList();
     getReconfigurableProperties("datanode", address, outs, errs);
-    assertEquals(8, outs.size());
+    assertEquals(12, outs.size());
     assertEquals(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, outs.get(1));
   }
 

---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to