Repository: hadoop Updated Branches: refs/heads/branch-2 4c9ca4738 -> 0bdaa2333
Revert "HDFS-10930. Refactor: Wrap Datanode IO related operations. Contributed by Xiaoyu Yao." This reverts commit 4c9ca47386845f55c43c684c7a8250536101ddc3. Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/0bdaa233 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/0bdaa233 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/0bdaa233 Branch: refs/heads/branch-2 Commit: 0bdaa233343346ae6c14c5e81cb8c0f180918646 Parents: 4c9ca47 Author: Xiaoyu Yao <x...@apache.org> Authored: Fri Dec 9 21:37:13 2016 -0800 Committer: Xiaoyu Yao <x...@apache.org> Committed: Fri Dec 9 21:37:13 2016 -0800 ---------------------------------------------------------------------- .../hdfs/server/datanode/BlockReceiver.java | 66 +++++++----- .../hdfs/server/datanode/BlockSender.java | 105 +++++++++++------- .../hadoop/hdfs/server/datanode/DNConf.java | 4 - .../hdfs/server/datanode/DataStorage.java | 5 - .../hdfs/server/datanode/ReplicaInPipeline.java | 21 ++-- .../datanode/ReplicaInPipelineInterface.java | 4 +- .../hdfs/server/datanode/ReplicaInfo.java | 34 +----- .../server/datanode/fsdataset/FsDatasetSpi.java | 3 +- .../datanode/fsdataset/ReplicaInputStreams.java | 102 +----------------- .../fsdataset/ReplicaOutputStreams.java | 107 ++----------------- .../datanode/fsdataset/impl/BlockPoolSlice.java | 96 +++++++++-------- .../impl/FsDatasetAsyncDiskService.java | 7 +- .../datanode/fsdataset/impl/FsDatasetImpl.java | 7 +- .../datanode/fsdataset/impl/FsVolumeImpl.java | 5 +- .../org/apache/hadoop/hdfs/TestFileAppend.java | 6 +- .../server/datanode/SimulatedFSDataset.java | 8 +- .../hdfs/server/datanode/TestBlockRecovery.java | 2 +- .../server/datanode/TestSimulatedFSDataset.java | 2 +- .../extdataset/ExternalDatasetImpl.java | 4 +- .../extdataset/ExternalReplicaInPipeline.java | 6 +- 20 files changed, 212 insertions(+), 382 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/0bdaa233/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java index 5244caf..522d577 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java @@ -24,7 +24,10 @@ import java.io.Closeable; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.EOFException; +import java.io.FileDescriptor; +import java.io.FileOutputStream; import java.io.IOException; +import java.io.OutputStream; import java.io.OutputStreamWriter; import java.io.Writer; import java.nio.ByteBuffer; @@ -50,6 +53,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.util.DataTransferThrottler; import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.io.nativeio.NativeIO; import org.apache.hadoop.util.Daemon; import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.util.StringUtils; @@ -84,6 +88,8 @@ class BlockReceiver implements Closeable { * the DataNode needs to recalculate checksums before writing. */ private final boolean needsChecksumTranslation; + private OutputStream out = null; // to block file at local disk + private FileDescriptor outFd; private DataOutputStream checksumOut = null; // to crc file at local disk private final int bytesPerChecksum; private final int checksumSize; @@ -244,8 +250,7 @@ class BlockReceiver implements Closeable { final boolean isCreate = isDatanode || isTransfer || stage == BlockConstructionStage.PIPELINE_SETUP_CREATE; - streams = replicaInfo.createStreams(isCreate, requestedChecksum, - datanodeSlowLogThresholdMs); + streams = replicaInfo.createStreams(isCreate, requestedChecksum); assert streams != null : "null streams!"; // read checksum meta information @@ -255,6 +260,13 @@ class BlockReceiver implements Closeable { this.bytesPerChecksum = diskChecksum.getBytesPerChecksum(); this.checksumSize = diskChecksum.getChecksumSize(); + this.out = streams.getDataOut(); + if (out instanceof FileOutputStream) { + this.outFd = ((FileOutputStream)out).getFD(); + } else { + LOG.warn("Could not get file descriptor for outputstream of class " + + out.getClass()); + } this.checksumOut = new DataOutputStream(new BufferedOutputStream( streams.getChecksumOut(), DFSUtilClient.getSmallBufferSize( datanode.getConf()))); @@ -307,7 +319,7 @@ class BlockReceiver implements Closeable { packetReceiver.close(); IOException ioe = null; - if (syncOnClose && (streams.getDataOut() != null || checksumOut != null)) { + if (syncOnClose && (out != null || checksumOut != null)) { datanode.metrics.incrFsyncCount(); } long flushTotalNanos = 0; @@ -336,9 +348,9 @@ class BlockReceiver implements Closeable { } // close block file try { - if (streams.getDataOut() != null) { + if (out != null) { long flushStartNanos = System.nanoTime(); - streams.flushDataOut(); + out.flush(); long flushEndNanos = System.nanoTime(); if (syncOnClose) { long fsyncStartNanos = flushEndNanos; @@ -347,13 +359,14 @@ class BlockReceiver implements Closeable { } flushTotalNanos += flushEndNanos - flushStartNanos; measuredFlushTime = true; - streams.closeDataStream(); + out.close(); + out = null; } } catch (IOException e) { ioe = e; } finally{ - streams.close(); + IOUtils.closeStream(out); } if (replicaHandler != null) { IOUtils.cleanup(null, replicaHandler); @@ -406,9 +419,9 @@ class BlockReceiver implements Closeable { } flushTotalNanos += flushEndNanos - flushStartNanos; } - if (streams.getDataOut() != null) { + if (out != null) { long flushStartNanos = System.nanoTime(); - streams.flushDataOut(); + out.flush(); long flushEndNanos = System.nanoTime(); if (isSync) { long fsyncStartNanos = flushEndNanos; @@ -417,10 +430,10 @@ class BlockReceiver implements Closeable { } flushTotalNanos += flushEndNanos - flushStartNanos; } - if (checksumOut != null || streams.getDataOut() != null) { + if (checksumOut != null || out != null) { datanode.metrics.addFlushNanos(flushTotalNanos); if (isSync) { - datanode.metrics.incrFsyncCount(); + datanode.metrics.incrFsyncCount(); } } long duration = Time.monotonicNow() - begin; @@ -703,12 +716,16 @@ class BlockReceiver implements Closeable { int numBytesToDisk = (int)(offsetInBlock-onDiskLen); // Write data to disk. - long duration = streams.writeToDisk(dataBuf.array(), - startByteToDisk, numBytesToDisk); - + long begin = Time.monotonicNow(); + out.write(dataBuf.array(), startByteToDisk, numBytesToDisk); + long duration = Time.monotonicNow() - begin; if (duration > maxWriteToDiskMs) { maxWriteToDiskMs = duration; } + if (duration > datanodeSlowLogThresholdMs) { + LOG.warn("Slow BlockReceiver write data to disk cost:" + duration + + "ms (threshold=" + datanodeSlowLogThresholdMs + "ms)"); + } final byte[] lastCrc; if (shouldNotWriteChecksum) { @@ -825,7 +842,7 @@ class BlockReceiver implements Closeable { private void manageWriterOsCache(long offsetInBlock) { try { - if (streams.getOutFd() != null && + if (outFd != null && offsetInBlock > lastCacheManagementOffset + CACHE_DROP_LAG_BYTES) { long begin = Time.monotonicNow(); // @@ -840,11 +857,12 @@ class BlockReceiver implements Closeable { if (syncBehindWrites) { if (syncBehindWritesInBackground) { this.datanode.getFSDataset().submitBackgroundSyncFileRangeRequest( - block, streams, lastCacheManagementOffset, + block, outFd, lastCacheManagementOffset, offsetInBlock - lastCacheManagementOffset, SYNC_FILE_RANGE_WRITE); } else { - streams.syncFileRangeIfPossible(lastCacheManagementOffset, + NativeIO.POSIX.syncFileRangeIfPossible(outFd, + lastCacheManagementOffset, offsetInBlock - lastCacheManagementOffset, SYNC_FILE_RANGE_WRITE); } @@ -861,8 +879,8 @@ class BlockReceiver implements Closeable { // long dropPos = lastCacheManagementOffset - CACHE_DROP_LAG_BYTES; if (dropPos > 0 && dropCacheBehindWrites) { - streams.dropCacheBehindWrites(block.getBlockName(), 0, dropPos, - POSIX_FADV_DONTNEED); + NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible( + block.getBlockName(), outFd, 0, dropPos, POSIX_FADV_DONTNEED); } lastCacheManagementOffset = offsetInBlock; long duration = Time.monotonicNow() - begin; @@ -971,7 +989,7 @@ class BlockReceiver implements Closeable { // The worst case is not recovering this RBW replica. // Client will fall back to regular pipeline recovery. } finally { - IOUtils.closeStream(streams.getDataOut()); + IOUtils.closeStream(out); } try { // Even if the connection is closed after the ack packet is @@ -1029,8 +1047,8 @@ class BlockReceiver implements Closeable { * will be overwritten. */ private void adjustCrcFilePosition() throws IOException { - if (streams.getDataOut() != null) { - streams.flushDataOut(); + if (out != null) { + out.flush(); } if (checksumOut != null) { checksumOut.flush(); @@ -1076,10 +1094,10 @@ class BlockReceiver implements Closeable { byte[] crcbuf = new byte[checksumSize]; try (ReplicaInputStreams instr = datanode.data.getTmpInputStreams(block, blkoff, ckoff)) { - instr.readDataFully(buf, 0, sizePartialChunk); + IOUtils.readFully(instr.getDataIn(), buf, 0, sizePartialChunk); // open meta file and read in crc value computer earlier - instr.readChecksumFully(crcbuf, 0, crcbuf.length); + IOUtils.readFully(instr.getChecksumIn(), crcbuf, 0, crcbuf.length); } // compute crc of partial chunk from data read in the block file. http://git-wip-us.apache.org/repos/asf/hadoop/blob/0bdaa233/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java index 6eae920..4d3a45a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.datanode; import java.io.BufferedInputStream; import java.io.DataInputStream; import java.io.DataOutputStream; +import java.io.FileDescriptor; import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.IOException; @@ -40,11 +41,11 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference; import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream; -import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams; import org.apache.hadoop.hdfs.util.DataTransferThrottler; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.ReadaheadPool.ReadaheadRequest; +import org.apache.hadoop.io.nativeio.NativeIO; import org.apache.hadoop.net.SocketOutputStream; import org.apache.hadoop.util.AutoCloseableLock; import org.apache.hadoop.util.DataChecksum; @@ -119,11 +120,12 @@ class BlockSender implements java.io.Closeable { /** the block to read from */ private final ExtendedBlock block; - - /** InputStreams and file descriptors to read block/checksum. */ - private ReplicaInputStreams ris; + /** Stream to read block data from */ + private InputStream blockIn; /** updated while using transferTo() */ private long blockInPosition = -1; + /** Stream to read checksum */ + private DataInputStream checksumIn; /** Checksum utility */ private final DataChecksum checksum; /** Initial position to read */ @@ -150,6 +152,11 @@ class BlockSender implements java.io.Closeable { private final String clientTraceFmt; private volatile ChunkChecksum lastChunkChecksum = null; private DataNode datanode; + + /** The file descriptor of the block being sent */ + private FileDescriptor blockInFd; + /** The reference to the volume where the block is located */ + private FsVolumeReference volumeRef; /** The replica of the block that is being read. */ private final Replica replica; @@ -194,9 +201,6 @@ class BlockSender implements java.io.Closeable { boolean sendChecksum, DataNode datanode, String clientTraceFmt, CachingStrategy cachingStrategy) throws IOException { - InputStream blockIn = null; - DataInputStream checksumIn = null; - FsVolumeReference volumeRef = null; try { this.block = block; this.corruptChecksumOk = corruptChecksumOk; @@ -277,7 +281,7 @@ class BlockSender implements java.io.Closeable { (!is32Bit || length <= Integer.MAX_VALUE); // Obtain a reference before reading data - volumeRef = datanode.data.getVolume(block).obtainReference(); + this.volumeRef = datanode.data.getVolume(block).obtainReference(); /* * (corruptChecksumOK, meta_file_exist): operation @@ -401,9 +405,14 @@ class BlockSender implements java.io.Closeable { DataNode.LOG.debug("replica=" + replica); } blockIn = datanode.data.getBlockInputStream(block, offset); // seek to offset - ris = new ReplicaInputStreams(blockIn, checksumIn, volumeRef); + if (blockIn instanceof FileInputStream) { + blockInFd = ((FileInputStream)blockIn).getFD(); + } else { + blockInFd = null; + } } catch (IOException ioe) { IOUtils.closeStream(this); + IOUtils.closeStream(blockIn); throw ioe; } } @@ -413,11 +422,12 @@ class BlockSender implements java.io.Closeable { */ @Override public void close() throws IOException { - if (ris.getDataInFd() != null && + if (blockInFd != null && ((dropCacheBehindAllReads) || (dropCacheBehindLargeReads && isLongRead()))) { try { - ris.dropCacheBehindReads(block.getBlockName(), lastCacheDropOffset, + NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible( + block.getBlockName(), blockInFd, lastCacheDropOffset, offset - lastCacheDropOffset, POSIX_FADV_DONTNEED); } catch (Exception e) { LOG.warn("Unable to drop cache on file close", e); @@ -426,12 +436,32 @@ class BlockSender implements java.io.Closeable { if (curReadahead != null) { curReadahead.cancel(); } - - try { - ris.closeStreams(); - } finally { - IOUtils.closeStream(ris); - ris = null; + + IOException ioe = null; + if(checksumIn!=null) { + try { + checksumIn.close(); // close checksum file + } catch (IOException e) { + ioe = e; + } + checksumIn = null; + } + if(blockIn!=null) { + try { + blockIn.close(); // close data file + } catch (IOException e) { + ioe = e; + } + blockIn = null; + blockInFd = null; + } + if (volumeRef != null) { + IOUtils.cleanup(null, volumeRef); + volumeRef = null; + } + // throw IOException if there is any + if(ioe!= null) { + throw ioe; } } @@ -535,7 +565,7 @@ class BlockSender implements java.io.Closeable { int checksumOff = pkt.position(); byte[] buf = pkt.array(); - if (checksumSize > 0 && ris.getChecksumIn() != null) { + if (checksumSize > 0 && checksumIn != null) { readChecksum(buf, checksumOff, checksumDataLen); // write in progress that we need to use to get last checksum @@ -551,7 +581,7 @@ class BlockSender implements java.io.Closeable { int dataOff = checksumOff + checksumDataLen; if (!transferTo) { // normal transfer - ris.readDataFully(buf, dataOff, dataLen); + IOUtils.readFully(blockIn, buf, dataOff, dataLen); if (verifyChecksum) { verifyChecksum(buf, dataOff, dataLen, numChunks, checksumOff); @@ -563,12 +593,12 @@ class BlockSender implements java.io.Closeable { SocketOutputStream sockOut = (SocketOutputStream)out; // First write header and checksums sockOut.write(buf, headerOff, dataOff - headerOff); - + // no need to flush since we know out is not a buffered stream - FileChannel fileCh = ((FileInputStream)ris.getDataIn()).getChannel(); + FileChannel fileCh = ((FileInputStream)blockIn).getChannel(); LongWritable waitTime = new LongWritable(); LongWritable transferTime = new LongWritable(); - sockOut.transferToFully(fileCh, blockInPosition, dataLen, + sockOut.transferToFully(fileCh, blockInPosition, dataLen, waitTime, transferTime); datanode.metrics.addSendDataPacketBlockedOnNetworkNanos(waitTime.get()); datanode.metrics.addSendDataPacketTransferNanos(transferTime.get()); @@ -600,7 +630,7 @@ class BlockSender implements java.io.Closeable { if (!ioem.startsWith("Broken pipe") && !ioem.startsWith("Connection reset")) { LOG.error("BlockSender.sendChunks() exception: ", e); datanode.getBlockScanner().markSuspectBlock( - ris.getVolumeRef().getVolume().getStorageID(), + volumeRef.getVolume().getStorageID(), block); } } @@ -623,15 +653,16 @@ class BlockSender implements java.io.Closeable { */ private void readChecksum(byte[] buf, final int checksumOffset, final int checksumLen) throws IOException { - if (checksumSize <= 0 && ris.getChecksumIn() == null) { + if (checksumSize <= 0 && checksumIn == null) { return; } try { - ris.readChecksumFully(buf, checksumOffset, checksumLen); + checksumIn.readFully(buf, checksumOffset, checksumLen); } catch (IOException e) { LOG.warn(" Could not read or failed to verify checksum for data" + " at offset " + offset + " for block " + block, e); - ris.closeChecksumStream(); + IOUtils.closeStream(checksumIn); + checksumIn = null; if (corruptChecksumOk) { if (checksumOffset < checksumLen) { // Just fill the array with zeros. @@ -715,10 +746,10 @@ class BlockSender implements java.io.Closeable { lastCacheDropOffset = initialOffset; - if (isLongRead() && ris.getDataInFd() != null) { + if (isLongRead() && blockInFd != null) { // Advise that this file descriptor will be accessed sequentially. - ris.dropCacheBehindReads(block.getBlockName(), 0, 0, - POSIX_FADV_SEQUENTIAL); + NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible( + block.getBlockName(), blockInFd, 0, 0, POSIX_FADV_SEQUENTIAL); } // Trigger readahead of beginning of file if configured. @@ -730,10 +761,9 @@ class BlockSender implements java.io.Closeable { int pktBufSize = PacketHeader.PKT_MAX_HEADER_LEN; boolean transferTo = transferToAllowed && !verifyChecksum && baseStream instanceof SocketOutputStream - && ris.getDataIn() instanceof FileInputStream; + && blockIn instanceof FileInputStream; if (transferTo) { - FileChannel fileChannel = - ((FileInputStream)ris.getDataIn()).getChannel(); + FileChannel fileChannel = ((FileInputStream)blockIn).getChannel(); blockInPosition = fileChannel.position(); streamForSendChunks = baseStream; maxChunksPerPacket = numberOfChunks(TRANSFERTO_BUFFER_SIZE); @@ -788,16 +818,14 @@ class BlockSender implements java.io.Closeable { private void manageOsCache() throws IOException { // We can't manage the cache for this block if we don't have a file // descriptor to work with. - if (ris.getDataInFd() == null) { - return; - } + if (blockInFd == null) return; // Perform readahead if necessary if ((readaheadLength > 0) && (datanode.readaheadPool != null) && (alwaysReadahead || isLongRead())) { curReadahead = datanode.readaheadPool.readaheadStream( - clientTraceFmt, ris.getDataInFd(), offset, readaheadLength, - Long.MAX_VALUE, curReadahead); + clientTraceFmt, blockInFd, offset, readaheadLength, Long.MAX_VALUE, + curReadahead); } // Drop what we've just read from cache, since we aren't @@ -807,7 +835,8 @@ class BlockSender implements java.io.Closeable { long nextCacheDropOffset = lastCacheDropOffset + CACHE_DROP_INTERVAL_BYTES; if (offset >= nextCacheDropOffset) { long dropLength = offset - lastCacheDropOffset; - ris.dropCacheBehindReads(block.getBlockName(), lastCacheDropOffset, + NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible( + block.getBlockName(), blockInFd, lastCacheDropOffset, dropLength, POSIX_FADV_DONTNEED); lastCacheDropOffset = offset; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/0bdaa233/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java index 9ba1be0..09f336a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java @@ -366,10 +366,6 @@ public class DNConf { return volsConfigured; } - public long getSlowIoWarningThresholdMs() { - return datanodeSlowIoWarningThresholdMs; - } - int getMaxDataLength() { return maxDataLength; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/0bdaa233/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java index e255f80..0e6b339 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java @@ -1394,9 +1394,4 @@ public class DataStorage extends Storage { synchronized void removeBlockPoolStorage(String bpId) { bpStorageMap.remove(bpId); } - - public static boolean fullyDelete(final File dir) { - boolean result = FileUtil.fullyDelete(dir); - return result; - } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/0bdaa233/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java index 251ba77..7326846 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java @@ -130,7 +130,7 @@ public class ReplicaInPipeline extends ReplicaInfo public long getBytesAcked() { return bytesAcked; } - + @Override // ReplicaInPipelineInterface public void setBytesAcked(long bytesAcked) { long newBytesAcked = bytesAcked - this.bytesAcked; @@ -234,8 +234,7 @@ public class ReplicaInPipeline extends ReplicaInfo @Override // ReplicaInPipelineInterface public ReplicaOutputStreams createStreams(boolean isCreate, - DataChecksum requestedChecksum, long slowLogThresholdMs) - throws IOException { + DataChecksum requestedChecksum) throws IOException { File blockFile = getBlockFile(); File metaFile = getMetaFile(); if (DataNode.LOG.isDebugEnabled()) { @@ -246,13 +245,13 @@ public class ReplicaInPipeline extends ReplicaInfo } long blockDiskSize = 0L; long crcDiskSize = 0L; - + // the checksum that should actually be used -- this // may differ from requestedChecksum for appends. final DataChecksum checksum; - + RandomAccessFile metaRAF = new RandomAccessFile(metaFile, "rw"); - + if (!isCreate) { // For append or recovery, we must enforce the existing checksum. // Also, verify that the file has correct lengths, etc. @@ -260,14 +259,14 @@ public class ReplicaInPipeline extends ReplicaInfo try { BlockMetadataHeader header = BlockMetadataHeader.readHeader(metaRAF); checksum = header.getChecksum(); - + if (checksum.getBytesPerChecksum() != requestedChecksum.getBytesPerChecksum()) { throw new IOException("Client requested checksum " + requestedChecksum + " when appending to an existing block " + "with different chunk size: " + checksum); } - + int bytesPerChunk = checksum.getBytesPerChecksum(); int checksumSize = checksum.getChecksumSize(); @@ -289,19 +288,19 @@ public class ReplicaInPipeline extends ReplicaInfo // for create, we can use the requested checksum checksum = requestedChecksum; } - + FileOutputStream blockOut = null; FileOutputStream crcOut = null; try { blockOut = new FileOutputStream( new RandomAccessFile( blockFile, "rw" ).getFD() ); - crcOut = new FileOutputStream(metaRAF.getFD()); + crcOut = new FileOutputStream(metaRAF.getFD() ); if (!isCreate) { blockOut.getChannel().position(blockDiskSize); crcOut.getChannel().position(crcDiskSize); } return new ReplicaOutputStreams(blockOut, crcOut, checksum, - getVolume().isTransientStorage(), slowLogThresholdMs); + getVolume().isTransientStorage()); } catch (IOException e) { IOUtils.closeStream(blockOut); IOUtils.closeStream(metaRAF); http://git-wip-us.apache.org/repos/asf/hadoop/blob/0bdaa233/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipelineInterface.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipelineInterface.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipelineInterface.java index ae02d6d..ef9f3e2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipelineInterface.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipelineInterface.java @@ -69,13 +69,11 @@ public interface ReplicaInPipelineInterface extends Replica { * * @param isCreate if it is for creation * @param requestedChecksum the checksum the writer would prefer to use - * @param slowLogThresholdMs threshold in ms to log slow io operation * @return output streams for writing * @throws IOException if any error occurs */ public ReplicaOutputStreams createStreams(boolean isCreate, - DataChecksum requestedChecksum, long slowLogThresholdMs) - throws IOException; + DataChecksum requestedChecksum) throws IOException; /** * Create an output stream to write restart metadata in case of datanode http://git-wip-us.apache.org/repos/asf/hadoop/blob/0bdaa233/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java index 9a58764..3ef6390 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java @@ -20,8 +20,6 @@ package org.apache.hadoop.hdfs.server.datanode; import java.io.File; import java.io.FileInputStream; import java.io.FileOutputStream; -import java.io.InputStream; -import java.io.OutputStream; import java.io.IOException; import java.util.HashMap; import java.util.Map; @@ -30,7 +28,6 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.HardLink; import org.apache.hadoop.hdfs.protocol.Block; -import org.apache.hadoop.hdfs.server.common.Storage; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.util.LightWeightResizableGSet; @@ -233,7 +230,7 @@ abstract public class ReplicaInfo extends Block try { FileOutputStream out = new FileOutputStream(tmpFile); try { - copyBytes(in, out, 16 * 1024); + IOUtils.copyBytes(in, out, 16 * 1024); } finally { out.close(); } @@ -245,7 +242,7 @@ abstract public class ReplicaInfo extends Block " into file " + tmpFile + " resulted in a size of " + tmpFile.length()); } - replaceFile(tmpFile, file); + FileUtil.replaceFile(tmpFile, file); } catch (IOException e) { boolean done = tmpFile.delete(); if (!done) { @@ -280,13 +277,13 @@ abstract public class ReplicaInfo extends Block } File meta = getMetaFile(); - int linkCount = getHardLinkCount(file); + int linkCount = HardLink.getLinkCount(file); if (linkCount > 1) { DataNode.LOG.info("Breaking hardlink for " + linkCount + "x-linked " + "block " + this); breakHardlinks(file, this); } - if (getHardLinkCount(meta) > 1) { + if (HardLink.getLinkCount(meta) > 1) { breakHardlinks(meta, this); } return true; @@ -318,27 +315,4 @@ abstract public class ReplicaInfo extends Block public void setNext(LightWeightResizableGSet.LinkedElement next) { this.next = next; } - - public static boolean fullyDelete(final File dir) { - boolean result = DataStorage.fullyDelete(dir); - return result; - } - - public static int getHardLinkCount(File fileName) throws IOException { - int linkCount = HardLink.getLinkCount(fileName); - return linkCount; - } - - public static void rename(File from, File to) throws IOException { - Storage.rename(from, to); - } - - private void copyBytes(InputStream in, OutputStream out, int - buffSize) throws IOException{ - IOUtils.copyBytes(in, out, buffSize); - } - - private void replaceFile(File src, File target) throws IOException { - FileUtil.replaceFile(src, target); - } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/0bdaa233/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java index e50e4f5..f7c3711 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java @@ -21,6 +21,7 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset; import java.io.Closeable; import java.io.EOFException; import java.io.File; +import java.io.FileDescriptor; import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; @@ -610,7 +611,7 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean { * submit a sync_file_range request to AsyncDiskService. */ void submitBackgroundSyncFileRangeRequest(final ExtendedBlock block, - final ReplicaOutputStreams outs, final long offset, final long nbytes, + final FileDescriptor fd, final long offset, final long nbytes, final int flags); /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/0bdaa233/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaInputStreams.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaInputStreams.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaInputStreams.java index 54d0e96..227179d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaInputStreams.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaInputStreams.java @@ -18,45 +18,24 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset; import java.io.Closeable; -import java.io.FileDescriptor; -import java.io.FileInputStream; import java.io.InputStream; -import java.io.IOException; -import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.io.IOUtils; -import org.apache.hadoop.io.nativeio.NativeIO; -import org.apache.hadoop.io.nativeio.NativeIOException; -import org.slf4j.Logger; /** * Contains the input streams for the data and checksum of a replica. */ public class ReplicaInputStreams implements Closeable { - public static final Logger LOG = DataNode.LOG; - - private InputStream dataIn; - private InputStream checksumIn; - private FsVolumeReference volumeRef; - private FileDescriptor dataInFd = null; + private final InputStream dataIn; + private final InputStream checksumIn; + private final FsVolumeReference volumeRef; /** Create an object with a data input stream and a checksum input stream. */ - public ReplicaInputStreams(InputStream dataStream, - InputStream checksumStream, FsVolumeReference volumeRef) { + public ReplicaInputStreams(InputStream dataStream, InputStream checksumStream, + FsVolumeReference volumeRef) { this.volumeRef = volumeRef; this.dataIn = dataStream; this.checksumIn = checksumStream; - if (dataIn instanceof FileInputStream) { - try { - dataInFd = ((FileInputStream) dataIn).getFD(); - } catch (Exception e) { - LOG.warn("Could not get file descriptor for inputstream of class " + - this.dataIn.getClass()); - } - } else { - LOG.debug("Could not get file descriptor for inputstream of class " + - this.dataIn.getClass()); - } } /** @return the data input stream. */ @@ -69,81 +48,10 @@ public class ReplicaInputStreams implements Closeable { return checksumIn; } - public FileDescriptor getDataInFd() { - return dataInFd; - } - - public FsVolumeReference getVolumeRef() { - return volumeRef; - } - - public void readDataFully(byte[] buf, int off, int len) - throws IOException { - IOUtils.readFully(dataIn, buf, off, len); - } - - public void readChecksumFully(byte[] buf, int off, int len) - throws IOException { - IOUtils.readFully(checksumIn, buf, off, len); - } - - public void skipDataFully(long len) throws IOException { - IOUtils.skipFully(dataIn, len); - } - - public void skipChecksumFully(long len) throws IOException { - IOUtils.skipFully(checksumIn, len); - } - - public void closeChecksumStream() throws IOException { - IOUtils.closeStream(checksumIn); - checksumIn = null; - } - - public void dropCacheBehindReads(String identifier, long offset, long len, - int flags) throws NativeIOException { - assert this.dataInFd != null : "null dataInFd!"; - NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible( - identifier, dataInFd, offset, len, flags); - } - - public void closeStreams() throws IOException { - IOException ioe = null; - if(checksumIn!=null) { - try { - checksumIn.close(); // close checksum file - } catch (IOException e) { - ioe = e; - } - checksumIn = null; - } - if(dataIn!=null) { - try { - dataIn.close(); // close data file - } catch (IOException e) { - ioe = e; - } - dataIn = null; - dataInFd = null; - } - if (volumeRef != null) { - IOUtils.cleanup(null, volumeRef); - volumeRef = null; - } - // throw IOException if there is any - if(ioe!= null) { - throw ioe; - } - } - @Override public void close() { IOUtils.closeStream(dataIn); - dataIn = null; - dataInFd = null; IOUtils.closeStream(checksumIn); - checksumIn = null; IOUtils.cleanup(null, volumeRef); - volumeRef = null; } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/0bdaa233/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaOutputStreams.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaOutputStreams.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaOutputStreams.java index a66847a..bd1461a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaOutputStreams.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaOutputStreams.java @@ -18,62 +18,32 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset; import java.io.Closeable; -import java.io.FileDescriptor; import java.io.FileOutputStream; import java.io.OutputStream; import java.io.IOException; -import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.io.IOUtils; -import org.apache.hadoop.io.nativeio.NativeIO; -import org.apache.hadoop.io.nativeio.NativeIOException; import org.apache.hadoop.util.DataChecksum; -import org.apache.hadoop.util.Time; -import org.slf4j.Logger; /** * Contains the output streams for the data and checksum of a replica. */ public class ReplicaOutputStreams implements Closeable { - public static final Logger LOG = DataNode.LOG; - - private FileDescriptor outFd = null; - /** Stream to block. */ - private OutputStream dataOut; - /** Stream to checksum. */ + private final OutputStream dataOut; private final OutputStream checksumOut; private final DataChecksum checksum; private final boolean isTransientStorage; - private final long slowLogThresholdMs; /** * Create an object with a data output stream, a checksum output stream * and a checksum. */ - public ReplicaOutputStreams(OutputStream dataOut, - OutputStream checksumOut, DataChecksum checksum, - boolean isTransientStorage, long slowLogThresholdMs) { + public ReplicaOutputStreams(OutputStream dataOut, OutputStream checksumOut, + DataChecksum checksum, boolean isTransientStorage) { this.dataOut = dataOut; + this.checksumOut = checksumOut; this.checksum = checksum; - this.slowLogThresholdMs = slowLogThresholdMs; this.isTransientStorage = isTransientStorage; - this.checksumOut = checksumOut; - - try { - if (this.dataOut instanceof FileOutputStream) { - this.outFd = ((FileOutputStream)this.dataOut).getFD(); - } else { - LOG.debug("Could not get file descriptor for outputstream of class " + - this.dataOut.getClass()); - } - } catch (IOException e) { - LOG.warn("Could not get file descriptor for outputstream of class " + - this.dataOut.getClass()); - } - } - - public FileDescriptor getOutFd() { - return outFd; } /** @return the data output stream. */ @@ -102,17 +72,12 @@ public class ReplicaOutputStreams implements Closeable { IOUtils.closeStream(checksumOut); } - public void closeDataStream() throws IOException { - dataOut.close(); - dataOut = null; - } - /** * Sync the data stream if it supports it. */ public void syncDataOut() throws IOException { if (dataOut instanceof FileOutputStream) { - sync((FileOutputStream)dataOut); + ((FileOutputStream)dataOut).getChannel().force(true); } } @@ -121,68 +86,8 @@ public class ReplicaOutputStreams implements Closeable { */ public void syncChecksumOut() throws IOException { if (checksumOut instanceof FileOutputStream) { - sync((FileOutputStream)checksumOut); + ((FileOutputStream)checksumOut).getChannel().force(true); } } - /** - * Flush the data stream if it supports it. - */ - public void flushDataOut() throws IOException { - flush(dataOut); - } - - /** - * Flush the checksum stream if it supports it. - */ - public void flushChecksumOut() throws IOException { - flush(checksumOut); - } - - private void flush(OutputStream dos) throws IOException { - long begin = Time.monotonicNow(); - dos.flush(); - long duration = Time.monotonicNow() - begin; - LOG.trace("ReplicaOutputStreams#flush takes {} ms.", duration); - if (duration > slowLogThresholdMs) { - LOG.warn("Slow flush took {} ms (threshold={} ms)", duration, - slowLogThresholdMs); - } - } - - private void sync(FileOutputStream fos) throws IOException { - long begin = Time.monotonicNow(); - fos.getChannel().force(true); - long duration = Time.monotonicNow() - begin; - LOG.trace("ReplicaOutputStreams#sync takes {} ms.", duration); - if (duration > slowLogThresholdMs) { - LOG.warn("Slow fsync took {} ms (threshold={} ms)", duration, - slowLogThresholdMs); - } - } - - public long writeToDisk(byte[] b, int off, int len) throws IOException { - long begin = Time.monotonicNow(); - dataOut.write(b, off, len); - long duration = Time.monotonicNow() - begin; - LOG.trace("DatanodeIO#writeToDisk takes {} ms.", duration); - if (duration > slowLogThresholdMs) { - LOG.warn("Slow BlockReceiver write data to disk cost: {} ms " + - "(threshold={} ms)", duration, slowLogThresholdMs); - } - return duration; - } - - public void syncFileRangeIfPossible(long offset, long nbytes, - int flags) throws NativeIOException { - assert this.outFd != null : "null outFd!"; - NativeIO.POSIX.syncFileRangeIfPossible(outFd, offset, nbytes, flags); - } - - public void dropCacheBehindWrites(String identifier, - long offset, long len, int flags) throws NativeIOException { - assert this.outFd != null : "null outFd!"; - NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible( - identifier, outFd, offset, len, flags); - } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/0bdaa233/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java index 5b701b5..4be9715 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java @@ -52,9 +52,8 @@ import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica; import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo; import org.apache.hadoop.hdfs.server.datanode.ReplicaBeingWritten; import org.apache.hadoop.hdfs.server.datanode.ReplicaWaitingToBeRecovered; -import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams; - import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.io.nativeio.NativeIO; import org.apache.hadoop.util.AutoCloseableLock; import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.util.DiskChecker; @@ -146,7 +145,7 @@ class BlockPoolSlice { // this.tmpDir = new File(bpDir, DataStorage.STORAGE_DIR_TMP); if (tmpDir.exists()) { - DataStorage.fullyDelete(tmpDir); + FileUtil.fullyDelete(tmpDir); } this.rbwDir = new File(currentDir, DataStorage.STORAGE_DIR_RBW); final boolean supportAppends = conf.getBoolean( @@ -431,7 +430,7 @@ class BlockPoolSlice { final File targetMetaFile = new File(targetDir, metaFile.getName()); try { - ReplicaInfo.rename(metaFile, targetMetaFile); + NativeIO.renameTo(metaFile, targetMetaFile); } catch (IOException e) { LOG.warn("Failed to move meta file from " + metaFile + " to " + targetMetaFile, e); @@ -441,7 +440,7 @@ class BlockPoolSlice { final File targetBlockFile = new File(targetDir, blockFile.getName()); try { - ReplicaInfo.rename(blockFile, targetBlockFile); + NativeIO.renameTo(blockFile, targetBlockFile); } catch (IOException e) { LOG.warn("Failed to move block file from " + blockFile + " to " + targetBlockFile, e); @@ -671,6 +670,8 @@ class BlockPoolSlice { * @return the number of valid bytes */ private long validateIntegrityAndSetLength(File blockFile, long genStamp) { + DataInputStream checksumIn = null; + InputStream blockIn = null; try { final File metaFile = FsDatasetUtil.getMetaFile(blockFile, genStamp); long blockFileLen = blockFile.length(); @@ -680,52 +681,57 @@ class BlockPoolSlice { !metaFile.exists() || metaFileLen < crcHeaderLen) { return 0; } - try (DataInputStream checksumIn = new DataInputStream( + checksumIn = new DataInputStream( new BufferedInputStream(new FileInputStream(metaFile), - ioFileBufferSize))) { - // read and handle the common header here. For now just a version - final DataChecksum checksum = BlockMetadataHeader.readDataChecksum( - checksumIn, metaFile); - int bytesPerChecksum = checksum.getBytesPerChecksum(); - int checksumSize = checksum.getChecksumSize(); - long numChunks = Math.min( - (blockFileLen + bytesPerChecksum - 1) / bytesPerChecksum, - (metaFileLen - crcHeaderLen) / checksumSize); - if (numChunks == 0) { - return 0; - } - try (InputStream blockIn = new FileInputStream(blockFile); - ReplicaInputStreams ris = new ReplicaInputStreams(blockIn, - checksumIn, volume.obtainReference())) { - ris.skipChecksumFully((numChunks - 1) * checksumSize); - long lastChunkStartPos = (numChunks - 1) * bytesPerChecksum; - ris.skipDataFully(lastChunkStartPos); - int lastChunkSize = (int) Math.min( - bytesPerChecksum, blockFileLen - lastChunkStartPos); - byte[] buf = new byte[lastChunkSize + checksumSize]; - ris.readChecksumFully(buf, lastChunkSize, checksumSize); - ris.readDataFully(buf, 0, lastChunkSize); - checksum.update(buf, 0, lastChunkSize); - long validFileLength; - if (checksum.compare(buf, lastChunkSize)) { // last chunk matches crc - validFileLength = lastChunkStartPos + lastChunkSize; - } else { // last chunk is corrupt - validFileLength = lastChunkStartPos; - } - // truncate if extra bytes are present without CRC - if (blockFile.length() > validFileLength) { - try (RandomAccessFile blockRAF = - new RandomAccessFile(blockFile, "rw")) { - // truncate blockFile - blockRAF.setLength(validFileLength); - } - } - return validFileLength; + ioFileBufferSize)); + + // read and handle the common header here. For now just a version + final DataChecksum checksum = BlockMetadataHeader.readDataChecksum( + checksumIn, metaFile); + int bytesPerChecksum = checksum.getBytesPerChecksum(); + int checksumSize = checksum.getChecksumSize(); + long numChunks = Math.min( + (blockFileLen + bytesPerChecksum - 1)/bytesPerChecksum, + (metaFileLen - crcHeaderLen)/checksumSize); + if (numChunks == 0) { + return 0; + } + IOUtils.skipFully(checksumIn, (numChunks-1)*checksumSize); + blockIn = new FileInputStream(blockFile); + long lastChunkStartPos = (numChunks-1)*bytesPerChecksum; + IOUtils.skipFully(blockIn, lastChunkStartPos); + int lastChunkSize = (int)Math.min( + bytesPerChecksum, blockFileLen-lastChunkStartPos); + byte[] buf = new byte[lastChunkSize+checksumSize]; + checksumIn.readFully(buf, lastChunkSize, checksumSize); + IOUtils.readFully(blockIn, buf, 0, lastChunkSize); + + checksum.update(buf, 0, lastChunkSize); + long validFileLength; + if (checksum.compare(buf, lastChunkSize)) { // last chunk matches crc + validFileLength = lastChunkStartPos + lastChunkSize; + } else { // last chunck is corrupt + validFileLength = lastChunkStartPos; + } + + // truncate if extra bytes are present without CRC + if (blockFile.length() > validFileLength) { + RandomAccessFile blockRAF = new RandomAccessFile(blockFile, "rw"); + try { + // truncate blockFile + blockRAF.setLength(validFileLength); + } finally { + blockRAF.close(); } } + + return validFileLength; } catch (IOException e) { FsDatasetImpl.LOG.warn(e); return 0; + } finally { + IOUtils.closeStream(checksumIn); + IOUtils.closeStream(blockIn); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/0bdaa233/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java index 36d90fd..fdc9f83 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl; import java.io.File; +import java.io.FileDescriptor; import java.util.HashMap; import java.util.HashSet; import java.util.Map; @@ -34,9 +35,9 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference; -import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams; import org.apache.hadoop.hdfs.server.protocol.BlockCommand; import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.io.nativeio.NativeIO; import org.apache.hadoop.io.nativeio.NativeIOException; /** @@ -192,13 +193,13 @@ class FsDatasetAsyncDiskService { } public void submitSyncFileRangeRequest(FsVolumeImpl volume, - final ReplicaOutputStreams streams, final long offset, final long nbytes, + final FileDescriptor fd, final long offset, final long nbytes, final int flags) { execute(volume.getCurrentDir(), new Runnable() { @Override public void run() { try { - streams.syncFileRangeIfPossible(offset, nbytes, flags); + NativeIO.POSIX.syncFileRangeIfPossible(fd, offset, nbytes, flags); } catch (NativeIOException e) { LOG.warn("sync_file_range error", e); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/0bdaa233/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java index ac88311..216b934 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java @@ -21,8 +21,9 @@ import java.io.BufferedOutputStream; import java.io.DataOutputStream; import java.io.EOFException; import java.io.File; -import java.io.FileNotFoundException; +import java.io.FileDescriptor; import java.io.FileInputStream; +import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; @@ -2984,9 +2985,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { @Override public void submitBackgroundSyncFileRangeRequest(ExtendedBlock block, - ReplicaOutputStreams outs, long offset, long nbytes, int flags) { + FileDescriptor fd, long offset, long nbytes, int flags) { FsVolumeImpl fsVolumeImpl = this.getVolume(block); - asyncDiskService.submitSyncFileRangeRequest(fsVolumeImpl, outs, offset, + asyncDiskService.submitSyncFileRangeRequest(fsVolumeImpl, fd, offset, nbytes, flags); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/0bdaa233/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java index 286af6e..67e7192 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java @@ -993,7 +993,7 @@ public class FsVolumeImpl implements FsVolumeSpi { DataStorage.STORAGE_DIR_LAZY_PERSIST); File rbwDir = new File(bpCurrentDir, DataStorage.STORAGE_DIR_RBW); if (force) { - DataStorage.fullyDelete(bpDir); + FileUtil.fullyDelete(bpDir); } else { if (!rbwDir.delete()) { throw new IOException("Failed to delete " + rbwDir); @@ -1007,7 +1007,7 @@ public class FsVolumeImpl implements FsVolumeSpi { !FileUtil.fullyDelete(lazypersistDir)))) { throw new IOException("Failed to delete " + lazypersistDir); } - DataStorage.fullyDelete(tmpDir); + FileUtil.fullyDelete(tmpDir); for (File f : FileUtil.listFiles(bpCurrentDir)) { if (!f.delete()) { throw new IOException("Failed to delete " + f); @@ -1041,3 +1041,4 @@ public class FsVolumeImpl implements FsVolumeSpi { return new DatanodeStorage(storageID, DatanodeStorage.State.NORMAL, storageType); } } + http://git-wip-us.apache.org/repos/asf/hadoop/blob/0bdaa233/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java index 0bb95d2..3035bbb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java @@ -700,9 +700,9 @@ public class TestFileAppend{ // write data to block file ReplicaBeingWritten rbw = - (ReplicaBeingWritten)replicaHandler.getReplica(); - ReplicaOutputStreams - outputStreams = rbw.createStreams(false, DEFAULT_CHECKSUM, 300); + (ReplicaBeingWritten) replicaHandler.getReplica(); + ReplicaOutputStreams outputStreams = + rbw.createStreams(false, DEFAULT_CHECKSUM); OutputStream dataOutput = outputStreams.getDataOut(); byte[] appendBytes = new byte[1]; http://git-wip-us.apache.org/repos/asf/hadoop/blob/0bdaa233/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java index c5ad540..70a3fea 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hdfs.server.datanode; import java.io.File; +import java.io.FileDescriptor; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -254,15 +255,14 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> { @Override synchronized public ReplicaOutputStreams createStreams(boolean isCreate, - DataChecksum requestedChecksum, long slowLogThresholdMs) - throws IOException { + DataChecksum requestedChecksum) throws IOException { if (finalized) { throw new IOException("Trying to write to a finalized replica " + theBlock); } else { SimulatedOutputStream crcStream = new SimulatedOutputStream(); return new ReplicaOutputStreams(oStream, crcStream, requestedChecksum, - volume.isTransientStorage(), slowLogThresholdMs); + volume.isTransientStorage()); } } @@ -1328,7 +1328,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> { @Override public void submitBackgroundSyncFileRangeRequest(ExtendedBlock block, - ReplicaOutputStreams outs, long offset, long nbytes, int flags) { + FileDescriptor fd, long offset, long nbytes, int flags) { throw new UnsupportedOperationException(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/0bdaa233/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java index 2eabad2..6f66f6a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java @@ -642,7 +642,7 @@ public class TestBlockRecovery { ReplicaOutputStreams streams = null; try { streams = replicaInfo.createStreams(true, - DataChecksum.newDataChecksum(DataChecksum.Type.CRC32, 512), 300); + DataChecksum.newDataChecksum(DataChecksum.Type.CRC32, 512)); streams.getChecksumOut().write('a'); dn.data.initReplicaRecovery(new RecoveringBlock(block, null, RECOVERY_ID+1)); BlockRecoveryWorker.RecoveryTaskContiguous RecoveryTaskContiguous = http://git-wip-us.apache.org/repos/asf/hadoop/blob/0bdaa233/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java index 84e9180..dd7d239 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java @@ -83,7 +83,7 @@ public class TestSimulatedFSDataset { ReplicaInPipelineInterface bInfo = fsdataset.createRbw( StorageType.DEFAULT, b, false).getReplica(); ReplicaOutputStreams out = bInfo.createStreams(true, - DataChecksum.newDataChecksum(DataChecksum.Type.CRC32, 512), 300); + DataChecksum.newDataChecksum(DataChecksum.Type.CRC32, 512)); try { OutputStream dataOut = out.getDataOut(); assertEquals(0, fsdataset.getLength(b)); http://git-wip-us.apache.org/repos/asf/hadoop/blob/0bdaa233/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java index ab572d0..1bbf9de 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java @@ -322,8 +322,8 @@ public class ExternalDatasetImpl implements FsDatasetSpi<ExternalVolumeImpl> { } @Override - public void submitBackgroundSyncFileRangeRequest(ExtendedBlock block, - ReplicaOutputStreams outs, long offset, long nbytes, int flags) { + public void submitBackgroundSyncFileRangeRequest(ExtendedBlock block, FileDescriptor fd, long offset, long nbytes, int flags) { + } @Override http://git-wip-us.apache.org/repos/asf/hadoop/blob/0bdaa233/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalReplicaInPipeline.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalReplicaInPipeline.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalReplicaInPipeline.java index 3e2fd7d..a0039bc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalReplicaInPipeline.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalReplicaInPipeline.java @@ -57,10 +57,8 @@ public class ExternalReplicaInPipeline implements ReplicaInPipelineInterface { @Override public ReplicaOutputStreams createStreams(boolean isCreate, - DataChecksum requestedChecksum, long slowLogThresholdMs) - throws IOException { - return new ReplicaOutputStreams(null, null, requestedChecksum, false, - slowLogThresholdMs); + DataChecksum requestedChecksum) throws IOException { + return new ReplicaOutputStreams(null, null, requestedChecksum, false); } @Override --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org