Repository: hadoop Updated Branches: refs/heads/branch-2 c00a468b0 -> ce5ad0e88
HDFS-10930. Refactor: Wrap Datanode IO related operations. Contributed by Xiaoyu Yao. Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/48fb796f Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/48fb796f Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/48fb796f Branch: refs/heads/branch-2 Commit: 48fb796fcbcdbd7289fb22377bdd7b14e936a412 Parents: c00a468 Author: Xiaoyu Yao <x...@apache.org> Authored: Tue Jan 10 14:48:23 2017 -0800 Committer: Arpit Agarwal <a...@apache.org> Committed: Tue Jan 10 14:48:23 2017 -0800 ---------------------------------------------------------------------- .../hdfs/server/datanode/BlockReceiver.java | 66 +++++------- .../hdfs/server/datanode/BlockSender.java | 105 +++++++----------- .../hadoop/hdfs/server/datanode/DNConf.java | 4 + .../hdfs/server/datanode/DataStorage.java | 5 + .../hdfs/server/datanode/ReplicaInPipeline.java | 21 ++-- .../datanode/ReplicaInPipelineInterface.java | 4 +- .../hdfs/server/datanode/ReplicaInfo.java | 34 +++++- .../server/datanode/fsdataset/FsDatasetSpi.java | 3 +- .../datanode/fsdataset/ReplicaInputStreams.java | 102 +++++++++++++++++- .../fsdataset/ReplicaOutputStreams.java | 107 +++++++++++++++++-- .../datanode/fsdataset/impl/BlockPoolSlice.java | 96 ++++++++--------- .../impl/FsDatasetAsyncDiskService.java | 7 +- .../datanode/fsdataset/impl/FsDatasetImpl.java | 7 +- .../datanode/fsdataset/impl/FsVolumeImpl.java | 5 +- .../org/apache/hadoop/hdfs/TestFileAppend.java | 6 +- .../server/datanode/SimulatedFSDataset.java | 8 +- .../hdfs/server/datanode/TestBlockRecovery.java | 2 +- .../server/datanode/TestSimulatedFSDataset.java | 2 +- .../extdataset/ExternalDatasetImpl.java | 4 +- .../extdataset/ExternalReplicaInPipeline.java | 6 +- 20 files changed, 382 insertions(+), 212 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/48fb796f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java index 522d577..5244caf 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java @@ -24,10 +24,7 @@ import java.io.Closeable; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.EOFException; -import java.io.FileDescriptor; -import java.io.FileOutputStream; import java.io.IOException; -import java.io.OutputStream; import java.io.OutputStreamWriter; import java.io.Writer; import java.nio.ByteBuffer; @@ -53,7 +50,6 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.util.DataTransferThrottler; import org.apache.hadoop.io.IOUtils; -import org.apache.hadoop.io.nativeio.NativeIO; import org.apache.hadoop.util.Daemon; import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.util.StringUtils; @@ -88,8 +84,6 @@ class BlockReceiver implements Closeable { * the DataNode needs to recalculate checksums before writing. */ private final boolean needsChecksumTranslation; - private OutputStream out = null; // to block file at local disk - private FileDescriptor outFd; private DataOutputStream checksumOut = null; // to crc file at local disk private final int bytesPerChecksum; private final int checksumSize; @@ -250,7 +244,8 @@ class BlockReceiver implements Closeable { final boolean isCreate = isDatanode || isTransfer || stage == BlockConstructionStage.PIPELINE_SETUP_CREATE; - streams = replicaInfo.createStreams(isCreate, requestedChecksum); + streams = replicaInfo.createStreams(isCreate, requestedChecksum, + datanodeSlowLogThresholdMs); assert streams != null : "null streams!"; // read checksum meta information @@ -260,13 +255,6 @@ class BlockReceiver implements Closeable { this.bytesPerChecksum = diskChecksum.getBytesPerChecksum(); this.checksumSize = diskChecksum.getChecksumSize(); - this.out = streams.getDataOut(); - if (out instanceof FileOutputStream) { - this.outFd = ((FileOutputStream)out).getFD(); - } else { - LOG.warn("Could not get file descriptor for outputstream of class " + - out.getClass()); - } this.checksumOut = new DataOutputStream(new BufferedOutputStream( streams.getChecksumOut(), DFSUtilClient.getSmallBufferSize( datanode.getConf()))); @@ -319,7 +307,7 @@ class BlockReceiver implements Closeable { packetReceiver.close(); IOException ioe = null; - if (syncOnClose && (out != null || checksumOut != null)) { + if (syncOnClose && (streams.getDataOut() != null || checksumOut != null)) { datanode.metrics.incrFsyncCount(); } long flushTotalNanos = 0; @@ -348,9 +336,9 @@ class BlockReceiver implements Closeable { } // close block file try { - if (out != null) { + if (streams.getDataOut() != null) { long flushStartNanos = System.nanoTime(); - out.flush(); + streams.flushDataOut(); long flushEndNanos = System.nanoTime(); if (syncOnClose) { long fsyncStartNanos = flushEndNanos; @@ -359,14 +347,13 @@ class BlockReceiver implements Closeable { } flushTotalNanos += flushEndNanos - flushStartNanos; measuredFlushTime = true; - out.close(); - out = null; + streams.closeDataStream(); } } catch (IOException e) { ioe = e; } finally{ - IOUtils.closeStream(out); + streams.close(); } if (replicaHandler != null) { IOUtils.cleanup(null, replicaHandler); @@ -419,9 +406,9 @@ class BlockReceiver implements Closeable { } flushTotalNanos += flushEndNanos - flushStartNanos; } - if (out != null) { + if (streams.getDataOut() != null) { long flushStartNanos = System.nanoTime(); - out.flush(); + streams.flushDataOut(); long flushEndNanos = System.nanoTime(); if (isSync) { long fsyncStartNanos = flushEndNanos; @@ -430,10 +417,10 @@ class BlockReceiver implements Closeable { } flushTotalNanos += flushEndNanos - flushStartNanos; } - if (checksumOut != null || out != null) { + if (checksumOut != null || streams.getDataOut() != null) { datanode.metrics.addFlushNanos(flushTotalNanos); if (isSync) { - datanode.metrics.incrFsyncCount(); + datanode.metrics.incrFsyncCount(); } } long duration = Time.monotonicNow() - begin; @@ -716,16 +703,12 @@ class BlockReceiver implements Closeable { int numBytesToDisk = (int)(offsetInBlock-onDiskLen); // Write data to disk. - long begin = Time.monotonicNow(); - out.write(dataBuf.array(), startByteToDisk, numBytesToDisk); - long duration = Time.monotonicNow() - begin; + long duration = streams.writeToDisk(dataBuf.array(), + startByteToDisk, numBytesToDisk); + if (duration > maxWriteToDiskMs) { maxWriteToDiskMs = duration; } - if (duration > datanodeSlowLogThresholdMs) { - LOG.warn("Slow BlockReceiver write data to disk cost:" + duration - + "ms (threshold=" + datanodeSlowLogThresholdMs + "ms)"); - } final byte[] lastCrc; if (shouldNotWriteChecksum) { @@ -842,7 +825,7 @@ class BlockReceiver implements Closeable { private void manageWriterOsCache(long offsetInBlock) { try { - if (outFd != null && + if (streams.getOutFd() != null && offsetInBlock > lastCacheManagementOffset + CACHE_DROP_LAG_BYTES) { long begin = Time.monotonicNow(); // @@ -857,12 +840,11 @@ class BlockReceiver implements Closeable { if (syncBehindWrites) { if (syncBehindWritesInBackground) { this.datanode.getFSDataset().submitBackgroundSyncFileRangeRequest( - block, outFd, lastCacheManagementOffset, + block, streams, lastCacheManagementOffset, offsetInBlock - lastCacheManagementOffset, SYNC_FILE_RANGE_WRITE); } else { - NativeIO.POSIX.syncFileRangeIfPossible(outFd, - lastCacheManagementOffset, + streams.syncFileRangeIfPossible(lastCacheManagementOffset, offsetInBlock - lastCacheManagementOffset, SYNC_FILE_RANGE_WRITE); } @@ -879,8 +861,8 @@ class BlockReceiver implements Closeable { // long dropPos = lastCacheManagementOffset - CACHE_DROP_LAG_BYTES; if (dropPos > 0 && dropCacheBehindWrites) { - NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible( - block.getBlockName(), outFd, 0, dropPos, POSIX_FADV_DONTNEED); + streams.dropCacheBehindWrites(block.getBlockName(), 0, dropPos, + POSIX_FADV_DONTNEED); } lastCacheManagementOffset = offsetInBlock; long duration = Time.monotonicNow() - begin; @@ -989,7 +971,7 @@ class BlockReceiver implements Closeable { // The worst case is not recovering this RBW replica. // Client will fall back to regular pipeline recovery. } finally { - IOUtils.closeStream(out); + IOUtils.closeStream(streams.getDataOut()); } try { // Even if the connection is closed after the ack packet is @@ -1047,8 +1029,8 @@ class BlockReceiver implements Closeable { * will be overwritten. */ private void adjustCrcFilePosition() throws IOException { - if (out != null) { - out.flush(); + if (streams.getDataOut() != null) { + streams.flushDataOut(); } if (checksumOut != null) { checksumOut.flush(); @@ -1094,10 +1076,10 @@ class BlockReceiver implements Closeable { byte[] crcbuf = new byte[checksumSize]; try (ReplicaInputStreams instr = datanode.data.getTmpInputStreams(block, blkoff, ckoff)) { - IOUtils.readFully(instr.getDataIn(), buf, 0, sizePartialChunk); + instr.readDataFully(buf, 0, sizePartialChunk); // open meta file and read in crc value computer earlier - IOUtils.readFully(instr.getChecksumIn(), crcbuf, 0, crcbuf.length); + instr.readChecksumFully(crcbuf, 0, crcbuf.length); } // compute crc of partial chunk from data read in the block file. http://git-wip-us.apache.org/repos/asf/hadoop/blob/48fb796f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java index aeeef97..0c3c3dc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java @@ -20,7 +20,6 @@ package org.apache.hadoop.hdfs.server.datanode; import java.io.BufferedInputStream; import java.io.DataInputStream; import java.io.DataOutputStream; -import java.io.FileDescriptor; import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.IOException; @@ -41,11 +40,11 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference; import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams; import org.apache.hadoop.hdfs.util.DataTransferThrottler; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.ReadaheadPool.ReadaheadRequest; -import org.apache.hadoop.io.nativeio.NativeIO; import org.apache.hadoop.net.SocketOutputStream; import org.apache.hadoop.util.AutoCloseableLock; import org.apache.hadoop.util.DataChecksum; @@ -120,12 +119,11 @@ class BlockSender implements java.io.Closeable { /** the block to read from */ private final ExtendedBlock block; - /** Stream to read block data from */ - private InputStream blockIn; + + /** InputStreams and file descriptors to read block/checksum. */ + private ReplicaInputStreams ris; /** updated while using transferTo() */ private long blockInPosition = -1; - /** Stream to read checksum */ - private DataInputStream checksumIn; /** Checksum utility */ private final DataChecksum checksum; /** Initial position to read */ @@ -152,11 +150,6 @@ class BlockSender implements java.io.Closeable { private final String clientTraceFmt; private volatile ChunkChecksum lastChunkChecksum = null; private DataNode datanode; - - /** The file descriptor of the block being sent */ - private FileDescriptor blockInFd; - /** The reference to the volume where the block is located */ - private FsVolumeReference volumeRef; /** The replica of the block that is being read. */ private final Replica replica; @@ -201,6 +194,9 @@ class BlockSender implements java.io.Closeable { boolean sendChecksum, DataNode datanode, String clientTraceFmt, CachingStrategy cachingStrategy) throws IOException { + InputStream blockIn = null; + DataInputStream checksumIn = null; + FsVolumeReference volumeRef = null; try { this.block = block; this.corruptChecksumOk = corruptChecksumOk; @@ -291,7 +287,7 @@ class BlockSender implements java.io.Closeable { (!is32Bit || length <= Integer.MAX_VALUE); // Obtain a reference before reading data - this.volumeRef = datanode.data.getVolume(block).obtainReference(); + volumeRef = datanode.data.getVolume(block).obtainReference(); /* * (corruptChecksumOK, meta_file_exist): operation @@ -415,14 +411,9 @@ class BlockSender implements java.io.Closeable { DataNode.LOG.debug("replica=" + replica); } blockIn = datanode.data.getBlockInputStream(block, offset); // seek to offset - if (blockIn instanceof FileInputStream) { - blockInFd = ((FileInputStream)blockIn).getFD(); - } else { - blockInFd = null; - } + ris = new ReplicaInputStreams(blockIn, checksumIn, volumeRef); } catch (IOException ioe) { IOUtils.closeStream(this); - IOUtils.closeStream(blockIn); throw ioe; } } @@ -432,12 +423,11 @@ class BlockSender implements java.io.Closeable { */ @Override public void close() throws IOException { - if (blockInFd != null && + if (ris.getDataInFd() != null && ((dropCacheBehindAllReads) || (dropCacheBehindLargeReads && isLongRead()))) { try { - NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible( - block.getBlockName(), blockInFd, lastCacheDropOffset, + ris.dropCacheBehindReads(block.getBlockName(), lastCacheDropOffset, offset - lastCacheDropOffset, POSIX_FADV_DONTNEED); } catch (Exception e) { LOG.warn("Unable to drop cache on file close", e); @@ -446,32 +436,12 @@ class BlockSender implements java.io.Closeable { if (curReadahead != null) { curReadahead.cancel(); } - - IOException ioe = null; - if(checksumIn!=null) { - try { - checksumIn.close(); // close checksum file - } catch (IOException e) { - ioe = e; - } - checksumIn = null; - } - if(blockIn!=null) { - try { - blockIn.close(); // close data file - } catch (IOException e) { - ioe = e; - } - blockIn = null; - blockInFd = null; - } - if (volumeRef != null) { - IOUtils.cleanup(null, volumeRef); - volumeRef = null; - } - // throw IOException if there is any - if(ioe!= null) { - throw ioe; + + try { + ris.closeStreams(); + } finally { + IOUtils.closeStream(ris); + ris = null; } } @@ -575,7 +545,7 @@ class BlockSender implements java.io.Closeable { int checksumOff = pkt.position(); byte[] buf = pkt.array(); - if (checksumSize > 0 && checksumIn != null) { + if (checksumSize > 0 && ris.getChecksumIn() != null) { readChecksum(buf, checksumOff, checksumDataLen); // write in progress that we need to use to get last checksum @@ -590,7 +560,7 @@ class BlockSender implements java.io.Closeable { int dataOff = checksumOff + checksumDataLen; if (!transferTo) { // normal transfer - IOUtils.readFully(blockIn, buf, dataOff, dataLen); + ris.readDataFully(buf, dataOff, dataLen); if (verifyChecksum) { verifyChecksum(buf, dataOff, dataLen, numChunks, checksumOff); @@ -602,12 +572,12 @@ class BlockSender implements java.io.Closeable { SocketOutputStream sockOut = (SocketOutputStream)out; // First write header and checksums sockOut.write(buf, headerOff, dataOff - headerOff); - + // no need to flush since we know out is not a buffered stream - FileChannel fileCh = ((FileInputStream)blockIn).getChannel(); + FileChannel fileCh = ((FileInputStream)ris.getDataIn()).getChannel(); LongWritable waitTime = new LongWritable(); LongWritable transferTime = new LongWritable(); - sockOut.transferToFully(fileCh, blockInPosition, dataLen, + sockOut.transferToFully(fileCh, blockInPosition, dataLen, waitTime, transferTime); datanode.metrics.addSendDataPacketBlockedOnNetworkNanos(waitTime.get()); datanode.metrics.addSendDataPacketTransferNanos(transferTime.get()); @@ -639,7 +609,7 @@ class BlockSender implements java.io.Closeable { if (!ioem.startsWith("Broken pipe") && !ioem.startsWith("Connection reset")) { LOG.error("BlockSender.sendChunks() exception: ", e); datanode.getBlockScanner().markSuspectBlock( - volumeRef.getVolume().getStorageID(), + ris.getVolumeRef().getVolume().getStorageID(), block); } } @@ -662,16 +632,15 @@ class BlockSender implements java.io.Closeable { */ private void readChecksum(byte[] buf, final int checksumOffset, final int checksumLen) throws IOException { - if (checksumSize <= 0 && checksumIn == null) { + if (checksumSize <= 0 && ris.getChecksumIn() == null) { return; } try { - checksumIn.readFully(buf, checksumOffset, checksumLen); + ris.readChecksumFully(buf, checksumOffset, checksumLen); } catch (IOException e) { LOG.warn(" Could not read or failed to verify checksum for data" + " at offset " + offset + " for block " + block, e); - IOUtils.closeStream(checksumIn); - checksumIn = null; + ris.closeChecksumStream(); if (corruptChecksumOk) { if (checksumOffset < checksumLen) { // Just fill the array with zeros. @@ -755,10 +724,10 @@ class BlockSender implements java.io.Closeable { lastCacheDropOffset = initialOffset; - if (isLongRead() && blockInFd != null) { + if (isLongRead() && ris.getDataInFd() != null) { // Advise that this file descriptor will be accessed sequentially. - NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible( - block.getBlockName(), blockInFd, 0, 0, POSIX_FADV_SEQUENTIAL); + ris.dropCacheBehindReads(block.getBlockName(), 0, 0, + POSIX_FADV_SEQUENTIAL); } // Trigger readahead of beginning of file if configured. @@ -770,9 +739,10 @@ class BlockSender implements java.io.Closeable { int pktBufSize = PacketHeader.PKT_MAX_HEADER_LEN; boolean transferTo = transferToAllowed && !verifyChecksum && baseStream instanceof SocketOutputStream - && blockIn instanceof FileInputStream; + && ris.getDataIn() instanceof FileInputStream; if (transferTo) { - FileChannel fileChannel = ((FileInputStream)blockIn).getChannel(); + FileChannel fileChannel = + ((FileInputStream)ris.getDataIn()).getChannel(); blockInPosition = fileChannel.position(); streamForSendChunks = baseStream; maxChunksPerPacket = numberOfChunks(TRANSFERTO_BUFFER_SIZE); @@ -827,14 +797,16 @@ class BlockSender implements java.io.Closeable { private void manageOsCache() throws IOException { // We can't manage the cache for this block if we don't have a file // descriptor to work with. - if (blockInFd == null) return; + if (ris.getDataInFd() == null) { + return; + } // Perform readahead if necessary if ((readaheadLength > 0) && (datanode.readaheadPool != null) && (alwaysReadahead || isLongRead())) { curReadahead = datanode.readaheadPool.readaheadStream( - clientTraceFmt, blockInFd, offset, readaheadLength, Long.MAX_VALUE, - curReadahead); + clientTraceFmt, ris.getDataInFd(), offset, readaheadLength, + Long.MAX_VALUE, curReadahead); } // Drop what we've just read from cache, since we aren't @@ -844,8 +816,7 @@ class BlockSender implements java.io.Closeable { long nextCacheDropOffset = lastCacheDropOffset + CACHE_DROP_INTERVAL_BYTES; if (offset >= nextCacheDropOffset) { long dropLength = offset - lastCacheDropOffset; - NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible( - block.getBlockName(), blockInFd, lastCacheDropOffset, + ris.dropCacheBehindReads(block.getBlockName(), lastCacheDropOffset, dropLength, POSIX_FADV_DONTNEED); lastCacheDropOffset = offset; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/48fb796f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java index 09f336a..9ba1be0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java @@ -366,6 +366,10 @@ public class DNConf { return volsConfigured; } + public long getSlowIoWarningThresholdMs() { + return datanodeSlowIoWarningThresholdMs; + } + int getMaxDataLength() { return maxDataLength; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/48fb796f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java index a1abfce..a4071c9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java @@ -1394,4 +1394,9 @@ public class DataStorage extends Storage { synchronized void removeBlockPoolStorage(String bpId) { bpStorageMap.remove(bpId); } + + public static boolean fullyDelete(final File dir) { + boolean result = FileUtil.fullyDelete(dir); + return result; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/48fb796f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java index 7326846..251ba77 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java @@ -130,7 +130,7 @@ public class ReplicaInPipeline extends ReplicaInfo public long getBytesAcked() { return bytesAcked; } - + @Override // ReplicaInPipelineInterface public void setBytesAcked(long bytesAcked) { long newBytesAcked = bytesAcked - this.bytesAcked; @@ -234,7 +234,8 @@ public class ReplicaInPipeline extends ReplicaInfo @Override // ReplicaInPipelineInterface public ReplicaOutputStreams createStreams(boolean isCreate, - DataChecksum requestedChecksum) throws IOException { + DataChecksum requestedChecksum, long slowLogThresholdMs) + throws IOException { File blockFile = getBlockFile(); File metaFile = getMetaFile(); if (DataNode.LOG.isDebugEnabled()) { @@ -245,13 +246,13 @@ public class ReplicaInPipeline extends ReplicaInfo } long blockDiskSize = 0L; long crcDiskSize = 0L; - + // the checksum that should actually be used -- this // may differ from requestedChecksum for appends. final DataChecksum checksum; - + RandomAccessFile metaRAF = new RandomAccessFile(metaFile, "rw"); - + if (!isCreate) { // For append or recovery, we must enforce the existing checksum. // Also, verify that the file has correct lengths, etc. @@ -259,14 +260,14 @@ public class ReplicaInPipeline extends ReplicaInfo try { BlockMetadataHeader header = BlockMetadataHeader.readHeader(metaRAF); checksum = header.getChecksum(); - + if (checksum.getBytesPerChecksum() != requestedChecksum.getBytesPerChecksum()) { throw new IOException("Client requested checksum " + requestedChecksum + " when appending to an existing block " + "with different chunk size: " + checksum); } - + int bytesPerChunk = checksum.getBytesPerChecksum(); int checksumSize = checksum.getChecksumSize(); @@ -288,19 +289,19 @@ public class ReplicaInPipeline extends ReplicaInfo // for create, we can use the requested checksum checksum = requestedChecksum; } - + FileOutputStream blockOut = null; FileOutputStream crcOut = null; try { blockOut = new FileOutputStream( new RandomAccessFile( blockFile, "rw" ).getFD() ); - crcOut = new FileOutputStream(metaRAF.getFD() ); + crcOut = new FileOutputStream(metaRAF.getFD()); if (!isCreate) { blockOut.getChannel().position(blockDiskSize); crcOut.getChannel().position(crcDiskSize); } return new ReplicaOutputStreams(blockOut, crcOut, checksum, - getVolume().isTransientStorage()); + getVolume().isTransientStorage(), slowLogThresholdMs); } catch (IOException e) { IOUtils.closeStream(blockOut); IOUtils.closeStream(metaRAF); http://git-wip-us.apache.org/repos/asf/hadoop/blob/48fb796f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipelineInterface.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipelineInterface.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipelineInterface.java index ef9f3e2..ae02d6d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipelineInterface.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipelineInterface.java @@ -69,11 +69,13 @@ public interface ReplicaInPipelineInterface extends Replica { * * @param isCreate if it is for creation * @param requestedChecksum the checksum the writer would prefer to use + * @param slowLogThresholdMs threshold in ms to log slow io operation * @return output streams for writing * @throws IOException if any error occurs */ public ReplicaOutputStreams createStreams(boolean isCreate, - DataChecksum requestedChecksum) throws IOException; + DataChecksum requestedChecksum, long slowLogThresholdMs) + throws IOException; /** * Create an output stream to write restart metadata in case of datanode http://git-wip-us.apache.org/repos/asf/hadoop/blob/48fb796f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java index 3ef6390..9a58764 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java @@ -20,6 +20,8 @@ package org.apache.hadoop.hdfs.server.datanode; import java.io.File; import java.io.FileInputStream; import java.io.FileOutputStream; +import java.io.InputStream; +import java.io.OutputStream; import java.io.IOException; import java.util.HashMap; import java.util.Map; @@ -28,6 +30,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.HardLink; import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.server.common.Storage; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.util.LightWeightResizableGSet; @@ -230,7 +233,7 @@ abstract public class ReplicaInfo extends Block try { FileOutputStream out = new FileOutputStream(tmpFile); try { - IOUtils.copyBytes(in, out, 16 * 1024); + copyBytes(in, out, 16 * 1024); } finally { out.close(); } @@ -242,7 +245,7 @@ abstract public class ReplicaInfo extends Block " into file " + tmpFile + " resulted in a size of " + tmpFile.length()); } - FileUtil.replaceFile(tmpFile, file); + replaceFile(tmpFile, file); } catch (IOException e) { boolean done = tmpFile.delete(); if (!done) { @@ -277,13 +280,13 @@ abstract public class ReplicaInfo extends Block } File meta = getMetaFile(); - int linkCount = HardLink.getLinkCount(file); + int linkCount = getHardLinkCount(file); if (linkCount > 1) { DataNode.LOG.info("Breaking hardlink for " + linkCount + "x-linked " + "block " + this); breakHardlinks(file, this); } - if (HardLink.getLinkCount(meta) > 1) { + if (getHardLinkCount(meta) > 1) { breakHardlinks(meta, this); } return true; @@ -315,4 +318,27 @@ abstract public class ReplicaInfo extends Block public void setNext(LightWeightResizableGSet.LinkedElement next) { this.next = next; } + + public static boolean fullyDelete(final File dir) { + boolean result = DataStorage.fullyDelete(dir); + return result; + } + + public static int getHardLinkCount(File fileName) throws IOException { + int linkCount = HardLink.getLinkCount(fileName); + return linkCount; + } + + public static void rename(File from, File to) throws IOException { + Storage.rename(from, to); + } + + private void copyBytes(InputStream in, OutputStream out, int + buffSize) throws IOException{ + IOUtils.copyBytes(in, out, buffSize); + } + + private void replaceFile(File src, File target) throws IOException { + FileUtil.replaceFile(src, target); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/48fb796f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java index 52201be..8356331 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java @@ -21,7 +21,6 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset; import java.io.Closeable; import java.io.EOFException; import java.io.File; -import java.io.FileDescriptor; import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; @@ -618,7 +617,7 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean { * submit a sync_file_range request to AsyncDiskService. */ void submitBackgroundSyncFileRangeRequest(final ExtendedBlock block, - final FileDescriptor fd, final long offset, final long nbytes, + final ReplicaOutputStreams outs, final long offset, final long nbytes, final int flags); /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/48fb796f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaInputStreams.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaInputStreams.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaInputStreams.java index 227179d..54d0e96 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaInputStreams.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaInputStreams.java @@ -18,24 +18,45 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset; import java.io.Closeable; +import java.io.FileDescriptor; +import java.io.FileInputStream; import java.io.InputStream; +import java.io.IOException; +import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.io.nativeio.NativeIO; +import org.apache.hadoop.io.nativeio.NativeIOException; +import org.slf4j.Logger; /** * Contains the input streams for the data and checksum of a replica. */ public class ReplicaInputStreams implements Closeable { - private final InputStream dataIn; - private final InputStream checksumIn; - private final FsVolumeReference volumeRef; + public static final Logger LOG = DataNode.LOG; + + private InputStream dataIn; + private InputStream checksumIn; + private FsVolumeReference volumeRef; + private FileDescriptor dataInFd = null; /** Create an object with a data input stream and a checksum input stream. */ - public ReplicaInputStreams(InputStream dataStream, InputStream checksumStream, - FsVolumeReference volumeRef) { + public ReplicaInputStreams(InputStream dataStream, + InputStream checksumStream, FsVolumeReference volumeRef) { this.volumeRef = volumeRef; this.dataIn = dataStream; this.checksumIn = checksumStream; + if (dataIn instanceof FileInputStream) { + try { + dataInFd = ((FileInputStream) dataIn).getFD(); + } catch (Exception e) { + LOG.warn("Could not get file descriptor for inputstream of class " + + this.dataIn.getClass()); + } + } else { + LOG.debug("Could not get file descriptor for inputstream of class " + + this.dataIn.getClass()); + } } /** @return the data input stream. */ @@ -48,10 +69,81 @@ public class ReplicaInputStreams implements Closeable { return checksumIn; } + public FileDescriptor getDataInFd() { + return dataInFd; + } + + public FsVolumeReference getVolumeRef() { + return volumeRef; + } + + public void readDataFully(byte[] buf, int off, int len) + throws IOException { + IOUtils.readFully(dataIn, buf, off, len); + } + + public void readChecksumFully(byte[] buf, int off, int len) + throws IOException { + IOUtils.readFully(checksumIn, buf, off, len); + } + + public void skipDataFully(long len) throws IOException { + IOUtils.skipFully(dataIn, len); + } + + public void skipChecksumFully(long len) throws IOException { + IOUtils.skipFully(checksumIn, len); + } + + public void closeChecksumStream() throws IOException { + IOUtils.closeStream(checksumIn); + checksumIn = null; + } + + public void dropCacheBehindReads(String identifier, long offset, long len, + int flags) throws NativeIOException { + assert this.dataInFd != null : "null dataInFd!"; + NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible( + identifier, dataInFd, offset, len, flags); + } + + public void closeStreams() throws IOException { + IOException ioe = null; + if(checksumIn!=null) { + try { + checksumIn.close(); // close checksum file + } catch (IOException e) { + ioe = e; + } + checksumIn = null; + } + if(dataIn!=null) { + try { + dataIn.close(); // close data file + } catch (IOException e) { + ioe = e; + } + dataIn = null; + dataInFd = null; + } + if (volumeRef != null) { + IOUtils.cleanup(null, volumeRef); + volumeRef = null; + } + // throw IOException if there is any + if(ioe!= null) { + throw ioe; + } + } + @Override public void close() { IOUtils.closeStream(dataIn); + dataIn = null; + dataInFd = null; IOUtils.closeStream(checksumIn); + checksumIn = null; IOUtils.cleanup(null, volumeRef); + volumeRef = null; } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/48fb796f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaOutputStreams.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaOutputStreams.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaOutputStreams.java index bd1461a..a66847a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaOutputStreams.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaOutputStreams.java @@ -18,32 +18,62 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset; import java.io.Closeable; +import java.io.FileDescriptor; import java.io.FileOutputStream; import java.io.OutputStream; import java.io.IOException; +import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.io.nativeio.NativeIO; +import org.apache.hadoop.io.nativeio.NativeIOException; import org.apache.hadoop.util.DataChecksum; +import org.apache.hadoop.util.Time; +import org.slf4j.Logger; /** * Contains the output streams for the data and checksum of a replica. */ public class ReplicaOutputStreams implements Closeable { - private final OutputStream dataOut; + public static final Logger LOG = DataNode.LOG; + + private FileDescriptor outFd = null; + /** Stream to block. */ + private OutputStream dataOut; + /** Stream to checksum. */ private final OutputStream checksumOut; private final DataChecksum checksum; private final boolean isTransientStorage; + private final long slowLogThresholdMs; /** * Create an object with a data output stream, a checksum output stream * and a checksum. */ - public ReplicaOutputStreams(OutputStream dataOut, OutputStream checksumOut, - DataChecksum checksum, boolean isTransientStorage) { + public ReplicaOutputStreams(OutputStream dataOut, + OutputStream checksumOut, DataChecksum checksum, + boolean isTransientStorage, long slowLogThresholdMs) { this.dataOut = dataOut; - this.checksumOut = checksumOut; this.checksum = checksum; + this.slowLogThresholdMs = slowLogThresholdMs; this.isTransientStorage = isTransientStorage; + this.checksumOut = checksumOut; + + try { + if (this.dataOut instanceof FileOutputStream) { + this.outFd = ((FileOutputStream)this.dataOut).getFD(); + } else { + LOG.debug("Could not get file descriptor for outputstream of class " + + this.dataOut.getClass()); + } + } catch (IOException e) { + LOG.warn("Could not get file descriptor for outputstream of class " + + this.dataOut.getClass()); + } + } + + public FileDescriptor getOutFd() { + return outFd; } /** @return the data output stream. */ @@ -72,12 +102,17 @@ public class ReplicaOutputStreams implements Closeable { IOUtils.closeStream(checksumOut); } + public void closeDataStream() throws IOException { + dataOut.close(); + dataOut = null; + } + /** * Sync the data stream if it supports it. */ public void syncDataOut() throws IOException { if (dataOut instanceof FileOutputStream) { - ((FileOutputStream)dataOut).getChannel().force(true); + sync((FileOutputStream)dataOut); } } @@ -86,8 +121,68 @@ public class ReplicaOutputStreams implements Closeable { */ public void syncChecksumOut() throws IOException { if (checksumOut instanceof FileOutputStream) { - ((FileOutputStream)checksumOut).getChannel().force(true); + sync((FileOutputStream)checksumOut); } } + /** + * Flush the data stream if it supports it. + */ + public void flushDataOut() throws IOException { + flush(dataOut); + } + + /** + * Flush the checksum stream if it supports it. + */ + public void flushChecksumOut() throws IOException { + flush(checksumOut); + } + + private void flush(OutputStream dos) throws IOException { + long begin = Time.monotonicNow(); + dos.flush(); + long duration = Time.monotonicNow() - begin; + LOG.trace("ReplicaOutputStreams#flush takes {} ms.", duration); + if (duration > slowLogThresholdMs) { + LOG.warn("Slow flush took {} ms (threshold={} ms)", duration, + slowLogThresholdMs); + } + } + + private void sync(FileOutputStream fos) throws IOException { + long begin = Time.monotonicNow(); + fos.getChannel().force(true); + long duration = Time.monotonicNow() - begin; + LOG.trace("ReplicaOutputStreams#sync takes {} ms.", duration); + if (duration > slowLogThresholdMs) { + LOG.warn("Slow fsync took {} ms (threshold={} ms)", duration, + slowLogThresholdMs); + } + } + + public long writeToDisk(byte[] b, int off, int len) throws IOException { + long begin = Time.monotonicNow(); + dataOut.write(b, off, len); + long duration = Time.monotonicNow() - begin; + LOG.trace("DatanodeIO#writeToDisk takes {} ms.", duration); + if (duration > slowLogThresholdMs) { + LOG.warn("Slow BlockReceiver write data to disk cost: {} ms " + + "(threshold={} ms)", duration, slowLogThresholdMs); + } + return duration; + } + + public void syncFileRangeIfPossible(long offset, long nbytes, + int flags) throws NativeIOException { + assert this.outFd != null : "null outFd!"; + NativeIO.POSIX.syncFileRangeIfPossible(outFd, offset, nbytes, flags); + } + + public void dropCacheBehindWrites(String identifier, + long offset, long len, int flags) throws NativeIOException { + assert this.outFd != null : "null outFd!"; + NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible( + identifier, outFd, offset, len, flags); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/48fb796f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java index 4be9715..5b701b5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java @@ -52,8 +52,9 @@ import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica; import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo; import org.apache.hadoop.hdfs.server.datanode.ReplicaBeingWritten; import org.apache.hadoop.hdfs.server.datanode.ReplicaWaitingToBeRecovered; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams; + import org.apache.hadoop.io.IOUtils; -import org.apache.hadoop.io.nativeio.NativeIO; import org.apache.hadoop.util.AutoCloseableLock; import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.util.DiskChecker; @@ -145,7 +146,7 @@ class BlockPoolSlice { // this.tmpDir = new File(bpDir, DataStorage.STORAGE_DIR_TMP); if (tmpDir.exists()) { - FileUtil.fullyDelete(tmpDir); + DataStorage.fullyDelete(tmpDir); } this.rbwDir = new File(currentDir, DataStorage.STORAGE_DIR_RBW); final boolean supportAppends = conf.getBoolean( @@ -430,7 +431,7 @@ class BlockPoolSlice { final File targetMetaFile = new File(targetDir, metaFile.getName()); try { - NativeIO.renameTo(metaFile, targetMetaFile); + ReplicaInfo.rename(metaFile, targetMetaFile); } catch (IOException e) { LOG.warn("Failed to move meta file from " + metaFile + " to " + targetMetaFile, e); @@ -440,7 +441,7 @@ class BlockPoolSlice { final File targetBlockFile = new File(targetDir, blockFile.getName()); try { - NativeIO.renameTo(blockFile, targetBlockFile); + ReplicaInfo.rename(blockFile, targetBlockFile); } catch (IOException e) { LOG.warn("Failed to move block file from " + blockFile + " to " + targetBlockFile, e); @@ -670,8 +671,6 @@ class BlockPoolSlice { * @return the number of valid bytes */ private long validateIntegrityAndSetLength(File blockFile, long genStamp) { - DataInputStream checksumIn = null; - InputStream blockIn = null; try { final File metaFile = FsDatasetUtil.getMetaFile(blockFile, genStamp); long blockFileLen = blockFile.length(); @@ -681,57 +680,52 @@ class BlockPoolSlice { !metaFile.exists() || metaFileLen < crcHeaderLen) { return 0; } - checksumIn = new DataInputStream( + try (DataInputStream checksumIn = new DataInputStream( new BufferedInputStream(new FileInputStream(metaFile), - ioFileBufferSize)); - - // read and handle the common header here. For now just a version - final DataChecksum checksum = BlockMetadataHeader.readDataChecksum( - checksumIn, metaFile); - int bytesPerChecksum = checksum.getBytesPerChecksum(); - int checksumSize = checksum.getChecksumSize(); - long numChunks = Math.min( - (blockFileLen + bytesPerChecksum - 1)/bytesPerChecksum, - (metaFileLen - crcHeaderLen)/checksumSize); - if (numChunks == 0) { - return 0; - } - IOUtils.skipFully(checksumIn, (numChunks-1)*checksumSize); - blockIn = new FileInputStream(blockFile); - long lastChunkStartPos = (numChunks-1)*bytesPerChecksum; - IOUtils.skipFully(blockIn, lastChunkStartPos); - int lastChunkSize = (int)Math.min( - bytesPerChecksum, blockFileLen-lastChunkStartPos); - byte[] buf = new byte[lastChunkSize+checksumSize]; - checksumIn.readFully(buf, lastChunkSize, checksumSize); - IOUtils.readFully(blockIn, buf, 0, lastChunkSize); - - checksum.update(buf, 0, lastChunkSize); - long validFileLength; - if (checksum.compare(buf, lastChunkSize)) { // last chunk matches crc - validFileLength = lastChunkStartPos + lastChunkSize; - } else { // last chunck is corrupt - validFileLength = lastChunkStartPos; - } - - // truncate if extra bytes are present without CRC - if (blockFile.length() > validFileLength) { - RandomAccessFile blockRAF = new RandomAccessFile(blockFile, "rw"); - try { - // truncate blockFile - blockRAF.setLength(validFileLength); - } finally { - blockRAF.close(); + ioFileBufferSize))) { + // read and handle the common header here. For now just a version + final DataChecksum checksum = BlockMetadataHeader.readDataChecksum( + checksumIn, metaFile); + int bytesPerChecksum = checksum.getBytesPerChecksum(); + int checksumSize = checksum.getChecksumSize(); + long numChunks = Math.min( + (blockFileLen + bytesPerChecksum - 1) / bytesPerChecksum, + (metaFileLen - crcHeaderLen) / checksumSize); + if (numChunks == 0) { + return 0; + } + try (InputStream blockIn = new FileInputStream(blockFile); + ReplicaInputStreams ris = new ReplicaInputStreams(blockIn, + checksumIn, volume.obtainReference())) { + ris.skipChecksumFully((numChunks - 1) * checksumSize); + long lastChunkStartPos = (numChunks - 1) * bytesPerChecksum; + ris.skipDataFully(lastChunkStartPos); + int lastChunkSize = (int) Math.min( + bytesPerChecksum, blockFileLen - lastChunkStartPos); + byte[] buf = new byte[lastChunkSize + checksumSize]; + ris.readChecksumFully(buf, lastChunkSize, checksumSize); + ris.readDataFully(buf, 0, lastChunkSize); + checksum.update(buf, 0, lastChunkSize); + long validFileLength; + if (checksum.compare(buf, lastChunkSize)) { // last chunk matches crc + validFileLength = lastChunkStartPos + lastChunkSize; + } else { // last chunk is corrupt + validFileLength = lastChunkStartPos; + } + // truncate if extra bytes are present without CRC + if (blockFile.length() > validFileLength) { + try (RandomAccessFile blockRAF = + new RandomAccessFile(blockFile, "rw")) { + // truncate blockFile + blockRAF.setLength(validFileLength); + } + } + return validFileLength; } } - - return validFileLength; } catch (IOException e) { FsDatasetImpl.LOG.warn(e); return 0; - } finally { - IOUtils.closeStream(checksumIn); - IOUtils.closeStream(blockIn); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/48fb796f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java index fdc9f83..36d90fd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java @@ -19,7 +19,6 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl; import java.io.File; -import java.io.FileDescriptor; import java.util.HashMap; import java.util.HashSet; import java.util.Map; @@ -35,9 +34,9 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams; import org.apache.hadoop.hdfs.server.protocol.BlockCommand; import org.apache.hadoop.io.IOUtils; -import org.apache.hadoop.io.nativeio.NativeIO; import org.apache.hadoop.io.nativeio.NativeIOException; /** @@ -193,13 +192,13 @@ class FsDatasetAsyncDiskService { } public void submitSyncFileRangeRequest(FsVolumeImpl volume, - final FileDescriptor fd, final long offset, final long nbytes, + final ReplicaOutputStreams streams, final long offset, final long nbytes, final int flags) { execute(volume.getCurrentDir(), new Runnable() { @Override public void run() { try { - NativeIO.POSIX.syncFileRangeIfPossible(fd, offset, nbytes, flags); + streams.syncFileRangeIfPossible(offset, nbytes, flags); } catch (NativeIOException e) { LOG.warn("sync_file_range error", e); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/48fb796f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java index a485110..7cee8f4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java @@ -21,9 +21,8 @@ import java.io.BufferedOutputStream; import java.io.DataOutputStream; import java.io.EOFException; import java.io.File; -import java.io.FileDescriptor; -import java.io.FileInputStream; import java.io.FileNotFoundException; +import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; @@ -2961,9 +2960,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { @Override public void submitBackgroundSyncFileRangeRequest(ExtendedBlock block, - FileDescriptor fd, long offset, long nbytes, int flags) { + ReplicaOutputStreams outs, long offset, long nbytes, int flags) { FsVolumeImpl fsVolumeImpl = this.getVolume(block); - asyncDiskService.submitSyncFileRangeRequest(fsVolumeImpl, fd, offset, + asyncDiskService.submitSyncFileRangeRequest(fsVolumeImpl, outs, offset, nbytes, flags); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/48fb796f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java index 6d96c9b..ecd72ff 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java @@ -1007,7 +1007,7 @@ public class FsVolumeImpl implements FsVolumeSpi { DataStorage.STORAGE_DIR_LAZY_PERSIST); File rbwDir = new File(bpCurrentDir, DataStorage.STORAGE_DIR_RBW); if (force) { - FileUtil.fullyDelete(bpDir); + DataStorage.fullyDelete(bpDir); } else { if (!rbwDir.delete()) { throw new IOException("Failed to delete " + rbwDir); @@ -1021,7 +1021,7 @@ public class FsVolumeImpl implements FsVolumeSpi { !FileUtil.fullyDelete(lazypersistDir)))) { throw new IOException("Failed to delete " + lazypersistDir); } - FileUtil.fullyDelete(tmpDir); + DataStorage.fullyDelete(tmpDir); for (File f : FileUtil.listFiles(bpCurrentDir)) { if (!f.delete()) { throw new IOException("Failed to delete " + f); @@ -1091,4 +1091,3 @@ public class FsVolumeImpl implements FsVolumeSpi { return lastChecksum; } } - http://git-wip-us.apache.org/repos/asf/hadoop/blob/48fb796f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java index 3035bbb..0bb95d2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java @@ -700,9 +700,9 @@ public class TestFileAppend{ // write data to block file ReplicaBeingWritten rbw = - (ReplicaBeingWritten) replicaHandler.getReplica(); - ReplicaOutputStreams outputStreams = - rbw.createStreams(false, DEFAULT_CHECKSUM); + (ReplicaBeingWritten)replicaHandler.getReplica(); + ReplicaOutputStreams + outputStreams = rbw.createStreams(false, DEFAULT_CHECKSUM, 300); OutputStream dataOutput = outputStreams.getDataOut(); byte[] appendBytes = new byte[1]; http://git-wip-us.apache.org/repos/asf/hadoop/blob/48fb796f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java index 28192df..80481c2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java @@ -18,7 +18,6 @@ package org.apache.hadoop.hdfs.server.datanode; import java.io.File; -import java.io.FileDescriptor; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -256,14 +255,15 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> { @Override synchronized public ReplicaOutputStreams createStreams(boolean isCreate, - DataChecksum requestedChecksum) throws IOException { + DataChecksum requestedChecksum, long slowLogThresholdMs) + throws IOException { if (finalized) { throw new IOException("Trying to write to a finalized replica " + theBlock); } else { SimulatedOutputStream crcStream = new SimulatedOutputStream(); return new ReplicaOutputStreams(oStream, crcStream, requestedChecksum, - volume.isTransientStorage()); + volume.isTransientStorage(), slowLogThresholdMs); } } @@ -1341,7 +1341,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> { @Override public void submitBackgroundSyncFileRangeRequest(ExtendedBlock block, - FileDescriptor fd, long offset, long nbytes, int flags) { + ReplicaOutputStreams outs, long offset, long nbytes, int flags) { throw new UnsupportedOperationException(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/48fb796f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java index 346250b..6977c57 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java @@ -642,7 +642,7 @@ public class TestBlockRecovery { ReplicaOutputStreams streams = null; try { streams = replicaInfo.createStreams(true, - DataChecksum.newDataChecksum(DataChecksum.Type.CRC32, 512)); + DataChecksum.newDataChecksum(DataChecksum.Type.CRC32, 512), 300); streams.getChecksumOut().write('a'); dn.data.initReplicaRecovery(new RecoveringBlock(block, null, RECOVERY_ID+1)); BlockRecoveryWorker.RecoveryTaskContiguous RecoveryTaskContiguous = http://git-wip-us.apache.org/repos/asf/hadoop/blob/48fb796f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java index dd7d239..84e9180 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java @@ -83,7 +83,7 @@ public class TestSimulatedFSDataset { ReplicaInPipelineInterface bInfo = fsdataset.createRbw( StorageType.DEFAULT, b, false).getReplica(); ReplicaOutputStreams out = bInfo.createStreams(true, - DataChecksum.newDataChecksum(DataChecksum.Type.CRC32, 512)); + DataChecksum.newDataChecksum(DataChecksum.Type.CRC32, 512), 300); try { OutputStream dataOut = out.getDataOut(); assertEquals(0, fsdataset.getLength(b)); http://git-wip-us.apache.org/repos/asf/hadoop/blob/48fb796f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java index 1bbf9de..ab572d0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java @@ -322,8 +322,8 @@ public class ExternalDatasetImpl implements FsDatasetSpi<ExternalVolumeImpl> { } @Override - public void submitBackgroundSyncFileRangeRequest(ExtendedBlock block, FileDescriptor fd, long offset, long nbytes, int flags) { - + public void submitBackgroundSyncFileRangeRequest(ExtendedBlock block, + ReplicaOutputStreams outs, long offset, long nbytes, int flags) { } @Override http://git-wip-us.apache.org/repos/asf/hadoop/blob/48fb796f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalReplicaInPipeline.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalReplicaInPipeline.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalReplicaInPipeline.java index a0039bc..3e2fd7d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalReplicaInPipeline.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalReplicaInPipeline.java @@ -57,8 +57,10 @@ public class ExternalReplicaInPipeline implements ReplicaInPipelineInterface { @Override public ReplicaOutputStreams createStreams(boolean isCreate, - DataChecksum requestedChecksum) throws IOException { - return new ReplicaOutputStreams(null, null, requestedChecksum, false); + DataChecksum requestedChecksum, long slowLogThresholdMs) + throws IOException { + return new ReplicaOutputStreams(null, null, requestedChecksum, false, + slowLogThresholdMs); } @Override --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org