Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java?rev=1619019&r1=1619018&r2=1619019&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java Wed Aug 20 01:34:29 2014 @@ -18,13 +18,19 @@ package org.apache.hadoop.hdfs.server.datanode; +import com.google.common.collect.Lists; +import com.google.common.util.concurrent.Futures; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.*; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.HardLink; +import org.apache.hadoop.fs.LocalFileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.LayoutVersion; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType; @@ -35,13 +41,31 @@ import org.apache.hadoop.hdfs.server.com import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.io.nativeio.NativeIO; import org.apache.hadoop.util.Daemon; import org.apache.hadoop.util.DiskChecker; -import java.io.*; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.RandomAccessFile; import java.nio.channels.FileLock; -import java.util.*; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; /** * Data storage information file. @@ -149,43 +173,99 @@ public class DataStorage extends Storage } /** - * Analyze storage directories. - * Recover from previous transitions if required. - * Perform fs state transition if necessary depending on the namespace info. - * Read storage info. - * <br> - * This method should be synchronized between multiple DN threads. Only the - * first DN thread does DN level storage dir recoverTransitionRead. - * + * {{@inheritDoc org.apache.hadoop.hdfs.server.common.Storage#writeAll()}} + */ + private void writeAll(Collection<StorageDirectory> dirs) throws IOException { + this.layoutVersion = getServiceLayoutVersion(); + for (StorageDirectory dir : dirs) { + writeProperties(dir); + } + } + + /** + * Add a list of volumes to be managed by DataStorage. 
If the volume is empty, + * format it, otherwise recover it from previous transitions if required. + * + * @param datanode the reference to DataNode. * @param nsInfo namespace information * @param dataDirs array of data storage directories * @param startOpt startup option * @throws IOException */ - synchronized void recoverTransitionRead(DataNode datanode, + synchronized void addStorageLocations(DataNode datanode, NamespaceInfo nsInfo, Collection<StorageLocation> dataDirs, StartupOption startOpt) throws IOException { - if (initialized) { - // DN storage has been initialized, no need to do anything - return; + // Similar to recoverTransitionRead, it first ensures the datanode level + // format is completed. + List<StorageLocation> tmpDataDirs = + new ArrayList<StorageLocation>(dataDirs); + addStorageLocations(datanode, nsInfo, tmpDataDirs, startOpt, false, true); + + Collection<File> bpDataDirs = new ArrayList<File>(); + String bpid = nsInfo.getBlockPoolID(); + for (StorageLocation dir : dataDirs) { + File dnRoot = dir.getFile(); + File bpRoot = BlockPoolSliceStorage.getBpRoot(bpid, new File(dnRoot, + STORAGE_DIR_CURRENT)); + bpDataDirs.add(bpRoot); } - LOG.info("Data-node version: " + HdfsConstants.DATANODE_LAYOUT_VERSION - + " and name-node layout version: " + nsInfo.getLayoutVersion()); - - // 1. For each data directory calculate its state and - // check whether all is consistent before transitioning. - // Format and recover. - this.storageDirs = new ArrayList<StorageDirectory>(dataDirs.size()); - ArrayList<StorageState> dataDirStates = new ArrayList<StorageState>(dataDirs.size()); + // mkdir for the list of BlockPoolStorage + makeBlockPoolDataDir(bpDataDirs, null); + BlockPoolSliceStorage bpStorage = this.bpStorageMap.get(bpid); + if (bpStorage == null) { + bpStorage = new BlockPoolSliceStorage( + nsInfo.getNamespaceID(), bpid, nsInfo.getCTime(), + nsInfo.getClusterID()); + } + + bpStorage.recoverTransitionRead(datanode, nsInfo, bpDataDirs, startOpt); + addBlockPoolStorage(bpid, bpStorage); + } + + /** + * Add a list of volumes to be managed by this DataStorage. If the volume is + * empty, it formats the volume, otherwise it recovers it from previous + * transitions if required. + * + * If isInitialize is false, only the directories that have finished the + * doTransition() process will be added into DataStorage. + * + * @param datanode the reference to DataNode. + * @param nsInfo namespace information + * @param dataDirs array of data storage directories + * @param startOpt startup option + * @param isInitialize whether it is called when DataNode starts up. + * @throws IOException + */ + private synchronized void addStorageLocations(DataNode datanode, + NamespaceInfo nsInfo, Collection<StorageLocation> dataDirs, + StartupOption startOpt, boolean isInitialize, boolean ignoreExistingDirs) + throws IOException { + Set<String> existingStorageDirs = new HashSet<String>(); + for (int i = 0; i < getNumStorageDirs(); i++) { + existingStorageDirs.add(getStorageDir(i).getRoot().getAbsolutePath()); + } + + // 1. For each data directory calculate its state and check whether all is + // consistent before transitioning. Format and recover. 
+ ArrayList<StorageState> dataDirStates = + new ArrayList<StorageState>(dataDirs.size()); + List<StorageDirectory> addedStorageDirectories = + new ArrayList<StorageDirectory>(); for(Iterator<StorageLocation> it = dataDirs.iterator(); it.hasNext();) { File dataDir = it.next().getFile(); + if (existingStorageDirs.contains(dataDir.getAbsolutePath())) { + LOG.info("Storage directory " + dataDir + " has already been used."); + it.remove(); + continue; + } StorageDirectory sd = new StorageDirectory(dataDir); StorageState curState; try { curState = sd.analyzeStorage(startOpt, this); // sd is locked but not opened - switch(curState) { + switch (curState) { case NORMAL: break; case NON_EXISTENT: @@ -194,7 +274,8 @@ public class DataStorage extends Storage it.remove(); continue; case NOT_FORMATTED: // format - LOG.info("Storage directory " + dataDir + " is not formatted"); + LOG.info("Storage directory " + dataDir + " is not formatted for " + + nsInfo.getBlockPoolID()); LOG.info("Formatting ..."); format(sd, nsInfo, datanode.getDatanodeUuid()); break; @@ -208,33 +289,82 @@ public class DataStorage extends Storage //continue with other good dirs continue; } - // add to the storage list - addStorageDir(sd); + if (isInitialize) { + addStorageDir(sd); + } + addedStorageDirectories.add(sd); dataDirStates.add(curState); } - if (dataDirs.size() == 0 || dataDirStates.size() == 0) // none of the data dirs exist + if (dataDirs.size() == 0 || dataDirStates.size() == 0) { + // none of the data dirs exist + if (ignoreExistingDirs) { + return; + } throw new IOException( "All specified directories are not accessible or do not exist."); + } // 2. Do transitions // Each storage directory is treated individually. - // During startup some of them can upgrade or rollback - // while others could be uptodate for the regular startup. - try { - for (int idx = 0; idx < getNumStorageDirs(); idx++) { - doTransition(datanode, getStorageDir(idx), nsInfo, startOpt); - createStorageID(getStorageDir(idx)); + // During startup some of them can upgrade or rollback + // while others could be up-to-date for the regular startup. + for (Iterator<StorageDirectory> it = addedStorageDirectories.iterator(); + it.hasNext(); ) { + StorageDirectory sd = it.next(); + try { + doTransition(datanode, sd, nsInfo, startOpt); + createStorageID(sd); + } catch (IOException e) { + if (!isInitialize) { + sd.unlock(); + it.remove(); + continue; + } + unlockAll(); + throw e; } - } catch (IOException e) { - unlockAll(); - throw e; } - // 3. Update all storages. Some of them might have just been formatted. - this.writeAll(); + // 3. Update all successfully loaded storages. Some of them might have just + // been formatted. + this.writeAll(addedStorageDirectories); + + // 4. Make newly loaded storage directories visible for service. + if (!isInitialize) { + this.storageDirs.addAll(addedStorageDirectories); + } + } + + /** + * Analyze storage directories. + * Recover from previous transitions if required. + * Perform fs state transition if necessary depending on the namespace info. + * Read storage info. + * <br> + * This method should be synchronized between multiple DN threads. Only the + * first DN thread does DN level storage dir recoverTransitionRead. 
+ * + * @param nsInfo namespace information + * @param dataDirs array of data storage directories + * @param startOpt startup option + * @throws IOException + */ + synchronized void recoverTransitionRead(DataNode datanode, + NamespaceInfo nsInfo, Collection<StorageLocation> dataDirs, + StartupOption startOpt) + throws IOException { + if (initialized) { + // DN storage has been initialized, no need to do anything + return; + } + LOG.info("DataNode version: " + HdfsConstants.DATANODE_LAYOUT_VERSION + + " and NameNode layout version: " + nsInfo.getLayoutVersion()); + + this.storageDirs = new ArrayList<StorageDirectory>(dataDirs.size()); + addStorageLocations(datanode, nsInfo, dataDirs, startOpt, true, false); - // 4. mark DN storage is initialized + // mark DN storage is initialized this.initialized = true; } @@ -261,6 +391,7 @@ public class DataStorage extends Storage STORAGE_DIR_CURRENT)); bpDataDirs.add(bpRoot); } + // mkdir for the list of BlockPoolStorage makeBlockPoolDataDir(bpDataDirs, null); BlockPoolSliceStorage bpStorage = new BlockPoolSliceStorage( @@ -488,7 +619,7 @@ public class DataStorage extends Storage // do upgrade if (this.layoutVersion > HdfsConstants.DATANODE_LAYOUT_VERSION) { - doUpgrade(sd, nsInfo); // upgrade + doUpgrade(datanode, sd, nsInfo); // upgrade return; } @@ -523,7 +654,8 @@ public class DataStorage extends Storage * @param sd storage directory * @throws IOException on error */ - void doUpgrade(StorageDirectory sd, NamespaceInfo nsInfo) throws IOException { + void doUpgrade(DataNode datanode, StorageDirectory sd, NamespaceInfo nsInfo) + throws IOException { // If the existing on-disk layout version supportes federation, simply // update its layout version. if (DataNodeLayoutVersion.supports( @@ -568,7 +700,8 @@ public class DataStorage extends Storage BlockPoolSliceStorage bpStorage = new BlockPoolSliceStorage(nsInfo.getNamespaceID(), nsInfo.getBlockPoolID(), nsInfo.getCTime(), nsInfo.getClusterID()); bpStorage.format(curDir, nsInfo); - linkAllBlocks(tmpDir, bbwDir, new File(curBpDir, STORAGE_DIR_CURRENT)); + linkAllBlocks(datanode, tmpDir, bbwDir, new File(curBpDir, + STORAGE_DIR_CURRENT)); // 4. 
Write version file under <SD>/current layoutVersion = HdfsConstants.DATANODE_LAYOUT_VERSION; @@ -746,22 +879,22 @@ public class DataStorage extends Storage * * @throws IOException If error occurs during hardlink */ - private void linkAllBlocks(File fromDir, File fromBbwDir, File toDir) - throws IOException { + private void linkAllBlocks(DataNode datanode, File fromDir, File fromBbwDir, + File toDir) throws IOException { HardLink hardLink = new HardLink(); // do the link int diskLayoutVersion = this.getLayoutVersion(); if (DataNodeLayoutVersion.supports( LayoutVersion.Feature.APPEND_RBW_DIR, diskLayoutVersion)) { // hardlink finalized blocks in tmpDir/finalized - linkBlocks(new File(fromDir, STORAGE_DIR_FINALIZED), + linkBlocks(datanode, new File(fromDir, STORAGE_DIR_FINALIZED), new File(toDir, STORAGE_DIR_FINALIZED), diskLayoutVersion, hardLink); // hardlink rbw blocks in tmpDir/rbw - linkBlocks(new File(fromDir, STORAGE_DIR_RBW), + linkBlocks(datanode, new File(fromDir, STORAGE_DIR_RBW), new File(toDir, STORAGE_DIR_RBW), diskLayoutVersion, hardLink); } else { // pre-RBW version // hardlink finalized blocks in tmpDir - linkBlocks(fromDir, new File(toDir, STORAGE_DIR_FINALIZED), + linkBlocks(datanode, fromDir, new File(toDir, STORAGE_DIR_FINALIZED), diskLayoutVersion, hardLink); if (fromBbwDir.exists()) { /* @@ -770,15 +903,67 @@ public class DataStorage extends Storage * NOT underneath the 'current' directory in those releases. See * HDFS-3731 for details. */ - linkBlocks(fromBbwDir, + linkBlocks(datanode, fromBbwDir, new File(toDir, STORAGE_DIR_RBW), diskLayoutVersion, hardLink); } } LOG.info( hardLink.linkStats.report() ); } + + private static class LinkArgs { + public File src; + public File dst; + + public LinkArgs(File src, File dst) { + this.src = src; + this.dst = dst; + } + } + + static void linkBlocks(DataNode datanode, File from, File to, int oldLV, + HardLink hl) throws IOException { + boolean upgradeToIdBasedLayout = false; + // If we are upgrading from a version older than the one where we introduced + // block ID-based layout AND we're working with the finalized directory, + // we'll need to upgrade from the old flat layout to the block ID-based one + if (oldLV > DataNodeLayoutVersion.Feature.BLOCKID_BASED_LAYOUT.getInfo(). 
+ getLayoutVersion() && to.getName().equals(STORAGE_DIR_FINALIZED)) { + upgradeToIdBasedLayout = true; + } + + final List<LinkArgs> idBasedLayoutSingleLinks = Lists.newArrayList(); + linkBlocksHelper(from, to, oldLV, hl, upgradeToIdBasedLayout, to, + idBasedLayoutSingleLinks); + int numLinkWorkers = datanode.getConf().getInt( + DFSConfigKeys.DFS_DATANODE_BLOCK_ID_LAYOUT_UPGRADE_THREADS_KEY, + DFSConfigKeys.DFS_DATANODE_BLOCK_ID_LAYOUT_UPGRADE_THREADS); + ExecutorService linkWorkers = Executors.newFixedThreadPool(numLinkWorkers); + final int step = idBasedLayoutSingleLinks.size() / numLinkWorkers + 1; + List<Future<Void>> futures = Lists.newArrayList(); + for (int i = 0; i < idBasedLayoutSingleLinks.size(); i += step) { + final int iCopy = i; + futures.add(linkWorkers.submit(new Callable<Void>() { + @Override + public Void call() throws IOException { + int upperBound = Math.min(iCopy + step, + idBasedLayoutSingleLinks.size()); + for (int j = iCopy; j < upperBound; j++) { + LinkArgs cur = idBasedLayoutSingleLinks.get(j); + NativeIO.link(cur.src, cur.dst); + } + return null; + } + })); + } + linkWorkers.shutdown(); + for (Future<Void> f : futures) { + Futures.get(f, IOException.class); + } + } - static void linkBlocks(File from, File to, int oldLV, HardLink hl) - throws IOException { + static void linkBlocksHelper(File from, File to, int oldLV, HardLink hl, + boolean upgradeToIdBasedLayout, File blockRoot, + List<LinkArgs> idBasedLayoutSingleLinks) throws IOException { if (!from.exists()) { return; } @@ -805,9 +990,6 @@ public class DataStorage extends Storage // from is a directory hl.linkStats.countDirs++; - if (!to.mkdirs()) - throw new IOException("Cannot create directory " + to); - String[] blockNames = from.list(new java.io.FilenameFilter() { @Override public boolean accept(File dir, String name) { @@ -815,12 +997,36 @@ public class DataStorage extends Storage } }); + // If we are upgrading to block ID-based layout, we don't want to recreate + // any subdirs from the source that contain blocks, since we have a new + // directory structure + if (!upgradeToIdBasedLayout || !to.getName().startsWith( + BLOCK_SUBDIR_PREFIX)) { + if (!to.mkdirs()) + throw new IOException("Cannot create directory " + to); + } + // Block files just need hard links with the same file names // but a different directory if (blockNames.length > 0) { - HardLink.createHardLinkMult(from, blockNames, to); - hl.linkStats.countMultLinks++; - hl.linkStats.countFilesMultLinks += blockNames.length; + if (upgradeToIdBasedLayout) { + for (String blockName : blockNames) { + long blockId = Block.getBlockId(blockName); + File blockLocation = DatanodeUtil.idToBlockDir(blockRoot, blockId); + if (!blockLocation.exists()) { + if (!blockLocation.mkdirs()) { + throw new IOException("Failed to mkdirs " + blockLocation); + } + } + idBasedLayoutSingleLinks.add(new LinkArgs(new File(from, blockName), + new File(blockLocation, blockName))); + hl.linkStats.countSingleLinks++; + } + } else { + HardLink.createHardLinkMult(from, blockNames, to); + hl.linkStats.countMultLinks++; + hl.linkStats.countFilesMultLinks += blockNames.length; + } } else { hl.linkStats.countEmptyDirs++; } @@ -834,8 +1040,9 @@ public class DataStorage extends Storage } }); for(int i = 0; i < otherNames.length; i++) - linkBlocks(new File(from, otherNames[i]), - new File(to, otherNames[i]), oldLV, hl); + linkBlocksHelper(new File(from, otherNames[i]), + new File(to, otherNames[i]), oldLV, hl, upgradeToIdBasedLayout, + blockRoot, idBasedLayoutSingleLinks); } /**
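Note on the linkBlocks() change above: the patch replaces single-threaded recursive hard-linking with a two-phase upgrade. linkBlocksHelper() walks the old flat tree and collects one LinkArgs entry per finalized block, and the collected list is then split across a fixed thread pool sized by dfs.datanode.block.id.layout.upgrade.threads. Below is a minimal, self-contained sketch of that fan-out pattern only, assuming stand-in paths and a print statement in place of the patch's NativeIO.link() and Futures.get(f, IOException.class) calls; the class name and sample paths are illustrative and not part of the patch.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class LinkFanOutSketch {

  /** Stand-in for DataStorage.LinkArgs: one source/destination pair. */
  static class LinkArgs {
    final String src;
    final String dst;
    LinkArgs(String src, String dst) { this.src = src; this.dst = dst; }
  }

  /**
   * Partition the collected single-link tasks into contiguous chunks and run
   * each chunk on its own worker, mirroring the step/upperBound arithmetic in
   * the patched linkBlocks(). The real code hard-links via NativeIO.link();
   * here each task just prints, so the sketch runs anywhere.
   */
  static void fanOut(final List<LinkArgs> links, int numWorkers)
      throws InterruptedException, ExecutionException {
    ExecutorService workers = Executors.newFixedThreadPool(numWorkers);
    final int step = links.size() / numWorkers + 1;
    List<Future<Void>> futures = new ArrayList<Future<Void>>();
    for (int i = 0; i < links.size(); i += step) {
      final int start = i;
      futures.add(workers.submit(new Callable<Void>() {
        @Override
        public Void call() {
          int upperBound = Math.min(start + step, links.size());
          for (int j = start; j < upperBound; j++) {
            LinkArgs cur = links.get(j);
            System.out.println(Thread.currentThread().getName()
                + " links " + cur.src + " -> " + cur.dst);
          }
          return null;
        }
      }));
    }
    workers.shutdown();
    // Propagate the first failure, as the patch does through Futures.get(...).
    for (Future<Void> f : futures) {
      f.get();
    }
  }

  public static void main(String[] args) throws Exception {
    List<LinkArgs> links = new ArrayList<LinkArgs>();
    for (int i = 0; i < 10; i++) {
      links.add(new LinkArgs("finalized/blk_" + i,
          "subdir0/subdir" + i + "/blk_" + i));
    }
    fanOut(links, 4);
  }
}

The step = size / numWorkers + 1 arithmetic hands each worker one contiguous chunk, so at most numWorkers tasks are ever queued regardless of how many blocks the volume holds.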
Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java?rev=1619019&r1=1619018&r2=1619019&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java Wed Aug 20 01:34:29 2014 @@ -103,7 +103,8 @@ class DataXceiver extends Receiver imple private long opStartTime; //the start time of receiving an Op private final InputStream socketIn; private OutputStream socketOut; - + private BlockReceiver blockReceiver = null; + /** * Client Name used in previous operation. Not available on first request * on the socket. @@ -159,6 +160,12 @@ class DataXceiver extends Receiver imple return socketOut; } + public void sendOOB() throws IOException, InterruptedException { + LOG.info("Sending OOB to peer: " + peer); + if(blockReceiver!=null) + blockReceiver.sendOOB(); + } + /** * Read/write data from/to the DataXceiverServer. */ @@ -168,7 +175,7 @@ class DataXceiver extends Receiver imple Op op = null; try { - dataXceiverServer.addPeer(peer, Thread.currentThread()); + dataXceiverServer.addPeer(peer, Thread.currentThread(), this); peer.setWriteTimeout(datanode.getDnConf().socketWriteTimeout); InputStream input = socketIn; IOStreamPair saslStreams = datanode.saslServer.receive(peer, socketOut, @@ -584,7 +591,6 @@ class DataXceiver extends Receiver imple DataOutputStream mirrorOut = null; // stream to next target DataInputStream mirrorIn = null; // reply from next target Socket mirrorSock = null; // socket to next target - BlockReceiver blockReceiver = null; // responsible for data handling String mirrorNode = null; // the name:port of next target String firstBadLink = ""; // first datanode that failed in connection setup Status mirrorInStatus = SUCCESS; @@ -708,7 +714,7 @@ class DataXceiver extends Receiver imple if (blockReceiver != null) { String mirrorAddr = (mirrorSock == null) ? 
null : mirrorNode; blockReceiver.receiveBlock(mirrorOut, mirrorIn, replyOut, - mirrorAddr, null, targets); + mirrorAddr, null, targets, false); // send close-ack for transfer-RBW/Finalized if (isTransfer) { @@ -747,6 +753,7 @@ class DataXceiver extends Receiver imple IOUtils.closeStream(replyOut); IOUtils.closeSocket(mirrorSock); IOUtils.closeStream(blockReceiver); + blockReceiver = null; } //update metrics @@ -983,7 +990,7 @@ class DataXceiver extends Receiver imple String errMsg = null; BlockReceiver blockReceiver = null; DataInputStream proxyReply = null; - + DataOutputStream replyOut = new DataOutputStream(getOutputStream()); try { // get the output stream to the proxy final String dnAddr = proxySource.getXferAddr(connectToDnViaHostname); @@ -1040,8 +1047,8 @@ class DataXceiver extends Receiver imple CachingStrategy.newDropBehind()); // receive a block - blockReceiver.receiveBlock(null, null, null, null, - dataXceiverServer.balanceThrottler, null); + blockReceiver.receiveBlock(null, null, replyOut, null, + dataXceiverServer.balanceThrottler, null, true); // notify name node datanode.notifyNamenodeReceivedBlock( @@ -1076,6 +1083,7 @@ class DataXceiver extends Receiver imple IOUtils.closeStream(proxyOut); IOUtils.closeStream(blockReceiver); IOUtils.closeStream(proxyReply); + IOUtils.closeStream(replyOut); } //update metrics Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java?rev=1619019&r1=1619018&r2=1619019&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java Wed Aug 20 01:34:29 2014 @@ -27,11 +27,11 @@ import org.apache.hadoop.conf.Configurat import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.net.Peer; import org.apache.hadoop.hdfs.net.PeerServer; -import org.apache.hadoop.hdfs.server.balancer.Balancer; import org.apache.hadoop.hdfs.util.DataTransferThrottler; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.util.Daemon; +import com.google.common.annotations.VisibleForTesting; /** * Server used for receiving/sending a block of data. 
@@ -45,6 +45,7 @@ class DataXceiverServer implements Runna private final PeerServer peerServer; private final DataNode datanode; private final HashMap<Peer, Thread> peers = new HashMap<Peer, Thread>(); + private final HashMap<Peer, DataXceiver> peersXceiver = new HashMap<Peer, DataXceiver>(); private boolean closed = false; /** @@ -217,18 +218,38 @@ class DataXceiverServer implements Runna } } - synchronized void addPeer(Peer peer, Thread t) throws IOException { + synchronized void addPeer(Peer peer, Thread t, DataXceiver xceiver) + throws IOException { if (closed) { throw new IOException("Server closed."); } peers.put(peer, t); + peersXceiver.put(peer, xceiver); } synchronized void closePeer(Peer peer) { peers.remove(peer); + peersXceiver.remove(peer); IOUtils.cleanup(null, peer); } + // Sending OOB to all peers + public synchronized void sendOOBToPeers() { + if (!datanode.shutdownForUpgrade) { + return; + } + + for (Peer p : peers.keySet()) { + try { + peersXceiver.get(p).sendOOB(); + } catch (IOException e) { + LOG.warn("Got error when sending OOB message.", e); + } catch (InterruptedException e) { + LOG.warn("Interrupted when sending OOB message."); + } + } + } + // Notify all peers of the shutdown and restart. // datanode.shouldRun should still be true and datanode.restarting should // be set true before calling this method. @@ -247,6 +268,7 @@ class DataXceiverServer implements Runna IOUtils.cleanup(LOG, p); } peers.clear(); + peersXceiver.clear(); } // Return the number of peers. @@ -254,7 +276,14 @@ class DataXceiverServer implements Runna return peers.size(); } + // Return the number of peers and DataXceivers. + @VisibleForTesting + synchronized int getNumPeersXceiver() { + return peersXceiver.size(); + } + synchronized void releasePeer(Peer peer) { peers.remove(peer); + peersXceiver.remove(peer); } } Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeUtil.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeUtil.java?rev=1619019&r1=1619018&r2=1619019&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeUtil.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeUtil.java Wed Aug 20 01:34:29 2014 @@ -30,6 +30,8 @@ public class DatanodeUtil { public static final String DISK_ERROR = "Possible disk error: "; + private static final String SEP = System.getProperty("file.separator"); + /** Get the cause of an I/O exception if caused by a possible disk error * @param ioe an I/O exception * @return cause if the I/O exception is caused by a possible disk error; @@ -78,4 +80,38 @@ public class DatanodeUtil { public static File getUnlinkTmpFile(File f) { return new File(f.getParentFile(), f.getName()+UNLINK_BLOCK_SUFFIX); } + + /** + * Checks whether there are any files anywhere in the directory tree rooted + * at dir (directories don't count as files). 
dir must exist + * @return true if there are no files + * @throws IOException if unable to list subdirectories + */ + public static boolean dirNoFilesRecursive(File dir) throws IOException { + File[] contents = dir.listFiles(); + if (contents == null) { + throw new IOException("Cannot list contents of " + dir); + } + for (File f : contents) { + if (!f.isDirectory() || (f.isDirectory() && !dirNoFilesRecursive(f))) { + return false; + } + } + return true; + } + + /** + * Get the directory where a finalized block with this ID should be stored. + * Do not attempt to create the directory. + * @param root the root directory where finalized blocks are stored + * @param blockId + * @return + */ + public static File idToBlockDir(File root, long blockId) { + int d1 = (int)((blockId >> 16) & 0xff); + int d2 = (int)((blockId >> 8) & 0xff); + String path = DataStorage.BLOCK_SUBDIR_PREFIX + d1 + SEP + + DataStorage.BLOCK_SUBDIR_PREFIX + d2; + return new File(root, path); + } } Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java?rev=1619019&r1=1619018&r2=1619019&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java Wed Aug 20 01:34:29 2014 @@ -54,10 +54,10 @@ abstract public class ReplicaInfo extend private File baseDir; /** - * Ints representing the sub directory path from base dir to the directory - * containing this replica. + * Whether or not this replica's parent directory includes subdirs, in which + * case we can generate them based on the replica's block ID */ - private int[] subDirs; + private boolean hasSubdirs; private static final Map<String, File> internedBaseDirs = new HashMap<String, File>(); @@ -151,18 +151,8 @@ abstract public class ReplicaInfo extend * @return the parent directory path where this replica is located */ File getDir() { - if (subDirs == null) { - return null; - } - - StringBuilder sb = new StringBuilder(); - for (int i : subDirs) { - sb.append(DataStorage.BLOCK_SUBDIR_PREFIX); - sb.append(i); - sb.append("/"); - } - File ret = new File(baseDir, sb.toString()); - return ret; + return hasSubdirs ? DatanodeUtil.idToBlockDir(baseDir, + getBlockId()) : baseDir; } /** @@ -175,54 +165,46 @@ abstract public class ReplicaInfo extend private void setDirInternal(File dir) { if (dir == null) { - subDirs = null; baseDir = null; return; } - ReplicaDirInfo replicaDirInfo = parseSubDirs(dir); - this.subDirs = replicaDirInfo.subDirs; + ReplicaDirInfo dirInfo = parseBaseDir(dir); + this.hasSubdirs = dirInfo.hasSubidrs; synchronized (internedBaseDirs) { - if (!internedBaseDirs.containsKey(replicaDirInfo.baseDirPath)) { + if (!internedBaseDirs.containsKey(dirInfo.baseDirPath)) { // Create a new String path of this file and make a brand new File object // to guarantee we drop the reference to the underlying char[] storage. 
- File baseDir = new File(replicaDirInfo.baseDirPath); - internedBaseDirs.put(replicaDirInfo.baseDirPath, baseDir); + File baseDir = new File(dirInfo.baseDirPath); + internedBaseDirs.put(dirInfo.baseDirPath, baseDir); } - this.baseDir = internedBaseDirs.get(replicaDirInfo.baseDirPath); + this.baseDir = internedBaseDirs.get(dirInfo.baseDirPath); } } - + @VisibleForTesting public static class ReplicaDirInfo { - @VisibleForTesting public String baseDirPath; - - @VisibleForTesting - public int[] subDirs; + public boolean hasSubidrs; + + public ReplicaDirInfo (String baseDirPath, boolean hasSubidrs) { + this.baseDirPath = baseDirPath; + this.hasSubidrs = hasSubidrs; + } } @VisibleForTesting - public static ReplicaDirInfo parseSubDirs(File dir) { - ReplicaDirInfo ret = new ReplicaDirInfo(); + public static ReplicaDirInfo parseBaseDir(File dir) { File currentDir = dir; - List<Integer> subDirList = new ArrayList<Integer>(); + boolean hasSubdirs = false; while (currentDir.getName().startsWith(DataStorage.BLOCK_SUBDIR_PREFIX)) { - // Prepend the integer into the list. - subDirList.add(0, Integer.parseInt(currentDir.getName().replaceFirst( - DataStorage.BLOCK_SUBDIR_PREFIX, ""))); + hasSubdirs = true; currentDir = currentDir.getParentFile(); } - ret.subDirs = new int[subDirList.size()]; - for (int i = 0; i < subDirList.size(); i++) { - ret.subDirs[i] = subDirList.get(i); - } - - ret.baseDirPath = currentDir.getAbsolutePath(); - return ret; + return new ReplicaDirInfo(currentDir.getAbsolutePath(), hasSubdirs); } /** Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ShortCircuitRegistry.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ShortCircuitRegistry.java?rev=1619019&r1=1619018&r2=1619019&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ShortCircuitRegistry.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ShortCircuitRegistry.java Wed Aug 20 01:34:29 2014 @@ -74,7 +74,7 @@ import com.google.common.collect.HashMul * DN also marks the block's slots as "unanchorable" to prevent additional * clients from initiating these operations in the future. * - * The counterpart fo this class on the client is {@link DfsClientShmManager}. + * The counterpart of this class on the client is {@link DfsClientShmManager}. */ public class ShortCircuitRegistry { public static final Log LOG = LogFactory.getLog(ShortCircuitRegistry.class); @@ -217,7 +217,32 @@ public class ShortCircuitRegistry { } return allowMunlock; } - + + /** + * Invalidate any slot associated with a blockId that we are invalidating + * (deleting) from this DataNode. When a slot is invalid, the DFSClient will + * not use the corresponding replica for new read or mmap operations (although + * existing, ongoing read or mmap operations will complete.) + * + * @param blockId The block ID. + */ + public synchronized void processBlockInvalidation(ExtendedBlockId blockId) { + if (!enabled) return; + final Set<Slot> affectedSlots = slots.get(blockId); + if (!affectedSlots.isEmpty()) { + final StringBuilder bld = new StringBuilder(); + String prefix = ""; + bld.append("Block ").append(blockId).append(" has been invalidated. "). 
+ append("Marking short-circuit slots as invalid: "); + for (Slot slot : affectedSlots) { + slot.makeInvalid(); + bld.append(prefix).append(slot.toString()); + prefix = ", "; + } + LOG.info(bld.toString()); + } + } + public static class NewShmInfo implements Closeable { public final ShmId shmId; public final FileInputStream stream; Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StorageLocation.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StorageLocation.java?rev=1619019&r1=1619018&r2=1619019&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StorageLocation.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StorageLocation.java Wed Aug 20 01:34:29 2014 @@ -78,7 +78,7 @@ public class StorageLocation { * @return A StorageLocation object if successfully parsed, null otherwise. * Does not throw any exceptions. */ - static StorageLocation parse(String rawLocation) + public static StorageLocation parse(String rawLocation) throws IOException, SecurityException { Matcher matcher = regex.matcher(rawLocation); StorageType storageType = StorageType.DEFAULT; Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java?rev=1619019&r1=1619018&r2=1619019&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java Wed Aug 20 01:34:29 2014 @@ -22,6 +22,7 @@ import java.io.File; import java.io.FileDescriptor; import java.io.IOException; import java.io.InputStream; +import java.util.Collection; import java.util.List; import java.util.Map; @@ -39,6 +40,7 @@ import org.apache.hadoop.hdfs.server.dat import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica; import org.apache.hadoop.hdfs.server.datanode.Replica; import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipelineInterface; +import org.apache.hadoop.hdfs.server.datanode.StorageLocation; import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetFactory; import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean; import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock; @@ -91,6 +93,10 @@ public interface FsDatasetSpi<V extends /** @return a list of volumes. */ public List<V> getVolumes(); + /** Add an array of StorageLocation to FsDataset. 
*/ + public void addVolumes(Collection<StorageLocation> volumes) + throws IOException; + /** @return a storage with the given storage ID */ public DatanodeStorage getStorage(final String storageUuid); Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java?rev=1619019&r1=1619018&r2=1619019&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java Wed Aug 20 01:34:29 2014 @@ -59,7 +59,8 @@ class BlockPoolSlice { private final String bpid; private final FsVolumeImpl volume; // volume to which this BlockPool belongs to private final File currentDir; // StorageDirectory/current/bpid/current - private final LDir finalizedDir; // directory store Finalized replica + // directory where finalized replicas are stored + private final File finalizedDir; private final File rbwDir; // directory store RBW replica private final File tmpDir; // directory store Temporary replica private static final String DU_CACHE_FILE = "dfsUsed"; @@ -82,8 +83,13 @@ class BlockPoolSlice { this.bpid = bpid; this.volume = volume; this.currentDir = new File(bpDir, DataStorage.STORAGE_DIR_CURRENT); - final File finalizedDir = new File( + this.finalizedDir = new File( currentDir, DataStorage.STORAGE_DIR_FINALIZED); + if (!this.finalizedDir.exists()) { + if (!this.finalizedDir.mkdirs()) { + throw new IOException("Failed to mkdirs " + this.finalizedDir); + } + } // Files that were being written when the datanode was last shutdown // are now moved back to the data directory. 
It is possible that @@ -95,10 +101,6 @@ class BlockPoolSlice { FileUtil.fullyDelete(tmpDir); } this.rbwDir = new File(currentDir, DataStorage.STORAGE_DIR_RBW); - final int maxBlocksPerDir = conf.getInt( - DFSConfigKeys.DFS_DATANODE_NUMBLOCKS_KEY, - DFSConfigKeys.DFS_DATANODE_NUMBLOCKS_DEFAULT); - this.finalizedDir = new LDir(finalizedDir, maxBlocksPerDir); if (!rbwDir.mkdirs()) { // create rbw directory if not exist if (!rbwDir.isDirectory()) { throw new IOException("Mkdirs failed to create " + rbwDir.toString()); @@ -131,7 +133,7 @@ class BlockPoolSlice { } File getFinalizedDir() { - return finalizedDir.dir; + return finalizedDir; } File getRbwDir() { @@ -239,26 +241,57 @@ class BlockPoolSlice { } File addBlock(Block b, File f) throws IOException { - File blockFile = finalizedDir.addBlock(b, f); + File blockDir = DatanodeUtil.idToBlockDir(finalizedDir, b.getBlockId()); + if (!blockDir.exists()) { + if (!blockDir.mkdirs()) { + throw new IOException("Failed to mkdirs " + blockDir); + } + } + File blockFile = FsDatasetImpl.moveBlockFiles(b, f, blockDir); File metaFile = FsDatasetUtil.getMetaFile(blockFile, b.getGenerationStamp()); dfsUsage.incDfsUsed(b.getNumBytes()+metaFile.length()); return blockFile; } void checkDirs() throws DiskErrorException { - finalizedDir.checkDirTree(); + DiskChecker.checkDirs(finalizedDir); DiskChecker.checkDir(tmpDir); DiskChecker.checkDir(rbwDir); } void getVolumeMap(ReplicaMap volumeMap) throws IOException { // add finalized replicas - finalizedDir.getVolumeMap(bpid, volumeMap, volume); + addToReplicasMap(volumeMap, finalizedDir, true); // add rbw replicas addToReplicasMap(volumeMap, rbwDir, false); } /** + * Recover an unlinked tmp file on datanode restart. If the original block + * does not exist, then the tmp file is renamed to be the + * original file name and the original name is returned; otherwise the tmp + * file is deleted and null is returned. + */ + File recoverTempUnlinkedBlock(File unlinkedTmp) throws IOException { + File blockFile = FsDatasetUtil.getOrigFile(unlinkedTmp); + if (blockFile.exists()) { + // If the original block file still exists, then no recovery is needed. 
+ if (!unlinkedTmp.delete()) { + throw new IOException("Unable to cleanup unlinked tmp file " + + unlinkedTmp); + } + return null; + } else { + if (!unlinkedTmp.renameTo(blockFile)) { + throw new IOException("Unable to rename unlinked tmp file " + + unlinkedTmp); + } + return blockFile; + } + } + + + /** * Add replicas under the given directory to the volume map * @param volumeMap the replicas map * @param dir an input directory @@ -267,23 +300,34 @@ class BlockPoolSlice { */ void addToReplicasMap(ReplicaMap volumeMap, File dir, boolean isFinalized ) throws IOException { - File blockFiles[] = FileUtil.listFiles(dir); - for (File blockFile : blockFiles) { - if (!Block.isBlockFilename(blockFile)) + File files[] = FileUtil.listFiles(dir); + for (File file : files) { + if (file.isDirectory()) { + addToReplicasMap(volumeMap, file, isFinalized); + } + + if (isFinalized && FsDatasetUtil.isUnlinkTmpFile(file)) { + file = recoverTempUnlinkedBlock(file); + if (file == null) { // the original block still exists, so we cover it + // in another iteration and can continue here + continue; + } + } + if (!Block.isBlockFilename(file)) continue; long genStamp = FsDatasetUtil.getGenerationStampFromFile( - blockFiles, blockFile); - long blockId = Block.filename2id(blockFile.getName()); + files, file); + long blockId = Block.filename2id(file.getName()); ReplicaInfo newReplica = null; if (isFinalized) { newReplica = new FinalizedReplica(blockId, - blockFile.length(), genStamp, volume, blockFile.getParentFile()); + file.length(), genStamp, volume, file.getParentFile()); } else { boolean loadRwr = true; - File restartMeta = new File(blockFile.getParent() + - File.pathSeparator + "." + blockFile.getName() + ".restart"); + File restartMeta = new File(file.getParent() + + File.pathSeparator + "." + file.getName() + ".restart"); Scanner sc = null; try { sc = new Scanner(restartMeta); @@ -291,8 +335,8 @@ class BlockPoolSlice { if (sc.hasNextLong() && (sc.nextLong() > Time.now())) { // It didn't expire. Load the replica as a RBW. newReplica = new ReplicaBeingWritten(blockId, - validateIntegrityAndSetLength(blockFile, genStamp), - genStamp, volume, blockFile.getParentFile(), null); + validateIntegrityAndSetLength(file, genStamp), + genStamp, volume, file.getParentFile(), null); loadRwr = false; } sc.close(); @@ -301,7 +345,7 @@ class BlockPoolSlice { restartMeta.getPath()); } } catch (FileNotFoundException fnfe) { - // nothing to do here + // nothing to do hereFile dir = } finally { if (sc != null) { sc.close(); @@ -310,15 +354,15 @@ class BlockPoolSlice { // Restart meta doesn't exist or expired. 
if (loadRwr) { newReplica = new ReplicaWaitingToBeRecovered(blockId, - validateIntegrityAndSetLength(blockFile, genStamp), - genStamp, volume, blockFile.getParentFile()); + validateIntegrityAndSetLength(file, genStamp), + genStamp, volume, file.getParentFile()); } } ReplicaInfo oldReplica = volumeMap.add(bpid, newReplica); if (oldReplica != null) { FsDatasetImpl.LOG.warn("Two block files with the same block id exist " + - "on disk: " + oldReplica.getBlockFile() + " and " + blockFile ); + "on disk: " + oldReplica.getBlockFile() + " and " + file ); } } } @@ -405,10 +449,6 @@ class BlockPoolSlice { } } - void clearPath(File f) { - finalizedDir.clearPath(f); - } - @Override public String toString() { return currentDir.getAbsolutePath(); Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java?rev=1619019&r1=1619018&r2=1619019&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java Wed Aug 20 01:34:29 2014 @@ -61,6 +61,7 @@ class FsDatasetAsyncDiskService { private static final long THREADS_KEEP_ALIVE_SECONDS = 60; private final DataNode datanode; + private final ThreadGroup threadGroup; private Map<File, ThreadPoolExecutor> executors = new HashMap<File, ThreadPoolExecutor>(); @@ -70,42 +71,52 @@ class FsDatasetAsyncDiskService { * * The AsyncDiskServices uses one ThreadPool per volume to do the async * disk operations. - * - * @param volumes The roots of the data volumes. 
*/ - FsDatasetAsyncDiskService(DataNode datanode, File[] volumes) { + FsDatasetAsyncDiskService(DataNode datanode) { this.datanode = datanode; + this.threadGroup = new ThreadGroup(getClass().getSimpleName()); + } + + private void addExecutorForVolume(final File volume) { + ThreadFactory threadFactory = new ThreadFactory() { + int counter = 0; + + @Override + public Thread newThread(Runnable r) { + int thisIndex; + synchronized (this) { + thisIndex = counter++; + } + Thread t = new Thread(threadGroup, r); + t.setName("Async disk worker #" + thisIndex + + " for volume " + volume); + return t; + } + }; - final ThreadGroup threadGroup = new ThreadGroup(getClass().getSimpleName()); - // Create one ThreadPool per volume - for (int v = 0 ; v < volumes.length; v++) { - final File vol = volumes[v]; - ThreadFactory threadFactory = new ThreadFactory() { - int counter = 0; - - @Override - public Thread newThread(Runnable r) { - int thisIndex; - synchronized (this) { - thisIndex = counter++; - } - Thread t = new Thread(threadGroup, r); - t.setName("Async disk worker #" + thisIndex + - " for volume " + vol); - return t; - } - }; - - ThreadPoolExecutor executor = new ThreadPoolExecutor( - CORE_THREADS_PER_VOLUME, MAXIMUM_THREADS_PER_VOLUME, - THREADS_KEEP_ALIVE_SECONDS, TimeUnit.SECONDS, - new LinkedBlockingQueue<Runnable>(), threadFactory); - - // This can reduce the number of running threads - executor.allowCoreThreadTimeOut(true); - executors.put(vol, executor); + ThreadPoolExecutor executor = new ThreadPoolExecutor( + CORE_THREADS_PER_VOLUME, MAXIMUM_THREADS_PER_VOLUME, + THREADS_KEEP_ALIVE_SECONDS, TimeUnit.SECONDS, + new LinkedBlockingQueue<Runnable>(), threadFactory); + + // This can reduce the number of running threads + executor.allowCoreThreadTimeOut(true); + executors.put(volume, executor); + } + + /** + * Starts AsyncDiskService for a new volume + * @param volume the root of the new data volume. 
+ */ + synchronized void addVolume(File volume) { + if (executors == null) { + throw new RuntimeException("AsyncDiskService is already shutdown"); } - + ThreadPoolExecutor executor = executors.get(volume); + if (executor != null) { + throw new RuntimeException("Volume " + volume + " is already existed."); + } + addExecutorForVolume(volume); } synchronized long countPendingDeletions() { Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java?rev=1619019&r1=1619018&r2=1619019&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java Wed Aug 20 01:34:29 2014 @@ -44,6 +44,7 @@ import org.apache.commons.logging.LogFac import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.ExtendedBlockId; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.StorageType; import org.apache.hadoop.hdfs.protocol.Block; @@ -201,6 +202,7 @@ class FsDatasetImpl implements FsDataset final Map<String, DatanodeStorage> storageMap; final FsDatasetAsyncDiskService asyncDiskService; final FsDatasetCache cacheManager; + private final Configuration conf; private final int validVolsRequired; final ReplicaMap volumeMap; @@ -215,6 +217,7 @@ class FsDatasetImpl implements FsDataset ) throws IOException { this.datanode = datanode; this.dataStorage = storage; + this.conf = conf; // The number of volumes required for operation is the total number // of volumes minus the number of failed volumes we can tolerate. 
final int volFailuresTolerated = @@ -241,38 +244,76 @@ class FsDatasetImpl implements FsDataset } storageMap = new HashMap<String, DatanodeStorage>(); - final List<FsVolumeImpl> volArray = new ArrayList<FsVolumeImpl>( - storage.getNumStorageDirs()); - for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) { - Storage.StorageDirectory sd = storage.getStorageDir(idx); - final File dir = sd.getCurrentDir(); - final StorageType storageType = getStorageTypeFromLocations(dataLocations, sd.getRoot()); - volArray.add(new FsVolumeImpl(this, sd.getStorageUuid(), dir, conf, - storageType)); - LOG.info("Added volume - " + dir + ", StorageType: " + storageType); - storageMap.put(sd.getStorageUuid(), - new DatanodeStorage(sd.getStorageUuid(), DatanodeStorage.State.NORMAL, storageType)); - } volumeMap = new ReplicaMap(this); - @SuppressWarnings("unchecked") final VolumeChoosingPolicy<FsVolumeImpl> blockChooserImpl = ReflectionUtils.newInstance(conf.getClass( DFSConfigKeys.DFS_DATANODE_FSDATASET_VOLUME_CHOOSING_POLICY_KEY, RoundRobinVolumeChoosingPolicy.class, VolumeChoosingPolicy.class), conf); - volumes = new FsVolumeList(volArray, volsFailed, blockChooserImpl); - volumes.initializeReplicaMaps(volumeMap); + volumes = new FsVolumeList(volsFailed, blockChooserImpl); + asyncDiskService = new FsDatasetAsyncDiskService(datanode); - File[] roots = new File[storage.getNumStorageDirs()]; for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) { - roots[idx] = storage.getStorageDir(idx).getCurrentDir(); + addVolume(dataLocations, storage.getStorageDir(idx)); } - asyncDiskService = new FsDatasetAsyncDiskService(datanode, roots); + cacheManager = new FsDatasetCache(this); registerMBean(datanode.getDatanodeUuid()); } + private void addVolume(Collection<StorageLocation> dataLocations, + Storage.StorageDirectory sd) throws IOException { + final File dir = sd.getCurrentDir(); + final StorageType storageType = + getStorageTypeFromLocations(dataLocations, sd.getRoot()); + + // If IOException raises from FsVolumeImpl() or getVolumeMap(), there is + // nothing needed to be rolled back to make various data structures, e.g., + // storageMap and asyncDiskService, consistent. + FsVolumeImpl fsVolume = new FsVolumeImpl( + this, sd.getStorageUuid(), dir, this.conf, storageType); + fsVolume.getVolumeMap(volumeMap); + + volumes.addVolume(fsVolume); + storageMap.put(sd.getStorageUuid(), + new DatanodeStorage(sd.getStorageUuid(), + DatanodeStorage.State.NORMAL, + storageType)); + asyncDiskService.addVolume(sd.getCurrentDir()); + + LOG.info("Added volume - " + dir + ", StorageType: " + storageType); + } + + /** + * Add an array of StorageLocation to FsDataset. + * + * @pre dataStorage must have these volumes. 
+ * @param volumes + * @throws IOException + */ + @Override + public synchronized void addVolumes(Collection<StorageLocation> volumes) + throws IOException { + final Collection<StorageLocation> dataLocations = + DataNode.getStorageLocations(this.conf); + Map<String, Storage.StorageDirectory> allStorageDirs = + new HashMap<String, Storage.StorageDirectory>(); + for (int idx = 0; idx < dataStorage.getNumStorageDirs(); idx++) { + Storage.StorageDirectory sd = dataStorage.getStorageDir(idx); + allStorageDirs.put(sd.getRoot().getAbsolutePath(), sd); + } + + for (StorageLocation vol : volumes) { + String key = vol.getFile().getAbsolutePath(); + if (!allStorageDirs.containsKey(key)) { + LOG.warn("Attempt to add an invalid volume: " + vol.getFile()); + } else { + addVolume(dataLocations, allStorageDirs.get(key)); + } + } + } + private StorageType getStorageTypeFromLocations( Collection<StorageLocation> dataLocations, File dir) { for (StorageLocation dataLocation : dataLocations) { @@ -1150,7 +1191,7 @@ class FsDatasetImpl implements FsDataset return f; // if file is not null, but doesn't exist - possibly disk failed - datanode.checkDiskError(); + datanode.checkDiskErrorAsync(); } if (LOG.isDebugEnabled()) { @@ -1223,17 +1264,17 @@ class FsDatasetImpl implements FsDataset + ". Parent not found for file " + f); continue; } - ReplicaState replicaState = info.getState(); - if (replicaState == ReplicaState.FINALIZED || - (replicaState == ReplicaState.RUR && - ((ReplicaUnderRecovery)info).getOriginalReplica().getState() == - ReplicaState.FINALIZED)) { - v.clearPath(bpid, parent); - } volumeMap.remove(bpid, invalidBlks[i]); } + + // If a DFSClient has the replica in its cache of short-circuit file + // descriptors (and the client is using ShortCircuitShm), invalidate it. + datanode.getShortCircuitRegistry().processBlockInvalidation( + new ExtendedBlockId(invalidBlks[i].getBlockId(), bpid)); + // If the block is cached, start uncaching it. cacheManager.uncacheBlock(bpid, invalidBlks[i].getBlockId()); + // Delete the block asynchronously to make sure we can do it fast enough. // It's ok to unlink the block file before the uncache operation // finishes. 
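As a companion to the layout changes above, here is a minimal standalone sketch of the block-ID-to-directory mapping that the new finalized layout relies on. It mirrors the arithmetic of DatanodeUtil.idToBlockDir() shown earlier in this diff (bits 16-23 and 8-15 of the block ID choose the two subdir levels), which BlockPoolSlice.addBlock() and the linkBlocks() upgrade path call; the class name and sample block IDs are illustrative only.

public class IdToBlockDirSketch {

  // Names copied from the patch for readability; SEP matches the
  // file.separator handling added to DatanodeUtil.
  private static final String BLOCK_SUBDIR_PREFIX = "subdir";
  private static final String SEP = System.getProperty("file.separator");

  /**
   * Same arithmetic as DatanodeUtil.idToBlockDir(root, blockId): bits 16-23
   * of the block ID pick the first level, bits 8-15 the second, giving at
   * most 256 x 256 leaf directories under each finalized tree.
   */
  static String relativeBlockDir(long blockId) {
    int d1 = (int) ((blockId >> 16) & 0xff);
    int d2 = (int) ((blockId >> 8) & 0xff);
    return BLOCK_SUBDIR_PREFIX + d1 + SEP + BLOCK_SUBDIR_PREFIX + d2;
  }

  public static void main(String[] args) {
    long[] sampleIds = { 1073741825L, 1073742000L, 1099511627776L };
    for (long id : sampleIds) {
      System.out.println("blk_" + id + " -> " + relativeBlockDir(id));
    }
  }
}

Because the mapping is a pure function of the block ID, a replica's subdir path no longer needs to be stored per replica; the ReplicaInfo change above recomputes it in getDir() from the base directory and block ID.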
Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java?rev=1619019&r1=1619018&r2=1619019&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java Wed Aug 20 01:34:29 2014 @@ -37,6 +37,7 @@ import org.apache.hadoop.hdfs.DFSConfigK import org.apache.hadoop.hdfs.StorageType; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.server.datanode.DataStorage; +import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; import org.apache.hadoop.util.DiskChecker.DiskErrorException; @@ -235,10 +236,6 @@ class FsVolumeImpl implements FsVolumeSp // dfsUsage.incDfsUsed(b.getNumBytes()+metaFile.length()); bp.addToReplicasMap(volumeMap, dir, isFinalized); } - - void clearPath(String bpid, File f) throws IOException { - getBlockPoolSlice(bpid).clearPath(f); - } @Override public String toString() { @@ -274,7 +271,8 @@ class FsVolumeImpl implements FsVolumeSp File finalizedDir = new File(bpCurrentDir, DataStorage.STORAGE_DIR_FINALIZED); File rbwDir = new File(bpCurrentDir, DataStorage.STORAGE_DIR_RBW); - if (finalizedDir.exists() && FileUtil.list(finalizedDir).length != 0) { + if (finalizedDir.exists() && !DatanodeUtil.dirNoFilesRecursive( + finalizedDir)) { return false; } if (rbwDir.exists() && FileUtil.list(rbwDir).length != 0) { @@ -301,7 +299,8 @@ class FsVolumeImpl implements FsVolumeSp if (!rbwDir.delete()) { throw new IOException("Failed to delete " + rbwDir); } - if (!finalizedDir.delete()) { + if (!DatanodeUtil.dirNoFilesRecursive(finalizedDir) || + !FileUtil.fullyDelete(finalizedDir)) { throw new IOException("Failed to delete " + finalizedDir); } FileUtil.fullyDelete(tmpDir); Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java?rev=1619019&r1=1619018&r2=1619019&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java Wed Aug 20 01:34:29 2014 @@ -40,9 +40,8 @@ class FsVolumeList { private final VolumeChoosingPolicy<FsVolumeImpl> blockChooser; private volatile int numFailedVolumes; - FsVolumeList(List<FsVolumeImpl> volumes, int failedVols, + FsVolumeList(int failedVols, VolumeChoosingPolicy<FsVolumeImpl> blockChooser) { - this.volumes = Collections.unmodifiableList(volumes); 

Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java Wed Aug 20 01:34:29 2014
@@ -40,9 +40,8 @@ class FsVolumeList {
   private final VolumeChoosingPolicy<FsVolumeImpl> blockChooser;
   private volatile int numFailedVolumes;
 
-  FsVolumeList(List<FsVolumeImpl> volumes, int failedVols,
+  FsVolumeList(int failedVols,
       VolumeChoosingPolicy<FsVolumeImpl> blockChooser) {
-    this.volumes = Collections.unmodifiableList(volumes);
     this.blockChooser = blockChooser;
     this.numFailedVolumes = failedVols;
   }
@@ -101,12 +100,6 @@ class FsVolumeList {
     }
     return remaining;
   }
-
-  void initializeReplicaMaps(ReplicaMap globalReplicaMap) throws IOException {
-    for (FsVolumeImpl v : volumes) {
-      v.getVolumeMap(globalReplicaMap);
-    }
-  }
 
   void getAllVolumesMap(final String bpid, final ReplicaMap volumeMap) throws IOException {
     long totalStartTime = Time.monotonicNow();
@@ -205,6 +198,19 @@ class FsVolumeList {
     return volumes.toString();
   }
 
+  /**
+   * Dynamically add new volumes to the existing volumes that this DN manages.
+   * @param newVolume the instance of new FsVolumeImpl.
+   */
+  synchronized void addVolume(FsVolumeImpl newVolume) {
+    // Make a copy of volumes to add new volumes.
+    final List<FsVolumeImpl> volumeList = volumes == null ?
+        new ArrayList<FsVolumeImpl>() :
+        new ArrayList<FsVolumeImpl>(volumes);
+    volumeList.add(newVolume);
+    volumes = Collections.unmodifiableList(volumeList);
+    FsDatasetImpl.LOG.info("Added new volume: " + newVolume.toString());
+  }
 
   void addBlockPool(final String bpid, final Configuration conf) throws IOException {
     long totalStartTime = Time.monotonicNow();
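
FsVolumeList#addVolume() above is a copy-on-write update: writers synchronize, copy the current list, append, and publish a fresh unmodifiable list, so readers can iterate their snapshot without taking a lock (assuming the volumes field is published safely, e.g. declared volatile in the surrounding class, which this hunk does not show). The generic shape of that pattern, in a hypothetical class using only JDK types:

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;

    class CopyOnWriteRegistry<T> {
      // Readers always see an immutable snapshot; no lock on the read path.
      private volatile List<T> items = Collections.emptyList();

      // Writers serialize on the monitor and publish a new immutable copy.
      synchronized void add(T item) {
        List<T> copy = new ArrayList<T>(items);
        copy.add(item);
        items = Collections.unmodifiableList(copy);
      }

      List<T> snapshot() {
        return items;   // safe to iterate concurrently with add()
      }
    }

This is presumably also why the FsVolumeList constructor no longer takes an initial volume list: volumes can now be registered one at a time through addVolume().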

Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java Wed Aug 20 01:34:29 2014
@@ -764,8 +764,6 @@ public class FSDirectory implements Clos
       checkSnapshot(srcInode, null);
     }
-
-
 
   private class RenameOperation {
     private final INodesInPath srcIIP;
     private final INodesInPath dstIIP;
@@ -798,7 +796,7 @@ public class FSDirectory implements Clos
     // snapshot is taken on the dst tree, changes will be recorded in the latest
     // snapshot of the src tree.
     if (isSrcInSnapshot) {
-      srcChild = srcChild.recordModification(srcIIP.getLatestSnapshotId());
+      srcChild.recordModification(srcIIP.getLatestSnapshotId());
     }
 
     // check srcChild for reference
@@ -928,8 +926,7 @@ public class FSDirectory implements Clos
       updateCount(iip, 0, dsDelta, true);
     }
 
-    file = file.setFileReplication(replication, iip.getLatestSnapshotId(),
-        inodeMap);
+    file.setFileReplication(replication, iip.getLatestSnapshotId());
     final short newBR = file.getBlockReplication();
     // check newBR < oldBR case.
@@ -1081,9 +1078,6 @@ public class FSDirectory implements Clos
       count++;
     }
 
-    // update inodeMap
-    removeFromInodeMap(Arrays.asList(allSrcInodes));
-
     trgInode.setModificationTime(timestamp, trgLatestSnapshot);
     trgParent.updateModificationTime(timestamp, trgLatestSnapshot);
     // update quota on the parent directory ('count' files removed, 0 space)
@@ -1215,8 +1209,7 @@ public class FSDirectory implements Clos
 
     // record modification
     final int latestSnapshot = iip.getLatestSnapshotId();
-    targetNode = targetNode.recordModification(latestSnapshot);
-    iip.setLastINode(targetNode);
+    targetNode.recordModification(latestSnapshot);
 
     // Remove the node from the namespace
     long removed = removeLastINode(iip);
@@ -2125,7 +2118,7 @@ public class FSDirectory implements Clos
     }
 
     final int latest = iip.getLatestSnapshotId();
-    dirNode = dirNode.recordModification(latest);
+    dirNode.recordModification(latest);
     dirNode.setQuota(nsQuota, dsQuota);
     return dirNode;
   }

Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Wed Aug 20 01:34:29 2014
@@ -62,6 +62,8 @@ import static org.apache.hadoop.hdfs.DFS
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_MAX_OBJECTS_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_MAX_OBJECTS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RANDOMIZE_BLOCK_LOCATIONS_PER_BLOCK;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RANDOMIZE_BLOCK_LOCATIONS_PER_BLOCK_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REPL_QUEUE_THRESHOLD_PCT_KEY;
@@ -83,9 +85,6 @@ import static org.apache.hadoop.hdfs.DFS
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RANDOMIZE_BLOCK_LOCATIONS_PER_BLOCK;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RANDOMIZE_BLOCK_LOCATIONS_PER_BLOCK_DEFAULT;
-
 import static org.apache.hadoop.util.Time.now;
 
 import java.io.BufferedWriter;
@@ -231,6 +230,7 @@ import org.apache.hadoop.hdfs.server.nam
 import org.apache.hadoop.hdfs.server.namenode.web.resources.NamenodeWebHdfsMethods;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
 import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
 import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
@@ -2516,7 +2516,7 @@ public class FSNamesystem implements Nam
       boolean writeToEditLog,
       int latestSnapshot, boolean logRetryCache)
       throws IOException {
-    file = file.recordModification(latestSnapshot);
+    file.recordModification(latestSnapshot);
     final INodeFile cons = file.toUnderConstruction(leaseHolder, clientMachine);
 
     leaseManager.addLease(cons.getFileUnderConstructionFeature()
@@ -3720,8 +3720,10 @@ public class FSNamesystem implements Nam
       StandbyException, IOException {
     FSPermissionChecker pc = getPermissionChecker();
     checkOperation(OperationCategory.READ);
+    byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
     readLock();
     try {
+      src = FSDirectory.resolvePath(src, pathComponents, dir);
       checkOperation(OperationCategory.READ);
       if (isPermissionEnabled) {
         checkTraverse(pc, src);
@@ -4209,7 +4211,7 @@ public class FSNamesystem implements Nam
     Preconditions.checkArgument(uc != null);
     leaseManager.removeLease(uc.getClientName(), src);
 
-    pendingFile = pendingFile.recordModification(latestSnapshot);
+    pendingFile.recordModification(latestSnapshot);
 
     // The file is no longer pending.
     // Create permanent INode, update blocks. No need to replace the inode here
@@ -4298,7 +4300,30 @@ public class FSNamesystem implements Nam
         throw new IOException("Block (=" + lastblock + ") not found");
       }
     }
-    INodeFile iFile = ((INode)storedBlock.getBlockCollection()).asFile();
+    //
+    // The implementation of delete operation (see @deleteInternal method)
+    // first removes the file paths from namespace, and delays the removal
+    // of blocks to later time for better performance. When
+    // commitBlockSynchronization (this method) is called in between, the
+    // blockCollection of storedBlock could have been assigned to null by
+    // the delete operation, throw IOException here instead of NPE; if the
+    // file path is already removed from namespace by the delete operation,
+    // throw FileNotFoundException here, so not to proceed to the end of
+    // this method to add a CloseOp to the edit log for an already deleted
+    // file (See HDFS-6825).
+    //
+    BlockCollection blockCollection = storedBlock.getBlockCollection();
+    if (blockCollection == null) {
+      throw new IOException("The blockCollection of " + storedBlock
+          + " is null, likely because the file owning this block was"
+          + " deleted and the block removal is delayed");
+    }
+    INodeFile iFile = ((INode)blockCollection).asFile();
+    if (isFileDeleted(iFile)) {
+      throw new FileNotFoundException("File not found: "
+          + iFile.getFullPathName() + ", likely due to delayed block"
+          + " removal");
+    }
     if (!iFile.isUnderConstruction() || storedBlock.isComplete()) {
       if (LOG.isDebugEnabled()) {
         LOG.debug("Unexpected block (=" + lastblock
@@ -4353,8 +4378,11 @@ public class FSNamesystem implements Nam
       // Otherwise fsck will report these blocks as MISSING, especially if the
      // blocksReceived from Datanodes take a long time to arrive.
       for (int i = 0; i < trimmedTargets.size(); i++) {
-        trimmedTargets.get(i).addBlock(
-            trimmedStorages.get(i), storedBlock);
+        DatanodeStorageInfo storageInfo =
+            trimmedTargets.get(i).getStorageInfo(trimmedStorages.get(i));
+        if (storageInfo != null) {
+          storageInfo.addBlock(storedBlock);
+        }
       }
     }
 
@@ -4914,6 +4942,28 @@ public class FSNamesystem implements Nam
     }
   }
 
+  DatanodeStorageReport[] getDatanodeStorageReport(final DatanodeReportType type
+      ) throws AccessControlException, StandbyException {
+    checkSuperuserPrivilege();
+    checkOperation(OperationCategory.UNCHECKED);
+    readLock();
+    try {
+      checkOperation(OperationCategory.UNCHECKED);
+      final DatanodeManager dm = getBlockManager().getDatanodeManager();
+      final List<DatanodeDescriptor> datanodes = dm.getDatanodeListForReport(type);
+
+      DatanodeStorageReport[] reports = new DatanodeStorageReport[datanodes.size()];
+      for (int i = 0; i < reports.length; i++) {
+        final DatanodeDescriptor d = datanodes.get(i);
+        reports[i] = new DatanodeStorageReport(new DatanodeInfo(d),
+            d.getStorageReports());
+      }
+      return reports;
+    } finally {
+      readUnlock();
+    }
+  }
+
   /**
    * Save namespace image.
    * This will save current namespace into fsimage file and empty edits file.
@@ -5811,7 +5861,7 @@ public class FSNamesystem implements Nam
   }
 
   public void processIncrementalBlockReport(final DatanodeID nodeID,
-      final String poolId, final StorageReceivedDeletedBlocks srdb)
+      final StorageReceivedDeletedBlocks srdb)
       throws IOException {
     writeLock();
     try {
@@ -6061,7 +6111,6 @@ public class FSNamesystem implements Nam
       blockManager.shutdown();
     }
   }
-
 
   @Override // FSNamesystemMBean
   public int getNumLiveDataNodes() {
@@ -6109,6 +6158,15 @@ public class FSNamesystem implements Nam
   }
 
   /**
+   * Storages are marked as "content stale" after NN restart or fails over and
+   * before NN receives the first Heartbeat followed by the first Blockreport.
+   */
+  @Override // FSNamesystemMBean
+  public int getNumStaleStorages() {
+    return getBlockManager().getDatanodeManager().getNumStaleStorages();
+  }
+
+  /**
    * Sets the current generation stamp for legacy blocks
    */
   void setGenerationStampV1(long stamp) {
@@ -6262,9 +6320,28 @@ public class FSNamesystem implements Nam
 
   private boolean isFileDeleted(INodeFile file) {
     // Not in the inodeMap or in the snapshot but marked deleted.
-    if (dir.getInode(file.getId()) == null ||
-        file.getParent() == null || (file.isWithSnapshot() &&
-        file.getFileWithSnapshotFeature().isCurrentFileDeleted())) {
+    if (dir.getInode(file.getId()) == null) {
+      return true;
+    }
+
+    // look at the path hierarchy to see if one parent is deleted by recursive
+    // deletion
+    INode tmpChild = file;
+    INodeDirectory tmpParent = file.getParent();
+    while (true) {
+      if (tmpParent == null ||
+          tmpParent.searchChildren(tmpChild.getLocalNameBytes()) < 0) {
+        return true;
+      }
+      if (tmpParent.isRoot()) {
+        break;
+      }
+      tmpChild = tmpParent;
+      tmpParent = tmpParent.getParent();
+    }
+
+    if (file.isWithSnapshot() &&
+        file.getFileWithSnapshotFeature().isCurrentFileDeleted()) {
       return true;
     }
     return false;
@@ -8183,9 +8260,11 @@ public class FSNamesystem implements Nam
     nnConf.checkAclsConfigFlag();
     FSPermissionChecker pc = getPermissionChecker();
     checkOperation(OperationCategory.READ);
+    byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
    readLock();
     try {
       checkOperation(OperationCategory.READ);
+      src = FSDirectory.resolvePath(src, pathComponents, dir);
       if (isPermissionEnabled) {
         checkPermission(pc, src, false, null, null, null, null);
       }
@@ -8288,8 +8367,10 @@ public class FSNamesystem implements Nam
       }
     }
     checkOperation(OperationCategory.READ);
+    byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
     readLock();
     try {
+      src = FSDirectory.resolvePath(src, pathComponents, dir);
       checkOperation(OperationCategory.READ);
       if (isPermissionEnabled) {
         checkPathAccess(pc, src, FsAction.READ);
@@ -8333,8 +8414,10 @@ public class FSNamesystem implements Nam
     nnConf.checkXAttrsConfigFlag();
     final FSPermissionChecker pc = getPermissionChecker();
     checkOperation(OperationCategory.READ);
+    byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
     readLock();
     try {
+      src = FSDirectory.resolvePath(src, pathComponents, dir);
       checkOperation(OperationCategory.READ);
       if (isPermissionEnabled) {
         /* To access xattr names, you need EXECUTE in the owning directory. */
@@ -8428,6 +8511,29 @@ public class FSNamesystem implements Nam
     }
   }
 
+  void checkAccess(String src, FsAction mode) throws AccessControlException,
+      FileNotFoundException, UnresolvedLinkException, IOException {
+    checkOperation(OperationCategory.READ);
+    byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
+    readLock();
+    try {
+      checkOperation(OperationCategory.READ);
+      src = FSDirectory.resolvePath(src, pathComponents, dir);
+      if (dir.getINode(src) == null) {
+        throw new FileNotFoundException("Path not found");
+      }
+      if (isPermissionEnabled) {
+        FSPermissionChecker pc = getPermissionChecker();
+        checkPathAccess(pc, src, mode);
+      }
+    } catch (AccessControlException e) {
+      logAuditEvent(false, "checkAccess", src);
+      throw e;
+    } finally {
+      readUnlock();
+    }
+  }
+
   /**
    * Default AuditLogger implementation; used when no access logger is
    * defined in the config file. It can also be explicitly listed in the
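
The new FSNamesystem#checkAccess() above is the NameNode side of an explicit permission probe: it resolves the (possibly /.reserved) path outside the lock, fails with FileNotFoundException if the path does not exist, otherwise runs the normal permission check for the requested FsAction, and audits denials. This is presumably surfaced to clients through FileSystem#access in the corresponding client-side change, which is not shown in this diff; assuming that entry point, a typical probe might look like:

    import java.io.FileNotFoundException;
    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.permission.FsAction;
    import org.apache.hadoop.security.AccessControlException;

    public class AccessProbe {
      /** Returns true if the current user may read the given path. */
      static boolean canRead(FileSystem fs, Path path) throws IOException {
        try {
          fs.access(path, FsAction.READ);   // throws if access is denied
          return true;
        } catch (AccessControlException denied) {
          return false;
        } catch (FileNotFoundException missing) {
          return false;
        }
      }

      public static void main(String[] args) throws IOException {
        FileSystem fs = FileSystem.get(new Configuration());
        System.out.println(canRead(fs, new Path(args[0])));
      }
    }

The point of the RPC is that callers can check permissions up front instead of attempting the operation and interpreting the failure.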

Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java Wed Aug 20 01:34:29 2014
@@ -97,9 +97,9 @@ public abstract class INode implements I
   /** Set user */
   final INode setUser(String user, int latestSnapshotId)
       throws QuotaExceededException {
-    final INode nodeToUpdate = recordModification(latestSnapshotId);
-    nodeToUpdate.setUser(user);
-    return nodeToUpdate;
+    recordModification(latestSnapshotId);
+    setUser(user);
+    return this;
   }
   /**
    * @param snapshotId
@@ -122,9 +122,9 @@ public abstract class INode implements I
   /** Set group */
   final INode setGroup(String group, int latestSnapshotId)
       throws QuotaExceededException {
-    final INode nodeToUpdate = recordModification(latestSnapshotId);
-    nodeToUpdate.setGroup(group);
-    return nodeToUpdate;
+    recordModification(latestSnapshotId);
+    setGroup(group);
+    return this;
   }
 
   /**
@@ -148,9 +148,9 @@ public abstract class INode implements I
   /** Set the {@link FsPermission} of this {@link INode} */
   INode setPermission(FsPermission permission, int latestSnapshotId)
       throws QuotaExceededException {
-    final INode nodeToUpdate = recordModification(latestSnapshotId);
-    nodeToUpdate.setPermission(permission);
-    return nodeToUpdate;
+    recordModification(latestSnapshotId);
+    setPermission(permission);
+    return this;
   }
 
   abstract AclFeature getAclFeature(int snapshotId);
@@ -164,18 +164,18 @@ public abstract class INode implements I
 
   final INode addAclFeature(AclFeature aclFeature, int latestSnapshotId)
       throws QuotaExceededException {
-    final INode nodeToUpdate = recordModification(latestSnapshotId);
-    nodeToUpdate.addAclFeature(aclFeature);
-    return nodeToUpdate;
+    recordModification(latestSnapshotId);
+    addAclFeature(aclFeature);
+    return this;
   }
 
   abstract void removeAclFeature();
 
   final INode removeAclFeature(int latestSnapshotId)
       throws QuotaExceededException {
-    final INode nodeToUpdate = recordModification(latestSnapshotId);
-    nodeToUpdate.removeAclFeature();
-    return nodeToUpdate;
+    recordModification(latestSnapshotId);
+    removeAclFeature();
+    return this;
   }
 
   /**
@@ -199,9 +199,9 @@ public abstract class INode implements I
 
   final INode addXAttrFeature(XAttrFeature xAttrFeature, int latestSnapshotId)
       throws QuotaExceededException {
-    final INode nodeToUpdate = recordModification(latestSnapshotId);
-    nodeToUpdate.addXAttrFeature(xAttrFeature);
-    return nodeToUpdate;
+    recordModification(latestSnapshotId);
+    addXAttrFeature(xAttrFeature);
+    return this;
  }
 
   /**
@@ -211,9 +211,9 @@ public abstract class INode implements I
 
   final INode removeXAttrFeature(int lastestSnapshotId)
       throws QuotaExceededException {
-    final INode nodeToUpdate = recordModification(lastestSnapshotId);
-    nodeToUpdate.removeXAttrFeature();
-    return nodeToUpdate;
+    recordModification(lastestSnapshotId);
+    removeXAttrFeature();
+    return this;
   }
 
   /**
@@ -298,11 +298,8 @@ public abstract class INode implements I
    * @param latestSnapshotId The id of the latest snapshot that has been taken.
    *          Note that it is {@link Snapshot#CURRENT_STATE_ID}
    *          if no snapshots have been taken.
-   * @return The current inode, which usually is the same object of this inode.
-   *         However, in some cases, this inode may be replaced with a new inode
-   *         for maintaining snapshots. The current inode is then the new inode.
    */
-  abstract INode recordModification(final int latestSnapshotId)
+  abstract void recordModification(final int latestSnapshotId)
       throws QuotaExceededException;
 
   /** Check whether it's a reference. */
@@ -652,9 +649,9 @@ public abstract class INode implements I
   /** Set the last modification time of inode. */
   public final INode setModificationTime(long modificationTime,
       int latestSnapshotId) throws QuotaExceededException {
-    final INode nodeToUpdate = recordModification(latestSnapshotId);
-    nodeToUpdate.setModificationTime(modificationTime);
-    return nodeToUpdate;
+    recordModification(latestSnapshotId);
+    setModificationTime(modificationTime);
+    return this;
   }
 
   /**
@@ -682,9 +679,9 @@ public abstract class INode implements I
    */
   public final INode setAccessTime(long accessTime, int latestSnapshotId)
       throws QuotaExceededException {
-    final INode nodeToUpdate = recordModification(latestSnapshotId);
-    nodeToUpdate.setAccessTime(accessTime);
-    return nodeToUpdate;
+    recordModification(latestSnapshotId);
+    setAccessTime(accessTime);
+    return this;
   }
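
The INode change above (continued in INodeDirectory, INodeFile and INodeMap below) narrows the recordModification() contract: it no longer returns a possibly-replacing inode, and the setters simply mutate and return this. That is what lets the FSDirectory and FSNamesystem hunks earlier drop reassignments such as file = file.recordModification(...). A minimal sketch of the two contracts, with illustrative names rather than the real class hierarchy:

    /** Illustrative only: old vs. new recordModification contract. */
    abstract class OldStyleInode {
      // Old: may hand back a replacement inode, so callers must re-capture it:
      //   inode = inode.recordModification(latestSnapshotId);
      abstract OldStyleInode recordModification(int latestSnapshotId);
    }

    abstract class NewStyleInode {
      private long accessTime;

      // New: records the snapshot diff in place and returns nothing.
      abstract void recordModification(int latestSnapshotId);

      final NewStyleInode setAccessTime(long accessTime, int latestSnapshotId) {
        recordModification(latestSnapshotId);
        this.accessTime = accessTime;   // mutate this inode directly
        return this;                    // callers keep using the same reference
      }
    }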

Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java Wed Aug 20 01:34:29 2014
@@ -157,7 +157,7 @@ public class INodeDirectory extends INod
     return quota;
   }
 
-  private int searchChildren(byte[] name) {
+  int searchChildren(byte[] name) {
     return children == null? -1: Collections.binarySearch(children, name);
   }
 
@@ -318,7 +318,7 @@ public class INodeDirectory extends INod
   }
 
   @Override
-  public INodeDirectory recordModification(int latestSnapshotId)
+  public void recordModification(int latestSnapshotId)
       throws QuotaExceededException {
     if (isInLatestSnapshot(latestSnapshotId)
         && !shouldRecordInSrcSnapshot(latestSnapshotId)) {
@@ -330,7 +330,6 @@ public class INodeDirectory extends INod
       // record self in the diff list if necessary
       sf.getDiffs().saveSelf2Snapshot(latestSnapshotId, this, null);
     }
-    return this;
   }
 
   /**

Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java Wed Aug 20 01:34:29 2014
@@ -284,7 +284,7 @@ public class INodeFile extends INodeWith
   }
 
   @Override
-  public INodeFile recordModification(final int latestSnapshotId)
+  public void recordModification(final int latestSnapshotId)
       throws QuotaExceededException {
     if (isInLatestSnapshot(latestSnapshotId)
         && !shouldRecordInSrcSnapshot(latestSnapshotId)) {
@@ -296,7 +296,6 @@ public class INodeFile extends INodeWith
       // record self in the diff list if necessary
       sf.getDiffs().saveSelf2Snapshot(latestSnapshotId, this, null);
     }
-    return this;
   }
 
   public FileDiffList getDiffs() {
@@ -344,11 +343,10 @@ public class INodeFile extends INodeWith
 
   /** Set the replication factor of this file. */
   public final INodeFile setFileReplication(short replication,
-      int latestSnapshotId, final INodeMap inodeMap)
-      throws QuotaExceededException {
-    final INodeFile nodeToUpdate = recordModification(latestSnapshotId);
-    nodeToUpdate.setFileReplication(replication);
-    return nodeToUpdate;
+      int latestSnapshotId) throws QuotaExceededException {
+    recordModification(latestSnapshotId);
+    setFileReplication(replication);
+    return this;
   }
 
   /** @return preferred block size (in bytes) of the file. */

Modified: hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeMap.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeMap.java?rev=1619019&r1=1619018&r2=1619019&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeMap.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeMap.java Wed Aug 20 01:34:29 2014
@@ -93,9 +93,8 @@ public class INodeMap {
         "", "", new FsPermission((short) 0)), 0, 0) {
       @Override
-      INode recordModification(int latestSnapshotId)
+      void recordModification(int latestSnapshotId)
           throws QuotaExceededException {
-        return null;
       }
 
       @Override