Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java Tue Aug 19 23:49:39 2014 @@ -18,13 +18,19 @@ package org.apache.hadoop.hdfs.server.datanode; +import com.google.common.collect.Lists; +import com.google.common.util.concurrent.Futures; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.*; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.HardLink; +import org.apache.hadoop.fs.LocalFileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.LayoutVersion; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType; @@ -35,13 +41,31 @@ import org.apache.hadoop.hdfs.server.com import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.io.nativeio.NativeIO; import org.apache.hadoop.util.Daemon; import org.apache.hadoop.util.DiskChecker; -import java.io.*; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.RandomAccessFile; import java.nio.channels.FileLock; -import java.util.*; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; /** * Data storage information file. @@ -149,43 +173,99 @@ public class DataStorage extends Storage } /** - * Analyze storage directories. - * Recover from previous transitions if required. - * Perform fs state transition if necessary depending on the namespace info. - * Read storage info. - * <br> - * This method should be synchronized between multiple DN threads. Only the - * first DN thread does DN level storage dir recoverTransitionRead. - * + * {{@inheritDoc org.apache.hadoop.hdfs.server.common.Storage#writeAll()}} + */ + private void writeAll(Collection<StorageDirectory> dirs) throws IOException { + this.layoutVersion = getServiceLayoutVersion(); + for (StorageDirectory dir : dirs) { + writeProperties(dir); + } + } + + /** + * Add a list of volumes to be managed by DataStorage. 
If the volume is empty, + * format it, otherwise recover it from previous transitions if required. + * + * @param datanode the reference to DataNode. * @param nsInfo namespace information * @param dataDirs array of data storage directories * @param startOpt startup option * @throws IOException */ - synchronized void recoverTransitionRead(DataNode datanode, + synchronized void addStorageLocations(DataNode datanode, NamespaceInfo nsInfo, Collection<StorageLocation> dataDirs, StartupOption startOpt) throws IOException { - if (initialized) { - // DN storage has been initialized, no need to do anything - return; + // Similar to recoverTransitionRead, it first ensures the datanode level + // format is completed. + List<StorageLocation> tmpDataDirs = + new ArrayList<StorageLocation>(dataDirs); + addStorageLocations(datanode, nsInfo, tmpDataDirs, startOpt, false, true); + + Collection<File> bpDataDirs = new ArrayList<File>(); + String bpid = nsInfo.getBlockPoolID(); + for (StorageLocation dir : dataDirs) { + File dnRoot = dir.getFile(); + File bpRoot = BlockPoolSliceStorage.getBpRoot(bpid, new File(dnRoot, + STORAGE_DIR_CURRENT)); + bpDataDirs.add(bpRoot); } - LOG.info("Data-node version: " + HdfsConstants.DATANODE_LAYOUT_VERSION - + " and name-node layout version: " + nsInfo.getLayoutVersion()); - - // 1. For each data directory calculate its state and - // check whether all is consistent before transitioning. - // Format and recover. - this.storageDirs = new ArrayList<StorageDirectory>(dataDirs.size()); - ArrayList<StorageState> dataDirStates = new ArrayList<StorageState>(dataDirs.size()); + // mkdir for the list of BlockPoolStorage + makeBlockPoolDataDir(bpDataDirs, null); + BlockPoolSliceStorage bpStorage = this.bpStorageMap.get(bpid); + if (bpStorage == null) { + bpStorage = new BlockPoolSliceStorage( + nsInfo.getNamespaceID(), bpid, nsInfo.getCTime(), + nsInfo.getClusterID()); + } + + bpStorage.recoverTransitionRead(datanode, nsInfo, bpDataDirs, startOpt); + addBlockPoolStorage(bpid, bpStorage); + } + + /** + * Add a list of volumes to be managed by this DataStorage. If the volume is + * empty, it formats the volume, otherwise it recovers it from previous + * transitions if required. + * + * If isInitialize is false, only the directories that have finished the + * doTransition() process will be added into DataStorage. + * + * @param datanode the reference to DataNode. + * @param nsInfo namespace information + * @param dataDirs array of data storage directories + * @param startOpt startup option + * @param isInitialize whether it is called when DataNode starts up. + * @throws IOException + */ + private synchronized void addStorageLocations(DataNode datanode, + NamespaceInfo nsInfo, Collection<StorageLocation> dataDirs, + StartupOption startOpt, boolean isInitialize, boolean ignoreExistingDirs) + throws IOException { + Set<String> existingStorageDirs = new HashSet<String>(); + for (int i = 0; i < getNumStorageDirs(); i++) { + existingStorageDirs.add(getStorageDir(i).getRoot().getAbsolutePath()); + } + + // 1. For each data directory calculate its state and check whether all is + // consistent before transitioning. Format and recover. 
+ ArrayList<StorageState> dataDirStates = + new ArrayList<StorageState>(dataDirs.size()); + List<StorageDirectory> addedStorageDirectories = + new ArrayList<StorageDirectory>(); for(Iterator<StorageLocation> it = dataDirs.iterator(); it.hasNext();) { File dataDir = it.next().getFile(); + if (existingStorageDirs.contains(dataDir.getAbsolutePath())) { + LOG.info("Storage directory " + dataDir + " has already been used."); + it.remove(); + continue; + } StorageDirectory sd = new StorageDirectory(dataDir); StorageState curState; try { curState = sd.analyzeStorage(startOpt, this); // sd is locked but not opened - switch(curState) { + switch (curState) { case NORMAL: break; case NON_EXISTENT: @@ -194,7 +274,8 @@ public class DataStorage extends Storage it.remove(); continue; case NOT_FORMATTED: // format - LOG.info("Storage directory " + dataDir + " is not formatted"); + LOG.info("Storage directory " + dataDir + " is not formatted for " + + nsInfo.getBlockPoolID()); LOG.info("Formatting ..."); format(sd, nsInfo, datanode.getDatanodeUuid()); break; @@ -208,28 +289,82 @@ public class DataStorage extends Storage //continue with other good dirs continue; } - // add to the storage list - addStorageDir(sd); + if (isInitialize) { + addStorageDir(sd); + } + addedStorageDirectories.add(sd); dataDirStates.add(curState); } - if (dataDirs.size() == 0 || dataDirStates.size() == 0) // none of the data dirs exist + if (dataDirs.size() == 0 || dataDirStates.size() == 0) { + // none of the data dirs exist + if (ignoreExistingDirs) { + return; + } throw new IOException( "All specified directories are not accessible or do not exist."); + } // 2. Do transitions // Each storage directory is treated individually. - // During startup some of them can upgrade or rollback - // while others could be uptodate for the regular startup. - for(int idx = 0; idx < getNumStorageDirs(); idx++) { - doTransition(datanode, getStorageDir(idx), nsInfo, startOpt); - createStorageID(getStorageDir(idx)); + // During startup some of them can upgrade or rollback + // while others could be up-to-date for the regular startup. + for (Iterator<StorageDirectory> it = addedStorageDirectories.iterator(); + it.hasNext(); ) { + StorageDirectory sd = it.next(); + try { + doTransition(datanode, sd, nsInfo, startOpt); + createStorageID(sd); + } catch (IOException e) { + if (!isInitialize) { + sd.unlock(); + it.remove(); + continue; + } + unlockAll(); + throw e; + } } + + // 3. Update all successfully loaded storages. Some of them might have just + // been formatted. + this.writeAll(addedStorageDirectories); + + // 4. Make newly loaded storage directories visible for service. + if (!isInitialize) { + this.storageDirs.addAll(addedStorageDirectories); + } + } + + /** + * Analyze storage directories. + * Recover from previous transitions if required. + * Perform fs state transition if necessary depending on the namespace info. + * Read storage info. + * <br> + * This method should be synchronized between multiple DN threads. Only the + * first DN thread does DN level storage dir recoverTransitionRead. 
+ * + * @param nsInfo namespace information + * @param dataDirs array of data storage directories + * @param startOpt startup option + * @throws IOException + */ + synchronized void recoverTransitionRead(DataNode datanode, + NamespaceInfo nsInfo, Collection<StorageLocation> dataDirs, + StartupOption startOpt) + throws IOException { + if (initialized) { + // DN storage has been initialized, no need to do anything + return; + } + LOG.info("DataNode version: " + HdfsConstants.DATANODE_LAYOUT_VERSION + + " and NameNode layout version: " + nsInfo.getLayoutVersion()); + + this.storageDirs = new ArrayList<StorageDirectory>(dataDirs.size()); + addStorageLocations(datanode, nsInfo, dataDirs, startOpt, true, false); - // 3. Update all storages. Some of them might have just been formatted. - this.writeAll(); - - // 4. mark DN storage is initialized + // mark DN storage is initialized this.initialized = true; } @@ -256,6 +391,7 @@ public class DataStorage extends Storage STORAGE_DIR_CURRENT)); bpDataDirs.add(bpRoot); } + // mkdir for the list of BlockPoolStorage makeBlockPoolDataDir(bpDataDirs, null); BlockPoolSliceStorage bpStorage = new BlockPoolSliceStorage( @@ -483,7 +619,7 @@ public class DataStorage extends Storage // do upgrade if (this.layoutVersion > HdfsConstants.DATANODE_LAYOUT_VERSION) { - doUpgrade(sd, nsInfo); // upgrade + doUpgrade(datanode, sd, nsInfo); // upgrade return; } @@ -518,7 +654,8 @@ public class DataStorage extends Storage * @param sd storage directory * @throws IOException on error */ - void doUpgrade(StorageDirectory sd, NamespaceInfo nsInfo) throws IOException { + void doUpgrade(DataNode datanode, StorageDirectory sd, NamespaceInfo nsInfo) + throws IOException { // If the existing on-disk layout version supportes federation, simply // update its layout version. if (DataNodeLayoutVersion.supports( @@ -563,7 +700,8 @@ public class DataStorage extends Storage BlockPoolSliceStorage bpStorage = new BlockPoolSliceStorage(nsInfo.getNamespaceID(), nsInfo.getBlockPoolID(), nsInfo.getCTime(), nsInfo.getClusterID()); bpStorage.format(curDir, nsInfo); - linkAllBlocks(tmpDir, bbwDir, new File(curBpDir, STORAGE_DIR_CURRENT)); + linkAllBlocks(datanode, tmpDir, bbwDir, new File(curBpDir, + STORAGE_DIR_CURRENT)); // 4. 
Write version file under <SD>/current layoutVersion = HdfsConstants.DATANODE_LAYOUT_VERSION; @@ -741,22 +879,22 @@ public class DataStorage extends Storage * * @throws IOException If error occurs during hardlink */ - private void linkAllBlocks(File fromDir, File fromBbwDir, File toDir) - throws IOException { + private void linkAllBlocks(DataNode datanode, File fromDir, File fromBbwDir, + File toDir) throws IOException { HardLink hardLink = new HardLink(); // do the link int diskLayoutVersion = this.getLayoutVersion(); if (DataNodeLayoutVersion.supports( LayoutVersion.Feature.APPEND_RBW_DIR, diskLayoutVersion)) { // hardlink finalized blocks in tmpDir/finalized - linkBlocks(new File(fromDir, STORAGE_DIR_FINALIZED), + linkBlocks(datanode, new File(fromDir, STORAGE_DIR_FINALIZED), new File(toDir, STORAGE_DIR_FINALIZED), diskLayoutVersion, hardLink); // hardlink rbw blocks in tmpDir/rbw - linkBlocks(new File(fromDir, STORAGE_DIR_RBW), + linkBlocks(datanode, new File(fromDir, STORAGE_DIR_RBW), new File(toDir, STORAGE_DIR_RBW), diskLayoutVersion, hardLink); } else { // pre-RBW version // hardlink finalized blocks in tmpDir - linkBlocks(fromDir, new File(toDir, STORAGE_DIR_FINALIZED), + linkBlocks(datanode, fromDir, new File(toDir, STORAGE_DIR_FINALIZED), diskLayoutVersion, hardLink); if (fromBbwDir.exists()) { /* @@ -765,15 +903,67 @@ public class DataStorage extends Storage * NOT underneath the 'current' directory in those releases. See * HDFS-3731 for details. */ - linkBlocks(fromBbwDir, + linkBlocks(datanode, fromBbwDir, new File(toDir, STORAGE_DIR_RBW), diskLayoutVersion, hardLink); } } LOG.info( hardLink.linkStats.report() ); } + + private static class LinkArgs { + public File src; + public File dst; + + public LinkArgs(File src, File dst) { + this.src = src; + this.dst = dst; + } + } + + static void linkBlocks(DataNode datanode, File from, File to, int oldLV, + HardLink hl) throws IOException { + boolean upgradeToIdBasedLayout = false; + // If we are upgrading from a version older than the one where we introduced + // block ID-based layout AND we're working with the finalized directory, + // we'll need to upgrade from the old flat layout to the block ID-based one + if (oldLV > DataNodeLayoutVersion.Feature.BLOCKID_BASED_LAYOUT.getInfo(). 
+ getLayoutVersion() && to.getName().equals(STORAGE_DIR_FINALIZED)) { + upgradeToIdBasedLayout = true; + } + + final List<LinkArgs> idBasedLayoutSingleLinks = Lists.newArrayList(); + linkBlocksHelper(from, to, oldLV, hl, upgradeToIdBasedLayout, to, + idBasedLayoutSingleLinks); + int numLinkWorkers = datanode.getConf().getInt( + DFSConfigKeys.DFS_DATANODE_BLOCK_ID_LAYOUT_UPGRADE_THREADS_KEY, + DFSConfigKeys.DFS_DATANODE_BLOCK_ID_LAYOUT_UPGRADE_THREADS); + ExecutorService linkWorkers = Executors.newFixedThreadPool(numLinkWorkers); + final int step = idBasedLayoutSingleLinks.size() / numLinkWorkers + 1; + List<Future<Void>> futures = Lists.newArrayList(); + for (int i = 0; i < idBasedLayoutSingleLinks.size(); i += step) { + final int iCopy = i; + futures.add(linkWorkers.submit(new Callable<Void>() { + @Override + public Void call() throws IOException { + int upperBound = Math.min(iCopy + step, + idBasedLayoutSingleLinks.size()); + for (int j = iCopy; j < upperBound; j++) { + LinkArgs cur = idBasedLayoutSingleLinks.get(j); + NativeIO.link(cur.src, cur.dst); + } + return null; + } + })); + } + linkWorkers.shutdown(); + for (Future<Void> f : futures) { + Futures.get(f, IOException.class); + } + } - static void linkBlocks(File from, File to, int oldLV, HardLink hl) - throws IOException { + static void linkBlocksHelper(File from, File to, int oldLV, HardLink hl, + boolean upgradeToIdBasedLayout, File blockRoot, + List<LinkArgs> idBasedLayoutSingleLinks) throws IOException { if (!from.exists()) { return; } @@ -800,9 +990,6 @@ public class DataStorage extends Storage // from is a directory hl.linkStats.countDirs++; - if (!to.mkdirs()) - throw new IOException("Cannot create directory " + to); - String[] blockNames = from.list(new java.io.FilenameFilter() { @Override public boolean accept(File dir, String name) { @@ -810,12 +997,36 @@ public class DataStorage extends Storage } }); + // If we are upgrading to block ID-based layout, we don't want to recreate + // any subdirs from the source that contain blocks, since we have a new + // directory structure + if (!upgradeToIdBasedLayout || !to.getName().startsWith( + BLOCK_SUBDIR_PREFIX)) { + if (!to.mkdirs()) + throw new IOException("Cannot create directory " + to); + } + // Block files just need hard links with the same file names // but a different directory if (blockNames.length > 0) { - HardLink.createHardLinkMult(from, blockNames, to); - hl.linkStats.countMultLinks++; - hl.linkStats.countFilesMultLinks += blockNames.length; + if (upgradeToIdBasedLayout) { + for (String blockName : blockNames) { + long blockId = Block.getBlockId(blockName); + File blockLocation = DatanodeUtil.idToBlockDir(blockRoot, blockId); + if (!blockLocation.exists()) { + if (!blockLocation.mkdirs()) { + throw new IOException("Failed to mkdirs " + blockLocation); + } + } + idBasedLayoutSingleLinks.add(new LinkArgs(new File(from, blockName), + new File(blockLocation, blockName))); + hl.linkStats.countSingleLinks++; + } + } else { + HardLink.createHardLinkMult(from, blockNames, to); + hl.linkStats.countMultLinks++; + hl.linkStats.countFilesMultLinks += blockNames.length; + } } else { hl.linkStats.countEmptyDirs++; } @@ -829,8 +1040,9 @@ public class DataStorage extends Storage } }); for(int i = 0; i < otherNames.length; i++) - linkBlocks(new File(from, otherNames[i]), - new File(to, otherNames[i]), oldLV, hl); + linkBlocksHelper(new File(from, otherNames[i]), + new File(to, otherNames[i]), oldLV, hl, upgradeToIdBasedLayout, + blockRoot, idBasedLayoutSingleLinks); } /**
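
Note on the linkBlocks() change above: the upgrade path now works in two phases. linkBlocksHelper() first collects every (source, destination) pair into idBasedLayoutSingleLinks, and the hard links are then issued from a fixed-size thread pool whose size is read via DFSConfigKeys.DFS_DATANODE_BLOCK_ID_LAYOUT_UPGRADE_THREADS_KEY. The standalone Java sketch below illustrates only that chunked fan-out pattern, under simplified assumptions: LinkArgs here carries plain strings, the ChunkedLinkDemo class and its println stand in for the real NativeIO.link() call, and the worker count is hard-coded instead of coming from configuration.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ChunkedLinkDemo {
  /** Simplified stand-in for the patch's LinkArgs (source/destination pair). */
  static class LinkArgs {
    final String src, dst;
    LinkArgs(String src, String dst) { this.src = src; this.dst = dst; }
  }

  public static void main(String[] args) throws Exception {
    // Pretend these are the per-block links collected by linkBlocksHelper().
    final List<LinkArgs> links = new ArrayList<LinkArgs>();
    for (int i = 0; i < 10; i++) {
      links.add(new LinkArgs("previous/blk_" + i, "current/subdir/blk_" + i));
    }

    // The patch reads this from DFS_DATANODE_BLOCK_ID_LAYOUT_UPGRADE_THREADS_KEY.
    int numWorkers = 4;
    ExecutorService pool = Executors.newFixedThreadPool(numWorkers);

    // Same chunking rule as the patch: contiguous slices of size N / workers + 1.
    final int step = links.size() / numWorkers + 1;
    List<Future<Void>> futures = new ArrayList<Future<Void>>();
    for (int i = 0; i < links.size(); i += step) {
      final int start = i;
      futures.add(pool.submit(new Callable<Void>() {
        @Override
        public Void call() {
          int end = Math.min(start + step, links.size());
          for (int j = start; j < end; j++) {
            LinkArgs cur = links.get(j);
            // The real code performs NativeIO.link(cur.src, cur.dst) here.
            System.out.println("link " + cur.src + " -> " + cur.dst);
          }
          return null;
        }
      }));
    }
    pool.shutdown();
    for (Future<Void> f : futures) {
      // Drain the futures so any per-chunk failure is surfaced to the caller.
      f.get();
    }
  }
}

Because each Callable walks a contiguous, non-overlapping slice of the list, the workers never contend on idBasedLayoutSingleLinks and no extra locking is needed; errors surface when the futures are drained, which is the role Futures.get(f, IOException.class) plays in the patch itself.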
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java Tue Aug 19 23:49:39 2014 @@ -36,28 +36,27 @@ import java.io.IOException; import java.io.InputStream; import java.io.InterruptedIOException; import java.io.OutputStream; -import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.Socket; import java.net.SocketException; -import java.net.UnknownHostException; import java.nio.channels.ClosedChannelException; +import java.security.MessageDigest; import java.util.Arrays; import org.apache.commons.logging.Log; import org.apache.hadoop.hdfs.ExtendedBlockId; +import org.apache.hadoop.hdfs.StorageType; import org.apache.hadoop.hdfs.net.Peer; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage; -import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor; -import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor.InvalidMagicNumberException; import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil; import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair; import org.apache.hadoop.hdfs.protocol.datatransfer.Op; import org.apache.hadoop.hdfs.protocol.datatransfer.Receiver; import org.apache.hadoop.hdfs.protocol.datatransfer.Sender; +import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientReadStatusProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumResponseProto; @@ -83,7 +82,7 @@ import org.apache.hadoop.security.token. import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.DataChecksum; -import com.google.common.net.InetAddresses; +import com.google.common.base.Preconditions; import com.google.protobuf.ByteString; @@ -104,7 +103,8 @@ class DataXceiver extends Receiver imple private long opStartTime; //the start time of receiving an Op private final InputStream socketIn; private OutputStream socketOut; - + private BlockReceiver blockReceiver = null; + /** * Client Name used in previous operation. Not available on first request * on the socket. @@ -160,6 +160,12 @@ class DataXceiver extends Receiver imple return socketOut; } + public void sendOOB() throws IOException, InterruptedException { + LOG.info("Sending OOB to peer: " + peer); + if(blockReceiver!=null) + blockReceiver.sendOOB(); + } + /** * Read/write data from/to the DataXceiverServer. 
*/ @@ -169,27 +175,14 @@ class DataXceiver extends Receiver imple Op op = null; try { - dataXceiverServer.addPeer(peer, Thread.currentThread()); + dataXceiverServer.addPeer(peer, Thread.currentThread(), this); peer.setWriteTimeout(datanode.getDnConf().socketWriteTimeout); InputStream input = socketIn; - if ((!peer.hasSecureChannel()) && dnConf.encryptDataTransfer && - !dnConf.trustedChannelResolver.isTrusted(getClientAddress(peer))){ - IOStreamPair encryptedStreams = null; - try { - encryptedStreams = DataTransferEncryptor.getEncryptedStreams(socketOut, - socketIn, datanode.blockPoolTokenSecretManager, - dnConf.encryptionAlgorithm); - } catch (InvalidMagicNumberException imne) { - LOG.info("Failed to read expected encryption handshake from client " + - "at " + peer.getRemoteAddressString() + ". Perhaps the client " + - "is running an older version of Hadoop which does not support " + - "encryption"); - return; - } - input = encryptedStreams.in; - socketOut = encryptedStreams.out; - } - input = new BufferedInputStream(input, HdfsConstants.SMALL_BUFFER_SIZE); + IOStreamPair saslStreams = datanode.saslServer.receive(peer, socketOut, + socketIn, datanode.getDatanodeId()); + input = new BufferedInputStream(saslStreams.in, + HdfsConstants.SMALL_BUFFER_SIZE); + socketOut = saslStreams.out; super.initialize(new DataInputStream(input)); @@ -261,19 +254,6 @@ class DataXceiver extends Receiver imple } } } - - /** - * Returns InetAddress from peer - * The getRemoteAddressString is the form /ip-address:port - * The ip-address is extracted from peer and InetAddress is formed - * @param peer - * @return - * @throws UnknownHostException - */ - private static InetAddress getClientAddress(Peer peer) { - return InetAddresses.forString( - peer.getRemoteAddressString().split(":")[0].substring(1)); - } @Override public void requestShortCircuitFds(final ExtendedBlock blk, @@ -552,9 +532,11 @@ class DataXceiver extends Receiver imple @Override public void writeBlock(final ExtendedBlock block, + final StorageType storageType, final Token<BlockTokenIdentifier> blockToken, final String clientname, final DatanodeInfo[] targets, + final StorageType[] targetStorageTypes, final DatanodeInfo srcDataNode, final BlockConstructionStage stage, final int pipelineSize, @@ -609,7 +591,6 @@ class DataXceiver extends Receiver imple DataOutputStream mirrorOut = null; // stream to next target DataInputStream mirrorIn = null; // reply from next target Socket mirrorSock = null; // socket to next target - BlockReceiver blockReceiver = null; // responsible for data handling String mirrorNode = null; // the name:port of next target String firstBadLink = ""; // first datanode that failed in connection setup Status mirrorInStatus = SUCCESS; @@ -618,12 +599,13 @@ class DataXceiver extends Receiver imple if (isDatanode || stage != BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) { // open a block receiver - blockReceiver = new BlockReceiver(block, in, + blockReceiver = new BlockReceiver(block, storageType, in, peer.getRemoteAddressString(), peer.getLocalAddressString(), stage, latestGenerationStamp, minBytesRcvd, maxBytesRcvd, clientname, srcDataNode, datanode, requestedChecksum, cachingStrategy); + storageUuid = blockReceiver.getStorageUuid(); } else { storageUuid = datanode.data.recoverClose( @@ -654,25 +636,20 @@ class DataXceiver extends Receiver imple OutputStream unbufMirrorOut = NetUtils.getOutputStream(mirrorSock, writeTimeout); InputStream unbufMirrorIn = NetUtils.getInputStream(mirrorSock); - if (dnConf.encryptDataTransfer && - 
!dnConf.trustedChannelResolver.isTrusted(mirrorSock.getInetAddress())) { - IOStreamPair encryptedStreams = - DataTransferEncryptor.getEncryptedStreams( - unbufMirrorOut, unbufMirrorIn, - datanode.blockPoolTokenSecretManager - .generateDataEncryptionKey(block.getBlockPoolId())); - - unbufMirrorOut = encryptedStreams.out; - unbufMirrorIn = encryptedStreams.in; - } + DataEncryptionKeyFactory keyFactory = + datanode.getDataEncryptionKeyFactoryForBlock(block); + IOStreamPair saslStreams = datanode.saslClient.socketSend(mirrorSock, + unbufMirrorOut, unbufMirrorIn, keyFactory, blockToken, targets[0]); + unbufMirrorOut = saslStreams.out; + unbufMirrorIn = saslStreams.in; mirrorOut = new DataOutputStream(new BufferedOutputStream(unbufMirrorOut, HdfsConstants.SMALL_BUFFER_SIZE)); mirrorIn = new DataInputStream(unbufMirrorIn); - new Sender(mirrorOut).writeBlock(originalBlock, blockToken, - clientname, targets, srcDataNode, stage, pipelineSize, - minBytesRcvd, maxBytesRcvd, latestGenerationStamp, requestedChecksum, - cachingStrategy); + new Sender(mirrorOut).writeBlock(originalBlock, targetStorageTypes[0], + blockToken, clientname, targets, targetStorageTypes, srcDataNode, + stage, pipelineSize, minBytesRcvd, maxBytesRcvd, + latestGenerationStamp, requestedChecksum, cachingStrategy); mirrorOut.flush(); @@ -737,7 +714,7 @@ class DataXceiver extends Receiver imple if (blockReceiver != null) { String mirrorAddr = (mirrorSock == null) ? null : mirrorNode; blockReceiver.receiveBlock(mirrorOut, mirrorIn, replyOut, - mirrorAddr, null, targets); + mirrorAddr, null, targets, false); // send close-ack for transfer-RBW/Finalized if (isTransfer) { @@ -776,6 +753,7 @@ class DataXceiver extends Receiver imple IOUtils.closeStream(replyOut); IOUtils.closeSocket(mirrorSock); IOUtils.closeStream(blockReceiver); + blockReceiver = null; } //update metrics @@ -787,7 +765,8 @@ class DataXceiver extends Receiver imple public void transferBlock(final ExtendedBlock blk, final Token<BlockTokenIdentifier> blockToken, final String clientName, - final DatanodeInfo[] targets) throws IOException { + final DatanodeInfo[] targets, + final StorageType[] targetStorageTypes) throws IOException { checkAccess(socketOut, true, blk, blockToken, Op.TRANSFER_BLOCK, BlockTokenSecretManager.AccessMode.COPY); previousOpClientName = clientName; @@ -796,13 +775,51 @@ class DataXceiver extends Receiver imple final DataOutputStream out = new DataOutputStream( getOutputStream()); try { - datanode.transferReplicaForPipelineRecovery(blk, targets, clientName); + datanode.transferReplicaForPipelineRecovery(blk, targets, + targetStorageTypes, clientName); writeResponse(Status.SUCCESS, null, out); } finally { IOUtils.closeStream(out); } } - + + private MD5Hash calcPartialBlockChecksum(ExtendedBlock block, + long requestLength, DataChecksum checksum, DataInputStream checksumIn) + throws IOException { + final int bytesPerCRC = checksum.getBytesPerChecksum(); + final int csize = checksum.getChecksumSize(); + final byte[] buffer = new byte[4*1024]; + MessageDigest digester = MD5Hash.getDigester(); + + long remaining = requestLength / bytesPerCRC * csize; + for (int toDigest = 0; remaining > 0; remaining -= toDigest) { + toDigest = checksumIn.read(buffer, 0, + (int) Math.min(remaining, buffer.length)); + if (toDigest < 0) { + break; + } + digester.update(buffer, 0, toDigest); + } + + int partialLength = (int) (requestLength % bytesPerCRC); + if (partialLength > 0) { + byte[] buf = new byte[partialLength]; + final InputStream blockIn = 
datanode.data.getBlockInputStream(block, + requestLength - partialLength); + try { + // Get the CRC of the partialLength. + IOUtils.readFully(blockIn, buf, 0, partialLength); + } finally { + IOUtils.closeStream(blockIn); + } + checksum.update(buf, 0, partialLength); + byte[] partialCrc = new byte[csize]; + checksum.writeValue(partialCrc, 0, true); + digester.update(partialCrc); + } + return new MD5Hash(digester.digest()); + } + @Override public void blockChecksum(final ExtendedBlock block, final Token<BlockTokenIdentifier> blockToken) throws IOException { @@ -810,25 +827,32 @@ class DataXceiver extends Receiver imple getOutputStream()); checkAccess(out, true, block, blockToken, Op.BLOCK_CHECKSUM, BlockTokenSecretManager.AccessMode.READ); - updateCurrentThreadName("Reading metadata for block " + block); - final LengthInputStream metadataIn = - datanode.data.getMetaDataInputStream(block); - final DataInputStream checksumIn = new DataInputStream(new BufferedInputStream( - metadataIn, HdfsConstants.IO_FILE_BUFFER_SIZE)); + // client side now can specify a range of the block for checksum + long requestLength = block.getNumBytes(); + Preconditions.checkArgument(requestLength >= 0); + long visibleLength = datanode.data.getReplicaVisibleLength(block); + boolean partialBlk = requestLength < visibleLength; + updateCurrentThreadName("Reading metadata for block " + block); + final LengthInputStream metadataIn = datanode.data + .getMetaDataInputStream(block); + + final DataInputStream checksumIn = new DataInputStream( + new BufferedInputStream(metadataIn, HdfsConstants.IO_FILE_BUFFER_SIZE)); updateCurrentThreadName("Getting checksum for block " + block); try { //read metadata file - final BlockMetadataHeader header = BlockMetadataHeader.readHeader(checksumIn); - final DataChecksum checksum = header.getChecksum(); + final BlockMetadataHeader header = BlockMetadataHeader + .readHeader(checksumIn); + final DataChecksum checksum = header.getChecksum(); + final int csize = checksum.getChecksumSize(); final int bytesPerCRC = checksum.getBytesPerChecksum(); - final long crcPerBlock = checksum.getChecksumSize() > 0 - ? (metadataIn.getLength() - BlockMetadataHeader.getHeaderSize())/checksum.getChecksumSize() - : 0; - - //compute block checksum - final MD5Hash md5 = MD5Hash.digest(checksumIn); + final long crcPerBlock = csize <= 0 ? 0 : + (metadataIn.getLength() - BlockMetadataHeader.getHeaderSize()) / csize; + final MD5Hash md5 = partialBlk && crcPerBlock > 0 ? 
+ calcPartialBlockChecksum(block, requestLength, checksum, checksumIn) + : MD5Hash.digest(checksumIn); if (LOG.isDebugEnabled()) { LOG.debug("block=" + block + ", bytesPerCRC=" + bytesPerCRC + ", crcPerBlock=" + crcPerBlock + ", md5=" + md5); @@ -841,8 +865,7 @@ class DataXceiver extends Receiver imple .setBytesPerCrc(bytesPerCRC) .setCrcPerBlock(crcPerBlock) .setMd5(ByteString.copyFrom(md5.getDigest())) - .setCrcType(PBHelper.convert(checksum.getChecksumType())) - ) + .setCrcType(PBHelper.convert(checksum.getChecksumType()))) .build() .writeDelimitedTo(out); out.flush(); @@ -931,6 +954,7 @@ class DataXceiver extends Receiver imple @Override public void replaceBlock(final ExtendedBlock block, + final StorageType storageType, final Token<BlockTokenIdentifier> blockToken, final String delHint, final DatanodeInfo proxySource) throws IOException { @@ -966,7 +990,7 @@ class DataXceiver extends Receiver imple String errMsg = null; BlockReceiver blockReceiver = null; DataInputStream proxyReply = null; - + DataOutputStream replyOut = new DataOutputStream(getOutputStream()); try { // get the output stream to the proxy final String dnAddr = proxySource.getXferAddr(connectToDnViaHostname); @@ -981,17 +1005,12 @@ class DataXceiver extends Receiver imple OutputStream unbufProxyOut = NetUtils.getOutputStream(proxySock, dnConf.socketWriteTimeout); InputStream unbufProxyIn = NetUtils.getInputStream(proxySock); - if (dnConf.encryptDataTransfer && - !dnConf.trustedChannelResolver.isTrusted( - proxySock.getInetAddress())) { - IOStreamPair encryptedStreams = - DataTransferEncryptor.getEncryptedStreams( - unbufProxyOut, unbufProxyIn, - datanode.blockPoolTokenSecretManager - .generateDataEncryptionKey(block.getBlockPoolId())); - unbufProxyOut = encryptedStreams.out; - unbufProxyIn = encryptedStreams.in; - } + DataEncryptionKeyFactory keyFactory = + datanode.getDataEncryptionKeyFactoryForBlock(block); + IOStreamPair saslStreams = datanode.saslClient.socketSend(proxySock, + unbufProxyOut, unbufProxyIn, keyFactory, blockToken, proxySource); + unbufProxyOut = saslStreams.out; + unbufProxyIn = saslStreams.in; proxyOut = new DataOutputStream(new BufferedOutputStream(unbufProxyOut, HdfsConstants.SMALL_BUFFER_SIZE)); @@ -1021,15 +1040,15 @@ class DataXceiver extends Receiver imple DataChecksum remoteChecksum = DataTransferProtoUtil.fromProto( checksumInfo.getChecksum()); // open a block receiver and check if the block does not exist - blockReceiver = new BlockReceiver( - block, proxyReply, proxySock.getRemoteSocketAddress().toString(), + blockReceiver = new BlockReceiver(block, storageType, + proxyReply, proxySock.getRemoteSocketAddress().toString(), proxySock.getLocalSocketAddress().toString(), null, 0, 0, 0, "", null, datanode, remoteChecksum, CachingStrategy.newDropBehind()); // receive a block - blockReceiver.receiveBlock(null, null, null, null, - dataXceiverServer.balanceThrottler, null); + blockReceiver.receiveBlock(null, null, replyOut, null, + dataXceiverServer.balanceThrottler, null, true); // notify name node datanode.notifyNamenodeReceivedBlock( @@ -1064,6 +1083,7 @@ class DataXceiver extends Receiver imple IOUtils.closeStream(proxyOut); IOUtils.closeStream(blockReceiver); IOUtils.closeStream(proxyReply); + IOUtils.closeStream(replyOut); } //update metrics @@ -1077,7 +1097,7 @@ class DataXceiver extends Receiver imple /** * Utility function for sending a response. 
* - * @param opStatus status message to write + * @param status status message to write * @param message message to send to the client or other DN */ private void sendResponse(Status status, Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java Tue Aug 19 23:49:39 2014 @@ -27,11 +27,11 @@ import org.apache.hadoop.conf.Configurat import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.net.Peer; import org.apache.hadoop.hdfs.net.PeerServer; -import org.apache.hadoop.hdfs.server.balancer.Balancer; import org.apache.hadoop.hdfs.util.DataTransferThrottler; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.util.Daemon; +import com.google.common.annotations.VisibleForTesting; /** * Server used for receiving/sending a block of data. @@ -45,6 +45,7 @@ class DataXceiverServer implements Runna private final PeerServer peerServer; private final DataNode datanode; private final HashMap<Peer, Thread> peers = new HashMap<Peer, Thread>(); + private final HashMap<Peer, DataXceiver> peersXceiver = new HashMap<Peer, DataXceiver>(); private boolean closed = false; /** @@ -63,14 +64,17 @@ class DataXceiverServer implements Runna */ static class BlockBalanceThrottler extends DataTransferThrottler { private int numThreads; + private int maxThreads; /**Constructor * * @param bandwidth Total amount of bandwidth can be used for balancing */ - private BlockBalanceThrottler(long bandwidth) { + private BlockBalanceThrottler(long bandwidth, int maxThreads) { super(bandwidth); + this.maxThreads = maxThreads; LOG.info("Balancing bandwith is "+ bandwidth + " bytes/s"); + LOG.info("Number threads for balancing is "+ maxThreads); } /** Check if the block move can start. @@ -79,7 +83,7 @@ class DataXceiverServer implements Runna * the counter is incremented; False otherwise. 
*/ synchronized boolean acquire() { - if (numThreads >= Balancer.MAX_NUM_CONCURRENT_MOVES) { + if (numThreads >= maxThreads) { return false; } numThreads++; @@ -120,8 +124,10 @@ class DataXceiverServer implements Runna //set up parameter for cluster balancing this.balanceThrottler = new BlockBalanceThrottler( - conf.getLong(DFSConfigKeys.DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_KEY, - DFSConfigKeys.DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_DEFAULT)); + conf.getLong(DFSConfigKeys.DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_KEY, + DFSConfigKeys.DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_DEFAULT), + conf.getInt(DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY, + DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT)); } @Override @@ -212,18 +218,38 @@ class DataXceiverServer implements Runna } } - synchronized void addPeer(Peer peer, Thread t) throws IOException { + synchronized void addPeer(Peer peer, Thread t, DataXceiver xceiver) + throws IOException { if (closed) { throw new IOException("Server closed."); } peers.put(peer, t); + peersXceiver.put(peer, xceiver); } synchronized void closePeer(Peer peer) { peers.remove(peer); + peersXceiver.remove(peer); IOUtils.cleanup(null, peer); } + // Sending OOB to all peers + public synchronized void sendOOBToPeers() { + if (!datanode.shutdownForUpgrade) { + return; + } + + for (Peer p : peers.keySet()) { + try { + peersXceiver.get(p).sendOOB(); + } catch (IOException e) { + LOG.warn("Got error when sending OOB message.", e); + } catch (InterruptedException e) { + LOG.warn("Interrupted when sending OOB message."); + } + } + } + // Notify all peers of the shutdown and restart. // datanode.shouldRun should still be true and datanode.restarting should // be set true before calling this method. @@ -242,6 +268,7 @@ class DataXceiverServer implements Runna IOUtils.cleanup(LOG, p); } peers.clear(); + peersXceiver.clear(); } // Return the number of peers. @@ -249,7 +276,14 @@ class DataXceiverServer implements Runna return peers.size(); } + // Return the number of peers and DataXceivers. 
+ @VisibleForTesting + synchronized int getNumPeersXceiver() { + return peersXceiver.size(); + } + synchronized void releasePeer(Peer peer) { peers.remove(peer); + peersXceiver.remove(peer); } } Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeUtil.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeUtil.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeUtil.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeUtil.java Tue Aug 19 23:49:39 2014 @@ -30,6 +30,8 @@ public class DatanodeUtil { public static final String DISK_ERROR = "Possible disk error: "; + private static final String SEP = System.getProperty("file.separator"); + /** Get the cause of an I/O exception if caused by a possible disk error * @param ioe an I/O exception * @return cause if the I/O exception is caused by a possible disk error; @@ -78,4 +80,38 @@ public class DatanodeUtil { public static File getUnlinkTmpFile(File f) { return new File(f.getParentFile(), f.getName()+UNLINK_BLOCK_SUFFIX); } + + /** + * Checks whether there are any files anywhere in the directory tree rooted + * at dir (directories don't count as files). dir must exist + * @return true if there are no files + * @throws IOException if unable to list subdirectories + */ + public static boolean dirNoFilesRecursive(File dir) throws IOException { + File[] contents = dir.listFiles(); + if (contents == null) { + throw new IOException("Cannot list contents of " + dir); + } + for (File f : contents) { + if (!f.isDirectory() || (f.isDirectory() && !dirNoFilesRecursive(f))) { + return false; + } + } + return true; + } + + /** + * Get the directory where a finalized block with this ID should be stored. + * Do not attempt to create the directory. 
+ * @param root the root directory where finalized blocks are stored + * @param blockId + * @return + */ + public static File idToBlockDir(File root, long blockId) { + int d1 = (int)((blockId >> 16) & 0xff); + int d2 = (int)((blockId >> 8) & 0xff); + String path = DataStorage.BLOCK_SUBDIR_PREFIX + d1 + SEP + + DataStorage.BLOCK_SUBDIR_PREFIX + d2; + return new File(root, path); + } } Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java Tue Aug 19 23:49:39 2014 @@ -108,8 +108,7 @@ public class DirectoryScanner implements ScanInfoPerBlockPool(int sz) {super(sz);} /** - * Merges "that" ScanInfoPerBlockPool into this one - * @param that + * Merges {@code that} ScanInfoPerBlockPool into this one */ public void addAll(ScanInfoPerBlockPool that) { if (that == null) return; Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FinalizedReplica.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FinalizedReplica.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FinalizedReplica.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FinalizedReplica.java Tue Aug 19 23:49:39 2014 @@ -54,7 +54,7 @@ public class FinalizedReplica extends Re /** * Copy constructor. - * @param from + * @param from where to copy construct from */ public FinalizedReplica(FinalizedReplica from) { super(from); Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaBeingWritten.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaBeingWritten.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaBeingWritten.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaBeingWritten.java Tue Aug 19 23:49:39 2014 @@ -68,7 +68,7 @@ public class ReplicaBeingWritten extends /** * Copy constructor. 
- * @param from + * @param from where to copy from */ public ReplicaBeingWritten(ReplicaBeingWritten from) { super(from); Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java Tue Aug 19 23:49:39 2014 @@ -89,7 +89,7 @@ public class ReplicaInPipeline extends R /** * Copy constructor. - * @param from + * @param from where to copy from */ public ReplicaInPipeline(ReplicaInPipeline from) { super(from); Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipelineInterface.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipelineInterface.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipelineInterface.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipelineInterface.java Tue Aug 19 23:49:39 2014 @@ -40,7 +40,7 @@ public interface ReplicaInPipelineInterf /** * Set the number bytes that have acked - * @param bytesAcked + * @param bytesAcked number bytes acked */ void setBytesAcked(long bytesAcked); Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java Tue Aug 19 23:49:39 2014 @@ -54,10 +54,10 @@ abstract public class ReplicaInfo extend private File baseDir; /** - * Ints representing the sub directory path from base dir to the directory - * containing this replica. + * Whether or not this replica's parent directory includes subdirs, in which + * case we can generate them based on the replica's block ID */ - private int[] subDirs; + private boolean hasSubdirs; private static final Map<String, File> internedBaseDirs = new HashMap<String, File>(); @@ -100,7 +100,7 @@ abstract public class ReplicaInfo extend /** * Copy constructor. 
- * @param from + * @param from where to copy from */ ReplicaInfo(ReplicaInfo from) { this(from, from.getVolume(), from.getDir()); @@ -151,18 +151,8 @@ abstract public class ReplicaInfo extend * @return the parent directory path where this replica is located */ File getDir() { - if (subDirs == null) { - return null; - } - - StringBuilder sb = new StringBuilder(); - for (int i : subDirs) { - sb.append(DataStorage.BLOCK_SUBDIR_PREFIX); - sb.append(i); - sb.append("/"); - } - File ret = new File(baseDir, sb.toString()); - return ret; + return hasSubdirs ? DatanodeUtil.idToBlockDir(baseDir, + getBlockId()) : baseDir; } /** @@ -175,54 +165,46 @@ abstract public class ReplicaInfo extend private void setDirInternal(File dir) { if (dir == null) { - subDirs = null; baseDir = null; return; } - ReplicaDirInfo replicaDirInfo = parseSubDirs(dir); - this.subDirs = replicaDirInfo.subDirs; + ReplicaDirInfo dirInfo = parseBaseDir(dir); + this.hasSubdirs = dirInfo.hasSubidrs; synchronized (internedBaseDirs) { - if (!internedBaseDirs.containsKey(replicaDirInfo.baseDirPath)) { + if (!internedBaseDirs.containsKey(dirInfo.baseDirPath)) { // Create a new String path of this file and make a brand new File object // to guarantee we drop the reference to the underlying char[] storage. - File baseDir = new File(replicaDirInfo.baseDirPath); - internedBaseDirs.put(replicaDirInfo.baseDirPath, baseDir); + File baseDir = new File(dirInfo.baseDirPath); + internedBaseDirs.put(dirInfo.baseDirPath, baseDir); } - this.baseDir = internedBaseDirs.get(replicaDirInfo.baseDirPath); + this.baseDir = internedBaseDirs.get(dirInfo.baseDirPath); } } - + @VisibleForTesting public static class ReplicaDirInfo { - @VisibleForTesting public String baseDirPath; - - @VisibleForTesting - public int[] subDirs; + public boolean hasSubidrs; + + public ReplicaDirInfo (String baseDirPath, boolean hasSubidrs) { + this.baseDirPath = baseDirPath; + this.hasSubidrs = hasSubidrs; + } } @VisibleForTesting - public static ReplicaDirInfo parseSubDirs(File dir) { - ReplicaDirInfo ret = new ReplicaDirInfo(); + public static ReplicaDirInfo parseBaseDir(File dir) { File currentDir = dir; - List<Integer> subDirList = new ArrayList<Integer>(); + boolean hasSubdirs = false; while (currentDir.getName().startsWith(DataStorage.BLOCK_SUBDIR_PREFIX)) { - // Prepend the integer into the list. 
- subDirList.add(0, Integer.parseInt(currentDir.getName().replaceFirst( - DataStorage.BLOCK_SUBDIR_PREFIX, ""))); + hasSubdirs = true; currentDir = currentDir.getParentFile(); } - ret.subDirs = new int[subDirList.size()]; - for (int i = 0; i < subDirList.size(); i++) { - ret.subDirs[i] = subDirList.get(i); - } - - ret.baseDirPath = currentDir.getAbsolutePath(); - return ret; + return new ReplicaDirInfo(currentDir.getAbsolutePath(), hasSubdirs); } /** Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaUnderRecovery.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaUnderRecovery.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaUnderRecovery.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaUnderRecovery.java Tue Aug 19 23:49:39 2014 @@ -50,7 +50,7 @@ public class ReplicaUnderRecovery extend /** * Copy constructor. - * @param from + * @param from where to copy from */ public ReplicaUnderRecovery(ReplicaUnderRecovery from) { super(from); Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaWaitingToBeRecovered.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaWaitingToBeRecovered.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaWaitingToBeRecovered.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaWaitingToBeRecovered.java Tue Aug 19 23:49:39 2014 @@ -60,7 +60,7 @@ public class ReplicaWaitingToBeRecovered /** * Copy constructor. - * @param from + * @param from where to copy from */ public ReplicaWaitingToBeRecovered(ReplicaWaitingToBeRecovered from) { super(from); Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ShortCircuitRegistry.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ShortCircuitRegistry.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ShortCircuitRegistry.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ShortCircuitRegistry.java Tue Aug 19 23:49:39 2014 @@ -74,7 +74,7 @@ import com.google.common.collect.HashMul * DN also marks the block's slots as "unanchorable" to prevent additional * clients from initiating these operations in the future. 
* - * The counterpart fo this class on the client is {@link DfsClientShmManager}. + * The counterpart of this class on the client is {@link DfsClientShmManager}. */ public class ShortCircuitRegistry { public static final Log LOG = LogFactory.getLog(ShortCircuitRegistry.class); @@ -217,7 +217,32 @@ public class ShortCircuitRegistry { } return allowMunlock; } - + + /** + * Invalidate any slot associated with a blockId that we are invalidating + * (deleting) from this DataNode. When a slot is invalid, the DFSClient will + * not use the corresponding replica for new read or mmap operations (although + * existing, ongoing read or mmap operations will complete.) + * + * @param blockId The block ID. + */ + public synchronized void processBlockInvalidation(ExtendedBlockId blockId) { + if (!enabled) return; + final Set<Slot> affectedSlots = slots.get(blockId); + if (!affectedSlots.isEmpty()) { + final StringBuilder bld = new StringBuilder(); + String prefix = ""; + bld.append("Block ").append(blockId).append(" has been invalidated. "). + append("Marking short-circuit slots as invalid: "); + for (Slot slot : affectedSlots) { + slot.makeInvalid(); + bld.append(prefix).append(slot.toString()); + prefix = ", "; + } + LOG.info(bld.toString()); + } + } + public static class NewShmInfo implements Closeable { public final ShmId shmId; public final FileInputStream stream; Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StorageLocation.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StorageLocation.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StorageLocation.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StorageLocation.java Tue Aug 19 23:49:39 2014 @@ -78,7 +78,8 @@ public class StorageLocation { * @return A StorageLocation object if successfully parsed, null otherwise. * Does not throw any exceptions. 
*/ - static StorageLocation parse(String rawLocation) throws IOException { + public static StorageLocation parse(String rawLocation) + throws IOException, SecurityException { Matcher matcher = regex.matcher(rawLocation); StorageType storageType = StorageType.DEFAULT; String location = rawLocation; Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/AvailableSpaceVolumeChoosingPolicy.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/AvailableSpaceVolumeChoosingPolicy.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/AvailableSpaceVolumeChoosingPolicy.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/AvailableSpaceVolumeChoosingPolicy.java Tue Aug 19 23:49:39 2014 @@ -45,11 +45,19 @@ public class AvailableSpaceVolumeChoosin private static final Log LOG = LogFactory.getLog(AvailableSpaceVolumeChoosingPolicy.class); - private static final Random RAND = new Random(); + private final Random random; private long balancedSpaceThreshold = DFS_DATANODE_AVAILABLE_SPACE_VOLUME_CHOOSING_POLICY_BALANCED_SPACE_THRESHOLD_DEFAULT; private float balancedPreferencePercent = DFS_DATANODE_AVAILABLE_SPACE_VOLUME_CHOOSING_POLICY_BALANCED_SPACE_PREFERENCE_FRACTION_DEFAULT; + AvailableSpaceVolumeChoosingPolicy(Random random) { + this.random = random; + } + + public AvailableSpaceVolumeChoosingPolicy() { + this(new Random()); + } + @Override public synchronized void setConf(Configuration conf) { balancedSpaceThreshold = conf.getLong( @@ -128,7 +136,7 @@ public class AvailableSpaceVolumeChoosin (highAvailableVolumes.size() * balancedPreferencePercent) / preferencePercentScaler; if (mostAvailableAmongLowVolumes < replicaSize || - RAND.nextFloat() < scaledPreferencePercent) { + random.nextFloat() < scaledPreferencePercent) { volume = roundRobinPolicyHighAvailable.chooseVolume( highAvailableVolumes, replicaSize); @@ -165,13 +173,8 @@ public class AvailableSpaceVolumeChoosin } /** - * Check if the available space on all the volumes is roughly equal. - * - * @param volumes the volumes to check - * @return true if all volumes' free space is within the configured threshold, - * false otherwise. - * @throws IOException - * in the event of error checking amount of available space + * @return true if all volumes' free space is within the + * configured threshold, false otherwise. 
*/ public boolean areAllVolumesWithinFreeSpaceThreshold() { long leastAvailable = Long.MAX_VALUE; Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java Tue Aug 19 23:49:39 2014 @@ -19,14 +19,17 @@ package org.apache.hadoop.hdfs.server.da import java.io.File; +import java.io.FileDescriptor; import java.io.IOException; import java.io.InputStream; +import java.util.Collection; import java.util.List; import java.util.Map; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.StorageType; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo; @@ -37,6 +40,7 @@ import org.apache.hadoop.hdfs.server.dat import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica; import org.apache.hadoop.hdfs.server.datanode.Replica; import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipelineInterface; +import org.apache.hadoop.hdfs.server.datanode.StorageLocation; import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetFactory; import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean; import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock; @@ -89,6 +93,10 @@ public interface FsDatasetSpi<V extends /** @return a list of volumes. */ public List<V> getVolumes(); + /** Add an array of StorageLocation to FsDataset. */ + public void addVolumes(Collection<StorageLocation> volumes) + throws IOException; + /** @return a storage with the given storage ID */ public DatanodeStorage getStorage(final String storageUuid); @@ -124,16 +132,14 @@ public interface FsDatasetSpi<V extends /** * Returns the specified block's on-disk length (excluding metadata) - * @param b * @return the specified block's on-disk length (excluding metadta) - * @throws IOException + * @throws IOException on error */ public long getLength(ExtendedBlock b) throws IOException; /** * Get reference to the replica meta info in the replicasMap. 
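
The policy whose Random is now injectable (above) makes two decisions: whether the free space across volumes is balanced to within the configured balanced-space threshold, and, when it is not, whether to send the new replica to the high-free-space group with the configured preference probability. The standalone model below captures those two checks; it deliberately omits the scaling of the preference by the size of the high-free-space group that the real chooseVolume() applies, and its names are illustrative.

    import java.util.Arrays;
    import java.util.List;
    import java.util.Random;

    // Illustrative model of the available-space choosing decision (not the Hadoop class).
    class AvailableSpaceChoiceModel {
        private final long balancedSpaceThreshold;       // e.g. 10 GB
        private final float balancedPreferenceFraction;  // e.g. 0.75f
        private final Random random;                     // injectable for deterministic tests

        AvailableSpaceChoiceModel(long threshold, float preference, Random random) {
            this.balancedSpaceThreshold = threshold;
            this.balancedPreferenceFraction = preference;
            this.random = random;
        }

        // True when the spread between the most- and least-free volumes is small enough
        // that plain round-robin placement is acceptable.
        boolean allWithinThreshold(List<Long> freeSpacePerVolume) {
            long min = Long.MAX_VALUE;
            long max = Long.MIN_VALUE;
            for (long free : freeSpacePerVolume) {
                min = Math.min(min, free);
                max = Math.max(max, free);
            }
            return (max - min) <= balancedSpaceThreshold;
        }

        // True = pick from the high-free-space volumes, false = pick from the low ones.
        boolean preferHighFreeSpaceGroup() {
            return random.nextFloat() < balancedPreferenceFraction;
        }

        public static void main(String[] args) {
            AvailableSpaceChoiceModel m = new AvailableSpaceChoiceModel(
                10L * 1024 * 1024 * 1024, 0.75f, new Random(42));
            System.out.println(m.allWithinThreshold(
                Arrays.asList(50L << 30, 45L << 30)));     // true: spread is 5 GB
            System.out.println(m.preferHighFreeSpaceGroup());  // reproducible with the seed
        }
    }

Seeding the injected Random, as the new package-private constructor allows, makes the probabilistic branch reproducible in tests.
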
* To be called from methods that are synchronized on {@link FSDataset} - * @param blockId * @return replica from the replicas map */ @Deprecated @@ -151,8 +157,8 @@ public interface FsDatasetSpi<V extends /** * Returns an input stream at specified offset of the specified block - * @param b - * @param seekOffset + * @param b block + * @param seekOffset offset with in the block to seek to * @return an input stream to read the contents of the specified block, * starting at the offset * @throws IOException @@ -163,9 +169,6 @@ public interface FsDatasetSpi<V extends /** * Returns an input stream at specified offset of the specified block * The block is still in the tmp directory and is not finalized - * @param b - * @param blkoff - * @param ckoff * @return an input stream to read the contents of the specified block, * starting at the offset * @throws IOException @@ -180,8 +183,8 @@ public interface FsDatasetSpi<V extends * @return the meta info of the replica which is being written to * @throws IOException if an error occurs */ - public ReplicaInPipelineInterface createTemporary(ExtendedBlock b - ) throws IOException; + public ReplicaInPipelineInterface createTemporary(StorageType storageType, + ExtendedBlock b) throws IOException; /** * Creates a RBW replica and returns the meta info of the replica @@ -190,8 +193,8 @@ public interface FsDatasetSpi<V extends * @return the meta info of the replica which is being written to * @throws IOException if an error occurs */ - public ReplicaInPipelineInterface createRbw(ExtendedBlock b - ) throws IOException; + public ReplicaInPipelineInterface createRbw(StorageType storageType, + ExtendedBlock b) throws IOException; /** * Recovers a RBW replica and returns the meta info of the replica @@ -256,7 +259,6 @@ public interface FsDatasetSpi<V extends * Finalizes the block previously opened for writing using writeToBlock. * The block size is what is in the parameter b and it must match the amount * of data written - * @param b * @throws IOException */ public void finalizeBlock(ExtendedBlock b) throws IOException; @@ -264,7 +266,6 @@ public interface FsDatasetSpi<V extends /** * Unfinalizes the block previously opened for writing using writeToBlock. * The temporary file associated with this block is deleted. - * @param b * @throws IOException */ public void unfinalizeBlock(ExtendedBlock b) throws IOException; @@ -289,14 +290,12 @@ public interface FsDatasetSpi<V extends /** * Is the block valid? - * @param b * @return - true if the specified block is valid */ public boolean isValidBlock(ExtendedBlock b); /** * Is the block a valid RBW? - * @param b * @return - true if the specified block is a valid RBW */ public boolean isValidRbw(ExtendedBlock b); @@ -327,7 +326,7 @@ public interface FsDatasetSpi<V extends * Determine if the specified block is cached. 
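
createTemporary() and createRbw() now take the StorageType requested by the writer, which lets the dataset implementation narrow volume selection to matching media before applying its choosing policy. The sketch below is a minimal illustration of that filtering step under assumed types and names; it is not FsDatasetImpl.

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    // Illustrative sketch: filter candidate volumes by the requested storage type
    // before handing the list to a volume-choosing policy.
    class VolumeModel {
        final String path;
        final String storageType;   // e.g. "DISK" or "SSD"
        VolumeModel(String path, String storageType) {
            this.path = path;
            this.storageType = storageType;
        }
    }

    class StorageTypeAwareSelection {
        static List<VolumeModel> candidatesFor(List<VolumeModel> volumes, String requestedType) {
            List<VolumeModel> matching = new ArrayList<VolumeModel>();
            for (VolumeModel v : volumes) {
                if (v.storageType.equals(requestedType)) {
                    matching.add(v);
                }
            }
            // A real implementation would fall back (or fail) when no volume matches.
            return matching.isEmpty() ? volumes : matching;
        }

        public static void main(String[] args) {
            List<VolumeModel> volumes = Arrays.asList(
                new VolumeModel("/data/1", "DISK"),
                new VolumeModel("/mnt/ssd0", "SSD"));
            System.out.println(candidatesFor(volumes, "SSD").get(0).path);  // /mnt/ssd0
        }
    }
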
* @param bpid Block pool id * @param blockIds - block id - * @returns true if the block is cached + * @return true if the block is cached */ public boolean isCached(String bpid, long blockId); @@ -440,5 +439,12 @@ public interface FsDatasetSpi<V extends * @return true when trash is enabled */ public boolean trashEnabled(String bpid); + + /** + * submit a sync_file_range request to AsyncDiskService + */ + public void submitBackgroundSyncFileRangeRequest(final ExtendedBlock block, + final FileDescriptor fd, final long offset, final long nbytes, + final int flags); } Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java Tue Aug 19 23:49:39 2014 @@ -59,7 +59,8 @@ class BlockPoolSlice { private final String bpid; private final FsVolumeImpl volume; // volume to which this BlockPool belongs to private final File currentDir; // StorageDirectory/current/bpid/current - private final LDir finalizedDir; // directory store Finalized replica + // directory where finalized replicas are stored + private final File finalizedDir; private final File rbwDir; // directory store RBW replica private final File tmpDir; // directory store Temporary replica private static final String DU_CACHE_FILE = "dfsUsed"; @@ -74,7 +75,7 @@ class BlockPoolSlice { * @param bpid Block pool Id * @param volume {@link FsVolumeImpl} to which this BlockPool belongs to * @param bpDir directory corresponding to the BlockPool - * @param conf + * @param conf configuration * @throws IOException */ BlockPoolSlice(String bpid, FsVolumeImpl volume, File bpDir, @@ -82,8 +83,13 @@ class BlockPoolSlice { this.bpid = bpid; this.volume = volume; this.currentDir = new File(bpDir, DataStorage.STORAGE_DIR_CURRENT); - final File finalizedDir = new File( + this.finalizedDir = new File( currentDir, DataStorage.STORAGE_DIR_FINALIZED); + if (!this.finalizedDir.exists()) { + if (!this.finalizedDir.mkdirs()) { + throw new IOException("Failed to mkdirs " + this.finalizedDir); + } + } // Files that were being written when the datanode was last shutdown // are now moved back to the data directory. 
It is possible that @@ -95,16 +101,6 @@ class BlockPoolSlice { FileUtil.fullyDelete(tmpDir); } this.rbwDir = new File(currentDir, DataStorage.STORAGE_DIR_RBW); - final boolean supportAppends = conf.getBoolean( - DFSConfigKeys.DFS_SUPPORT_APPEND_KEY, - DFSConfigKeys.DFS_SUPPORT_APPEND_DEFAULT); - if (rbwDir.exists() && !supportAppends) { - FileUtil.fullyDelete(rbwDir); - } - final int maxBlocksPerDir = conf.getInt( - DFSConfigKeys.DFS_DATANODE_NUMBLOCKS_KEY, - DFSConfigKeys.DFS_DATANODE_NUMBLOCKS_DEFAULT); - this.finalizedDir = new LDir(finalizedDir, maxBlocksPerDir); if (!rbwDir.mkdirs()) { // create rbw directory if not exist if (!rbwDir.isDirectory()) { throw new IOException("Mkdirs failed to create " + rbwDir.toString()); @@ -137,7 +133,7 @@ class BlockPoolSlice { } File getFinalizedDir() { - return finalizedDir.dir; + return finalizedDir; } File getRbwDir() { @@ -245,26 +241,57 @@ class BlockPoolSlice { } File addBlock(Block b, File f) throws IOException { - File blockFile = finalizedDir.addBlock(b, f); + File blockDir = DatanodeUtil.idToBlockDir(finalizedDir, b.getBlockId()); + if (!blockDir.exists()) { + if (!blockDir.mkdirs()) { + throw new IOException("Failed to mkdirs " + blockDir); + } + } + File blockFile = FsDatasetImpl.moveBlockFiles(b, f, blockDir); File metaFile = FsDatasetUtil.getMetaFile(blockFile, b.getGenerationStamp()); dfsUsage.incDfsUsed(b.getNumBytes()+metaFile.length()); return blockFile; } void checkDirs() throws DiskErrorException { - finalizedDir.checkDirTree(); + DiskChecker.checkDirs(finalizedDir); DiskChecker.checkDir(tmpDir); DiskChecker.checkDir(rbwDir); } void getVolumeMap(ReplicaMap volumeMap) throws IOException { // add finalized replicas - finalizedDir.getVolumeMap(bpid, volumeMap, volume); + addToReplicasMap(volumeMap, finalizedDir, true); // add rbw replicas addToReplicasMap(volumeMap, rbwDir, false); } /** + * Recover an unlinked tmp file on datanode restart. If the original block + * does not exist, then the tmp file is renamed to be the + * original file name and the original name is returned; otherwise the tmp + * file is deleted and null is returned. + */ + File recoverTempUnlinkedBlock(File unlinkedTmp) throws IOException { + File blockFile = FsDatasetUtil.getOrigFile(unlinkedTmp); + if (blockFile.exists()) { + // If the original block file still exists, then no recovery is needed. 
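
addBlock() above now places a finalized replica in a subdirectory computed from its block ID via DatanodeUtil.idToBlockDir(), replacing the old LDir tree that counted blocks per directory. The sketch below shows the general shape of such a mapping; the shift amounts, mask, and "subdir" prefix are assumptions about the scheme, not the exact constants in DatanodeUtil.

    import java.io.File;

    // Illustrative block-ID-to-directory mapping (assumed constants, not DatanodeUtil's).
    class BlockDirLayout {
        // Two directory levels derived from bits of the block ID, so replicas spread
        // evenly without any per-directory block counting or rebalancing.
        static File idToBlockDir(File finalizedDir, long blockId) {
            int d1 = (int) ((blockId >> 16) & 0x1F);
            int d2 = (int) ((blockId >> 8) & 0x1F);
            return new File(finalizedDir, "subdir" + d1 + File.separator + "subdir" + d2);
        }

        public static void main(String[] args) {
            File finalized = new File("/data/dn1/current/BP-1/current/finalized");
            System.out.println(idToBlockDir(finalized, 1073741825L));
        }
    }

Because the directory is a pure function of the block ID, the same replica always maps to the same path, which is what lets getVolumeMap() rebuild the replica map by walking the tree.
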
+ if (!unlinkedTmp.delete()) { + throw new IOException("Unable to cleanup unlinked tmp file " + + unlinkedTmp); + } + return null; + } else { + if (!unlinkedTmp.renameTo(blockFile)) { + throw new IOException("Unable to rename unlinked tmp file " + + unlinkedTmp); + } + return blockFile; + } + } + + + /** * Add replicas under the given directory to the volume map * @param volumeMap the replicas map * @param dir an input directory @@ -273,23 +300,34 @@ class BlockPoolSlice { */ void addToReplicasMap(ReplicaMap volumeMap, File dir, boolean isFinalized ) throws IOException { - File blockFiles[] = FileUtil.listFiles(dir); - for (File blockFile : blockFiles) { - if (!Block.isBlockFilename(blockFile)) + File files[] = FileUtil.listFiles(dir); + for (File file : files) { + if (file.isDirectory()) { + addToReplicasMap(volumeMap, file, isFinalized); + } + + if (isFinalized && FsDatasetUtil.isUnlinkTmpFile(file)) { + file = recoverTempUnlinkedBlock(file); + if (file == null) { // the original block still exists, so we cover it + // in another iteration and can continue here + continue; + } + } + if (!Block.isBlockFilename(file)) continue; long genStamp = FsDatasetUtil.getGenerationStampFromFile( - blockFiles, blockFile); - long blockId = Block.filename2id(blockFile.getName()); + files, file); + long blockId = Block.filename2id(file.getName()); ReplicaInfo newReplica = null; if (isFinalized) { newReplica = new FinalizedReplica(blockId, - blockFile.length(), genStamp, volume, blockFile.getParentFile()); + file.length(), genStamp, volume, file.getParentFile()); } else { boolean loadRwr = true; - File restartMeta = new File(blockFile.getParent() + - File.pathSeparator + "." + blockFile.getName() + ".restart"); + File restartMeta = new File(file.getParent() + + File.pathSeparator + "." + file.getName() + ".restart"); Scanner sc = null; try { sc = new Scanner(restartMeta); @@ -297,8 +335,8 @@ class BlockPoolSlice { if (sc.hasNextLong() && (sc.nextLong() > Time.now())) { // It didn't expire. Load the replica as a RBW. newReplica = new ReplicaBeingWritten(blockId, - validateIntegrityAndSetLength(blockFile, genStamp), - genStamp, volume, blockFile.getParentFile(), null); + validateIntegrityAndSetLength(file, genStamp), + genStamp, volume, file.getParentFile(), null); loadRwr = false; } sc.close(); @@ -307,7 +345,7 @@ class BlockPoolSlice { restartMeta.getPath()); } } catch (FileNotFoundException fnfe) { - // nothing to do here + // nothing to do hereFile dir = } finally { if (sc != null) { sc.close(); @@ -316,15 +354,15 @@ class BlockPoolSlice { // Restart meta doesn't exist or expired. 
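
Because finalized replicas now live in nested subdirectories, addToReplicasMap() above descends into directories recursively before testing each entry as a block file. A bare-bones version of that traversal is sketched below; isBlockFile() is a stand-in for Block.isBlockFilename() and the paths are examples.

    import java.io.File;
    import java.util.ArrayList;
    import java.util.List;

    // Illustrative recursive scan of a finalized/rbw tree for block files.
    class ReplicaScanSketch {
        // Stand-in for Block.isBlockFilename(): block files are named "blk_<id>",
        // meta files end in ".meta".
        static boolean isBlockFile(File f) {
            return f.isFile() && f.getName().startsWith("blk_")
                && !f.getName().endsWith(".meta");
        }

        static void collectBlockFiles(File dir, List<File> out) {
            File[] entries = dir.listFiles();
            if (entries == null) {
                return;                              // unreadable or not a directory
            }
            for (File entry : entries) {
                if (entry.isDirectory()) {
                    collectBlockFiles(entry, out);   // descend into subdirN levels
                } else if (isBlockFile(entry)) {
                    out.add(entry);
                }
            }
        }

        public static void main(String[] args) {
            List<File> blocks = new ArrayList<File>();
            collectBlockFiles(new File("/data/dn1/current"), blocks);
            System.out.println("found " + blocks.size() + " block files");
        }
    }
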
if (loadRwr) { newReplica = new ReplicaWaitingToBeRecovered(blockId, - validateIntegrityAndSetLength(blockFile, genStamp), - genStamp, volume, blockFile.getParentFile()); + validateIntegrityAndSetLength(file, genStamp), + genStamp, volume, file.getParentFile()); } } ReplicaInfo oldReplica = volumeMap.add(bpid, newReplica); if (oldReplica != null) { FsDatasetImpl.LOG.warn("Two block files with the same block id exist " + - "on disk: " + oldReplica.getBlockFile() + " and " + blockFile ); + "on disk: " + oldReplica.getBlockFile() + " and " + file ); } } } @@ -411,10 +449,6 @@ class BlockPoolSlice { } } - void clearPath(File f) { - finalizedDir.clearPath(f); - } - @Override public String toString() { return currentDir.getAbsolutePath(); Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java Tue Aug 19 23:49:39 2014 @@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl; import java.io.File; +import java.io.FileDescriptor; import java.util.HashMap; import java.util.Map; import java.util.concurrent.LinkedBlockingQueue; @@ -31,6 +32,8 @@ import org.apache.commons.logging.LogFac import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.protocol.BlockCommand; +import org.apache.hadoop.io.nativeio.NativeIO; +import org.apache.hadoop.io.nativeio.NativeIOException; /** * This class is a container of multiple thread pools, each for a volume, @@ -42,6 +45,7 @@ import org.apache.hadoop.hdfs.server.pro * can be slow, and we don't want to use a single thread pool because that * is inefficient when we have more than 1 volume. AsyncDiskService is the * solution for these. + * Another example of async disk operation is requesting sync_file_range(). * * This class and {@link org.apache.hadoop.util.AsyncDiskService} are similar. * They should be combined. @@ -57,6 +61,7 @@ class FsDatasetAsyncDiskService { private static final long THREADS_KEEP_ALIVE_SECONDS = 60; private final DataNode datanode; + private final ThreadGroup threadGroup; private Map<File, ThreadPoolExecutor> executors = new HashMap<File, ThreadPoolExecutor>(); @@ -66,42 +71,52 @@ class FsDatasetAsyncDiskService { * * The AsyncDiskServices uses one ThreadPool per volume to do the async * disk operations. - * - * @param volumes The roots of the data volumes. 
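
The refactoring that follows extracts per-volume executor creation into addExecutorForVolume() so that volumes can also be registered after startup via addVolume(). The underlying pattern, one small ThreadPoolExecutor per key with a naming ThreadFactory and core threads allowed to time out, is sketched standalone below with illustrative names and pool sizes.

    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.ThreadFactory;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    // Illustrative per-volume executor registry (names and sizes are placeholders).
    class PerVolumeExecutors {
        private final Map<String, ThreadPoolExecutor> executors =
            new HashMap<String, ThreadPoolExecutor>();
        private final ThreadGroup threadGroup = new ThreadGroup("PerVolumeExecutors");

        synchronized void addVolume(final String volume) {
            if (executors.containsKey(volume)) {
                throw new IllegalStateException("Executor for " + volume + " already exists");
            }
            ThreadFactory factory = new ThreadFactory() {
                private int counter = 0;
                @Override
                public synchronized Thread newThread(Runnable r) {
                    Thread t = new Thread(threadGroup, r);
                    t.setName("Async disk worker #" + (counter++) + " for volume " + volume);
                    return t;
                }
            };
            ThreadPoolExecutor executor = new ThreadPoolExecutor(
                1, 4, 60L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(), factory);
            executor.allowCoreThreadTimeOut(true);   // idle pools shrink to zero threads
            executors.put(volume, executor);
        }

        synchronized void execute(String volume, Runnable task) {
            ThreadPoolExecutor executor = executors.get(volume);
            if (executor == null) {
                throw new IllegalStateException("No executor for volume " + volume);
            }
            executor.execute(task);
        }

        public static void main(String[] args) {
            PerVolumeExecutors service = new PerVolumeExecutors();
            service.addVolume("/data/dn1");
            service.execute("/data/dn1", new Runnable() {
                @Override
                public void run() {
                    System.out.println(Thread.currentThread().getName());
                }
            });
        }
    }

One pool per volume keeps a slow disk from starving deletions or syncs queued for its healthy peers, which is the rationale the class-level comment gives.
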
*/ - FsDatasetAsyncDiskService(DataNode datanode, File[] volumes) { + FsDatasetAsyncDiskService(DataNode datanode) { this.datanode = datanode; + this.threadGroup = new ThreadGroup(getClass().getSimpleName()); + } + + private void addExecutorForVolume(final File volume) { + ThreadFactory threadFactory = new ThreadFactory() { + int counter = 0; + + @Override + public Thread newThread(Runnable r) { + int thisIndex; + synchronized (this) { + thisIndex = counter++; + } + Thread t = new Thread(threadGroup, r); + t.setName("Async disk worker #" + thisIndex + + " for volume " + volume); + return t; + } + }; - final ThreadGroup threadGroup = new ThreadGroup(getClass().getSimpleName()); - // Create one ThreadPool per volume - for (int v = 0 ; v < volumes.length; v++) { - final File vol = volumes[v]; - ThreadFactory threadFactory = new ThreadFactory() { - int counter = 0; - - @Override - public Thread newThread(Runnable r) { - int thisIndex; - synchronized (this) { - thisIndex = counter++; - } - Thread t = new Thread(threadGroup, r); - t.setName("Async disk worker #" + thisIndex + - " for volume " + vol); - return t; - } - }; - - ThreadPoolExecutor executor = new ThreadPoolExecutor( - CORE_THREADS_PER_VOLUME, MAXIMUM_THREADS_PER_VOLUME, - THREADS_KEEP_ALIVE_SECONDS, TimeUnit.SECONDS, - new LinkedBlockingQueue<Runnable>(), threadFactory); - - // This can reduce the number of running threads - executor.allowCoreThreadTimeOut(true); - executors.put(vol, executor); + ThreadPoolExecutor executor = new ThreadPoolExecutor( + CORE_THREADS_PER_VOLUME, MAXIMUM_THREADS_PER_VOLUME, + THREADS_KEEP_ALIVE_SECONDS, TimeUnit.SECONDS, + new LinkedBlockingQueue<Runnable>(), threadFactory); + + // This can reduce the number of running threads + executor.allowCoreThreadTimeOut(true); + executors.put(volume, executor); + } + + /** + * Starts AsyncDiskService for a new volume + * @param volume the root of the new data volume. + */ + synchronized void addVolume(File volume) { + if (executors == null) { + throw new RuntimeException("AsyncDiskService is already shutdown"); } - + ThreadPoolExecutor executor = executors.get(volume); + if (executor != null) { + throw new RuntimeException("Volume " + volume + " is already existed."); + } + addExecutorForVolume(volume); } synchronized long countPendingDeletions() { @@ -148,6 +163,21 @@ class FsDatasetAsyncDiskService { } } + public void submitSyncFileRangeRequest(FsVolumeImpl volume, + final FileDescriptor fd, final long offset, final long nbytes, + final int flags) { + execute(volume.getCurrentDir(), new Runnable() { + @Override + public void run() { + try { + NativeIO.POSIX.syncFileRangeIfPossible(fd, offset, nbytes, flags); + } catch (NativeIOException e) { + LOG.warn("sync_file_range error", e); + } + } + }); + } + /** * Delete the block file and meta file from the disk asynchronously, adjust * dfsUsed statistics accordingly.
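
The new submitSyncFileRangeRequest() runs NativeIO.POSIX.syncFileRangeIfPossible() on the volume's async executor, so a writer can start page writeback without blocking its own thread. The caller sketch below uses the FsDatasetSpi hook declared earlier in this commit; the SYNC_FILE_RANGE_WRITE flag and the surrounding bookkeeping (stream, offsets) are assumptions about typical usage, not code from this patch.

    import java.io.FileDescriptor;
    import java.io.FileOutputStream;
    import java.io.IOException;

    import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
    import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
    import org.apache.hadoop.io.nativeio.NativeIO;

    // Illustrative caller of the new background sync_file_range hook.
    class BackgroundSyncExample {
        // Ask the per-volume async disk service to start writeback for the bytes
        // written so far, without blocking the thread that is receiving the block.
        static void syncWrittenRange(FsDatasetSpi<?> dataset, ExtendedBlock block,
                FileOutputStream blockOut, long syncedUpTo, long bytesWritten)
                throws IOException {
            FileDescriptor fd = blockOut.getFD();
            long nbytes = bytesWritten - syncedUpTo;
            if (nbytes > 0) {
                // SYNC_FILE_RANGE_WRITE: start writeback but do not wait for it
                // (assumed to be exposed as a constant on NativeIO.POSIX).
                dataset.submitBackgroundSyncFileRangeRequest(block, fd, syncedUpTo,
                    nbytes, NativeIO.POSIX.SYNC_FILE_RANGE_WRITE);
            }
        }
    }
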