HDFS-7225. Remove stale block invalidation work when DN re-registers with 
different UUID. (Zhe Zhang and Andrew Wang)

(cherry picked from commit 406c09ad1150c4971c2b7675fcb0263d40517fbf)
(cherry picked from commit 2e15754a92c6589308ccbbb646166353cc2f2456)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/014d07de
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/014d07de
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/014d07de

Branch: refs/heads/sjlee/hdfs-merge
Commit: 014d07de2e9b39be4b6793f0e09fcf8548570ad5
Parents: d79a584
Author: Andrew Wang <w...@apache.org>
Authored: Tue Nov 18 22:14:04 2014 -0800
Committer: Sangjin Lee <sj...@apache.org>
Committed: Wed Aug 12 21:32:30 2015 -0700

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |   3 +
 .../server/blockmanagement/BlockManager.java    |  21 ++-
 .../server/blockmanagement/DatanodeManager.java |   2 +
 .../TestComputeInvalidateWork.java              | 167 +++++++++++++++----
 4 files changed, 156 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
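
For context, a minimal self-contained sketch of the scenario this patch addresses: pending block-invalidation work is keyed by datanode UUID, and once that UUID is no longer registered (e.g. the DN reformats and re-registers under a new UUID), the stale work should be dropped rather than scheduled. This is a toy model only; the class, field, and method names below (StaleInvalidationSketch, registeredNodes, computeInvalidateWorkForNode) are illustrative assumptions and are not the actual Hadoop APIs touched by this commit.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model: invalidation work keyed by datanode UUID is dropped
 *  when that UUID is no longer registered (e.g. after a reformat). */
public class StaleInvalidationSketch {

  // registered datanodes, keyed by UUID (stand-in for the NN's datanode map)
  private final Map<String, String> registeredNodes = new HashMap<>();
  // pending block-invalidation work per UUID (stand-in for InvalidateBlocks)
  private final Map<String, List<Long>> invalidateBlocks = new HashMap<>();

  void addToInvalidates(long blockId, String datanodeUuid) {
    invalidateBlocks.computeIfAbsent(datanodeUuid, k -> new ArrayList<>())
        .add(blockId);
  }

  /** Mirrors the guard added in BlockManager: if the UUID no longer
   *  resolves to a registered node, remove its work and schedule nothing. */
  int computeInvalidateWorkForNode(String datanodeUuid) {
    if (!registeredNodes.containsKey(datanodeUuid)) {
      System.out.println("DataNode with UUID " + datanodeUuid
          + " cannot be found, removing block invalidation work.");
      invalidateBlocks.remove(datanodeUuid);
      return 0;
    }
    List<Long> work = invalidateBlocks.remove(datanodeUuid);
    return work == null ? 0 : work.size();   // blocks scheduled for deletion
  }

  public static void main(String[] args) {
    StaleInvalidationSketch bm = new StaleInvalidationSketch();
    bm.registeredNodes.put("uuid-old", "dn1:50010");
    bm.addToInvalidates(1L, "uuid-old");

    // The datanode reformats and re-registers under a new UUID;
    // the old UUID is wiped from the registration map.
    bm.registeredNodes.remove("uuid-old");
    bm.registeredNodes.put("uuid-new", "dn1:50010");

    // Before HDFS-7225 the stale work lingered; with the guard it is dropped.
    System.out.println("scheduled = " + bm.computeInvalidateWorkForNode("uuid-old")); // 0
    System.out.println("pending left = " + bm.invalidateBlocks.size());               // 0
  }
}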


http://git-wip-us.apache.org/repos/asf/hadoop/blob/014d07de/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 47ec910..cc4d2ab 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -35,6 +35,9 @@ Release 2.6.1 - UNRELEASED
 
     HDFS-8486. DN startup may cause severe data loss. (daryn via cmccabe)
 
+    HDFS-7225. Remove stale block invalidation work when DN re-registers with
+    different UUID. (Zhe Zhang and Andrew Wang)
+
 Release 2.6.0 - 2014-11-18
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/014d07de/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 17112bf..d26cc52 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -1112,6 +1112,18 @@ public class BlockManager {
   }
 
   /**
+   * Remove all block invalidation tasks under this datanode UUID;
+   * used when a datanode registers with a new UUID and the old one
+   * is wiped.
+   */
+  void removeFromInvalidates(final DatanodeInfo datanode) {
+    if (!namesystem.isPopulatingReplQueues()) {
+      return;
+    }
+    invalidateBlocks.remove(datanode);
+  }
+
+  /**
    * Mark the block belonging to datanode as corrupt
    * @param blk Block to be marked as corrupt
    * @param dn Datanode which holds the corrupt replica
@@ -3395,7 +3407,14 @@ public class BlockManager {
         return 0;
       }
       try {
-        toInvalidate = invalidateBlocks.invalidateWork(datanodeManager.getDatanode(dn));
+        DatanodeDescriptor dnDescriptor = datanodeManager.getDatanode(dn);
+        if (dnDescriptor == null) {
+          LOG.warn("DataNode " + dn + " cannot be found with UUID " +
+              dn.getDatanodeUuid() + ", removing block invalidation work.");
+          invalidateBlocks.remove(dn);
+          return 0;
+        }
+        toInvalidate = invalidateBlocks.invalidateWork(dnDescriptor);
         
         if (toInvalidate == null) {
           return 0;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/014d07de/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
index 6a52349..80965b9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
@@ -593,6 +593,8 @@ public class DatanodeManager {
     synchronized (datanodeMap) {
       host2DatanodeMap.remove(datanodeMap.remove(key));
     }
+    // Also remove all block invalidation tasks under this node
+    blockManager.removeFromInvalidates(new DatanodeInfo(node));
     if (LOG.isDebugEnabled()) {
       LOG.debug(getClass().getSimpleName() + ".wipeDatanode("
           + node + "): storage " + key 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/014d07de/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestComputeInvalidateWork.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestComputeInvalidateWork.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestComputeInvalidateWork.java
index d0edd48..fecca4e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestComputeInvalidateWork.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestComputeInvalidateWork.java
@@ -17,66 +17,161 @@
  */
 package org.apache.hadoop.hdfs.server.blockmanagement;
 
+import java.util.UUID;
+
 import static org.junit.Assert.assertEquals;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
 import org.apache.hadoop.hdfs.server.common.GenerationStamp;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
+import org.apache.hadoop.hdfs.server.common.StorageInfo;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
+import org.apache.hadoop.util.VersionInfo;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
+import org.mockito.internal.util.reflection.Whitebox;
 
 /**
  * Test if FSNamesystem handles heartbeat right
  */
 public class TestComputeInvalidateWork {
+
+  private Configuration conf;
+  private final int NUM_OF_DATANODES = 3;
+  private MiniDFSCluster cluster;
+  private FSNamesystem namesystem;
+  private BlockManager bm;
+  private DatanodeDescriptor[] nodes;
+
+  @Before
+  public void setup() throws Exception {
+    conf = new HdfsConfiguration();
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(NUM_OF_DATANODES)
+        .build();
+    cluster.waitActive();
+    namesystem = cluster.getNamesystem();
+    bm = namesystem.getBlockManager();
+    nodes = bm.getDatanodeManager().getHeartbeatManager().getDatanodes();
+    assertEquals(nodes.length, NUM_OF_DATANODES);
+  }
+
+  @After
+  public void teardown() throws Exception {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
   /**
-   * Test if {@link FSNamesystem#computeInvalidateWork(int)}
+   * Test if {@link BlockManager#computeInvalidateWork(int)}
    * can schedule invalidate work correctly 
    */
-  @Test
+  @Test(timeout=120000)
   public void testCompInvalidate() throws Exception {
-    final Configuration conf = new HdfsConfiguration();
-    final int NUM_OF_DATANODES = 3;
-    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(NUM_OF_DATANODES).build();
+    final int blockInvalidateLimit = bm.getDatanodeManager()
+        .blockInvalidateLimit;
+    namesystem.writeLock();
     try {
-      cluster.waitActive();
-      final FSNamesystem namesystem = cluster.getNamesystem();
-      final BlockManager bm = namesystem.getBlockManager();
-      final int blockInvalidateLimit = bm.getDatanodeManager().blockInvalidateLimit;
-      final DatanodeDescriptor[] nodes = bm.getDatanodeManager(
-          ).getHeartbeatManager().getDatanodes();
-      assertEquals(nodes.length, NUM_OF_DATANODES);
+      for (int i=0; i<nodes.length; i++) {
+        for(int j=0; j<3*blockInvalidateLimit+1; j++) {
+          Block block = new Block(i*(blockInvalidateLimit+1)+j, 0,
+              GenerationStamp.LAST_RESERVED_STAMP);
+          bm.addToInvalidates(block, nodes[i]);
+        }
+      }
       
+      assertEquals(blockInvalidateLimit*NUM_OF_DATANODES,
+          bm.computeInvalidateWork(NUM_OF_DATANODES+1));
+      assertEquals(blockInvalidateLimit*NUM_OF_DATANODES,
+          bm.computeInvalidateWork(NUM_OF_DATANODES));
+      assertEquals(blockInvalidateLimit*(NUM_OF_DATANODES-1),
+          bm.computeInvalidateWork(NUM_OF_DATANODES-1));
+      int workCount = bm.computeInvalidateWork(1);
+      if (workCount == 1) {
+        assertEquals(blockInvalidateLimit+1, bm.computeInvalidateWork(2));
+      } else {
+        assertEquals(workCount, blockInvalidateLimit);
+        assertEquals(2, bm.computeInvalidateWork(2));
+      }
+    } finally {
+      namesystem.writeUnlock();
+    }
+  }
+
+  /**
+   * Reformatted DataNodes will replace the original UUID in the
+   * {@link DatanodeManager#datanodeMap}. This tests if block
+   * invalidation work on the original DataNode can be skipped.
+   */
+  @Test(timeout=120000)
+  public void testDatanodeReformat() throws Exception {
+    namesystem.writeLock();
+    try {
+      Block block = new Block(0, 0, GenerationStamp.LAST_RESERVED_STAMP);
+      bm.addToInvalidates(block, nodes[0]);
+      // Change the datanode UUID to emulate a reformation
+      nodes[0].setDatanodeUuidForTesting("fortesting");
+      // Since UUID has changed, the invalidation work should be skipped
+      assertEquals(0, bm.computeInvalidateWork(1));
+      assertEquals(0, bm.getPendingDeletionBlocksCount());
+    } finally {
+      namesystem.writeUnlock();
+    }
+  }
+
+  @Test(timeout=120000)
+  public void testDatanodeReRegistration() throws Exception {
+    // Create a test file
+    final DistributedFileSystem dfs = cluster.getFileSystem();
+    final Path path = new Path("/testRR");
+    // Create a file and shutdown the DNs, which populates InvalidateBlocks
+    DFSTestUtil.createFile(dfs, path, dfs.getDefaultBlockSize(),
+        (short) NUM_OF_DATANODES, 0xED0ED0);
+    for (DataNode dn : cluster.getDataNodes()) {
+      dn.shutdown();
+    }
+    dfs.delete(path, false);
+    namesystem.writeLock();
+    InvalidateBlocks invalidateBlocks;
+    int expected = NUM_OF_DATANODES;
+    try {
+      invalidateBlocks = (InvalidateBlocks) Whitebox
+          .getInternalState(cluster.getNamesystem().getBlockManager(),
+              "invalidateBlocks");
+      assertEquals("Expected invalidate blocks to be the number of DNs",
+          (long) expected, invalidateBlocks.numBlocks());
+    } finally {
+      namesystem.writeUnlock();
+    }
+    // Re-register each DN and see that it wipes the invalidation work
+    for (DataNode dn : cluster.getDataNodes()) {
+      DatanodeID did = dn.getDatanodeId();
+      did.setDatanodeUuidForTesting(UUID.randomUUID().toString());
+      DatanodeRegistration reg = new DatanodeRegistration(did,
+          new StorageInfo(HdfsServerConstants.NodeType.DATA_NODE),
+          new ExportedBlockKeys(),
+          VersionInfo.getVersion());
       namesystem.writeLock();
       try {
-        for (int i=0; i<nodes.length; i++) {
-          for(int j=0; j<3*blockInvalidateLimit+1; j++) {
-            Block block = new Block(i*(blockInvalidateLimit+1)+j, 0, 
-                GenerationStamp.LAST_RESERVED_STAMP);
-            bm.addToInvalidates(block, nodes[i]);
-          }
-        }
-        
-        assertEquals(blockInvalidateLimit*NUM_OF_DATANODES, 
-            bm.computeInvalidateWork(NUM_OF_DATANODES+1));
-        assertEquals(blockInvalidateLimit*NUM_OF_DATANODES, 
-            bm.computeInvalidateWork(NUM_OF_DATANODES));
-        assertEquals(blockInvalidateLimit*(NUM_OF_DATANODES-1), 
-            bm.computeInvalidateWork(NUM_OF_DATANODES-1));
-        int workCount = bm.computeInvalidateWork(1);
-        if (workCount == 1) {
-          assertEquals(blockInvalidateLimit+1, bm.computeInvalidateWork(2));
-        } else {
-          assertEquals(workCount, blockInvalidateLimit);
-          assertEquals(2, bm.computeInvalidateWork(2));
-        }
+        bm.getDatanodeManager().registerDatanode(reg);
+        expected--;
+        assertEquals("Expected number of invalidate blocks to decrease",
+            (long) expected, invalidateBlocks.numBlocks());
       } finally {
-        namesystem.writeUnlock();
+        namesystem.writeUnlock();
       }
-    } finally {
-      cluster.shutdown();
     }
   }
 }
