HDFS-6833.  DirectoryScanner should not register a deleting block with memory of DataNode.  Contributed by Shinichi Yamashita


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/6dae6d12
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/6dae6d12
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/6dae6d12

Branch: refs/heads/HDFS-7836
Commit: 6dae6d12ec5abb716e1501cd4e18b10ae7809b94
Parents: 06ce1d9
Author: Tsz-Wo Nicholas Sze <szets...@hortonworks.com>
Authored: Fri Mar 13 02:25:32 2015 +0800
Committer: Tsz-Wo Nicholas Sze <szets...@hortonworks.com>
Committed: Fri Mar 13 02:25:32 2015 +0800

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |  3 +
 .../hdfs/server/datanode/DirectoryScanner.java  | 20 ++++---
 .../server/datanode/fsdataset/FsDatasetSpi.java |  5 ++
 .../impl/FsDatasetAsyncDiskService.java         | 31 +++++++++-
 .../datanode/fsdataset/impl/FsDatasetImpl.java  | 41 +++++++++++++-
 .../server/datanode/SimulatedFSDataset.java     |  5 ++
 .../extdataset/ExternalDatasetImpl.java         |  5 ++
 .../fsdataset/impl/TestFsDatasetImpl.java       | 59 ++++++++++++++++++++
 8 files changed, 158 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
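
Summary of the change: asynchronous block deletion on the DataNode is not instantaneous, so between the moment a replica is removed from the in-memory volume map and the moment its files are unlinked, the DirectoryScanner could see the block on disk, conclude it was missing from memory, and re-register it. The patch has FsDatasetImpl remember which block IDs are queued for deletion and has the scanner skip them. A minimal standalone sketch of that bookkeeping follows (the DeletingBlockTracker class is illustrative and not part of Hadoop; the three method names mirror the patch, which synchronizes on the map itself rather than using synchronized methods):

    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    /** Standalone sketch of the per-block-pool bookkeeping this patch
     *  adds to FsDatasetImpl. */
    class DeletingBlockTracker {
      private final Map<String, Set<Long>> deletingBlock
          = new HashMap<String, Set<Long>>();

      /** Record a replica that has left the volume map but whose files
       *  are still queued for asynchronous deletion. */
      synchronized void addDeletingBlock(String bpid, long blockId) {
        Set<Long> s = deletingBlock.get(bpid);
        if (s == null) {
          s = new HashSet<Long>();
          deletingBlock.put(bpid, s);
        }
        s.add(blockId);
      }

      /** Consulted by the DirectoryScanner before it reports an on-disk
       *  block as missing from memory. */
      synchronized boolean isDeletingBlock(String bpid, long blockId) {
        Set<Long> s = deletingBlock.get(bpid);
        return s != null && s.contains(blockId);
      }

      /** Called by the async disk service once the files are actually gone. */
      synchronized void removeDeletedBlocks(String bpid, Set<Long> blockIds) {
        Set<Long> s = deletingBlock.get(bpid);
        if (s != null) {
          s.removeAll(blockIds);
        }
      }
    }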


http://git-wip-us.apache.org/repos/asf/hadoop/blob/6dae6d12/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 07213dd..e52b849 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -1139,6 +1139,9 @@ Release 2.7.0 - UNRELEASED
     HDFS-7830. DataNode does not release the volume lock when adding a volume
     fails. (Lei Xu via Colin P. Mccabe)
 
+    HDFS-6833.  DirectoryScanner should not register a deleting block with
+    memory of DataNode.  (Shinichi Yamashita via szetszwo)
+
     BREAKDOWN OF HDFS-7584 SUBTASKS AND RELATED JIRAS
 
       HDFS-7720. Quota by Storage Type API, tools and ClientNameNode

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6dae6d12/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
index 01f967f..61dfb14 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
@@ -443,13 +443,14 @@ public class DirectoryScanner implements Runnable {
         int d = 0; // index for blockpoolReport
         int m = 0; // index for memReport
         while (m < memReport.length && d < blockpoolReport.length) {
-          FinalizedReplica memBlock = memReport[Math.min(m, memReport.length - 1)];
-          ScanInfo info = blockpoolReport[Math.min(
-              d, blockpoolReport.length - 1)];
+          FinalizedReplica memBlock = memReport[m];
+          ScanInfo info = blockpoolReport[d];
           if (info.getBlockId() < memBlock.getBlockId()) {
-            // Block is missing in memory
-            statsRecord.missingMemoryBlocks++;
-            addDifference(diffRecord, statsRecord, info);
+            if (!dataset.isDeletingBlock(bpid, info.getBlockId())) {
+              // Block is missing in memory
+              statsRecord.missingMemoryBlocks++;
+              addDifference(diffRecord, statsRecord, info);
+            }
             d++;
             continue;
           }
@@ -495,8 +496,11 @@ public class DirectoryScanner implements Runnable {
                         current.getBlockId(), current.getVolume());
         }
         while (d < blockpoolReport.length) {
-          statsRecord.missingMemoryBlocks++;
-          addDifference(diffRecord, statsRecord, blockpoolReport[d++]);
+          if (!dataset.isDeletingBlock(bpid, blockpoolReport[d].getBlockId())) {
+            statsRecord.missingMemoryBlocks++;
+            addDifference(diffRecord, statsRecord, blockpoolReport[d]);
+          }
+          d++;
         }
         LOG.info(statsRecord.toString());
       } //end for
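
The scanner change above reduces to a guard in the merge pass over the sorted memory and disk reports: a disk-only block ID is only reported as missing from memory if it is not pending deletion. An illustrative, runnable reduction of that loop (ScanSketch and the plain long[] inputs are stand-ins for the FinalizedReplica[] and ScanInfo[] reports):

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.Set;

    /** Illustrative reduction of the scanner's merge pass; 'deleting'
     *  holds block IDs queued for asynchronous deletion. */
    public class ScanSketch {
      public static void main(String[] args) {
        long[] memIds  = {2, 4};           // sorted in-memory block IDs
        long[] diskIds = {1, 2, 3, 4};     // sorted on-disk block IDs
        Set<Long> deleting = new HashSet<Long>(Arrays.asList(1L));
        int m = 0, d = 0;
        while (m < memIds.length && d < diskIds.length) {
          if (diskIds[d] < memIds[m]) {
            if (!deleting.contains(diskIds[d])) {  // the patched check
              System.out.println("missing in memory: " + diskIds[d]);
            }
            d++;
          } else if (diskIds[d] > memIds[m]) {
            System.out.println("missing on disk: " + memIds[m]);
            m++;
          } else {
            m++; d++;  // present in both reports
          }
        }
        while (d < diskIds.length) {  // trailing disk-only IDs, same guard
          if (!deleting.contains(diskIds[d])) {
            System.out.println("missing in memory: " + diskIds[d]);
          }
          d++;
        }
        // Prints only "missing in memory: 3" -- block 1 is being deleted.
      }
    }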

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6dae6d12/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
index 10c8369..5b183e6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
@@ -543,4 +543,9 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
    * Check whether the block was pinned
    */
   public boolean getPinning(ExtendedBlock block) throws IOException;
+  
+  /**
+   * Check whether the block is being deleted
+   */
+  public boolean isDeletingBlock(String bpid, long blockId);
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6dae6d12/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java
index 13e854f..c1d3990 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java
@@ -22,7 +22,10 @@ import java.io.File;
 import java.io.FileDescriptor;
 import java.io.IOException;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -64,9 +67,14 @@ class FsDatasetAsyncDiskService {
   private static final long THREADS_KEEP_ALIVE_SECONDS = 60; 
   
   private final DataNode datanode;
+  private final FsDatasetImpl fsdatasetImpl;
   private final ThreadGroup threadGroup;
   private Map<File, ThreadPoolExecutor> executors
       = new HashMap<File, ThreadPoolExecutor>();
+  private Map<String, Set<Long>> deletedBlockIds 
+      = new HashMap<String, Set<Long>>();
+  private static final int MAX_DELETED_BLOCKS = 64;
+  private int numDeletedBlocks = 0;
   
   /**
    * Create a AsyncDiskServices with a set of volumes (specified by their
@@ -75,8 +83,9 @@ class FsDatasetAsyncDiskService {
    * The AsyncDiskServices uses one ThreadPool per volume to do the async
    * disk operations.
    */
-  FsDatasetAsyncDiskService(DataNode datanode) {
+  FsDatasetAsyncDiskService(DataNode datanode, FsDatasetImpl fsdatasetImpl) {
     this.datanode = datanode;
+    this.fsdatasetImpl = fsdatasetImpl;
     this.threadGroup = new ThreadGroup(getClass().getSimpleName());
   }
 
@@ -286,7 +295,27 @@ class FsDatasetAsyncDiskService {
         LOG.info("Deleted " + block.getBlockPoolId() + " "
             + block.getLocalBlock() + " file " + blockFile);
       }
+      updateDeletedBlockId(block);
       IOUtils.cleanup(null, volumeRef);
     }
   }
+  
+  private synchronized void updateDeletedBlockId(ExtendedBlock block) {
+    Set<Long> blockIds = deletedBlockIds.get(block.getBlockPoolId());
+    if (blockIds == null) {
+      blockIds = new HashSet<Long>();
+      deletedBlockIds.put(block.getBlockPoolId(), blockIds);
+    }
+    blockIds.add(block.getBlockId());
+    numDeletedBlocks++;
+    if (numDeletedBlocks == MAX_DELETED_BLOCKS) {
+      for (Entry<String, Set<Long>> e : deletedBlockIds.entrySet()) {
+        String bpid = e.getKey();
+        Set<Long> bs = e.getValue();
+        fsdatasetImpl.removeDeletedBlocks(bpid, bs);
+        bs.clear();
+      }
+      numDeletedBlocks = 0;
+    }
+  }
 }
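
Note the batching here: deleted IDs accumulate per block pool and are only handed back to FsDatasetImpl.removeDeletedBlocks once MAX_DELETED_BLOCKS (64) deletions have occurred, so a block ID can remain in the "deleting" set briefly after its files are gone; that is harmless, since the scanner merely skips such IDs. A self-contained sketch of the flush logic (BatchFlushSketch and its Dataset callback interface are illustrative; in the patch the callback is FsDatasetImpl.removeDeletedBlocks):

    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    /** Sketch of updateDeletedBlockId's flush-every-64 batching. */
    class BatchFlushSketch {
      interface Dataset {
        void removeDeletedBlocks(String bpid, Set<Long> blockIds);
      }

      private static final int MAX_DELETED_BLOCKS = 64;
      private final Map<String, Set<Long>> deletedBlockIds
          = new HashMap<String, Set<Long>>();
      private int numDeletedBlocks = 0;
      private final Dataset dataset;

      BatchFlushSketch(Dataset dataset) {
        this.dataset = dataset;
      }

      /** Called after each block's files have been unlinked. */
      synchronized void onBlockDeleted(String bpid, long blockId) {
        Set<Long> ids = deletedBlockIds.get(bpid);
        if (ids == null) {
          ids = new HashSet<Long>();
          deletedBlockIds.put(bpid, ids);
        }
        ids.add(blockId);
        if (++numDeletedBlocks == MAX_DELETED_BLOCKS) {  // flush threshold
          for (Map.Entry<String, Set<Long>> e : deletedBlockIds.entrySet()) {
            dataset.removeDeletedBlocks(e.getKey(), e.getValue());
            e.getValue().clear();
          }
          numDeletedBlocks = 0;
        }
      }
    }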

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6dae6d12/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index 0f28aa4..48ac6ca 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -237,6 +237,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   private volatile boolean fsRunning;
 
   final ReplicaMap volumeMap;
+  final Map<String, Set<Long>> deletingBlock;
   final RamDiskReplicaTracker ramDiskReplicaTracker;
   final RamDiskAsyncLazyPersistService asyncLazyPersistService;
 
@@ -298,8 +299,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
             VolumeChoosingPolicy.class), conf);
     volumes = new FsVolumeList(volumeFailureInfos, datanode.getBlockScanner(),
         blockChooserImpl);
-    asyncDiskService = new FsDatasetAsyncDiskService(datanode);
+    asyncDiskService = new FsDatasetAsyncDiskService(datanode, this);
     asyncLazyPersistService = new RamDiskAsyncLazyPersistService(datanode);
+    deletingBlock = new HashMap<String, Set<Long>>();
 
     for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) {
       addVolume(dataLocations, storage.getStorageDir(idx));
@@ -1795,7 +1797,12 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
               +  ". Parent not found for file " + f);
           continue;
         }
-        volumeMap.remove(bpid, invalidBlks[i]);
+        ReplicaInfo removing = volumeMap.remove(bpid, invalidBlks[i]);
+        addDeletingBlock(bpid, removing.getBlockId());
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Block file " + removing.getBlockFile().getName()
+              + " is to be deleted");
+        }
       }
 
       if (v.isTransientStorage()) {
@@ -3005,5 +3012,35 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     FileStatus fss = localFS.getFileStatus(new Path(f.getAbsolutePath()));
     return fss.getPermission().getStickyBit();
   }
+  
+  @Override
+  public boolean isDeletingBlock(String bpid, long blockId) {
+    synchronized(deletingBlock) {
+      Set<Long> s = deletingBlock.get(bpid);
+      return s != null ? s.contains(blockId) : false;
+    }
+  }
+  
+  public void removeDeletedBlocks(String bpid, Set<Long> blockIds) {
+    synchronized (deletingBlock) {
+      Set<Long> s = deletingBlock.get(bpid);
+      if (s != null) {
+        for (Long id : blockIds) {
+          s.remove(id);
+        }
+      }
+    }
+  }
+  
+  private void addDeletingBlock(String bpid, Long blockId) {
+    synchronized(deletingBlock) {
+      Set<Long> s = deletingBlock.get(bpid);
+      if (s == null) {
+        s = new HashSet<Long>();
+        deletingBlock.put(bpid, s);
+      }
+      s.add(blockId);
+    }
+  }
 }
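
One design point worth noting: the new methods synchronize on the deletingBlock map itself rather than on the FsDatasetImpl monitor, so scanner lookups via isDeletingBlock and the async disk service's batched removeDeletedBlocks calls do not contend with the dataset-wide lock.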
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6dae6d12/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
index 8ae5415..f0dbd0f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
@@ -1318,5 +1318,10 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
   public boolean getPinning(ExtendedBlock b) throws IOException {
     return blockMap.get(b.getBlockPoolId()).get(b.getLocalBlock()).pinned;
   }
+  
+  @Override
+  public boolean isDeletingBlock(String bpid, long blockId) {
+    throw new UnsupportedOperationException();
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6dae6d12/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
index a3c9935..6653cca 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
@@ -429,4 +429,9 @@ public class ExternalDatasetImpl implements FsDatasetSpi<ExternalVolumeImpl> {
   public boolean getPinning(ExtendedBlock block) throws IOException {
     return false;
   }
+  
+  @Override
+  public boolean isDeletingBlock(String bpid, long blockId) {
+    return false;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6dae6d12/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
index 3b47dd0..403cb2a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
@@ -18,10 +18,14 @@
 package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 
 import com.google.common.collect.Lists;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystemTestHelper;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.common.Storage;
@@ -29,8 +33,11 @@ import org.apache.hadoop.hdfs.server.common.StorageInfo;
 import org.apache.hadoop.hdfs.server.datanode.BlockScanner;
 import org.apache.hadoop.hdfs.server.datanode.DNConf;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.datanode.DataStorage;
+import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaHandler;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
 import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.RoundRobinVolumeChoosingPolicy;
@@ -88,6 +95,8 @@ public class TestFsDatasetImpl {
   private DataNode datanode;
   private DataStorage storage;
   private FsDatasetImpl dataset;
+  
+  private final static String BLOCKPOOL = "BP-TEST";
 
   private static Storage.StorageDirectory createStorageDirectory(File root) {
     Storage.StorageDirectory sd = new Storage.StorageDirectory(root);
@@ -334,4 +343,54 @@ public class TestFsDatasetImpl {
 
     FsDatasetTestUtil.assertFileLockReleased(badDir.toString());
   }
+  
+  @Test
+  public void testDeletingBlocks() throws IOException {
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(new HdfsConfiguration()).build();
+    try {
+      cluster.waitActive();
+      DataNode dn = cluster.getDataNodes().get(0);
+      
+      FsDatasetImpl ds = (FsDatasetImpl) DataNodeTestUtils.getFSDataset(dn);
+      FsVolumeImpl vol = ds.getVolumes().get(0);
+
+      ExtendedBlock eb;
+      ReplicaInfo info;
+      List<Block> blockList = new ArrayList<Block>();
+      for (int i = 1; i <= 63; i++) {
+        eb = new ExtendedBlock(BLOCKPOOL, i, 1, 1000 + i);
+        info = new FinalizedReplica(
+            eb.getLocalBlock(), vol, vol.getCurrentDir().getParentFile());
+        ds.volumeMap.add(BLOCKPOOL, info);
+        info.getBlockFile().createNewFile();
+        info.getMetaFile().createNewFile();
+        blockList.add(info);
+      }
+      ds.invalidate(BLOCKPOOL, blockList.toArray(new Block[0]));
+      try {
+        Thread.sleep(1000);
+      } catch (InterruptedException e) {
+        // Nothing to do
+      }
+      assertTrue(ds.isDeletingBlock(BLOCKPOOL, blockList.get(0).getBlockId()));
+
+      blockList.clear();
+      eb = new ExtendedBlock(BLOCKPOOL, 64, 1, 1064);
+      info = new FinalizedReplica(
+          eb.getLocalBlock(), vol, vol.getCurrentDir().getParentFile());
+      ds.volumeMap.add(BLOCKPOOL, info);
+      info.getBlockFile().createNewFile();
+      info.getMetaFile().createNewFile();
+      blockList.add(info);
+      ds.invalidate(BLOCKPOOL, blockList.toArray(new Block[0]));
+      try {
+        Thread.sleep(1000);
+      } catch (InterruptedException e) {
+        // Nothing to do
+      }
+      assertFalse(ds.isDeletingBlock(BLOCKPOOL, blockList.get(0).getBlockId()));
+    } finally {
+      cluster.shutdown();
+    }
+  }
 }
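
The test leans directly on the MAX_DELETED_BLOCKS threshold sketched earlier: the first invalidate queues 63 deletions, below the flush limit of 64, so after the sleep the first block's ID is still tracked and isDeletingBlock returns true. Invalidating a 64th block then triggers the batched removeDeletedBlocks call, which clears the set, so the final assertion (blockList was cleared, so get(0) is now block 64) expects false. The one-second sleeps give the async deletion threads time to run.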
