HDFS-7960. The full block report should prune zombie storages even if they're 
not empty. Contributed by Colin McCabe and Eddy Xu.

(cherry picked from commit 50ee8f4e67a66aa77c5359182f61f3e951844db6)
(cherry picked from commit 2f46ee50bd4efc82ba3d30bd36f7637ea9d9714e)

Conflicts:
        hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
        hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
        hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
        hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
        hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/TestBlockListAsLongs.java
        hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
        hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java
        hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestTriggerBlockReport.java


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/03d4af39
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/03d4af39
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/03d4af39

Branch: refs/heads/sjlee/hdfs-merge
Commit: 03d4af39e794dc03d764122077b434d658b6405e
Parents: 4c64877
Author: Andrew Wang <w...@apache.org>
Authored: Mon Mar 23 22:00:34 2015 -0700
Committer: Sangjin Lee <sj...@apache.org>
Committed: Thu Aug 13 10:54:26 2015 -0700

----------------------------------------------------------------------
 .../DatanodeProtocolClientSideTranslatorPB.java |   5 +-
 .../DatanodeProtocolServerSideTranslatorPB.java |   4 +-
 .../apache/hadoop/hdfs/protocolPB/PBHelper.java |  15 +++
 .../server/blockmanagement/BlockManager.java    |  55 +++++++-
 .../blockmanagement/DatanodeDescriptor.java     |  51 ++++++-
 .../blockmanagement/DatanodeStorageInfo.java    |  13 +-
 .../hdfs/server/datanode/BPServiceActor.java    |  34 +++--
 .../hdfs/server/namenode/NameNodeRpcServer.java |  12 +-
 .../server/protocol/BlockReportContext.java     |  52 +++++++
 .../hdfs/server/protocol/DatanodeProtocol.java  |  10 +-
 .../src/main/proto/DatanodeProtocol.proto       |  14 ++
 .../blockmanagement/TestBlockManager.java       |   8 +-
 .../TestNameNodePrunesMissingStorages.java      | 135 ++++++++++++++++++-
 .../server/datanode/BlockReportTestBase.java    |   4 +-
 .../server/datanode/TestBPOfferService.java     |  10 +-
 .../TestBlockHasMultipleReplicasOnSameDN.java   |   4 +-
 .../datanode/TestDataNodeVolumeFailure.java     |   3 +-
 .../TestDatanodeProtocolRetryPolicy.java        |   4 +-
 ...TestDnRespectsBlockReportSplitThreshold.java |   7 +-
 .../TestNNHandlesBlockReportPerStorage.java     |   7 +-
 .../TestNNHandlesCombinedBlockReport.java       |   4 +-
 .../server/namenode/NNThroughputBenchmark.java  |   9 +-
 .../hdfs/server/namenode/TestDeadDatanode.java  |   4 +-
 .../hdfs/server/namenode/ha/TestDNFencing.java  |   4 +-
 24 files changed, 422 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/03d4af39/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java
index 46023ec..e169d0e 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java
@@ -46,6 +46,7 @@ import 
org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ReportBadBlo
 import 
org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.StorageBlockReportProto;
 import 
org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.StorageReceivedDeletedBlocksProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.VersionRequestProto;
+import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -156,7 +157,8 @@ public class DatanodeProtocolClientSideTranslatorPB 
implements
 
   @Override
   public DatanodeCommand blockReport(DatanodeRegistration registration,
-      String poolId, StorageBlockReport[] reports) throws IOException {
+      String poolId, StorageBlockReport[] reports, BlockReportContext context)
+        throws IOException {
     BlockReportRequestProto.Builder builder = BlockReportRequestProto
         .newBuilder().setRegistration(PBHelper.convert(registration))
         .setBlockPoolId(poolId);
@@ -170,6 +172,7 @@ public class DatanodeProtocolClientSideTranslatorPB 
implements
       }
       builder.addReports(reportBuilder.build());
     }
+    builder.setContext(PBHelper.convert(context));
     BlockReportResponseProto resp;
     try {
       resp = rpcProxy.blockReport(NULL_CONTROLLER, builder.build());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/03d4af39/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java
index d016735..97de258 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java
@@ -150,7 +150,9 @@ public class DatanodeProtocolServerSideTranslatorPB 
implements
     }
     try {
       cmd = impl.blockReport(PBHelper.convert(request.getRegistration()),
-          request.getBlockPoolId(), report);
+          request.getBlockPoolId(), report,
+          request.hasContext() ?
+              PBHelper.convert(request.getContext()) : null);
     } catch (IOException e) {
       throw new ServiceException(e);
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/03d4af39/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
index 830ea5e..c52588f 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
@@ -111,10 +111,12 @@ import 
org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.Rollin
 import 
org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SafeModeActionProto;
 import 
org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmIdProto;
 import 
org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmSlotProto;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos;
 import 
org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BalancerBandwidthCommandProto;
 import 
org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockCommandProto;
 import 
org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockIdCommandProto;
 import 
org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockRecoveryCommandProto;
+import 
org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockReportContextProto;
 import 
org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeCommandProto;
 import 
org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeRegistrationProto;
 import 
org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.FinalizeCommandProto;
@@ -194,6 +196,7 @@ import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
 import org.apache.hadoop.hdfs.server.protocol.BlockIdCommand;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
 import 
org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
+import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
 import 
org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
 import org.apache.hadoop.hdfs.server.protocol.CheckpointCommand;
@@ -2926,4 +2929,16 @@ public class PBHelper {
         ezKeyVersionName);
   }
 
+  public static BlockReportContext convert(BlockReportContextProto proto) {
+    return new BlockReportContext(proto.getTotalRpcs(),
+        proto.getCurRpc(), proto.getId());
+  }
+
+  public static BlockReportContextProto convert(BlockReportContext context) {
+    return BlockReportContextProto.newBuilder().
+        setTotalRpcs(context.getTotalRpcs()).
+        setCurRpc(context.getCurRpc()).
+        setId(context.getReportId()).
+        build();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/03d4af39/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 5a38351..69f3e46 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -72,6 +72,7 @@ import 
org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
 import org.apache.hadoop.hdfs.server.namenode.Namesystem;
 import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
 import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
+import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
 import 
org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
@@ -1783,7 +1784,8 @@ public class BlockManager {
    */
   public boolean processReport(final DatanodeID nodeID,
       final DatanodeStorage storage,
-      final BlockListAsLongs newReport) throws IOException {
+      final BlockListAsLongs newReport, BlockReportContext context,
+      boolean lastStorageInRpc) throws IOException {
     namesystem.writeLock();
     final long startTime = Time.now(); //after acquiring write lock
     final long endTime;
@@ -1831,7 +1833,31 @@ public class BlockManager {
             + "contents are no longer considered stale");
         rescanPostponedMisreplicatedBlocks();
       }
-      
+
+      if (context != null) {
+        storageInfo.setLastBlockReportId(context.getReportId());
+        if (lastStorageInRpc) {
+          int rpcsSeen = node.updateBlockReportContext(context);
+          if (rpcsSeen >= context.getTotalRpcs()) {
+            List<DatanodeStorageInfo> zombies = node.removeZombieStorages();
+            if (zombies.isEmpty()) {
+              LOG.debug("processReport 0x" +
+                  Long.toHexString(context.getReportId()) +
+                  ": no zombie storages found.");
+            } else {
+              for (DatanodeStorageInfo zombie : zombies) {
+                removeZombieReplicas(context, zombie);
+              }
+            }
+            node.clearBlockReportContext();
+          } else {
+            LOG.debug("processReport 0x" +
+                Long.toHexString(context.getReportId()) + ": " +
+                (context.getTotalRpcs() - rpcsSeen) +
+                " more RPCs remaining in this report.");
+          }
+        }
+      }
     } finally {
       endTime = Time.now();
       namesystem.writeUnlock();
@@ -1857,6 +1883,31 @@ public class BlockManager {
     return !node.hasStaleStorages();
   }
 
+  private void removeZombieReplicas(BlockReportContext context,
+      DatanodeStorageInfo zombie) {
+    LOG.warn("processReport 0x" + Long.toHexString(context.getReportId()) +
+        ": removing zombie storage " + zombie.getStorageID() +
+        ", which no longer exists on the DataNode.");
+    assert(namesystem.hasWriteLock());
+    Iterator<BlockInfo> iter = zombie.getBlockIterator();
+    int prevBlocks = zombie.numBlocks();
+    while (iter.hasNext()) {
+      BlockInfo block = iter.next();
+      // We assume that a block can be on only one storage in a DataNode.
+      // That's why we pass in the DatanodeDescriptor rather than the
+      // DatanodeStorageInfo.
+      // TODO: remove this assumption in case we want to put a block on
+      // more than one storage on a datanode (and because it's a difficult
+      // assumption to really enforce)
+      removeStoredBlock(block, zombie.getDatanodeDescriptor());
+      invalidateBlocks.remove(zombie.getDatanodeDescriptor(), block);
+    }
+    assert(zombie.numBlocks() == 0);
+    LOG.warn("processReport 0x" + Long.toHexString(context.getReportId()) +
+        ": removed " + prevBlocks + " replicas from storage " +
+        zombie.getStorageID() + ", which no longer exists on the DataNode.");
+  }
+
   /**
    * Rescan the list of blocks which were previously postponed.
    */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/03d4af39/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
index a407fe8..1bef805 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs.server.blockmanagement;
 
 import java.util.ArrayList;
+import java.util.BitSet;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -31,6 +32,7 @@ import java.util.Set;
 
 import com.google.common.annotations.VisibleForTesting;
 
+import com.google.common.collect.ImmutableList;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -40,6 +42,7 @@ import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.server.namenode.CachedBlock;
+import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.hdfs.util.EnumCounters;
@@ -63,7 +66,25 @@ public class DatanodeDescriptor extends DatanodeInfo {
   // Stores status of decommissioning.
   // If node is not decommissioning, do not use this object for anything.
   public final DecommissioningStatus decommissioningStatus = new 
DecommissioningStatus();
-  
+
+  private long curBlockReportId = 0;
+
+  private BitSet curBlockReportRpcsSeen = null;
+
+  public int updateBlockReportContext(BlockReportContext context) {
+    if (curBlockReportId != context.getReportId()) {
+      curBlockReportId = context.getReportId();
+      curBlockReportRpcsSeen = new BitSet(context.getTotalRpcs());
+    }
+    curBlockReportRpcsSeen.set(context.getCurRpc());
+    return curBlockReportRpcsSeen.cardinality();
+  }
+
+  public void clearBlockReportContext() {
+    curBlockReportId = 0;
+    curBlockReportRpcsSeen = null;
+  }
+
   /** Block and targets pair */
   @InterfaceAudience.Private
   @InterfaceStability.Evolving
@@ -282,6 +303,34 @@ public class DatanodeDescriptor extends DatanodeInfo {
     }
   }
 
+  static final private List<DatanodeStorageInfo> EMPTY_STORAGE_INFO_LIST =
+      ImmutableList.of();
+
+  List<DatanodeStorageInfo> removeZombieStorages() {
+    List<DatanodeStorageInfo> zombies = null;
+    synchronized (storageMap) {
+      Iterator<Map.Entry<String, DatanodeStorageInfo>> iter =
+          storageMap.entrySet().iterator();
+      while (iter.hasNext()) {
+        Map.Entry<String, DatanodeStorageInfo> entry = iter.next();
+        DatanodeStorageInfo storageInfo = entry.getValue();
+        if (storageInfo.getLastBlockReportId() != curBlockReportId) {
+          LOG.info(storageInfo.getStorageID() + " had lastBlockReportId 0x" +
+              Long.toHexString(storageInfo.getLastBlockReportId()) +
+              ", but curBlockReportId = 0x" +
+              Long.toHexString(curBlockReportId));
+          iter.remove();
+          if (zombies == null) {
+            zombies = new LinkedList<DatanodeStorageInfo>();
+          }
+          zombies.add(storageInfo);
+        }
+        storageInfo.setLastBlockReportId(0);
+      }
+    }
+    return zombies == null ? EMPTY_STORAGE_INFO_LIST : zombies;
+  }
+
   /**
    * Remove block from the list of blocks belonging to the data-node. Remove
    * data-node from the block.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/03d4af39/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
index 8c44b30..34930c8 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
@@ -115,6 +115,9 @@ public class DatanodeStorageInfo {
   private volatile BlockInfo blockList = null;
   private int numBlocks = 0;
 
+  // The ID of the last full block report which updated this storage.
+  private long lastBlockReportId = 0;
+
   /** The number of block reports received */
   private int blockReportCount = 0;
 
@@ -178,7 +181,15 @@ public class DatanodeStorageInfo {
     this.remaining = remaining;
     this.blockPoolUsed = blockPoolUsed;
   }
-  
+
+  long getLastBlockReportId() {
+    return lastBlockReportId;
+  }
+
+  void setLastBlockReportId(long lastBlockReportId) {
+    this.lastBlockReportId = lastBlockReportId;
+  }
+
   State getState() {
     return this.state;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/03d4af39/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
index 62ba1ab..3d116ea 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
@@ -43,6 +43,7 @@ import 
org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
 import 
org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
@@ -449,6 +450,17 @@ class BPServiceActor implements Runnable {
     return sendImmediateIBR;
   }
 
+  private long prevBlockReportId = 0;
+
+  private long generateUniqueBlockReportId() {
+    long id = System.nanoTime();
+    if (id <= prevBlockReportId) {
+      id = prevBlockReportId + 1;
+    }
+    prevBlockReportId = id;
+    return id;
+  }
+
   /**
    * Report the list blocks to the Namenode
    * @return DatanodeCommands returned by the NN. May be null.
@@ -492,11 +504,13 @@ class BPServiceActor implements Runnable {
     int numRPCs = 0;
     boolean success = false;
     long brSendStartTime = now();
+    long reportId = generateUniqueBlockReportId();
     try {
       if (totalBlockCount < dnConf.blockReportSplitThreshold) {
         // Below split threshold, send all reports in a single message.
         DatanodeCommand cmd = bpNamenode.blockReport(
-            bpRegistration, bpos.getBlockPoolId(), reports);
+            bpRegistration, bpos.getBlockPoolId(), reports,
+              new BlockReportContext(1, 0, reportId));
         numRPCs = 1;
         numReportsSent = reports.length;
         if (cmd != null) {
@@ -504,10 +518,11 @@ class BPServiceActor implements Runnable {
         }
       } else {
         // Send one block report per message.
-        for (StorageBlockReport report : reports) {
-          StorageBlockReport singleReport[] = { report };
+        for (int r = 0; r < reports.length; r++) {
+          StorageBlockReport singleReport[] = { reports[r] };
           DatanodeCommand cmd = bpNamenode.blockReport(
-              bpRegistration, bpos.getBlockPoolId(), singleReport);
+              bpRegistration, bpos.getBlockPoolId(), singleReport,
+              new BlockReportContext(reports.length, r, reportId));
           numReportsSent++;
           numRPCs++;
           if (cmd != null) {
@@ -523,11 +538,12 @@ class BPServiceActor implements Runnable {
       dn.getMetrics().addBlockReport(brSendCost);
       final int nCmds = cmds.size();
       LOG.info((success ? "S" : "Uns") +
-          "uccessfully sent " + numReportsSent +
-          " of " + reports.length +
-          " blockreports for " + totalBlockCount +
-          " total blocks using " + numRPCs +
-          " RPCs. This took " + brCreateCost +
+          "uccessfully sent block report 0x" +
+          Long.toHexString(reportId) + ",  containing " + reports.length +
+          " storage report(s), of which we sent " + numReportsSent + "." +
+          " The reports had " + totalBlockCount +
+          " total blocks and used " + numRPCs +
+          " RPC(s). This took " + brCreateCost +
           " msec to generate and " + brSendCost +
           " msecs for RPC and NN processing." +
           " Got back " +

http://git-wip-us.apache.org/repos/asf/hadoop/blob/03d4af39/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
index 0850ec1..62a51b7 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
@@ -119,6 +119,7 @@ import 
org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
 import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
 import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
 import 
org.apache.hadoop.hdfs.server.namenode.web.resources.NamenodeWebHdfsMethods;
+import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
@@ -1143,7 +1144,8 @@ class NameNodeRpcServer implements NamenodeProtocols {
 
   @Override // DatanodeProtocol
   public DatanodeCommand blockReport(DatanodeRegistration nodeReg,
-      String poolId, StorageBlockReport[] reports) throws IOException {
+        String poolId, StorageBlockReport[] reports,
+        BlockReportContext context) throws IOException {
     checkNNStartup();
     verifyRequest(nodeReg);
     if(blockStateChangeLog.isDebugEnabled()) {
@@ -1152,14 +1154,16 @@ class NameNodeRpcServer implements NamenodeProtocols {
     }
     final BlockManager bm = namesystem.getBlockManager(); 
     boolean noStaleStorages = false;
-    for(StorageBlockReport r : reports) {
-      final BlockListAsLongs blocks = new BlockListAsLongs(r.getBlocks());
+    for (int r = 0; r < reports.length; r++) {
+      final BlockListAsLongs blocks =
+          new BlockListAsLongs(reports[r].getBlocks());
       //
       // BlockManager.processReport accumulates information of prior calls
       // for the same node and storage, so the value returned by the last
       // call of this loop is the final updated value for noStaleStorage.
       //
-      noStaleStorages = bm.processReport(nodeReg, r.getStorage(), blocks);
+      noStaleStorages = bm.processReport(nodeReg, reports[r].getStorage(),
+          blocks, context, (r == reports.length - 1));
       metrics.incrStorageBlockReportOps();
     }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/03d4af39/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockReportContext.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockReportContext.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockReportContext.java
new file mode 100644
index 0000000..a084a81
--- /dev/null
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockReportContext.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.protocol;
+
+/**
+ * The context of the block report.
+ *
+ * This is a set of fields that the Datanode sends to provide context about a
+ * block report RPC.  The context includes a unique 64-bit ID which
+ * identifies the block report as a whole.  It also includes the total number
+ * of RPCs which this block report is split into, and the index into that
+ * total for the current RPC.
+ */
+public class BlockReportContext {
+  private final int totalRpcs;
+  private final int curRpc;
+  private final long reportId;
+
+  public BlockReportContext(int totalRpcs, int curRpc, long reportId) {
+    this.totalRpcs = totalRpcs;
+    this.curRpc = curRpc;
+    this.reportId = reportId;
+  }
+
+  public int getTotalRpcs() {
+    return totalRpcs;
+  }
+
+  public int getCurRpc() {
+    return curRpc;
+  }
+
+  public long getReportId() {
+    return reportId;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/03d4af39/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
index c3b0f68..c506082 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
@@ -23,7 +23,6 @@ import java.util.List;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
@@ -125,20 +124,23 @@ public interface DatanodeProtocol {
    *     Each finalized block is represented as 3 longs. Each under-
    *     construction replica is represented as 4 longs.
    *     This is done instead of Block[] to reduce memory used by block 
reports.
-   *     
+   * @param reports report of blocks per storage
+   * @param context Context information for this block report.
+   *
    * @return - the next command for DN to process.
    * @throws IOException
    */
   @Idempotent
   public DatanodeCommand blockReport(DatanodeRegistration registration,
-      String poolId, StorageBlockReport[] reports) throws IOException;
+            String poolId, StorageBlockReport[] reports,
+            BlockReportContext context) throws IOException;
     
 
   /**
    * Communicates the complete list of locally cached blocks to the NameNode.
    * 
    * This method is similar to
-   * {@link #blockReport(DatanodeRegistration, String, StorageBlockReport[])},
+   * {@link #blockReport(DatanodeRegistration, String, StorageBlockReport[], 
BlockReportContext)},
    * which is used to communicated blocks stored on disk.
    *
    * @param            The datanode registration.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/03d4af39/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
index 53ff52e..6a71dcd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
@@ -212,11 +212,25 @@ message HeartbeatResponseProto {
  *                second long represents length
  *                third long represents gen stamp
  *                fourth long (if under construction) represents replica state
+ * context      - An optional field containing information about the context
+ *                of this block report.
  */
 message BlockReportRequestProto {
   required DatanodeRegistrationProto registration = 1;
   required string blockPoolId = 2;
   repeated StorageBlockReportProto reports = 3;
+  optional BlockReportContextProto context = 4;
+}
+
+message BlockReportContextProto  {
+  // The total number of RPCs this block report is broken into.
+  required int32 totalRpcs = 1;
+
+  // The index of the current RPC (zero-based)
+  required int32 curRpc = 2;
+
+  // The unique 64-bit ID of this block report
+  required int64 id = 3;
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/03d4af39/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
index 5beb811..ddb6143 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
@@ -560,12 +560,12 @@ public class TestBlockManager {
     reset(node);
     
     bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
-        new BlockListAsLongs(null, null));
+        new BlockListAsLongs(null, null), null, false);
     assertEquals(1, ds.getBlockReportCount());
     // send block report again, should NOT be processed
     reset(node);
     bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
-        new BlockListAsLongs(null, null));
+        new BlockListAsLongs(null, null), null, false);
     assertEquals(1, ds.getBlockReportCount());
 
     // re-register as if node restarted, should update existing node
@@ -576,7 +576,7 @@ public class TestBlockManager {
     // send block report, should be processed after restart
     reset(node);
     bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
-                     new BlockListAsLongs(null, null));
+                     new BlockListAsLongs(null, null), null, false);
     // Reinitialize as registration with empty storage list pruned
     // node.storageMap.
     ds = node.getStorageInfos()[0];
@@ -608,7 +608,7 @@ public class TestBlockManager {
     reset(node);
     doReturn(1).when(node).numBlocks();
     bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
-        new BlockListAsLongs(null, null));
+        new BlockListAsLongs(null, null), null, false);
     assertEquals(1, ds.getBlockReportCount());
   }
   

http://git-wip-us.apache.org/repos/asf/hadoop/blob/03d4af39/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java
index 88300de..b5eebcd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java
@@ -18,27 +18,41 @@
 
 package org.apache.hadoop.hdfs.server.blockmanagement;
 
+import com.google.common.base.Supplier;
 import org.apache.commons.lang.ArrayUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.commons.math3.stat.inference.TestUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.Assert;
 import org.junit.Test;
 
+import java.io.File;
+import java.io.FileOutputStream;
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
 
 import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertEquals;
 
 
 public class TestNameNodePrunesMissingStorages {
@@ -110,7 +124,9 @@ public class TestNameNodePrunesMissingStorages {
   }
 
   /**
-   * Verify that the NameNode does not prune storages with blocks.
+   * Verify that the NameNode does not prune storages with blocks
+   * simply as a result of a heartbeat being sent missing that storage.
+   *
    * @throws IOException
    */
   @Test (timeout=300000)
@@ -118,4 +134,119 @@ public class TestNameNodePrunesMissingStorages {
     // Run the test with 1 storage, after the text still expect 1 storage.
     runTest(GenericTestUtils.getMethodName(), true, 1, 1);
   }
+
+  /**
+   * Regression test for HDFS-7960.<p/>
+   *
+   * Shutting down a datanode, removing a storage directory, and restarting
+   * the DataNode should not produce zombie storages.
+   */
+  @Test(timeout=300000)
+  public void testRemovingStorageDoesNotProduceZombies() throws Exception {
+    Configuration conf = new HdfsConfiguration();
+    conf.setInt(DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY, 1);
+    final int NUM_STORAGES_PER_DN = 2;
+    final MiniDFSCluster cluster = new MiniDFSCluster
+        .Builder(conf).numDataNodes(3)
+        .storagesPerDatanode(NUM_STORAGES_PER_DN)
+        .build();
+    try {
+      cluster.waitActive();
+      for (DataNode dn : cluster.getDataNodes()) {
+        assertEquals(NUM_STORAGES_PER_DN,
+          cluster.getNamesystem().getBlockManager().
+              getDatanodeManager().getDatanode(dn.getDatanodeId()).
+              getStorageInfos().length);
+      }
+      // Create a file which will end up on all 3 datanodes.
+      final Path TEST_PATH = new Path("/foo1");
+      DistributedFileSystem fs = cluster.getFileSystem();
+      DFSTestUtil.createFile(fs, TEST_PATH, 1024, (short) 3, 0xcafecafe);
+      for (DataNode dn : cluster.getDataNodes()) {
+        DataNodeTestUtils.triggerBlockReport(dn);
+      }
+      ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, new Path("/foo1"));
+      cluster.getNamesystem().writeLock();
+      final String storageIdToRemove;
+      String datanodeUuid;
+      // Find the first storage which this block is in.
+      try {
+        Iterator<DatanodeStorageInfo> storageInfoIter =
+            cluster.getNamesystem().getBlockManager().
+                getStorages(block.getLocalBlock()).iterator();
+        assertTrue(storageInfoIter.hasNext());
+        DatanodeStorageInfo info = storageInfoIter.next();
+        storageIdToRemove = info.getStorageID();
+        datanodeUuid = info.getDatanodeDescriptor().getDatanodeUuid();
+      } finally {
+        cluster.getNamesystem().writeUnlock();
+      }
+      // Find the DataNode which holds that first storage.
+      final DataNode datanodeToRemoveStorageFrom;
+      int datanodeToRemoveStorageFromIdx = 0;
+      while (true) {
+        if (datanodeToRemoveStorageFromIdx >= cluster.getDataNodes().size()) {
+          Assert.fail("failed to find datanode with uuid " + datanodeUuid);
+          datanodeToRemoveStorageFrom = null;
+          break;
+        }
+        DataNode dn = cluster.getDataNodes().
+            get(datanodeToRemoveStorageFromIdx);
+        if (dn.getDatanodeUuid().equals(datanodeUuid)) {
+          datanodeToRemoveStorageFrom = dn;
+          break;
+        }
+        datanodeToRemoveStorageFromIdx++;
+      }
+      // Find the volume within the datanode which holds that first storage.
+      List<? extends FsVolumeSpi> volumes =
+          datanodeToRemoveStorageFrom.getFSDataset().getVolumes();
+      assertEquals(NUM_STORAGES_PER_DN, volumes.size());
+      String volumeDirectoryToRemove = null;
+      for (FsVolumeSpi volume : volumes) {
+        if (volume.getStorageID().equals(storageIdToRemove)) {
+          volumeDirectoryToRemove = volume.getBasePath();
+        }
+      }
+      // Shut down the datanode and remove the volume.
+      // Replace the volume directory with a regular file, which will
+      // cause a volume failure.  (If we merely removed the directory,
+      // it would be re-initialized with a new storage ID.)
+      assertNotNull(volumeDirectoryToRemove);
+      datanodeToRemoveStorageFrom.shutdown();
+      FileUtil.fullyDelete(new File(volumeDirectoryToRemove));
+      FileOutputStream fos = new FileOutputStream(volumeDirectoryToRemove);
+      try {
+        fos.write(1);
+      } finally {
+        fos.close();
+      }
+      cluster.restartDataNode(datanodeToRemoveStorageFromIdx);
+      // Wait for the NameNode to remove the storage.
+      LOG.info("waiting for the datanode to remove " + storageIdToRemove);
+      GenericTestUtils.waitFor(new Supplier<Boolean>() {
+        @Override
+        public Boolean get() {
+          final DatanodeDescriptor dnDescriptor =
+              cluster.getNamesystem().getBlockManager().getDatanodeManager().
+                  getDatanode(datanodeToRemoveStorageFrom.getDatanodeUuid());
+          assertNotNull(dnDescriptor);
+          DatanodeStorageInfo[] infos = dnDescriptor.getStorageInfos();
+          for (DatanodeStorageInfo info : infos) {
+            if (info.getStorageID().equals(storageIdToRemove)) {
+              LOG.info("Still found storage " + storageIdToRemove + " on " +
+                  info + ".");
+              return false;
+            }
+          }
+          assertEquals(NUM_STORAGES_PER_DN - 1, infos.length);
+          return true;
+        }
+      }, 10, 30000);
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/03d4af39/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java
index e9557da..52212db 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java
@@ -53,6 +53,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
@@ -603,7 +604,8 @@ public abstract class BlockReportTestBase {
         .when(spy).blockReport(
           Mockito.<DatanodeRegistration>anyObject(),
           Mockito.anyString(),
-          Mockito.<StorageBlockReport[]>anyObject());
+          Mockito.<StorageBlockReport[]>anyObject(),
+          Mockito.<BlockReportContext>anyObject());
 
       // Force a block report to be generated. The block report will have
       // an RBW replica in it. Wait for the RPC to be sent, but block

http://git-wip-us.apache.org/repos/asf/hadoop/blob/03d4af39/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
index 7171c49..c601dd8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
 import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
+import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -209,7 +210,8 @@ public class TestBPOfferService {
         .when(mockNN2).blockReport(
             Mockito.<DatanodeRegistration>anyObject(),  
             Mockito.eq(FAKE_BPID),
-            Mockito.<StorageBlockReport[]>anyObject());
+            Mockito.<StorageBlockReport[]>anyObject(),
+            Mockito.<BlockReportContext>anyObject());
 
     bpos.start();
     try {
@@ -399,7 +401,8 @@ public class TestBPOfferService {
           Mockito.verify(mockNN).blockReport(
               Mockito.<DatanodeRegistration>anyObject(),  
               Mockito.eq(FAKE_BPID),
-              Mockito.<StorageBlockReport[]>anyObject());
+              Mockito.<StorageBlockReport[]>anyObject(),
+              Mockito.<BlockReportContext>anyObject());
           return true;
         } catch (Throwable t) {
           LOG.info("waiting on block report: " + t.getMessage());
@@ -424,7 +427,8 @@ public class TestBPOfferService {
           Mockito.verify(mockNN).blockReport(
                   Mockito.<DatanodeRegistration>anyObject(),
                   Mockito.eq(FAKE_BPID),
-                  Mockito.<StorageBlockReport[]>anyObject());
+                  Mockito.<StorageBlockReport[]>anyObject(),
+                  Mockito.<BlockReportContext>anyObject());
           return true;
         } catch (Throwable t) {
           LOG.info("waiting on block report: " + t.getMessage());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/03d4af39/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockHasMultipleReplicasOnSameDN.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockHasMultipleReplicasOnSameDN.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockHasMultipleReplicasOnSameDN.java
index e71c0ea..18975a2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockHasMultipleReplicasOnSameDN.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockHasMultipleReplicasOnSameDN.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.*;
 import org.apache.hadoop.hdfs.protocol.*;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
@@ -121,7 +122,8 @@ public class TestBlockHasMultipleReplicasOnSameDN {
     }
 
     // Should not assert!
-    cluster.getNameNodeRpc().blockReport(dnReg, bpid, reports);
+    cluster.getNameNodeRpc().blockReport(dnReg, bpid, reports,
+        new BlockReportContext(1, 0, System.nanoTime()));
 
     // Get the block locations once again.
     locatedBlocks = client.getLocatedBlocks(filename, 0, BLOCK_SIZE * NUM_BLOCKS);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/03d4af39/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
index 6b9c4b1..8256ccd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
@@ -58,6 +58,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
@@ -184,7 +185,7 @@ public class TestDataNodeVolumeFailure {
             new StorageBlockReport(dnStorage, blockList.getBlockListAsLongs());
     }
     
-    cluster.getNameNodeRpc().blockReport(dnR, bpid, reports);
+    cluster.getNameNodeRpc().blockReport(dnR, bpid, reports, null);
 
     // verify number of blocks and files...
     verify(filename, filesize);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/03d4af39/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeProtocolRetryPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeProtocolRetryPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeProtocolRetryPolicy.java
index c7ed5b9..bdc439b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeProtocolRetryPolicy.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeProtocolRetryPolicy.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
+import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
@@ -135,7 +136,8 @@ public class TestDatanodeProtocolRetryPolicy {
           Mockito.verify(mockNN).blockReport(
               Mockito.eq(datanodeRegistration),
               Mockito.eq(POOL_ID),
-              Mockito.<StorageBlockReport[]>anyObject());
+              Mockito.<StorageBlockReport[]>anyObject(),
+              Mockito.<BlockReportContext>anyObject());
           return true;
         } catch (Throwable t) {
           LOG.info("waiting on block report: " + t.getMessage());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/03d4af39/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDnRespectsBlockReportSplitThreshold.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDnRespectsBlockReportSplitThreshold.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDnRespectsBlockReportSplitThreshold.java
index 7058d71..ec53741 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDnRespectsBlockReportSplitThreshold.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDnRespectsBlockReportSplitThreshold.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.hdfs.*;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_SPLIT_THRESHOLD_KEY;
@@ -133,7 +134,7 @@ public class TestDnRespectsBlockReportSplitThreshold {
     Mockito.verify(nnSpy, times(cluster.getStoragesPerDatanode())).blockReport(
         any(DatanodeRegistration.class),
         anyString(),
-        captor.capture());
+        captor.capture(),  Mockito.<BlockReportContext>anyObject());
 
     verifyCapturedArguments(captor, 1, BLOCKS_IN_FILE);
   }
@@ -165,7 +166,7 @@ public class TestDnRespectsBlockReportSplitThreshold {
     Mockito.verify(nnSpy, times(1)).blockReport(
         any(DatanodeRegistration.class),
         anyString(),
-        captor.capture());
+        captor.capture(),  Mockito.<BlockReportContext>anyObject());
 
     verifyCapturedArguments(captor, cluster.getStoragesPerDatanode(), BLOCKS_IN_FILE);
   }
@@ -197,7 +198,7 @@ public class TestDnRespectsBlockReportSplitThreshold {
     Mockito.verify(nnSpy, times(cluster.getStoragesPerDatanode())).blockReport(
         any(DatanodeRegistration.class),
         anyString(),
-        captor.capture());
+        captor.capture(), Mockito.<BlockReportContext>anyObject());
 
     verifyCapturedArguments(captor, 1, BLOCKS_IN_FILE);
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/03d4af39/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesBlockReportPerStorage.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesBlockReportPerStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesBlockReportPerStorage.java
index 1b03786..b150b0d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesBlockReportPerStorage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesBlockReportPerStorage.java
@@ -20,8 +20,10 @@ package org.apache.hadoop.hdfs.server.datanode;
 
 import java.io.IOException;
 
+import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
+import org.apache.hadoop.util.Time;
 
 
 /**
@@ -33,10 +35,13 @@ public class TestNNHandlesBlockReportPerStorage extends BlockReportTestBase {
   @Override
   protected void sendBlockReports(DatanodeRegistration dnR, String poolId,
       StorageBlockReport[] reports) throws IOException {
+    int i = 0;
     for (StorageBlockReport report : reports) {
       LOG.info("Sending block report for storage " + report.getStorage().getStorageID());
       StorageBlockReport[] singletonReport = { report };
-      cluster.getNameNodeRpc().blockReport(dnR, poolId, singletonReport);
+      cluster.getNameNodeRpc().blockReport(dnR, poolId, singletonReport,
+          new BlockReportContext(reports.length, i, System.nanoTime()));
+      i++;
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/03d4af39/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesCombinedBlockReport.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesCombinedBlockReport.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesCombinedBlockReport.java
index 036b550..dca3c88 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesCombinedBlockReport.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesCombinedBlockReport.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.datanode;
 
 import java.io.IOException;
 
+import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
 
@@ -34,6 +35,7 @@ public class TestNNHandlesCombinedBlockReport extends BlockReportTestBase {
   protected void sendBlockReports(DatanodeRegistration dnR, String poolId,
                                   StorageBlockReport[] reports) throws IOException {
     LOG.info("Sending combined block reports for " + dnR);
-    cluster.getNameNodeRpc().blockReport(dnR, poolId, reports);
+    cluster.getNameNodeRpc().blockReport(dnR, poolId, reports,
+        new BlockReportContext(1, 0, System.nanoTime()));
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/03d4af39/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
index 1fb1c1f..b7b3e01 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataStorage;
 import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
+import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -938,7 +939,8 @@ public class NNThroughputBenchmark implements Tool {
               new BlockListAsLongs(null, null).getBlockListAsLongs())
       };
       nameNodeProto.blockReport(dnRegistration, 
-          nameNode.getNamesystem().getBlockPoolId(), reports);
+          nameNode.getNamesystem().getBlockPoolId(), reports,
+              new BlockReportContext(1, 0, System.nanoTime()));
     }
 
     /**
@@ -1181,8 +1183,9 @@ public class NNThroughputBenchmark implements Tool {
       long start = Time.now();
       StorageBlockReport[] report = { new StorageBlockReport(
           dn.storage, dn.getBlockReportList()) };
-      nameNodeProto.blockReport(dn.dnRegistration, nameNode.getNamesystem()
-          .getBlockPoolId(), report);
+      nameNodeProto.blockReport(dn.dnRegistration,
+          nameNode.getNamesystem().getBlockPoolId(), report,
+          new BlockReportContext(1, 0, System.nanoTime()));
       long end = Time.now();
       return end-start;
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/03d4af39/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
index 256f30a..566261e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
+import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -132,7 +133,8 @@ public class TestDeadDatanode {
         new DatanodeStorage(reg.getDatanodeUuid()),
         new long[] { 0L, 0L, 0L }) };
     try {
-      dnp.blockReport(reg, poolId, report);
+      dnp.blockReport(reg, poolId, report,
+          new BlockReportContext(1, 0, System.nanoTime()));
       fail("Expected IOException is not thrown");
     } catch (IOException ex) {
       // Expected

http://git-wip-us.apache.org/repos/asf/hadoop/blob/03d4af39/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencing.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencing.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencing.java
index 75d5b70..a538b6e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencing.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencing.java
@@ -53,6 +53,7 @@ import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
+import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
 import org.apache.hadoop.io.IOUtils;
@@ -537,7 +538,8 @@ public class TestDNFencing {
         .when(spy).blockReport(
           Mockito.<DatanodeRegistration>anyObject(),
           Mockito.anyString(),
-          Mockito.<StorageBlockReport[]>anyObject());
+          Mockito.<StorageBlockReport[]>anyObject(),
+          Mockito.<BlockReportContext>anyObject());
       dn.scheduleAllBlockReport(0);
       delayer.waitForCall();
       

Reply via email to