Repository: hadoop
Updated Branches:
  refs/heads/trunk 993c2140c -> 460a94a10


HDFS-14045. Use different metrics in DataNode to better measure latency of 
heartbeat/blockReports/incrementalBlockReports of Active/Standby NN. 
Contributed by Jiandan Yang.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/460a94a1
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/460a94a1
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/460a94a1

Branch: refs/heads/trunk
Commit: 460a94a10f9c314b77a25e14efbf7c4dc3f5d9aa
Parents: 993c214
Author: Inigo Goiri <inigo...@apache.org>
Authored: Thu Nov 15 10:58:57 2018 -0800
Committer: Inigo Goiri <inigo...@apache.org>
Committed: Thu Nov 15 10:58:57 2018 -0800

----------------------------------------------------------------------
 .../hadoop-common/src/site/markdown/Metrics.md  | 10 +++
 .../hdfs/server/datanode/BPOfferService.java    | 12 ++--
 .../hdfs/server/datanode/BPServiceActor.java    | 38 ++++++++---
 .../hdfs/server/datanode/BlockPoolManager.java  | 15 +++-
 .../datanode/IncrementalBlockReportManager.java |  5 +-
 .../datanode/metrics/DataNodeMetrics.java       | 33 +++++++--
 .../server/datanode/TestBPOfferService.java     | 12 +++-
 .../server/datanode/TestBlockPoolManager.java   |  4 +-
 .../server/datanode/TestDataNodeMetrics.java    | 72 ++++++++++++++++++++
 .../server/datanode/TestDatanodeRegister.java   |  2 +-
 10 files changed, 175 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/460a94a1/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md b/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md
index 83ad40a..357b705 100644
--- a/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md
+++ b/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md
@@ -367,14 +367,24 @@ Each metrics record contains tags such as SessionId and Hostname as additional i
 | `ReplaceBlockOpAvgTime` | Average time of block replace operations in milliseconds |
 | `HeartbeatsNumOps` | Total number of heartbeats |
 | `HeartbeatsAvgTime` | Average heartbeat time in milliseconds |
+| `HeartbeatsFor`*ServiceId*`-`*NNId*`NumOps` | Total number of heartbeats to a specific serviceId and nnId |
+| `HeartbeatsFor`*ServiceId*`-`*NNId*`AvgTime` | Average heartbeat time in milliseconds to a specific serviceId and nnId |
 | `HeartbeatsTotalNumOps` | Total number of heartbeats which is a duplicate of HeartbeatsNumOps |
 | `HeartbeatsTotalAvgTime` | Average total heartbeat time in milliseconds |
+| `HeartbeatsTotalFor`*ServiceId*`-`*NNId*`NumOps` | Total number of heartbeats to a specific serviceId and nnId, which is a duplicate of `HeartbeatsFor`*ServiceId*`-`*NNId*`NumOps` |
+| `HeartbeatsTotalFor`*ServiceId*`-`*NNId*`AvgTime` | Average total heartbeat time in milliseconds to a specific serviceId and nnId |
 | `LifelinesNumOps` | Total number of lifeline messages |
 | `LifelinesAvgTime` | Average lifeline message processing time in milliseconds |
+| `LifelinesFor`*ServiceId*`-`*NNId*`NumOps` | Total number of lifeline messages to a specific serviceId and nnId |
+| `LifelinesFor`*ServiceId*`-`*NNId*`AvgTime` | Average lifeline message processing time in milliseconds to a specific serviceId and nnId |
 | `BlockReportsNumOps` | Total number of block report operations |
 | `BlockReportsAvgTime` | Average time of block report operations in milliseconds |
+| `BlockReportsFor`*ServiceId*`-`*NNId*`NumOps` | Total number of block report operations to a specific serviceId and nnId |
+| `BlockReportsFor`*ServiceId*`-`*NNId*`AvgTime` | Average time of block report operations in milliseconds to a specific serviceId and nnId |
 | `IncrementalBlockReportsNumOps` | Total number of incremental block report operations |
 | `IncrementalBlockReportsAvgTime` | Average time of incremental block report operations in milliseconds |
+| `IncrementalBlockReportsFor`*ServiceId*`-`*NNId*`NumOps` | Total number of incremental block report operations to a specific serviceId and nnId |
+| `IncrementalBlockReportsFor`*ServiceId*`-`*NNId*`AvgTime` | Average time of incremental block report operations in milliseconds to a specific serviceId and nnId |
 | `CacheReportsNumOps` | Total number of cache report operations |
 | `CacheReportsAvgTime` | Average time of cache report operations in milliseconds |
 | `PacketAckRoundTripTimeNanosNumOps` | Total number of ack round trip |
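
For readers skimming the new table rows: the *ServiceId*-*NNId* suffix is
built by the getRpcMetricSuffix() helper added to BPServiceActor below. A
minimal standalone restatement of that naming rule (the suffix() method name
here is hypothetical, used only for illustration):

    // Mirrors BPServiceActor#getRpcMetricSuffix() from this patch.
    static String suffix(String serviceId, String nnId) {
      if (serviceId == null && nnId == null) {
        return null;               // no suffix: only aggregate metrics update
      } else if (serviceId == null) {
        return nnId;               // e.g. "nn1" (HA without federation)
      } else if (nnId == null) {
        return serviceId;          // e.g. "ns1" (federation without HA)
      }
      return serviceId + "-" + nnId;  // e.g. "ns1-nn1"
    }

    // "HeartbeatsFor" + suffix("ns1", "nn1") names the rate
    // "HeartbeatsForns1-nn1", which is exported as the pair
    // HeartbeatsForns1-nn1NumOps / HeartbeatsForns1-nn1AvgTime.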

http://git-wip-us.apache.org/repos/asf/hadoop/blob/460a94a1/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
index a25f6a9..3233e2c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
@@ -123,7 +123,7 @@ class BPOfferService {
   }
 
   BPOfferService(
-      final String nameserviceId,
+      final String nameserviceId, List<String> nnIds,
       List<InetSocketAddress> nnAddrs,
       List<InetSocketAddress> lifelineNnAddrs,
       DataNode dn) {
@@ -135,12 +135,13 @@ class BPOfferService {
     this.dn = dn;
 
     for (int i = 0; i < nnAddrs.size(); ++i) {
-      this.bpServices.add(new BPServiceActor(nnAddrs.get(i),
-          lifelineNnAddrs.get(i), this));
+      this.bpServices.add(new BPServiceActor(nameserviceId, nnIds.get(i),
+          nnAddrs.get(i), lifelineNnAddrs.get(i), this));
     }
   }
 
-  void refreshNNList(ArrayList<InetSocketAddress> addrs,
+  void refreshNNList(String serviceId, List<String> nnIds,
+      ArrayList<InetSocketAddress> addrs,
       ArrayList<InetSocketAddress> lifelineAddrs) throws IOException {
     Set<InetSocketAddress> oldAddrs = Sets.newHashSet();
     for (BPServiceActor actor : bpServices) {
@@ -151,7 +152,8 @@ class BPOfferService {
     // Process added NNs
     Set<InetSocketAddress> addedNNs = Sets.difference(newAddrs, oldAddrs);
     for (InetSocketAddress addedNN : addedNNs) {
-      BPServiceActor actor = new BPServiceActor(addedNN,
+      BPServiceActor actor = new BPServiceActor(serviceId,
+          nnIds.get(addrs.indexOf(addedNN)), addedNN,
           lifelineAddrs.get(addrs.indexOf(addedNN)), this);
       actor.start();
       bpServices.add(actor);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/460a94a1/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
index 8f7a186..c4faa39 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
@@ -100,6 +100,8 @@ class BPServiceActor implements Runnable {
     CONNECTING, INIT_FAILED, RUNNING, EXITED, FAILED;
   }
 
+  private String serviceId = null;
+  private String nnId = null;
   private volatile RunningState runningState = RunningState.CONNECTING;
   private volatile boolean shouldServiceRun = true;
   private final DataNode dn;
@@ -115,8 +117,8 @@ class BPServiceActor implements Runnable {
   final LinkedList<BPServiceActorAction> bpThreadQueue 
       = new LinkedList<BPServiceActorAction>();
 
-  BPServiceActor(InetSocketAddress nnAddr, InetSocketAddress lifelineNnAddr,
-      BPOfferService bpos) {
+  BPServiceActor(String serviceId, String nnId, InetSocketAddress nnAddr,
+      InetSocketAddress lifelineNnAddr, BPOfferService bpos) {
     this.bpos = bpos;
     this.dn = bpos.getDataNode();
     this.nnAddr = nnAddr;
@@ -134,6 +136,12 @@ class BPServiceActor implements Runnable {
         dnConf.outliersReportIntervalMs);
     // get the value of maxDataLength.
     this.maxDataLength = dnConf.getMaxDataLength();
+    if (serviceId != null) {
+      this.serviceId = serviceId;
+    }
+    if (nnId != null) {
+      this.nnId = nnId;
+    }
   }
 
   public DatanodeRegistration getBpRegistration() {
@@ -354,7 +362,7 @@ class BPServiceActor implements Runnable {
     // or we will report an RBW replica after the BlockReport already reports
     // a FINALIZED one.
     ibrManager.sendIBRs(bpNamenode, bpRegistration,
-        bpos.getBlockPoolId());
+        bpos.getBlockPoolId(), getRpcMetricSuffix());
 
     long brCreateStartTime = monotonicNow();
     Map<DatanodeStorage, BlockListAsLongs> perVolumeBlockLists =
@@ -417,7 +425,7 @@ class BPServiceActor implements Runnable {
       // Log the block report processing stats from Datanode perspective
       long brSendCost = monotonicNow() - brSendStartTime;
       long brCreateCost = brSendStartTime - brCreateStartTime;
-      dn.getMetrics().addBlockReport(brSendCost);
+      dn.getMetrics().addBlockReport(brSendCost, getRpcMetricSuffix());
       final int nCmds = cmds.size();
       LOG.info((success ? "S" : "Uns") +
           "uccessfully sent block report 0x" +
@@ -439,6 +447,18 @@ class BPServiceActor implements Runnable {
     return cmds.size() == 0 ? null : cmds;
   }
 
+  private String getRpcMetricSuffix() {
+    if (serviceId == null && nnId == null) {
+      return null;
+    } else if (serviceId == null && nnId != null) {
+      return nnId;
+    } else if (serviceId != null && nnId == null) {
+      return serviceId;
+    } else {
+      return serviceId + "-" + nnId;
+    }
+  }
+
   DatanodeCommand cacheReport() throws IOException {
     // If caching is disabled, do not send a cache report
     if (dn.getFSDataset().getCacheCapacity() == 0) {
@@ -657,7 +677,8 @@ class BPServiceActor implements Runnable {
               }
               fullBlockReportLeaseId = resp.getFullBlockReportLeaseId();
             }
-            dn.getMetrics().addHeartbeat(scheduler.monotonicNow() - startTime);
+            dn.getMetrics().addHeartbeat(scheduler.monotonicNow() - startTime,
+                getRpcMetricSuffix());
 
             // If the state of this NN has changed (eg STANDBY->ACTIVE)
             // then let the BPOfferService update itself.
@@ -687,7 +708,7 @@ class BPServiceActor implements Runnable {
         if (!dn.areIBRDisabledForTests() &&
             (ibrManager.sendImmediately()|| sendHeartbeat)) {
           ibrManager.sendIBRs(bpNamenode, bpRegistration,
-              bpos.getBlockPoolId());
+              bpos.getBlockPoolId(), getRpcMetricSuffix());
         }
 
         List<DatanodeCommand> cmds = null;
@@ -709,7 +730,7 @@ class BPServiceActor implements Runnable {
 
         if (sendHeartbeat) {
           dn.getMetrics().addHeartbeatTotal(
-              scheduler.monotonicNow() - startTime);
+              scheduler.monotonicNow() - startTime, getRpcMetricSuffix());
         }
 
         // There is no work to do;  sleep until hearbeat timer elapses, 
@@ -1059,7 +1080,8 @@ class BPServiceActor implements Runnable {
         return;
       }
       sendLifeline();
-      dn.getMetrics().addLifeline(scheduler.monotonicNow() - startTime);
+      dn.getMetrics().addLifeline(scheduler.monotonicNow() - startTime,
+          getRpcMetricSuffix());
       scheduler.scheduleNextLifeline(scheduler.monotonicNow());
     }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/460a94a1/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolManager.java
index b03c511..9a7b6bc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolManager.java
@@ -216,14 +216,18 @@ class BlockPoolManager {
               lifelineAddrMap.get(nsToAdd);
           ArrayList<InetSocketAddress> addrs =
               Lists.newArrayListWithCapacity(nnIdToAddr.size());
+          ArrayList<String> nnIds =
+              Lists.newArrayListWithCapacity(nnIdToAddr.size());
           ArrayList<InetSocketAddress> lifelineAddrs =
               Lists.newArrayListWithCapacity(nnIdToAddr.size());
           for (String nnId : nnIdToAddr.keySet()) {
             addrs.add(nnIdToAddr.get(nnId));
+            nnIds.add(nnId);
             lifelineAddrs.add(nnIdToLifelineAddr != null ?
                 nnIdToLifelineAddr.get(nnId) : null);
           }
-          BPOfferService bpos = createBPOS(nsToAdd, addrs, lifelineAddrs);
+          BPOfferService bpos = createBPOS(nsToAdd, nnIds, addrs,
+              lifelineAddrs);
           bpByNameserviceId.put(nsToAdd, bpos);
           offerServices.add(bpos);
         }
@@ -260,17 +264,20 @@ class BlockPoolManager {
             Lists.newArrayListWithCapacity(nnIdToAddr.size());
         ArrayList<InetSocketAddress> lifelineAddrs =
             Lists.newArrayListWithCapacity(nnIdToAddr.size());
+        ArrayList<String> nnIds = Lists.newArrayListWithCapacity(
+            nnIdToAddr.size());
         for (String nnId : nnIdToAddr.keySet()) {
           addrs.add(nnIdToAddr.get(nnId));
           lifelineAddrs.add(nnIdToLifelineAddr != null ?
               nnIdToLifelineAddr.get(nnId) : null);
+          nnIds.add(nnId);
         }
         try {
           UserGroupInformation.getLoginUser()
               .doAs(new PrivilegedExceptionAction<Object>() {
                 @Override
                 public Object run() throws Exception {
-                  bpos.refreshNNList(addrs, lifelineAddrs);
+                  bpos.refreshNNList(nsToRefresh, nnIds, addrs, lifelineAddrs);
                   return null;
                 }
               });
@@ -288,8 +295,10 @@ class BlockPoolManager {
    */
   protected BPOfferService createBPOS(
       final String nameserviceId,
+      List<String> nnIds,
       List<InetSocketAddress> nnAddrs,
       List<InetSocketAddress> lifelineNnAddrs) {
-    return new BPOfferService(nameserviceId, nnAddrs, lifelineNnAddrs, dn);
+    return new BPOfferService(nameserviceId, nnIds, nnAddrs, lifelineNnAddrs,
+        dn);
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/460a94a1/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/IncrementalBlockReportManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/IncrementalBlockReportManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/IncrementalBlockReportManager.java
index 1779374..9515b73 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/IncrementalBlockReportManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/IncrementalBlockReportManager.java
@@ -194,7 +194,7 @@ class IncrementalBlockReportManager {
 
   /** Send IBRs to namenode. */
   void sendIBRs(DatanodeProtocol namenode, DatanodeRegistration registration,
-      String bpid) throws IOException {
+      String bpid, String nnRpcLatencySuffix) throws IOException {
     // Generate a list of the pending reports for each storage under the lock
     final StorageReceivedDeletedBlocks[] reports = generateIBRs();
     if (reports.length == 0) {
@@ -214,7 +214,8 @@ class IncrementalBlockReportManager {
     } finally {
 
       if (success) {
-        dnMetrics.addIncrementalBlockReport(monotonicNow() - startTime);
+        dnMetrics.addIncrementalBlockReport(monotonicNow() - startTime,
+            nnRpcLatencySuffix);
         lastIBR = startTime;
       } else {
         // If we didn't succeed in sending the report, put all of the

http://git-wip-us.apache.org/repos/asf/hadoop/blob/460a94a1/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
index 8f445a6..89cd1ca 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.metrics2.lib.MutableQuantiles;
 import org.apache.hadoop.metrics2.lib.MutableRate;
 import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
 import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
+import org.apache.hadoop.metrics2.lib.MutableRatesWithAggregation;
 import org.apache.hadoop.metrics2.source.JvmMetrics;
 
 import java.util.concurrent.ThreadLocalRandom;
@@ -161,6 +162,10 @@ public class DataNodeMetrics {
   private MutableCounterLong ecReconstructionWriteTimeMillis;
 
   final MetricsRegistry registry = new MetricsRegistry("datanode");
+  @Metric("Milliseconds spent on calling NN rpc")
+  private MutableRatesWithAggregation
+      nnRpcLatency = registry.newRatesWithAggregation("nnRpcLatency");
+
   final String name;
   JvmMetrics jvmMetrics = null;
   private DataNodeUsageReportUtil dnUsageReportUtil;
@@ -232,25 +237,41 @@ public class DataNodeMetrics {
   public JvmMetrics getJvmMetrics() {
     return jvmMetrics;
   }
-  
-  public void addHeartbeat(long latency) {
+
+  public void addHeartbeat(long latency, String rpcMetricSuffix) {
     heartbeats.add(latency);
+    if (rpcMetricSuffix != null) {
+      nnRpcLatency.add("HeartbeatsFor" + rpcMetricSuffix, latency);
+    }
   }
 
-  public void addHeartbeatTotal(long latency) {
+  public void addHeartbeatTotal(long latency, String rpcMetricSuffix) {
     heartbeatsTotal.add(latency);
+    if (rpcMetricSuffix != null) {
+      nnRpcLatency.add("HeartbeatsTotalFor" + rpcMetricSuffix, latency);
+    }
   }
 
-  public void addLifeline(long latency) {
+  public void addLifeline(long latency, String rpcMetricSuffix) {
     lifelines.add(latency);
+    if (rpcMetricSuffix != null) {
+      nnRpcLatency.add("LifelinesFor" + rpcMetricSuffix, latency);
+    }
   }
 
-  public void addBlockReport(long latency) {
+  public void addBlockReport(long latency, String rpcMetricSuffix) {
     blockReports.add(latency);
+    if (rpcMetricSuffix != null) {
+      nnRpcLatency.add("BlockReportsFor" + rpcMetricSuffix, latency);
+    }
   }
 
-  public void addIncrementalBlockReport(long latency) {
+  public void addIncrementalBlockReport(long latency,
+      String rpcMetricSuffix) {
     incrementalBlockReports.add(latency);
+    if (rpcMetricSuffix != null) {
+      nnRpcLatency.add("IncrementalBlockReportsFor" + rpcMetricSuffix, latency);
+    }
   }
 
   public void addCacheReport(long latency) {
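
The per-NN rates above lean on MutableRatesWithAggregation, which lazily
registers one MutableRate per distinct name and exports each as a
<Name>NumOps counter plus a <Name>AvgTime gauge. A self-contained sketch of
that behavior, assuming hadoop-common plus its test artifacts (MetricsAsserts,
Mockito) are on the classpath:

    import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
    import static org.apache.hadoop.test.MetricsAsserts.mockMetricsRecordBuilder;

    import org.apache.hadoop.metrics2.MetricsRecordBuilder;
    import org.apache.hadoop.metrics2.lib.MetricsRegistry;
    import org.apache.hadoop.metrics2.lib.MutableRatesWithAggregation;

    public class NnRpcLatencySketch {
      public static void main(String[] args) {
        MetricsRegistry registry = new MetricsRegistry("datanode");
        MutableRatesWithAggregation nnRpcLatency =
            registry.newRatesWithAggregation("nnRpcLatency");

        // Each distinct name lazily creates a rate; add() bumps <Name>NumOps
        // and folds the latency sample into <Name>AvgTime.
        nnRpcLatency.add("HeartbeatsForns1-nn1", 12L);
        nnRpcLatency.add("HeartbeatsForns1-nn1", 8L);
        nnRpcLatency.add("HeartbeatsForns1-nn2", 15L);

        MetricsRecordBuilder rb = mockMetricsRecordBuilder();
        registry.snapshot(rb, true);
        assertCounter("HeartbeatsForns1-nn1NumOps", 2L, rb);
        assertCounter("HeartbeatsForns1-nn2NumOps", 1L, rb);
      }
    }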

http://git-wip-us.apache.org/repos/asf/hadoop/blob/460a94a1/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
index 26a9f37..1dc9aa9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
@@ -436,13 +436,16 @@ public class TestBPOfferService {
     // function to return the corresponding proxies.
 
     final Map<InetSocketAddress, DatanodeProtocolClientSideTranslatorPB> nnMap = Maps.newLinkedHashMap();
+    List<String> nnIds = Lists.newArrayListWithCapacity(nns.length);
     for (int port = 0; port < nns.length; port++) {
       nnMap.put(new InetSocketAddress(port), nns[port]);
       Mockito.doReturn(nns[port]).when(mockDn).connectToNN(
           Mockito.eq(new InetSocketAddress(port)));
+      nnIds.add("nn" + port);
     }
 
-    return new BPOfferService("test_ns", Lists.newArrayList(nnMap.keySet()),
+    return new BPOfferService("test_ns", nnIds,
+        Lists.newArrayList(nnMap.keySet()),
         Collections.<InetSocketAddress>nCopies(nnMap.size(), null), mockDn);
   }
 
@@ -912,7 +915,12 @@ public class TestBPOfferService {
       addrs.add(new InetSocketAddress(2));
       lifelineAddrs.add(null);
 
-      bpos.refreshNNList(addrs, lifelineAddrs);
+      ArrayList<String> nnIds = new ArrayList<>(addrs.size());
+      for (int i = 0; i < addrs.size(); i++) {
+        nnIds.add("nn" + i);
+      }
+
+      bpos.refreshNNList("serviceId", nnIds, addrs, lifelineAddrs);
 
       assertEquals(2, bpos.getBPServiceActors().size());
       // wait for handshake to run

http://git-wip-us.apache.org/repos/asf/hadoop/blob/460a94a1/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockPoolManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockPoolManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockPoolManager.java
index e061e18..65ff9b0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockPoolManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockPoolManager.java
@@ -54,6 +54,7 @@ public class TestBlockPoolManager {
       @Override
       protected BPOfferService createBPOS(
           final String nameserviceId,
+          List<String> nnIds,
           List<InetSocketAddress> nnAddrs,
           List<InetSocketAddress> lifelineNnAddrs) {
         final int idx = mockIdx++;
@@ -69,7 +70,8 @@ public class TestBlockPoolManager {
                   doLog("refresh #" + idx);
                   return null;
                 }
-              }).when(bpos).refreshNNList(
+              }).when(bpos).refreshNNList(Mockito.anyString(),
+                  Mockito.<List<String>>any(),
                   Mockito.<ArrayList<InetSocketAddress>>any(),
                   Mockito.<ArrayList<InetSocketAddress>>any());
         } catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/460a94a1/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetrics.java
index 98ccd8e..b4e2640 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetrics.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetrics.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdfs.server.datanode;
 
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY;
 import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
 import static org.apache.hadoop.test.MetricsAsserts.assertQuantileGauges;
 import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
@@ -27,6 +28,7 @@ import java.io.Closeable;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.lang.management.ManagementFactory;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.List;
@@ -35,6 +37,7 @@ import com.google.common.base.Supplier;
 import com.google.common.collect.Lists;
 
 import net.jcip.annotations.NotThreadSafe;
+import org.apache.hadoop.hdfs.MiniDFSNNTopology;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -428,4 +431,73 @@ public class TestDataNodeMetrics {
       }
     }, 1000, 6000);
   }
+
+  @Test
+  public void testNNRpcMetricsWithNonHA() throws IOException {
+    Configuration conf = new HdfsConfiguration();
+    // Set the heartbeat interval to 1 hour so that the BPServiceActor does
+    // not heartbeat the NN periodically while the test runs; only the
+    // single startup heartbeat is sent.
+    conf.setTimeDuration(DFS_HEARTBEAT_INTERVAL_KEY, 1, TimeUnit.HOURS);
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
+    cluster.waitActive();
+    DataNode dn = cluster.getDataNodes().get(0);
+    MetricsRecordBuilder rb = getMetrics(dn.getMetrics().name());
+    assertCounter("HeartbeatsNumOps", 1L, rb);
+  }
+
+  @Test
+  public void testNNRpcMetricsWithHA() throws IOException {
+    Configuration conf = new HdfsConfiguration();
+    // Set the heartbeat interval to 1 hour so that the BPServiceActor does
+    // not heartbeat the NN periodically while the test runs; only the
+    // single startup heartbeat is sent.
+    conf.setTimeDuration(DFS_HEARTBEAT_INTERVAL_KEY, 1, TimeUnit.HOURS);
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).nnTopology(
+        MiniDFSNNTopology.simpleHATopology()).build();
+    cluster.waitActive();
+    DataNode dn = cluster.getDataNodes().get(0);
+    cluster.transitionToActive(0);
+    MetricsRecordBuilder rb = getMetrics(dn.getMetrics().name());
+    assertCounter("HeartbeatsForminidfs-ns-nn1NumOps", 1L, rb);
+    assertCounter("HeartbeatsForminidfs-ns-nn2NumOps", 1L, rb);
+    assertCounter("HeartbeatsNumOps", 2L, rb);
+  }
+
+  @Test
+  public void testNNRpcMetricsWithFederation() throws IOException {
+    Configuration conf = new HdfsConfiguration();
+    // Set the heartbeat interval to 1 hour so that the BPServiceActor does
+    // not heartbeat the NN periodically while the test runs; only the
+    // single startup heartbeat is sent.
+    conf.setTimeDuration(DFS_HEARTBEAT_INTERVAL_KEY, 1, TimeUnit.HOURS);
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).nnTopology(
+        MiniDFSNNTopology.simpleFederatedTopology("ns1,ns2")).build();
+    cluster.waitActive();
+    DataNode dn = cluster.getDataNodes().get(0);
+    MetricsRecordBuilder rb = getMetrics(dn.getMetrics().name());
+    assertCounter("HeartbeatsForns1NumOps", 1L, rb);
+    assertCounter("HeartbeatsForns2NumOps", 1L, rb);
+    assertCounter("HeartbeatsNumOps", 2L, rb);
+  }
+
+  @Test
+  public void testNNRpcMetricsWithFederationAndHA() throws IOException {
+    Configuration conf = new HdfsConfiguration();
+    // Set the heartbeat interval to 1 hour so that the BPServiceActor does
+    // not heartbeat the NN periodically while the test runs; only the
+    // single startup heartbeat is sent.
+    conf.setTimeDuration(DFS_HEARTBEAT_INTERVAL_KEY, 1, TimeUnit.HOURS);
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).nnTopology(
+        MiniDFSNNTopology.simpleHAFederatedTopology(2)).build();
+    cluster.waitActive();
+    DataNode dn = cluster.getDataNodes().get(0);
+    MetricsRecordBuilder rb = getMetrics(dn.getMetrics().name());
+
+    assertCounter("HeartbeatsForns0-nn0NumOps", 1L, rb);
+    assertCounter("HeartbeatsForns0-nn1NumOps", 1L, rb);
+    assertCounter("HeartbeatsForns1-nn0NumOps", 1L, rb);
+    assertCounter("HeartbeatsForns1-nn1NumOps", 1L, rb);
+    assertCounter("HeartbeatsNumOps", 4L, rb);
+  }
 }
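
Once a cluster is up, the new counters surface through the normal metrics2
sinks and JMX alongside the existing DataNode metrics. A hedged sketch of
reading one attribute in-process; the DataNodeActivity-<host>-<port>
ObjectName pattern, hostname, and port below are assumptions to adapt to the
actual DataNode, and a remote process would need a JMXConnector instead of
the platform MBeanServer:

    import java.lang.management.ManagementFactory;
    import javax.management.MBeanServer;
    import javax.management.ObjectName;

    public class NnRpcLatencyJmxProbe {
      public static void main(String[] args) throws Exception {
        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
        // Assumed bean name; substitute the DataNode's host and port.
        ObjectName dnActivity = new ObjectName(
            "Hadoop:service=DataNode,name=DataNodeActivity-host.example.com-9866");
        Object numOps = mbs.getAttribute(dnActivity, "HeartbeatsForns1-nn1NumOps");
        Object avgTime = mbs.getAttribute(dnActivity, "HeartbeatsForns1-nn1AvgTime");
        System.out.println("NumOps=" + numOps + ", AvgTime(ms)=" + avgTime);
      }
    }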

http://git-wip-us.apache.org/repos/asf/hadoop/blob/460a94a1/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeRegister.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeRegister.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeRegister.java
index 38eb054..13fe9e3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeRegister.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeRegister.java
@@ -62,7 +62,7 @@ public class TestDatanodeRegister {
     BPOfferService mockBPOS = mock(BPOfferService.class);
     doReturn(mockDN).when(mockBPOS).getDataNode();
     
-    actor = new BPServiceActor(INVALID_ADDR, null, mockBPOS);
+    actor = new BPServiceActor("test", "test", INVALID_ADDR, null, mockBPOS);
 
     fakeNsInfo = mock(NamespaceInfo.class);
     // Return a a good software version.

