Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java Tue Aug 19 23:49:39 2014
@@ -98,7 +98,6 @@ public class BlockMetadataHeader {
 
   /**
    * This reads all the fields till the beginning of checksum.
-   * @param in
    * @return Metadata Header
    * @throws IOException
    */
@@ -109,9 +108,7 @@ public class BlockMetadataHeader {
   /**
    * Reads header at the top of metadata file and returns the header.
    *
-   * @param dataset
-   * @param block
-   * @return
+   * @return metadata header for the block
    * @throws IOException
    */
   public static BlockMetadataHeader readHeader(File file) throws IOException {
@@ -147,8 +144,6 @@ public class BlockMetadataHeader {
   /**
    * This writes all the fields till the beginning of checksum.
    * @param out DataOutputStream
-   * @param header
-   * @return
    * @throws IOException
    */
   @VisibleForTesting
@@ -161,9 +156,7 @@ public class BlockMetadataHeader {
 
   /**
    * Writes all the fields till the beginning of checksum.
-   * @param out
-   * @param checksum
-   * @throws IOException
+   * @throws IOException on error
   */
   static void writeHeader(DataOutputStream out, DataChecksum checksum)
       throws IOException {
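For context, a small usage sketch of the readHeader(File) overload whose javadoc is tightened above. The getVersion() and getChecksum() accessors are assumed from the usual shape of BlockMetadataHeader rather than shown in this diff.

    import java.io.File;
    import java.io.IOException;
    import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
    import org.apache.hadoop.util.DataChecksum;

    // Hypothetical caller: read the header of a block's .meta file and hand
    // back the checksum it declares (accessor names are assumed).
    class MetaHeaderExample {
      static DataChecksum checksumOf(File metaFile) throws IOException {
        BlockMetadataHeader header = BlockMetadataHeader.readHeader(metaFile);
        System.out.println("meta version = " + header.getVersion());
        return header.getChecksum();
      }
    }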
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolManager.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolManager.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolManager.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolManager.java Tue Aug 19 23:49:39 2014 @@ -88,7 +88,11 @@ class BlockPoolManager { synchronized void remove(BPOfferService t) { offerServices.remove(t); - bpByBlockPoolId.remove(t.getBlockPoolId()); + if (t.hasBlockPoolId()) { + // It's possible that the block pool never successfully registered + // with any NN, so it was never added it to this map + bpByBlockPoolId.remove(t.getBlockPoolId()); + } boolean removed = false; for (Iterator<BPOfferService> it = bpByNameserviceId.values().iterator(); Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java Tue Aug 19 23:49:39 2014 @@ -84,6 +84,10 @@ class BlockPoolSliceScanner { private final SortedSet<BlockScanInfo> blockInfoSet = new TreeSet<BlockScanInfo>(BlockScanInfo.LAST_SCAN_TIME_COMPARATOR); + + private final SortedSet<BlockScanInfo> newBlockInfoSet = + new TreeSet<BlockScanInfo>(BlockScanInfo.LAST_SCAN_TIME_COMPARATOR); + private final GSet<Block, BlockScanInfo> blockMap = new LightWeightGSet<Block, BlockScanInfo>( LightWeightGSet.computeCapacity(0.5, "BlockMap")); @@ -97,7 +101,7 @@ class BlockPoolSliceScanner { private long totalTransientErrors = 0; private final AtomicInteger totalBlocksScannedInLastRun = new AtomicInteger(); // Used for test only - private long currentPeriodStart = Time.now(); + private long currentPeriodStart = Time.monotonicNow(); private long bytesLeft = 0; // Bytes to scan in this period private long totalBytesToScan = 0; private boolean isNewPeriod = true; @@ -195,7 +199,7 @@ class BlockPoolSliceScanner { BlockScanInfo info = new BlockScanInfo( block ); info.lastScanTime = scanTime--; //still keep 'info.lastScanType' to NONE. - addBlockInfo(info); + addBlockInfo(info, false); } RollingLogs rollingLogs = null; @@ -221,25 +225,42 @@ class BlockPoolSliceScanner { // Should we change throttler bandwidth every time bytesLeft changes? // not really required. 
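A minimal stand-alone sketch of the guarded remove() in the BlockPoolManager hunk above. The classes here are illustrative stand-ins, not BPOfferService/BlockPoolManager: the id-keyed map is only touched when a block pool id was actually assigned, since a pool that never registered with any NameNode has no id to remove.

    import java.util.HashMap;
    import java.util.Map;

    // Illustrative stand-in for BPOfferService.
    class OfferService {
      private volatile String blockPoolId;   // null until registration succeeds
      void setBlockPoolId(String id) { this.blockPoolId = id; }
      boolean hasBlockPoolId() { return blockPoolId != null; }
      String getBlockPoolId() {
        if (blockPoolId == null) {
          throw new IllegalStateException("block pool id not yet set");
        }
        return blockPoolId;
      }
    }

    // Illustrative stand-in for BlockPoolManager.
    class Manager {
      private final Map<String, OfferService> byBlockPoolId = new HashMap<>();

      synchronized void remove(OfferService t) {
        // Same shape as the hunk above: skip the id-keyed map when the pool
        // never registered, otherwise getBlockPoolId() would fail.
        if (t.hasBlockPoolId()) {
          byBlockPoolId.remove(t.getBlockPoolId());
        }
      }
    }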
} - - private synchronized void addBlockInfo(BlockScanInfo info) { - boolean added = blockInfoSet.add(info); + + /** + * Add the BlockScanInfo to sorted set of blockScanInfo + * @param info BlockScanInfo to be added + * @param isNewBlock true if the block is the new Block, false if + * BlockScanInfo is being updated with new scanTime + */ + private synchronized void addBlockInfo(BlockScanInfo info, + boolean isNewBlock) { + boolean added = false; + if (isNewBlock) { + // check whether the block already present + boolean exists = blockInfoSet.contains(info); + added = !exists && newBlockInfoSet.add(info); + } else { + added = blockInfoSet.add(info); + } blockMap.put(info); if (added) { updateBytesToScan(info.getNumBytes(), info.lastScanTime); } } - + private synchronized void delBlockInfo(BlockScanInfo info) { boolean exists = blockInfoSet.remove(info); + if (!exists){ + exists = newBlockInfoSet.remove(info); + } blockMap.remove(info); if (exists) { updateBytesToScan(-info.getNumBytes(), info.lastScanTime); } } - + /** Update blockMap by the given LogEntry */ private synchronized void updateBlockInfo(LogEntry e) { BlockScanInfo info = blockMap.get(new Block(e.blockId, 0, e.genStamp)); @@ -249,7 +270,7 @@ class BlockPoolSliceScanner { delBlockInfo(info); info.lastScanTime = e.verificationTime; info.lastScanType = ScanType.VERIFICATION_SCAN; - addBlockInfo(info); + addBlockInfo(info, false); } } @@ -260,7 +281,7 @@ class BlockPoolSliceScanner { long period = Math.min(scanPeriod, Math.max(blockMap.size(),1) * 600 * 1000L); int periodInt = Math.abs((int)period); - return Time.now() - scanPeriod + + return Time.monotonicNow() - scanPeriod + DFSUtil.getRandom().nextInt(periodInt); } @@ -275,14 +296,14 @@ class BlockPoolSliceScanner { info = new BlockScanInfo(block.getLocalBlock()); info.lastScanTime = getNewBlockScanTime(); - addBlockInfo(info); + addBlockInfo(info, true); adjustThrottler(); } /** Deletes the block from internal structures */ synchronized void deleteBlock(Block block) { BlockScanInfo info = blockMap.get(block); - if ( info != null ) { + if (info != null) { delBlockInfo(info); } } @@ -310,23 +331,16 @@ class BlockPoolSliceScanner { } } - private synchronized void updateScanStatus(Block block, + private synchronized void updateScanStatus(BlockScanInfo info, ScanType type, boolean scanOk) { - BlockScanInfo info = blockMap.get(block); - - if ( info != null ) { - delBlockInfo(info); - } else { - // It might already be removed. Thats ok, it will be caught next time. - info = new BlockScanInfo(block); - } - - long now = Time.now(); + delBlockInfo(info); + + long now = Time.monotonicNow(); info.lastScanType = type; info.lastScanTime = now; info.lastScanOk = scanOk; - addBlockInfo(info); + addBlockInfo(info, false); // Don't update meta data if the verification failed. 
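The addBlockInfo/delBlockInfo changes above boil down to a staging pattern: blocks that arrive while a scan period is running go into newBlockInfoSet and are only merged into blockInfoSet when the iteration rolls over (rollNewBlocksInfo() later in this file). A generic sketch of that pattern with illustrative names, not the scanner's actual fields:

    import java.util.Comparator;
    import java.util.SortedSet;
    import java.util.TreeSet;

    // "current" drives the in-progress scan period; "staged" collects
    // entries added mid-period so the current ordering is not perturbed.
    class StagedScanSet<T> {
      private final SortedSet<T> current;
      private final SortedSet<T> staged;

      StagedScanSet(Comparator<T> order) {
        current = new TreeSet<>(order);
        staged = new TreeSet<>(order);
      }

      synchronized boolean add(T info, boolean isNew) {
        if (isNew) {
          // only stage it if the current set does not already track it
          return !current.contains(info) && staged.add(info);
        }
        return current.add(info);       // re-insert after a scan-time update
      }

      synchronized boolean remove(T info) {
        // the entry may live in either set, mirroring delBlockInfo() above
        return current.remove(info) || staged.remove(info);
      }

      synchronized void roll() {        // counterpart of rollNewBlocksInfo()
        current.addAll(staged);
        staged.clear();
      }
    }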
if (!scanOk) { @@ -334,8 +348,8 @@ class BlockPoolSliceScanner { } if (verificationLog != null) { - verificationLog.append(now, block.getGenerationStamp(), - block.getBlockId()); + verificationLog.append(now, info.getGenerationStamp(), + info.getBlockId()); } } @@ -399,8 +413,9 @@ class BlockPoolSliceScanner { } private synchronized void adjustThrottler() { - long timeLeft = currentPeriodStart+scanPeriod - Time.now(); - long bw = Math.max(bytesLeft*1000/timeLeft, MIN_SCAN_RATE); + long timeLeft = Math.max(1L, + currentPeriodStart + scanPeriod - Time.monotonicNow()); + long bw = Math.max((bytesLeft * 1000) / timeLeft, MIN_SCAN_RATE); throttler.setBandwidth(Math.min(bw, MAX_SCAN_RATE)); } @@ -433,11 +448,13 @@ class BlockPoolSliceScanner { totalTransientErrors++; } - updateScanStatus(block.getLocalBlock(), ScanType.VERIFICATION_SCAN, true); + updateScanStatus((BlockScanInfo)block.getLocalBlock(), + ScanType.VERIFICATION_SCAN, true); return; } catch (IOException e) { - updateScanStatus(block.getLocalBlock(), ScanType.VERIFICATION_SCAN, false); + updateScanStatus((BlockScanInfo)block.getLocalBlock(), + ScanType.VERIFICATION_SCAN, false); // If the block does not exists anymore, then its not an error if (!dataset.contains(block)) { @@ -496,7 +513,7 @@ class BlockPoolSliceScanner { // Picks one block and verifies it private void verifyFirstBlock() { - Block block = null; + BlockScanInfo block = null; synchronized (this) { if (!blockInfoSet.isEmpty()) { block = blockInfoSet.first(); @@ -523,7 +540,7 @@ class BlockPoolSliceScanner { private boolean assignInitialVerificationTimes() { //First updates the last verification times from the log file. if (verificationLog != null) { - long now = Time.now(); + long now = Time.monotonicNow(); RollingLogs.LineIterator logIterator = null; try { logIterator = verificationLog.logs.iterator(false); @@ -574,7 +591,7 @@ class BlockPoolSliceScanner { // Initially spread the block reads over half of scan period // so that we don't keep scanning the blocks too quickly when restarted. 
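Stepping back to the adjustThrottler() change above: it switches to a monotonic clock and clamps the remaining time to at least 1 ms, so an already-expired period can no longer produce a zero or negative divisor. A small sketch of the resulting calculation; the MIN/MAX rates here are placeholders, not the class's actual constants.

    // Placeholder rate limits; BlockPoolSliceScanner defines its own values.
    final class ScanBandwidth {
      static final long MIN_SCAN_RATE = 1L * 1024 * 1024;   // assumed 1 MB/s
      static final long MAX_SCAN_RATE = 8L * 1024 * 1024;   // assumed 8 MB/s

      static long bandwidth(long bytesLeft, long periodStartMs,
                            long scanPeriodMs, long nowMonotonicMs) {
        // never let the divisor reach zero, even after the period has elapsed
        long timeLeft =
            Math.max(1L, periodStartMs + scanPeriodMs - nowMonotonicMs);
        long bw = Math.max((bytesLeft * 1000) / timeLeft, MIN_SCAN_RATE);
        return Math.min(bw, MAX_SCAN_RATE);
      }
    }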
long verifyInterval = Math.min(scanPeriod/(2L * numBlocks), 10*60*1000L); - long lastScanTime = Time.now() - scanPeriod; + long lastScanTime = Time.monotonicNow() - scanPeriod; if (!blockInfoSet.isEmpty()) { BlockScanInfo info; @@ -582,7 +599,7 @@ class BlockPoolSliceScanner { delBlockInfo(info); info.lastScanTime = lastScanTime; lastScanTime += verifyInterval; - addBlockInfo(info); + addBlockInfo(info, false); } } } @@ -601,16 +618,16 @@ class BlockPoolSliceScanner { // reset the byte counts : bytesLeft = totalBytesToScan; - currentPeriodStart = Time.now(); + currentPeriodStart = Time.monotonicNow(); isNewPeriod = true; } private synchronized boolean workRemainingInCurrentPeriod() { - if (bytesLeft <= 0 && Time.now() < currentPeriodStart + scanPeriod) { + if (bytesLeft <= 0 && Time.monotonicNow() < currentPeriodStart + scanPeriod) { if (LOG.isDebugEnabled()) { LOG.debug("Skipping scan since bytesLeft=" + bytesLeft + ", Start=" + currentPeriodStart + ", period=" + scanPeriod + ", now=" + - Time.now() + " " + blockPoolId); + Time.monotonicNow() + " " + blockPoolId); } return false; } else { @@ -633,7 +650,7 @@ class BlockPoolSliceScanner { scan(); } finally { totalBlocksScannedInLastRun.set(processedBlocks.size()); - lastScanTime.set(Time.now()); + lastScanTime.set(Time.monotonicNow()); } } @@ -656,7 +673,7 @@ class BlockPoolSliceScanner { while (datanode.shouldRun && !datanode.blockScanner.blockScannerThread.isInterrupted() && datanode.isBPServiceAlive(blockPoolId)) { - long now = Time.now(); + long now = Time.monotonicNow(); synchronized (this) { if ( now >= (currentPeriodStart + scanPeriod)) { startNewPeriod(); @@ -678,12 +695,21 @@ class BlockPoolSliceScanner { throw e; } finally { rollVerificationLogs(); + rollNewBlocksInfo(); if (LOG.isDebugEnabled()) { LOG.debug("Done scanning block pool: " + blockPoolId); } } } - + + // add new blocks to scan in next iteration + private synchronized void rollNewBlocksInfo() { + for (BlockScanInfo newBlock : newBlockInfoSet) { + blockInfoSet.add(newBlock); + } + newBlockInfoSet.clear(); + } + private synchronized void rollVerificationLogs() { if (verificationLog != null) { try { @@ -714,7 +740,7 @@ class BlockPoolSliceScanner { int total = blockInfoSet.size(); - long now = Time.now(); + long now = Time.monotonicNow(); Date date = new Date(); Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java Tue Aug 19 23:49:39 2014 @@ -36,8 +36,10 @@ import java.io.File; import java.io.IOException; import java.util.ArrayList; import java.util.Collection; +import java.util.HashSet; import java.util.Iterator; import java.util.Properties; +import java.util.Set; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -106,13 +108,22 @@ public class BlockPoolSliceStorage exten void recoverTransitionRead(DataNode datanode, NamespaceInfo nsInfo, 
Collection<File> dataDirs, StartupOption startOpt) throws IOException { LOG.info("Analyzing storage directories for bpid " + nsInfo.getBlockPoolID()); + Set<String> existingStorageDirs = new HashSet<String>(); + for (int i = 0; i < getNumStorageDirs(); i++) { + existingStorageDirs.add(getStorageDir(i).getRoot().getAbsolutePath()); + } + // 1. For each BP data directory analyze the state and // check whether all is consistent before transitioning. - this.storageDirs = new ArrayList<StorageDirectory>(dataDirs.size()); ArrayList<StorageState> dataDirStates = new ArrayList<StorageState>( dataDirs.size()); for (Iterator<File> it = dataDirs.iterator(); it.hasNext();) { File dataDir = it.next(); + if (existingStorageDirs.contains(dataDir.getAbsolutePath())) { + LOG.info("Storage directory " + dataDir + " has already been used."); + it.remove(); + continue; + } StorageDirectory sd = new StorageDirectory(dataDir, null, true); StorageState curState; try { @@ -152,7 +163,7 @@ public class BlockPoolSliceStorage exten // During startup some of them can upgrade or roll back // while others could be up-to-date for the regular startup. for (int idx = 0; idx < getNumStorageDirs(); idx++) { - doTransition(getStorageDir(idx), nsInfo, startOpt); + doTransition(datanode, getStorageDir(idx), nsInfo, startOpt); assert getCTime() == nsInfo.getCTime() : "Data-node and name-node CTimes must be the same."; } @@ -242,7 +253,7 @@ public class BlockPoolSliceStorage exten * @param startOpt startup option * @throws IOException */ - private void doTransition(StorageDirectory sd, + private void doTransition(DataNode datanode, StorageDirectory sd, NamespaceInfo nsInfo, StartupOption startOpt) throws IOException { if (startOpt == StartupOption.ROLLBACK) { doRollback(sd, nsInfo); // rollback if applicable @@ -275,7 +286,7 @@ public class BlockPoolSliceStorage exten } if (this.layoutVersion > HdfsConstants.DATANODE_LAYOUT_VERSION || this.cTime < nsInfo.getCTime()) { - doUpgrade(sd, nsInfo); // upgrade + doUpgrade(datanode, sd, nsInfo); // upgrade return; } // layoutVersion == LAYOUT_VERSION && this.cTime > nsInfo.cTime @@ -304,7 +315,8 @@ public class BlockPoolSliceStorage exten * @param nsInfo Namespace Info from the namenode * @throws IOException on error */ - void doUpgrade(StorageDirectory bpSd, NamespaceInfo nsInfo) throws IOException { + void doUpgrade(DataNode datanode, StorageDirectory bpSd, NamespaceInfo nsInfo) + throws IOException { // Upgrading is applicable only to release with federation or after if (!DataNodeLayoutVersion.supports( LayoutVersion.Feature.FEDERATION, layoutVersion)) { @@ -312,7 +324,7 @@ public class BlockPoolSliceStorage exten } LOG.info("Upgrading block pool storage directory " + bpSd.getRoot() + ".\n old LV = " + this.getLayoutVersion() + "; old CTime = " - + this.getCTime() + ".\n new LV = " + nsInfo.getLayoutVersion() + + this.getCTime() + ".\n new LV = " + HdfsConstants.DATANODE_LAYOUT_VERSION + "; new CTime = " + nsInfo.getCTime()); // get <SD>/previous directory String dnRoot = getDataNodeStorageRoot(bpSd.getRoot().getCanonicalPath()); @@ -340,7 +352,7 @@ public class BlockPoolSliceStorage exten rename(bpCurDir, bpTmpDir); // 3. 
Create new <SD>/current with block files hardlinks and VERSION - linkAllBlocks(bpTmpDir, bpCurDir); + linkAllBlocks(datanode, bpTmpDir, bpCurDir); this.layoutVersion = HdfsConstants.DATANODE_LAYOUT_VERSION; assert this.namespaceID == nsInfo.getNamespaceID() : "Data-node and name-node layout versions must be the same."; @@ -517,14 +529,15 @@ public class BlockPoolSliceStorage exten * @param toDir the current data directory * @throws IOException if error occurs during hardlink */ - private void linkAllBlocks(File fromDir, File toDir) throws IOException { + private void linkAllBlocks(DataNode datanode, File fromDir, File toDir) + throws IOException { // do the link int diskLayoutVersion = this.getLayoutVersion(); // hardlink finalized blocks in tmpDir HardLink hardLink = new HardLink(); - DataStorage.linkBlocks(new File(fromDir, DataStorage.STORAGE_DIR_FINALIZED), + DataStorage.linkBlocks(datanode, new File(fromDir, DataStorage.STORAGE_DIR_FINALIZED), new File(toDir,DataStorage.STORAGE_DIR_FINALIZED), diskLayoutVersion, hardLink); - DataStorage.linkBlocks(new File(fromDir, DataStorage.STORAGE_DIR_RBW), + DataStorage.linkBlocks(datanode, new File(fromDir, DataStorage.STORAGE_DIR_RBW), new File(toDir, DataStorage.STORAGE_DIR_RBW), diskLayoutVersion, hardLink); LOG.info( hardLink.linkStats.report() ); } Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java Tue Aug 19 23:49:39 2014 @@ -37,6 +37,7 @@ import java.util.zip.Checksum; import org.apache.commons.logging.Log; import org.apache.hadoop.fs.ChecksumException; import org.apache.hadoop.fs.FSOutputSummer; +import org.apache.hadoop.hdfs.StorageType; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.HdfsConstants; @@ -44,6 +45,7 @@ import org.apache.hadoop.hdfs.protocol.d import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader; import org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver; import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck; +import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams; import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams; @@ -69,7 +71,7 @@ class BlockReceiver implements Closeable @VisibleForTesting static long CACHE_DROP_LAG_BYTES = 8 * 1024 * 1024; - + private final long datanodeSlowLogThresholdMs; private DataInputStream in = null; // from where data are read private DataChecksum clientChecksum; // checksum used by client private DataChecksum diskChecksum; // checksum we write to disk @@ -86,8 +88,7 @@ class BlockReceiver implements Closeable private int bytesPerChecksum; private int checksumSize; - private final 
PacketReceiver packetReceiver = - new PacketReceiver(false); + private final PacketReceiver packetReceiver = new PacketReceiver(false); protected final String inAddr; protected final String myAddr; @@ -105,6 +106,7 @@ class BlockReceiver implements Closeable private boolean dropCacheBehindWrites; private long lastCacheManagementOffset = 0; private boolean syncBehindWrites; + private boolean syncBehindWritesInBackground; /** The client name. It is empty if a datanode is the client */ private final String clientname; @@ -122,7 +124,16 @@ class BlockReceiver implements Closeable private boolean syncOnClose; private long restartBudget; - BlockReceiver(final ExtendedBlock block, final DataInputStream in, + /** + * for replaceBlock response + */ + private final long responseInterval; + private long lastResponseTime = 0; + private boolean isReplaceBlock = false; + private DataOutputStream replyOut = null; + + BlockReceiver(final ExtendedBlock block, final StorageType storageType, + final DataInputStream in, final String inAddr, final String myAddr, final BlockConstructionStage stage, final long newGs, final long minBytesRcvd, final long maxBytesRcvd, @@ -141,7 +152,10 @@ class BlockReceiver implements Closeable this.isDatanode = clientname.length() == 0; this.isClient = !this.isDatanode; this.restartBudget = datanode.getDnConf().restartReplicaExpiry; - + this.datanodeSlowLogThresholdMs = datanode.getDnConf().datanodeSlowIoWarningThresholdMs; + // For replaceBlock() calls response should be sent to avoid socketTimeout + // at clients. So sending with the interval of 0.5 * socketTimeout + this.responseInterval = (long) (datanode.getDnConf().socketTimeout * 0.5); //for datanode, we have //1: clientName.length() == 0, and //2: stage == null or PIPELINE_SETUP_CREATE @@ -162,11 +176,11 @@ class BlockReceiver implements Closeable // Open local disk out // if (isDatanode) { //replication or move - replicaInfo = datanode.data.createTemporary(block); + replicaInfo = datanode.data.createTemporary(storageType, block); } else { switch (stage) { case PIPELINE_SETUP_CREATE: - replicaInfo = datanode.data.createRbw(block); + replicaInfo = datanode.data.createRbw(storageType, block); datanode.notifyNamenodeReceivingBlock( block, replicaInfo.getStorageUuid()); break; @@ -198,7 +212,7 @@ class BlockReceiver implements Closeable case TRANSFER_RBW: case TRANSFER_FINALIZED: // this is a transfer destination - replicaInfo = datanode.data.createTemporary(block); + replicaInfo = datanode.data.createTemporary(storageType, block); break; default: throw new IOException("Unsupported stage " + stage + " while receiving block " + block + " from " + inAddr); @@ -208,6 +222,8 @@ class BlockReceiver implements Closeable datanode.getDnConf().dropCacheBehindWrites : cachingStrategy.getDropBehind(); this.syncBehindWrites = datanode.getDnConf().syncBehindWrites; + this.syncBehindWritesInBackground = datanode.getDnConf(). 
+ syncBehindWritesInBackground; final boolean isCreate = isDatanode || isTransfer || stage == BlockConstructionStage.PIPELINE_SETUP_CREATE; @@ -249,7 +265,7 @@ class BlockReceiver implements Closeable if (cause != null) { // possible disk error ioe = cause; - datanode.checkDiskError(ioe); // may throw an exception here + datanode.checkDiskErrorAsync(); } throw ioe; @@ -268,10 +284,8 @@ class BlockReceiver implements Closeable */ @Override public void close() throws IOException { - if (packetReceiver != null) { - packetReceiver.close(); - } - + packetReceiver.close(); + IOException ioe = null; if (syncOnClose && (out != null || checksumOut != null)) { datanode.metrics.incrFsyncCount(); @@ -327,7 +341,7 @@ class BlockReceiver implements Closeable } // disk check if(ioe != null) { - datanode.checkDiskError(ioe); + datanode.checkDiskErrorAsync(); throw ioe; } } @@ -338,6 +352,7 @@ class BlockReceiver implements Closeable */ void flushOrSync(boolean isSync) throws IOException { long flushTotalNanos = 0; + long begin = Time.monotonicNow(); if (checksumOut != null) { long flushStartNanos = System.nanoTime(); checksumOut.flush(); @@ -366,6 +381,12 @@ class BlockReceiver implements Closeable datanode.metrics.incrFsyncCount(); } } + long duration = Time.monotonicNow() - begin; + if (duration > datanodeSlowLogThresholdMs) { + LOG.warn("Slow flushOrSync took " + duration + "ms (threshold=" + + datanodeSlowLogThresholdMs + "ms), isSync:" + isSync + ", flushTotalNanos=" + + flushTotalNanos + "ns"); + } } /** @@ -491,8 +512,14 @@ class BlockReceiver implements Closeable //First write the packet to the mirror: if (mirrorOut != null && !mirrorError) { try { + long begin = Time.monotonicNow(); packetReceiver.mirrorPacketTo(mirrorOut); mirrorOut.flush(); + long duration = Time.monotonicNow() - begin; + if (duration > datanodeSlowLogThresholdMs) { + LOG.warn("Slow BlockReceiver write packet to mirror took " + duration + + "ms (threshold=" + datanodeSlowLogThresholdMs + "ms)"); + } } catch (IOException e) { handleMirrorOutError(e); } @@ -575,7 +602,13 @@ class BlockReceiver implements Closeable int numBytesToDisk = (int)(offsetInBlock-onDiskLen); // Write data to disk. + long begin = Time.monotonicNow(); out.write(dataBuf.array(), startByteToDisk, numBytesToDisk); + long duration = Time.monotonicNow() - begin; + if (duration > datanodeSlowLogThresholdMs) { + LOG.warn("Slow BlockReceiver write data to disk cost:" + duration + + "ms (threshold=" + datanodeSlowLogThresholdMs + "ms)"); + } // If this is a partial chunk, then verify that this is the only // chunk in the packet. Calculate new crc for this chunk. @@ -618,7 +651,7 @@ class BlockReceiver implements Closeable manageWriterOsCache(offsetInBlock); } } catch (IOException iex) { - datanode.checkDiskError(iex); + datanode.checkDiskErrorAsync(); throw iex; } } @@ -630,6 +663,20 @@ class BlockReceiver implements Closeable lastPacketInBlock, offsetInBlock, Status.SUCCESS); } + /* + * Send in-progress responses for the replaceBlock() calls back to caller to + * avoid timeouts due to balancer throttling. 
HDFS-6247 + */ + if (isReplaceBlock + && (Time.monotonicNow() - lastResponseTime > responseInterval)) { + BlockOpResponseProto.Builder response = BlockOpResponseProto.newBuilder() + .setStatus(Status.IN_PROGRESS); + response.build().writeDelimitedTo(replyOut); + replyOut.flush(); + + lastResponseTime = Time.monotonicNow(); + } + if (throttler != null) { // throttle I/O throttler.throttle(len); } @@ -641,6 +688,7 @@ class BlockReceiver implements Closeable try { if (outFd != null && offsetInBlock > lastCacheManagementOffset + CACHE_DROP_LAG_BYTES) { + long begin = Time.monotonicNow(); // // For SYNC_FILE_RANGE_WRITE, we want to sync from // lastCacheManagementOffset to a position "two windows ago" @@ -651,10 +699,17 @@ class BlockReceiver implements Closeable // of file // if (syncBehindWrites) { - NativeIO.POSIX.syncFileRangeIfPossible(outFd, - lastCacheManagementOffset, - offsetInBlock - lastCacheManagementOffset, - NativeIO.POSIX.SYNC_FILE_RANGE_WRITE); + if (syncBehindWritesInBackground) { + this.datanode.getFSDataset().submitBackgroundSyncFileRangeRequest( + block, outFd, lastCacheManagementOffset, + offsetInBlock - lastCacheManagementOffset, + NativeIO.POSIX.SYNC_FILE_RANGE_WRITE); + } else { + NativeIO.POSIX.syncFileRangeIfPossible(outFd, + lastCacheManagementOffset, offsetInBlock + - lastCacheManagementOffset, + NativeIO.POSIX.SYNC_FILE_RANGE_WRITE); + } } // // For POSIX_FADV_DONTNEED, we want to drop from the beginning @@ -673,18 +728,29 @@ class BlockReceiver implements Closeable NativeIO.POSIX.POSIX_FADV_DONTNEED); } lastCacheManagementOffset = offsetInBlock; + long duration = Time.monotonicNow() - begin; + if (duration > datanodeSlowLogThresholdMs) { + LOG.warn("Slow manageWriterOsCache took " + duration + + "ms (threshold=" + datanodeSlowLogThresholdMs + "ms)"); + } } } catch (Throwable t) { LOG.warn("Error managing cache for writer of block " + block, t); } } - + + public void sendOOB() throws IOException, InterruptedException { + ((PacketResponder) responder.getRunnable()).sendOOBResponse(PipelineAck + .getRestartOOBStatus()); + } + void receiveBlock( DataOutputStream mirrOut, // output to next datanode DataInputStream mirrIn, // input from next datanode DataOutputStream replyOut, // output to previous datanode String mirrAddr, DataTransferThrottler throttlerArg, - DatanodeInfo[] downstreams) throws IOException { + DatanodeInfo[] downstreams, + boolean isReplaceBlock) throws IOException { syncOnClose = datanode.getDnConf().syncOnClose; boolean responderClosed = false; @@ -692,6 +758,9 @@ class BlockReceiver implements Closeable mirrorAddr = mirrAddr; throttler = throttlerArg; + this.replyOut = replyOut; + this.isReplaceBlock = isReplaceBlock; + try { if (isClient && !isTransfer) { responder = new Daemon(datanode.threadGroup, @@ -766,9 +835,7 @@ class BlockReceiver implements Closeable // The worst case is not recovering this RBW replica. // Client will fall back to regular pipeline recovery. } - try { - ((PacketResponder) responder.getRunnable()). - sendOOBResponse(PipelineAck.getRestartOOBStatus()); + try { // Even if the connection is closed after the ack packet is // flushed, the client can react to the connection closure // first. Insert a delay to lower the chance of client @@ -776,8 +843,6 @@ class BlockReceiver implements Closeable Thread.sleep(1000); } catch (InterruptedException ie) { // It is already going down. Ignore this. 
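Pulling together the replaceBlock() keep-alive pieces above (the responseInterval field set to half the socket timeout, and the IN_PROGRESS response written while packets are still arriving), here is a stand-alone sketch of the idea. Sink stands in for the protobuf reply stream, and the caller supplies a monotonic timestamp such as Time.monotonicNow().

    import java.io.IOException;

    // Illustrative keep-alive helper, not BlockReceiver itself.
    final class ReplaceBlockKeepAlive {
      interface Sink { void sendInProgress() throws IOException; }

      private final long responseIntervalMs;
      private long lastResponseTimeMs;

      ReplaceBlockKeepAlive(int socketTimeoutMs, long nowMs) {
        // respond at half the socket timeout so the peer's read never expires
        this.responseIntervalMs = (long) (socketTimeoutMs * 0.5);
        this.lastResponseTimeMs = nowMs;
      }

      // called once per received packet, like the check in the hunk above
      void maybeRespond(long nowMs, Sink replyOut) throws IOException {
        if (nowMs - lastResponseTimeMs > responseIntervalMs) {
          replyOut.sendInProgress();
          lastResponseTimeMs = nowMs;
        }
      }
    }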
- } catch (IOException ioe) { - LOG.info("Error sending OOB Ack.", ioe); } } responder.interrupt(); @@ -956,9 +1021,9 @@ class BlockReceiver implements Closeable /** * enqueue the seqno that is still be to acked by the downstream datanode. - * @param seqno - * @param lastPacketInBlock - * @param offsetInBlock + * @param seqno sequence number of the packet + * @param lastPacketInBlock if true, this is the last packet in block + * @param offsetInBlock offset of this packet in block */ void enqueue(final long seqno, final boolean lastPacketInBlock, final long offsetInBlock, final Status ackStatus) { @@ -1174,11 +1239,7 @@ class BlockReceiver implements Closeable } catch (IOException e) { LOG.warn("IOException in BlockReceiver.run(): ", e); if (running) { - try { - datanode.checkDiskError(e); // may throw an exception here - } catch (IOException ioe) { - LOG.warn("DataNode.checkDiskError failed in run() with: ", ioe); - } + datanode.checkDiskErrorAsync(); LOG.info(myString, e); running = false; if (!Thread.interrupted()) { // failure not caused by interruption @@ -1306,9 +1367,15 @@ class BlockReceiver implements Closeable replicaInfo.setBytesAcked(offsetInBlock); } // send my ack back to upstream datanode + long begin = Time.monotonicNow(); replyAck.write(upstreamOut); upstreamOut.flush(); - if (LOG.isDebugEnabled()) { + long duration = Time.monotonicNow() - begin; + if (duration > datanodeSlowLogThresholdMs) { + LOG.warn("Slow PacketResponder send ack to upstream took " + duration + + "ms (threshold=" + datanodeSlowLogThresholdMs + "ms), " + myString + + ", replyAck=" + replyAck); + } else if (LOG.isDebugEnabled()) { LOG.debug(myString + ", replyAck=" + replyAck); } Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java Tue Aug 19 23:49:39 2014 @@ -168,7 +168,7 @@ class BlockSender implements java.io.Clo * @param block Block that is being read * @param startOffset starting offset to read from * @param length length of data to read - * @param corruptChecksumOk + * @param corruptChecksumOk if true, corrupt checksum is okay * @param verifyChecksum verify checksum while reading the data * @param sendChecksum send checksum to client. * @param datanode datanode from which the block is being read @@ -687,7 +687,7 @@ class BlockSender implements java.io.Clo // Trigger readahead of beginning of file if configured. manageOsCache(); - final long startTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0; + final long startTime = ClientTraceLog.isDebugEnabled() ? 
System.nanoTime() : 0; try { int maxChunksPerPacket; int pktBufSize = PacketHeader.PKT_MAX_HEADER_LEN; @@ -733,9 +733,9 @@ class BlockSender implements java.io.Clo sentEntireByteRange = true; } } finally { - if (clientTraceFmt != null) { + if ((clientTraceFmt != null) && ClientTraceLog.isDebugEnabled()) { final long endTime = System.nanoTime(); - ClientTraceLog.info(String.format(clientTraceFmt, totalRead, + ClientTraceLog.debug(String.format(clientTraceFmt, totalRead, initialOffset, endTime - startTime)); } close(); Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java Tue Aug 19 23:49:39 2014 @@ -52,7 +52,9 @@ import static org.apache.hadoop.hdfs.DFS import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver; +import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; +import org.apache.hadoop.security.SaslPropertiesResolver; /** * Simple class encapsulating all of the configuration that the DataNode @@ -67,6 +69,7 @@ public class DNConf { final boolean transferToAllowed; final boolean dropCacheBehindWrites; final boolean syncBehindWrites; + final boolean syncBehindWritesInBackground; final boolean dropCacheBehindReads; final boolean syncOnClose; final boolean encryptDataTransfer; @@ -79,10 +82,13 @@ public class DNConf { final long deleteReportInterval; final long initialBlockReportDelay; final long cacheReportInterval; + final long dfsclientSlowIoWarningThresholdMs; + final long datanodeSlowIoWarningThresholdMs; final int writePacketSize; final String minimumNameNodeVersion; final String encryptionAlgorithm; + final SaslPropertiesResolver saslPropsResolver; final TrustedChannelResolver trustedChannelResolver; final long xceiverStopTimeout; @@ -117,6 +123,9 @@ public class DNConf { syncBehindWrites = conf.getBoolean( DFSConfigKeys.DFS_DATANODE_SYNC_BEHIND_WRITES_KEY, DFSConfigKeys.DFS_DATANODE_SYNC_BEHIND_WRITES_DEFAULT); + syncBehindWritesInBackground = conf.getBoolean( + DFSConfigKeys.DFS_DATANODE_SYNC_BEHIND_WRITES_IN_BACKGROUND_KEY, + DFSConfigKeys.DFS_DATANODE_SYNC_BEHIND_WRITES_IN_BACKGROUND_DEFAULT); dropCacheBehindReads = conf.getBoolean( DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_READS_KEY, DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_READS_DEFAULT); @@ -129,7 +138,14 @@ public class DNConf { DFS_BLOCKREPORT_SPLIT_THRESHOLD_DEFAULT); this.cacheReportInterval = conf.getLong(DFS_CACHEREPORT_INTERVAL_MSEC_KEY, DFS_CACHEREPORT_INTERVAL_MSEC_DEFAULT); - + + this.dfsclientSlowIoWarningThresholdMs = conf.getLong( + DFSConfigKeys.DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_KEY, + DFSConfigKeys.DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_DEFAULT); + this.datanodeSlowIoWarningThresholdMs = conf.getLong( + 
DFSConfigKeys.DFS_DATANODE_SLOW_IO_WARNING_THRESHOLD_KEY, + DFSConfigKeys.DFS_DATANODE_SLOW_IO_WARNING_THRESHOLD_DEFAULT); + long initBRDelay = conf.getLong( DFS_BLOCKREPORT_INITIAL_DELAY_KEY, DFS_BLOCKREPORT_INITIAL_DELAY_DEFAULT) * 1000L; @@ -155,6 +171,8 @@ public class DNConf { DFS_ENCRYPT_DATA_TRANSFER_DEFAULT); this.encryptionAlgorithm = conf.get(DFS_DATA_ENCRYPTION_ALGORITHM_KEY); this.trustedChannelResolver = TrustedChannelResolver.getInstance(conf); + this.saslPropsResolver = DataTransferSaslUtil.getSaslPropertiesResolver( + conf); this.xceiverStopTimeout = conf.getLong( DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_KEY, @@ -168,12 +186,31 @@ public class DNConf { DFS_DATANODE_RESTART_REPLICA_EXPIRY_KEY, DFS_DATANODE_RESTART_REPLICA_EXPIRY_DEFAULT) * 1000L; } - + // We get minimumNameNodeVersion via a method so it can be mocked out in tests. String getMinimumNameNodeVersion() { return this.minimumNameNodeVersion; } - + + /** + * Returns true if encryption enabled for DataTransferProtocol. + * + * @return boolean true if encryption enabled for DataTransferProtocol + */ + public boolean getEncryptDataTransfer() { + return encryptDataTransfer; + } + + /** + * Returns encryption algorithm configured for DataTransferProtocol, or null + * if not configured. + * + * @return encryption algorithm configured for DataTransferProtocol + */ + public String getEncryptionAlgorithm() { + return encryptionAlgorithm; + } + public long getXceiverStopTimeout() { return xceiverStopTimeout; } @@ -181,4 +218,24 @@ public class DNConf { public long getMaxLockedMemory() { return maxLockedMemory; } + + /** + * Returns the SaslPropertiesResolver configured for use with + * DataTransferProtocol, or null if not configured. + * + * @return SaslPropertiesResolver configured for use with DataTransferProtocol + */ + public SaslPropertiesResolver getSaslPropsResolver() { + return saslPropsResolver; + } + + /** + * Returns the TrustedChannelResolver configured for use with + * DataTransferProtocol, or null if not configured. 
+ * + * @return TrustedChannelResolver configured for use with DataTransferProtocol + */ + public TrustedChannelResolver getTrustedChannelResolver() { + return trustedChannelResolver; + } } Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java Tue Aug 19 23:49:39 2014 @@ -131,8 +131,7 @@ public class DataBlockScanner implements private BlockPoolSliceScanner getNextBPScanner(String currentBpId) { String nextBpId = null; - while ((nextBpId == null) && datanode.shouldRun - && !blockScannerThread.isInterrupted()) { + while (datanode.shouldRun && !blockScannerThread.isInterrupted()) { waitForInit(); synchronized (this) { if (getBlockPoolSetSize() > 0) { Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1619012&r1=1619011&r2=1619012&view=diff ============================================================================== --- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original) +++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Tue Aug 19 23:49:39 2014 @@ -17,10 +17,68 @@ */ package org.apache.hadoop.hdfs.server.datanode; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Joiner; -import com.google.common.base.Preconditions; -import com.google.protobuf.BlockingService; +import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT; +import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ADMIN; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_ADDRESS_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_PERMISSION_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_PERMISSION_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DNS_INTERFACE_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DNS_INTERFACE_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DNS_NAMESERVER_DEFAULT; +import static 
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DNS_NAMESERVER_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HANDLER_COUNT_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HANDLER_COUNT_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTP_ADDRESS_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_HTTP_ADDRESS_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KERBEROS_PRINCIPAL_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_KEYTAB_FILE_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PLUGINS_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_STARTUP_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_MAX_NUM_BLOCKS_TO_LOG_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_MAX_NUM_BLOCKS_TO_LOG_KEY; +import static org.apache.hadoop.util.ExitUtil.terminate; + +import java.io.BufferedOutputStream; +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.PrintStream; +import java.lang.management.ManagementFactory; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.net.URI; +import java.net.UnknownHostException; +import java.nio.channels.SocketChannel; +import java.security.PrivilegedExceptionAction; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicInteger; + +import javax.management.ObjectName; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -36,17 +94,44 @@ import org.apache.hadoop.hdfs.DFSConfigK import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.HDFSPolicyProvider; import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.StorageType; import org.apache.hadoop.hdfs.net.DomainPeerServer; import org.apache.hadoop.hdfs.net.TcpPeerServer; -import org.apache.hadoop.hdfs.protocol.*; -import org.apache.hadoop.hdfs.protocol.datatransfer.*; +import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo; +import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol; +import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.DatanodeLocalInfo; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata; +import 
org.apache.hadoop.hdfs.protocol.HdfsConstants; +import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException; +import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage; +import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol; +import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair; +import org.apache.hadoop.hdfs.protocol.datatransfer.Sender; +import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory; +import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient; +import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferServer; import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.ClientDatanodeProtocolService; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DNTransferAckProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; import org.apache.hadoop.hdfs.protocol.proto.InterDatanodeProtocolProtos.InterDatanodeProtocolService; -import org.apache.hadoop.hdfs.protocolPB.*; -import org.apache.hadoop.hdfs.security.token.block.*; +import org.apache.hadoop.hdfs.protocolPB.ClientDatanodeProtocolPB; +import org.apache.hadoop.hdfs.protocolPB.ClientDatanodeProtocolServerSideTranslatorPB; +import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB; +import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolPB; +import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolServerSideTranslatorPB; +import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolTranslatorPB; +import org.apache.hadoop.hdfs.protocolPB.PBHelper; +import org.apache.hadoop.hdfs.security.token.block.BlockPoolTokenSecretManager; +import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; +import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager; import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager.AccessMode; +import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey; +import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys; +import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; @@ -59,7 +144,11 @@ import org.apache.hadoop.hdfs.server.dat import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics; import org.apache.hadoop.hdfs.server.datanode.web.resources.DatanodeWebHdfsMethods; import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock; -import org.apache.hadoop.hdfs.server.protocol.*; +import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol; +import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; +import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol; +import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; +import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo; import org.apache.hadoop.hdfs.web.WebHdfsFileSystem; import org.apache.hadoop.hdfs.web.resources.Param; import org.apache.hadoop.http.HttpConfig; @@ -82,25 +171,21 @@ import org.apache.hadoop.security.UserGr import org.apache.hadoop.security.authorize.AccessControlList; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; -import org.apache.hadoop.util.*; +import org.apache.hadoop.util.Daemon; +import 
org.apache.hadoop.util.DiskChecker; import org.apache.hadoop.util.DiskChecker.DiskErrorException; -import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException; +import org.apache.hadoop.util.GenericOptionsParser; +import org.apache.hadoop.util.JvmPauseMonitor; +import org.apache.hadoop.util.ServicePlugin; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.util.Time; +import org.apache.hadoop.util.VersionInfo; import org.mortbay.util.ajax.JSON; -import javax.management.ObjectName; - -import java.io.*; -import java.lang.management.ManagementFactory; -import java.net.*; -import java.nio.channels.ClosedByInterruptException; -import java.nio.channels.ClosedChannelException; -import java.nio.channels.SocketChannel; -import java.security.PrivilegedExceptionAction; -import java.util.*; -import java.util.concurrent.atomic.AtomicInteger; - -import static org.apache.hadoop.hdfs.DFSConfigKeys.*; -import static org.apache.hadoop.util.ExitUtil.terminate; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; +import com.google.protobuf.BlockingService; /********************************************************** * DataNode is a class (and program) that stores a set of @@ -185,6 +270,7 @@ public class DataNode extends Configured public final static String EMPTY_DEL_HINT = ""; final AtomicInteger xmitsInProgress = new AtomicInteger(); Daemon dataXceiverServer = null; + DataXceiverServer xserver = null; Daemon localDataXceiverServer = null; ShortCircuitRegistry shortCircuitRegistry = null; ThreadGroup threadGroup = null; @@ -227,8 +313,18 @@ public class DataNode extends Configured private final List<String> usersWithLocalPathAccess; private final boolean connectToDnViaHostname; ReadaheadPool readaheadPool; + SaslDataTransferClient saslClient; + SaslDataTransferServer saslServer; private final boolean getHdfsBlockLocationsEnabled; private ObjectName dataNodeInfoBeanName; + private Thread checkDiskErrorThread = null; + protected final int checkDiskErrorInterval = 5*1000; + private boolean checkDiskErrorFlag = false; + private Object checkDiskErrorMutex = new Object(); + private long lastDiskErrorCheck; + private String supergroup; + private boolean isPermissionEnabled; + private String dnUserName = null; /** * Create the DataNode given a configuration, an array of dataDirs, @@ -238,6 +334,7 @@ public class DataNode extends Configured final List<StorageLocation> dataDirs, final SecureResources resources) throws IOException { super(conf); + this.lastDiskErrorCheck = 0; this.maxNumberOfBlocksToLog = conf.getLong(DFS_MAX_NUM_BLOCKS_TO_LOG_KEY, DFS_MAX_NUM_BLOCKS_TO_LOG_DEFAULT); @@ -249,6 +346,11 @@ public class DataNode extends Configured this.getHdfsBlockLocationsEnabled = conf.getBoolean( DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED, DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT); + this.supergroup = conf.get(DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_KEY, + DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_DEFAULT); + this.isPermissionEnabled = conf.getBoolean( + DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, + DFSConfigKeys.DFS_PERMISSIONS_ENABLED_DEFAULT); confVersion = "core-" + conf.get("hadoop.common.configuration.version", "UNSPECIFIED") + @@ -429,6 +531,33 @@ public class DataNode extends Configured ipcServer.refreshServiceAcl(conf, new HDFSPolicyProvider()); } } + + /** Check whether the current user is in the superuser group. 
*/ + private void checkSuperuserPrivilege() throws IOException, AccessControlException { + if (!isPermissionEnabled) { + return; + } + // Try to get the ugi in the RPC call. + UserGroupInformation callerUgi = ipcServer.getRemoteUser(); + if (callerUgi == null) { + // This is not from RPC. + callerUgi = UserGroupInformation.getCurrentUser(); + } + + // Is this by the DN user itself? + assert dnUserName != null; + if (callerUgi.getShortUserName().equals(dnUserName)) { + return; + } + + // Is the user a member of the super group? + List<String> groups = Arrays.asList(callerUgi.getGroupNames()); + if (groups.contains(supergroup)) { + return; + } + // Not a superuser. + throw new AccessControlException(); + } /** * Initialize the datanode's periodic scanners: @@ -521,8 +650,8 @@ public class DataNode extends Configured streamingAddr = tcpPeerServer.getStreamingAddr(); LOG.info("Opened streaming server at " + streamingAddr); this.threadGroup = new ThreadGroup("dataXceiverServer"); - this.dataXceiverServer = new Daemon(threadGroup, - new DataXceiverServer(tcpPeerServer, conf, this)); + xserver = new DataXceiverServer(tcpPeerServer, conf, this); + this.dataXceiverServer = new Daemon(threadGroup, xserver); this.threadGroup.setDaemon(true); // auto destroy when empty if (conf.getBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, @@ -646,7 +775,6 @@ public class DataNode extends Configured /** * Return the BPOfferService instance corresponding to the given block. - * @param block * @return the BPOS * @throws IOException if no such BPOS can be found */ @@ -685,15 +813,10 @@ public class DataNode extends Configured */ void startDataNode(Configuration conf, List<StorageLocation> dataDirs, - // DatanodeProtocol namenode, SecureResources resources ) throws IOException { - if(UserGroupInformation.isSecurityEnabled() && resources == null) { - if (!conf.getBoolean("ignore.secure.ports.for.testing", false)) { - throw new RuntimeException("Cannot start secure cluster without " - + "privileged resources."); - } - } + + checkSecureConfig(conf, resources); // settings global for all BPs in the Data Node this.secureResources = resources; @@ -708,15 +831,19 @@ public class DataNode extends Configured " size (%s) is greater than zero and native code is not available.", DFS_DATANODE_MAX_LOCKED_MEMORY_KEY)); } - long ulimit = NativeIO.POSIX.getCacheManipulator().getMemlockLimit(); - if (dnConf.maxLockedMemory > ulimit) { - throw new RuntimeException(String.format( - "Cannot start datanode because the configured max locked memory" + - " size (%s) of %d bytes is more than the datanode's available" + - " RLIMIT_MEMLOCK ulimit of %d bytes.", - DFS_DATANODE_MAX_LOCKED_MEMORY_KEY, - dnConf.maxLockedMemory, - ulimit)); + if (Path.WINDOWS) { + NativeIO.Windows.extendWorkingSetSize(dnConf.maxLockedMemory); + } else { + long ulimit = NativeIO.POSIX.getCacheManipulator().getMemlockLimit(); + if (dnConf.maxLockedMemory > ulimit) { + throw new RuntimeException(String.format( + "Cannot start datanode because the configured max locked memory" + + " size (%s) of %d bytes is more than the datanode's available" + + " RLIMIT_MEMLOCK ulimit of %d bytes.", + DFS_DATANODE_MAX_LOCKED_MEMORY_KEY, + dnConf.maxLockedMemory, + ulimit)); + } } } LOG.info("Starting DataNode with maxLockedMemory = " + @@ -733,16 +860,71 @@ public class DataNode extends Configured // BlockPoolTokenSecretManager is required to create ipc server. this.blockPoolTokenSecretManager = new BlockPoolTokenSecretManager(); + + // Login is done by now. 
Set the DN user name. + dnUserName = UserGroupInformation.getCurrentUser().getShortUserName(); + LOG.info("dnUserName = " + dnUserName); + LOG.info("supergroup = " + supergroup); initIpcServer(conf); metrics = DataNodeMetrics.create(conf, getDisplayName()); - + metrics.getJvmMetrics().setPauseMonitor(pauseMonitor); + blockPoolManager = new BlockPoolManager(this); blockPoolManager.refreshNamenodes(conf); // Create the ReadaheadPool from the DataNode context so we can // exit without having to explicitly shutdown its thread pool. readaheadPool = ReadaheadPool.getInstance(); + saslClient = new SaslDataTransferClient(dnConf.saslPropsResolver, + dnConf.trustedChannelResolver, + conf.getBoolean( + IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY, + IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT)); + saslServer = new SaslDataTransferServer(dnConf, blockPoolTokenSecretManager); + } + + /** + * Checks if the DataNode has a secure configuration if security is enabled. + * There are 2 possible configurations that are considered secure: + * 1. The server has bound to privileged ports for RPC and HTTP via + * SecureDataNodeStarter. + * 2. The configuration enables SASL on DataTransferProtocol and HTTPS (no + * plain HTTP) for the HTTP server. The SASL handshake guarantees + * authentication of the RPC server before a client transmits a secret, such + * as a block access token. Similarly, SSL guarantees authentication of the + * HTTP server before a client transmits a secret, such as a delegation + * token. + * It is not possible to run with both privileged ports and SASL on + * DataTransferProtocol. For backwards-compatibility, the connection logic + * must check if the target port is a privileged port, and if so, skip the + * SASL handshake. + * + * @param conf Configuration to check + * @param resources SecuredResources obtained for DataNode + * @throws RuntimeException if security enabled, but configuration is insecure + */ + private static void checkSecureConfig(Configuration conf, + SecureResources resources) throws RuntimeException { + if (!UserGroupInformation.isSecurityEnabled()) { + return; + } + String dataTransferProtection = conf.get(DFS_DATA_TRANSFER_PROTECTION_KEY); + if (resources != null && dataTransferProtection == null) { + return; + } + if (conf.getBoolean("ignore.secure.ports.for.testing", false)) { + return; + } + if (dataTransferProtection != null && + DFSUtil.getHttpPolicy(conf) == HttpConfig.Policy.HTTPS_ONLY && + resources == null) { + return; + } + throw new RuntimeException("Cannot start secure DataNode without " + + "configuring either privileged resources or SASL RPC data transfer " + + "protection and SSL for HTTP. Using privileged resources in " + + "combination with SASL RPC data transfer protection is not supported."); } public static String generateUuid() { @@ -811,9 +993,7 @@ public class DataNode extends Configured /** * After the block pool has contacted the NN, registers that block pool * with the secret manager, updating it with the secrets provided by the NN. - * @param bpRegistration - * @param blockPoolId - * @throws IOException + * @throws IOException on error */ private synchronized void registerBlockPoolWithSecretManager( DatanodeRegistration bpRegistration, String blockPoolId) throws IOException { @@ -850,19 +1030,24 @@ public class DataNode extends Configured */ void shutdownBlockPool(BPOfferService bpos) { blockPoolManager.remove(bpos); + if (bpos.hasBlockPoolId()) { + // Possible that this is shutting down before successfully + // registering anywhere. 
If that's the case, we wouldn't have + // a block pool id + String bpId = bpos.getBlockPoolId(); + if (blockScanner != null) { + blockScanner.removeBlockPool(bpId); + } - String bpId = bpos.getBlockPoolId(); - if (blockScanner != null) { - blockScanner.removeBlockPool(bpId); - } - - if (data != null) { - data.shutdownBlockPool(bpId); - } + if (data != null) { + data.shutdownBlockPool(bpId); + } - if (storage != null) { - storage.removeBlockPoolStorage(bpId); + if (storage != null) { + storage.removeBlockPoolStorage(bpId); + } } + } /** @@ -883,14 +1068,19 @@ public class DataNode extends Configured + " should have retrieved namespace info before initBlockPool."); } + setClusterId(nsInfo.clusterID, nsInfo.getBlockPoolID()); + // Register the new block pool with the BP manager. blockPoolManager.addBlockPool(bpos); - - setClusterId(nsInfo.clusterID, nsInfo.getBlockPoolID()); // In the case that this is the first block pool to connect, initialize // the dataset, block scanners, etc. initStorage(nsInfo); + + // Exclude failed disks before initializing the block pools to avoid startup + // failures. + checkDiskError(); + initPeriodicScanners(conf); data.addBlockPool(nsInfo.getBlockPoolID(), conf); @@ -949,6 +1139,11 @@ public class DataNode extends Configured } @VisibleForTesting + public DataXceiverServer getXferServer() { + return xserver; + } + + @VisibleForTesting public int getXferPort() { return streamingAddr.getPort(); } @@ -981,9 +1176,8 @@ public class DataNode extends Configured /** * get BP registration by blockPool id - * @param bpid * @return BP registration object - * @throws IOException + * @throws IOException on error */ @VisibleForTesting public DatanodeRegistration getDNRegistrationForBP(String bpid) @@ -1071,6 +1265,7 @@ public class DataNode extends Configured Token<BlockTokenIdentifier> token) throws IOException { checkBlockLocalPathAccess(); checkBlockToken(block, token, BlockTokenSecretManager.AccessMode.READ); + Preconditions.checkNotNull(data, "Storage not yet initialized"); BlockLocalPathInfo info = data.getBlockLocalPathInfo(block); if (LOG.isDebugEnabled()) { if (info != null) { @@ -1206,12 +1401,18 @@ public class DataNode extends Configured // in order to avoid any further acceptance of requests, but the peers // for block writes are not closed until the clients are notified. if (dataXceiverServer != null) { + xserver.sendOOBToPeers(); ((DataXceiverServer) this.dataXceiverServer.getRunnable()).kill(); this.dataXceiverServer.interrupt(); } + // Interrupt the checkDiskErrorThread and terminate it. + if(this.checkDiskErrorThread != null) { + this.checkDiskErrorThread.interrupt(); + } + // Record the time of initial notification - long timeNotified = Time.now(); + long timeNotified = Time.monotonicNow(); if (localDataXceiverServer != null) { ((DataXceiverServer) this.localDataXceiverServer.getRunnable()).kill(); @@ -1243,8 +1444,9 @@ public class DataNode extends Configured while (true) { // When shutting down for restart, wait 2.5 seconds before forcing // termination of receiver threads. 
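For context, the checkSecureConfig() method introduced earlier in this file accepts two setups: privileged ports obtained through SecureDataNodeStarter, or SASL on DataTransferProtocol combined with an HTTPS-only web endpoint. The snippet below is purely illustrative and is not part of this patch; the class and method names are invented, while the configuration keys come from DFSConfigKeys. It sketches a Configuration that would satisfy the second option, so a secure DataNode can start without binding privileged ports:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.http.HttpConfig;

// Illustrative only -- not part of the patch above.
public class SaslDataNodeConfigExample {
  // Builds a Configuration that satisfies the SASL + HTTPS_ONLY branch of
  // checkSecureConfig(), i.e. no privileged resources are required.
  public static Configuration saslSecuredDataNodeConf() {
    Configuration conf = new HdfsConfiguration();
    // Any SASL QOP is accepted by the check; "authentication" is the weakest.
    conf.set(DFSConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY, "authentication");
    // The check also requires DFSUtil.getHttpPolicy(conf) == HTTPS_ONLY.
    conf.set(DFSConfigKeys.DFS_HTTP_POLICY_KEY,
        HttpConfig.Policy.HTTPS_ONLY.name());
    return conf;
  }
}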
- if (!this.shutdownForUpgrade || - (this.shutdownForUpgrade && (Time.now() - timeNotified > 2500))) { + if (!this.shutdownForUpgrade || + (this.shutdownForUpgrade && (Time.monotonicNow() - timeNotified + > 2500))) { this.threadGroup.interrupt(); } LOG.info("Waiting for threadgroup to exit, active threads is " + @@ -1319,55 +1521,17 @@ public class DataNode extends Configured } - /** Check if there is no space in disk - * @param e that caused this checkDiskError call - **/ - protected void checkDiskError(Exception e ) throws IOException { - - LOG.warn("checkDiskError: exception: ", e); - if (isNetworkRelatedException(e)) { - LOG.info("Not checking disk as checkDiskError was called on a network" + - " related exception"); - return; - } - if (e.getMessage() != null && - e.getMessage().startsWith("No space left on device")) { - throw new DiskOutOfSpaceException("No space left on device"); - } else { - checkDiskError(); - } - } - /** - * Check if the provided exception looks like it's from a network error - * @param e the exception from a checkDiskError call - * @return true if this exception is network related, false otherwise + * Check if there is a disk failure asynchronously and if so, handle the error */ - protected boolean isNetworkRelatedException(Exception e) { - if (e instanceof SocketException - || e instanceof SocketTimeoutException - || e instanceof ClosedChannelException - || e instanceof ClosedByInterruptException) { - return true; - } - - String msg = e.getMessage(); - - return null != msg - && (msg.startsWith("An established connection was aborted") - || msg.startsWith("Broken pipe") - || msg.startsWith("Connection reset") - || msg.contains("java.nio.channels.SocketChannel")); - } - - /** - * Check if there is a disk failure and if so, handle the error - */ - public void checkDiskError() { - try { - data.checkDataDir(); - } catch (DiskErrorException de) { - handleDiskError(de.getMessage()); + public void checkDiskErrorAsync() { + synchronized(checkDiskErrorMutex) { + checkDiskErrorFlag = true; + if(checkDiskErrorThread == null) { + startCheckDiskErrorThread(); + checkDiskErrorThread.start(); + LOG.info("Starting CheckDiskError Thread"); + } } } @@ -1405,8 +1569,8 @@ public class DataNode extends Configured return xmitsInProgress.get(); } - private void transferBlock(ExtendedBlock block, DatanodeInfo xferTargets[]) - throws IOException { + private void transferBlock(ExtendedBlock block, DatanodeInfo[] xferTargets, + StorageType[] xferTargetStorageTypes) throws IOException { BPOfferService bpos = getBPOSForBlock(block); DatanodeRegistration bpReg = getDNRegistrationForBP(block.getBlockPoolId()); @@ -1442,16 +1606,17 @@ public class DataNode extends Configured LOG.info(bpReg + " Starting thread to transfer " + block + " to " + xfersBuilder); - new Daemon(new DataTransfer(xferTargets, block, + new Daemon(new DataTransfer(xferTargets, xferTargetStorageTypes, block, BlockConstructionStage.PIPELINE_SETUP_CREATE, "")).start(); } } void transferBlocks(String poolId, Block blocks[], - DatanodeInfo xferTargets[][]) { + DatanodeInfo xferTargets[][], StorageType[][] xferTargetStorageTypes) { for (int i = 0; i < blocks.length; i++) { try { - transferBlock(new ExtendedBlock(poolId, blocks[i]), xferTargets[i]); + transferBlock(new ExtendedBlock(poolId, blocks[i]), xferTargets[i], + xferTargetStorageTypes[i]); } catch (IOException ie) { LOG.warn("Failed to transfer block " + blocks[i], ie); } @@ -1554,6 +1719,7 @@ public class DataNode extends Configured */ private class DataTransfer implements 
Runnable { final DatanodeInfo[] targets; + final StorageType[] targetStorageTypes; final ExtendedBlock b; final BlockConstructionStage stage; final private DatanodeRegistration bpReg; @@ -1564,7 +1730,8 @@ public class DataNode extends Configured * Connect to the first item in the target list. Pass along the * entire target list, the block, and the data. */ - DataTransfer(DatanodeInfo targets[], ExtendedBlock b, BlockConstructionStage stage, + DataTransfer(DatanodeInfo targets[], StorageType[] targetStorageTypes, + ExtendedBlock b, BlockConstructionStage stage, final String clientname) { if (DataTransferProtocol.LOG.isDebugEnabled()) { DataTransferProtocol.LOG.debug(getClass().getSimpleName() + ": " @@ -1574,6 +1741,7 @@ public class DataNode extends Configured + ", targests=" + Arrays.asList(targets)); } this.targets = targets; + this.targetStorageTypes = targetStorageTypes; this.b = b; this.stage = stage; BPOfferService bpos = blockPoolManager.get(b.getBlockPoolId()); @@ -1605,20 +1773,25 @@ public class DataNode extends Configured NetUtils.connect(sock, curTarget, dnConf.socketTimeout); sock.setSoTimeout(targets.length * dnConf.socketTimeout); + // + // Header info + // + Token<BlockTokenIdentifier> accessToken = BlockTokenSecretManager.DUMMY_TOKEN; + if (isBlockTokenEnabled) { + accessToken = blockPoolTokenSecretManager.generateToken(b, + EnumSet.of(BlockTokenSecretManager.AccessMode.WRITE)); + } + long writeTimeout = dnConf.socketWriteTimeout + HdfsServerConstants.WRITE_TIMEOUT_EXTENSION * (targets.length-1); OutputStream unbufOut = NetUtils.getOutputStream(sock, writeTimeout); InputStream unbufIn = NetUtils.getInputStream(sock); - if (dnConf.encryptDataTransfer && - !dnConf.trustedChannelResolver.isTrusted(sock.getInetAddress())) { - IOStreamPair encryptedStreams = - DataTransferEncryptor.getEncryptedStreams( - unbufOut, unbufIn, - blockPoolTokenSecretManager.generateDataEncryptionKey( - b.getBlockPoolId())); - unbufOut = encryptedStreams.out; - unbufIn = encryptedStreams.in; - } + DataEncryptionKeyFactory keyFactory = + getDataEncryptionKeyFactoryForBlock(b); + IOStreamPair saslStreams = saslClient.socketSend(sock, unbufOut, + unbufIn, keyFactory, accessToken, bpReg); + unbufOut = saslStreams.out; + unbufIn = saslStreams.in; out = new DataOutputStream(new BufferedOutputStream(unbufOut, HdfsConstants.SMALL_BUFFER_SIZE)); @@ -1627,16 +1800,8 @@ public class DataNode extends Configured false, false, true, DataNode.this, null, cachingStrategy); DatanodeInfo srcNode = new DatanodeInfo(bpReg); - // - // Header info - // - Token<BlockTokenIdentifier> accessToken = BlockTokenSecretManager.DUMMY_TOKEN; - if (isBlockTokenEnabled) { - accessToken = blockPoolTokenSecretManager.generateToken(b, - EnumSet.of(BlockTokenSecretManager.AccessMode.WRITE)); - } - - new Sender(out).writeBlock(b, accessToken, clientname, targets, srcNode, + new Sender(out).writeBlock(b, targetStorageTypes[0], accessToken, + clientname, targets, targetStorageTypes, srcNode, stage, 0, 0, 0, 0, blockSender.getChecksum(), cachingStrategy); // send data & checksum @@ -1667,13 +1832,8 @@ public class DataNode extends Configured } catch (IOException ie) { LOG.warn(bpReg + ":Failed to transfer " + b + " to " + targets[0] + " got ", ie); - // check if there are any disk problem - try{ - checkDiskError(ie); - } catch(IOException e) { - LOG.warn("DataNode.checkDiskError failed in run() with: ", e); - } - + // check if there are any disk problem + checkDiskErrorAsync(); } finally { xmitsInProgress.getAndDecrement(); 
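The catch block above now calls checkDiskErrorAsync() instead of scanning the data directories inline on the I/O path. The stand-alone sketch below is a simplified illustration of that design, not the patch itself; every name in it is invented (the real fields are checkDiskErrorFlag, checkDiskErrorMutex and checkDiskErrorThread, shown later in this file). Callers only flip a request flag under a small lock, and a single background thread performs the potentially slow directory scan.

// Hypothetical, simplified sketch of the "request flag + single checker
// thread" pattern behind checkDiskErrorAsync().
public class AsyncDiskChecker {
  private final Object mutex = new Object();
  private boolean checkRequested = false;
  private final long pollIntervalMs;
  private final Runnable diskScan;   // the expensive check, e.g. scanning data dirs

  public AsyncDiskChecker(long pollIntervalMs, Runnable diskScan) {
    this.pollIntervalMs = pollIntervalMs;
    this.diskScan = diskScan;
    Thread t = new Thread(this::checkerLoop, "disk-error-checker");
    t.setDaemon(true);
    t.start();
  }

  // Called from I/O paths; cheap, just records that a check is wanted.
  public void requestCheck() {
    synchronized (mutex) {
      checkRequested = true;
    }
  }

  private void checkerLoop() {
    while (!Thread.currentThread().isInterrupted()) {
      boolean doCheck;
      synchronized (mutex) {
        doCheck = checkRequested;
        checkRequested = false;
      }
      if (doCheck) {
        diskScan.run();              // slow work happens off the I/O path
      }
      try {
        Thread.sleep(pollIntervalMs);
      } catch (InterruptedException e) {
        return;                      // shut down quietly, as the patch does
      }
    }
  }
}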
IOUtils.closeStream(blockSender); @@ -1683,12 +1843,32 @@ public class DataNode extends Configured } } } - + + /** + * Returns a new DataEncryptionKeyFactory that generates a key from the + * BlockPoolTokenSecretManager, using the block pool ID of the given block. + * + * @param block for which the factory needs to create a key + * @return DataEncryptionKeyFactory for block's block pool ID + */ + DataEncryptionKeyFactory getDataEncryptionKeyFactoryForBlock( + final ExtendedBlock block) { + return new DataEncryptionKeyFactory() { + @Override + public DataEncryptionKey newDataEncryptionKey() { + return dnConf.encryptDataTransfer ? + blockPoolTokenSecretManager.generateDataEncryptionKey( + block.getBlockPoolId()) : null; + } + }; + } + /** * After a block becomes finalized, a datanode increases metric counter, * notifies namenode, and adds it to the block scanner - * @param block - * @param delHint + * @param block block to close + * @param delHint hint on which excess block to delete + * @param storageUuid UUID of the storage where block is stored */ void closeBlock(ExtendedBlock block, String delHint, String storageUuid) { metrics.incrBlocksWritten(); @@ -1776,8 +1956,15 @@ public class DataNode extends Configured try { location = StorageLocation.parse(locationString); } catch (IOException ioe) { - throw new IllegalArgumentException("Failed to parse conf property " - + DFS_DATANODE_DATA_DIR_KEY + ": " + locationString, ioe); + LOG.error("Failed to initialize storage directory " + locationString + + ". Exception details: " + ioe); + // Ignore the exception. + continue; + } catch (SecurityException se) { + LOG.error("Failed to initialize storage directory " + locationString + + ". Exception details: " + se); + // Ignore the exception. + continue; } locations.add(location); @@ -2282,11 +2469,11 @@ public class DataNode extends Configured @Override // ClientDataNodeProtocol public long getReplicaVisibleLength(final ExtendedBlock block) throws IOException { - checkWriteAccess(block); + checkReadAccess(block); return data.getReplicaVisibleLength(block); } - private void checkWriteAccess(final ExtendedBlock block) throws IOException { + private void checkReadAccess(final ExtendedBlock block) throws IOException { if (isBlockTokenEnabled) { Set<TokenIdentifier> tokenIds = UserGroupInformation.getCurrentUser() .getTokenIdentifiers(); @@ -2311,11 +2498,12 @@ public class DataNode extends Configured * The corresponding replica must be an RBW or a Finalized. * Its GS and numBytes will be set to * the stored GS and the visible length. 
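The getDataEncryptionKeyFactoryForBlock() factory defined above is what DataTransfer.run() passes to saslClient.socketSend(...) earlier in this patch. As a condensed view of how the two pieces fit together, the hypothetical helper below is assumed to live inside DataNode.java next to the factory; the method name negotiate() is invented, and the argument order mirrors the socketSend call in DataTransfer.run():

// Hypothetical helper, not part of the patch: combines the key factory with
// the new saslClient field when the DataNode dials another DataNode. Key
// generation is deferred until SASL decides encryption is actually required.
private IOStreamPair negotiate(Socket sock, OutputStream unbufOut,
    InputStream unbufIn, ExtendedBlock b,
    Token<BlockTokenIdentifier> accessToken, DatanodeID target)
    throws IOException {
  DataEncryptionKeyFactory keyFactory = getDataEncryptionKeyFactoryForBlock(b);
  return saslClient.socketSend(sock, unbufOut, unbufIn, keyFactory,
      accessToken, target);
}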
- * @param targets - * @param client + * @param targets targets to transfer the block to + * @param client client name */ void transferReplicaForPipelineRecovery(final ExtendedBlock b, - final DatanodeInfo[] targets, final String client) throws IOException { + final DatanodeInfo[] targets, final StorageType[] targetStorageTypes, + final String client) throws IOException { final long storedGS; final long visible; final BlockConstructionStage stage; @@ -2348,7 +2536,7 @@ public class DataNode extends Configured b.setNumBytes(visible); if (targets.length > 0) { - new DataTransfer(targets, b, stage, client).run(); + new DataTransfer(targets, targetStorageTypes, b, stage, client).run(); } } @@ -2423,6 +2611,7 @@ public class DataNode extends Configured */ @Override // DataNodeMXBean public String getVolumeInfo() { + Preconditions.checkNotNull(data, "Storage not yet initialized"); return JSON.toString(data.getVolumeInfoMap()); } @@ -2437,6 +2626,7 @@ public class DataNode extends Configured @Override // ClientDatanodeProtocol public void refreshNamenodes() throws IOException { + checkSuperuserPrivilege(); conf = new Configuration(); refreshNamenodes(conf); } @@ -2444,6 +2634,7 @@ public class DataNode extends Configured @Override // ClientDatanodeProtocol public void deleteBlockPool(String blockPoolId, boolean force) throws IOException { + checkSuperuserPrivilege(); LOG.info("deleteBlockPool command received for block pool " + blockPoolId + ", force=" + force); if (blockPoolManager.get(blockPoolId) != null) { @@ -2459,6 +2650,7 @@ public class DataNode extends Configured @Override // ClientDatanodeProtocol public synchronized void shutdownDatanode(boolean forUpgrade) throws IOException { + checkSuperuserPrivilege(); LOG.info("shutdownDatanode command received (upgrade=" + forUpgrade + "). 
Shutting down Datanode...");
@@ -2579,4 +2771,59 @@ public class DataNode extends Configured
   public ShortCircuitRegistry getShortCircuitRegistry() {
     return shortCircuitRegistry;
   }
+
+  /**
+   * Check the disk error
+   */
+  private void checkDiskError() {
+    try {
+      data.checkDataDir();
+    } catch (DiskErrorException de) {
+      handleDiskError(de.getMessage());
+    }
+  }
+
+  /**
+   * Starts a new thread which will check for disk error check request
+   * every 5 sec
+   */
+  private void startCheckDiskErrorThread() {
+    checkDiskErrorThread = new Thread(new Runnable() {
+          @Override
+          public void run() {
+            while(shouldRun) {
+              boolean tempFlag ;
+              synchronized(checkDiskErrorMutex) {
+                tempFlag = checkDiskErrorFlag;
+                checkDiskErrorFlag = false;
+              }
+              if(tempFlag) {
+                try {
+                  checkDiskError();
+                } catch (Exception e) {
+                  LOG.warn("Unexpected exception occurred while checking disk error " + e);
+                  checkDiskErrorThread = null;
+                  return;
+                }
+                synchronized(checkDiskErrorMutex) {
+                  lastDiskErrorCheck = Time.monotonicNow();
+                }
+              }
+              try {
+                Thread.sleep(checkDiskErrorInterval);
+              } catch (InterruptedException e) {
+                LOG.debug("InterruptedException in check disk error thread", e);
+                checkDiskErrorThread = null;
+                return;
+              }
+            }
+          }
+    });
+  }
+
+  public long getLastDiskErrorCheck() {
+    synchronized(checkDiskErrorMutex) {
+      return lastDiskErrorCheck;
+    }
+  }
 }

Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeLayoutVersion.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeLayoutVersion.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeLayoutVersion.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeLayoutVersion.java Tue Aug 19 23:49:39 2014
@@ -62,7 +62,10 @@ public class DataNodeLayoutVersion {
    * </ul>
    */
   public static enum Feature implements LayoutFeature {
-    FIRST_LAYOUT(-55, -53, "First datenode layout", false);
+    FIRST_LAYOUT(-55, -53, "First datanode layout", false),
+    BLOCKID_BASED_LAYOUT(-56,
+        "The block ID of a finalized block uniquely determines its position " +
+        "in the directory structure");
 
    private final FeatureInfo info;
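The BLOCKID_BASED_LAYOUT description above is terse; the idea is that a finalized block's on-disk directory is derived purely from its block ID, so the path can be computed without a per-block lookup. The sketch below is purely illustrative: the class name, shift widths and masks are invented and need not match the actual HDFS implementation, but they show the shape of such a mapping.

import java.io.File;

// Illustrative sketch of a block-ID based layout; constants are invented.
public class BlockIdLayoutExample {
  // Two nested "subdirNN" levels derived from bits of the block ID, so the
  // directory of a finalized block is a pure function of the ID.
  static File idToBlockDir(File finalizedDir, long blockId) {
    int d1 = (int) ((blockId >> 16) & 0x1F);
    int d2 = (int) ((blockId >> 8) & 0x1F);
    return new File(finalizedDir,
        "subdir" + d1 + File.separator + "subdir" + d2);
  }

  public static void main(String[] args) {
    // The same block ID always maps to the same directory.
    File dir = idToBlockDir(new File("/data/dfs/current/finalized"),
        1073741825L);
    System.out.println(dir);
  }
}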