Repository: hadoop
Updated Branches:
  refs/heads/HADOOP-12756 846ada2de -> 9f473cf90


HDFS-10301. Interleaving processing of storages from repeated block reports 
causes false zombie storage detection, removes valid blocks. Contributed by 
Vinitha Gankidi.

Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/85a20508
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/85a20508
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/85a20508

Branch: refs/heads/HADOOP-12756
Commit: 85a20508bd04851d47c24b7562ec2927d5403446
Parents: 59466b8
Author: Vinitha Reddy Gankidi <vigank...@linkedin.com>
Authored: Mon Jul 25 16:56:45 2016 -0700
Committer: Konstantin V Shvachko <s...@apache.org>
Committed: Mon Jul 25 18:50:59 2016 -0700

----------------------------------------------------------------------
 .../hadoop/hdfs/protocol/BlockListAsLongs.java  | 45 +++++++++++
 .../server/blockmanagement/BlockManager.java    | 52 +++++++------
 .../BlockReportLeaseManager.java                |  4 +-
 .../blockmanagement/DatanodeDescriptor.java     | 29 +------
 .../blockmanagement/DatanodeStorageInfo.java    | 11 ---
 .../hdfs/server/datanode/BPServiceActor.java    | 35 +++++++--
 .../hdfs/server/namenode/NameNodeRpcServer.java | 40 ++++++----
 .../blockmanagement/TestBlockManager.java       | 19 ++---
 .../TestNameNodePrunesMissingStorages.java      | 80 ++++++++++++++++++--
 ...TestDnRespectsBlockReportSplitThreshold.java | 33 +++++++-
 .../TestNNHandlesBlockReportPerStorage.java     | 34 +++++++--
 11 files changed, 274 insertions(+), 108 deletions(-)
----------------------------------------------------------------------
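
In short: the NameNode no longer tracks a per-storage lastBlockReportId across
RPCs. Instead, the final RPC of a split block report enumerates every storage
on the DataNode (the real replica list for one storage, a STORAGE_REPORT
sentinel for the rest), and the NameNode prunes any tracked storage that is
absent from that set. A minimal standalone sketch of the pruning step, with
hypothetical storage IDs (this is not HDFS code):

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class ZombiePruneSketch {
  public static void main(String[] args) {
    // Storages the NameNode currently tracks for one DataNode.
    Set<String> tracked = new HashSet<>(Arrays.asList("DS-1", "DS-2", "DS-3"));
    // Storage IDs carried by the last RPC of the full block report.
    Set<String> reported = new HashSet<>(Arrays.asList("DS-1", "DS-2"));
    // Keep only storages present in the report; DS-3 is pruned as a zombie.
    tracked.retainAll(reported);
    System.out.println(tracked); // DS-3 is gone
  }
}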


http://git-wip-us.apache.org/repos/asf/hadoop/blob/85a20508/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/BlockListAsLongs.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/BlockListAsLongs.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/BlockListAsLongs.java
index 26c7ffb..26340a9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/BlockListAsLongs.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/BlockListAsLongs.java
@@ -63,6 +63,34 @@ public abstract class BlockListAsLongs implements Iterable<BlockReportReplica> {
     public Iterator<BlockReportReplica> iterator() {
       return Collections.emptyIterator();
     }
+    @Override
+    public boolean isStorageReport() {
+      return false;
+    }
+  };
+
+  // STORAGE_REPORT is used to report all storages in the DN
+  public static final BlockListAsLongs STORAGE_REPORT = new BlockListAsLongs() {
+    @Override
+    public int getNumberOfBlocks() {
+      return -1;
+    }
+    @Override
+    public ByteString getBlocksBuffer() {
+      return ByteString.EMPTY;
+    }
+    @Override
+    public long[] getBlockListAsLongs() {
+      return EMPTY_LONGS;
+    }
+    @Override
+    public Iterator<BlockReportReplica> iterator() {
+      return Collections.emptyIterator();
+    }
+    @Override
+    public boolean isStorageReport() {
+      return true;
+    }
   };
 
   /**
@@ -253,6 +281,13 @@ public abstract class BlockListAsLongs implements Iterable<BlockReportReplica> {
   abstract public long[] getBlockListAsLongs();
 
   /**
+   * Return true for STORAGE_REPORT BlockListAsLongs.
+   * Otherwise return false.
+   * @return boolean
+   */
+  abstract public boolean isStorageReport();
+
+  /**
    * Returns a singleton iterator over blocks in the block report.  Do not
    * add the returned blocks to a collection.
    * @return Iterator
@@ -392,6 +427,11 @@ public abstract class BlockListAsLongs implements Iterable<BlockReportReplica> {
     }
 
     @Override
+    public boolean isStorageReport() {
+      return false;
+    }
+
+    @Override
     public Iterator<BlockReportReplica> iterator() {
       return new Iterator<BlockReportReplica>() {
         final BlockReportReplica block = new BlockReportReplica();
@@ -475,6 +515,11 @@ public abstract class BlockListAsLongs implements Iterable<BlockReportReplica> {
     }
 
     @Override
+    public boolean isStorageReport() {
+      return false;
+    }
+
+    @Override
     public Iterator<BlockReportReplica> iterator() {
       return new Iterator<BlockReportReplica>() {
         private final BlockReportReplica block = new BlockReportReplica();
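
The sentinel is recognizable on the receiving side: isStorageReport() returns
true and getNumberOfBlocks() returns the marker value -1. A self-contained
model of that contract (the interface below is an illustrative stand-in, not
the real BlockListAsLongs):

import java.util.Arrays;
import java.util.List;

public class SentinelSketch {
  // Stand-in for BlockListAsLongs; only the contract relevant here.
  interface BlockList {
    int getNumberOfBlocks();
    boolean isStorageReport();
  }

  // Mirrors STORAGE_REPORT: no replicas, marker block count of -1.
  static final BlockList STORAGE_REPORT = new BlockList() {
    public int getNumberOfBlocks() { return -1; }
    public boolean isStorageReport() { return true; }
  };

  static BlockList realList(final int n) {
    return new BlockList() {
      public int getNumberOfBlocks() { return n; }
      public boolean isStorageReport() { return false; }
    };
  }

  public static void main(String[] args) {
    // Shape of the last split RPC: sentinels for every storage except the
    // one whose replicas are actually reported in this RPC.
    List<BlockList> lastRpc =
        Arrays.asList(STORAGE_REPORT, STORAGE_REPORT, realList(42));
    for (BlockList b : lastRpc) {
      if (b.isStorageReport()) {
        System.out.println("sentinel: storage exists, no replicas here");
      } else {
        System.out.println("real report with " + b.getNumberOfBlocks()
            + " blocks");
      }
    }
  }
}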

http://git-wip-us.apache.org/repos/asf/hadoop/blob/85a20508/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 349b018..d927b2a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -2138,7 +2138,7 @@ public class BlockManager implements BlockStatsMXBean {
   public boolean processReport(final DatanodeID nodeID,
       final DatanodeStorage storage,
       final BlockListAsLongs newReport,
-      BlockReportContext context, boolean lastStorageInRpc) throws IOException {
+      BlockReportContext context) throws IOException {
     namesystem.writeLock();
     final long startTime = Time.monotonicNow(); //after acquiring write lock
     final long endTime;
@@ -2189,30 +2189,14 @@ public class BlockManager implements BlockStatsMXBean {
       
       storageInfo.receivedBlockReport();
       if (context != null) {
-        storageInfo.setLastBlockReportId(context.getReportId());
-        if (lastStorageInRpc) {
-          int rpcsSeen = node.updateBlockReportContext(context);
-          if (rpcsSeen >= context.getTotalRpcs()) {
-            long leaseId = blockReportLeaseManager.removeLease(node);
-            BlockManagerFaultInjector.getInstance().
-                removeBlockReportLease(node, leaseId);
-            List<DatanodeStorageInfo> zombies = node.removeZombieStorages();
-            if (zombies.isEmpty()) {
-              LOG.debug("processReport 0x{}: no zombie storages found.",
-                  Long.toHexString(context.getReportId()));
-            } else {
-              for (DatanodeStorageInfo zombie : zombies) {
-                removeZombieReplicas(context, zombie);
-              }
-            }
-            node.clearBlockReportContext();
-          } else {
-            LOG.debug("processReport 0x{}: {} more RPCs remaining in this " +
-                    "report.", Long.toHexString(context.getReportId()),
-                (context.getTotalRpcs() - rpcsSeen)
-            );
-          }
+        if (context.getTotalRpcs() == context.getCurRpc() + 1) {
+          long leaseId = this.getBlockReportLeaseManager().removeLease(node);
+          BlockManagerFaultInjector.getInstance().
+              removeBlockReportLease(node, leaseId);
         }
+        LOG.debug("Processing RPC with index {} out of total {} RPCs in "
+                + "processReport 0x{}", context.getCurRpc(),
+            context.getTotalRpcs(), Long.toHexString(context.getReportId()));
       }
     } finally {
       endTime = Time.monotonicNow();
@@ -2238,6 +2222,26 @@ public class BlockManager implements BlockStatsMXBean {
     return !node.hasStaleStorages();
   }
 
+  public void removeZombieStorages(DatanodeRegistration nodeReg,
+      BlockReportContext context, Set<String> storageIDsInBlockReport)
+      throws UnregisteredNodeException {
+    namesystem.writeLock();
+    DatanodeDescriptor node = this.getDatanodeManager().getDatanode(nodeReg);
+    if (node != null) {
+      List<DatanodeStorageInfo> zombies =
+          node.removeZombieStorages(storageIDsInBlockReport);
+      if (zombies.isEmpty()) {
+        LOG.debug("processReport 0x{}: no zombie storages found.",
+            Long.toHexString(context.getReportId()));
+      } else {
+        for (DatanodeStorageInfo zombie : zombies) {
+          this.removeZombieReplicas(context, zombie);
+        }
+      }
+    }
+    namesystem.writeUnlock();
+  }
+
   private void removeZombieReplicas(BlockReportContext context,
       DatanodeStorageInfo zombie) {
     LOG.warn("processReport 0x{}: removing zombie storage {}, which no " +

http://git-wip-us.apache.org/repos/asf/hadoop/blob/85a20508/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockReportLeaseManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockReportLeaseManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockReportLeaseManager.java
index 7db05c7..34e0949 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockReportLeaseManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockReportLeaseManager.java
@@ -308,10 +308,10 @@ class BlockReportLeaseManager {
       return false;
     }
     if (node.leaseId == 0) {
-      LOG.warn("BR lease 0x{} is not valid for DN {}, because the DN " +
+      LOG.warn("BR lease 0x{} is not found for DN {}, because the DN " +
                "is not in the pending set.",
                Long.toHexString(id), dn.getDatanodeUuid());
-      return false;
+      return true;
     }
     if (pruneIfExpired(monotonicNowMs, node)) {
       LOG.warn("BR lease 0x{} is not valid for DN {}, because the lease " +

http://git-wip-us.apache.org/repos/asf/hadoop/blob/85a20508/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
index 1646129..d807ab6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
@@ -18,7 +18,6 @@
 package org.apache.hadoop.hdfs.server.blockmanagement;
 
 import java.util.ArrayList;
-import java.util.BitSet;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -43,7 +42,6 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.namenode.CachedBlock;
 import org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand.BlockECReconstructionInfo;
-import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage.State;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
@@ -154,9 +152,6 @@ public class DatanodeDescriptor extends DatanodeInfo {
   public final DecommissioningStatus decommissioningStatus =
       new DecommissioningStatus();
 
-  private long curBlockReportId = 0;
-
-  private BitSet curBlockReportRpcsSeen = null;
 
   private final Map<String, DatanodeStorageInfo> storageMap =
       new HashMap<>();
@@ -257,20 +252,6 @@ public class DatanodeDescriptor extends DatanodeInfo {
     updateHeartbeatState(StorageReport.EMPTY_ARRAY, 0L, 0L, 0, 0, null);
   }
 
-  public int updateBlockReportContext(BlockReportContext context) {
-    if (curBlockReportId != context.getReportId()) {
-      curBlockReportId = context.getReportId();
-      curBlockReportRpcsSeen = new BitSet(context.getTotalRpcs());
-    }
-    curBlockReportRpcsSeen.set(context.getCurRpc());
-    return curBlockReportRpcsSeen.cardinality();
-  }
-
-  public void clearBlockReportContext() {
-    curBlockReportId = 0;
-    curBlockReportRpcsSeen = null;
-  }
-
   public CachedBlocksList getPendingCached() {
     return pendingCached;
   }
@@ -334,7 +315,8 @@ public class DatanodeDescriptor extends DatanodeInfo {
     }
   }
 
-  List<DatanodeStorageInfo> removeZombieStorages() {
+  List<DatanodeStorageInfo>
+      removeZombieStorages(Set<String> storageIDsInBlockReport) {
     List<DatanodeStorageInfo> zombies = null;
     synchronized (storageMap) {
       Iterator<Map.Entry<String, DatanodeStorageInfo>> iter =
@@ -342,18 +324,13 @@ public class DatanodeDescriptor extends DatanodeInfo {
       while (iter.hasNext()) {
         Map.Entry<String, DatanodeStorageInfo> entry = iter.next();
         DatanodeStorageInfo storageInfo = entry.getValue();
-        if (storageInfo.getLastBlockReportId() != curBlockReportId) {
-          LOG.info("{} had lastBlockReportId 0x{} but curBlockReportId = 0x{}",
-              storageInfo.getStorageID(),
-              Long.toHexString(storageInfo.getLastBlockReportId()),
-              Long.toHexString(curBlockReportId));
+        if (!storageIDsInBlockReport.contains(storageInfo.getStorageID())) {
           iter.remove();
           if (zombies == null) {
             zombies = new LinkedList<>();
           }
           zombies.add(storageInfo);
         }
-        storageInfo.setLastBlockReportId(0);
       }
     }
     return zombies == null ? EMPTY_STORAGE_INFO_LIST : zombies;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/85a20508/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
index 843a8d5..1b7cd7c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
@@ -98,9 +98,6 @@ public class DatanodeStorageInfo {
 
   private final FoldedTreeSet<BlockInfo> blocks = new FoldedTreeSet<>();
 
-  // The ID of the last full block report which updated this storage.
-  private long lastBlockReportId = 0;
-
   /** The number of block reports received */
   private int blockReportCount = 0;
 
@@ -165,14 +162,6 @@ public class DatanodeStorageInfo {
     this.blockPoolUsed = blockPoolUsed;
   }
 
-  long getLastBlockReportId() {
-    return lastBlockReportId;
-  }
-
-  void setLastBlockReportId(long lastBlockReportId) {
-    this.lastBlockReportId = lastBlockReportId;
-  }
-
   State getState() {
     return this.state;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/85a20508/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
index 69989fb..f18cf0b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
@@ -367,11 +367,36 @@ class BPServiceActor implements Runnable {
       } else {
         // Send one block report per message.
         for (int r = 0; r < reports.length; r++) {
-          StorageBlockReport singleReport[] = { reports[r] };
-          DatanodeCommand cmd = bpNamenode.blockReport(
-              bpRegistration, bpos.getBlockPoolId(), singleReport,
-              new BlockReportContext(reports.length, r, reportId,
-                  fullBrLeaseId, true));
+          StorageBlockReport[] singleReport = {reports[r]};
+          DatanodeCommand cmd;
+          if (r != reports.length - 1) {
+            cmd = bpNamenode.blockReport(bpRegistration, bpos.getBlockPoolId(),
+                singleReport, new BlockReportContext(reports.length, r,
+                    reportId, fullBrLeaseId, true));
+          } else {
+            StorageBlockReport[] lastSplitReport =
+                new StorageBlockReport[perVolumeBlockLists.size()];
+            // When block reports are split, the last RPC in the block report
+            // has the information about all storages in the block report.
+            // See HDFS-10301 for more details. To achieve this, the last RPC
+            // has 'n' storage reports, where 'n' is the number of storages in
+            // a DN. The actual block replicas are reported only for the
+            // last/n-th storage.
+            i = 0;
+            for(Map.Entry<DatanodeStorage, BlockListAsLongs> kvPair :
+                perVolumeBlockLists.entrySet()) {
+              lastSplitReport[i++] = new StorageBlockReport(
+                  kvPair.getKey(), BlockListAsLongs.STORAGE_REPORT);
+              if (i == r) {
+                lastSplitReport[i] = reports[r];
+                break;
+              }
+            }
+            cmd = bpNamenode.blockReport(
+                bpRegistration, bpos.getBlockPoolId(), lastSplitReport,
+                new BlockReportContext(reports.length, r, reportId,
+                    fullBrLeaseId, true));
+          }
           numReportsSent++;
           numRPCs++;
           if (cmd != null) {
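
The effect of this change is that the complete storage enumeration travels
inside the payload of the final RPC itself. The NameNode therefore needs no
cross-RPC bookkeeping to decide which storages are alive, so two full reports
whose RPCs interleave at the NameNode can no longer cause live storages to be
declared zombies, which is the failure mode described in HDFS-10301.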

http://git-wip-us.apache.org/repos/asf/hadoop/blob/85a20508/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
index 6b52949..3f36fcc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
@@ -1435,25 +1435,37 @@ public class NameNodeRpcServer implements NamenodeProtocols {
     boolean noStaleStorages = false;
     for (int r = 0; r < reports.length; r++) {
       final BlockListAsLongs blocks = reports[r].getBlocks();
-      //
-      // BlockManager.processReport accumulates information of prior calls
-      // for the same node and storage, so the value returned by the last
-      // call of this loop is the final updated value for noStaleStorage.
-      //
-      final int index = r;
-      noStaleStorages = bm.runBlockOp(new Callable<Boolean>() {
-        @Override
-        public Boolean call() throws IOException {
-          return bm.processReport(nodeReg, reports[index].getStorage(),
-              blocks, context, (index == reports.length - 1));
-        }
-      });
-      metrics.incrStorageBlockReportOps();
+      if (!blocks.isStorageReport()) {
+        //
+        // BlockManager.processReport accumulates information of prior calls
+        // for the same node and storage, so the value returned by the last
+        // call of this loop is the final updated value for noStaleStorage.
+        //
+        final int index = r;
+        noStaleStorages = bm.runBlockOp(new Callable<Boolean>() {
+          @Override
+          public Boolean call()
+              throws IOException {
+            return bm.processReport(nodeReg, reports[index].getStorage(),
+                blocks, context);
+          }
+        });
+        metrics.incrStorageBlockReportOps();
+      }
     }
     BlockManagerFaultInjector.getInstance().
         incomingBlockReportRpc(nodeReg, context);
 
     if (nn.getFSImage().isUpgradeFinalized() &&
+        context.getTotalRpcs() == context.getCurRpc() + 1) {
+      Set<String> storageIDsInBlockReport = new HashSet<>();
+      for (StorageBlockReport report : reports) {
+        storageIDsInBlockReport.add(report.getStorage().getStorageID());
+      }
+      bm.removeZombieStorages(nodeReg, context, storageIDsInBlockReport);
+    }
+
+    if (nn.getFSImage().isUpgradeFinalized() &&
         !namesystem.isRollingUpgrade() &&
         !nn.isStandbyState() &&
         noStaleStorages) {
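
Concretely, with hypothetical storages s1, s2, s3: a DataNode splits its
report into three RPCs, and the third RPC carries reports for all of s1, s2,
s3, with s1 and s2 as STORAGE_REPORT sentinels. The loop above skips the
sentinels (they carry no replicas to process), and since
curRpc + 1 == totalRpcs, the storage IDs {s1, s2, s3} are collected from this
RPC alone and, provided the upgrade is finalized, any other storage still
tracked for the node is pruned as a zombie.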

http://git-wip-us.apache.org/repos/asf/hadoop/blob/85a20508/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
index 394fae9..8c231d1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
@@ -713,12 +713,12 @@ public class TestBlockManager {
     reset(node);
     
     bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
-        BlockListAsLongs.EMPTY, null, false);
+        BlockListAsLongs.EMPTY, null);
     assertEquals(1, ds.getBlockReportCount());
     // send block report again, should NOT be processed
     reset(node);
     bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
-        BlockListAsLongs.EMPTY, null, false);
+        BlockListAsLongs.EMPTY, null);
     assertEquals(1, ds.getBlockReportCount());
 
     // re-register as if node restarted, should update existing node
@@ -729,7 +729,7 @@ public class TestBlockManager {
     // send block report, should be processed after restart
     reset(node);
     bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
-                     BlockListAsLongs.EMPTY, null, false);
+                     BlockListAsLongs.EMPTY, null);
     // Reinitialize as registration with empty storage list pruned
     // node.storageMap.
     ds = node.getStorageInfos()[0];
@@ -758,7 +758,7 @@ public class TestBlockManager {
     reset(node);
     doReturn(1).when(node).numBlocks();
     bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
-        BlockListAsLongs.EMPTY, null, false);
+        BlockListAsLongs.EMPTY, null);
     assertEquals(1, ds.getBlockReportCount());
   }
 
@@ -832,7 +832,7 @@ public class TestBlockManager {
     assertEquals(0, ds.getBlockReportCount());
     bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
         builder.build(),
-        new BlockReportContext(1, 0, System.nanoTime(), 0, true), false);
+        new BlockReportContext(1, 0, System.nanoTime(), 0, true));
     assertEquals(1, ds.getBlockReportCount());
 
     // verify the storage info is correct
@@ -871,8 +871,7 @@ public class TestBlockManager {
     assertEquals(0, ds.getBlockReportCount());
     bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
                      generateReport(blocks),
-                     new BlockReportContext(1, 0, System.nanoTime(), 0, false),
-                     false);
+                     new BlockReportContext(1, 0, System.nanoTime(), 0, false));
     assertEquals(1, ds.getBlockReportCount());
     // verify the storage info is correct
     for (BlockInfo block : blocks) {
@@ -882,8 +881,7 @@ public class TestBlockManager {
     // Send unsorted report
     bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
                      generateReport(blocks),
-                     new BlockReportContext(1, 0, System.nanoTime(), 0, false),
-                     false);
+                     new BlockReportContext(1, 0, System.nanoTime(), 0, false));
     assertEquals(2, ds.getBlockReportCount());
     // verify the storage info is correct
     for (BlockInfo block : blocks) {
@@ -894,8 +892,7 @@ public class TestBlockManager {
     Collections.sort(blocks);
     bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
                      generateReport(blocks),
-                     new BlockReportContext(1, 0, System.nanoTime(), 0, true),
-                     false);
+                     new BlockReportContext(1, 0, System.nanoTime(), 0, true));
     assertEquals(3, ds.getBlockReportCount());
     // verify the storage info is correct
     for (BlockInfo block : blocks) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/85a20508/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java
index b11b48a..be38afe 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java
@@ -19,34 +19,40 @@
 package org.apache.hadoop.hdfs.server.blockmanagement;
 
 import com.google.common.base.Supplier;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
+import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi.FsVolumeReferences;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
+import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.log4j.Level;
 import org.junit.Assert;
 import org.junit.Test;
 
-import java.io.BufferedOutputStream;
 import java.io.BufferedReader;
 import java.io.BufferedWriter;
 import java.io.File;
@@ -55,8 +61,6 @@ import java.io.FileReader;
 import java.io.IOException;
 import java.io.OutputStreamWriter;
 import java.io.Writer;
-import java.util.Arrays;
-import java.util.EnumSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.UUID;
@@ -368,4 +372,68 @@ public class TestNameNodePrunesMissingStorages {
       cluster.shutdown();
     }
   }
+
+  @Test(timeout=300000)
+  public void testInterleavedFullBlockReports() throws Exception {
+    Configuration conf = new HdfsConfiguration();
+    conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY,
+        36000000L);
+    int numStoragesPerDatanode = 6;
+    final MiniDFSCluster cluster = new MiniDFSCluster
+        .Builder(conf).numDataNodes(1)
+        .storagesPerDatanode(numStoragesPerDatanode)
+        .build();
+    try {
+      LOG.info("waiting for cluster to become active...");
+      cluster.waitActive();
+      // Get the datanode registration and the block reports
+      DataNode dn = cluster.getDataNodes().get(0);
+      final String blockPoolId = cluster.getNamesystem().getBlockPoolId();
+      LOG.info("Block pool id: " + blockPoolId);
+      final DatanodeRegistration dnR = dn.getDNRegistrationForBP(blockPoolId);
+      Map<DatanodeStorage, BlockListAsLongs> perVolumeBlockLists =
+          dn.getFSDataset().getBlockReports(blockPoolId);
+      final StorageBlockReport[] reports =
+          new StorageBlockReport[perVolumeBlockLists.size()];
+      int reportIndex = 0;
+      for(Map.Entry<DatanodeStorage, BlockListAsLongs> kvPair :
+          perVolumeBlockLists.entrySet()) {
+        DatanodeStorage dnStorage = kvPair.getKey();
+        BlockListAsLongs blockList = kvPair.getValue();
+        reports[reportIndex++] =
+            new StorageBlockReport(dnStorage, blockList);
+      }
+      // Get the list of storage ids associated with the datanode
+      // before the test
+      BlockManager bm =
+          cluster.getNameNode().getNamesystem().getBlockManager();
+      final DatanodeDescriptor dnDescriptor = bm.getDatanodeManager().
+          getDatanode(cluster.getDataNodes().get(0).getDatanodeUuid());
+      DatanodeStorageInfo[] storageInfos = dnDescriptor.getStorageInfos();
+      // Send the full block report concurrently using
+      // numThreads=numStoragesPerDatanode
+      ExecutorService executorService = Executors.
+          newFixedThreadPool(numStoragesPerDatanode);
+      List<Future<DatanodeCommand>> futureList =
+          new ArrayList<>(numStoragesPerDatanode);
+      for (int i = 0; i < numStoragesPerDatanode; i++) {
+        futureList.add(executorService.submit(new Callable<DatanodeCommand>() {
+          @Override
+          public DatanodeCommand call() throws IOException {
+            return cluster.getNameNodeRpc().blockReport(dnR, blockPoolId,
+                 reports, new BlockReportContext(1, 0, System.nanoTime(),
+                     0L, true));
+          }
+        }));
+      }
+      for (Future<DatanodeCommand> future: futureList) {
+        future.get();
+      }
+      executorService.shutdown();
+      // Verify that the storages match before and after the test
+      Assert.assertArrayEquals(storageInfos, dnDescriptor.getStorageInfos());
+    } finally {
+      cluster.shutdown();
+    }
+  }
 }
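
The new regression test drives the race directly: six threads concurrently
send the same full block report for a single DataNode with six storages, and
the test asserts that the DataNode's storage set at the NameNode is unchanged
afterwards. Under the old per-storage report-ID bookkeeping, such interleaved
identical reports could mark live storages as zombies and remove their
replicas.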

http://git-wip-us.apache.org/repos/asf/hadoop/blob/85a20508/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDnRespectsBlockReportSplitThreshold.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDnRespectsBlockReportSplitThreshold.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDnRespectsBlockReportSplitThreshold.java
index bf0e3c1..f41c546 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDnRespectsBlockReportSplitThreshold.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDnRespectsBlockReportSplitThreshold.java
@@ -41,6 +41,7 @@ import org.mockito.ArgumentCaptor;
 import org.mockito.Mockito;
 
 import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
 import static org.mockito.Matchers.*;
 import static org.mockito.Mockito.times;
@@ -88,6 +89,34 @@ public class TestDnRespectsBlockReportSplitThreshold {
         blockCount * BLOCK_SIZE, BLOCK_SIZE, REPL_FACTOR, seed);
   }
 
+  private void verifyCapturedArgumentsSplit(
+      ArgumentCaptor<StorageBlockReport[]> captor,
+      int expectedReportsPerCall,
+      int expectedTotalBlockCount) {
+    List<StorageBlockReport[]> listOfReports = captor.getAllValues();
+    int numBlocksReported = 0;
+    int storageIndex = 0;
+    int listOfReportsSize = listOfReports.size();
+    for (StorageBlockReport[] reports : listOfReports) {
+      if (storageIndex < (listOfReportsSize - 1)) {
+        assertThat(reports.length, is(expectedReportsPerCall));
+      } else {
+        assertThat(reports.length, is(listOfReportsSize));
+      }
+      for (StorageBlockReport report : reports) {
+        BlockListAsLongs blockList = report.getBlocks();
+        if (!blockList.isStorageReport()) {
+          numBlocksReported += blockList.getNumberOfBlocks();
+        } else {
+          assertEquals(blockList.getNumberOfBlocks(), -1);
+        }
+      }
+      storageIndex++;
+    }
+
+    assert(numBlocksReported >= expectedTotalBlockCount);
+  }
+
   private void verifyCapturedArguments(
       ArgumentCaptor<StorageBlockReport[]> captor,
       int expectedReportsPerCall,
@@ -136,7 +165,7 @@ public class TestDnRespectsBlockReportSplitThreshold {
         anyString(),
         captor.capture(), Mockito.<BlockReportContext>anyObject());
 
-    verifyCapturedArguments(captor, 1, BLOCKS_IN_FILE);
+    verifyCapturedArgumentsSplit(captor, 1, BLOCKS_IN_FILE);
   }
 
   /**
@@ -200,7 +229,7 @@ public class TestDnRespectsBlockReportSplitThreshold {
         anyString(),
         captor.capture(), Mockito.<BlockReportContext>anyObject());
 
-    verifyCapturedArguments(captor, 1, BLOCKS_IN_FILE);
+    verifyCapturedArgumentsSplit(captor, 1, BLOCKS_IN_FILE);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/85a20508/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesBlockReportPerStorage.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesBlockReportPerStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesBlockReportPerStorage.java
index 791ee20..524243b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesBlockReportPerStorage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesBlockReportPerStorage.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.datanode;
 
 import java.io.IOException;
 
+import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
@@ -34,13 +35,32 @@ public class TestNNHandlesBlockReportPerStorage extends BlockReportTestBase {
   @Override
   protected void sendBlockReports(DatanodeRegistration dnR, String poolId,
       StorageBlockReport[] reports) throws IOException {
-    int i = 0;
-    for (StorageBlockReport report : reports) {
-      LOG.info("Sending block report for storage " + 
report.getStorage().getStorageID());
-      StorageBlockReport[] singletonReport = { report };
-      cluster.getNameNodeRpc().blockReport(dnR, poolId, singletonReport,
-          new BlockReportContext(reports.length, i, System.nanoTime(), 0L, true));
-      i++;
+    for (int r = 0; r < reports.length; r++) {
+      LOG.info("Sending block report for storage " +
+          reports[r].getStorage().getStorageID());
+      StorageBlockReport[] singletonReport = {reports[r]};
+      if (r != reports.length - 1) {
+        cluster.getNameNodeRpc().blockReport(dnR, poolId, singletonReport,
+            new BlockReportContext(reports.length, r, System.nanoTime(),
+                0L, true));
+      } else {
+        StorageBlockReport[] lastSplitReport =
+            new StorageBlockReport[reports.length];
+        // When block reports are split, send a dummy storage report for all
+        // other storages in the blockreport along with the last storage report
+        for (int i = 0; i <= r; i++) {
+          if (i == r) {
+            lastSplitReport[i] = reports[r];
+            break;
+          }
+          lastSplitReport[i] =
+              new StorageBlockReport(reports[i].getStorage(),
+                  BlockListAsLongs.STORAGE_REPORT);
+        }
+        cluster.getNameNodeRpc().blockReport(dnR, poolId, lastSplitReport,
+            new BlockReportContext(reports.length, r, System.nanoTime(),
+                0L, true));
+      }
     }
   }
 }

