Author: suresh
Date: Tue Aug 28 17:26:59 2012
New Revision: 1378241

URL: http://svn.apache.org/viewvc?rev=1378241&view=rev
Log:
HDFS-3791. Namenode will not block until a large directory deletion completes.
It allows other operations while the deletion is in progress. Backport of
HDFS-173. Contributed by Uma Maheswara Rao.
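
In outline, the patch splits a delete into two phases: the subtree is
unlinked and its blocks are collected while the FSNamesystem lock is held;
then the lock is released, the edit log is synced, and the collected blocks
are invalidated in small batches. A condensed sketch of that flow, using
hypothetical stand-in names (TwoPhaseDeleteSketch, unlinkAndCollect) and a
plain Object in place of the FSNamesystem monitor -- not the actual patch
code:

    import java.util.ArrayList;
    import java.util.List;

    // Hypothetical outline of the two-phase delete introduced here.
    class TwoPhaseDeleteSketch {
      static final int BLOCK_DELETION_INCREMENT = 1000;
      private final Object namesystemLock = new Object(); // FSNamesystem monitor

      boolean delete(String src) {
        List<String> collectedBlocks = new ArrayList<String>();
        boolean deleteNow;
        synchronized (namesystemLock) {
          // Phase 1: unlink from the namespace, only collecting the blocks.
          if (!unlinkAndCollect(src, collectedBlocks)) {
            return false;
          }
          // Small deletes are still done in one shot under the lock.
          deleteNow = collectedBlocks.size() <= BLOCK_DELETION_INCREMENT;
          if (deleteNow) {
            invalidateBlocks(collectedBlocks);
          }
        }
        syncEditLog(); // the edit log is synced outside the lock
        if (!deleteNow) {
          // Phase 2: large deletes invalidate blocks a batch at a time,
          // re-acquiring the lock per batch (see removeBlocks() in the
          // FSNamesystem hunk below).
          invalidateBlocksInBatches(collectedBlocks);
        }
        return true;
      }

      private boolean unlinkAndCollect(String src, List<String> blocks) {
        return true; // stands in for FSDirectory.delete(src, collectedBlocks)
      }
      private void invalidateBlocks(List<String> blocks) {
        // stands in for blocksMap.removeINode / addToInvalidates per block
      }
      private void invalidateBlocksInBatches(List<String> blocks) {
        // batching is shown in the removeBlocks() sketch later in this mail
      }
      private void syncEditLog() {
        // stands in for getEditLog().logSync()
      }
    }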

Added:
    hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/server/namenode/TestLargeDirectoryDelete.java
Modified:
    hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlocksMap.java
    hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
    hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
    hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/INodeFile.java

Modified: hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlocksMap.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlocksMap.java?rev=1378241&r1=1378240&r2=1378241&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlocksMap.java (original)
+++ hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlocksMap.java Tue Aug 28 17:26:59 2012
@@ -58,6 +58,10 @@ class BlocksMap {
     INodeFile getINode() {
       return inode;
     }
+    
+    void setINode(INodeFile inode) {
+      this.inode = inode;
+    }
 
     DatanodeDescriptor getDatanode(int index) {
       assert this.triplets != null : "BlockInfo is not initialized";

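The new setter is used later in this patch (see the INodeFile.java hunk
below): as blocks are collected for deferred deletion, each block is
detached from its inode, roughly:

    // From collectSubtreeBlocksAndClear in this patch:
    for (BlockInfo blk : blocks) {
      v.add(blk);         // hand the block to the caller's collection
      blk.setINode(null); // the block no longer belongs to a live file
    }
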
Modified: hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java?rev=1378241&r1=1378240&r2=1378241&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java (original)
+++ hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java Tue Aug 28 17:26:59 2012
@@ -591,19 +591,27 @@ class FSDirectory implements FSConstants
   }
     
   /**
-   * Remove the file from management, return blocks
+   * Delete the target directory and collect the blocks under it
+   * 
+   * @param src
+   *          Path of a directory to delete
+   * @param collectedBlocks
+   *          Blocks under the deleted directory
+   * @return true on successful deletion; else false
    */
-  boolean delete(String src) {
+  boolean delete(String src, List<Block> collectedBlocks) {
     if (NameNode.stateChangeLog.isDebugEnabled()) {
-      NameNode.stateChangeLog.debug("DIR* FSDirectory.delete: "+src);
+      NameNode.stateChangeLog.debug("DIR* FSDirectory.delete: " + src);
     }
     waitForReady();
     long now = FSNamesystem.now();
-    int filesRemoved = unprotectedDelete(src, now);
+    int filesRemoved = unprotectedDelete(src, collectedBlocks, now);
     if (filesRemoved <= 0) {
       return false;
     }
     incrDeletedFileCount(filesRemoved);
+    // Blocks will be deleted later by the caller of this method
+    FSNamesystem.getFSNamesystem().removePathAndBlocks(src, null);
     fsImage.getEditLog().logDelete(src, now);
     return true;
   }
@@ -625,14 +633,36 @@ class FSDirectory implements FSConstants
   }
   
   /**
-   * Delete a path from the name space
-   * Update the count at each ancestor directory with quota
-   * @param src a string representation of a path to an inode
-   * @param modificationTime the time the inode is removed
-   * @param deletedBlocks the place holder for the blocks to be removed
+   * Delete a path from the name space. Update the count at each ancestor
+   * directory with quota.
+   * 
+   * @param src
+   *          a string representation of a path to an inode
+   * 
+   * @param mTime
+   *          the time the inode is removed
+   */
+  void unprotectedDelete(String src, long mTime) {
+    List<Block> collectedBlocks = new ArrayList<Block>();
+    int filesRemoved = unprotectedDelete(src, collectedBlocks, mTime);
+    if (filesRemoved > 0) {
+      namesystem.removePathAndBlocks(src, collectedBlocks);
+    }
+  }
+  
+  /**
+   * Delete a path from the name space. Update the count at each ancestor
+   * directory with quota.
+   * 
+   * @param src
+   *          a string representation of a path to an inode
+   * @param collectedBlocks
+   *          blocks collected from the deleted path
+   * @param mtime
+   *          the time the inode is removed
    * @return the number of inodes deleted; 0 if no inodes are deleted.
-   */ 
-  int unprotectedDelete(String src, long modificationTime) {
+   */
+  int unprotectedDelete(String src, List<Block> collectedBlocks, long mtime) {
     src = normalizePath(src);
 
     synchronized (rootDir) {
@@ -643,32 +673,27 @@ class FSDirectory implements FSConstants
         NameNode.stateChangeLog.debug("DIR* FSDirectory.unprotectedDelete: "
             +"failed to remove "+src+" because it does not exist");
         return 0;
-      } else if (inodes.length == 1) { // src is the root
+      } 
+      if (inodes.length == 1) { // src is the root
         NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedDelete: " +
             "failed to remove " + src +
             " because the root is not allowed to be deleted");
         return 0;
-      } else {
-        try {
-          // Remove the node from the namespace
-          removeChild(inodes, inodes.length-1);
-          // set the parent's modification time
-          inodes[inodes.length-2].setModificationTime(modificationTime);
-          // GC all the blocks underneath the node.
-          ArrayList<Block> v = new ArrayList<Block>();
-          int filesRemoved = targetNode.collectSubtreeBlocksAndClear(v);
-          namesystem.removePathAndBlocks(src, v);
-          if (NameNode.stateChangeLog.isDebugEnabled()) {
-            NameNode.stateChangeLog.debug("DIR* FSDirectory.unprotectedDelete: "
-              +src+" is removed");
-          }
-          return filesRemoved;
-        } catch (IOException e) {
-          NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedDelete: " +
-              "failed to remove " + src + " because " + e.getMessage());
-          return 0;
-        }
+      } 
+      int pos = inodes.length - 1;
+      targetNode = removeChild(inodes, pos);
+      if (targetNode == null) {
+        return 0;
+      }
+      // set the parent's modification time
+      inodes[pos - 1].setModificationTime(mtime);
+      int filesRemoved = targetNode
+          .collectSubtreeBlocksAndClear(collectedBlocks);
+      if (NameNode.stateChangeLog.isDebugEnabled()) {
+        NameNode.stateChangeLog.debug("DIR* FSDirectory.unprotectedDelete: "
+            + src + " is removed");
       }
+      return filesRemoved;
     }
   }
 

Modified: hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1378241&r1=1378240&r2=1378241&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Tue Aug 28 17:26:59 2012
@@ -182,6 +182,7 @@ public class FSNamesystem implements FSC
   // Default initial capacity and load factor of map
   public static final int DEFAULT_INITIAL_MAP_CAPACITY = 16;
   public static final float DEFAULT_MAP_LOAD_FACTOR = 0.75f;
+  static int BLOCK_DELETION_INCREMENT = 1000;
   
   private float blocksInvalidateWorkPct;
   private int blocksReplWorkMultiplier;
@@ -1833,17 +1834,33 @@ public class FSNamesystem implements FSC
 
   /**
    * Adds block to list of blocks which will be invalidated on 
-   * specified datanode and log the move
+   * specified datanode
    * @param b block
    * @param n datanode
+   * @param log true to create an entry in the log
    */
-  void addToInvalidates(Block b, DatanodeInfo n) {
-    addToInvalidatesNoLog(b, n);
-    NameNode.stateChangeLog.info("BLOCK* NameSystem.addToInvalidates: "
-        + b.getBlockName() + " is added to invalidSet of " + n.getName());
+  void addToInvalidates(Block b, DatanodeInfo dn, boolean log) {
+    addToInvalidatesNoLog(b, dn);
+    if (log) {
+      NameNode.stateChangeLog.info("BLOCK* NameSystem.addToInvalidates: "
+          + b.getBlockName() + " to " + dn.getName());
+    }
   }
 
   /**
+   * Adds block to list of blocks which will be invalidated on the specified
+   * datanode, and logs the operation
+   * 
+   * @param b
+   *          block
+   * @param dn
+   *          datanode
+   */
+  void addToInvalidates(Block b, DatanodeInfo dn) {
+    addToInvalidates(b, dn, true);
+  }
+  
+  /**
    * Adds block to list of blocks which will be invalidated on 
    * specified datanode
    * @param b block
@@ -1865,10 +1882,16 @@ public class FSNamesystem implements FSC
    * all its datanodes.
    */
   private void addToInvalidates(Block b) {
+    StringBuilder datanodes = new StringBuilder();
     for (Iterator<DatanodeDescriptor> it = 
                                 blocksMap.nodeIterator(b); it.hasNext();) {
       DatanodeDescriptor node = it.next();
-      addToInvalidates(b, node);
+      addToInvalidates(b, node, false);
+      datanodes.append(node.getName()).append(" ");
+    }
+    if (datanodes.length() != 0) {
+      NameNode.stateChangeLog.info("BLOCK* NameSystem.addToInvalidates: "
+          + b.getBlockName() + " to " + datanodes.toString());
     }
   }
 
@@ -2025,8 +2048,10 @@ public class FSNamesystem implements FSC
       if ((!recursive) && (!dir.isDirEmpty(src))) {
         throw new IOException(src + " is non empty");
       }
+      if (NameNode.stateChangeLog.isDebugEnabled()) {
+        NameNode.stateChangeLog.debug("DIR* NameSystem.delete: " + src);
+      }
       boolean status = deleteInternal(src, true);
-      getEditLog().logSync();
       if (status && auditLog.isInfoEnabled() && isExternalInvocation()) {
         logAuditEvent(UserGroupInformation.getCurrentUser(),
                       Server.getRemoteIp(),
@@ -2036,30 +2061,73 @@ public class FSNamesystem implements FSC
     }
     
   /**
-   * Remove the indicated filename from the namespace.  This may
-   * invalidate some blocks that make up the file.
+   * Remove a file/directory from the namespace.
+   * <p>
+   * For large directories, deletion is incremental. The blocks under the
+   * directory are collected and deleted a small number at a time while
+   * holding the {@link FSNamesystem} lock.
+   * <p>
+   * For a small directory or file, the deletion is done in one shot.
    */
-  synchronized boolean deleteInternal(String src, 
+  private boolean deleteInternal(String src,
       boolean enforcePermission) throws IOException {
+    boolean deleteNow = false;
+    ArrayList<Block> collectedBlocks = new ArrayList<Block>();
+    synchronized (this) {
+      if (isInSafeMode()) {
+        throw new SafeModeException("Cannot delete " + src, safeMode);
+      }
+      if (enforcePermission && isPermissionEnabled) {
+        checkPermission(src, false, null, FsAction.WRITE, null, FsAction.ALL);
+      }
+      // Unlink the target directory from directory tree
+      if (!dir.delete(src, collectedBlocks)) {
+        return false;
+      }
+      deleteNow = collectedBlocks.size() <= BLOCK_DELETION_INCREMENT;
+      if (deleteNow) { // Perform small deletes right away
+        removeBlocks(collectedBlocks);
+      }
+    }
+    
+    // Log directory deletion to editlog
+    getEditLog().logSync();
+    if (!deleteNow) {
+      removeBlocks(collectedBlocks); // Incremental deletion of blocks
+    }
+    collectedBlocks.clear();
     if (NameNode.stateChangeLog.isDebugEnabled()) {
-      NameNode.stateChangeLog.debug("DIR* NameSystem.delete: " + src);
+      NameNode.stateChangeLog.debug("DIR* Namesystem.delete: " + src
+          + " is removed");
     }
-    if (isInSafeMode())
-      throw new SafeModeException("Cannot delete " + src, safeMode);
-    if (enforcePermission && isPermissionEnabled) {
-      checkPermission(src, false, null, FsAction.WRITE, null, FsAction.ALL);
+    return true;
+  }
+  
+  /** From the given list, incrementally remove the blocks from blockManager */
+  private void removeBlocks(List<Block> blocks) {
+    int start = 0;
+    int end = 0;
+    while (start < blocks.size()) {
+      end = BLOCK_DELETION_INCREMENT + start;
+      end = end > blocks.size() ? blocks.size() : end;
+      synchronized (this) {
+        for (int i = start; i < end; i++) {
+          Block b = blocks.get(i);
+          blocksMap.removeINode(b);
+          corruptReplicas.removeFromCorruptReplicasMap(b);
+          addToInvalidates(b);
+        }
+      }
+      start = end;
     }
-
-    return dir.delete(src);
   }
 
-  void removePathAndBlocks(String src, List<Block> blocks) throws IOException {
+  void removePathAndBlocks(String src, List<Block> blocks) {
     leaseManager.removeLeaseWithPrefixPath(src);
-    for(Block b : blocks) {
-      blocksMap.removeINode(b);
-      corruptReplicas.removeFromCorruptReplicasMap(b);
-      addToInvalidates(b);
+    if (blocks == null) {
+      return;
     }
+    removeBlocks(blocks);
   }
 
   /** Get the file info for a specific file.

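The batching in removeBlocks() above holds the namesystem lock for at most
BLOCK_DELETION_INCREMENT blocks at a time, so other namenode operations can
acquire the lock between batches. A minimal self-contained sketch of the
same idiom, with hypothetical names (Math.min is equivalent to the
conditional used in the patch):

    import java.util.List;

    class ChunkedRemovalSketch {
      static final int INCREMENT = 1000; // role of BLOCK_DELETION_INCREMENT

      /** Remove items a batch at a time, releasing the lock between batches. */
      static void removeInBatches(Object lock, List<String> items) {
        int start = 0;
        while (start < items.size()) {
          int end = Math.min(start + INCREMENT, items.size());
          synchronized (lock) { // held only for one batch
            for (int i = start; i < end; i++) {
              remove(items.get(i)); // per-item cleanup, e.g. block invalidation
            }
          }
          start = end; // lock released here; waiting threads can proceed
        }
      }

      private static void remove(String item) {
        // stand-in for blocksMap.removeINode(b), addToInvalidates(b), etc.
      }
    }
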
Modified: hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/INodeFile.java?rev=1378241&r1=1378240&r2=1378241&view=diff
==============================================================================
--- hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/INodeFile.java (original)
+++ hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/INodeFile.java Tue Aug 28 17:26:59 2012
@@ -136,8 +136,9 @@ class INodeFile extends INode {
 
   int collectSubtreeBlocksAndClear(List<Block> v) {
     parent = null;
-    for (Block blk : blocks) {
+    for (BlockInfo blk : blocks) {
       v.add(blk);
+      blk.setINode(null);
     }
     blocks = null;
     return 1;

Added: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/server/namenode/TestLargeDirectoryDelete.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/server/namenode/TestLargeDirectoryDelete.java?rev=1378241&view=auto
==============================================================================
--- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/server/namenode/TestLargeDirectoryDelete.java (added)
+++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/server/namenode/TestLargeDirectoryDelete.java Tue Aug 28 17:26:59 2012
@@ -0,0 +1,221 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.server.common.Util;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Ensure that during a large directory delete the namenode does not block
+ * until the deletion completes and keeps handling requests from other clients
+ */
+public class TestLargeDirectoryDelete {
+  private static final Log LOG = LogFactory
+      .getLog(TestLargeDirectoryDelete.class);
+  private static final Configuration CONF = new Configuration();
+  private static final int TOTAL_BLOCKS = 10000;
+  private MiniDFSCluster mc = null;
+  private int createOps = 0;
+  private int lockOps = 0;
+
+  static {
+    CONF.setLong("dfs.block.size", 1);
+    CONF.setInt("io.bytes.per.checksum", 1);
+  }
+
+  /** create a file with a length of <code>filelen</code> */
+  private void createFile(final String fileName, final long filelen)
+      throws IOException {
+    FileSystem fs = mc.getFileSystem();
+    Path filePath = new Path(fileName);
+    DFSTestUtil.createFile(fs, filePath, filelen, (short) 1, 0);
+  }
+
+  /** Create a large number of directories and files */
+  private void createFiles() throws IOException {
+    Random rand = new Random();
+    // Create files in directories with random depth
+    // ranging from 0 to 9.
+    for (int i = 0; i < TOTAL_BLOCKS; i += 100) {
+      String filename = "/root/";
+      int dirs = rand.nextInt(10); // Depth of the directory
+      for (int j = i; j >= (i - dirs); j--) {
+        filename += j + "/";
+      }
+      filename += "file" + i;
+      createFile(filename, 100);
+    }
+  }
+
+  private int getBlockCount() {
+    Assert.assertNotNull("Null cluster", mc);
+    Assert.assertNotNull("No Namenode in cluster", mc.getNameNode());
+    FSNamesystem namesystem = mc.getNameNode().getNamesystem();
+    Assert.assertNotNull("Null Namesystem in cluster", namesystem);
+    return (int) namesystem.getBlocksTotal();
+  }
+
+  /**
+   * Run multiple threads doing simultaneous operations on the namenode while a
+   * large directory is being deleted.
+   */
+  private void runThreads() throws Throwable {
+    final TestThread threads[] = new TestThread[2];
+
+    // Thread for creating files
+    threads[0] = new TestThread() {
+      @Override
+      protected void execute() throws Throwable {
+        while (live) {
+          try {
+            long blockcount = mc.getNameNode().getNamesystem().pendingDeletionBlocksCount;
+            if (blockcount > 0) {
+              String file = "/tmp" + createOps;
+              createFile(file, 1);
+              mc.getFileSystem().delete(new Path(file), true);
+              createOps++;
+            }
+          } catch (IOException ex) {
+            LOG.info("createFile exception ", ex);
+            break;
+          }
+        }
+      }
+    };
+
+    // Thread that periodically acquires the FSNamesystem lock
+    threads[1] = new TestThread() {
+      @Override
+      protected void execute() throws Throwable {
+        while (live) {
+          try {
+            long blockcount = mc.getNameNode().getNamesystem().pendingDeletionBlocksCount;
+            if (blockcount > 0) {
+              synchronized (mc.getNameNode().getNamesystem()) {
+                lockOps++;
+              }
+              Thread.sleep(1);
+            }
+          } catch (InterruptedException ex) {
+            LOG.info("lockOperation exception ", ex);
+            break;
+          }
+        }
+      }
+    };
+    threads[0].start();
+    threads[1].start();
+    Thread.sleep(1000);
+    final long start = Util.now();
+    FSNamesystem.BLOCK_DELETION_INCREMENT = 1;
+    mc.getFileSystem().delete(new Path("/root"), true); // recursive delete
+    final long end = Util.now();
+    threads[0].endThread();
+    threads[1].endThread();
+    LOG.info("Deletion took " + (end - start) + "msecs");
+    LOG.info("createOperations " + createOps);
+    LOG.info("lockOperations " + lockOps);
+    Assert.assertTrue(lockOps + createOps > 0);
+    threads[0].rethrow();
+    threads[1].rethrow();
+  }
+
+  /**
+   * An abstract class for test threads that catches exceptions and can
+   * rethrow them on a different thread, and that has an {@link #endThread()}
+   * operation which flips a volatile boolean before interrupting the thread.
+   * After {@link #execute()} finishes in the implementing class, the thread
+   * notifies itself so that other threads can wait for it to terminate
+   */
+  private abstract class TestThread extends Thread {
+    volatile Throwable thrown;
+    protected volatile boolean live = true;
+
+    @Override
+    public void run() {
+      try {
+        execute();
+      } catch (Throwable throwable) {
+        LOG.warn(throwable);
+        setThrown(throwable);
+      } finally {
+        synchronized (this) {
+          this.notify();
+        }
+      }
+    }
+
+    protected abstract void execute() throws Throwable;
+
+    protected synchronized void setThrown(Throwable thrown) {
+      this.thrown = thrown;
+    }
+
+    /**
+     * Rethrow anything caught
+     * 
+     * @throws Throwable
+     *           any non-null throwable raised by the execute method.
+     */
+    public synchronized void rethrow() throws Throwable {
+      if (thrown != null) {
+        throw thrown;
+      }
+    }
+
+    /**
+     * End the thread by setting the live flag to false and interrupting it
+     */
+    public synchronized void endThread() {
+      live = false;
+      interrupt();
+      try {
+        wait();
+      } catch (InterruptedException e) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Ignoring " + e, e);
+        }
+      }
+    }
+  }
+
+  @Test
+  public void largeDelete() throws Throwable {
+    mc = new MiniDFSCluster(CONF, 1, true, null);
+    try {
+      mc.waitActive();
+      Assert.assertNotNull("No Namenode in cluster", mc.getNameNode());
+      createFiles();
+      Assert.assertEquals(TOTAL_BLOCKS, getBlockCount());
+      runThreads();
+    } finally {
+      mc.shutdown();
+    }
+  }
+}
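
Assuming the standard branch-1 Ant build, the new test can be run on its own
with something like:

    ant test -Dtestcase=TestLargeDirectoryDelete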

