Repository: hadoop
Updated Branches:
  refs/heads/branch-2 e82135df8 -> 3be39d022


HDFS-9198. Coalesce IBR processing in the NN. (Daryn Sharp via umamahesh)

(cherry picked from commit f741476146574550a1a208d58ef8be76639e5ddc)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/3be39d02
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/3be39d02
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/3be39d02

Branch: refs/heads/branch-2
Commit: 3be39d022971e3fcb370e48cffa1e2bb709d5c4e
Parents: e82135d
Author: Uma Mahesh <umamah...@apache.org>
Authored: Wed Dec 16 18:16:39 2015 -0800
Committer: Uma Mahesh <umamah...@apache.org>
Committed: Wed Dec 16 22:04:19 2015 -0800

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |   2 +
 .../server/blockmanagement/BlockManager.java    | 154 ++++++++++++++++-
 .../blockmanagement/DatanodeDescriptor.java     |  17 +-
 .../server/blockmanagement/DatanodeManager.java |   2 +-
 .../blockmanagement/DatanodeStorageInfo.java    |   2 +-
 .../hdfs/server/namenode/CacheManager.java      |   2 +-
 .../hdfs/server/namenode/NameNodeRpcServer.java |  40 ++++-
 .../namenode/metrics/NameNodeMetrics.java       |  12 ++
 .../hadoop/hdfs/TestDatanodeRegistration.java   | 140 ++++++++++++++-
 .../blockmanagement/TestBlockManager.java       | 173 ++++++++++++++++++-
 .../blockmanagement/TestPendingReplication.java |   4 +-
 .../server/datanode/BlockReportTestBase.java    |  10 +-
 .../datanode/TestIncrementalBrVariations.java   |   8 +-
 .../hdfs/server/namenode/TestDeadDatanode.java  |  16 +-
 14 files changed, 542 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/3be39d02/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 0263f17..b6a258e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -1637,6 +1637,8 @@ Release 2.8.0 - UNRELEASED
     HDFS-9430. Remove waitForLoadingFSImage since checkNNStartup has ensured
     image loaded and namenode started. (Brahma Reddy Battula via mingma)
 
+    HDFS-9198. Coalesce IBR processing in the NN. (Daryn Sharp via umamahesh)
+
 Release 2.7.3 - UNRELEASED
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3be39d02/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 4e36c0f..1b331ba 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -34,6 +34,11 @@ import java.util.Map;
 import java.util.Queue;
 import java.util.Set;
 import java.util.TreeSet;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.FutureTask;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -91,6 +96,7 @@ import org.apache.hadoop.metrics2.util.MBeans;
 import org.apache.hadoop.net.Node;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.Daemon;
+import org.apache.hadoop.util.ExitUtil;
 import org.apache.hadoop.util.LightWeightGSet;
 import org.apache.hadoop.util.Time;
 
@@ -192,6 +198,10 @@ public class BlockManager implements BlockStatsMXBean {
   /** Replication thread. */
   final Daemon replicationThread = new Daemon(new ReplicationMonitor());
   
+  /** Block report thread for handling async reports. */
+  private final BlockReportProcessingThread blockReportThread =
+      new BlockReportProcessingThread();
+
   /** Store blocks -> datanodedescriptor(s) map of corrupt replicas */
   final CorruptReplicasMap corruptReplicas = new CorruptReplicasMap();
 
@@ -483,6 +493,7 @@ public class BlockManager implements BlockStatsMXBean {
     datanodeManager.activate(conf);
     this.replicationThread.setName("ReplicationMonitor");
     this.replicationThread.start();
+    this.blockReportThread.start();
     mxBeanName = MBeans.register("NameNode", "BlockStats", this);
     bmSafeMode.activate(blockTotal);
   }
@@ -491,7 +502,9 @@ public class BlockManager implements BlockStatsMXBean {
     bmSafeMode.close();
     try {
       replicationThread.interrupt();
+      blockReportThread.interrupt();
       replicationThread.join(3000);
+      blockReportThread.join(3000);
     } catch (InterruptedException ie) {
     }
     datanodeManager.close();
@@ -1877,7 +1890,7 @@ public class BlockManager implements BlockStatsMXBean {
 
     try {
       node = datanodeManager.getDatanode(nodeID);
-      if (node == null || !node.isAlive()) {
+      if (node == null || !node.isRegistered()) {
         throw new IOException(
             "ProcessReport from dead or unregistered node: " + nodeID);
       }
@@ -3229,17 +3242,23 @@ public class BlockManager implements BlockStatsMXBean {
   public void processIncrementalBlockReport(final DatanodeID nodeID,
       final StorageReceivedDeletedBlocks srdb) throws IOException {
     assert namesystem.hasWriteLock();
-    int received = 0;
-    int deleted = 0;
-    int receiving = 0;
     final DatanodeDescriptor node = datanodeManager.getDatanode(nodeID);
-    if (node == null || !node.isAlive()) {
+    if (node == null || !node.isRegistered()) {
       blockLog.warn("BLOCK* processIncrementalBlockReport"
               + " is received from dead or unregistered node {}", nodeID);
       throw new IOException(
           "Got incremental block report from unregistered or dead node");
     }
+    try {
+      processIncrementalBlockReport(node, srdb);
+    } catch (Exception ex) {
+      node.setForceRegistration(true);
+      throw ex;
+    }
+  }
 
+  private void processIncrementalBlockReport(final DatanodeDescriptor node,
+      final StorageReceivedDeletedBlocks srdb) throws IOException {
     DatanodeStorageInfo storageInfo =
         node.getStorageInfo(srdb.getStorage().getStorageID());
     if (storageInfo == null) {
@@ -3251,6 +3270,10 @@ public class BlockManager implements BlockStatsMXBean {
       storageInfo = node.updateStorage(srdb.getStorage());
     }
 
+    int received = 0;
+    int deleted = 0;
+    int receiving = 0;
+
     for (ReceivedDeletedBlockInfo rdbi : srdb.getBlocks()) {
       switch (rdbi.getStatus()) {
       case DELETED_BLOCK:
@@ -3268,17 +3291,17 @@ public class BlockManager implements BlockStatsMXBean {
         break;
       default:
         String msg = 
-          "Unknown block status code reported by " + nodeID +
+          "Unknown block status code reported by " + node +
           ": " + rdbi;
         blockLog.warn(msg);
         assert false : msg; // if assertions are enabled, throw.
         break;
       }
       blockLog.debug("BLOCK* block {}: {} is received from {}",
-          rdbi.getStatus(), rdbi.getBlock(), nodeID);
+          rdbi.getStatus(), rdbi.getBlock(), node);
     }
     blockLog.debug("*BLOCK* NameNode.processIncrementalBlockReport: from "
-            + "{} receiving: {}, received: {}, deleted: {}", nodeID, receiving,
+            + "{} receiving: {}, received: {}, deleted: {}", node, receiving,
         received, deleted);
   }
 
@@ -3879,4 +3902,119 @@ public class BlockManager implements BlockStatsMXBean {
     return false;
   }
 
+  // async processing of an action, used for IBRs.
+  public void enqueueBlockOp(final Runnable action) throws IOException {
+    try {
+      blockReportThread.enqueue(action);
+    } catch (InterruptedException ie) {
+      throw new IOException(ie);
+    }
+  }
+
+  // sync batch processing for a full BR.
+  public <T> T runBlockOp(final Callable<T> action)
+      throws IOException {
+    final FutureTask<T> future = new FutureTask<T>(action);
+    enqueueBlockOp(future);
+    try {
+      return future.get();
+    } catch (ExecutionException ee) {
+      Throwable cause = ee.getCause();
+      if (cause == null) {
+        cause = ee;
+      }
+      if (!(cause instanceof IOException)) {
+        cause = new IOException(cause);
+      }
+      throw (IOException)cause;
+    } catch (InterruptedException ie) {
+      Thread.currentThread().interrupt();
+      throw new IOException(ie);
+    }
+  }
+
+  @VisibleForTesting
+  public void flushBlockOps() throws IOException {
+    runBlockOp(new Callable<Void>(){
+      @Override
+      public Void call() {
+        return null;
+      }
+    });
+  }
+
+  public int getBlockOpQueueLength() {
+    return blockReportThread.queue.size();
+  }
+
+  private class BlockReportProcessingThread extends Thread {
+    private static final long MAX_LOCK_HOLD_MS = 4;
+    private long lastFull = 0;
+
+    private final BlockingQueue<Runnable> queue =
+        new ArrayBlockingQueue<Runnable>(1024);
+
+    BlockReportProcessingThread() {
+      super("Block report processor");
+      setDaemon(true);
+    }
+
+    @Override
+    public void run() {
+      try {
+        processQueue();
+      } catch (Throwable t) {
+        ExitUtil.terminate(1,
+            getName() + " encountered fatal exception: " + t);
+      }
+    }
+
+    private void processQueue() {
+      while (namesystem.isRunning()) {
+        NameNodeMetrics metrics = NameNode.getNameNodeMetrics();
+        try {
+          Runnable action = queue.take();
+          // batch as many operations as possible under the write lock until
+          // the queue runs dry, or the max lock hold is reached.
+          int processed = 0;
+          namesystem.writeLock();
+          metrics.setBlockOpsQueued(queue.size() + 1);
+          try {
+            long start = Time.monotonicNow();
+            do {
+              processed++;
+              action.run();
+              if (Time.monotonicNow() - start > MAX_LOCK_HOLD_MS) {
+                break;
+              }
+              action = queue.poll();
+            } while (action != null);
+          } finally {
+            namesystem.writeUnlock();
+            metrics.addBlockOpsBatched(processed - 1);
+          }
+        } catch (InterruptedException e) {
+          // ignore unless thread was specifically interrupted.
+          if (Thread.interrupted()) {
+            break;
+          }
+        }
+      }
+      queue.clear();
+    }
+
+    void enqueue(Runnable action) throws InterruptedException {
+      if (!queue.offer(action)) {
+        if (!isAlive() && namesystem.isRunning()) {
+          ExitUtil.terminate(1, getName()+" is not running");
+        }
+        long now = Time.monotonicNow();
+        if (now - lastFull > 4000) {
+          lastFull = now;
+          LOG.info("Block report queue is full");
+        }
+        queue.put(action);
+      }
+    }
+  }
 }

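The heart of this patch is the single-consumer queue above: IPC handler threads enqueue report-processing closures, and the lone "Block report processor" daemon drains them, batching as many operations as it can under one namesystem write-lock acquisition, capped at MAX_LOCK_HOLD_MS. Below is a minimal sketch of that drain loop in isolation, with a plain ReentrantLock standing in for the namesystem write lock; the class name and timing code are illustrative, not from the patch.

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.locks.ReentrantLock;

    // Illustrative stand-in for BlockReportProcessingThread's processQueue().
    public class BatchingConsumer extends Thread {
      private static final long MAX_LOCK_HOLD_MS = 4;  // same cap as the patch
      private final BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(1024);
      private final ReentrantLock writeLock = new ReentrantLock();  // stand-in lock

      public void enqueue(Runnable action) throws InterruptedException {
        queue.put(action);  // callers block only when the queue is full
      }

      @Override
      public void run() {
        try {
          while (!isInterrupted()) {
            Runnable action = queue.take();  // wait for work
            writeLock.lock();                // one acquisition per batch
            try {
              long start = System.nanoTime();
              do {
                action.run();
                // stop batching once the lock has been held too long
                if ((System.nanoTime() - start) / 1_000_000 > MAX_LOCK_HOLD_MS) {
                  break;
                }
                action = queue.poll();       // drain without blocking
              } while (action != null);
            } finally {
              writeLock.unlock();
            }
          }
        } catch (InterruptedException ie) {
          // exit quietly on shutdown, as the real thread does
        }
      }
    }

As in the patch, only the first dequeue of a batch blocks while subsequent ones poll, so a quiet queue costs nothing and a busy one amortizes lock overhead across the whole batch.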
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3be39d02/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
index fbace92..78c7f81 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
@@ -189,7 +189,8 @@ public class DatanodeDescriptor extends DatanodeInfo {
   // This is an optimization, because contains takes O(n) time on Arraylist
   private boolean isAlive = false;
   private boolean needKeyUpdate = false;
-  
+  private boolean forceRegistration = false;
+
   // A system administrator can tune the balancer bandwidth parameter
   // (dfs.balance.bandwidthPerSec) dynamically by calling
   // "dfsadmin -setBalanacerBandwidth <newbandwidth>", at which point the
@@ -301,7 +302,8 @@ public class DatanodeDescriptor extends DatanodeInfo {
     }
   }
 
-  DatanodeStorageInfo[] getStorageInfos() {
+  @VisibleForTesting
+  public DatanodeStorageInfo[] getStorageInfos() {
     synchronized (storageMap) {
       final Collection<DatanodeStorageInfo> storages = storageMap.values();
       return storages.toArray(new DatanodeStorageInfo[storages.size()]);
@@ -824,6 +826,7 @@ public class DatanodeDescriptor extends DatanodeInfo {
       storage.setBlockReportCount(0);
     }
     heartbeatedSinceRegistration = false;
+    forceRegistration = false;
   }
 
   /**
@@ -906,6 +909,14 @@ public class DatanodeDescriptor extends DatanodeInfo {
         return false;
     }
     return true;
- }
+  }
+
+  public void setForceRegistration(boolean force) {
+    forceRegistration = force;
+  }
+
+  public boolean isRegistered() {
+    return isAlive() && !forceRegistration;
+  }
 }
 

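The new flag cooperates with isAlive to drive forced re-registration: a failed IBR sets forceRegistration, isRegistered() then returns false, the next heartbeat (see the DatanodeManager change below) replies with RegisterCommand.REGISTER, and updateRegInfo() clears the flag once the node re-registers. A condensed sketch of that state machine; the class and the onXxx method names are hypothetical, and only the boolean logic mirrors the patch.

    // Hypothetical condensation of the forced re-registration handshake.
    public class RegistrationState {
      private boolean isAlive = true;
      private boolean forceRegistration = false;

      // mirrors DatanodeDescriptor.isRegistered()
      public boolean isRegistered() {
        return isAlive && !forceRegistration;
      }

      // mirrors setForceRegistration(true), called when IBR processing throws
      public void onIbrFailure() {
        forceRegistration = true;
      }

      // mirrors updateRegInfo(): a successful registration clears the flag
      public void onRegistered() {
        forceRegistration = false;
      }

      public static void main(String[] args) {
        RegistrationState s = new RegistrationState();
        s.onIbrFailure();
        System.out.println(s.isRegistered());  // false: heartbeat returns REGISTER
        s.onRegistered();
        System.out.println(s.isRegistered());  // true: node is back in service
      }
    }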
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3be39d02/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
index 0828d91..802bb76 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
@@ -1412,7 +1412,7 @@ public class DatanodeManager {
       throw new DisallowedDatanodeException(nodeinfo);
     }
 
-    if (nodeinfo == null || !nodeinfo.isAlive()) {
+    if (nodeinfo == null || !nodeinfo.isRegistered()) {
       return new DatanodeCommand[]{RegisterCommand.REGISTER};
     }
     heartbeatManager.updateHeartbeat(nodeinfo, reports, cacheCapacity,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3be39d02/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
index 216d6d2..d4658e5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
@@ -151,7 +151,7 @@ public class DatanodeStorageInfo {
     this.state = s.getState();
   }
 
-  int getBlockReportCount() {
+  public int getBlockReportCount() {
     return blockReportCount;
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3be39d02/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java
index 4fd9ca8..b1f936b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java
@@ -934,7 +934,7 @@ public final class CacheManager {
     try {
       final DatanodeDescriptor datanode = 
           blockManager.getDatanodeManager().getDatanode(datanodeID);
-      if (datanode == null || !datanode.isAlive()) {
+      if (datanode == null || !datanode.isRegistered()) {
         throw new IOException(
             "processCacheReport from dead or unregistered datanode: " +
             datanode);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3be39d02/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
index a1fed93..7a5e99b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
@@ -36,6 +36,7 @@ import java.util.EnumSet;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.Callable;
 
 import com.google.common.collect.Lists;
 
@@ -1358,9 +1359,9 @@ class NameNodeRpcServer implements NamenodeProtocols {
   }
 
   @Override // DatanodeProtocol
-  public DatanodeCommand blockReport(DatanodeRegistration nodeReg,
-        String poolId, StorageBlockReport[] reports,
-        BlockReportContext context) throws IOException {
+  public DatanodeCommand blockReport(final DatanodeRegistration nodeReg,
+        String poolId, final StorageBlockReport[] reports,
+        final BlockReportContext context) throws IOException {
     checkNNStartup();
     verifyRequest(nodeReg);
     if(blockStateChangeLog.isDebugEnabled()) {
@@ -1376,8 +1377,14 @@ class NameNodeRpcServer implements NamenodeProtocols {
       // for the same node and storage, so the value returned by the last
       // call of this loop is the final updated value for noStaleStorage.
       //
-      noStaleStorages = bm.processReport(nodeReg, reports[r].getStorage(),
-          blocks, context, (r == reports.length - 1));
+      final int index = r;
+      noStaleStorages = bm.runBlockOp(new Callable<Boolean>() {
+        @Override
+        public Boolean call() throws IOException {
+          return bm.processReport(nodeReg, reports[index].getStorage(),
+              blocks, context, (index == reports.length - 1));
+        }
+      });
       metrics.incrStorageBlockReportOps();
     }
     BlockManagerFaultInjector.getInstance().
@@ -1407,8 +1414,9 @@ class NameNodeRpcServer implements NamenodeProtocols {
   }
 
   @Override // DatanodeProtocol
-  public void blockReceivedAndDeleted(DatanodeRegistration nodeReg, String poolId,
-      StorageReceivedDeletedBlocks[] receivedAndDeletedBlocks) throws IOException {
+  public void blockReceivedAndDeleted(final DatanodeRegistration nodeReg,
+      String poolId, StorageReceivedDeletedBlocks[] receivedAndDeletedBlocks)
+          throws IOException {
     checkNNStartup();
     verifyRequest(nodeReg);
     metrics.incrBlockReceivedAndDeletedOps();
@@ -1417,8 +1425,22 @@ class NameNodeRpcServer implements NamenodeProtocols {
           +"from "+nodeReg+" "+receivedAndDeletedBlocks.length
           +" blocks.");
     }
-    for(StorageReceivedDeletedBlocks r : receivedAndDeletedBlocks) {
-      namesystem.processIncrementalBlockReport(nodeReg, r);
+    final BlockManager bm = namesystem.getBlockManager();
+    for (final StorageReceivedDeletedBlocks r : receivedAndDeletedBlocks) {
+      bm.enqueueBlockOp(new Runnable() {
+        @Override
+        public void run() {
+          try {
+            namesystem.processIncrementalBlockReport(nodeReg, r);
+          } catch (Exception ex) {
+            // usually because the node is unregistered/dead.  next heartbeat
+            // will correct the problem
+            blockStateChangeLog.error(
+                "*BLOCK* NameNode.blockReceivedAndDeleted: "
+                    + "failed from " + nodeReg + ": " + ex.getMessage());
+          }
+        }
+      });
     }
   }
   

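With these changes the RPC server has two submission styles: runBlockOp for full block reports, where the IPC handler must wait for the boolean noStaleStorages result, and enqueueBlockOp for IBRs, which returns as soon as the work is queued and logs, rather than returns, any processing failure. A sketch of the calling pattern follows; the BlockOps interface is a hypothetical stand-in for the BlockManager methods.

    import java.io.IOException;
    import java.util.concurrent.Callable;

    public class BlockOpClient {
      // hypothetical stand-in for the two methods the patch adds to BlockManager
      interface BlockOps {
        <T> T runBlockOp(Callable<T> action) throws IOException;
        void enqueueBlockOp(Runnable action) throws IOException;
      }

      // full BR: the handler blocks until the processing thread runs the callable
      static boolean fullReport(BlockOps bm) throws IOException {
        return bm.runBlockOp(new Callable<Boolean>() {
          @Override
          public Boolean call() {
            return true;  // stands in for bm.processReport(...)
          }
        });
      }

      // IBR: returns immediately; failures surface only in the NN log
      static void incrementalReport(BlockOps bm) throws IOException {
        bm.enqueueBlockOp(new Runnable() {
          @Override
          public void run() {
            // stands in for namesystem.processIncrementalBlockReport(...)
          }
        });
      }
    }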
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3be39d02/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java
index 31bc164..54b5c6e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java
@@ -76,6 +76,10 @@ public class NameNodeMetrics {
   MutableCounterLong blockReceivedAndDeletedOps;
   @Metric("Number of blockReports from individual storages")
   MutableCounterLong storageBlockReportOps;
+  @Metric("Number of blockReports and blockReceivedAndDeleted queued")
+  MutableGaugeInt blockOpsQueued;
+  @Metric("Number of blockReports and blockReceivedAndDeleted batch processed")
+  MutableCounterLong blockOpsBatched;
 
   @Metric("Number of file system operations")
   public long totalFileOps(){
@@ -267,6 +271,14 @@ public class NameNodeMetrics {
     storageBlockReportOps.incr();
   }
 
+  public void setBlockOpsQueued(int size) {
+    blockOpsQueued.set(size);
+  }
+
+  public void addBlockOpsBatched(int count) {
+    blockOpsBatched.incr(count);
+  }
+
   public void addTransaction(long latency) {
     transactions.add(latency);
   }

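Both new metrics land in the standard NameNodeActivity record, so tests can read them through MetricsAsserts, as the TestBlockManager changes further down do; a small sketch:

    import org.apache.hadoop.metrics2.MetricsRecordBuilder;
    import org.apache.hadoop.test.MetricsAsserts;

    public class BlockOpsMetricsProbe {
      public static void probe() {
        MetricsRecordBuilder rb = MetricsAsserts.getMetrics("NameNodeActivity");
        int queued = MetricsAsserts.getIntGauge("BlockOpsQueued", rb);
        long batched = MetricsAsserts.getLongCounter("BlockOpsBatched", rb);
        System.out.println("queued=" + queued + ", batched=" + batched);
      }
    }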
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3be39d02/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDatanodeRegistration.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDatanodeRegistration.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDatanodeRegistration.java
index ee9fa4b..ccac99f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDatanodeRegistration.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDatanodeRegistration.java
@@ -20,21 +20,32 @@ package org.apache.hadoop.hdfs;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.client.BlockReportOptions;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
 import org.apache.hadoop.hdfs.server.common.StorageInfo;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.VersionInfo;
 import org.junit.Test;
 
+import com.google.common.base.Supplier;
+
 import java.net.InetSocketAddress;
 import java.security.Permission;
+import java.util.concurrent.TimeoutException;
 
 import static org.junit.Assert.*;
 import static org.mockito.Mockito.doReturn;
@@ -308,4 +319,131 @@ public class TestDatanodeRegistration {
       }
     }
   }
-}
+
+  // IBRs are async operations to free up IPC handlers.  This means the IBR
+  // response will not contain non-IPC level exceptions - which in practice
+  // should not occur other than for a dead/unregistered node, which will
+  // trigger a re-registration.  If a non-IPC exception does occur, the
+  // safety net is a forced re-registration on the next heartbeat.
+  @Test(timeout=10000)
+  public void testForcedRegistration() throws Exception {
+    final Configuration conf = new HdfsConfiguration();
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_HANDLER_COUNT_KEY, 4);
+    conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, Integer.MAX_VALUE);
+
+    final MiniDFSCluster cluster =
+        new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+    cluster.waitActive();
+    cluster.getHttpUri(0);
+    FSNamesystem fsn = cluster.getNamesystem();
+    String bpId = fsn.getBlockPoolId();
+
+    DataNode dn = cluster.getDataNodes().get(0);
+    DatanodeDescriptor dnd =
+        NameNodeAdapter.getDatanode(fsn, dn.getDatanodeId());
+    DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, true);
+    DatanodeStorageInfo storage = dnd.getStorageInfos()[0];
+
+    // registration should not change after heartbeat.
+    assertTrue(dnd.isRegistered());
+    DatanodeRegistration lastReg = dn.getDNRegistrationForBP(bpId);
+    waitForHeartbeat(dn, dnd);
+    assertSame(lastReg, dn.getDNRegistrationForBP(bpId));
+
+    // force a re-registration on next heartbeat.
+    dnd.setForceRegistration(true);
+    assertFalse(dnd.isRegistered());
+    waitForHeartbeat(dn, dnd);
+    assertTrue(dnd.isRegistered());
+    DatanodeRegistration newReg = dn.getDNRegistrationForBP(bpId);
+    assertNotSame(lastReg, newReg);
+    lastReg = newReg;
+
+    // registration should not change on subsequent heartbeats.
+    waitForHeartbeat(dn, dnd);
+    assertTrue(dnd.isRegistered());
+    assertSame(lastReg, dn.getDNRegistrationForBP(bpId));
+    assertTrue(waitForBlockReport(dn, dnd));
+    assertTrue(dnd.isRegistered());
+    assertSame(lastReg, dn.getDNRegistrationForBP(bpId));
+
+    // check that block report is not processed and registration didn't change.
+    dnd.setForceRegistration(true);
+    assertFalse(waitForBlockReport(dn, dnd));
+    assertFalse(dnd.isRegistered());
+    assertSame(lastReg, dn.getDNRegistrationForBP(bpId));
+
+    // heartbeat should trigger re-registration, and next block report should
+    // not change registration.
+    waitForHeartbeat(dn, dnd);
+    assertTrue(dnd.isRegistered());
+    newReg = dn.getDNRegistrationForBP(bpId);
+    assertNotSame(lastReg, newReg);
+    lastReg = newReg;
+    assertTrue(waitForBlockReport(dn, dnd));
+    assertTrue(dnd.isRegistered());
+    assertSame(lastReg, dn.getDNRegistrationForBP(bpId));
+
+    // registration doesn't change.
+    ExtendedBlock eb = new ExtendedBlock(bpId, 1234);
+    dn.notifyNamenodeDeletedBlock(eb, storage.getStorageID());
+    DataNodeTestUtils.triggerDeletionReport(dn);
+    assertTrue(dnd.isRegistered());
+    assertSame(lastReg, dn.getDNRegistrationForBP(bpId));
+
+    // a failed IBR will effectively unregister the node.
+    boolean failed = false;
+    try {
+      // pass null to cause a failure since there aren't any easy failure
+      // modes since it shouldn't happen.
+      fsn.processIncrementalBlockReport(lastReg, null);
+    } catch (NullPointerException npe) {
+      failed = true;
+    }
+    assertTrue("didn't fail", failed);
+    assertFalse(dnd.isRegistered());
+
+    // should remain unregistered until next heartbeat.
+    dn.notifyNamenodeDeletedBlock(eb, storage.getStorageID());
+    DataNodeTestUtils.triggerDeletionReport(dn);
+    assertFalse(dnd.isRegistered());
+    assertSame(lastReg, dn.getDNRegistrationForBP(bpId));
+    waitForHeartbeat(dn, dnd);
+    assertTrue(dnd.isRegistered());
+    assertNotSame(lastReg, dn.getDNRegistrationForBP(bpId));
+  }
+
+  private void waitForHeartbeat(final DataNode dn, final DatanodeDescriptor dnd)
+      throws Exception {
+    final long lastUpdate = dnd.getLastUpdateMonotonic();
+    Thread.sleep(1);
+    DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, false);
+    DataNodeTestUtils.triggerHeartbeat(dn);
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        return lastUpdate != dnd.getLastUpdateMonotonic();
+      }
+    }, 10, 100000);
+    DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, true);
+  }
+
+  private boolean waitForBlockReport(final DataNode dn,
+      final DatanodeDescriptor dnd) throws Exception {
+    final DatanodeStorageInfo storage = dnd.getStorageInfos()[0];
+    final long lastCount = storage.getBlockReportCount();
+    dn.triggerBlockReport(
+        new BlockReportOptions.Factory().setIncremental(false).build());
+    try {
+      GenericTestUtils.waitFor(new Supplier<Boolean>() {
+        @Override
+        public Boolean get() {
+          return lastCount != storage.getBlockReportCount();
+        }
+      }, 10, 100);
+    } catch (TimeoutException te) {
+      return false;
+    }
+    return true;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3be39d02/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
index 5c874ba..1cebac1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs.server.blockmanagement;
 
 import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState.UNDER_CONSTRUCTION;
+import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
@@ -34,8 +35,20 @@ import java.util.Arrays;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map.Entry;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.StorageType;
@@ -60,8 +73,12 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
 import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
 import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.metrics2.MetricsRecordBuilder;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.test.MetricsAsserts;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -831,4 +848,158 @@ public class TestBlockManager {
     Assert.assertFalse(policyDefault.useDelHint(delHint, null, moreThan1Racks,
         null, excessTypes));
   }
-}
+
+  @Test
+  public void testBlockReportQueueing() throws Exception {
+    Configuration conf = new HdfsConfiguration();
+    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
+    try {
+      cluster.waitActive();
+      final FSNamesystem fsn = cluster.getNamesystem();
+      final BlockManager bm = fsn.getBlockManager();
+      final ExecutorService executor = Executors.newCachedThreadPool();
+
+      final CyclicBarrier startBarrier = new CyclicBarrier(2);
+      final CountDownLatch endLatch = new CountDownLatch(3);
+
+      // create a task intended to block while processing, thus causing
+      // the queue to back up.  simulates how a full BR is processed.
+      FutureTask<?> blockingOp = new FutureTask<Void>(
+          new Callable<Void>(){
+            @Override
+            public Void call() throws IOException {
+              return bm.runBlockOp(new Callable<Void>() {
+                @Override
+                public Void call()
+                    throws InterruptedException, BrokenBarrierException {
+                  // use a barrier to control the blocking.
+                  startBarrier.await();
+                  endLatch.countDown();
+                  return null;
+                }
+              });
+            }
+          });
+
+      // create an async task.  simulates how an IBR is processed.
+      Callable<?> asyncOp = new Callable<Void>(){
+        @Override
+        public Void call() throws IOException {
+          bm.enqueueBlockOp(new Runnable() {
+            @Override
+            public void run() {
+              // use the latch to signal if the op has run.
+              endLatch.countDown();
+            }
+          });
+          return null;
+        }
+      };
+
+      // calling get forces its execution so we can test if it's blocked.
+      Future<?> blockedFuture = executor.submit(blockingOp);
+      boolean isBlocked = false;
+      try {
+        // wait 1s for the future to block.  it should run instantaneously.
+        blockedFuture.get(1, TimeUnit.SECONDS);
+      } catch (TimeoutException te) {
+        isBlocked = true;
+      }
+      assertTrue(isBlocked);
+
+      // should effectively return immediately since calls are queued.
+      // however they should be backed up in the queue behind the blocking
+      // operation.
+      executor.submit(asyncOp).get(1, TimeUnit.SECONDS);
+      executor.submit(asyncOp).get(1, TimeUnit.SECONDS);
+
+      // check the async calls are queued, and first is still blocked.
+      assertEquals(2, bm.getBlockOpQueueLength());
+      assertFalse(blockedFuture.isDone());
+
+      // unblock the queue, wait for last op to complete, check the blocked
+      // call has returned
+      startBarrier.await(1, TimeUnit.SECONDS);
+      assertTrue(endLatch.await(1, TimeUnit.SECONDS));
+      assertEquals(0, bm.getBlockOpQueueLength());
+      assertTrue(blockingOp.isDone());
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  // spam the block manager with IBRs to verify queuing is occurring.
+  @Test
+  public void testAsyncIBR() throws Exception {
+    Logger.getRootLogger().setLevel(Level.WARN);
+
+    // will create files with many small blocks.
+    final int blkSize = 4*1024;
+    final int fileSize = blkSize * 100;
+    final byte[] buf = new byte[2*blkSize];
+    final int numWriters = 4;
+    final int repl = 3;
+
+    final CyclicBarrier barrier = new CyclicBarrier(numWriters);
+    final CountDownLatch writeLatch = new CountDownLatch(numWriters);
+    final AtomicBoolean failure = new AtomicBoolean();
+
+    final Configuration conf = new HdfsConfiguration();
+    conf.setLong(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, blkSize);
+    final MiniDFSCluster cluster =
+        new MiniDFSCluster.Builder(conf).numDataNodes(8).build();
+
+    try {
+      cluster.waitActive();
+      // create multiple writer threads to create a file with many blocks.
+      // will test that concurrent writing causes IBR batching in the NN
+      Thread[] writers = new Thread[numWriters];
+      for (int i=0; i < writers.length; i++) {
+        final Path p = new Path("/writer"+i);
+        writers[i] = new Thread(new Runnable() {
+          @Override
+          public void run() {
+            try {
+              FileSystem fs = cluster.getFileSystem();
+              FSDataOutputStream os =
+                  fs.create(p, true, buf.length, (short)repl, blkSize);
+              // align writers for maximum chance of IBR batching.
+              barrier.await();
+              int remaining = fileSize;
+              while (remaining > 0) {
+                os.write(buf);
+                remaining -= buf.length;
+              }
+              os.close();
+            } catch (Exception e) {
+              e.printStackTrace();
+              failure.set(true);
+            }
+            // let main thread know we are done.
+            writeLatch.countDown();
+          }
+        });
+        writers[i].start();
+      }
+
+      // when and how many IBRs are queued is indeterminate, so just watch
+      // the metrics and verify something was queued during execution.
+      boolean sawQueued = false;
+      while (!writeLatch.await(10, TimeUnit.MILLISECONDS)) {
+        assertFalse(failure.get());
+        MetricsRecordBuilder rb = getMetrics("NameNodeActivity");
+        long queued = MetricsAsserts.getIntGauge("BlockOpsQueued", rb);
+        sawQueued |= (queued > 0);
+      }
+      assertFalse(failure.get());
+      assertTrue(sawQueued);
+
+      // verify that batching of the IBRs occurred.
+      MetricsRecordBuilder rb = getMetrics("NameNodeActivity");
+      long batched = MetricsAsserts.getLongCounter("BlockOpsBatched", rb);
+      assertTrue(batched > 0);
+    } finally {
+      cluster.shutdown();
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3be39d02/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingReplication.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingReplication.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingReplication.java
index 3d399a2..b5b0cf2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingReplication.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingReplication.java
@@ -304,7 +304,8 @@ public class TestPendingReplication {
           reportDnNum++;
         }
       }
-
+      // IBRs are async, make sure the NN processes all of them.
+      cluster.getNamesystem().getBlockManager().flushBlockOps();
       assertEquals(DATANODE_COUNT - 3,
           blkManager.pendingReplications.getNumReplicas(blocks[0]));
 
@@ -322,6 +323,7 @@ public class TestPendingReplication {
         }
       }
 
+      cluster.getNamesystem().getBlockManager().flushBlockOps();
       assertEquals(DATANODE_COUNT - 3,
           blkManager.pendingReplications.getNumReplicas(blocks[0]));
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3be39d02/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java
index c4a2d06..0a57005 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java
@@ -113,9 +113,13 @@ public abstract class BlockReportTestBase {
 
   @After
   public void shutDownCluster() throws IOException {
-    fs.close();
-    cluster.shutdownDataNodes();
-    cluster.shutdown();
+    if (fs != null) {
+      fs.close();
+    }
+    if (cluster != null) {
+      cluster.shutdownDataNodes();
+      cluster.shutdown();
+    }
   }
 
   protected static void resetConfiguration() {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3be39d02/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestIncrementalBrVariations.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestIncrementalBrVariations.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestIncrementalBrVariations.java
index 989e216..0801701 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestIncrementalBrVariations.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestIncrementalBrVariations.java
@@ -26,7 +26,6 @@ import static org.junit.Assert.*;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.UUID;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -187,7 +186,9 @@ public class TestIncrementalBrVariations {
       }
 
       // Make sure that the deleted block from each storage was picked up
-      // by the NameNode.
+      // by the NameNode.  IBRs are async, make sure the NN processes
+      // all of them.
+      cluster.getNamesystem().getBlockManager().flushBlockOps();
       assertThat(cluster.getNamesystem().getMissingBlocksCount(),
           is((long) reports.length));
     }
@@ -256,7 +257,8 @@ public class TestIncrementalBrVariations {
 
     // Send the report to the NN.
     cluster.getNameNodeRpc().blockReceivedAndDeleted(dn0Reg, poolId, reports);
-
+    // IBRs are async, make sure the NN processes all of them.
+    cluster.getNamesystem().getBlockManager().flushBlockOps();
     // Make sure that the NN has learned of the new storage.
     DatanodeStorageInfo storageInfo = cluster.getNameNode()
                                              .getNamesystem()

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3be39d02/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
index c5262d4..bfd026c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
@@ -100,14 +100,14 @@ public class TestDeadDatanode {
         null) };
     StorageReceivedDeletedBlocks[] storageBlocks = { 
         new StorageReceivedDeletedBlocks(reg.getDatanodeUuid(), blocks) };
-    
-    // Ensure blockReceived call from dead datanode is rejected with IOException
-    try {
-      dnp.blockReceivedAndDeleted(reg, poolId, storageBlocks);
-      fail("Expected IOException is not thrown");
-    } catch (IOException ex) {
-      // Expected
-    }
+
+    // Ensure blockReceived call from dead datanode is not rejected with
+    // IOException, since it's async, but the node remains unregistered.
+    dnp.blockReceivedAndDeleted(reg, poolId, storageBlocks);
+    BlockManager bm = cluster.getNamesystem().getBlockManager();
+    // IBRs are async, make sure the NN processes all of them.
+    bm.flushBlockOps();
+    assertFalse(bm.getDatanodeManager().getDatanode(reg).isRegistered());
 
     // Ensure blockReport from dead datanode is rejected with IOException
     StorageBlockReport[] report = { new StorageBlockReport(
