Author: suresh Date: Tue Aug 28 17:26:59 2012 New Revision: 1378241 URL: http://svn.apache.org/viewvc?rev=1378241&view=rev Log: HDFS-3791. The Namenode no longer blocks until a large directory deletion completes; it allows other operations while the deletion is in progress. Backport of HDFS-173. Contributed by Uma Maheswara Rao.
Added: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/server/namenode/TestLargeDirectoryDelete.java Modified: hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlocksMap.java hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/INodeFile.java Modified: hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlocksMap.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlocksMap.java?rev=1378241&r1=1378240&r2=1378241&view=diff ============================================================================== --- hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlocksMap.java (original) +++ hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/BlocksMap.java Tue Aug 28 17:26:59 2012 @@ -58,6 +58,10 @@ class BlocksMap { INodeFile getINode() { return inode; } + + void setINode(INodeFile inode) { + this.inode = inode; + } DatanodeDescriptor getDatanode(int index) { assert this.triplets != null : "BlockInfo is not initialized"; Modified: hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java?rev=1378241&r1=1378240&r2=1378241&view=diff ============================================================================== --- hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java (original) +++ hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java Tue Aug 28 17:26:59 2012 @@ -591,19 +591,27 @@ class FSDirectory implements 
FSConstants } /** - * Remove the file from management, return blocks + * Delete the target directory and collect the blocks under it + * + * @param src + * Path of a directory to delete + * @param collectedBlocks + * Blocks under the deleted directory + * @return true on successful deletion; else false */ - boolean delete(String src) { + boolean delete(String src, List<Block>collectedBlocks) { if (NameNode.stateChangeLog.isDebugEnabled()) { - NameNode.stateChangeLog.debug("DIR* FSDirectory.delete: "+src); + NameNode.stateChangeLog.debug("DIR* FSDirectory.delete: " + src); } waitForReady(); long now = FSNamesystem.now(); - int filesRemoved = unprotectedDelete(src, now); + int filesRemoved = unprotectedDelete(src, collectedBlocks, now); if (filesRemoved <= 0) { return false; } incrDeletedFileCount(filesRemoved); + // Blocks will be deleted later by the caller of this method + FSNamesystem.getFSNamesystem().removePathAndBlocks(src, null); fsImage.getEditLog().logDelete(src, now); return true; } @@ -625,14 +633,36 @@ class FSDirectory implements FSConstants } /** - * Delete a path from the name space - * Update the count at each ancestor directory with quota - * @param src a string representation of a path to an inode - * @param modificationTime the time the inode is removed - * @param deletedBlocks the place holder for the blocks to be removed + * Delete a path from the name space Update the count at each ancestor + * directory with quota + * + * @param src + * a string representation of a path to an inode + * + * @param mTime + * the time the inode is removed + */ + void unprotectedDelete(String src, long mTime) { + List<Block> collectedBlocks = new ArrayList<Block>(); + int filesRemoved = unprotectedDelete(src, collectedBlocks, mTime); + if (filesRemoved > 0) { + namesystem.removePathAndBlocks(src, collectedBlocks); + } + } + + /** + * Delete a path from the name space Update the count at each ancestor + * directory with quota + * + * @param src + * a string 
representation of a path to an inode + * @param collectedBlocks + * blocks collected from the deleted path + * @param mtime + * the time the inode is removed * @return the number of inodes deleted; 0 if no inodes are deleted. - */ - int unprotectedDelete(String src, long modificationTime) { + */ + int unprotectedDelete(String src, List<Block> collectedBlocks, long mtime) { src = normalizePath(src); synchronized (rootDir) { @@ -643,32 +673,27 @@ class FSDirectory implements FSConstants NameNode.stateChangeLog.debug("DIR* FSDirectory.unprotectedDelete: " +"failed to remove "+src+" because it does not exist"); return 0; - } else if (inodes.length == 1) { // src is the root + } + if (inodes.length == 1) { // src is the root NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedDelete: " + "failed to remove " + src + " because the root is not allowed to be deleted"); return 0; - } else { - try { - // Remove the node from the namespace - removeChild(inodes, inodes.length-1); - // set the parent's modification time - inodes[inodes.length-2].setModificationTime(modificationTime); - // GC all the blocks underneath the node. 
- ArrayList<Block> v = new ArrayList<Block>(); - int filesRemoved = targetNode.collectSubtreeBlocksAndClear(v); - namesystem.removePathAndBlocks(src, v); - if (NameNode.stateChangeLog.isDebugEnabled()) { - NameNode.stateChangeLog.debug("DIR* FSDirectory.unprotectedDelete: " - +src+" is removed"); - } - return filesRemoved; - } catch (IOException e) { - NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedDelete: " + - "failed to remove " + src + " because " + e.getMessage()); - return 0; - } + } + int pos = inodes.length - 1; + targetNode = removeChild(inodes, pos); + if (targetNode == null) { + return 0; + } + // set the parent's modification time + inodes[pos - 1].setModificationTime(mtime); + int filesRemoved = targetNode + .collectSubtreeBlocksAndClear(collectedBlocks); + if (NameNode.stateChangeLog.isDebugEnabled()) { + NameNode.stateChangeLog.debug("DIR* FSDirectory.unprotectedDelete: " + + src + " is removed"); } + return filesRemoved; } } Modified: hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1378241&r1=1378240&r2=1378241&view=diff ============================================================================== --- hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original) +++ hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Tue Aug 28 17:26:59 2012 @@ -182,6 +182,7 @@ public class FSNamesystem implements FSC // Default initial capacity and load factor of map public static final int DEFAULT_INITIAL_MAP_CAPACITY = 16; public static final float DEFAULT_MAP_LOAD_FACTOR = 0.75f; + static int BLOCK_DELETION_INCREMENT = 1000; private float blocksInvalidateWorkPct; private int blocksReplWorkMultiplier; @@ -1833,17 +1834,33 @@ public class FSNamesystem implements FSC /** * Adds 
block to list of blocks which will be invalidated on - * specified datanode and log the move + * specified datanode * @param b block * @param n datanode + * @param log true to create an entry in the log */ - void addToInvalidates(Block b, DatanodeInfo n) { - addToInvalidatesNoLog(b, n); - NameNode.stateChangeLog.info("BLOCK* NameSystem.addToInvalidates: " - + b.getBlockName() + " is added to invalidSet of " + n.getName()); + void addToInvalidates(Block b, DatanodeInfo dn, boolean log) { + addToInvalidatesNoLog(b, dn); + if (log) { + NameNode.stateChangeLog.info("BLOCK* NameSystem.addToInvalidates: " + + b.getBlockName() + " to " + dn.getName()); + } } /** + * Adds block to list of blocks which will be invalidated on specified + * datanode and log the operation + * + * @param b + * block + * @param dn + * datanode + */ + void addToInvalidates(Block b, DatanodeInfo dn) { + addToInvalidates(b, dn, true); + } + + /** * Adds block to list of blocks which will be invalidated on * specified datanode * @param b block @@ -1865,10 +1882,16 @@ public class FSNamesystem implements FSC * all its datanodes. 
*/ private void addToInvalidates(Block b) { + StringBuilder datanodes = new StringBuilder(); for (Iterator<DatanodeDescriptor> it = blocksMap.nodeIterator(b); it.hasNext();) { DatanodeDescriptor node = it.next(); - addToInvalidates(b, node); + addToInvalidates(b, node, false); + datanodes.append(node.getName()).append(" "); + } + if (datanodes.length() != 0) { + NameNode.stateChangeLog.info("BLOCK* NameSystem.addToInvalidates: " + + b.getBlockName() + " to " + datanodes.toString()); } } @@ -2025,8 +2048,10 @@ public class FSNamesystem implements FSC if ((!recursive) && (!dir.isDirEmpty(src))) { throw new IOException(src + " is non empty"); } + if (NameNode.stateChangeLog.isDebugEnabled()) { + NameNode.stateChangeLog.debug("DIR* NameSystem.delete: " + src); + } boolean status = deleteInternal(src, true); - getEditLog().logSync(); if (status && auditLog.isInfoEnabled() && isExternalInvocation()) { logAuditEvent(UserGroupInformation.getCurrentUser(), Server.getRemoteIp(), @@ -2036,30 +2061,73 @@ public class FSNamesystem implements FSC } /** - * Remove the indicated filename from the namespace. This may - * invalidate some blocks that make up the file. + * Remove a file/directory from the namespace. + * <p> + * For large directories, deletion is incremental. The blocks under the + * directory are collected and deleted a small number at a time holding the + * {@link FSNamesystem} lock. + * <p> + * For small directory or file the deletion is done in one shot. 
*/ - synchronized boolean deleteInternal(String src, + private boolean deleteInternal(String src, boolean enforcePermission) throws IOException { + boolean deleteNow = false; + ArrayList<Block> collectedBlocks = new ArrayList<Block>(); + synchronized (this) { + if (isInSafeMode()) { + throw new SafeModeException("Cannot delete " + src, safeMode); + } + if (enforcePermission && isPermissionEnabled) { + checkPermission(src, false, null, FsAction.WRITE, null, FsAction.ALL); + } + // Unlink the target directory from directory tree + if (!dir.delete(src, collectedBlocks)) { + return false; + } + deleteNow = collectedBlocks.size() <= BLOCK_DELETION_INCREMENT; + if (deleteNow) { // Perform small deletes right away + removeBlocks(collectedBlocks); + } + } + + // Log directory deletion to editlog + getEditLog().logSync(); + if (!deleteNow) { + removeBlocks(collectedBlocks); // Incremental deletion of blocks + } + collectedBlocks.clear(); if (NameNode.stateChangeLog.isDebugEnabled()) { - NameNode.stateChangeLog.debug("DIR* NameSystem.delete: " + src); + NameNode.stateChangeLog.debug("DIR* Namesystem.delete: " + src + + " is removed"); } - if (isInSafeMode()) - throw new SafeModeException("Cannot delete " + src, safeMode); - if (enforcePermission && isPermissionEnabled) { - checkPermission(src, false, null, FsAction.WRITE, null, FsAction.ALL); + return true; + } + + /** From the given list, incrementally remove the blocks from blockManager */ + private void removeBlocks(List<Block> blocks) { + int start = 0; + int end = 0; + while (start < blocks.size()) { + end = BLOCK_DELETION_INCREMENT + start; + end = end > blocks.size() ? 
blocks.size() : end; + synchronized (this) { + for (int i = start; i < end; i++) { + Block b = blocks.get(i); + blocksMap.removeINode(b); + corruptReplicas.removeFromCorruptReplicasMap(b); + addToInvalidates(b); + } + } + start = end; } - - return dir.delete(src); } - void removePathAndBlocks(String src, List<Block> blocks) throws IOException { + void removePathAndBlocks(String src, List<Block> blocks) { leaseManager.removeLeaseWithPrefixPath(src); - for(Block b : blocks) { - blocksMap.removeINode(b); - corruptReplicas.removeFromCorruptReplicasMap(b); - addToInvalidates(b); + if (blocks == null) { + return; } + removeBlocks(blocks); } /** Get the file info for a specific file. Modified: hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/INodeFile.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/INodeFile.java?rev=1378241&r1=1378240&r2=1378241&view=diff ============================================================================== --- hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/INodeFile.java (original) +++ hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/server/namenode/INodeFile.java Tue Aug 28 17:26:59 2012 @@ -136,8 +136,9 @@ class INodeFile extends INode { int collectSubtreeBlocksAndClear(List<Block> v) { parent = null; - for (Block blk : blocks) { + for (BlockInfo blk : blocks) { v.add(blk); + blk.setINode(null); } blocks = null; return 1; Added: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/server/namenode/TestLargeDirectoryDelete.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/server/namenode/TestLargeDirectoryDelete.java?rev=1378241&view=auto ============================================================================== --- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/server/namenode/TestLargeDirectoryDelete.java 
(added) +++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/server/namenode/TestLargeDirectoryDelete.java Tue Aug 28 17:26:59 2012 @@ -0,0 +1,221 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.namenode; + +import java.io.IOException; +import java.util.Random; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DFSTestUtil; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.server.common.Util; +import org.junit.Assert; +import org.junit.Test; + +/** + * Ensure during large directory delete, namenode does not block until the + * deletion completes and handles new requests from other clients + */ +public class TestLargeDirectoryDelete { + private static final Log LOG = LogFactory + .getLog(TestLargeDirectoryDelete.class); + private static final Configuration CONF = new Configuration(); + private static final int TOTAL_BLOCKS = 10000; + private MiniDFSCluster mc = null; + private int createOps = 0; + private int lockOps = 0; + + static { + 
CONF.setLong("dfs.block.size", 1); + CONF.setInt("io.bytes.per.checksum", 1); + } + + /** create a file with a length of <code>filelen</code> */ + private void createFile(final String fileName, final long filelen) + throws IOException { + FileSystem fs = mc.getFileSystem(); + Path filePath = new Path(fileName); + DFSTestUtil.createFile(fs, filePath, filelen, (short) 1, 0); + } + + /** Create a large number of directories and files */ + private void createFiles() throws IOException { + Random rand =new Random(); + // Create files in a directory with random depth + // ranging from 0-10. + for (int i = 0; i < TOTAL_BLOCKS; i += 100) { + String filename = "/root/"; + int dirs = rand.nextInt(10); // Depth of the directory + for (int j = i; j >= (i - dirs); j--) { + filename += j + "/"; + } + filename += "file" + i; + createFile(filename, 100); + } + } + + private int getBlockCount() { + Assert.assertNotNull("Null cluster", mc); + Assert.assertNotNull("No Namenode in cluster", mc.getNameNode()); + FSNamesystem namesystem = mc.getNameNode().getNamesystem(); + Assert.assertNotNull("Null Namesystem in cluster", namesystem); + return (int) namesystem.getBlocksTotal(); + } + + /** + * Run multiple threads doing simultaneous operations on the namenode while a + * large directory is being deleted. 
+ */ + private void runThreads() throws Throwable { + final TestThread threads[] = new TestThread[2]; + + // Thread for creating files + threads[0] = new TestThread() { + @Override + protected void execute() throws Throwable { + while (live) { + try { + long blockcount = mc.getNameNode().getNamesystem().pendingDeletionBlocksCount; + if (blockcount > 0) { + String file = "/tmp" + createOps; + createFile(file, 1); + mc.getFileSystem().delete(new Path(file), true); + createOps++; + } + } catch (IOException ex) { + LOG.info("createFile exception ", ex); + break; + } + } + } + }; + + // Thread that periodically acquires the FSNamesystem lock + threads[1] = new TestThread() { + @Override + protected void execute() throws Throwable { + while (live) { + try { + long blockcount = mc.getNameNode().getNamesystem().pendingDeletionBlocksCount; + if (blockcount > 0) { + synchronized (mc.getNameNode().getNamesystem()) { + lockOps++; + } + Thread.sleep(1); + } + } catch (InterruptedException ex) { + LOG.info("lockOperation exception ", ex); + break; + } + } + } + }; + threads[0].start(); + threads[1].start(); + Thread.sleep(1000); + final long start = Util.now(); + FSNamesystem.BLOCK_DELETION_INCREMENT = 1; + mc.getFileSystem().delete(new Path("/root"), true); // recursive delete + final long end = Util.now(); + threads[0].endThread(); + threads[1].endThread(); + LOG.info("Deletion took " + (end - start) + "msecs"); + LOG.info("createOperations " + createOps); + LOG.info("lockOperations " + lockOps); + Assert.assertTrue(lockOps + createOps > 0); + threads[0].rethrow(); + threads[1].rethrow(); + } + + /** + * An abstract class for tests that catches exceptions and can rethrow them on + * a different thread, and has an {@link #endThread()} operation that flips a + * volatile boolean before interrupting the thread. 
Also: after running the + * implementation of {@link #execute()} in the implementation class, the + * thread is notified: other threads can wait for it to terminate + */ + private abstract class TestThread extends Thread { + volatile Throwable thrown; + protected volatile boolean live = true; + + @Override + public void run() { + try { + execute(); + } catch (Throwable throwable) { + LOG.warn(throwable); + setThrown(throwable); + } finally { + synchronized (this) { + this.notify(); + } + } + } + + protected abstract void execute() throws Throwable; + + protected synchronized void setThrown(Throwable thrown) { + this.thrown = thrown; + } + + /** + * Rethrow anything caught + * + * @throws Throwable + * any non-null throwable raised by the execute method. + */ + public synchronized void rethrow() throws Throwable { + if (thrown != null) { + throw thrown; + } + } + + /** + * End the thread by setting the live p + */ + public synchronized void endThread() { + live = false; + interrupt(); + try { + wait(); + } catch (InterruptedException e) { + if (LOG.isDebugEnabled()) { + LOG.debug("Ignoring " + e, e); + } + } + } + } + + @Test + public void largeDelete() throws Throwable { + mc = new MiniDFSCluster(CONF, 1, true, null); + try { + mc.waitActive(); + Assert.assertNotNull("No Namenode in cluster", mc.getNameNode()); + createFiles(); + Assert.assertEquals(TOTAL_BLOCKS, getBlockCount()); + runThreads(); + } finally { + mc.shutdown(); + } + } +}